| // Copyright 2014 Google LLC |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package storage |
| |
| import ( |
| "context" |
| "encoding/base64" |
| "errors" |
| "fmt" |
| "io" |
| "sync" |
| "time" |
| "unicode/utf8" |
| |
| "github.com/golang/protobuf/proto" |
| "google.golang.org/api/googleapi" |
| raw "google.golang.org/api/storage/v1" |
| storagepb "google.golang.org/genproto/googleapis/storage/v2" |
| "google.golang.org/grpc/codes" |
| "google.golang.org/grpc/status" |
| ) |
| |
| const ( |
| // Maximum amount of content that can be sent per WriteObjectRequest message. |
| // A buffer reaching this amount will trigger a flush of the buffer. |
| // |
| // This is only used for the gRPC-based Writer. |
| maxPerMessageWriteSize int = int(storagepb.ServiceConstants_MAX_WRITE_CHUNK_BYTES) |
| ) |
| |
| // A Writer writes a Cloud Storage object. |
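| // |
| // A minimal usage sketch (assuming an existing *Client named client, a |
| // context.Context named ctx, and illustrative bucket and object names). |
| // Because writes happen asynchronously, always check the error returned by |
| // Close to determine whether the upload succeeded: |
| // |
| //	wc := client.Bucket("bucket-name").Object("object-name").NewWriter(ctx) |
| //	if _, err := wc.Write([]byte("hello world")); err != nil { |
| //		// handle error |
| //	} |
| //	if err := wc.Close(); err != nil { |
| //		// handle error |
| //	} |
| //	attrs := wc.Attrs() // metadata about the written object, valid after Close |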
| type Writer struct { |
| // ObjectAttrs are optional attributes to set on the object. Any attributes |
| // must be initialized before the first Write call. Nil or zero-valued |
| // attributes are ignored. |
| ObjectAttrs |
| |
| // SendCRC32C specifies whether to transmit a CRC32C field. It should be set |
| // to true in addition to setting the Writer's CRC32C field, because zero |
| // is a valid CRC and normally a zero would not be transmitted. |
| // If a CRC32C is sent, and the data written does not match the checksum, |
| // the write will be rejected. |
| // |
| // Note: SendCRC32C must be set to true BEFORE the first call to |
| // Writer.Write() in order to send the checksum. If it is set after that |
| // point, the checksum will be ignored. |
| SendCRC32C bool |
| |
| // ChunkSize controls the maximum number of bytes of the object that the |
| // Writer will attempt to send to the server in a single request. Objects |
| // smaller than the size will be sent in a single request, while larger |
| // objects will be split over multiple requests. The value will be rounded up |
| // to the nearest multiple of 256 KiB. The default ChunkSize is 16 MiB. |
| // |
| // Each Writer will internally allocate a buffer of size ChunkSize. This is |
| // used to buffer input data and allow for the input to be sent again if a |
| // request must be retried. |
| // |
| // If you upload small objects (< 16MiB), you should set ChunkSize |
| // to a value slightly larger than the objects' sizes to avoid memory bloat. |
| // This is especially important if you are uploading many small objects |
| // concurrently. See |
| // https://cloud.google.com/storage/docs/json_api/v1/how-tos/upload#size |
| // for more information about performance trade-offs related to ChunkSize. |
| // |
| // If ChunkSize is set to zero, chunking will be disabled and the object will |
| // be uploaded in a single request without the use of a buffer. This will |
| // further reduce memory used during uploads, but will also prevent the writer |
| // from retrying in case of a transient error from the server or resuming an |
| // upload that fails midway through, since the buffer is required in order to |
| // retry the failed request. |
| // |
| // ChunkSize must be set before the first Write call. |
| ChunkSize int |
| |
| // ChunkRetryDeadline sets a per-chunk retry deadline for multi-chunk |
| // resumable uploads. |
| // |
| // For uploads of larger files, the Writer will attempt to retry if the |
| // request to upload a particular chunk fails with a transient error. |
| // If a single chunk has been attempting to upload for longer than this |
| // deadline and the request fails, it will no longer be retried, and the error |
| // will be returned to the caller. This is only applicable for files which are |
| // large enough to require a multi-chunk resumable upload. The default value |
| // is 32s. Users may want to pick a longer deadline if they are using larger |
| // values for ChunkSize or if they expect to have a slow or unreliable |
| // internet connection. |
| // |
| // To set a deadline on the entire upload, use context timeout or |
| // cancellation. |
| ChunkRetryDeadline time.Duration |
| |
| // ProgressFunc can be used to monitor the progress of a large write |
| // operation. If ProgressFunc is not nil and writing requires multiple |
| // calls to the underlying service (see |
| // https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload), |
| // then ProgressFunc will be invoked after each call with the number of bytes of |
| // content copied so far. |
| // |
| // ProgressFunc should return quickly without blocking. |
| ProgressFunc func(int64) |
| |
| ctx context.Context |
| o *ObjectHandle |
| |
| opened bool |
| pw *io.PipeWriter |
| |
| donec chan struct{} // closed after err and obj are set. |
| obj *ObjectAttrs |
| |
| mu sync.Mutex |
| err error |
| |
| // The gRPC client-stream used for sending buffers. |
| // |
| // This is an experimental API and not intended for public use. |
| stream storagepb.Storage_WriteObjectClient |
| |
| // The Resumable Upload ID started by a gRPC-based Writer. |
| // |
| // This is an experimental API and not intended for public use. |
| upid string |
| } |
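| // The optional fields above must be set before the first call to Write. The |
| // following is a sketch of a configured Writer; obj is assumed to be an |
| // *ObjectHandle, data a []byte, and the concrete values are illustrative |
| // rather than recommendations: |
| // |
| //	wc := obj.NewWriter(ctx) |
| //	wc.ChunkSize = 8 * 1024 * 1024           // buffer 8 MiB per upload request |
| //	wc.ChunkRetryDeadline = 60 * time.Second // allow longer per-chunk retries |
| //	wc.SendCRC32C = true                     // only meaningful with CRC32C set |
| //	wc.CRC32C = crc32.Checksum(data, crc32.MakeTable(crc32.Castagnoli)) |
| //	wc.ProgressFunc = func(n int64) { log.Printf("%d bytes committed", n) } |
| //	// ... then Write and Close as in the example on the Writer type above. |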
| |
| func (w *Writer) open() error { |
| if err := w.validateWriteAttrs(); err != nil { |
| return err |
| } |
| |
| pr, pw := io.Pipe() |
| w.pw = pw |
| w.opened = true |
| |
| go w.monitorCancel() |
| |
| attrs := w.ObjectAttrs |
| mediaOpts := []googleapi.MediaOption{ |
| googleapi.ChunkSize(w.ChunkSize), |
| } |
| if c := attrs.ContentType; c != "" { |
| mediaOpts = append(mediaOpts, googleapi.ContentType(c)) |
| } |
| if w.ChunkRetryDeadline != 0 { |
| mediaOpts = append(mediaOpts, googleapi.ChunkRetryDeadline(w.ChunkRetryDeadline)) |
| } |
| |
| go func() { |
| defer close(w.donec) |
| |
| rawObj := attrs.toRawObject(w.o.bucket) |
| if w.SendCRC32C { |
| rawObj.Crc32c = encodeUint32(attrs.CRC32C) |
| } |
| if w.MD5 != nil { |
| rawObj.Md5Hash = base64.StdEncoding.EncodeToString(w.MD5) |
| } |
| call := w.o.c.raw.Objects.Insert(w.o.bucket, rawObj). |
| Media(pr, mediaOpts...). |
| Projection("full"). |
| Context(w.ctx). |
| Name(w.o.object) |
| |
| if w.ProgressFunc != nil { |
| call.ProgressUpdater(func(n, _ int64) { w.ProgressFunc(n) }) |
| } |
| if attrs.KMSKeyName != "" { |
| call.KmsKeyName(attrs.KMSKeyName) |
| } |
| if attrs.PredefinedACL != "" { |
| call.PredefinedAcl(attrs.PredefinedACL) |
| } |
| if err := setEncryptionHeaders(call.Header(), w.o.encryptionKey, false); err != nil { |
| w.mu.Lock() |
| w.err = err |
| w.mu.Unlock() |
| pr.CloseWithError(err) |
| return |
| } |
| var resp *raw.Object |
| err := applyConds("NewWriter", w.o.gen, w.o.conds, call) |
| if err == nil { |
| if w.o.userProject != "" { |
| call.UserProject(w.o.userProject) |
| } |
| setClientHeader(call.Header()) |
| |
| // The internals that perform call.Do automatically retry both the initial |
| // call to set up the upload as well as calls to upload individual chunks |
| // for a resumable upload (as long as the chunk size is non-zero). Hence |
| // there is no need to add retries here. |
| |
| // Retry only when the operation is idempotent or the retry policy is RetryAlways. |
| isIdempotent := w.o.conds != nil && (w.o.conds.GenerationMatch >= 0 || w.o.conds.DoesNotExist) |
| var useRetry bool |
| if (w.o.retry == nil || w.o.retry.policy == RetryIdempotent) && isIdempotent { |
| useRetry = true |
| } else if w.o.retry != nil && w.o.retry.policy == RetryAlways { |
| useRetry = true |
| } |
| if useRetry { |
| if w.o.retry != nil { |
| call.WithRetry(w.o.retry.backoff, w.o.retry.shouldRetry) |
| } else { |
| call.WithRetry(nil, nil) |
| } |
| } |
| resp, err = call.Do() |
| } |
| if err != nil { |
| w.mu.Lock() |
| w.err = err |
| w.mu.Unlock() |
| pr.CloseWithError(err) |
| return |
| } |
| w.obj = newObject(resp) |
| }() |
| return nil |
| } |
| |
| // Write appends to w. It implements the io.Writer interface. |
| // |
| // Since writes happen asynchronously, Write may return a nil |
| // error even though the write failed (or will fail). Always |
| // use the error returned from Writer.Close to determine if |
| // the upload was successful. |
| // |
| // Writes will be retried on transient errors from the server, unless |
| // Writer.ChunkSize has been set to zero. |
| func (w *Writer) Write(p []byte) (n int, err error) { |
| w.mu.Lock() |
| werr := w.err |
| w.mu.Unlock() |
| if werr != nil { |
| return 0, werr |
| } |
| if !w.opened { |
| // If the gRPC client has been initialized, use gRPC to upload. |
| if w.o.c.gc != nil { |
| if err := w.openGRPC(); err != nil { |
| return 0, err |
| } |
| } else if err := w.open(); err != nil { |
| return 0, err |
| } |
| } |
| n, err = w.pw.Write(p) |
| if err != nil { |
| w.mu.Lock() |
| werr := w.err |
| w.mu.Unlock() |
| // Preserve existing functionality that when context is canceled, Write will return |
| // context.Canceled instead of "io: read/write on closed pipe". This hides the |
| // pipe implementation detail from users and makes Write seem as though it's an RPC. |
| if errors.Is(werr, context.Canceled) || errors.Is(werr, context.DeadlineExceeded) { |
| return n, werr |
| } |
| } |
| return n, err |
| } |
| |
| // Close completes the write operation and flushes any buffered data. |
| // If Close doesn't return an error, metadata about the written object |
| // can be retrieved by calling Attrs. |
| func (w *Writer) Close() error { |
| if !w.opened { |
| if err := w.open(); err != nil { |
| return err |
| } |
| } |
| |
| // Closing either the read or write causes the entire pipe to close. |
| if err := w.pw.Close(); err != nil { |
| return err |
| } |
| |
| <-w.donec |
| w.mu.Lock() |
| defer w.mu.Unlock() |
| return w.err |
| } |
| |
| // monitorCancel is intended to be used as a background goroutine. It monitors the |
| // context, and when it observes that the context has been canceled, it manually |
| // closes things that do not take a context. |
| func (w *Writer) monitorCancel() { |
| select { |
| case <-w.ctx.Done(): |
| w.mu.Lock() |
| werr := w.ctx.Err() |
| w.err = werr |
| w.mu.Unlock() |
| |
| // Closing either the read or write causes the entire pipe to close. |
| w.CloseWithError(werr) |
| case <-w.donec: |
| } |
| } |
| |
| // CloseWithError aborts the write operation with the provided error. |
| // CloseWithError always returns nil. |
| // |
| // Deprecated: cancel the context passed to NewWriter instead. |
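| // |
| // A sketch of the preferred pattern, assuming obj is an *ObjectHandle and |
| // ctx a parent context: |
| // |
| //	ctx, cancel := context.WithCancel(ctx) |
| //	defer cancel() |
| //	wc := obj.NewWriter(ctx) |
| //	// ... calling cancel() aborts the upload, and Close reports the error. |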
| func (w *Writer) CloseWithError(err error) error { |
| if !w.opened { |
| return nil |
| } |
| return w.pw.CloseWithError(err) |
| } |
| |
| // Attrs returns metadata about a successfully-written object. |
| // It's only valid to call it after Close returns nil. |
| func (w *Writer) Attrs() *ObjectAttrs { |
| return w.obj |
| } |
| |
| func (w *Writer) validateWriteAttrs() error { |
| attrs := w.ObjectAttrs |
| // Check the developer didn't change the object Name (this is unfortunate, but |
| // we don't want to store an object under the wrong name). |
| if attrs.Name != w.o.object { |
| return fmt.Errorf("storage: Writer.Name %q does not match object name %q", attrs.Name, w.o.object) |
| } |
| if !utf8.ValidString(attrs.Name) { |
| return fmt.Errorf("storage: object name %q is not valid UTF-8", attrs.Name) |
| } |
| if attrs.KMSKeyName != "" && w.o.encryptionKey != nil { |
| return errors.New("storage: cannot use KMSKeyName with a customer-supplied encryption key") |
| } |
| if w.ChunkSize < 0 { |
| return errors.New("storage: Writer.ChunkSize must be non-negative") |
| } |
| return nil |
| } |
| |
| // progress is a convenience wrapper that reports write progress to the |
| // Writer's ProgressFunc if it is set and progress is non-zero. |
| func (w *Writer) progress(p int64) { |
| if w.ProgressFunc != nil && p != 0 { |
| w.ProgressFunc(p) |
| } |
| } |
| |
| // error acquires the Writer's lock, sets the Writer's err to the given error, |
| // then relinquishes the lock. |
| func (w *Writer) error(err error) { |
| w.mu.Lock() |
| w.err = err |
| w.mu.Unlock() |
| } |
| |
| // openGRPC initializes a pipe for the user to write data to, and a routine to |
| // read from that pipe and upload the data to GCS via gRPC. |
| // |
| // This is an experimental API and not intended for public use. |
| func (w *Writer) openGRPC() error { |
| if err := w.validateWriteAttrs(); err != nil { |
| return err |
| } |
| |
| pr, pw := io.Pipe() |
| w.pw = pw |
| w.opened = true |
| |
| go w.monitorCancel() |
| |
| bufSize := w.ChunkSize |
| if w.ChunkSize == 0 { |
| // TODO: Should we actually use the minimum of 256 KB here when the user |
| // indicates they want minimal memory usage? We cannot do a zero-copy, |
| // bufferless upload like HTTP/JSON can. |
| // TODO: We need to determine if we can avoid starting a |
| // resumable upload when the user *plans* to send more than bufSize but |
| // with a bufferless upload. |
| bufSize = maxPerMessageWriteSize |
| } |
| buf := make([]byte, bufSize) |
| |
| var offset int64 |
| |
| // This function reads the data sent to the pipe and sends sets of messages |
| // on the gRPC client-stream as the buffer is filled. |
| go func() { |
| defer close(w.donec) |
| |
| // Loop until there is an error or the Object has been finalized. |
| for { |
| // Note: This blocks until either the buffer is full or EOF is read. |
| recvd, doneReading, err := read(pr, buf) |
| if err != nil { |
| err = checkCanceled(err) |
| w.error(err) |
| pr.CloseWithError(err) |
| return |
| } |
| toWrite := buf[:recvd] |
| |
| // TODO: Figure out how to set up encryption via CommonObjectRequestParams. |
| |
| // The chunk buffer is full, but there is no end in sight. This |
| // means that a resumable upload will need to be used to send |
| // multiple chunks, until we are done reading data. Start a |
| // resumable upload if it has not already been started. |
| // Otherwise, all data will be sent over a single gRPC stream. |
| if !doneReading && w.upid == "" { |
| err = w.startResumableUpload() |
| if err != nil { |
| err = checkCanceled(err) |
| w.error(err) |
| pr.CloseWithError(err) |
| return |
| } |
| } |
| |
| o, off, finalized, err := w.uploadBuffer(toWrite, recvd, offset, doneReading) |
| if err != nil { |
| err = checkCanceled(err) |
| w.error(err) |
| pr.CloseWithError(err) |
| return |
| } |
| // At this point, the current buffer has been uploaded. Capture the |
| // committed offset here in case the upload was not finalized and |
| // another chunk is to be uploaded. |
| offset = off |
| w.progress(offset) |
| |
| // When we are done reading data and the chunk has been finalized, |
| // we are done. |
| if doneReading && finalized { |
| // Build Object from server's response. |
| w.obj = newObjectFromProto(o) |
| return |
| } |
| } |
| }() |
| |
| return nil |
| } |
| |
| // startResumableUpload initializes a Resumable Upload with gRPC and sets the |
| // upload ID on the Writer. |
| // |
| // This is an experimental API and not intended for public use. |
| func (w *Writer) startResumableUpload() error { |
| spec, err := w.writeObjectSpec() |
| if err != nil { |
| return err |
| } |
| upres, err := w.o.c.gc.StartResumableWrite(w.ctx, &storagepb.StartResumableWriteRequest{ |
| WriteObjectSpec: spec, |
| }) |
| |
| w.upid = upres.GetUploadId() |
| return err |
| } |
| |
| // queryProgress is a helper that queries the status of the resumable upload |
| // associated with the Writer's upload ID. |
| // |
| // This is an experimental API and not intended for public use. |
| func (w *Writer) queryProgress() (int64, error) { |
| q, err := w.o.c.gc.QueryWriteStatus(w.ctx, &storagepb.QueryWriteStatusRequest{UploadId: w.upid}) |
| |
| // q.GetPersistedSize() will return 0 if q is nil. |
| return q.GetPersistedSize(), err |
| } |
| |
| // uploadBuffer opens a Write stream and uploads the buffer at the given offset (if |
| // uploading a chunk for a resumable upload), and will mark the write as |
| // finished if we are done receiving data from the user. The resulting write |
| // offset after uploading the buffer is returned, as well as a boolean |
| // indicating if the Object has been finalized. If it has been finalized, the |
| // final Object will be returned as well. Finalizing the upload is primarily |
| // important for Resumable Uploads. A simple or multi-part upload will always |
| // be finalized once the entire buffer has been written. |
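| // |
| // For example, assuming the default 16 MiB ChunkSize and a 2 MiB |
| // MAX_WRITE_CHUNK_BYTES service limit, a full buffer is sent as eight |
| // WriteObjectRequest messages on one stream before the data is committed. |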
| // |
| // This is an experimental API and not intended for public use. |
| func (w *Writer) uploadBuffer(buf []byte, recvd int, start int64, doneReading bool) (*storagepb.Object, int64, bool, error) { |
| var err error |
| var finishWrite bool |
| var sent, limit int = 0, maxPerMessageWriteSize |
| offset := start |
| for { |
| first := sent == 0 |
| // This indicates that this is the last message and the remaining |
| // data fits in one message. |
| belowLimit := recvd-sent <= limit |
| if belowLimit { |
| limit = recvd - sent |
| } |
| if belowLimit && doneReading { |
| finishWrite = true |
| } |
| |
| // Prepare chunk section for upload. |
| data := buf[sent : sent+limit] |
| req := &storagepb.WriteObjectRequest{ |
| Data: &storagepb.WriteObjectRequest_ChecksummedData{ |
| ChecksummedData: &storagepb.ChecksummedData{ |
| Content: data, |
| }, |
| }, |
| WriteOffset: offset, |
| FinishWrite: finishWrite, |
| } |
| |
| // Open a new stream and set the first_message field on the request. |
| // The first message on the WriteObject stream must either be the |
| // Object or the Resumable Upload ID. |
| if first { |
| w.stream, err = w.o.c.gc.WriteObject(w.ctx) |
| if err != nil { |
| return nil, 0, false, err |
| } |
| |
| if w.upid != "" { |
| req.FirstMessage = &storagepb.WriteObjectRequest_UploadId{UploadId: w.upid} |
| } else { |
| spec, err := w.writeObjectSpec() |
| if err != nil { |
| return nil, 0, false, err |
| } |
| req.FirstMessage = &storagepb.WriteObjectRequest_WriteObjectSpec{ |
| WriteObjectSpec: spec, |
| } |
| } |
| |
| // TODO: Currently the checksums are only sent on the first message |
| // of the stream, but in the future, we must also support sending it |
| // on the *last* message of the stream (instead of the first). |
| if w.SendCRC32C { |
| req.ObjectChecksums = &storagepb.ObjectChecksums{ |
| Crc32C: proto.Uint32(w.CRC32C), |
| Md5Hash: w.MD5, |
| } |
| } |
| } |
| |
| err = w.stream.Send(req) |
| if err == io.EOF { |
| // err was io.EOF. The client-side of a stream only gets an EOF on Send |
| // when the backend closes the stream and wants to return an error |
| // status. Closing the stream receives the status as an error. |
| _, err = w.stream.CloseAndRecv() |
| |
| // Retriable errors mean we should start over and attempt to |
| // resend the entire buffer via a new stream. |
| // If not retriable, falling through will return the error received |
| // from closing the stream. |
| if shouldRetry(err) { |
| sent = 0 |
| finishWrite = false |
| // TODO: Add test case for failure modes of querying progress. |
| offset, err = w.determineOffset(start) |
| if err == nil { |
| continue |
| } |
| } |
| } |
| if err != nil { |
| return nil, 0, false, err |
| } |
| |
| // Update the immediate stream's sent total and the upload offset with |
| // the data sent. |
| sent += len(data) |
| offset += int64(len(data)) |
| |
| // Not done sending data, do not attempt to commit it yet, loop around |
| // and send more data. |
| if recvd-sent > 0 { |
| continue |
| } |
| |
| // Done sending data. Close the stream to "commit" the data sent. |
| resp, finalized, err := w.commit() |
| // Retriable errors mean we should start over and attempt to |
| // resend the entire buffer via a new stream. |
| // If not retriable, falling through will return the error received |
| // from closing the stream. |
| if shouldRetry(err) { |
| sent = 0 |
| finishWrite = false |
| offset, err = w.determineOffset(start) |
| if err == nil { |
| continue |
| } |
| } |
| if err != nil { |
| return nil, 0, false, err |
| } |
| |
| return resp.GetResource(), offset, finalized, nil |
| } |
| } |
| |
| // determineOffset either returns the offset given to it in the case of a simple |
| // upload, or queries the write status if a resumable upload is in progress. |
| // |
| // This is an experimental API and not intended for public use. |
| func (w *Writer) determineOffset(offset int64) (int64, error) { |
| // For a Resumable Upload, we must start from however much data |
| // was committed. |
| if w.upid != "" { |
| committed, err := w.queryProgress() |
| if err != nil { |
| return 0, err |
| } |
| offset = committed |
| } |
| return offset, nil |
| } |
| |
| // commit closes the stream to commit the data sent and potentially receive |
| // the finalized object if finished uploading. If the last request sent |
| // indicated that writing was finished, the Object will be finalized and |
| // returned. If not, then the Object will be nil, and the boolean returned will |
| // be false. |
| // |
| // This is an experimental API and not intended for public use. |
| func (w *Writer) commit() (*storagepb.WriteObjectResponse, bool, error) { |
| finalized := true |
| resp, err := w.stream.CloseAndRecv() |
| if err == io.EOF { |
| // Closing a stream for a resumable upload with finish_write = false |
| // results in an EOF which can be ignored, as we aren't done uploading yet. |
| finalized = false |
| err = nil |
| } |
| // Drop the stream reference as it has been closed. |
| w.stream = nil |
| |
| return resp, finalized, err |
| } |
| |
| // writeObjectSpec constructs a WriteObjectSpec proto using the Writer's |
| // ObjectAttrs and applies its Conditions. This is only used for gRPC. |
| // |
| // This is an experimental API and not intended for public use. |
| func (w *Writer) writeObjectSpec() (*storagepb.WriteObjectSpec, error) { |
| spec := &storagepb.WriteObjectSpec{ |
| Resource: w.ObjectAttrs.toProtoObject(w.o.bucket), |
| } |
| // WriteObject doesn't support the generation condition, so use -1. |
| if err := applyCondsProto("WriteObject", -1, w.o.conds, spec); err != nil { |
| return nil, err |
| } |
| return spec, nil |
| } |
| |
| // read copies the data in the reader to the given buffer and reports how much |
| // data was read into the buffer and if there is no more data to read (EOF). |
| // |
| // This is an experimental API and not intended for public use. |
| func read(r io.Reader, buf []byte) (int, bool, error) { |
| // Set n to -1 to start the Read loop. |
| var n, recvd int = -1, 0 |
| var err error |
| for err == nil && n != 0 { |
| // The routine blocks here until data is received. |
| n, err = r.Read(buf[recvd:]) |
| recvd += n |
| } |
| var done bool |
| if err == io.EOF { |
| done = true |
| err = nil |
| } |
| return recvd, done, err |
| } |
| |
| func checkCanceled(err error) error { |
| if status.Code(err) == codes.Canceled { |
| return context.Canceled |
| } |
| |
| return err |
| } |