| /* |
| Copyright 2017 Google LLC |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package testutil |
| |
| import ( |
| "encoding/binary" |
| "fmt" |
| "io" |
| "net" |
| "sync" |
| "testing" |
| "time" |
| |
| "github.com/golang/protobuf/ptypes/empty" |
| proto3 "github.com/golang/protobuf/ptypes/struct" |
| pbt "github.com/golang/protobuf/ptypes/timestamp" |
| "golang.org/x/net/context" |
| sppb "google.golang.org/genproto/googleapis/spanner/v1" |
| "google.golang.org/grpc" |
| "google.golang.org/grpc/codes" |
| "google.golang.org/grpc/status" |
| ) |
| |
| var ( |
| // KvMeta is the Metadata for mocked KV table. |
| KvMeta = sppb.ResultSetMetadata{ |
| RowType: &sppb.StructType{ |
| Fields: []*sppb.StructType_Field{ |
| { |
| Name: "Key", |
| Type: &sppb.Type{Code: sppb.TypeCode_STRING}, |
| }, |
| { |
| Name: "Value", |
| Type: &sppb.Type{Code: sppb.TypeCode_STRING}, |
| }, |
| }, |
| }, |
| } |
| ) |
| |
| // MockCtlMsg encapsulates PartialResultSet/error that might be sent to |
| // client |
| type MockCtlMsg struct { |
| // If ResumeToken == true, mock server will generate a row with |
| // resume token. |
| ResumeToken bool |
| // If Err != nil, mock server will return error in RPC response. |
| Err error |
| } |
| |
| // MockCloudSpanner is a mock implementation of SpannerServer interface. |
| // TODO: make MockCloudSpanner a full-fleged Cloud Spanner implementation. |
| type MockCloudSpanner struct { |
| sppb.SpannerServer |
| |
| s *grpc.Server |
| t *testing.T |
| addr string |
| msgs chan MockCtlMsg |
| readTs time.Time |
| |
| mu sync.Mutex |
| next int |
| nextSession int |
| sessions map[string]*sppb.Session |
| } |
| |
| // Addr returns the listening address of mock server. |
| func (m *MockCloudSpanner) Addr() string { |
| return m.addr |
| } |
| |
| // AddMsg generates a new mocked row which can be received by client. |
| func (m *MockCloudSpanner) AddMsg(err error, resumeToken bool) { |
| msg := MockCtlMsg{ |
| ResumeToken: resumeToken, |
| Err: err, |
| } |
| if err == io.EOF { |
| close(m.msgs) |
| } else { |
| m.msgs <- msg |
| } |
| } |
| |
| // Done signals an end to a mocked stream. |
| func (m *MockCloudSpanner) Done() { |
| close(m.msgs) |
| } |
| |
| // CreateSession is a placeholder for SpannerServer.CreateSession. |
| func (m *MockCloudSpanner) CreateSession(c context.Context, r *sppb.CreateSessionRequest) (*sppb.Session, error) { |
| m.mu.Lock() |
| defer m.mu.Unlock() |
| name := fmt.Sprintf("session-%d", m.nextSession) |
| m.nextSession++ |
| s := &sppb.Session{Name: name} |
| m.sessions[name] = s |
| return s, nil |
| } |
| |
| // GetSession is a placeholder for SpannerServer.GetSession. |
| func (m *MockCloudSpanner) GetSession(c context.Context, r *sppb.GetSessionRequest) (*sppb.Session, error) { |
| m.mu.Lock() |
| defer m.mu.Unlock() |
| if s, ok := m.sessions[r.Name]; ok { |
| return s, nil |
| } |
| return nil, status.Errorf(codes.NotFound, "not found") |
| } |
| |
| // DeleteSession is a placeholder for SpannerServer.DeleteSession. |
| func (m *MockCloudSpanner) DeleteSession(c context.Context, r *sppb.DeleteSessionRequest) (*empty.Empty, error) { |
| m.mu.Lock() |
| defer m.mu.Unlock() |
| delete(m.sessions, r.Name) |
| return &empty.Empty{}, nil |
| } |
| |
| // EncodeResumeToken return mock resume token encoding for an uint64 integer. |
| func EncodeResumeToken(t uint64) []byte { |
| rt := make([]byte, 16) |
| binary.PutUvarint(rt, t) |
| return rt |
| } |
| |
| // DecodeResumeToken decodes a mock resume token into an uint64 integer. |
| func DecodeResumeToken(t []byte) (uint64, error) { |
| s, n := binary.Uvarint(t) |
| if n <= 0 { |
| return 0, fmt.Errorf("invalid resume token: %v", t) |
| } |
| return s, nil |
| } |
| |
| // ExecuteStreamingSql is a mock implementation of SpannerServer.ExecuteStreamingSql. |
| func (m *MockCloudSpanner) ExecuteStreamingSql(r *sppb.ExecuteSqlRequest, s sppb.Spanner_ExecuteStreamingSqlServer) error { |
| switch r.Sql { |
| case "SELECT * from t_unavailable": |
| return status.Errorf(codes.Unavailable, "mock table unavailable") |
| |
| case "UPDATE t SET x = 2 WHERE x = 1": |
| err := s.Send(&sppb.PartialResultSet{ |
| Stats: &sppb.ResultSetStats{RowCount: &sppb.ResultSetStats_RowCountLowerBound{3}}, |
| }) |
| if err != nil { |
| panic(err) |
| } |
| return nil |
| |
| case "SELECT t.key key, t.value value FROM t_mock t": |
| if r.ResumeToken != nil { |
| s, err := DecodeResumeToken(r.ResumeToken) |
| if err != nil { |
| return err |
| } |
| m.mu.Lock() |
| m.next = int(s) + 1 |
| m.mu.Unlock() |
| } |
| for { |
| msg, more := <-m.msgs |
| if !more { |
| break |
| } |
| if msg.Err == nil { |
| var rt []byte |
| if msg.ResumeToken { |
| m.mu.Lock() |
| rt = EncodeResumeToken(uint64(m.next)) |
| m.mu.Unlock() |
| } |
| meta := KvMeta |
| meta.Transaction = &sppb.Transaction{ |
| ReadTimestamp: &pbt.Timestamp{ |
| Seconds: m.readTs.Unix(), |
| Nanos: int32(m.readTs.Nanosecond()), |
| }, |
| } |
| m.mu.Lock() |
| next := m.next |
| m.next++ |
| m.mu.Unlock() |
| err := s.Send(&sppb.PartialResultSet{ |
| Metadata: &meta, |
| Values: []*proto3.Value{ |
| {Kind: &proto3.Value_StringValue{StringValue: fmt.Sprintf("foo-%02d", next)}}, |
| {Kind: &proto3.Value_StringValue{StringValue: fmt.Sprintf("bar-%02d", next)}}, |
| }, |
| ResumeToken: rt, |
| }) |
| if err != nil { |
| return err |
| } |
| continue |
| } |
| return msg.Err |
| } |
| return nil |
| default: |
| return fmt.Errorf("unsupported SQL: %v", r.Sql) |
| } |
| } |
| |
| // StreamingRead is a placeholder for SpannerServer.StreamingRead. |
| func (m *MockCloudSpanner) StreamingRead(r *sppb.ReadRequest, s sppb.Spanner_StreamingReadServer) error { |
| return s.Send(&sppb.PartialResultSet{}) |
| } |
| |
| // Serve runs a MockCloudSpanner listening on a random localhost address. |
| func (m *MockCloudSpanner) Serve() { |
| m.s = grpc.NewServer() |
| if m.addr == "" { |
| m.addr = "localhost:0" |
| } |
| lis, err := net.Listen("tcp", m.addr) |
| if err != nil { |
| m.t.Fatalf("Failed to listen: %v", err) |
| } |
| _, port, err := net.SplitHostPort(lis.Addr().String()) |
| if err != nil { |
| m.t.Fatalf("Failed to parse listener address: %v", err) |
| } |
| sppb.RegisterSpannerServer(m.s, m) |
| m.addr = "localhost:" + port |
| go m.s.Serve(lis) |
| } |
| |
| // BeginTransaction is a placeholder for SpannerServer.BeginTransaction. |
| func (m *MockCloudSpanner) BeginTransaction(_ context.Context, r *sppb.BeginTransactionRequest) (*sppb.Transaction, error) { |
| m.mu.Lock() |
| defer m.mu.Unlock() |
| return &sppb.Transaction{}, nil |
| } |
| |
| // Stop terminates MockCloudSpanner and closes the serving port. |
| func (m *MockCloudSpanner) Stop() { |
| m.s.Stop() |
| } |
| |
| // NewMockCloudSpanner creates a new MockCloudSpanner instance. |
| func NewMockCloudSpanner(t *testing.T, ts time.Time) *MockCloudSpanner { |
| mcs := &MockCloudSpanner{ |
| t: t, |
| msgs: make(chan MockCtlMsg, 1000), |
| readTs: ts, |
| sessions: map[string]*sppb.Session{}, |
| } |
| return mcs |
| } |