pubsub: Set gRPC send and receive limits for publish and pull, respectively.

Fixes #676.

Change-Id: Id5d0b2c50a838c927aff6eb60ef4539f88506338
Reviewed-on: https://code-review.googlesource.com/14091
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
diff --git a/pubsub/service.go b/pubsub/service.go
index ade1bfe..611feb1 100644
--- a/pubsub/service.go
+++ b/pubsub/service.go
@@ -27,6 +27,7 @@
 	"cloud.google.com/go/internal/version"
 	vkit "cloud.google.com/go/pubsub/apiv1"
 	durpb "github.com/golang/protobuf/ptypes/duration"
+	gax "github.com/googleapis/gax-go"
 	"golang.org/x/net/context"
 	"google.golang.org/api/option"
 	pb "google.golang.org/genproto/googleapis/pubsub/v1"
@@ -255,6 +256,7 @@
 	maxPayload       = 512 * 1024
 	reqFixedOverhead = 100
 	overheadPerID    = 3
+	maxSendRecvBytes = 20 * 1024 * 1024 // 20M
 )
 
 // splitAckIDs splits ids into two slices, the first of which contains at most maxPayload bytes of ackID data.
@@ -280,7 +282,7 @@
 	resp, err := s.subc.Pull(ctx, &pb.PullRequest{
 		Subscription: subName,
 		MaxMessages:  maxMessages,
-	})
+	}, gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(maxSendRecvBytes)))
 	if err != nil {
 		return nil, err
 	}
@@ -310,7 +312,7 @@
 	resp, err := s.pubc.Publish(ctx, &pb.PublishRequest{
 		Topic:    topicName,
 		Messages: rawMsgs,
-	})
+	}, gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(maxSendRecvBytes)))
 	if err != nil {
 		return nil, err
 	}