| // Copyright 2018 Google LLC |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // https://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package pubsub_test |
| |
| import ( |
| "context" |
| "fmt" |
| "log" |
| "time" |
| |
| pubsub "cloud.google.com/go/pubsub/apiv1" |
| pubsubpb "google.golang.org/genproto/googleapis/pubsub/v1" |
| ) |
| |
| func ExampleSubscriberClient_Pull_lengthyClientProcessing() { |
| projectID := "some-project" |
| subscriptionID := "some-subscription" |
| |
| ctx := context.Background() |
| client, err := pubsub.NewSubscriberClient(ctx) |
| if err != nil { |
| log.Fatal(err) |
| } |
| defer client.Close() |
| |
| sub := fmt.Sprintf("projects/%s/subscriptions/%s", projectID, subscriptionID) |
| // Be sure to tune the MaxMessages parameter per your project's needs, and accordingly |
| // adjust the ack behavior below to batch acknowledgements. |
| req := pubsubpb.PullRequest{ |
| Subscription: sub, |
| MaxMessages: 1, |
| } |
| |
| fmt.Println("Listening..") |
| |
| for { |
| res, err := client.Pull(ctx, &req) |
| if err != nil { |
| log.Fatal(err) |
| } |
| |
| // client.Pull returns an empty list if there are no messages available in the |
| // backlog. We should skip processing steps when that happens. |
| if len(res.ReceivedMessages) == 0 { |
| continue |
| } |
| |
| var recvdAckIDs []string |
| for _, m := range res.ReceivedMessages { |
| recvdAckIDs = append(recvdAckIDs, m.AckId) |
| } |
| |
| var done = make(chan struct{}) |
| var delay = 0 * time.Second // Tick immediately upon reception |
| var ackDeadline = 10 * time.Second |
| |
| // Continuously notify the server that processing is still happening on this batch. |
| go func() { |
| for { |
| select { |
| case <-ctx.Done(): |
| return |
| case <-done: |
| return |
| case <-time.After(delay): |
| err := client.ModifyAckDeadline(ctx, &pubsubpb.ModifyAckDeadlineRequest{ |
| Subscription: sub, |
| AckIds: recvdAckIDs, |
| AckDeadlineSeconds: int32(ackDeadline.Seconds()), |
| }) |
| if err != nil { |
| log.Fatal(err) |
| } |
| delay = ackDeadline - 5*time.Second // 5 seconds grace period. |
| } |
| } |
| }() |
| |
| for _, m := range res.ReceivedMessages { |
| // Process the message here, possibly in a goroutine. |
| log.Printf("Got message: %s", string(m.Message.Data)) |
| |
| err := client.Acknowledge(ctx, &pubsubpb.AcknowledgeRequest{ |
| Subscription: sub, |
| AckIds: []string{m.AckId}, |
| }) |
| if err != nil { |
| log.Fatal(err) |
| } |
| } |
| |
| close(done) |
| } |
| } |