transport/grpc: add WithConnPool dialer option
This "internal" option allows clients to share a connection pool for
longrunning GAPIC clients.
Most usages of WithGRPCConn internally should be changed to
WithConnPool.
Change-Id: I496a6e229c0d5695de8e1381704ac11ca078b162
Reviewed-on: https://code-review.googlesource.com/c/google-api-go-client/+/51532
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Noah Dietz <ndietz@google.com>
Reviewed-by: Tyler Bui-Palsulich <tbp@google.com>
diff --git a/internal/conn_pool.go b/internal/conn_pool.go
new file mode 100644
index 0000000..fedcce1
--- /dev/null
+++ b/internal/conn_pool.go
@@ -0,0 +1,30 @@
+// Copyright 2020 Google LLC.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package internal
+
+import (
+ "google.golang.org/grpc"
+)
+
+// ConnPool is a pool of grpc.ClientConns.
+type ConnPool interface {
+ // Conn returns a ClientConn from the pool.
+ //
+ // Conns aren't returned to the pool.
+ Conn() *grpc.ClientConn
+
+ // Num returns the number of connections in the pool.
+ //
+ // It will always return the same value.
+ Num() int
+
+ // Close closes every ClientConn in the pool.
+ //
+ // The error returned by Close may be a single error or multiple errors.
+ Close() error
+
+ // ConnPool implements grpc.ClientConnInterface to enable it to be used directly with generated proto stubs.
+ grpc.ClientConnInterface
+}
diff --git a/internal/settings.go b/internal/settings.go
index de946a0..3af6559 100644
--- a/internal/settings.go
+++ b/internal/settings.go
@@ -29,7 +29,8 @@
HTTPClient *http.Client
GRPCDialOpts []grpc.DialOption
GRPCConn *grpc.ClientConn
- GRPCConnPool int
+ GRPCConnPool ConnPool
+ GRPCConnPoolSize int
NoAuth bool
TelemetryDisabled bool
@@ -71,6 +72,12 @@
if nCreds > 1 && !(nCreds == 2 && ds.TokenSource != nil && ds.CredentialsFile != "") {
return errors.New("multiple credential options provided")
}
+ if ds.GRPCConn != nil && ds.GRPCConnPool != nil {
+ return errors.New("WithGRPCConn is incompatible with WithConnPool")
+ }
+ if ds.HTTPClient != nil && ds.GRPCConnPool != nil {
+ return errors.New("WithHTTPClient is incompatible with WithConnPool")
+ }
if ds.HTTPClient != nil && ds.GRPCConn != nil {
return errors.New("WithHTTPClient is incompatible with WithGRPCConn")
}
diff --git a/option/option.go b/option/option.go
index fe91869..0de9466 100644
--- a/option/option.go
+++ b/option/option.go
@@ -150,7 +150,7 @@
type withGRPCConnectionPool int
func (w withGRPCConnectionPool) Apply(o *internal.DialSettings) {
- o.GRPCConnPool = int(w)
+ o.GRPCConnPoolSize = int(w)
}
// WithAPIKey returns a ClientOption that specifies an API key to be used
diff --git a/transport/grpc/dial.go b/transport/grpc/dial.go
index adad182..3c0782e 100644
--- a/transport/grpc/dial.go
+++ b/transport/grpc/dial.go
@@ -36,13 +36,16 @@
// Dial returns a GRPC connection for use communicating with a Google cloud
// service, configured with the given ClientOptions.
func Dial(ctx context.Context, opts ...option.ClientOption) (*grpc.ClientConn, error) {
- var o internal.DialSettings
- for _, opt := range opts {
- opt.Apply(&o)
+ o, err := processAndValidateOpts(opts)
+ if err != nil {
+ return nil, err
}
- if o.GRPCConnPool != 0 {
+ if o.GRPCConnPool != nil {
+ return o.GRPCConnPool.Conn(), nil
+ }
+ if o.GRPCConnPoolSize != 0 {
// NOTE(cbro): RoundRobin and WithBalancer are deprecated and we need to remove usages of it.
- balancer := grpc.RoundRobin(internal.NewPoolResolver(o.GRPCConnPool, &o))
+ balancer := grpc.RoundRobin(internal.NewPoolResolver(o.GRPCConnPoolSize, o))
o.GRPCDialOpts = append(o.GRPCDialOpts, grpc.WithBalancer(balancer))
}
return dial(ctx, false, o)
@@ -52,9 +55,9 @@
// with fake or mock Google cloud service implementations, such as emulators.
// The connection is configured with the given ClientOptions.
func DialInsecure(ctx context.Context, opts ...option.ClientOption) (*grpc.ClientConn, error) {
- var o internal.DialSettings
- for _, opt := range opts {
- opt.Apply(&o)
+ o, err := processAndValidateOpts(opts)
+ if err != nil {
+ return nil, err
}
return dial(ctx, true, o)
}
@@ -67,12 +70,15 @@
//
// This API is subject to change as we further refine requirements. It will go away if gRPC stubs accept an interface instead of the concrete ClientConn type. See https://github.com/grpc/grpc-go/issues/1287.
func DialPool(ctx context.Context, opts ...option.ClientOption) (ConnPool, error) {
- var o internal.DialSettings
- for _, opt := range opts {
- opt.Apply(&o)
+ o, err := processAndValidateOpts(opts)
+ if err != nil {
+ return nil, err
}
- poolSize := o.GRPCConnPool
- o.GRPCConnPool = 0 // we don't *need* to set this to zero, but it's safe to.
+ if o.GRPCConnPool != nil {
+ return o.GRPCConnPool, nil
+ }
+ poolSize := o.GRPCConnPoolSize
+ o.GRPCConnPoolSize = 0 // we don't *need* to set this to zero, but it's safe to.
if poolSize == 0 || poolSize == 1 {
// Fast path for common case for a connection pool with a single connection.
@@ -95,10 +101,7 @@
return pool, nil
}
-func dial(ctx context.Context, insecure bool, o internal.DialSettings) (*grpc.ClientConn, error) {
- if err := o.Validate(); err != nil {
- return nil, err
- }
+func dial(ctx context.Context, insecure bool, o *internal.DialSettings) (*grpc.ClientConn, error) {
if o.HTTPClient != nil {
return nil, errors.New("unsupported HTTP client specified")
}
@@ -112,7 +115,7 @@
if o.APIKey != "" {
log.Print("API keys are not supported for gRPC APIs. Remove the WithAPIKey option from your client-creating call.")
}
- creds, err := internal.Creds(ctx, &o)
+ creds, err := internal.Creds(ctx, o)
if err != nil {
return nil, err
}
@@ -180,7 +183,7 @@
return grpc.DialContext(ctx, o.Endpoint, grpcOpts...)
}
-func addOCStatsHandler(opts []grpc.DialOption, settings internal.DialSettings) []grpc.DialOption {
+func addOCStatsHandler(opts []grpc.DialOption, settings *internal.DialSettings) []grpc.DialOption {
if settings.TelemetryDisabled {
return opts
}
@@ -255,3 +258,30 @@
}
return false
}
+
+func processAndValidateOpts(opts []option.ClientOption) (*internal.DialSettings, error) {
+ var o internal.DialSettings
+ for _, opt := range opts {
+ opt.Apply(&o)
+ }
+ if err := o.Validate(); err != nil {
+ return nil, err
+ }
+ return &o, nil
+}
+
+type connPoolOption struct{ ConnPool }
+
+// WithConnPool returns a ClientOption that specifies the ConnPool
+// connection to use as the basis of communications.
+//
+// This is only to be used by Google client libraries internally, for example
+// when creating a longrunning API client that shares the same connection pool
+// as a service client.
+func WithConnPool(p ConnPool) option.ClientOption {
+ return connPoolOption{p}
+}
+
+func (o connPoolOption) Apply(s *internal.DialSettings) {
+ s.GRPCConnPool = o.ConnPool
+}
diff --git a/transport/grpc/pool.go b/transport/grpc/pool.go
index 9d6143f..32c0293 100644
--- a/transport/grpc/pool.go
+++ b/transport/grpc/pool.go
@@ -9,56 +9,23 @@
"fmt"
"sync/atomic"
+ "google.golang.org/api/internal"
"google.golang.org/grpc"
)
// ConnPool is a pool of grpc.ClientConns.
-type ConnPool interface {
- // Conn returns a ClientConn from the pool.
- //
- // Conns aren't returned to the pool.
- Conn() *grpc.ClientConn
-
- // Num returns the number of connections in the pool.
- //
- // It will always return the same value.
- Num() int
-
- // Close closes every ClientConn in the pool.
- //
- // The error returned by Close may be a single error or multiple errors.
- Close() error
-
- grpc.ClientConnInterface
-}
+type ConnPool = internal.ConnPool // NOTE(cbro): type alias to export the type. It must live in internal to avoid a circular dependency.
var _ ConnPool = &roundRobinConnPool{}
var _ ConnPool = &singleConnPool{}
// singleConnPool is a special case for a single connection.
type singleConnPool struct {
- conn *grpc.ClientConn
+ *grpc.ClientConn
}
-func (p *singleConnPool) Conn() *grpc.ClientConn {
- return p.conn
-}
-
-func (p *singleConnPool) Num() int {
- return 1
-}
-
-func (p *singleConnPool) Close() error {
- return p.conn.Close()
-}
-
-func (p *singleConnPool) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...grpc.CallOption) error {
- return p.conn.Invoke(ctx, method, args, reply, opts...)
-}
-
-func (p *singleConnPool) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) {
- return p.conn.NewStream(ctx, desc, method, opts...)
-}
+func (p *singleConnPool) Conn() *grpc.ClientConn { return p.ClientConn }
+func (p *singleConnPool) Num() int { return 1 }
type roundRobinConnPool struct {
conns []*grpc.ClientConn