// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package firestore

import (
"context"
"errors"
"fmt"
"sync"
"time"
vkit "cloud.google.com/go/firestore/apiv1"
pb "cloud.google.com/go/firestore/apiv1/firestorepb"
"golang.org/x/time/rate"
"google.golang.org/api/support/bundler"
)

const (
	// maxBatchSize is the maximum number of writes to send in a single request.
	maxBatchSize = 20
	// maxRetryAttempts is the maximum number of times to retry a failed write.
	maxRetryAttempts = 10
	// defaultStartingMaximumOpsPerSecond is the starting maximum number of requests to the service per second.
	defaultStartingMaximumOpsPerSecond = 500
	// maxWritesPerSecond is the starting limit of writes allowed to callers per
	// second: maxBatchSize writes per request * 500 requests per second = 10,000 writes per second.
	maxWritesPerSecond = maxBatchSize * defaultStartingMaximumOpsPerSecond
)

// bulkWriterResult contains the WriteResult or error from an individual
// write to the database.
type bulkWriterResult struct {
	result *pb.WriteResult // the result from the operation, if it succeeded
	err    error           // any error that occurred, if it failed
}

// BulkWriterJob provides read-only access to the results of a BulkWriter write attempt.
type BulkWriterJob struct {
	resultChan chan bulkWriterResult // receives the result or error of the write from send()
	write *pb.Write // the write to apply to the database
	attempts int // number of times this write has been attempted
	resultsLock sync.Mutex // guards the cached result and err values for the job
	result *WriteResult // (cached) result from the operation
	err error // (cached) any errors that occurred
	ctx context.Context // context for canceling/timing out results
}

// Results returns the result of the BulkWriter write attempt.
// This method blocks if the result for this BulkWriterJob hasn't been
// received yet.
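//
// A minimal usage sketch (assumes j is a *BulkWriterJob returned by one of
// the BulkWriter write methods):
//
//	wr, err := j.Results() // blocks until this job's write has been resolved
//	if err != nil {
//		// the write failed, or the BulkWriter's context was canceled
//	}
//	_ = wr // on success, wr holds the server's write result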
func (j *BulkWriterJob) Results() (*WriteResult, error) {
j.resultsLock.Lock()
defer j.resultsLock.Unlock()
if j.result == nil && j.err == nil {
j.result, j.err = j.processResults() // cache the results for additional calls
}
return j.result, j.err
}

// processResults checks for errors returned from send() and packages up the
// results as WriteResult objects.
func (j *BulkWriterJob) processResults() (*WriteResult, error) {
select {
case <-j.ctx.Done():
return nil, j.ctx.Err()
case bwr := <-j.resultChan:
if bwr.err != nil {
return nil, bwr.err
}
return writeResultFromProto(bwr.result)
}
}

// setError sends an error on the result channel of the BulkWriterJob and
// closes the channel, so that Results reports the error to the caller.
func (j *BulkWriterJob) setError(e error) {
bwr := bulkWriterResult{
err: e,
result: nil,
}
j.resultChan <- bwr
close(j.resultChan)
}

// A BulkWriter supports concurrent writes to multiple documents. The BulkWriter
// submits document writes in batches of up to 20 writes per request. Each
// request can contain many different document writes: create, delete, update,
// and set are all supported.
//
// Only one operation (create, set, update, delete) per document is allowed.
// BulkWriter cannot promise atomicity: individual writes can fail or succeed
// independently of each other. BulkWriter does not apply writes in any set
// order; thus a set cannot be applied to a document immediately after its
// creation.
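//
// A minimal lifecycle sketch (assumes a constructed *Client named client; the
// document paths and data are illustrative only):
//
//	bw := client.BulkWriter(ctx)
//	j, err := bw.Create(client.Doc("users/alice"), map[string]interface{}{"name": "Alice"})
//	if err != nil {
//		// the write was rejected before being enqueued
//	}
//	bw.End()               // flushes pending writes; bw accepts no new writes
//	wr, err := j.Results() // blocks until the result of the Create is known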
type BulkWriter struct {
database string // the database as resource name: projects/[PROJECT]/databases/[DATABASE]
start time.Time // when this BulkWriter was started; used to calculate qps and rate increases
vc *vkit.Client // internal client
maxOpsPerSecond int // number of requests that can be sent per second
docUpdatePaths map[string]bool // document paths with corresponding writes in the queue
	limiter rate.Limiter // limits writes to the server to maxWritesPerSecond (the equivalent of 500 requests per second)
bundler *bundler.Bundler // handle bundling up writes to Firestore
ctx context.Context // context for canceling all BulkWriter operations
	isOpenLock sync.RWMutex // guards isOpen and docUpdatePaths against concurrent access
	isOpen bool // whether the BulkWriter is open to new writes
}

// newBulkWriter creates a new instance of the BulkWriter.
func newBulkWriter(ctx context.Context, c *Client, database string) *BulkWriter {
// Although typically we shouldn't store Context objects, in this case we
// need to pass this Context through to the Bundler handler.
ctx = withResourceHeader(ctx, c.path())
bw := &BulkWriter{
database: database,
start: time.Now(),
vc: c.c,
isOpen: true,
maxOpsPerSecond: defaultStartingMaximumOpsPerSecond,
docUpdatePaths: make(map[string]bool),
ctx: ctx,
limiter: *rate.NewLimiter(rate.Limit(maxWritesPerSecond), 1),
}
// can't initialize within struct above; need instance reference to BulkWriter.send()
bw.bundler = bundler.NewBundler(&BulkWriterJob{}, bw.send)
bw.bundler.HandlerLimit = bw.maxOpsPerSecond
bw.bundler.BundleCountThreshold = maxBatchSize
return bw
}

// End sends all enqueued writes in parallel and closes the BulkWriter to new requests.
// After End is called, any further calls to the write methods return an error.
// This method blocks until there are no more pending writes in the queue.
func (bw *BulkWriter) End() {
bw.isOpenLock.Lock()
bw.isOpen = false
bw.isOpenLock.Unlock()
bw.Flush()
}

// Flush commits all writes that have been enqueued up to this point, in parallel.
// This method blocks until all enqueued writes have been sent to the service.
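//
// Unlike End, Flush leaves the BulkWriter open to further writes. A minimal
// sketch (assumes an open BulkWriter bw and a *Client client):
//
//	_, _ = bw.Create(client.Doc("users/bob"), map[string]interface{}{"name": "Bob"})
//	bw.Flush() // returns once the enqueued Create has been sent
//	// bw can still accept writes to other documents here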
func (bw *BulkWriter) Flush() {
bw.bundler.Flush()
}

// Create adds a document creation write to the queue of writes to send.
// Note: You cannot write to (Create, Update, Set, or Delete) the same document more than once.
func (bw *BulkWriter) Create(doc *DocumentRef, datum interface{}) (*BulkWriterJob, error) {
	bw.isOpenLock.Lock() // full lock: checkWriteConditions mutates docUpdatePaths
	defer bw.isOpenLock.Unlock()
err := bw.checkWriteConditions(doc)
if err != nil {
return nil, err
}
w, err := doc.newCreateWrites(datum)
	if err != nil {
		return nil, fmt.Errorf("firestore: cannot create %v with %v: %w", doc.ID, datum, err)
	}
if len(w) > 1 {
return nil, fmt.Errorf("firestore: too many document writes sent to bulkwriter")
}
j := bw.write(w[0])
return j, nil
}

// Delete adds a document deletion write to the queue of writes to send.
// Note: You cannot write to (Create, Update, Set, or Delete) the same document more than once.
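//
// A minimal sketch (assumes an open BulkWriter bw, a *Client client, and a
// time.Time t from a prior read; the precondition is optional):
//
//	j, err := bw.Delete(client.Doc("cities/LA"), firestore.LastUpdateTime(t))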
func (bw *BulkWriter) Delete(doc *DocumentRef, preconds ...Precondition) (*BulkWriterJob, error) {
	bw.isOpenLock.Lock()
	defer bw.isOpenLock.Unlock()
err := bw.checkWriteConditions(doc)
if err != nil {
return nil, err
}
w, err := doc.newDeleteWrites(preconds)
	if err != nil {
		return nil, fmt.Errorf("firestore: cannot delete doc %v: %w", doc.ID, err)
	}
if len(w) > 1 {
return nil, fmt.Errorf("firestore: too many document writes sent to bulkwriter")
}
j := bw.write(w[0])
return j, nil
}

// Set adds a document set write to the queue of writes to send.
// Note: You cannot write to (Create, Update, Set, or Delete) the same document more than once.
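//
// A minimal sketch (assumes an open BulkWriter bw and a *Client client):
//
//	data := map[string]interface{}{"pop": 3990456}
//	j, err := bw.Set(client.Doc("cities/LA"), data, firestore.MergeAll)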
func (bw *BulkWriter) Set(doc *DocumentRef, datum interface{}, opts ...SetOption) (*BulkWriterJob, error) {
	bw.isOpenLock.Lock()
	defer bw.isOpenLock.Unlock()
err := bw.checkWriteConditions(doc)
if err != nil {
return nil, err
}
w, err := doc.newSetWrites(datum, opts)
	if err != nil {
		return nil, fmt.Errorf("firestore: cannot set %v on doc %v: %w", datum, doc.ID, err)
	}
if len(w) > 1 {
return nil, fmt.Errorf("firestore: too many writes sent to bulkwriter")
}
j := bw.write(w[0])
return j, nil
}

// Update adds a document update write to the queue of writes to send.
// Note: You cannot write to (Create, Update, Set, or Delete) the same document more than once.
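//
// A minimal sketch (assumes an open BulkWriter bw and a *Client client):
//
//	j, err := bw.Update(client.Doc("cities/LA"), []firestore.Update{
//		{Path: "capital", Value: false},
//	})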
func (bw *BulkWriter) Update(doc *DocumentRef, updates []Update, preconds ...Precondition) (*BulkWriterJob, error) {
	bw.isOpenLock.Lock()
	defer bw.isOpenLock.Unlock()
err := bw.checkWriteConditions(doc)
if err != nil {
return nil, err
}
w, err := doc.newUpdatePathWrites(updates, preconds)
	if err != nil {
		return nil, fmt.Errorf("firestore: cannot update doc %v: %w", doc.ID, err)
	}
if len(w) > 1 {
return nil, fmt.Errorf("firestore: too many writes sent to bulkwriter")
}
j := bw.write(w[0])
return j, nil
}

// checkWriteConditions determines whether this write attempt is valid. It
// returns an error if the BulkWriter has already been closed, if it receives
// a nil document reference, or if a write for the same document path has
// already been enqueued.
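//
// For example (illustrative only; doc and data stand in for any document
// reference and payload):
//
//	_, _ = bw.Create(doc, data) // first write for doc: accepted
//	_, err := bw.Set(doc, data) // second write for the same path: error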
func (bw *BulkWriter) checkWriteConditions(doc *DocumentRef) error {
if !bw.isOpen {
return errors.New("firestore: BulkWriter has been closed")
}
	if doc == nil {
		return errors.New("firestore: nil DocumentRef")
	}
_, havePath := bw.docUpdatePaths[doc.shortPath]
if havePath {
return fmt.Errorf("firestore: BulkWriter received duplicate write for path: %v", doc.shortPath)
}
bw.docUpdatePaths[doc.shortPath] = true
return nil
}

// write packages up a write request into a BulkWriterJob, waits for rate
// limiter capacity, and hands the job to the bundler for batching.
func (bw *BulkWriter) write(w *pb.Write) *BulkWriterJob {
j := &BulkWriterJob{
resultChan: make(chan bulkWriterResult, 1),
write: w,
ctx: bw.ctx,
}
	// If the context is canceled, Wait returns early; the job then surfaces
	// ctx.Err() through processResults.
	_ = bw.limiter.Wait(bw.ctx)
	// Ignore operation size constraints and related errors; the job's size
	// can't be inferred at compile time, and the bundler is configured to
	// accept an unlimited number of bytes.
_ = bw.bundler.Add(j, 0)
return j
}

// send transmits writes to the service and matches response results to job channels.
func (bw *BulkWriter) send(i interface{}) {
bwj := i.([]*BulkWriterJob)
if len(bwj) == 0 {
return
}
var ws []*pb.Write
for _, w := range bwj {
ws = append(ws, w.write)
}
bwr := &pb.BatchWriteRequest{
Database: bw.database,
Writes: ws,
Labels: map[string]string{},
}
select {
case <-bw.ctx.Done():
return
default:
resp, err := bw.vc.BatchWrite(bw.ctx, bwr)
if err != nil {
// Do we need to be selective about what kind of errors we send?
for _, j := range bwj {
j.setError(err)
}
return
}
// Match write results with BulkWriterJob objects
for i, res := range resp.WriteResults {
s := resp.Status[i]
c := s.GetCode()
if c != 0 { // Should we do an explicit check against rpc.Code enum?
j := bwj[i]
j.attempts++
// Do we need separate retry bundler?
if j.attempts < maxRetryAttempts {
					// Ignore operation size constraints and related errors;
					// the job's size can't be inferred at compile time, and
					// the bundler is configured to accept an unlimited number
					// of bytes.
_ = bw.bundler.Add(j, 0)
} else {
j.setError(fmt.Errorf("firestore: write failed with status: %v", s))
}
continue
}
bwj[i].resultChan <- bulkWriterResult{err: nil, result: res}
close(bwj[i].resultChan)
}
}
}