pubsub: add benchwrapper
Change-Id: I0eec480d2955045a55fad7c8c6fa48d01be5ac8e
Reviewed-on: https://code-review.googlesource.com/c/gocloud/+/46250
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Jean de Klerk <deklerk@google.com>
diff --git a/pubsub/go.mod b/pubsub/go.mod
index 3d1f83c..763aa6f 100644
--- a/pubsub/go.mod
+++ b/pubsub/go.mod
@@ -4,6 +4,7 @@
require (
cloud.google.com/go v0.46.3
+ cloud.google.com/go/spanner v1.0.0 // indirect
cloud.google.com/go/storage v1.0.0 // indirect
github.com/golang/protobuf v1.3.2
github.com/google/go-cmp v0.3.0
diff --git a/pubsub/go.sum b/pubsub/go.sum
index 8bfbb90..8ae7130 100644
--- a/pubsub/go.sum
+++ b/pubsub/go.sum
@@ -5,6 +5,7 @@
cloud.google.com/go v0.44.2/go.mod h1:60680Gw3Yr4ikxnPRS/oxxkBccT6SA1yMk63TGekxKY=
cloud.google.com/go v0.45.1 h1:lRi0CHyU+ytlvylOlFKKq0af6JncuyoRh1J+QJBqQx0=
cloud.google.com/go v0.45.1/go.mod h1:RpBamKRgapWJb87xiFSdk4g1CME7QZg3uwTez+TSTjc=
+cloud.google.com/go v0.46.2/go.mod h1:a6bKKbmY7er1mI7TEI4lsAkts/mkhTSZK8w33B4RAg0=
cloud.google.com/go v0.46.3 h1:AVXDdKsrtX33oR9fbCMu/+c1o8Ofjq6Ku/MInaLVg5Y=
cloud.google.com/go v0.46.3/go.mod h1:a6bKKbmY7er1mI7TEI4lsAkts/mkhTSZK8w33B4RAg0=
cloud.google.com/go/bigquery v1.0.1 h1:hL+ycaJpVE9M7nLoiXb/Pn10ENE2u+oddxbD8uu0ZVU=
@@ -12,6 +13,8 @@
cloud.google.com/go/datastore v1.0.0 h1:Kt+gOPPp2LEPWp8CSfxhsM8ik9CcyE/gYu+0r+RnZvM=
cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE=
cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I=
+cloud.google.com/go/spanner v1.0.0 h1:jLKThep5kbWLeBhLgtEfm/OPT08n1z7itVTR82WUBQg=
+cloud.google.com/go/spanner v1.0.0/go.mod h1:z7t0U9rMHnkwMx9CZr/AVr3h60tTWRyR4n17+emFjFE=
cloud.google.com/go/storage v1.0.0 h1:VV2nUM3wwLLGh9lSABFgZMjInyUbJeaRSE64WuAIQ+4=
cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
diff --git a/pubsub/internal/benchwrapper/README.md b/pubsub/internal/benchwrapper/README.md
new file mode 100644
index 0000000..49e5983
--- /dev/null
+++ b/pubsub/internal/benchwrapper/README.md
@@ -0,0 +1,12 @@
+# Benchwrapper
+
+A small gRPC wrapper around the pubsub client library. This allows the
+benchmarking code to prod at pubsub without speaking Go.
+
+## Running
+
+```bash
+cd pubsub/internal/benchwrapper
+export PUBSUB_EMULATOR_HOST=localhost:8080
+go run *.go --port=8081
+```
diff --git a/pubsub/internal/benchwrapper/main.go b/pubsub/internal/benchwrapper/main.go
new file mode 100644
index 0000000..f9a05c4
--- /dev/null
+++ b/pubsub/internal/benchwrapper/main.go
@@ -0,0 +1,83 @@
+// Copyright 2019 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package main wraps the client library in a gRPC interface that a benchmarker
+// can communicate through.
+package main
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "log"
+ "net"
+ "os"
+ "strings"
+
+ "cloud.google.com/go/pubsub"
+ pb "cloud.google.com/go/pubsub/internal/benchwrapper/proto"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/status"
+)
+
+var port = flag.String("port", "", "specify a port to run on")
+
+func main() {
+ flag.Parse()
+ if *port == "" {
+ log.Fatalf("usage: %s --port=8081", os.Args[0])
+ }
+
+ if os.Getenv("PUBSUB_EMULATOR_HOST") == "" {
+ log.Fatal("This benchmarking server only works when connected to an emulator. Please set PUBSUB_EMULATOR_HOST.")
+ }
+
+ ctx := context.Background()
+ c, err := pubsub.NewClient(ctx, "someproject")
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ lis, err := net.Listen("tcp", fmt.Sprintf(":%s", *port))
+ if err != nil {
+ log.Fatal(err)
+ }
+ s := grpc.NewServer()
+ pb.RegisterPubsubBenchWrapperServer(s, &server{
+ c: c,
+ })
+ log.Printf("Running on localhost:%s\n", *port)
+ log.Fatal(s.Serve(lis))
+}
+
+type server struct {
+ c *pubsub.Client
+}
+
+func (s *server) Recv(ctx context.Context, req *pb.PubsubRecv) (*pb.EmptyResponse, error) {
+ sub := s.c.Subscription(req.SubName)
+ err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
+ msg.Ack()
+ })
+
+ if err != nil {
+ s, _ := status.FromError(err)
+ // Return success on server initiated EOF, which is expected.
+ if strings.Contains(s.Message(), "EOF") {
+ return &pb.EmptyResponse{}, nil
+ }
+ return nil, err
+ }
+ return &pb.EmptyResponse{}, nil
+}
diff --git a/pubsub/internal/benchwrapper/proto/README.md b/pubsub/internal/benchwrapper/proto/README.md
new file mode 100644
index 0000000..a468ca8
--- /dev/null
+++ b/pubsub/internal/benchwrapper/proto/README.md
@@ -0,0 +1,6 @@
+# Regenerating protos
+
+```
+cd pubsub/internal/benchwrapper/proto
+protoc --go_out=plugins=grpc:. *.proto
+```
diff --git a/pubsub/internal/benchwrapper/proto/pubsub.pb.go b/pubsub/internal/benchwrapper/proto/pubsub.pb.go
new file mode 100644
index 0000000..a28c08d
--- /dev/null
+++ b/pubsub/internal/benchwrapper/proto/pubsub.pb.go
@@ -0,0 +1,201 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// source: pubsub.proto
+
+package pubsub_bench
+
+import (
+ context "context"
+ fmt "fmt"
+ math "math"
+
+ proto "github.com/golang/protobuf/proto"
+ grpc "google.golang.org/grpc"
+ codes "google.golang.org/grpc/codes"
+ status "google.golang.org/grpc/status"
+)
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
+
+type PubsubRecv struct {
+ // The subscription identifier corresponding to number of messages sent.
+ SubName string `protobuf:"bytes,1,opt,name=sub_name,json=subName,proto3" json:"sub_name,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *PubsubRecv) Reset() { *m = PubsubRecv{} }
+func (m *PubsubRecv) String() string { return proto.CompactTextString(m) }
+func (*PubsubRecv) ProtoMessage() {}
+func (*PubsubRecv) Descriptor() ([]byte, []int) {
+ return fileDescriptor_91df006b05e20cf7, []int{0}
+}
+
+func (m *PubsubRecv) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_PubsubRecv.Unmarshal(m, b)
+}
+func (m *PubsubRecv) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_PubsubRecv.Marshal(b, m, deterministic)
+}
+func (m *PubsubRecv) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_PubsubRecv.Merge(m, src)
+}
+func (m *PubsubRecv) XXX_Size() int {
+ return xxx_messageInfo_PubsubRecv.Size(m)
+}
+func (m *PubsubRecv) XXX_DiscardUnknown() {
+ xxx_messageInfo_PubsubRecv.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_PubsubRecv proto.InternalMessageInfo
+
+func (m *PubsubRecv) GetSubName() string {
+ if m != nil {
+ return m.SubName
+ }
+ return ""
+}
+
+// TODO(deklerk): Replace with Google's canonical Empty.
+type EmptyResponse struct {
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *EmptyResponse) Reset() { *m = EmptyResponse{} }
+func (m *EmptyResponse) String() string { return proto.CompactTextString(m) }
+func (*EmptyResponse) ProtoMessage() {}
+func (*EmptyResponse) Descriptor() ([]byte, []int) {
+ return fileDescriptor_91df006b05e20cf7, []int{1}
+}
+
+func (m *EmptyResponse) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_EmptyResponse.Unmarshal(m, b)
+}
+func (m *EmptyResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_EmptyResponse.Marshal(b, m, deterministic)
+}
+func (m *EmptyResponse) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_EmptyResponse.Merge(m, src)
+}
+func (m *EmptyResponse) XXX_Size() int {
+ return xxx_messageInfo_EmptyResponse.Size(m)
+}
+func (m *EmptyResponse) XXX_DiscardUnknown() {
+ xxx_messageInfo_EmptyResponse.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_EmptyResponse proto.InternalMessageInfo
+
+func init() {
+ proto.RegisterType((*PubsubRecv)(nil), "pubsub_bench.PubsubRecv")
+ proto.RegisterType((*EmptyResponse)(nil), "pubsub_bench.EmptyResponse")
+}
+
+func init() { proto.RegisterFile("pubsub.proto", fileDescriptor_91df006b05e20cf7) }
+
+var fileDescriptor_91df006b05e20cf7 = []byte{
+ // 147 bytes of a gzipped FileDescriptorProto
+ 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x29, 0x28, 0x4d, 0x2a,
+ 0x2e, 0x4d, 0xd2, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x82, 0xf2, 0xe2, 0x93, 0x52, 0xf3, 0x92,
+ 0x33, 0x94, 0xd4, 0xb9, 0xb8, 0x02, 0xc0, 0xfc, 0xa0, 0xd4, 0xe4, 0x32, 0x21, 0x49, 0x2e, 0x0e,
+ 0x90, 0x54, 0x5e, 0x62, 0x6e, 0xaa, 0x04, 0xa3, 0x02, 0xa3, 0x06, 0x67, 0x10, 0x7b, 0x71, 0x69,
+ 0x92, 0x5f, 0x62, 0x6e, 0xaa, 0x12, 0x3f, 0x17, 0xaf, 0x6b, 0x6e, 0x41, 0x49, 0x65, 0x50, 0x6a,
+ 0x71, 0x41, 0x7e, 0x5e, 0x71, 0xaa, 0x51, 0x28, 0x97, 0x10, 0x44, 0xa7, 0x13, 0xc8, 0xa0, 0xf0,
+ 0xa2, 0xc4, 0x82, 0x82, 0xd4, 0x22, 0x21, 0x7b, 0x2e, 0x16, 0xb0, 0x49, 0x12, 0x7a, 0xc8, 0xd6,
+ 0xe8, 0x21, 0xec, 0x90, 0x92, 0x46, 0x95, 0x41, 0x31, 0x54, 0x89, 0xc1, 0x89, 0x29, 0x80, 0x31,
+ 0x89, 0x0d, 0xec, 0x52, 0x63, 0x40, 0x00, 0x00, 0x00, 0xff, 0xff, 0xe3, 0xea, 0xe3, 0xed, 0xb9,
+ 0x00, 0x00, 0x00,
+}
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ context.Context
+var _ grpc.ClientConn
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the grpc package it is being compiled against.
+const _ = grpc.SupportPackageIsVersion4
+
+// PubsubBenchWrapperClient is the client API for PubsubBenchWrapper service.
+//
+// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
+type PubsubBenchWrapperClient interface {
+ // Recv represents opening a streaming pull stream to receive messages on.
+ Recv(ctx context.Context, in *PubsubRecv, opts ...grpc.CallOption) (*EmptyResponse, error)
+}
+
+type pubsubBenchWrapperClient struct {
+ cc *grpc.ClientConn
+}
+
+func NewPubsubBenchWrapperClient(cc *grpc.ClientConn) PubsubBenchWrapperClient {
+ return &pubsubBenchWrapperClient{cc}
+}
+
+func (c *pubsubBenchWrapperClient) Recv(ctx context.Context, in *PubsubRecv, opts ...grpc.CallOption) (*EmptyResponse, error) {
+ out := new(EmptyResponse)
+ err := c.cc.Invoke(ctx, "/pubsub_bench.PubsubBenchWrapper/Recv", in, out, opts...)
+ if err != nil {
+ return nil, err
+ }
+ return out, nil
+}
+
+// PubsubBenchWrapperServer is the server API for PubsubBenchWrapper service.
+type PubsubBenchWrapperServer interface {
+ // Recv represents opening a streaming pull stream to receive messages on.
+ Recv(context.Context, *PubsubRecv) (*EmptyResponse, error)
+}
+
+// UnimplementedPubsubBenchWrapperServer can be embedded to have forward compatible implementations.
+type UnimplementedPubsubBenchWrapperServer struct {
+}
+
+func (*UnimplementedPubsubBenchWrapperServer) Recv(ctx context.Context, req *PubsubRecv) (*EmptyResponse, error) {
+ return nil, status.Errorf(codes.Unimplemented, "method Recv not implemented")
+}
+
+func RegisterPubsubBenchWrapperServer(s *grpc.Server, srv PubsubBenchWrapperServer) {
+ s.RegisterService(&_PubsubBenchWrapper_serviceDesc, srv)
+}
+
+func _PubsubBenchWrapper_Recv_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+ in := new(PubsubRecv)
+ if err := dec(in); err != nil {
+ return nil, err
+ }
+ if interceptor == nil {
+ return srv.(PubsubBenchWrapperServer).Recv(ctx, in)
+ }
+ info := &grpc.UnaryServerInfo{
+ Server: srv,
+ FullMethod: "/pubsub_bench.PubsubBenchWrapper/Recv",
+ }
+ handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+ return srv.(PubsubBenchWrapperServer).Recv(ctx, req.(*PubsubRecv))
+ }
+ return interceptor(ctx, in, info, handler)
+}
+
+var _PubsubBenchWrapper_serviceDesc = grpc.ServiceDesc{
+ ServiceName: "pubsub_bench.PubsubBenchWrapper",
+ HandlerType: (*PubsubBenchWrapperServer)(nil),
+ Methods: []grpc.MethodDesc{
+ {
+ MethodName: "Recv",
+ Handler: _PubsubBenchWrapper_Recv_Handler,
+ },
+ },
+ Streams: []grpc.StreamDesc{},
+ Metadata: "pubsub.proto",
+}
diff --git a/pubsub/internal/benchwrapper/proto/pubsub.proto b/pubsub/internal/benchwrapper/proto/pubsub.proto
new file mode 100644
index 0000000..efb37e2
--- /dev/null
+++ b/pubsub/internal/benchwrapper/proto/pubsub.proto
@@ -0,0 +1,32 @@
+// Copyright 2019 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+package pubsub_bench;
+
+option java_multiple_files = true;
+
+message PubsubRecv {
+ // The subscription identifier corresponding to number of messages sent.
+ string sub_name = 1;
+}
+
+// TODO(deklerk): Replace with Google's canonical Empty.
+message EmptyResponse {}
+
+service PubsubBenchWrapper {
+ // Recv represents opening a streaming pull stream to receive messages on.
+ rpc Recv(PubsubRecv) returns (EmptyResponse) {}
+}
\ No newline at end of file