blob: 9e723588c6951e8fb890fbb11db60058bb22d4d8 [file] [log] [blame]
/*
Copyright 2017 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package spanner
import (
"context"
"fmt"
"io"
"strings"
"testing"
"time"
itestutil "cloud.google.com/go/internal/testutil"
. "cloud.google.com/go/spanner/internal/testutil"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// setupMockedTestServer is a convenience wrapper around
// setupMockedTestServerWithConfig that uses the zero-value ClientConfig.
func setupMockedTestServer(t *testing.T) (server *MockedSpannerInMemTestServer, client *Client, teardown func()) {
	var defaultConfig ClientConfig
	server, client, teardown = setupMockedTestServerWithConfig(t, defaultConfig)
	return server, client, teardown
}
// setupMockedTestServerWithConfig is a convenience wrapper around
// setupMockedTestServerWithConfigAndClientOptions that passes the given
// config and an empty list of additional client options.
func setupMockedTestServerWithConfig(t *testing.T, config ClientConfig) (server *MockedSpannerInMemTestServer, client *Client, teardown func()) {
	noExtraOptions := []option.ClientOption{}
	server, client, teardown = setupMockedTestServerWithConfigAndClientOptions(t, config, noExtraOptions)
	return server, client, teardown
}
// setupMockedTestServerWithConfigAndClientOptions creates a mocked Spanner
// test server and a client connected to it, using the given config and
// client options. A header enforcer is appended to the client options that
// fails the test (via t.Fatalf) unless exactly one "x-goog-api-client" header
// value is present and it starts with "gl-go/".
//
// The returned teardown function closes the client and then tears down the
// server.
func setupMockedTestServerWithConfigAndClientOptions(t *testing.T, config ClientConfig, clientOptions []option.ClientOption) (server *MockedSpannerInMemTestServer, client *Client, teardown func()) {
	// Validator for the values of the x-goog-api-client header.
	validateAPIClientHeader := func(values ...string) error {
		if n := len(values); n != 1 {
			return spannerErrorf(codes.Internal, "unexpected number of api client token headers: %v", n)
		}
		if v := values[0]; !strings.HasPrefix(v, "gl-go/") {
			return spannerErrorf(codes.Internal, "unexpected api client token: %v", v)
		}
		return nil
	}
	enforcer := &itestutil.HeadersEnforcer{
		OnFailure: t.Fatalf,
		Checkers: []*itestutil.HeaderChecker{
			{Key: "x-goog-api-client", ValuesValidator: validateAPIClientHeader},
		},
	}
	clientOptions = append(clientOptions, enforcer.CallOptions()...)

	// Server-provided options come first, followed by the caller's options
	// and the header enforcer.
	server, serverOpts, stopServer := NewMockedSpannerInMemTestServer(t)
	serverOpts = append(serverOpts, clientOptions...)

	database := fmt.Sprintf("projects/%s/instances/%s/databases/%s", "[PROJECT]", "[INSTANCE]", "[DATABASE]")
	client, err := NewClientWithConfig(context.Background(), database, config, serverOpts...)
	if err != nil {
		t.Fatal(err)
	}
	teardown = func() {
		client.Close()
		stopServer()
	}
	return server, client, teardown
}
// TestValidDatabaseName verifies that validDatabaseName accepts a well-formed
// database URI and rejects malformed ones with an error that mentions the
// expected pattern.
func TestValidDatabaseName(t *testing.T) {
	validDbURI := "projects/spanner-cloud-test/instances/foo/databases/foodb"
	invalidDbUris := []string{
		// Completely wrong DB URI.
		"foobarDB",
		// Project ID contains "/".
		"projects/spanner-cloud/test/instances/foo/databases/foodb",
		// No instance ID.
		"projects/spanner-cloud-test/instances//databases/foodb",
	}
	if err := validDatabaseName(validDbURI); err != nil {
		t.Errorf("validDatabaseName(%q) = %v, want nil", validDbURI, err)
	}
	const wantErr = "should conform to pattern"
	for _, d := range invalidDbUris {
		err := validDatabaseName(d)
		// Guard against a nil error: calling err.Error() on it would panic
		// and hide the actual test failure.
		if err == nil {
			t.Errorf("validDatabaseName(%q) = nil, want error pattern %q", d, wantErr)
			continue
		}
		if !strings.Contains(err.Error(), wantErr) {
			// Report the URI that was actually tested, not the valid one.
			t.Errorf("validDatabaseName(%q) = %q, want error pattern %q", d, err, wantErr)
		}
	}
}
// TestReadOnlyTransactionClose verifies that closing a ReadOnlyTransaction
// obtained from a zero-value Client does not panic.
func TestReadOnlyTransactionClose(t *testing.T) {
	client := &Client{}
	client.ReadOnlyTransaction().Close()
}
// TestClient_Single runs the singer query on a single-use read-only
// transaction without any injected server error and expects success.
func TestClient_Single(t *testing.T) {
	t.Parallel()
	if err := testSingleQuery(t, nil); err != nil {
		t.Fatal(err)
	}
}
func TestClient_Single_Unavailable(t *testing.T) {
t.Parallel()
err := testSingleQuery(t, status.Error(codes.Unavailable, "Temporary unavailable"))
if err != nil {
t.Fatal(err)
}
}
func TestClient_Single_InvalidArgument(t *testing.T) {
t.Parallel()
err := testSingleQuery(t, status.Error(codes.InvalidArgument, "Invalid argument"))
if status.Code(err) != codes.InvalidArgument {
t.Fatalf("got unexpected exception %v, expected InvalidArgument", err)
}
}
// TestClient_Single_RetryableErrorOnPartialResultSet registers two retryable
// partial-result-set errors on the mock server and expects the single-use
// query to complete successfully.
func TestClient_Single_RetryableErrorOnPartialResultSet(t *testing.T) {
	t.Parallel()
	server, client, teardown := setupMockedTestServer(t)
	defer teardown()

	// Both errors are retryable and are not 'sticky' on the mocked server:
	// once an error has been returned, the next call for the same partial
	// result set succeeds.
	retryable := []PartialResultSetExecutionTime{
		// Resume token 2: an internal error with the message
		// 'stream terminated by RST_STREAM'. The client retries the call.
		{
			ResumeToken: EncodeResumeToken(2),
			Err:         spannerErrorf(codes.Internal, "stream terminated by RST_STREAM"),
		},
		// Resume token 3: an 'Unavailable' error. The client retries the
		// call.
		{
			ResumeToken: EncodeResumeToken(3),
			Err:         spannerErrorf(codes.Unavailable, "server is unavailable"),
		},
	}
	for _, partialErr := range retryable {
		server.TestSpanner.AddPartialResultSetError(SelectSingerIDAlbumIDAlbumTitleFromAlbums, partialErr)
	}

	if err := executeSingerQuery(context.Background(), client.Single()); err != nil {
		t.Fatal(err)
	}
}
// TestClient_Single_NonRetryableErrorOnPartialResultSet registers one
// retryable and one non-retryable partial-result-set error on the mock server
// and expects the query to fail with the non-retryable NotFound error.
func TestClient_Single_NonRetryableErrorOnPartialResultSet(t *testing.T) {
	t.Parallel()
	server, client, teardown := setupMockedTestServer(t)
	defer teardown()

	partialErrs := []PartialResultSetExecutionTime{
		// This error will automatically be retried.
		{
			ResumeToken: EncodeResumeToken(2),
			Err:         spannerErrorf(codes.Internal, "stream terminated by RST_STREAM"),
		},
		// 'Session not found' is not retryable and the error will be
		// returned to the user.
		{
			ResumeToken: EncodeResumeToken(3),
			Err:         spannerErrorf(codes.NotFound, "Session not found"),
		},
	}
	for _, partialErr := range partialErrs {
		server.TestSpanner.AddPartialResultSetError(SelectSingerIDAlbumIDAlbumTitleFromAlbums, partialErr)
	}

	err := executeSingerQuery(context.Background(), client.Single())
	if got := status.Code(err); got != codes.NotFound {
		t.Fatalf("Error mismatch:\ngot: %v\nwant: %v", err, codes.NotFound)
	}
}
// TestClient_Single_DeadlineExceeded_NoErrors configures a 50ms minimum
// execution time for ExecuteStreamingSql on the mock server and runs the
// query with a 5ms deadline, expecting a DeadlineExceeded error.
func TestClient_Single_DeadlineExceeded_NoErrors(t *testing.T) {
	t.Parallel()
	server, client, teardown := setupMockedTestServer(t)
	defer teardown()

	slowExecution := SimulatedExecutionTime{MinimumExecutionTime: 50 * time.Millisecond}
	server.TestSpanner.PutExecutionTime(MethodExecuteStreamingSql, slowExecution)

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
	defer cancel()
	err := executeSingerQuery(ctx, client.Single())
	if got := status.Code(err); got != codes.DeadlineExceeded {
		t.Fatalf("Error mismatch:\ngot: %v\nwant: %v", err, codes.DeadlineExceeded)
	}
}
// TestClient_Single_DeadlineExceeded_WithErrors registers two
// partial-result-set errors, the second with a 50ms execution time, and runs
// the query with a 25ms deadline, expecting a DeadlineExceeded error.
func TestClient_Single_DeadlineExceeded_WithErrors(t *testing.T) {
	t.Parallel()
	server, client, teardown := setupMockedTestServer(t)
	defer teardown()

	partialErrs := []PartialResultSetExecutionTime{
		{
			ResumeToken: EncodeResumeToken(2),
			Err:         spannerErrorf(codes.Internal, "stream terminated by RST_STREAM"),
		},
		{
			ResumeToken:   EncodeResumeToken(3),
			Err:           spannerErrorf(codes.Unavailable, "server is unavailable"),
			ExecutionTime: 50 * time.Millisecond,
		},
	}
	for _, partialErr := range partialErrs {
		server.TestSpanner.AddPartialResultSetError(SelectSingerIDAlbumIDAlbumTitleFromAlbums, partialErr)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 25*time.Millisecond)
	defer cancel()
	err := executeSingerQuery(ctx, client.Single())
	if got := status.Code(err); got != codes.DeadlineExceeded {
		t.Fatalf("got unexpected error %v, expected DeadlineExceeded", err)
	}
}
// TestClient_Single_ContextCanceled_noDeclaredServerErrors runs the query
// with a context that has already been canceled and expects a Canceled error.
func TestClient_Single_ContextCanceled_noDeclaredServerErrors(t *testing.T) {
	t.Parallel()
	_, client, teardown := setupMockedTestServer(t)
	defer teardown()

	ctx, cancel := context.WithCancel(context.Background())
	// Cancel up front so the query starts with a dead context.
	cancel()
	err := executeSingerQuery(ctx, client.Single())
	if got := status.Code(err); got != codes.Canceled {
		t.Fatalf("got unexpected error %v, expected Canceled", err)
	}
}
// TestClient_Single_ContextCanceled_withDeclaredServerErrors registers two
// partial-result-set errors on the mock server and cancels the context from
// the row callback once the second row has been read. The query is expected
// to fail with a Canceled error.
func TestClient_Single_ContextCanceled_withDeclaredServerErrors(t *testing.T) {
	t.Parallel()
	server, client, teardown := setupMockedTestServer(t)
	defer teardown()
	server.TestSpanner.AddPartialResultSetError(
		SelectSingerIDAlbumIDAlbumTitleFromAlbums,
		PartialResultSetExecutionTime{
			ResumeToken: EncodeResumeToken(2),
			Err:         spannerErrorf(codes.Internal, "stream terminated by RST_STREAM"),
		},
	)
	server.TestSpanner.AddPartialResultSetError(
		SelectSingerIDAlbumIDAlbumTitleFromAlbums,
		PartialResultSetExecutionTime{
			ResumeToken: EncodeResumeToken(3),
			Err:         spannerErrorf(codes.Unavailable, "server is unavailable"),
		},
	)
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	// Cancel the context as soon as the second row has been consumed.
	f := func(rowCount int64) error {
		if rowCount == 2 {
			cancel()
		}
		return nil
	}
	// executeSingerQueryWithRowFunc opens (and stops) its own iterator, so
	// no additional query needs to be started here.
	err := executeSingerQueryWithRowFunc(ctx, client.Single(), f)
	if status.Code(err) != codes.Canceled {
		t.Fatalf("got unexpected error %v, expected Canceled", err)
	}
}
// testSingleQuery sets up a mocked server, optionally registers serverError
// on it (when non-nil), and runs the singer query on a single-use read-only
// transaction. It returns the error produced by the query, if any.
func testSingleQuery(t *testing.T, serverError error) error {
	server, client, teardown := setupMockedTestServer(t)
	defer teardown()
	if serverError != nil {
		server.TestSpanner.SetError(serverError)
	}
	single := client.Single()
	return executeSingerQuery(context.Background(), single)
}
// executeSingerQuery runs the singer query on tx without a per-row callback.
// See executeSingerQueryWithRowFunc for the checks that are performed.
func executeSingerQuery(ctx context.Context, tx *ReadOnlyTransaction) error {
	var noRowFunc func(rowCount int64) error
	return executeSingerQueryWithRowFunc(ctx, tx, noRowFunc)
}
// executeSingerQueryWithRowFunc executes the
// SelectSingerIDAlbumIDAlbumTitleFromAlbums statement on tx and reads all
// rows, decoding each one into (singerID, albumID, albumTitle). If f is
// non-nil it is invoked after every decoded row with the number of rows read
// so far; a non-nil error from f aborts the iteration and is returned.
//
// An Internal error is returned if the number of rows read differs from
// SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount.
func executeSingerQueryWithRowFunc(ctx context.Context, tx *ReadOnlyTransaction, f func(rowCount int64) error) error {
	iter := tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
	defer iter.Stop()

	var seen int64
	for row, err := iter.Next(); err != iterator.Done; row, err = iter.Next() {
		if err != nil {
			return err
		}
		var (
			singerID, albumID int64
			albumTitle        string
		)
		if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
			return err
		}
		seen++
		if f == nil {
			continue
		}
		if err := f(seen); err != nil {
			return err
		}
	}
	if seen != SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount {
		return spannerErrorf(codes.Internal, "Row count mismatch, got %v, expected %v", seen, SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount)
	}
	return nil
}
// createSimulatedExecutionTimeWithTwoUnavailableErrors returns a map with a
// single entry for method whose SimulatedExecutionTime carries two
// Unavailable errors.
func createSimulatedExecutionTimeWithTwoUnavailableErrors(method string) map[string]SimulatedExecutionTime {
	unavailable := []error{
		status.Error(codes.Unavailable, "Temporary unavailable"),
		status.Error(codes.Unavailable, "Temporary unavailable"),
	}
	return map[string]SimulatedExecutionTime{
		method: {Errors: unavailable},
	}
}
func TestClient_ReadOnlyTransaction(t *testing.T) {
t.Parallel()
if err := testReadOnlyTransaction(t, make(map[string]SimulatedExecutionTime)); err != nil {
t.Fatal(err)
}
}
func TestClient_ReadOnlyTransaction_UnavailableOnSessionCreate(t *testing.T) {
t.Parallel()
if err := testReadOnlyTransaction(t, createSimulatedExecutionTimeWithTwoUnavailableErrors(MethodCreateSession)); err != nil {
t.Fatal(err)
}
}
func TestClient_ReadOnlyTransaction_UnavailableOnBeginTransaction(t *testing.T) {
t.Parallel()
if err := testReadOnlyTransaction(t, createSimulatedExecutionTimeWithTwoUnavailableErrors(MethodBeginTransaction)); err != nil {
t.Fatal(err)
}
}
func TestClient_ReadOnlyTransaction_UnavailableOnExecuteStreamingSql(t *testing.T) {
t.Parallel()
if err := testReadOnlyTransaction(t, createSimulatedExecutionTimeWithTwoUnavailableErrors(MethodExecuteStreamingSql)); err != nil {
t.Fatal(err)
}
}
// TestClient_ReadOnlyTransaction_UnavailableOnCreateSessionAndBeginTransaction
// simulates one Unavailable error on both CreateSession and BeginTransaction
// and expects the read-only transaction to still succeed.
func TestClient_ReadOnlyTransaction_UnavailableOnCreateSessionAndBeginTransaction(t *testing.T) {
	t.Parallel()
	simulated := make(map[string]SimulatedExecutionTime)
	simulated[MethodCreateSession] = SimulatedExecutionTime{
		Errors: []error{status.Error(codes.Unavailable, "Temporary unavailable")},
	}
	simulated[MethodBeginTransaction] = SimulatedExecutionTime{
		Errors: []error{status.Error(codes.Unavailable, "Temporary unavailable")},
	}
	err := testReadOnlyTransaction(t, simulated)
	if err != nil {
		t.Fatal(err)
	}
}
// TestClient_ReadOnlyTransaction_UnavailableOnCreateSessionAndInvalidArgumentOnBeginTransaction
// simulates an Unavailable error on CreateSession and an InvalidArgument
// error on BeginTransaction, and expects the read-only transaction to fail
// with InvalidArgument.
func TestClient_ReadOnlyTransaction_UnavailableOnCreateSessionAndInvalidArgumentOnBeginTransaction(t *testing.T) {
	t.Parallel()
	simulated := make(map[string]SimulatedExecutionTime)
	simulated[MethodCreateSession] = SimulatedExecutionTime{
		Errors: []error{status.Error(codes.Unavailable, "Temporary unavailable")},
	}
	simulated[MethodBeginTransaction] = SimulatedExecutionTime{
		Errors: []error{status.Error(codes.InvalidArgument, "Invalid argument")},
	}
	err := testReadOnlyTransaction(t, simulated)
	if err == nil {
		t.Fatalf("Missing expected exception")
	}
	if status.Code(err) != codes.InvalidArgument {
		t.Fatalf("Got unexpected exception: %v", err)
	}
}
// testReadOnlyTransaction sets up a mocked server, registers every entry of
// executionTimes on it, and runs the singer query in a multi-use read-only
// transaction. It returns the error produced by the query, if any.
func testReadOnlyTransaction(t *testing.T, executionTimes map[string]SimulatedExecutionTime) error {
	server, client, teardown := setupMockedTestServer(t)
	defer teardown()
	for name, simulated := range executionTimes {
		server.TestSpanner.PutExecutionTime(name, simulated)
	}
	roTx := client.ReadOnlyTransaction()
	defer roTx.Close()
	return executeSingerQuery(context.Background(), roTx)
}
func TestClient_ReadWriteTransaction(t *testing.T) {
t.Parallel()
if err := testReadWriteTransaction(t, make(map[string]SimulatedExecutionTime), 1); err != nil {
t.Fatal(err)
}
}
func TestClient_ReadWriteTransactionCommitAborted(t *testing.T) {
t.Parallel()
if err := testReadWriteTransaction(t, map[string]SimulatedExecutionTime{
MethodCommitTransaction: {Errors: []error{status.Error(codes.Aborted, "Transaction aborted")}},
}, 2); err != nil {
t.Fatal(err)
}
}
func TestClient_ReadWriteTransactionExecuteStreamingSqlAborted(t *testing.T) {
t.Parallel()
if err := testReadWriteTransaction(t, map[string]SimulatedExecutionTime{
MethodExecuteStreamingSql: {Errors: []error{status.Error(codes.Aborted, "Transaction aborted")}},
}, 2); err != nil {
t.Fatal(err)
}
}
func TestClient_ReadWriteTransaction_UnavailableOnBeginTransaction(t *testing.T) {
t.Parallel()
if err := testReadWriteTransaction(t, map[string]SimulatedExecutionTime{
MethodBeginTransaction: {Errors: []error{status.Error(codes.Unavailable, "Unavailable")}},
}, 1); err != nil {
t.Fatal(err)
}
}
// TestClient_ReadWriteTransaction_UnavailableOnBeginAndAbortOnCommit
// simulates one Unavailable error on BeginTransaction and one Aborted error
// on Commit, and expects the transaction function to run twice.
func TestClient_ReadWriteTransaction_UnavailableOnBeginAndAbortOnCommit(t *testing.T) {
	// Run in parallel like the other read/write transaction tests; this test
	// sets up its own mocked server and client.
	t.Parallel()
	if err := testReadWriteTransaction(t, map[string]SimulatedExecutionTime{
		MethodBeginTransaction:  {Errors: []error{status.Error(codes.Unavailable, "Unavailable")}},
		MethodCommitTransaction: {Errors: []error{status.Error(codes.Aborted, "Aborted")}},
	}, 2); err != nil {
		t.Fatal(err)
	}
}
func TestClient_ReadWriteTransaction_UnavailableOnExecuteStreamingSql(t *testing.T) {
t.Parallel()
if err := testReadWriteTransaction(t, map[string]SimulatedExecutionTime{
MethodExecuteStreamingSql: {Errors: []error{status.Error(codes.Unavailable, "Unavailable")}},
}, 1); err != nil {
t.Fatal(err)
}
}
func TestClient_ReadWriteTransaction_UnavailableOnBeginAndExecuteStreamingSqlAndTwiceAbortOnCommit(t *testing.T) {
t.Parallel()
if err := testReadWriteTransaction(t, map[string]SimulatedExecutionTime{
MethodBeginTransaction: {Errors: []error{status.Error(codes.Unavailable, "Unavailable")}},
MethodExecuteStreamingSql: {Errors: []error{status.Error(codes.Unavailable, "Unavailable")}},
MethodCommitTransaction: {Errors: []error{status.Error(codes.Aborted, "Aborted"), status.Error(codes.Aborted, "Aborted")}},
}, 3); err != nil {
t.Fatal(err)
}
}
func TestClient_ReadWriteTransaction_AbortedOnExecuteStreamingSqlAndCommit(t *testing.T) {
t.Parallel()
if err := testReadWriteTransaction(t, map[string]SimulatedExecutionTime{
MethodExecuteStreamingSql: {Errors: []error{status.Error(codes.Aborted, "Aborted")}},
MethodCommitTransaction: {Errors: []error{status.Error(codes.Aborted, "Aborted"), status.Error(codes.Aborted, "Aborted")}},
}, 4); err != nil {
t.Fatal(err)
}
}
func TestClient_ReadWriteTransactionCommitAbortedAndUnavailable(t *testing.T) {
t.Parallel()
if err := testReadWriteTransaction(t, map[string]SimulatedExecutionTime{
MethodCommitTransaction: {
Errors: []error{
status.Error(codes.Aborted, "Transaction aborted"),
status.Error(codes.Unavailable, "Unavailable"),
},
},
}, 2); err != nil {
t.Fatal(err)
}
}
func TestClient_ReadWriteTransactionCommitAlreadyExists(t *testing.T) {
t.Parallel()
if err := testReadWriteTransaction(t, map[string]SimulatedExecutionTime{
MethodCommitTransaction: {Errors: []error{status.Error(codes.AlreadyExists, "A row with this key already exists")}},
}, 1); err != nil {
if status.Code(err) != codes.AlreadyExists {
t.Fatalf("Got unexpected error %v, expected %v", err, codes.AlreadyExists)
}
} else {
t.Fatalf("Missing expected exception")
}
}
// testReadWriteTransaction sets up a mocked server, registers every entry of
// executionTimes on it, and runs a read/write transaction that reads all rows
// of the singer query. The transaction function returns a FailedPrecondition
// error if the row count differs from
// SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount.
//
// If the transaction fails, its error is returned and the attempt count is
// not checked. Otherwise the test is failed via t.Fatalf when the number of
// times the transaction function ran differs from expectedAttempts.
func testReadWriteTransaction(t *testing.T, executionTimes map[string]SimulatedExecutionTime, expectedAttempts int) error {
	server, client, teardown := setupMockedTestServer(t)
	defer teardown()
	for name, simulated := range executionTimes {
		server.TestSpanner.PutExecutionTime(name, simulated)
	}

	attempts := 0
	// readAlbums is the transaction body; it is counted each time it runs.
	readAlbums := func(ctx context.Context, tx *ReadWriteTransaction) error {
		attempts++
		iter := tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
		defer iter.Stop()
		var seen int64
		for row, err := iter.Next(); err != iterator.Done; row, err = iter.Next() {
			if err != nil {
				return err
			}
			var (
				singerID, albumID int64
				albumTitle        string
			)
			if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
				return err
			}
			seen++
		}
		if seen != SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount {
			return spannerErrorf(codes.FailedPrecondition, "Row count mismatch, got %v, expected %v", seen, SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount)
		}
		return nil
	}

	if _, err := client.ReadWriteTransaction(context.Background(), readAlbums); err != nil {
		return err
	}
	if attempts != expectedAttempts {
		t.Fatalf("unexpected number of attempts: %d, expected %d", attempts, expectedAttempts)
	}
	return nil
}
// TestClient_ApplyAtLeastOnce applies two insert mutations with the
// ApplyAtLeastOnce option while the mock server is configured to return one
// Aborted error on Commit, and expects Apply to succeed.
func TestClient_ApplyAtLeastOnce(t *testing.T) {
	t.Parallel()
	server, client, teardown := setupMockedTestServer(t)
	defer teardown()

	first := Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)})
	second := Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)})
	mutations := []*Mutation{first, second}

	abortOnce := SimulatedExecutionTime{
		Errors: []error{status.Error(codes.Aborted, "Transaction aborted")},
	}
	server.TestSpanner.PutExecutionTime(MethodCommitTransaction, abortOnce)

	if _, err := client.Apply(context.Background(), mutations, ApplyAtLeastOnce()); err != nil {
		t.Fatal(err)
	}
}
// TestReadWriteTransaction_ErrUnexpectedEOF runs a read/write transaction
// whose function reads all rows and then returns io.ErrUnexpectedEOF. It
// expects that exact error to be returned by ReadWriteTransaction and the
// transaction function to have run exactly once.
func TestReadWriteTransaction_ErrUnexpectedEOF(t *testing.T) {
	_, client, teardown := setupMockedTestServer(t)
	defer teardown()

	attempts := 0
	readThenFail := func(ctx context.Context, tx *ReadWriteTransaction) error {
		attempts++
		iter := tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
		defer iter.Stop()
		for row, err := iter.Next(); err != iterator.Done; row, err = iter.Next() {
			if err != nil {
				return err
			}
			var (
				singerID, albumID int64
				albumTitle        string
			)
			if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
				return err
			}
		}
		// All rows were read successfully; fail the transaction function
		// with the error under test.
		return io.ErrUnexpectedEOF
	}

	_, err := client.ReadWriteTransaction(context.Background(), readThenFail)
	if err != io.ErrUnexpectedEOF {
		t.Fatalf("Missing expected error %v, got %v", io.ErrUnexpectedEOF, err)
	}
	if attempts != 1 {
		t.Fatalf("unexpected number of attempts: %d, expected %d", attempts, 1)
	}
}