pubsub: add benchwrapper

Change-Id: I0eec480d2955045a55fad7c8c6fa48d01be5ac8e
Reviewed-on: https://code-review.googlesource.com/c/gocloud/+/46250
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Jean de Klerk <deklerk@google.com>
diff --git a/pubsub/go.mod b/pubsub/go.mod
index 3d1f83c..763aa6f 100644
--- a/pubsub/go.mod
+++ b/pubsub/go.mod
@@ -4,6 +4,7 @@
 
 require (
 	cloud.google.com/go v0.46.3
+	cloud.google.com/go/spanner v1.0.0 // indirect
 	cloud.google.com/go/storage v1.0.0 // indirect
 	github.com/golang/protobuf v1.3.2
 	github.com/google/go-cmp v0.3.0
diff --git a/pubsub/go.sum b/pubsub/go.sum
index 8bfbb90..8ae7130 100644
--- a/pubsub/go.sum
+++ b/pubsub/go.sum
@@ -5,6 +5,7 @@
 cloud.google.com/go v0.44.2/go.mod h1:60680Gw3Yr4ikxnPRS/oxxkBccT6SA1yMk63TGekxKY=
 cloud.google.com/go v0.45.1 h1:lRi0CHyU+ytlvylOlFKKq0af6JncuyoRh1J+QJBqQx0=
 cloud.google.com/go v0.45.1/go.mod h1:RpBamKRgapWJb87xiFSdk4g1CME7QZg3uwTez+TSTjc=
+cloud.google.com/go v0.46.2/go.mod h1:a6bKKbmY7er1mI7TEI4lsAkts/mkhTSZK8w33B4RAg0=
 cloud.google.com/go v0.46.3 h1:AVXDdKsrtX33oR9fbCMu/+c1o8Ofjq6Ku/MInaLVg5Y=
 cloud.google.com/go v0.46.3/go.mod h1:a6bKKbmY7er1mI7TEI4lsAkts/mkhTSZK8w33B4RAg0=
 cloud.google.com/go/bigquery v1.0.1 h1:hL+ycaJpVE9M7nLoiXb/Pn10ENE2u+oddxbD8uu0ZVU=
@@ -12,6 +13,8 @@
 cloud.google.com/go/datastore v1.0.0 h1:Kt+gOPPp2LEPWp8CSfxhsM8ik9CcyE/gYu+0r+RnZvM=
 cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE=
 cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I=
+cloud.google.com/go/spanner v1.0.0 h1:jLKThep5kbWLeBhLgtEfm/OPT08n1z7itVTR82WUBQg=
+cloud.google.com/go/spanner v1.0.0/go.mod h1:z7t0U9rMHnkwMx9CZr/AVr3h60tTWRyR4n17+emFjFE=
 cloud.google.com/go/storage v1.0.0 h1:VV2nUM3wwLLGh9lSABFgZMjInyUbJeaRSE64WuAIQ+4=
 cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw=
 github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
diff --git a/pubsub/internal/benchwrapper/README.md b/pubsub/internal/benchwrapper/README.md
new file mode 100644
index 0000000..49e5983
--- /dev/null
+++ b/pubsub/internal/benchwrapper/README.md
@@ -0,0 +1,12 @@
+# Benchwrapper
+
+A small gRPC wrapper around the pubsub client library. This allows the
+benchmarking code to prod at pubsub without speaking Go.
+
+## Running
+
+```bash
+cd pubsub/internal/benchwrapper
+export PUBSUB_EMULATOR_HOST=localhost:8080
+go run *.go --port=8081
+```
diff --git a/pubsub/internal/benchwrapper/main.go b/pubsub/internal/benchwrapper/main.go
new file mode 100644
index 0000000..f9a05c4
--- /dev/null
+++ b/pubsub/internal/benchwrapper/main.go
@@ -0,0 +1,83 @@
+// Copyright 2019 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package main wraps the client library in a gRPC interface that a benchmarker
+// can communicate through.
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"log"
+	"net"
+	"os"
+	"strings"
+
+	"cloud.google.com/go/pubsub"
+	pb "cloud.google.com/go/pubsub/internal/benchwrapper/proto"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/status"
+)
+
+var port = flag.String("port", "", "specify a port to run on")
+
+func main() {
+	flag.Parse()
+	if *port == "" {
+		log.Fatalf("usage: %s --port=8081", os.Args[0])
+	}
+
+	if os.Getenv("PUBSUB_EMULATOR_HOST") == "" {
+		log.Fatal("This benchmarking server only works when connected to an emulator. Please set PUBSUB_EMULATOR_HOST.")
+	}
+
+	ctx := context.Background()
+	c, err := pubsub.NewClient(ctx, "someproject")
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	lis, err := net.Listen("tcp", fmt.Sprintf(":%s", *port))
+	if err != nil {
+		log.Fatal(err)
+	}
+	s := grpc.NewServer()
+	pb.RegisterPubsubBenchWrapperServer(s, &server{
+		c: c,
+	})
+	log.Printf("Running on localhost:%s\n", *port)
+	log.Fatal(s.Serve(lis))
+}
+
+type server struct {
+	c *pubsub.Client
+}
+
+func (s *server) Recv(ctx context.Context, req *pb.PubsubRecv) (*pb.EmptyResponse, error) {
+	sub := s.c.Subscription(req.SubName)
+	err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
+		msg.Ack()
+	})
+
+	if err != nil {
+		s, _ := status.FromError(err)
+		// Return success on server initiated EOF, which is expected.
+		if strings.Contains(s.Message(), "EOF") {
+			return &pb.EmptyResponse{}, nil
+		}
+		return nil, err
+	}
+	return &pb.EmptyResponse{}, nil
+}
diff --git a/pubsub/internal/benchwrapper/proto/README.md b/pubsub/internal/benchwrapper/proto/README.md
new file mode 100644
index 0000000..a468ca8
--- /dev/null
+++ b/pubsub/internal/benchwrapper/proto/README.md
@@ -0,0 +1,6 @@
+# Regenerating protos
+
+```
+cd pubsub/internal/benchwrapper/proto
+protoc --go_out=plugins=grpc:. *.proto
+```
diff --git a/pubsub/internal/benchwrapper/proto/pubsub.pb.go b/pubsub/internal/benchwrapper/proto/pubsub.pb.go
new file mode 100644
index 0000000..a28c08d
--- /dev/null
+++ b/pubsub/internal/benchwrapper/proto/pubsub.pb.go
@@ -0,0 +1,201 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// source: pubsub.proto
+
+package pubsub_bench
+
+import (
+	context "context"
+	fmt "fmt"
+	math "math"
+
+	proto "github.com/golang/protobuf/proto"
+	grpc "google.golang.org/grpc"
+	codes "google.golang.org/grpc/codes"
+	status "google.golang.org/grpc/status"
+)
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
+
+type PubsubRecv struct {
+	// The subscription identifier corresponding to number of messages sent.
+	SubName              string   `protobuf:"bytes,1,opt,name=sub_name,json=subName,proto3" json:"sub_name,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+func (m *PubsubRecv) Reset()         { *m = PubsubRecv{} }
+func (m *PubsubRecv) String() string { return proto.CompactTextString(m) }
+func (*PubsubRecv) ProtoMessage()    {}
+func (*PubsubRecv) Descriptor() ([]byte, []int) {
+	return fileDescriptor_91df006b05e20cf7, []int{0}
+}
+
+func (m *PubsubRecv) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_PubsubRecv.Unmarshal(m, b)
+}
+func (m *PubsubRecv) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_PubsubRecv.Marshal(b, m, deterministic)
+}
+func (m *PubsubRecv) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_PubsubRecv.Merge(m, src)
+}
+func (m *PubsubRecv) XXX_Size() int {
+	return xxx_messageInfo_PubsubRecv.Size(m)
+}
+func (m *PubsubRecv) XXX_DiscardUnknown() {
+	xxx_messageInfo_PubsubRecv.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_PubsubRecv proto.InternalMessageInfo
+
+func (m *PubsubRecv) GetSubName() string {
+	if m != nil {
+		return m.SubName
+	}
+	return ""
+}
+
+// TODO(deklerk): Replace with Google's canonical Empty.
+type EmptyResponse struct {
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+func (m *EmptyResponse) Reset()         { *m = EmptyResponse{} }
+func (m *EmptyResponse) String() string { return proto.CompactTextString(m) }
+func (*EmptyResponse) ProtoMessage()    {}
+func (*EmptyResponse) Descriptor() ([]byte, []int) {
+	return fileDescriptor_91df006b05e20cf7, []int{1}
+}
+
+func (m *EmptyResponse) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_EmptyResponse.Unmarshal(m, b)
+}
+func (m *EmptyResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_EmptyResponse.Marshal(b, m, deterministic)
+}
+func (m *EmptyResponse) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_EmptyResponse.Merge(m, src)
+}
+func (m *EmptyResponse) XXX_Size() int {
+	return xxx_messageInfo_EmptyResponse.Size(m)
+}
+func (m *EmptyResponse) XXX_DiscardUnknown() {
+	xxx_messageInfo_EmptyResponse.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_EmptyResponse proto.InternalMessageInfo
+
+func init() {
+	proto.RegisterType((*PubsubRecv)(nil), "pubsub_bench.PubsubRecv")
+	proto.RegisterType((*EmptyResponse)(nil), "pubsub_bench.EmptyResponse")
+}
+
+func init() { proto.RegisterFile("pubsub.proto", fileDescriptor_91df006b05e20cf7) }
+
+var fileDescriptor_91df006b05e20cf7 = []byte{
+	// 147 bytes of a gzipped FileDescriptorProto
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x29, 0x28, 0x4d, 0x2a,
+	0x2e, 0x4d, 0xd2, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x82, 0xf2, 0xe2, 0x93, 0x52, 0xf3, 0x92,
+	0x33, 0x94, 0xd4, 0xb9, 0xb8, 0x02, 0xc0, 0xfc, 0xa0, 0xd4, 0xe4, 0x32, 0x21, 0x49, 0x2e, 0x0e,
+	0x90, 0x54, 0x5e, 0x62, 0x6e, 0xaa, 0x04, 0xa3, 0x02, 0xa3, 0x06, 0x67, 0x10, 0x7b, 0x71, 0x69,
+	0x92, 0x5f, 0x62, 0x6e, 0xaa, 0x12, 0x3f, 0x17, 0xaf, 0x6b, 0x6e, 0x41, 0x49, 0x65, 0x50, 0x6a,
+	0x71, 0x41, 0x7e, 0x5e, 0x71, 0xaa, 0x51, 0x28, 0x97, 0x10, 0x44, 0xa7, 0x13, 0xc8, 0xa0, 0xf0,
+	0xa2, 0xc4, 0x82, 0x82, 0xd4, 0x22, 0x21, 0x7b, 0x2e, 0x16, 0xb0, 0x49, 0x12, 0x7a, 0xc8, 0xd6,
+	0xe8, 0x21, 0xec, 0x90, 0x92, 0x46, 0x95, 0x41, 0x31, 0x54, 0x89, 0xc1, 0x89, 0x29, 0x80, 0x31,
+	0x89, 0x0d, 0xec, 0x52, 0x63, 0x40, 0x00, 0x00, 0x00, 0xff, 0xff, 0xe3, 0xea, 0xe3, 0xed, 0xb9,
+	0x00, 0x00, 0x00,
+}
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ context.Context
+var _ grpc.ClientConn
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the grpc package it is being compiled against.
+const _ = grpc.SupportPackageIsVersion4
+
+// PubsubBenchWrapperClient is the client API for PubsubBenchWrapper service.
+//
+// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
+type PubsubBenchWrapperClient interface {
+	// Recv represents opening a streaming pull stream to receive messages on.
+	Recv(ctx context.Context, in *PubsubRecv, opts ...grpc.CallOption) (*EmptyResponse, error)
+}
+
+type pubsubBenchWrapperClient struct {
+	cc *grpc.ClientConn
+}
+
+func NewPubsubBenchWrapperClient(cc *grpc.ClientConn) PubsubBenchWrapperClient {
+	return &pubsubBenchWrapperClient{cc}
+}
+
+func (c *pubsubBenchWrapperClient) Recv(ctx context.Context, in *PubsubRecv, opts ...grpc.CallOption) (*EmptyResponse, error) {
+	out := new(EmptyResponse)
+	err := c.cc.Invoke(ctx, "/pubsub_bench.PubsubBenchWrapper/Recv", in, out, opts...)
+	if err != nil {
+		return nil, err
+	}
+	return out, nil
+}
+
+// PubsubBenchWrapperServer is the server API for PubsubBenchWrapper service.
+type PubsubBenchWrapperServer interface {
+	// Recv represents opening a streaming pull stream to receive messages on.
+	Recv(context.Context, *PubsubRecv) (*EmptyResponse, error)
+}
+
+// UnimplementedPubsubBenchWrapperServer can be embedded to have forward compatible implementations.
+type UnimplementedPubsubBenchWrapperServer struct {
+}
+
+func (*UnimplementedPubsubBenchWrapperServer) Recv(ctx context.Context, req *PubsubRecv) (*EmptyResponse, error) {
+	return nil, status.Errorf(codes.Unimplemented, "method Recv not implemented")
+}
+
+func RegisterPubsubBenchWrapperServer(s *grpc.Server, srv PubsubBenchWrapperServer) {
+	s.RegisterService(&_PubsubBenchWrapper_serviceDesc, srv)
+}
+
+func _PubsubBenchWrapper_Recv_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+	in := new(PubsubRecv)
+	if err := dec(in); err != nil {
+		return nil, err
+	}
+	if interceptor == nil {
+		return srv.(PubsubBenchWrapperServer).Recv(ctx, in)
+	}
+	info := &grpc.UnaryServerInfo{
+		Server:     srv,
+		FullMethod: "/pubsub_bench.PubsubBenchWrapper/Recv",
+	}
+	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+		return srv.(PubsubBenchWrapperServer).Recv(ctx, req.(*PubsubRecv))
+	}
+	return interceptor(ctx, in, info, handler)
+}
+
+var _PubsubBenchWrapper_serviceDesc = grpc.ServiceDesc{
+	ServiceName: "pubsub_bench.PubsubBenchWrapper",
+	HandlerType: (*PubsubBenchWrapperServer)(nil),
+	Methods: []grpc.MethodDesc{
+		{
+			MethodName: "Recv",
+			Handler:    _PubsubBenchWrapper_Recv_Handler,
+		},
+	},
+	Streams:  []grpc.StreamDesc{},
+	Metadata: "pubsub.proto",
+}
diff --git a/pubsub/internal/benchwrapper/proto/pubsub.proto b/pubsub/internal/benchwrapper/proto/pubsub.proto
new file mode 100644
index 0000000..efb37e2
--- /dev/null
+++ b/pubsub/internal/benchwrapper/proto/pubsub.proto
@@ -0,0 +1,32 @@
+// Copyright 2019 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+package pubsub_bench;
+
+option java_multiple_files = true;
+
+message PubsubRecv {
+  // The subscription identifier corresponding to number of messages sent.
+  string sub_name = 1;
+}
+
+// TODO(deklerk): Replace with Google's canonical Empty.
+message EmptyResponse {}
+
+service PubsubBenchWrapper {
+  // Recv represents opening a streaming pull stream to receive messages on.
+  rpc Recv(PubsubRecv) returns (EmptyResponse) {}
+}
\ No newline at end of file