| // Copyright 2016 Google Inc. All Rights Reserved. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package pubsub |
| |
| import ( |
| "fmt" |
| "math" |
| "strings" |
| |
| pb "google.golang.org/genproto/googleapis/pubsub/v1" |
| "google.golang.org/grpc/codes" |
| "google.golang.org/grpc/status" |
| ) |
| |
| // maxPayload is the maximum number of bytes to devote to actual ids in |
| // acknowledgement or modifyAckDeadline requests. A serialized |
| // AcknowledgeRequest proto has a small constant overhead, plus the size of the |
| // subscription name, plus 3 bytes per ID (a tag byte and two size bytes). A |
| // ModifyAckDeadlineRequest has an additional few bytes for the deadline. We |
| // don't know the subscription name here, so we just assume the size exclusive |
| // of ids is 100 bytes. |
| // |
| // With gRPC there is no way for the client to know the server's max message size (it is |
| // configurable on the server). We know from experience that it |
| // it 512K. |
| const ( |
| maxPayload = 512 * 1024 |
| reqFixedOverhead = 100 |
| overheadPerID = 3 |
| maxSendRecvBytes = 20 * 1024 * 1024 // 20M |
| ) |
| |
| func convertMessages(rms []*pb.ReceivedMessage) ([]*Message, error) { |
| msgs := make([]*Message, 0, len(rms)) |
| for i, m := range rms { |
| msg, err := toMessage(m) |
| if err != nil { |
| return nil, fmt.Errorf("pubsub: cannot decode the retrieved message at index: %d, message: %+v", i, m) |
| } |
| msgs = append(msgs, msg) |
| } |
| return msgs, nil |
| } |
| |
| func trunc32(i int64) int32 { |
| if i > math.MaxInt32 { |
| i = math.MaxInt32 |
| } |
| return int32(i) |
| } |
| |
| // func newStreamingPuller(ctx context.Context, subc *vkit.SubscriberClient, subName string, ackDeadlineSecs int32) *streamingPuller { |
| // p := &streamingPuller{ |
| // ctx: ctx, |
| // subName: subName, |
| // ackDeadlineSecs: ackDeadlineSecs, |
| // subc: subc, |
| // } |
| // p.c = sync.NewCond(&p.mu) |
| // return p |
| // } |
| |
| // type streamingPuller struct { |
| // ctx context.Context |
| // subName string |
| // ackDeadlineSecs int32 |
| // subc *vkit.SubscriberClient |
| |
| // mu sync.Mutex |
| // c *sync.Cond |
| // inFlight bool |
| // closed bool // set after CloseSend called |
| // spc pb.Subscriber_StreamingPullClient |
| // err error |
| // } |
| |
| // // open establishes (or re-establishes) a stream for pulling messages. |
| // // It takes care that only one RPC is in flight at a time. |
| // func (p *streamingPuller) open() error { |
| // p.c.L.Lock() |
| // defer p.c.L.Unlock() |
| // p.openLocked() |
| // return p.err |
| // } |
| |
| // func (p *streamingPuller) openLocked() { |
| // if p.inFlight { |
| // // Another goroutine is opening; wait for it. |
| // for p.inFlight { |
| // p.c.Wait() |
| // } |
| // return |
| // } |
| // // No opens in flight; start one. |
| // // Keep the lock held, to avoid a race where we |
| // // close the old stream while opening a new one. |
| // p.inFlight = true |
| // spc, err := p.subc.StreamingPull(p.ctx, gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(maxSendRecvBytes))) |
| // if err == nil { |
| // err = spc.Send(&pb.StreamingPullRequest{ |
| // Subscription: p.subName, |
| // StreamAckDeadlineSeconds: p.ackDeadlineSecs, |
| // }) |
| // } |
| // p.spc = spc |
| // p.err = err |
| // p.inFlight = false |
| // p.c.Broadcast() |
| // } |
| |
| // func (p *streamingPuller) call(f func(pb.Subscriber_StreamingPullClient) error) error { |
| // p.c.L.Lock() |
| // defer p.c.L.Unlock() |
| // // Wait for an open in flight. |
| // for p.inFlight { |
| // p.c.Wait() |
| // } |
| // var err error |
| // var bo gax.Backoff |
| // for { |
| // select { |
| // case <-p.ctx.Done(): |
| // p.err = p.ctx.Err() |
| // default: |
| // } |
| // if p.err != nil { |
| // return p.err |
| // } |
| // spc := p.spc |
| // // Do not call f with the lock held. Only one goroutine calls Send |
| // // (streamingMessageIterator.sender) and only one calls Recv |
| // // (streamingMessageIterator.receiver). If we locked, then a |
| // // blocked Recv would prevent a Send from happening. |
| // p.c.L.Unlock() |
| // err = f(spc) |
| // p.c.L.Lock() |
| // if !p.closed && err != nil && isRetryable(err) { |
| // // Sleep with exponential backoff. Normally we wouldn't hold the lock while sleeping, |
| // // but here it can't do any harm, since the stream is broken anyway. |
| // gax.Sleep(p.ctx, bo.Pause()) |
| // p.openLocked() |
| // continue |
| // } |
| // // Not an error, or not a retryable error; stop retrying. |
| // p.err = err |
| // return err |
| // } |
| // } |
| |
| // Logic from https://github.com/GoogleCloudPlatform/google-cloud-java/blob/master/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StatusUtil.java. |
| func isRetryable(err error) bool { |
| s, ok := status.FromError(err) |
| if !ok { // includes io.EOF, normal stream close, which causes us to reopen |
| return true |
| } |
| switch s.Code() { |
| case codes.DeadlineExceeded, codes.Internal, codes.Canceled, codes.ResourceExhausted: |
| return true |
| case codes.Unavailable: |
| return !strings.Contains(s.Message(), "Server shutdownNow invoked") |
| default: |
| return false |
| } |
| } |
| |
| // func (p *streamingPuller) fetchMessages() ([]*Message, error) { |
| // var res *pb.StreamingPullResponse |
| // err := p.call(func(spc pb.Subscriber_StreamingPullClient) error { |
| // var err error |
| // res, err = spc.Recv() |
| // return err |
| // }) |
| // if err != nil { |
| // return nil, err |
| // } |
| // return convertMessages(res.ReceivedMessages) |
| // } |
| |
| // func (p *streamingPuller) send(req *pb.StreamingPullRequest) error { |
| // // Note: len(modAckIDs) == len(modSecs) |
| // var rest *pb.StreamingPullRequest |
| // for len(req.AckIds) > 0 || len(req.ModifyDeadlineAckIds) > 0 { |
| // req, rest = splitRequest(req, maxPayload) |
| // err := p.call(func(spc pb.Subscriber_StreamingPullClient) error { |
| // x := spc.Send(req) |
| // return x |
| // }) |
| // if err != nil { |
| // return err |
| // } |
| // req = rest |
| // } |
| // return nil |
| // } |
| |
| // func (p *streamingPuller) closeSend() { |
| // p.mu.Lock() |
| // p.closed = true |
| // p.spc.CloseSend() |
| // p.mu.Unlock() |
| // } |
| |
| // Split req into a prefix that is smaller than maxSize, and a remainder. |
| func splitRequest(req *pb.StreamingPullRequest, maxSize int) (prefix, remainder *pb.StreamingPullRequest) { |
| const int32Bytes = 4 |
| |
| // Copy all fields before splitting the variable-sized ones. |
| remainder = &pb.StreamingPullRequest{} |
| *remainder = *req |
| // Split message so it isn't too big. |
| size := reqFixedOverhead |
| i := 0 |
| for size < maxSize && (i < len(req.AckIds) || i < len(req.ModifyDeadlineAckIds)) { |
| if i < len(req.AckIds) { |
| size += overheadPerID + len(req.AckIds[i]) |
| } |
| if i < len(req.ModifyDeadlineAckIds) { |
| size += overheadPerID + len(req.ModifyDeadlineAckIds[i]) + int32Bytes |
| } |
| i++ |
| } |
| |
| min := func(a, b int) int { |
| if a < b { |
| return a |
| } |
| return b |
| } |
| |
| j := i |
| if size > maxSize { |
| j-- |
| } |
| k := min(j, len(req.AckIds)) |
| remainder.AckIds = req.AckIds[k:] |
| req.AckIds = req.AckIds[:k] |
| k = min(j, len(req.ModifyDeadlineAckIds)) |
| remainder.ModifyDeadlineAckIds = req.ModifyDeadlineAckIds[k:] |
| remainder.ModifyDeadlineSeconds = req.ModifyDeadlineSeconds[k:] |
| req.ModifyDeadlineAckIds = req.ModifyDeadlineAckIds[:k] |
| req.ModifyDeadlineSeconds = req.ModifyDeadlineSeconds[:k] |
| return req, remainder |
| } |