rpcreplay: file format and I/O
Recording writes a file that replaying will read. This CL describes
the file format and implements reading and writing. I use a proto
for Entry so the files can be accessed from other languages.
The messageOrError type may seem superfluous, but it will turn
out to be useful to support streaming.
Change-Id: I9d2a8892d7c213aba8f0692c56ccce717d6a0d92
Reviewed-on: https://code-review.googlesource.com/13750
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Sarah Adams <shadams@google.com>
diff --git a/internal/rpcreplay/Makefile b/internal/rpcreplay/Makefile
new file mode 100644
index 0000000..f41293f
--- /dev/null
+++ b/internal/rpcreplay/Makefile
@@ -0,0 +1,32 @@
+# Copyright 2017 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Makefile for building Go files from protos.
+
+# Change these to match your environment.
+PROTOC=$(HOME)/bin/protoc
+PROTOC_GO_PLUGIN_DIR=$(GOPATH)/bin
+PROTOBUF_REPO=$(HOME)/git-repos/protobuf
+
+gen-protos: sync-protobuf
+ for d in proto/*; do \
+ PATH=$(PATH):$(PROTOC_GO_PLUGIN_DIR) \
+ $(PROTOC) --go_out=plugins=grpc:$$d \
+ -I $$d -I $(PROTOBUF_REPO)/src $$d/*.proto; \
+ done
+
+
+sync-protobuf:
+ cd $(PROTOBUF_REPO); git pull
+
diff --git a/internal/rpcreplay/proto/rpcreplay/rpcreplay.pb.go b/internal/rpcreplay/proto/rpcreplay/rpcreplay.pb.go
new file mode 100644
index 0000000..8e76a39
--- /dev/null
+++ b/internal/rpcreplay/proto/rpcreplay/rpcreplay.pb.go
@@ -0,0 +1,170 @@
+// Code generated by protoc-gen-go.
+// source: rpcreplay.proto
+// DO NOT EDIT!
+
+/*
+Package rpcreplay is a generated protocol buffer package.
+
+It is generated from these files:
+ rpcreplay.proto
+
+It has these top-level messages:
+ Entry
+*/
+package rpcreplay
+
+import proto "github.com/golang/protobuf/proto"
+import fmt "fmt"
+import math "math"
+import google_protobuf "github.com/golang/protobuf/ptypes/any"
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
+
+type Entry_Kind int32
+
+const (
+ Entry_TYPE_UNSPECIFIED Entry_Kind = 0
+ // A unary request.
+ // method: the full name of the method
+ // message: the request proto
+ // is_error: false
+ // ref_index: 0
+ Entry_REQUEST Entry_Kind = 1
+ // A unary response.
+ // method: the full name of the method
+ // message:
+ // if is_error: a google.rpc.Status proto
+ // else: the response proto
+ // ref_index: index in the sequence of Entries of matching request (1-based)
+ Entry_RESPONSE Entry_Kind = 2
+ // A method that creates a stream.
+ // method: the full name of the method
+ // message:
+ // if is_error: a google.rpc.Status proto
+ // else: nil
+ // ref_index: 0
+ Entry_CREATE_STREAM Entry_Kind = 3
+ // A call to Send on the client returned by a stream-creating method.
+ // method: unset
+ // message: the proto being sent
+ // is_error: false
+ // ref_index: index of matching CREATE_STREAM entry (1-based)
+ Entry_SEND Entry_Kind = 4
+ // A call to Recv on the client returned by a stream-creating method.
+ // method: unset
+ // message:
+ // if is_error: a google.rpc.Status proto, or nil on EOF
+ // else: the received message
+ // ref_index: index of matching CREATE_STREAM entry
+ Entry_RECV Entry_Kind = 5
+)
+
+var Entry_Kind_name = map[int32]string{
+ 0: "TYPE_UNSPECIFIED",
+ 1: "REQUEST",
+ 2: "RESPONSE",
+ 3: "CREATE_STREAM",
+ 4: "SEND",
+ 5: "RECV",
+}
+var Entry_Kind_value = map[string]int32{
+ "TYPE_UNSPECIFIED": 0,
+ "REQUEST": 1,
+ "RESPONSE": 2,
+ "CREATE_STREAM": 3,
+ "SEND": 4,
+ "RECV": 5,
+}
+
+func (x Entry_Kind) String() string {
+ return proto.EnumName(Entry_Kind_name, int32(x))
+}
+func (Entry_Kind) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{0, 0} }
+
+// An Entry represents a single RPC activity, typically a request or response.
+type Entry struct {
+ Kind Entry_Kind `protobuf:"varint,1,opt,name=kind,enum=rpcreplay.Entry_Kind" json:"kind,omitempty"`
+ Method string `protobuf:"bytes,2,opt,name=method" json:"method,omitempty"`
+ Message *google_protobuf.Any `protobuf:"bytes,3,opt,name=message" json:"message,omitempty"`
+ IsError bool `protobuf:"varint,4,opt,name=is_error,json=isError" json:"is_error,omitempty"`
+ RefIndex int32 `protobuf:"varint,5,opt,name=ref_index,json=refIndex" json:"ref_index,omitempty"`
+}
+
+func (m *Entry) Reset() { *m = Entry{} }
+func (m *Entry) String() string { return proto.CompactTextString(m) }
+func (*Entry) ProtoMessage() {}
+func (*Entry) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
+
+func (m *Entry) GetKind() Entry_Kind {
+ if m != nil {
+ return m.Kind
+ }
+ return Entry_TYPE_UNSPECIFIED
+}
+
+func (m *Entry) GetMethod() string {
+ if m != nil {
+ return m.Method
+ }
+ return ""
+}
+
+func (m *Entry) GetMessage() *google_protobuf.Any {
+ if m != nil {
+ return m.Message
+ }
+ return nil
+}
+
+func (m *Entry) GetIsError() bool {
+ if m != nil {
+ return m.IsError
+ }
+ return false
+}
+
+func (m *Entry) GetRefIndex() int32 {
+ if m != nil {
+ return m.RefIndex
+ }
+ return 0
+}
+
+func init() {
+ proto.RegisterType((*Entry)(nil), "rpcreplay.Entry")
+ proto.RegisterEnum("rpcreplay.Entry_Kind", Entry_Kind_name, Entry_Kind_value)
+}
+
+func init() { proto.RegisterFile("rpcreplay.proto", fileDescriptor0) }
+
+var fileDescriptor0 = []byte{
+ // 289 bytes of a gzipped FileDescriptorProto
+ 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x44, 0x8e, 0xdf, 0x4e, 0xc2, 0x30,
+ 0x14, 0xc6, 0x2d, 0x6c, 0x30, 0x0e, 0xfe, 0xa9, 0x0d, 0x9a, 0xa1, 0x37, 0x0b, 0x57, 0xf3, 0xa6,
+ 0x24, 0xf8, 0x04, 0x04, 0x8e, 0x09, 0x31, 0x22, 0xb6, 0xc3, 0xc4, 0x1b, 0x17, 0x70, 0x05, 0x17,
+ 0xa1, 0x25, 0xdd, 0x4c, 0xdc, 0x6b, 0xf8, 0xc4, 0x66, 0x13, 0xf4, 0xae, 0xbf, 0x7e, 0xbf, 0x9c,
+ 0xef, 0x83, 0x33, 0xbb, 0x7b, 0xb3, 0x6a, 0xb7, 0x59, 0x14, 0x7c, 0x67, 0x4d, 0x6e, 0x58, 0xeb,
+ 0xef, 0xe3, 0xaa, 0xbb, 0x36, 0x66, 0xbd, 0x51, 0xfd, 0x2a, 0x58, 0x7e, 0xae, 0xfa, 0x0b, 0xbd,
+ 0xb7, 0x7a, 0xdf, 0x35, 0x70, 0x51, 0xe7, 0xb6, 0x60, 0x37, 0xe0, 0x7c, 0xa4, 0x3a, 0xf1, 0x49,
+ 0x40, 0xc2, 0xd3, 0xc1, 0x05, 0xff, 0xbf, 0x57, 0xe5, 0xfc, 0x3e, 0xd5, 0x89, 0xa8, 0x14, 0x76,
+ 0x09, 0x8d, 0xad, 0xca, 0xdf, 0x4d, 0xe2, 0xd7, 0x02, 0x12, 0xb6, 0xc4, 0x9e, 0x18, 0x87, 0xe6,
+ 0x56, 0x65, 0xd9, 0x62, 0xad, 0xfc, 0x7a, 0x40, 0xc2, 0xf6, 0xa0, 0xc3, 0x7f, 0x9b, 0xf9, 0xa1,
+ 0x99, 0x0f, 0x75, 0x21, 0x0e, 0x12, 0xeb, 0x82, 0x97, 0x66, 0xb1, 0xb2, 0xd6, 0x58, 0xdf, 0x09,
+ 0x48, 0xe8, 0x89, 0x66, 0x9a, 0x61, 0x89, 0xec, 0x1a, 0x5a, 0x56, 0xad, 0xe2, 0x54, 0x27, 0xea,
+ 0xcb, 0x77, 0x03, 0x12, 0xba, 0xc2, 0xb3, 0x6a, 0x35, 0x29, 0xb9, 0xf7, 0x0a, 0x4e, 0xb9, 0x86,
+ 0x75, 0x80, 0x46, 0x2f, 0x33, 0x8c, 0xe7, 0x53, 0x39, 0xc3, 0xd1, 0xe4, 0x6e, 0x82, 0x63, 0x7a,
+ 0xc4, 0xda, 0xd0, 0x14, 0xf8, 0x34, 0x47, 0x19, 0x51, 0xc2, 0x8e, 0xc1, 0x13, 0x28, 0x67, 0x8f,
+ 0x53, 0x89, 0xb4, 0xc6, 0xce, 0xe1, 0x64, 0x24, 0x70, 0x18, 0x61, 0x2c, 0x23, 0x81, 0xc3, 0x07,
+ 0x5a, 0x67, 0x1e, 0x38, 0x12, 0xa7, 0x63, 0xea, 0x94, 0x2f, 0x81, 0xa3, 0x67, 0xea, 0x2e, 0x1b,
+ 0xd5, 0xdc, 0xdb, 0x9f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xe7, 0x9b, 0x9d, 0x4f, 0x54, 0x01, 0x00,
+ 0x00,
+}
diff --git a/internal/rpcreplay/proto/rpcreplay/rpcreplay.proto b/internal/rpcreplay/proto/rpcreplay/rpcreplay.proto
new file mode 100644
index 0000000..8475f33
--- /dev/null
+++ b/internal/rpcreplay/proto/rpcreplay/rpcreplay.proto
@@ -0,0 +1,71 @@
+// Copyright 2017 Google Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+package rpcreplay;
+
+import "google/protobuf/any.proto";
+
+// An Entry represents a single RPC activity, typically a request or response.
+message Entry {
+ enum Kind {
+ TYPE_UNSPECIFIED = 0;
+
+ // A unary request.
+ // method: the full name of the method
+ // message: the request proto
+ // is_error: false
+ // ref_index: 0
+ REQUEST = 1;
+
+ // A unary response.
+ // method: the full name of the method
+ // message:
+ // if is_error: a google.rpc.Status proto
+ // else: the response proto
+ // ref_index: index in the sequence of Entries of matching request (1-based)
+ RESPONSE = 2;
+
+ // A method that creates a stream.
+ // method: the full name of the method
+ // message:
+ // if is_error: a google.rpc.Status proto
+ // else: nil
+ // ref_index: 0
+ CREATE_STREAM = 3;
+
+ // A call to Send on the client returned by a stream-creating method.
+ // method: unset
+ // message: the proto being sent
+ // is_error: false
+ // ref_index: index of matching CREATE_STREAM entry (1-based)
+ SEND = 4; // message sent on stream
+
+ // A call to Recv on the client returned by a stream-creating method.
+ // method: unset
+ // message:
+ // if is_error: a google.rpc.Status proto, or nil on EOF
+ // else: the received message
+ // ref_index: index of matching CREATE_STREAM entry
+ RECV = 5; // message received from stream
+ }
+
+ Kind kind = 1;
+ string method = 2; // method name
+ google.protobuf.Any message = 3; // request, response or error status
+ bool is_error = 4; // was response an error?
+ int32 ref_index = 5; // for RESPONSE, index of matching request;
+ // for SEND/RECV, index of CREATE_STREAM
+}
diff --git a/internal/rpcreplay/rpcreplay.go b/internal/rpcreplay/rpcreplay.go
new file mode 100644
index 0000000..36b9241
--- /dev/null
+++ b/internal/rpcreplay/rpcreplay.go
@@ -0,0 +1,204 @@
+// Copyright 2017 Google Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rpcreplay
+
+import (
+ "encoding/binary"
+ "errors"
+ "fmt"
+ "io"
+
+ "google.golang.org/grpc/status"
+
+ pb "cloud.google.com/go/internal/rpcreplay/proto/rpcreplay"
+ "github.com/golang/protobuf/proto"
+ "github.com/golang/protobuf/ptypes"
+ "github.com/golang/protobuf/ptypes/any"
+ spb "google.golang.org/genproto/googleapis/rpc/status"
+)
+
+// An entry holds one gRPC action (request, response, etc.).
+type entry struct {
+ kind pb.Entry_Kind
+ method string
+ msg message
+ refIndex int // index of corresponding request or create-stream
+}
+
+func (e1 *entry) equal(e2 *entry) bool {
+ if e1 == nil && e2 == nil {
+ return true
+ }
+ if e1 == nil || e2 == nil {
+ return false
+ }
+ return e1.kind == e2.kind &&
+ e1.method == e2.method &&
+ proto.Equal(e1.msg.msg, e2.msg.msg) &&
+ errEqual(e1.msg.err, e2.msg.err) &&
+ e1.refIndex == e2.refIndex
+}
+
+func errEqual(e1, e2 error) bool {
+ if e1 == e2 {
+ return true
+ }
+ s1, ok1 := status.FromError(e1)
+ s2, ok2 := status.FromError(e2)
+ if !ok1 || !ok2 {
+ return false
+ }
+ return proto.Equal(s1.Proto(), s2.Proto())
+}
+
+// message holds either a single proto.Message or an error.
+type message struct {
+ msg proto.Message
+ err error
+}
+
+func (m *message) set(msg interface{}, err error) {
+ if msg != nil {
+ m.msg = msg.(proto.Message)
+ }
+ m.err = err
+}
+
+// File format:
+// header
+// sequence of Entry protos
+//
+// Header format:
+// magic string
+// a record containing the bytes of the initial state
+
+const magic = "RPCReplay"
+
+func writeHeader(w io.Writer, initial []byte) error {
+ if _, err := io.WriteString(w, magic); err != nil {
+ return err
+ }
+ return writeRecord(w, initial)
+}
+
+func readHeader(r io.Reader) ([]byte, error) {
+ var buf [len(magic)]byte
+ if _, err := io.ReadFull(r, buf[:]); err != nil {
+ if err == io.EOF {
+ err = errors.New("rpcreplay: empty replay file")
+ }
+ return nil, err
+ }
+ if string(buf[:]) != magic {
+ return nil, errors.New("rpcreplay: not a replay file (does not begin with magic string)")
+ }
+ bytes, err := readRecord(r)
+ if err == io.EOF {
+ err = errors.New("rpcreplay: missing initial state")
+ }
+ return bytes, err
+}
+
+func writeEntry(w io.Writer, e *entry) error {
+ var m proto.Message
+ if e.msg.err != nil && e.msg.err != io.EOF {
+ s, ok := status.FromError(e.msg.err)
+ if !ok {
+ return fmt.Errorf("rpcreplay: error %v is not a Status", e.msg.err)
+ }
+ m = s.Proto()
+ } else {
+ m = e.msg.msg
+ }
+ var a *any.Any
+ var err error
+ if m != nil {
+ a, err = ptypes.MarshalAny(m)
+ if err != nil {
+ return err
+ }
+ }
+ pe := &pb.Entry{
+ Kind: e.kind,
+ Method: e.method,
+ Message: a,
+ IsError: e.msg.err != nil,
+ RefIndex: int32(e.refIndex),
+ }
+ bytes, err := proto.Marshal(pe)
+ if err != nil {
+ return err
+ }
+ return writeRecord(w, bytes)
+}
+
+func readEntry(r io.Reader) (*entry, error) {
+ buf, err := readRecord(r)
+ if err == io.EOF {
+ return nil, nil
+ }
+ if err != nil {
+ return nil, err
+ }
+ var pe pb.Entry
+ if err := proto.Unmarshal(buf, &pe); err != nil {
+ return nil, err
+ }
+ var msg message
+ if pe.Message != nil {
+ var any ptypes.DynamicAny
+ if err := ptypes.UnmarshalAny(pe.Message, &any); err != nil {
+ return nil, err
+ }
+ if pe.IsError {
+ msg.err = status.ErrorProto(any.Message.(*spb.Status))
+ } else {
+ msg.msg = any.Message
+ }
+ } else if pe.IsError {
+ msg.err = io.EOF
+ } else {
+ return nil, errors.New("rpcreplay: entry with nil message and false is_error")
+ }
+ return &entry{
+ kind: pe.Kind,
+ method: pe.Method,
+ msg: msg,
+ refIndex: int(pe.RefIndex),
+ }, nil
+}
+
+// A record consists of an unsigned 32-bit little-endian length L followed by L
+// bytes.
+
+func writeRecord(w io.Writer, data []byte) error {
+ if err := binary.Write(w, binary.LittleEndian, uint32(len(data))); err != nil {
+ return err
+ }
+ _, err := w.Write(data)
+ return err
+}
+
+func readRecord(r io.Reader) ([]byte, error) {
+ var size uint32
+ if err := binary.Read(r, binary.LittleEndian, &size); err != nil {
+ return nil, err
+ }
+ buf := make([]byte, size)
+ if _, err := io.ReadFull(r, buf); err != nil {
+ return nil, err
+ }
+ return buf, nil
+}
diff --git a/internal/rpcreplay/rpcreplay_test.go b/internal/rpcreplay/rpcreplay_test.go
new file mode 100644
index 0000000..30e64f5
--- /dev/null
+++ b/internal/rpcreplay/rpcreplay_test.go
@@ -0,0 +1,98 @@
+// Copyright 2017 Google Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rpcreplay
+
+import (
+ "bytes"
+ "io"
+ "reflect"
+ "testing"
+
+ rpb "cloud.google.com/go/internal/rpcreplay/proto/rpcreplay"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+func TestRecordIO(t *testing.T) {
+ buf := &bytes.Buffer{}
+ want := []byte{1, 2, 3}
+ if err := writeRecord(buf, want); err != nil {
+ t.Fatal(err)
+ }
+ got, err := readRecord(buf)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if !bytes.Equal(got, want) {
+ t.Errorf("got %v, want %v", got, want)
+ }
+}
+
+func TestHeaderIO(t *testing.T) {
+ buf := &bytes.Buffer{}
+ want := []byte{1, 2, 3}
+ if err := writeHeader(buf, want); err != nil {
+ t.Fatal(err)
+ }
+ got, err := readHeader(buf)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if !reflect.DeepEqual(got, want) {
+ t.Errorf("got %v, want %v", got, want)
+ }
+
+ // readHeader errors
+ for _, contents := range []string{"", "badmagic", "gRPCReplay"} {
+ if _, err := readHeader(bytes.NewBufferString(contents)); err == nil {
+ t.Errorf("%q: got nil, want error", contents)
+ }
+ }
+}
+
+func TestEntryIO(t *testing.T) {
+ for i, want := range []*entry{
+ {
+ kind: rpb.Entry_REQUEST,
+ method: "method",
+ msg: message{msg: &rpb.Entry{}},
+ refIndex: 7,
+ },
+ {
+ kind: rpb.Entry_RESPONSE,
+ method: "method",
+ msg: message{err: status.Error(codes.NotFound, "not found")},
+ refIndex: 8,
+ },
+ {
+ kind: rpb.Entry_RECV,
+ method: "method",
+ msg: message{err: io.EOF},
+ refIndex: 3,
+ },
+ } {
+ buf := &bytes.Buffer{}
+ if err := writeEntry(buf, want); err != nil {
+ t.Fatal(err)
+ }
+ got, err := readEntry(buf)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if !got.equal(want) {
+ t.Errorf("#%d: got %v, want %v", i, got, want)
+ }
+ }
+}