pubsub: send initial modack in synchronous=true mode

Messages that do not have an initial modack sent often become invisibly
expired. How this happens:

(given some ack distribution spooled up to 5m)
(given subscription config AckDeadline=15s)

- kaTick happens, send modacks. Next kaTick is 5m later
- 10s after this kaTick occurs, a message arrives. We do not send a modack, so
the expiration is 15s.
- 15s goes by. If the user didn't ack by now this message is dead-in-the-water,
because now the server has decided that this message has expired.
- 4m35s goes by and we send a modack. But, of course, it has already expired.

Whenever the user gets around to sending the ack, if they do so after the 15s
this ack will be recorded as status=expired in stackdriver.

This CL makes all messages - including synchronous - send a modack on receipt
to fix this issue. Users relying on Synchronous=true as a way to force the
subscription's AckDeadline to be used should instead use MaxExtension:

```
	cfg, err := sub.Config(ctx)
	if err != nil {
		// TODO handle err
	}

	sub.ReceiveSettings.MaxExtension = cfg.AckDeadline
```

This CL also removes the customized logger in favour of t.Log and a panic.

Fixes #1247

Change-Id: I6aa65e083ab12a79e61e8a4d968910dc794c964d
Reviewed-on: https://code-review.googlesource.com/c/36430
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Eno Compton <enocom@google.com>
2 files changed
tree: 89979b96c23ac0eb53cce235fb80048159a351c3
  1. asset/
  2. bigquery/
  3. bigtable/
  4. civil/
  5. cloudtasks/
  6. cmd/
  7. compute/
  8. container/
  9. containeranalysis/
  10. dataproc/
  11. datastore/
  12. debugger/
  13. dialogflow/
  14. dlp/
  15. errorreporting/
  16. expr/
  17. firestore/
  18. functions/
  19. httpreplay/
  20. iam/
  21. internal/
  22. kms/
  23. language/
  24. logging/
  25. longrunning/
  26. monitoring/
  27. oslogin/
  28. profiler/
  29. pubsub/
  30. redis/
  31. rpcreplay/
  32. scheduler/
  33. securitycenter/
  34. spanner/
  35. speech/
  36. storage/
  37. texttospeech/
  38. trace/
  39. translate/
  40. videointelligence/
  41. vision/
  42. authexample_test.go
  43. AUTHORS
  44. CHANGES.md
  45. cloud.go
  46. CODE_OF_CONDUCT.md
  47. CONTRIBUTING.md
  48. CONTRIBUTORS
  49. examples_test.go
  50. issue_template.md
  51. keys.tar.enc
  52. LICENSE
  53. license_test.go
  54. old-news.md
  55. README.md
  56. regen-gapic.sh
  57. RELEASING.md
  58. run-tests.sh
README.md

Google Cloud Client Libraries for Go

GoDoc

Go packages for Google Cloud Platform services.

import "cloud.google.com/go"

To install the packages on your system, do not clone the repo. Instead use

$ go get -u cloud.google.com/go/...

NOTE: Some of these packages are under development, and may occasionally make backwards-incompatible changes.

NOTE: Github repo is a mirror of https://code.googlesource.com/gocloud.

News

7 August 2018

As of November 1, the code in the repo will no longer support Go versions 1.8 and earlier. No one other than AppEngine users should be on those old versions, and AppEngine Standard and Flex will stop supporting new deployments with those versions on that date.

Changes have been moved to CHANGES.

Supported APIs

Google APIStatusPackage
Assetalphacloud.google.com/go/asset/v1beta
BigQuerystablecloud.google.com/go/bigquery
Bigtablestablecloud.google.com/go/bigtable
Cloudtasksbetacloud.google.com/go/cloudtasks/apiv2beta3
Containerstablecloud.google.com/go/container/apiv1
ContainerAnalysisbetacloud.google.com/go/containeranalysis/apiv1beta1
Dataprocstablecloud.google.com/go/dataproc/apiv1
Datastorestablecloud.google.com/go/datastore
Debuggeralphacloud.google.com/go/debugger/apiv2
Dialogflowalphacloud.google.com/go/dialogflow/apiv2
Data Loss Preventionalphacloud.google.com/go/dlp/apiv2
ErrorReportingalphacloud.google.com/go/errorreporting
Firestorebetacloud.google.com/go/firestore
IAMstablecloud.google.com/go/iam
KMSstablecloud.google.com/go/kms
Natural Languagestablecloud.google.com/go/language/apiv1
Loggingstablecloud.google.com/go/logging
Monitoringalphacloud.google.com/go/monitoring/apiv3
OS Loginalphacloud.google.com/compute/docs/oslogin/rest
Pub/Substablecloud.google.com/go/pubsub
Memorystorestablecloud.google.com/go/redis/apiv1beta1
Spannerstablecloud.google.com/go/spanner
Speechstablecloud.google.com/go/speech/apiv1
Storagestablecloud.google.com/go/storage
Text To Speechalphacloud.google.com/go/texttospeech/apiv1
Tracealphacloud.google.com/go/trace/apiv2
Translationstablecloud.google.com/go/translate
Video Intelligencealphacloud.google.com/go/videointelligence/apiv1beta1
Visionstablecloud.google.com/go/vision/apiv1

Alpha status: the API is still being actively developed. As a result, it might change in backward-incompatible ways and is not recommended for production use.

Beta status: the API is largely complete, but still has outstanding features and bugs to be addressed. There may be minor backwards-incompatible changes where necessary.

Stable status: the API is mature and ready for production use. We will continue addressing bugs and feature requests.

Documentation and examples are available at https://godoc.org/cloud.google.com/go

Visit or join the google-api-go-announce group for updates on these packages.

Go Versions Supported

We support the two most recent major versions of Go. If Google App Engine uses an older version, we support that as well.

Authorization

By default, each API will use Google Application Default Credentials for authorization credentials used in calling the API endpoints. This will allow your application to run in many environments without requiring explicit configuration.

client, err := storage.NewClient(ctx)

To authorize using a JSON key file, pass option.WithCredentialsFile to the NewClient function of the desired package. For example:

client, err := storage.NewClient(ctx, option.WithCredentialsFile("path/to/keyfile.json"))

You can exert more control over authorization by using the golang.org/x/oauth2 package to create an oauth2.TokenSource. Then pass option.WithTokenSource to the NewClient function: snip:# (auth-ts)

tokenSource := ...
client, err := storage.NewClient(ctx, option.WithTokenSource(tokenSource))

Cloud Datastore GoDoc

Example Usage

First create a datastore.Client to use throughout your application:

client, err := datastore.NewClient(ctx, "my-project-id")
if err != nil {
	log.Fatal(err)
}

Then use that client to interact with the API:

type Post struct {
	Title       string
	Body        string `datastore:",noindex"`
	PublishedAt time.Time
}
keys := []*datastore.Key{
	datastore.NameKey("Post", "post1", nil),
	datastore.NameKey("Post", "post2", nil),
}
posts := []*Post{
	{Title: "Post 1", Body: "...", PublishedAt: time.Now()},
	{Title: "Post 2", Body: "...", PublishedAt: time.Now()},
}
if _, err := client.PutMulti(ctx, keys, posts); err != nil {
	log.Fatal(err)
}

Cloud Storage GoDoc

Example Usage

First create a storage.Client to use throughout your application:

client, err := storage.NewClient(ctx)
if err != nil {
	log.Fatal(err)
}
// Read the object1 from bucket.
rc, err := client.Bucket("bucket").Object("object1").NewReader(ctx)
if err != nil {
	log.Fatal(err)
}
defer rc.Close()
body, err := ioutil.ReadAll(rc)
if err != nil {
	log.Fatal(err)
}

Cloud Pub/Sub GoDoc

Example Usage

First create a pubsub.Client to use throughout your application:

client, err := pubsub.NewClient(ctx, "project-id")
if err != nil {
	log.Fatal(err)
}

Then use the client to publish and subscribe:

// Publish "hello world" on topic1.
topic := client.Topic("topic1")
res := topic.Publish(ctx, &pubsub.Message{
	Data: []byte("hello world"),
})
// The publish happens asynchronously.
// Later, you can get the result from res:
...
msgID, err := res.Get(ctx)
if err != nil {
	log.Fatal(err)
}

// Use a callback to receive messages via subscription1.
sub := client.Subscription("subscription1")
err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
	fmt.Println(m.Data)
	m.Ack() // Acknowledge that we've consumed the message.
})
if err != nil {
	log.Println(err)
}

BigQuery GoDoc

Example Usage

First create a bigquery.Client to use throughout your application: snip:# (bq-1)

c, err := bigquery.NewClient(ctx, "my-project-ID")
if err != nil {
	// TODO: Handle error.
}

Then use that client to interact with the API: snip:# (bq-2)

// Construct a query.
q := c.Query(`
    SELECT year, SUM(number)
    FROM [bigquery-public-data:usa_names.usa_1910_2013]
    WHERE name = "William"
    GROUP BY year
    ORDER BY year
`)
// Execute the query.
it, err := q.Read(ctx)
if err != nil {
	// TODO: Handle error.
}
// Iterate through the results.
for {
	var values []bigquery.Value
	err := it.Next(&values)
	if err == iterator.Done {
		break
	}
	if err != nil {
		// TODO: Handle error.
	}
	fmt.Println(values)
}

Stackdriver Logging GoDoc

Example Usage

First create a logging.Client to use throughout your application: snip:# (logging-1)

ctx := context.Background()
client, err := logging.NewClient(ctx, "my-project")
if err != nil {
	// TODO: Handle error.
}

Usually, you'll want to add log entries to a buffer to be periodically flushed (automatically and asynchronously) to the Stackdriver Logging service. snip:# (logging-2)

logger := client.Logger("my-log")
logger.Log(logging.Entry{Payload: "something happened!"})

Close your client before your program exits, to flush any buffered log entries. snip:# (logging-3)

err = client.Close()
if err != nil {
	// TODO: Handle error.
}

Cloud Spanner GoDoc

Example Usage

First create a spanner.Client to use throughout your application:

client, err := spanner.NewClient(ctx, "projects/P/instances/I/databases/D")
if err != nil {
	log.Fatal(err)
}
// Simple Reads And Writes
_, err = client.Apply(ctx, []*spanner.Mutation{
	spanner.Insert("Users",
		[]string{"name", "email"},
		[]interface{}{"alice", "a@example.com"})})
if err != nil {
	log.Fatal(err)
}
row, err := client.Single().ReadRow(ctx, "Users",
	spanner.Key{"alice"}, []string{"email"})
if err != nil {
	log.Fatal(err)
}

Contributing

Contributions are welcome. Please, see the CONTRIBUTING document for details. We‘re using Gerrit for our code reviews. Please don’t open pull requests against this repo, new pull requests will be automatically closed.

Please note that this project is released with a Contributor Code of Conduct. By participating in this project you agree to abide by its terms. See Contributor Code of Conduct for more information.