| // Copyright 2022 Google LLC |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package firestore |
| |
| import ( |
| "context" |
| "errors" |
| "fmt" |
| "sync" |
| "time" |
| |
| vkit "cloud.google.com/go/firestore/apiv1" |
| pb "cloud.google.com/go/firestore/apiv1/firestorepb" |
| "golang.org/x/time/rate" |
| "google.golang.org/api/support/bundler" |
| "google.golang.org/grpc/codes" |
| "google.golang.org/grpc/status" |
| ) |
| |
| const ( |
| // maxBatchSize is the max number of writes to send in a request |
| maxBatchSize = 20 |
| // maxRetryAttempts is the max number of times to retry a write |
| maxRetryAttempts = 10 |
| // defaultStartingMaximumOpsPerSecond is the starting max number of requests to the service per second |
| defaultStartingMaximumOpsPerSecond = 500 |
| // maxWritesPerSecond is the starting limit of writes allowed to callers per second |
| maxWritesPerSecond = maxBatchSize * defaultStartingMaximumOpsPerSecond |
| ) |
| |
| // bulkWriterResult contains the WriteResult or error results from an individual |
| // write to the database. |
| type bulkWriterResult struct { |
| result *pb.WriteResult // (cached) result from the operation |
| err error // (cached) any errors that occurred |
| } |
| |
| // BulkWriterJob provides read-only access to the results of a BulkWriter write attempt. |
| type BulkWriterJob struct { |
| resultChan chan bulkWriterResult // send errors and results to this channel |
| write *pb.Write // the writes to apply to the database |
| attempts int // number of times this write has been attempted |
| resultsLock sync.Mutex // guards the cached wr and e values for the job |
| result *WriteResult // (cached) result from the operation |
| err error // (cached) any errors that occurred |
| ctx context.Context // context for canceling/timing out results |
| } |
| |
| // Results gets the results of the BulkWriter write attempt. |
| // This method blocks if the results for this BulkWriterJob haven't been |
| // received. |
| func (j *BulkWriterJob) Results() (*WriteResult, error) { |
| j.resultsLock.Lock() |
| defer j.resultsLock.Unlock() |
| if j.result == nil && j.err == nil { |
| j.result, j.err = j.processResults() // cache the results for additional calls |
| } |
| return j.result, j.err |
| } |
| |
| // processResults checks for errors returned from send() and packages up the |
| // results as WriteResult objects |
| func (j *BulkWriterJob) processResults() (*WriteResult, error) { |
| select { |
| case <-j.ctx.Done(): |
| return nil, j.ctx.Err() |
| case bwr := <-j.resultChan: |
| if bwr.err != nil { |
| return nil, bwr.err |
| } |
| return writeResultFromProto(bwr.result) |
| } |
| } |
| |
| // setError ensures that an error is returned on the error channel of BulkWriterJob. |
| func (j *BulkWriterJob) setError(e error) { |
| bwr := bulkWriterResult{ |
| err: e, |
| result: nil, |
| } |
| j.resultChan <- bwr |
| close(j.resultChan) |
| } |
| |
| // A BulkWriter supports concurrent writes to multiple documents. The BulkWriter |
| // submits document writes in maximum batches of 20 writes per request. Each |
| // request can contain many different document writes: create, delete, update, |
| // and set are all supported. |
| // |
| // Only one operation (create, set, update, delete) per document is allowed. |
| // BulkWriter cannot promise atomicity: individual writes can fail or succeed |
| // independent of each other. Bulkwriter does not apply writes in any set order; |
| // thus a document can't have set on it immediately after creation. |
| type BulkWriter struct { |
| database string // the database as resource name: projects/[PROJECT]/databases/[DATABASE] |
| start time.Time // when this BulkWriter was started; used to calculate qps and rate increases |
| vc *vkit.Client // internal client |
| maxOpsPerSecond int // number of requests that can be sent per second |
| docUpdatePaths map[string]bool // document paths with corresponding writes in the queue |
| limiter rate.Limiter // limit requests to server to <= 500 qps |
| bundler *bundler.Bundler // handle bundling up writes to Firestore |
| ctx context.Context // context for canceling all BulkWriter operations |
| isOpenLock sync.RWMutex // guards against setting isOpen concurrently |
| isOpen bool // flag that the BulkWriter is closed |
| } |
| |
| // newBulkWriter creates a new instance of the BulkWriter. |
| func newBulkWriter(ctx context.Context, c *Client, database string) *BulkWriter { |
| // Although typically we shouldn't store Context objects, in this case we |
| // need to pass this Context through to the Bundler handler. |
| ctx = withResourceHeader(ctx, c.path()) |
| |
| bw := &BulkWriter{ |
| database: database, |
| start: time.Now(), |
| vc: c.c, |
| isOpen: true, |
| maxOpsPerSecond: defaultStartingMaximumOpsPerSecond, |
| docUpdatePaths: make(map[string]bool), |
| ctx: ctx, |
| limiter: *rate.NewLimiter(rate.Limit(maxWritesPerSecond), 1), |
| } |
| |
| // can't initialize within struct above; need instance reference to BulkWriter.send() |
| bw.bundler = bundler.NewBundler(&BulkWriterJob{}, bw.send) |
| bw.bundler.HandlerLimit = bw.maxOpsPerSecond |
| bw.bundler.BundleCountThreshold = maxBatchSize |
| |
| return bw |
| } |
| |
| // End sends all enqueued writes in parallel and closes the BulkWriter to new requests. |
| // After calling End(), calling any additional method automatically returns |
| // with an error. This method completes when there are no more pending writes |
| // in the queue. |
| func (bw *BulkWriter) End() { |
| bw.isOpenLock.Lock() |
| bw.isOpen = false |
| bw.isOpenLock.Unlock() |
| bw.Flush() |
| } |
| |
| // Flush commits all writes that have been enqueued up to this point in parallel. |
| // This method blocks execution. |
| func (bw *BulkWriter) Flush() { |
| bw.bundler.Flush() |
| } |
| |
| // Create adds a document creation write to the queue of writes to send. |
| // Note: You cannot write to (Create, Update, Set, or Delete) the same document more than once. |
| func (bw *BulkWriter) Create(doc *DocumentRef, datum interface{}) (*BulkWriterJob, error) { |
| bw.isOpenLock.RLock() |
| defer bw.isOpenLock.RUnlock() |
| err := bw.checkWriteConditions(doc) |
| if err != nil { |
| return nil, err |
| } |
| |
| w, err := doc.newCreateWrites(datum) |
| if err != nil { |
| return nil, fmt.Errorf("firestore: cannot create %v with %v", doc.ID, datum) |
| } |
| |
| if len(w) > 1 { |
| return nil, fmt.Errorf("firestore: too many document writes sent to bulkwriter") |
| } |
| |
| j := bw.write(w[0]) |
| return j, nil |
| } |
| |
| // Delete adds a document deletion write to the queue of writes to send. |
| // Note: You cannot write to (Create, Update, Set, or Delete) the same document more than once. |
| func (bw *BulkWriter) Delete(doc *DocumentRef, preconds ...Precondition) (*BulkWriterJob, error) { |
| bw.isOpenLock.RLock() |
| defer bw.isOpenLock.RUnlock() |
| err := bw.checkWriteConditions(doc) |
| if err != nil { |
| return nil, err |
| } |
| |
| w, err := doc.newDeleteWrites(preconds) |
| if err != nil { |
| return nil, fmt.Errorf("firestore: cannot delete doc %v", doc.ID) |
| } |
| |
| if len(w) > 1 { |
| return nil, fmt.Errorf("firestore: too many document writes sent to bulkwriter") |
| } |
| |
| j := bw.write(w[0]) |
| return j, nil |
| } |
| |
| // Set adds a document set write to the queue of writes to send. |
| // Note: You cannot write to (Create, Update, Set, or Delete) the same document more than once. |
| func (bw *BulkWriter) Set(doc *DocumentRef, datum interface{}, opts ...SetOption) (*BulkWriterJob, error) { |
| bw.isOpenLock.RLock() |
| defer bw.isOpenLock.RUnlock() |
| err := bw.checkWriteConditions(doc) |
| if err != nil { |
| return nil, err |
| } |
| |
| w, err := doc.newSetWrites(datum, opts) |
| if err != nil { |
| return nil, fmt.Errorf("firestore: cannot set %v on doc %v", datum, doc.ID) |
| } |
| |
| if len(w) > 1 { |
| return nil, fmt.Errorf("firestore: too many writes sent to bulkwriter") |
| } |
| |
| j := bw.write(w[0]) |
| return j, nil |
| } |
| |
| // Update adds a document update write to the queue of writes to send. |
| // Note: You cannot write to (Create, Update, Set, or Delete) the same document more than once. |
| func (bw *BulkWriter) Update(doc *DocumentRef, updates []Update, preconds ...Precondition) (*BulkWriterJob, error) { |
| bw.isOpenLock.RLock() |
| defer bw.isOpenLock.RUnlock() |
| err := bw.checkWriteConditions(doc) |
| if err != nil { |
| return nil, err |
| } |
| |
| w, err := doc.newUpdatePathWrites(updates, preconds) |
| if err != nil { |
| return nil, fmt.Errorf("firestore: cannot update doc %v", doc.ID) |
| } |
| |
| if len(w) > 1 { |
| return nil, fmt.Errorf("firestore: too many writes sent to bulkwriter") |
| } |
| |
| j := bw.write(w[0]) |
| return j, nil |
| } |
| |
| // checkConditions determines whether this write attempt is valid. It returns |
| // an error if either the BulkWriter has already been closed or if it |
| // receives a nil document reference. |
| func (bw *BulkWriter) checkWriteConditions(doc *DocumentRef) error { |
| if !bw.isOpen { |
| return errors.New("firestore: BulkWriter has been closed") |
| } |
| |
| if doc == nil { |
| return errors.New("firestore: nil document contents") |
| } |
| |
| _, havePath := bw.docUpdatePaths[doc.shortPath] |
| if havePath { |
| return fmt.Errorf("firestore: BulkWriter received duplicate write for path: %v", doc.shortPath) |
| } |
| |
| bw.docUpdatePaths[doc.shortPath] = true |
| |
| return nil |
| } |
| |
| // write packages up write requests into bulkWriterJob objects. |
| func (bw *BulkWriter) write(w *pb.Write) *BulkWriterJob { |
| |
| j := &BulkWriterJob{ |
| resultChan: make(chan bulkWriterResult, 1), |
| write: w, |
| ctx: bw.ctx, |
| } |
| |
| bw.limiter.Wait(bw.ctx) |
| // ignore operation size constraints and related errors; can't be inferred at compile time |
| // Bundler is set to accept an unlimited amount of bytes |
| _ = bw.bundler.Add(j, 0) |
| |
| return j |
| } |
| |
| // send transmits writes to the service and matches response results to job channels. |
| func (bw *BulkWriter) send(i interface{}) { |
| bwj := i.([]*BulkWriterJob) |
| |
| if len(bwj) == 0 { |
| return |
| } |
| |
| var ws []*pb.Write |
| for _, w := range bwj { |
| ws = append(ws, w.write) |
| } |
| |
| bwr := &pb.BatchWriteRequest{ |
| Database: bw.database, |
| Writes: ws, |
| Labels: map[string]string{}, |
| } |
| |
| select { |
| case <-bw.ctx.Done(): |
| return |
| default: |
| resp, err := bw.vc.BatchWrite(bw.ctx, bwr) |
| if err != nil { |
| // Do we need to be selective about what kind of errors we send? |
| for _, j := range bwj { |
| j.setError(err) |
| } |
| return |
| } |
| // Match write results with BulkWriterJob objects |
| for i, res := range resp.WriteResults { |
| s := resp.Status[i] |
| c := s.GetCode() |
| if c != 0 { // Should we do an explicit check against rpc.Code enum? |
| j := bwj[i] |
| j.attempts++ |
| |
| // Do we need separate retry bundler? |
| if j.attempts < maxRetryAttempts { |
| // ignore operation size constraints and related errors; job size can't be inferred at compile time |
| // Bundler is set to accept an unlimited amount of bytes |
| _ = bw.bundler.Add(j, 0) |
| } else { |
| j.setError(status.Error(codes.Code(s.Code), s.Message)) |
| } |
| continue |
| } |
| |
| bwj[i].resultChan <- bulkWriterResult{err: nil, result: res} |
| close(bwj[i].resultChan) |
| } |
| } |
| } |