| // Copyright 2021 Google LLC |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package pubsub |
| |
| import ( |
| "context" |
| "fmt" |
| "time" |
| |
| "google.golang.org/api/option" |
| |
| vkit "cloud.google.com/go/pubsub/apiv1" |
| pb "cloud.google.com/go/pubsub/apiv1/pubsubpb" |
| ) |
| |
| // SchemaClient is a Pub/Sub schema client scoped to a single project. |
| type SchemaClient struct { |
| sc *vkit.SchemaClient |
| projectID string |
| } |
| |
| // Close closes the schema client and frees up resources. |
| func (s *SchemaClient) Close() error { |
| return s.sc.Close() |
| } |
| |
| // NewSchemaClient creates a new Pub/Sub Schema client. |
| func NewSchemaClient(ctx context.Context, projectID string, opts ...option.ClientOption) (*SchemaClient, error) { |
| sc, err := vkit.NewSchemaClient(ctx, opts...) |
| if err != nil { |
| return nil, err |
| } |
| return &SchemaClient{sc: sc, projectID: projectID}, nil |
| } |
| |
| // SchemaConfig is a reference to a PubSub schema. |
| type SchemaConfig struct { |
| // Name of the schema. |
| // Format is `projects/{project}/schemas/{schema}` |
| Name string |
| |
| // The type of the schema definition. |
| Type SchemaType |
| |
| // The definition of the schema. This should contain a string representing |
| // the full definition of the schema that is a valid schema definition of |
| // the type specified in `type`. |
| Definition string |
| |
| // RevisionID is the revision ID of the schema. |
| // This field is output only. |
| RevisionID string |
| |
| // RevisionCreateTime is the timestamp that the revision was created. |
| // This field is output only. |
| RevisionCreateTime time.Time |
| } |
| |
| // SchemaType is the possible schema definition types. |
| type SchemaType pb.Schema_Type |
| |
| const ( |
| // SchemaTypeUnspecified is the unused default value. |
| SchemaTypeUnspecified SchemaType = 0 |
| // SchemaProtocolBuffer is a protobuf schema definition. |
| SchemaProtocolBuffer SchemaType = 1 |
| // SchemaAvro is an Avro schema definition. |
| SchemaAvro SchemaType = 2 |
| ) |
| |
| // SchemaView is a view of Schema object fields to be returned |
| // by GetSchema and ListSchemas. |
| type SchemaView pb.SchemaView |
| |
| const ( |
| // SchemaViewUnspecified is the default/unset value. |
| SchemaViewUnspecified SchemaView = 0 |
| // SchemaViewBasic includes the name and type of the schema, but not the definition. |
| SchemaViewBasic SchemaView = 1 |
| // SchemaViewFull includes all Schema object fields. |
| SchemaViewFull SchemaView = 2 |
| ) |
| |
| // SchemaSettings are settings for validating messages |
| // published against a schema. |
| type SchemaSettings struct { |
| // The name of the schema that messages published should be |
| // validated against. Format is `projects/{project}/schemas/{schema}` |
| Schema string |
| |
| // The encoding of messages validated against the schema. |
| Encoding SchemaEncoding |
| |
| // The minimum (inclusive) revision allowed for validating messages. If empty |
| // or not present, allow any revision to be validated against LastRevisionID or |
| // any revision created before. |
| FirstRevisionID string |
| |
| // The maximum (inclusive) revision allowed for validating messages. If empty |
| // or not present, allow any revision to be validated against FirstRevisionID |
| // or any revision created after. |
| LastRevisionID string |
| } |
| |
| func schemaSettingsToProto(schema *SchemaSettings) *pb.SchemaSettings { |
| if schema == nil { |
| return nil |
| } |
| return &pb.SchemaSettings{ |
| Schema: schema.Schema, |
| Encoding: pb.Encoding(schema.Encoding), |
| FirstRevisionId: schema.FirstRevisionID, |
| LastRevisionId: schema.LastRevisionID, |
| } |
| } |
| |
| func protoToSchemaSettings(pbs *pb.SchemaSettings) *SchemaSettings { |
| if pbs == nil { |
| return nil |
| } |
| return &SchemaSettings{ |
| Schema: pbs.Schema, |
| Encoding: SchemaEncoding(pbs.Encoding), |
| FirstRevisionID: pbs.FirstRevisionId, |
| LastRevisionID: pbs.LastRevisionId, |
| } |
| } |
| |
| // SchemaEncoding is the encoding expected for messages. |
| type SchemaEncoding pb.Encoding |
| |
| const ( |
| // EncodingUnspecified is the default unused value. |
| EncodingUnspecified SchemaEncoding = 0 |
| // EncodingJSON is the JSON encoding type for a message. |
| EncodingJSON SchemaEncoding = 1 |
| // EncodingBinary is the binary encoding type for a message. |
| // For some schema types, binary encoding may not be available. |
| EncodingBinary SchemaEncoding = 2 |
| ) |
| |
| func (s *SchemaConfig) toProto() *pb.Schema { |
| pbs := &pb.Schema{ |
| Name: s.Name, |
| Type: pb.Schema_Type(s.Type), |
| Definition: s.Definition, |
| } |
| return pbs |
| } |
| |
| func protoToSchemaConfig(pbs *pb.Schema) *SchemaConfig { |
| return &SchemaConfig{ |
| Name: pbs.Name, |
| Type: SchemaType(pbs.Type), |
| Definition: pbs.Definition, |
| RevisionID: pbs.RevisionId, |
| RevisionCreateTime: pbs.RevisionCreateTime.AsTime(), |
| } |
| } |
| |
| // CreateSchema creates a new schema with the given schemaID |
| // and config. Schemas cannot be updated after creation. |
| func (c *SchemaClient) CreateSchema(ctx context.Context, schemaID string, s SchemaConfig) (*SchemaConfig, error) { |
| req := &pb.CreateSchemaRequest{ |
| Parent: fmt.Sprintf("projects/%s", c.projectID), |
| Schema: s.toProto(), |
| SchemaId: schemaID, |
| } |
| pbs, err := c.sc.CreateSchema(ctx, req) |
| if err != nil { |
| return nil, err |
| } |
| return protoToSchemaConfig(pbs), nil |
| } |
| |
| // Schema retrieves the configuration of a schema given a schemaID and a view. |
| func (c *SchemaClient) Schema(ctx context.Context, schemaID string, view SchemaView) (*SchemaConfig, error) { |
| schemaPath := fmt.Sprintf("projects/%s/schemas/%s", c.projectID, schemaID) |
| req := &pb.GetSchemaRequest{ |
| Name: schemaPath, |
| View: pb.SchemaView(view), |
| } |
| s, err := c.sc.GetSchema(ctx, req) |
| if err != nil { |
| return nil, err |
| } |
| return protoToSchemaConfig(s), nil |
| } |
| |
| // Schemas returns an iterator which returns all of the schemas for the client's project. |
| func (c *SchemaClient) Schemas(ctx context.Context, view SchemaView) *SchemaIterator { |
| return &SchemaIterator{ |
| it: c.sc.ListSchemas(ctx, &pb.ListSchemasRequest{ |
| Parent: fmt.Sprintf("projects/%s", c.projectID), |
| View: pb.SchemaView(view), |
| }), |
| } |
| } |
| |
| // SchemaIterator is a struct used to iterate over schemas. |
| type SchemaIterator struct { |
| it *vkit.SchemaIterator |
| err error |
| } |
| |
| // Next returns the next schema. If there are no more schemas, iterator.Done will be returned. |
| func (s *SchemaIterator) Next() (*SchemaConfig, error) { |
| if s.err != nil { |
| return nil, s.err |
| } |
| pbs, err := s.it.Next() |
| if err != nil { |
| return nil, err |
| } |
| return protoToSchemaConfig(pbs), nil |
| } |
| |
| // ListSchemaRevisions lists all schema revisions for the named schema. |
| func (c *SchemaClient) ListSchemaRevisions(ctx context.Context, schemaID string, view SchemaView) *SchemaIterator { |
| return &SchemaIterator{ |
| it: c.sc.ListSchemaRevisions(ctx, &pb.ListSchemaRevisionsRequest{ |
| Name: fmt.Sprintf("projects/%s/schemas/%s", c.projectID, schemaID), |
| View: pb.SchemaView(view), |
| }), |
| } |
| } |
| |
| // CommitSchema commits a new schema revision to an existing schema. |
| func (c *SchemaClient) CommitSchema(ctx context.Context, schemaID string, s SchemaConfig) (*SchemaConfig, error) { |
| req := &pb.CommitSchemaRequest{ |
| Name: fmt.Sprintf("projects/%s/schemas/%s", c.projectID, schemaID), |
| Schema: s.toProto(), |
| } |
| pbs, err := c.sc.CommitSchema(ctx, req) |
| if err != nil { |
| return nil, err |
| } |
| return protoToSchemaConfig(pbs), nil |
| } |
| |
| // RollbackSchema creates a new schema revision that is a copy of the provided revision. |
| func (c *SchemaClient) RollbackSchema(ctx context.Context, schemaID, revisionID string) (*SchemaConfig, error) { |
| req := &pb.RollbackSchemaRequest{ |
| Name: fmt.Sprintf("projects/%s/schemas/%s", c.projectID, schemaID), |
| RevisionId: revisionID, |
| } |
| pbs, err := c.sc.RollbackSchema(ctx, req) |
| if err != nil { |
| return nil, err |
| } |
| return protoToSchemaConfig(pbs), nil |
| } |
| |
| // DeleteSchemaRevision deletes a specific schema revision. |
| func (c *SchemaClient) DeleteSchemaRevision(ctx context.Context, schemaID, revisionID string) (*SchemaConfig, error) { |
| schemaPath := fmt.Sprintf("projects/%s/schemas/%s@%s", c.projectID, schemaID, revisionID) |
| schema, err := c.sc.DeleteSchemaRevision(ctx, &pb.DeleteSchemaRevisionRequest{ |
| Name: schemaPath, |
| }) |
| if err != nil { |
| return nil, err |
| } |
| return protoToSchemaConfig(schema), nil |
| } |
| |
| // DeleteSchema deletes an existing schema given a schema ID. |
| func (c *SchemaClient) DeleteSchema(ctx context.Context, schemaID string) error { |
| schemaPath := fmt.Sprintf("projects/%s/schemas/%s", c.projectID, schemaID) |
| return c.sc.DeleteSchema(ctx, &pb.DeleteSchemaRequest{ |
| Name: schemaPath, |
| }) |
| } |
| |
// ValidateSchemaResult is returned by ValidateSchema on success. It has no
// fields at present and is reserved for future use.
type ValidateSchemaResult struct{}
| |
| // ValidateSchema validates a schema config and returns an error if invalid. |
| func (c *SchemaClient) ValidateSchema(ctx context.Context, schema SchemaConfig) (*ValidateSchemaResult, error) { |
| req := &pb.ValidateSchemaRequest{ |
| Parent: fmt.Sprintf("projects/%s", c.projectID), |
| Schema: schema.toProto(), |
| } |
| _, err := c.sc.ValidateSchema(ctx, req) |
| if err != nil { |
| return nil, err |
| } |
| return &ValidateSchemaResult{}, nil |
| } |
| |
// ValidateMessageResult is returned by the ValidateMessage methods on
// success. It has no fields at present and is reserved for future use.
type ValidateMessageResult struct{}
| |
| // ValidateMessageWithConfig validates a message against an schema specified |
| // by a schema config. |
| func (c *SchemaClient) ValidateMessageWithConfig(ctx context.Context, msg []byte, encoding SchemaEncoding, config SchemaConfig) (*ValidateMessageResult, error) { |
| req := &pb.ValidateMessageRequest{ |
| Parent: fmt.Sprintf("projects/%s", c.projectID), |
| SchemaSpec: &pb.ValidateMessageRequest_Schema{ |
| Schema: config.toProto(), |
| }, |
| Message: msg, |
| Encoding: pb.Encoding(encoding), |
| } |
| _, err := c.sc.ValidateMessage(ctx, req) |
| if err != nil { |
| return nil, err |
| } |
| return &ValidateMessageResult{}, nil |
| } |
| |
| // ValidateMessageWithID validates a message against an schema specified |
| // by the schema ID of an existing schema. |
| func (c *SchemaClient) ValidateMessageWithID(ctx context.Context, msg []byte, encoding SchemaEncoding, schemaID string) (*ValidateMessageResult, error) { |
| req := &pb.ValidateMessageRequest{ |
| Parent: fmt.Sprintf("projects/%s", c.projectID), |
| SchemaSpec: &pb.ValidateMessageRequest_Name{ |
| Name: fmt.Sprintf("projects/%s/schemas/%s", c.projectID, schemaID), |
| }, |
| Message: msg, |
| Encoding: pb.Encoding(encoding), |
| } |
| _, err := c.sc.ValidateMessage(ctx, req) |
| if err != nil { |
| return nil, err |
| } |
| return &ValidateMessageResult{}, nil |
| } |