/*
Copyright 2017 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package spanner
import (
"bytes"
"container/heap"
"context"
"fmt"
"io/ioutil"
"log"
"math/rand"
"os"
"reflect"
"strings"
"testing"
"time"
. "cloud.google.com/go/spanner/internal/testutil"
"google.golang.org/api/iterator"
"google.golang.org/genproto/googleapis/rpc/errdetails"
sppb "google.golang.org/genproto/googleapis/spanner/v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
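// newSessionNotFoundError returns a gRPC NotFound error with a ResourceInfo
// detail for the given session name, mimicking the 'Session not found' error
// that the Cloud Spanner backend returns for deleted sessions.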
func newSessionNotFoundError(name string) error {
s := status.Newf(codes.NotFound, "Session not found: Session with id %s not found", name)
s, _ = s.WithDetails(&errdetails.ResourceInfo{ResourceType: sessionResourceType, ResourceName: name})
return s.Err()
}
// TestSessionPoolConfigValidation tests session pool config validation.
func TestSessionPoolConfigValidation(t *testing.T) {
t.Parallel()
_, client, teardown := setupMockedTestServer(t)
defer teardown()
for _, test := range []struct {
spc SessionPoolConfig
err error
}{
{
SessionPoolConfig{
MinOpened: 10,
MaxOpened: 5,
},
errMinOpenedGTMaxOpened(5, 10),
},
{
SessionPoolConfig{
WriteSessions: -0.1,
},
errWriteFractionOutOfRange(-0.1),
},
{
SessionPoolConfig{
WriteSessions: 2.0,
},
errWriteFractionOutOfRange(2.0),
},
{
SessionPoolConfig{
HealthCheckWorkers: -1,
},
errHealthCheckWorkersNegative(-1),
},
{
SessionPoolConfig{
HealthCheckInterval: -time.Second,
},
errHealthCheckIntervalNegative(-time.Second),
},
} {
if _, err := newSessionPool(client.sc, test.spc); !testEqual(err, test.err) {
t.Fatalf("want %v, got %v", test.err, err)
}
}
}
// TestSessionCreation tests session creation during sessionPool.Take().
func TestSessionCreation(t *testing.T) {
t.Parallel()
ctx := context.Background()
_, client, teardown := setupMockedTestServer(t)
defer teardown()
sp := client.idleSessions
// Take three sessions from the session pool. This should trigger the
// session pool to create SessionPoolConfig.incStep new sessions.
shs := make([]*sessionHandle, 3)
for i := 0; i < len(shs); i++ {
var err error
shs[i], err = sp.take(ctx)
if err != nil {
t.Fatalf("failed to get session(%v): %v", i, err)
}
}
// Wait until session creation has ceased.
timeout := time.After(4 * time.Second)
var numBeingCreated uint64
loop:
for {
sp.mu.Lock()
numBeingCreated = sp.createReqs
sp.mu.Unlock()
select {
case <-timeout:
t.Fatalf("timed out, still %d session(s) being created, want %d", numBeingCreated, 0)
default:
if numBeingCreated == 0 {
break loop
}
}
}
for _, sh := range shs {
if _, err := sh.getClient().GetSession(context.Background(), &sppb.GetSessionRequest{
Name: sh.getID(),
}); err != nil {
t.Fatalf("error getting expected session from server: %v", err)
}
}
// Verify that created sessions are recorded correctly in session pool.
sp.mu.Lock()
if sp.numOpened != sp.incStep {
t.Fatalf("session pool reports %v open sessions, want %v", sp.numOpened, sp.incStep)
}
if sp.createReqs != 0 {
t.Fatalf("session pool reports %v session create requests, want 0", int(sp.createReqs))
}
sp.mu.Unlock()
// Verify that created sessions are tracked correctly by healthcheck queue.
hc := sp.hc
hc.mu.Lock()
if uint64(hc.queue.Len()) != sp.incStep {
t.Fatalf("healthcheck queue length = %v, want %v", hc.queue.Len(), sp.incStep)
}
hc.mu.Unlock()
}
// TestLIFOSessionOrder tests whether the session pool hands out sessions in LIFO order.
func TestLIFOSessionOrder(t *testing.T) {
t.Parallel()
ctx := context.Background()
_, client, teardown := setupMockedTestServerWithConfig(t,
ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MaxOpened: 3,
MinOpened: 3,
},
})
defer teardown()
sp := client.idleSessions
// Create/take three sessions and recycle them.
shs, shsIDs := make([]*sessionHandle, 3), make([]string, 3)
for i := 0; i < len(shs); i++ {
var err error
if shs[i], err = sp.take(ctx); err != nil {
t.Fatalf("failed to take session(%v): %v", i, err)
}
shsIDs[i] = shs[i].getID()
}
for i := 0; i < len(shs); i++ {
shs[i].recycle()
}
for i := 2; i >= 0; i-- {
sh, err := sp.take(ctx)
if err != nil {
t.Fatalf("cannot take session from session pool: %v", err)
}
// Check whether sessions are returned in LIFO order.
if wantID, gotID := shsIDs[i], sh.getID(); wantID != gotID {
t.Fatalf("got session with id: %v, want: %v", gotID, wantID)
}
}
}
// TestLIFOTakeWriteSessionOrder tests whether the write session pool hands out sessions in LIFO order.
func TestLIFOTakeWriteSessionOrder(t *testing.T) {
t.Parallel()
ctx := context.Background()
_, client, teardown := setupMockedTestServerWithConfig(t,
ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MaxOpened: 3,
MinOpened: 3,
// Set the write fraction to 0 to ensure the write sessions are
// only created on demand, which will guarantee the exact order
// in which we receive the sessions.
WriteSessions: 0,
},
})
defer teardown()
sp := client.idleSessions
// Create/take three sessions and recycle them.
shs, shsIDs := make([]*sessionHandle, 3), make([]string, 3)
for i := 0; i < len(shs); i++ {
var err error
if shs[i], err = sp.takeWriteSession(ctx); err != nil {
t.Fatalf("failed to take session(%v): %v", i, err)
}
shsIDs[i] = shs[i].getID()
}
for i := 0; i < len(shs); i++ {
shs[i].recycle()
}
for i := 2; i >= 0; i-- {
ws, err := sp.takeWriteSession(ctx)
if err != nil {
t.Fatalf("cannot take session from session pool: %v", err)
}
// Check whether write sessions are returned in LIFO order.
if wantID, gotID := shsIDs[i], ws.getID(); wantID != gotID {
t.Fatalf("got session with id: %v, want: %v", gotID, wantID)
}
}
}
// TestTakeFromIdleList tests taking sessions from session pool's idle list.
func TestTakeFromIdleList(t *testing.T) {
t.Parallel()
ctx := context.Background()
// Make sure maintainer keeps the idle sessions.
server, client, teardown := setupMockedTestServerWithConfig(t,
ClientConfig{
SessionPoolConfig: SessionPoolConfig{MaxIdle: 10, MaxOpened: 10},
})
defer teardown()
sp := client.idleSessions
// Take ten sessions from session pool and recycle them.
shs := make([]*sessionHandle, 10)
for i := 0; i < len(shs); i++ {
var err error
shs[i], err = sp.take(ctx)
if err != nil {
t.Fatalf("failed to get session(%v): %v", i, err)
}
}
// Make sure the sessions are sampled once before recycling them; otherwise
// they will be cleaned up.
<-time.After(sp.SessionPoolConfig.healthCheckSampleInterval)
for i := 0; i < len(shs); i++ {
shs[i].recycle()
}
// Further session requests from session pool won't cause mockclient to
// create more sessions.
wantSessions := server.TestSpanner.DumpSessions()
// Take ten sessions from session pool again, this time all sessions should
// come from idle list.
gotSessions := map[string]bool{}
for i := 0; i < len(shs); i++ {
sh, err := sp.take(ctx)
if err != nil {
t.Fatalf("cannot take session from session pool: %v", err)
}
gotSessions[sh.getID()] = true
}
if len(gotSessions) != 10 {
t.Fatalf("got %v unique sessions, want 10", len(gotSessions))
}
if !testEqual(gotSessions, wantSessions) {
t.Fatalf("got sessions: %v, want %v", gotSessions, wantSessions)
}
}
// TestTakeWriteSessionFromIdleList tests taking write sessions from the
// session pool's idle list.
func TestTakeWriteSessionFromIdleList(t *testing.T) {
t.Parallel()
ctx := context.Background()
// Make sure maintainer keeps the idle sessions.
server, client, teardown := setupMockedTestServerWithConfig(t,
ClientConfig{
SessionPoolConfig: SessionPoolConfig{MaxIdle: 10, MaxOpened: 10},
})
defer teardown()
sp := client.idleSessions
// Take ten sessions from session pool and recycle them.
shs := make([]*sessionHandle, 10)
for i := 0; i < len(shs); i++ {
var err error
shs[i], err = sp.takeWriteSession(ctx)
if err != nil {
t.Fatalf("failed to get session(%v): %v", i, err)
}
}
// Make sure the sessions are sampled once before recycling them; otherwise
// they will be cleaned up.
<-time.After(sp.SessionPoolConfig.healthCheckSampleInterval)
for i := 0; i < len(shs); i++ {
shs[i].recycle()
}
// Further session requests from session pool won't cause mockclient to
// create more sessions.
wantSessions := server.TestSpanner.DumpSessions()
// Take ten sessions from session pool again, this time all sessions should
// come from idle list.
gotSessions := map[string]bool{}
for i := 0; i < len(shs); i++ {
sh, err := sp.takeWriteSession(ctx)
if err != nil {
t.Fatalf("cannot take session from session pool: %v", err)
}
gotSessions[sh.getID()] = true
}
if len(gotSessions) != 10 {
t.Fatalf("got %v unique sessions, want 10", len(gotSessions))
}
if !testEqual(gotSessions, wantSessions) {
t.Fatalf("got sessions: %v, want %v", gotSessions, wantSessions)
}
}
// TestTakeFromIdleListChecked tests taking sessions from the session pool's
// idle list, but with an extra ping check.
func TestTakeFromIdleListChecked(t *testing.T) {
t.Parallel()
ctx := context.Background()
// Make sure maintainer keeps the idle sessions.
server, client, teardown := setupMockedTestServerWithConfig(t,
ClientConfig{
SessionPoolConfig: SessionPoolConfig{
WriteSessions: 0.0,
MaxIdle: 1,
HealthCheckInterval: 50 * time.Millisecond,
healthCheckSampleInterval: 10 * time.Millisecond,
},
})
defer teardown()
sp := client.idleSessions
// Stop healthcheck workers to simulate slow pings.
sp.hc.close()
// Create a session and recycle it.
sh, err := sp.take(ctx)
if err != nil {
t.Fatalf("failed to get session: %v", err)
}
// Wait until all session creation has finished.
waitFor(t, func() error {
sp.mu.Lock()
// WriteSessions = 0, so we only have to check for read sessions.
numOpened := uint64(sp.idleList.Len())
sp.mu.Unlock()
if numOpened < sp.SessionPoolConfig.incStep-1 {
return fmt.Errorf("creation not yet finished")
}
return nil
})
// Force a ping during the first take() by setting the check time in the past.
sp.hc.mu.Lock()
sh.session.nextCheck = time.Now().Add(-time.Minute)
sp.hc.mu.Unlock()
wantSid := sh.getID()
sh.recycle()
// Two back-to-back session requests; both should return the same session
// created before, but only the first should trigger a session ping.
for i := 0; i < 2; i++ {
// Take the session from the idle list and recycle it.
sh, err = sp.take(ctx)
if err != nil {
t.Fatalf("%v - failed to get session: %v", i, err)
}
if gotSid := sh.getID(); gotSid != wantSid {
t.Fatalf("%v - got session id: %v, want %v", i, gotSid, wantSid)
}
// The two back-to-back session requests shouldn't trigger any session
// pings because sessionPool.Take reschedules the next healthcheck.
if got, want := server.TestSpanner.DumpPings(), ([]string{wantSid}); !testEqual(got, want) {
t.Fatalf("%v - got ping session requests: %v, want %v", i, got, want)
}
sh.recycle()
}
// Inject a session error into the server stub and take the session from
// the session pool. The old session should be destroyed and the session
// pool will create a new session.
server.TestSpanner.PutExecutionTime(MethodExecuteSql,
SimulatedExecutionTime{
Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")},
})
// Force ping by setting check time in the past.
s := sp.idleList.Front().Value.(*session)
s.nextCheck = time.Now().Add(-time.Minute)
// take will take the idle session. Then it will send a GetSession request
// to check if it's healthy. It'll discover that it's not healthy
// (NotFound) and drop it. No new session will be created as MinOpened=0.
sh, err = sp.take(ctx)
if err != nil {
t.Fatalf("failed to get session: %v", err)
}
ds := server.TestSpanner.DumpSessions()
if g, w := uint64(len(ds)), sp.incStep-1; g != w {
t.Fatalf("number of sessions from mock server mismatch\nGot: %v\nWant: %v\n", g, w)
}
if sh.getID() == wantSid {
t.Fatalf("sessionPool.Take still returns the same session %v, want it to create a new one", wantSid)
}
}
// TestTakeFromIdleWriteListChecked tests taking sessions from the session
// pool's idle list, but with an extra ping check.
func TestTakeFromIdleWriteListChecked(t *testing.T) {
t.Parallel()
ctx := context.Background()
// Make sure maintainer keeps the idle sessions.
server, client, teardown := setupMockedTestServerWithConfig(t,
ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MaxIdle: 1,
HealthCheckInterval: 50 * time.Millisecond,
healthCheckSampleInterval: 10 * time.Millisecond,
},
})
defer teardown()
sp := client.idleSessions
// Stop healthcheck workers to simulate slow pings.
sp.hc.close()
// Create a session and recycle it.
sh, err := sp.takeWriteSession(ctx)
if err != nil {
t.Fatalf("failed to get session: %v", err)
}
wantSid := sh.getID()
// Set the next check in the past to ensure the next take() call will
// trigger a health check.
sh.session.nextCheck = time.Now().Add(-time.Minute)
sh.recycle()
// Two back-to-back session requests; both should return the same session
// created before, and only the first should trigger a session ping.
for i := 0; i < 2; i++ {
// Take the session from the idle list and recycle it.
sh, err = sp.takeWriteSession(ctx)
if err != nil {
t.Fatalf("%v - failed to get session: %v", i, err)
}
if gotSid := sh.getID(); gotSid != wantSid {
t.Fatalf("%v - got session id: %v, want %v", i, gotSid, wantSid)
}
// The two back-to-back session requests shouldn't trigger any session
// pings because sessionPool.Take reschedules the next healthcheck.
if got, want := server.TestSpanner.DumpPings(), ([]string{wantSid}); !testEqual(got, want) {
t.Fatalf("%v - got ping session requests: %v, want %v", i, got, want)
}
sh.recycle()
}
// Inject a session error into the mock client and take the session from
// the session pool. The old session should be destroyed and the session
// pool will create a new session.
server.TestSpanner.PutExecutionTime(MethodExecuteSql,
SimulatedExecutionTime{
Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")},
})
// Force ping by setting check time in the past.
s := sp.idleList.Front().Value.(*session)
s.nextCheck = time.Now().Add(-time.Minute)
sh, err = sp.takeWriteSession(ctx)
if err != nil {
t.Fatalf("failed to get session: %v", err)
}
ds := server.TestSpanner.DumpSessions()
if g, w := uint64(len(ds)), sp.incStep-1; g != w {
t.Fatalf("number of sessions from mock server mismatch\nGot: %v\nWant: %v\n", g, w)
}
if sh.getID() == wantSid {
t.Fatalf("sessionPool.Take still returns the same session %v, want it to create a new one", wantSid)
}
}
// TestSessionLeak tests leaking a session and getting the stack of the
// goroutine that leaked it.
func TestSessionLeak(t *testing.T) {
t.Parallel()
ctx := context.Background()
_, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
SessionPoolConfig: SessionPoolConfig{
TrackSessionHandles: true,
MinOpened: 0,
MaxOpened: 1,
},
})
defer teardown()
// Execute a query without calling rowIterator.Stop. This will cause the
// session not to be returned to the pool.
single := client.Single()
iter := single.Query(ctx, NewStatement(SelectFooFromBar))
for {
_, err := iter.Next()
if err == iterator.Done {
break
}
if err != nil {
t.Fatalf("Got unexpected error while iterating results: %v\n", err)
}
}
// The session should not have been returned to the pool.
if g, w := client.idleSessions.idleList.Len(), 0; g != w {
t.Fatalf("Idle sessions count mismatch\nGot: %d\nWant: %d\n", g, w)
}
// The checked out session should contain a stack trace.
if single.sh.stack == nil {
t.Fatalf("Missing stacktrace from session handle")
}
stack := fmt.Sprintf("%s", single.sh.stack)
testMethod := "TestSessionLeak"
if !strings.Contains(stack, testMethod) {
t.Fatalf("Stacktrace does not contain '%s'\nGot: %s", testMethod, stack)
}
// Return the session to the pool.
iter.Stop()
// The stack should now have been removed from the session handle.
if single.sh.stack != nil {
t.Fatalf("Got unexpected stacktrace in session handle: %s", single.sh.stack)
}
// Do another query and hold on to the session.
single = client.Single()
iter = single.Query(ctx, NewStatement(SelectFooFromBar))
for {
_, err := iter.Next()
if err == iterator.Done {
break
}
if err != nil {
t.Fatalf("Got unexpected error while iterating results: %v\n", err)
}
}
// Try to do another query. This will fail as MaxOpened=1.
ctxWithTimeout, cancel := context.WithTimeout(ctx, time.Millisecond*10)
defer cancel()
single2 := client.Single()
iter2 := single2.Query(ctxWithTimeout, NewStatement(SelectFooFromBar))
_, gotErr := iter2.Next()
wantErr := client.idleSessions.errGetSessionTimeoutWithTrackedSessionHandles(codes.DeadlineExceeded)
// The error should contain the stacktraces of all the checked out
// sessions.
if !testEqual(gotErr, wantErr) {
t.Fatalf("Error mismatch on iterating result set.\nGot: %v\nWant: %v\n", gotErr, wantErr)
}
if !strings.Contains(gotErr.Error(), testMethod) {
t.Fatalf("Error does not contain '%s'\nGot: %s", testMethod, gotErr.Error())
}
// Close iterators to check sessions back into the pool before closing.
iter2.Stop()
iter.Stop()
}
// TestMaxOpenedSessions tests max open sessions constraint.
func TestMaxOpenedSessions(t *testing.T) {
t.Parallel()
ctx := context.Background()
_, client, teardown := setupMockedTestServerWithConfig(t,
ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MaxOpened: 1,
},
})
defer teardown()
sp := client.idleSessions
sh1, err := sp.take(ctx)
if err != nil {
t.Fatalf("cannot take session from session pool: %v", err)
}
// The session request will time out due to the max open sessions constraint.
ctx2, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel()
_, gotErr := sp.take(ctx2)
if wantErr := sp.errGetBasicSessionTimeout(codes.DeadlineExceeded); !testEqual(gotErr, wantErr) {
t.Fatalf("the second session retrieval returns error %v, want %v", gotErr, wantErr)
}
doneWaiting := make(chan struct{})
go func() {
// Destroy the first session to allow the next session request to
// proceed.
<-doneWaiting
sh1.destroy()
}()
go func() {
// Wait a short random time before destroying the session handle.
<-time.After(10 * time.Millisecond)
close(doneWaiting)
}()
// Now the session request can be processed because the first session will
// be destroyed.
ctx3, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
sh2, err := sp.take(ctx3)
if err != nil {
t.Fatalf("after the first session is destroyed, session retrieval still returns error %v, want nil", err)
}
if !sh2.session.isValid() || sh2.getID() == "" {
t.Fatalf("got invalid session: %v", sh2.session)
}
}
// TestMinOpenedSessions tests min open session constraint.
func TestMinOpenedSessions(t *testing.T) {
t.Parallel()
ctx := context.Background()
_, client, teardown := setupMockedTestServerWithConfig(t,
ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MinOpened: 1,
healthCheckSampleInterval: time.Millisecond,
},
})
defer teardown()
sp := client.idleSessions
// Take ten sessions from session pool and recycle them.
var ss []*session
var shs []*sessionHandle
for i := 0; i < 10; i++ {
sh, err := sp.take(ctx)
if err != nil {
t.Fatalf("failed to get session(%v): %v", i, err)
}
ss = append(ss, sh.session)
shs = append(shs, sh)
sh.recycle()
}
for _, sh := range shs {
sh.recycle()
}
// Simulate session expiration.
for _, s := range ss {
s.destroy(true)
}
// Wait until the maintainer has had a chance to replenish the pool.
for i := 0; i < 10; i++ {
sp.mu.Lock()
if sp.numOpened > 0 {
sp.mu.Unlock()
break
}
sp.mu.Unlock()
<-time.After(sp.healthCheckSampleInterval)
}
sp.mu.Lock()
defer sp.mu.Unlock()
// There should still be one session left in either the idle list or in one
// of the other opened states due to the min open sessions constraint.
if (sp.idleList.Len() +
sp.idleWriteList.Len() +
int(sp.prepareReqs) +
int(sp.createReqs)) != 1 {
t.Fatalf(
"got %v sessions in idle lists, want 1. Opened: %d, read: %d, "+
"write: %d, in preparation: %d, in creation: %d",
sp.idleList.Len()+sp.idleWriteList.Len(), sp.numOpened,
sp.idleList.Len(), sp.idleWriteList.Len(), sp.prepareReqs,
sp.createReqs)
}
}
// TestMaxBurst tests max burst constraint.
func TestMaxBurst(t *testing.T) {
t.Parallel()
ctx := context.Background()
server, client, teardown := setupMockedTestServerWithConfig(t,
ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MaxBurst: 1,
},
})
defer teardown()
sp := client.idleSessions
// This will cause the session creation RPC to be retried forever.
server.TestSpanner.PutExecutionTime(MethodBatchCreateSession,
SimulatedExecutionTime{
Errors: []error{status.Errorf(codes.Unavailable, "try later")},
KeepError: true,
})
// This session request will never finish until the injected error is
// cleared.
go sp.take(ctx)
// Poll for the execution of the first session request.
for {
sp.mu.Lock()
cr := sp.createReqs
sp.mu.Unlock()
if cr == 0 {
<-time.After(time.Second)
continue
}
// The first session request is being executed.
break
}
ctx2, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
_, gotErr := sp.take(ctx2)
// Since MaxBurst == 1, the second session request should block.
if wantErr := sp.errGetBasicSessionTimeout(codes.DeadlineExceeded); !testEqual(gotErr, wantErr) {
t.Fatalf("session retrieval returns error %v, want %v", gotErr, wantErr)
}
// Let the first session request succeed.
server.TestSpanner.Freeze()
server.TestSpanner.PutExecutionTime(MethodBatchCreateSession, SimulatedExecutionTime{})
server.TestSpanner.Unfreeze()
// Now a new session request can proceed because the first session request
// will eventually succeed.
sh, err := sp.take(ctx)
if err != nil {
t.Fatalf("session retrieval returns error %v, want nil", err)
}
if !sh.session.isValid() || sh.getID() == "" {
t.Fatalf("got invalid session: %v", sh.session)
}
}
// TestSessionRecycle tests recycling sessions.
func TestSessionRecycle(t *testing.T) {
t.Parallel()
ctx := context.Background()
// Set MaxBurst=MinOpened to prevent additional sessions from being created
// while session pool initialization is still running.
_, client, teardown := setupMockedTestServerWithConfig(t,
ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MinOpened: 1,
MaxIdle: 5,
MaxBurst: 1,
},
})
defer teardown()
sp := client.idleSessions
// Test that the session is correctly recycled and reused.
for i := 0; i < 20; i++ {
s, err := sp.take(ctx)
if err != nil {
t.Fatalf("cannot get the session %v: %v", i, err)
}
s.recycle()
}
sp.mu.Lock()
defer sp.mu.Unlock()
// The session pool should only contain 1 session, as the configured
// minimum is 1. In addition, there has never been more than one session in
// use at any time, so there's no need for the session pool to create a
// second session. The session has also been in use all the time, so there
// is also no reason for the session pool to delete the session.
if sp.numOpened != 1 {
t.Fatalf("Expect session pool size 1, got %d", sp.numOpened)
}
}
// TestSessionDestroy tests destroying sessions.
func TestSessionDestroy(t *testing.T) {
t.Parallel()
ctx := context.Background()
_, client, teardown := setupMockedTestServerWithConfig(t,
ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MinOpened: 1,
MaxBurst: 1,
},
})
defer teardown()
sp := client.idleSessions
// Creating a session pool with MinOpened=1 will automatically start the
// creation of 1 session when the session pool is created. As MaxBurst=1,
// the session pool will never create more than 1 session at a time, so the
// take() method will wait if the initial session has not yet been created.
sh, err := sp.take(ctx)
if err != nil {
t.Fatalf("cannot get session from session pool: %v", err)
}
s := sh.session
sh.recycle()
if d := s.destroy(true); d || !s.isValid() {
// The session should remain alive because of the min open sessions constraint.
t.Fatalf("session %s invalid, want it to stay alive. (destroy in expiration mode, success: %v)", s.id, d)
}
if d := s.destroy(false); !d || s.isValid() {
// Session should be destroyed.
t.Fatalf("failed to destroy session %s. (destroy in default mode, success: %v)", s.id, d)
}
}
// TestHcHeap tests heap operation on top of hcHeap.
func TestHcHeap(t *testing.T) {
in := []*session{
{nextCheck: time.Unix(10, 0)},
{nextCheck: time.Unix(0, 5)},
{nextCheck: time.Unix(1, 8)},
{nextCheck: time.Unix(11, 7)},
{nextCheck: time.Unix(6, 3)},
}
want := []*session{
{nextCheck: time.Unix(1, 8), hcIndex: 0},
{nextCheck: time.Unix(6, 3), hcIndex: 1},
{nextCheck: time.Unix(8, 2), hcIndex: 2},
{nextCheck: time.Unix(10, 0), hcIndex: 3},
{nextCheck: time.Unix(11, 7), hcIndex: 4},
}
hh := hcHeap{}
for _, s := range in {
heap.Push(&hh, s)
}
// Change the top of the heap and do an adjustment.
hh.sessions[0].nextCheck = time.Unix(8, 2)
heap.Fix(&hh, 0)
for idx := 0; hh.Len() > 0; idx++ {
got := heap.Pop(&hh).(*session)
want[idx].hcIndex = -1
if !testEqual(got, want[idx]) {
t.Fatalf("%v: heap.Pop returns %v, want %v", idx, got, want[idx])
}
}
}
// TestHealthCheckScheduler tests if healthcheck workers can schedule and
// perform healthchecks properly.
func TestHealthCheckScheduler(t *testing.T) {
t.Parallel()
ctx := context.Background()
server, client, teardown := setupMockedTestServerWithConfig(t,
ClientConfig{
SessionPoolConfig: SessionPoolConfig{
HealthCheckInterval: 50 * time.Millisecond,
healthCheckSampleInterval: 10 * time.Millisecond,
},
})
defer teardown()
sp := client.idleSessions
// Create 50 sessions.
for i := 0; i < 50; i++ {
_, err := sp.take(ctx)
if err != nil {
t.Fatalf("cannot get session from session pool: %v", err)
}
}
// Clear the ping history so that the sessions that were created first have
// not already exceeded the maximum number of pings when we start counting.
server.TestSpanner.ClearPings()
// Wait for 10-30 pings per session.
waitFor(t, func() error {
// Only check actually live sessions and ignore any sessions the
// session pool may have deleted in the meantime.
liveSessions := server.TestSpanner.DumpSessions()
dp := server.TestSpanner.DumpPings()
gotPings := map[string]int64{}
for _, p := range dp {
gotPings[p]++
}
for s := range liveSessions {
want := int64(20)
if got := gotPings[s]; got < want/2 || got > want+want/2 {
// This is an unacceptable number of pings.
return fmt.Errorf("got %v healthchecks on session %v, want it between (%v, %v)", got, s, want/2, want+want/2)
}
}
return nil
})
}
// TestHealthCheck_FirstHealthCheck tests if the first healthcheck scheduling
// works properly.
func TestHealthCheck_FirstHealthCheck(t *testing.T) {
t.Parallel()
_, client, teardown := setupMockedTestServerWithConfig(t,
ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MaxOpened: 0,
MinOpened: 0,
HealthCheckInterval: 50 * time.Minute,
},
})
defer teardown()
sp := client.idleSessions
now := time.Now()
start := now.Add(time.Duration(float64(sp.hc.interval) * 0.2))
// A second is added to avoid the edge case.
end := now.Add(time.Duration(float64(sp.hc.interval)*1.1) + time.Second)
s := &session{}
sp.hc.scheduledHCLocked(s)
if s.nextCheck.Before(start) || s.nextCheck.After(end) {
t.Fatalf("The first healthcheck schedule is not in the correct range: %v", s.nextCheck)
}
if !s.firstHCDone {
t.Fatal("The flag 'firstHCDone' should be set to true after the first healthcheck.")
}
}
// TestHealthCheck_NonFirstHealthCheck tests if the scheduling after the first
// health check works properly.
func TestHealthCheck_NonFirstHealthCheck(t *testing.T) {
t.Parallel()
_, client, teardown := setupMockedTestServerWithConfig(t,
ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MaxOpened: 0,
MinOpened: 0,
HealthCheckInterval: 50 * time.Minute,
},
})
defer teardown()
sp := client.idleSessions
now := time.Now()
start := now.Add(time.Duration(float64(sp.hc.interval) * 0.9))
// A second is added to avoid the edge case.
end := now.Add(time.Duration(float64(sp.hc.interval)*1.1) + time.Second)
s := &session{firstHCDone: true}
sp.hc.scheduledHCLocked(s)
if s.nextCheck.Before(start) || s.nextCheck.After(end) {
t.Fatalf("The non-first healthcheck schedule is not in the correct range: %v", s.nextCheck)
}
}
// Tests that a fraction of the sessions is prepared for write by the health
// checker.
func TestWriteSessionsPrepared(t *testing.T) {
t.Parallel()
ctx := context.Background()
_, client, teardown := setupMockedTestServerWithConfig(t,
ClientConfig{
SessionPoolConfig: SessionPoolConfig{
WriteSessions: 0.5,
MaxIdle: 200,
HealthCheckInterval: time.Nanosecond,
},
})
defer teardown()
sp := client.idleSessions
shs := make([]*sessionHandle, 100)
var err error
for i := 0; i < 100; i++ {
shs[i], err = sp.take(ctx)
if err != nil {
t.Fatalf("cannot get session from session pool: %v", err)
}
}
// Now there are 100 sessions in the pool. Release them.
for _, sh := range shs {
sh.recycle()
}
// Take 50 write sessions. The write sessions will be taken from either the
// list of prepared sessions (idleWriteList), or they will be prepared
// during the takeWriteSession method.
wshs := make([]*sessionHandle, 50)
for i := 0; i < 50; i++ {
wshs[i], err = sp.takeWriteSession(ctx)
if err != nil {
t.Fatalf("cannot get session from session pool: %v", err)
}
if wshs[i].getTransactionID() == nil {
t.Fatalf("got nil transaction id from session pool")
}
}
// Return the sessions to the pool.
for _, sh := range wshs {
sh.recycle()
}
// Now force creation of 100 more sessions.
shs = make([]*sessionHandle, 200)
for i := 0; i < 200; i++ {
shs[i], err = sp.take(ctx)
if err != nil {
t.Fatalf("cannot get session from session pool: %v", err)
}
}
// Now there are 200 sessions in the pool. Release them.
for _, sh := range shs {
sh.recycle()
}
// The health checker should eventually prepare 100 of the 200 sessions with
// a r/w tx.
waitUntil := time.After(time.Second)
var numWritePrepared int
// Use a labeled break so that the timeout actually exits the polling loop;
// a plain break would only break out of the select statement.
waitWritePrepared:
for numWritePrepared < 100 {
select {
case <-waitUntil:
break waitWritePrepared
default:
}
sp.mu.Lock()
numWritePrepared = sp.idleWriteList.Len()
sp.mu.Unlock()
}
sp.mu.Lock()
defer sp.mu.Unlock()
if sp.idleWriteList.Len() != 100 {
t.Fatalf("Expect 100 write-prepared sessions, got: %d", sp.idleWriteList.Len())
}
}
// TestTakeFromWriteQueue tests that sessionPool.take() returns write prepared
// sessions as well.
func TestTakeFromWriteQueue(t *testing.T) {
t.Parallel()
ctx := context.Background()
_, client, teardown := setupMockedTestServerWithConfig(t,
ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MaxOpened: 1,
WriteSessions: 1.0,
MaxIdle: 1,
HealthCheckInterval: time.Nanosecond,
},
})
defer teardown()
sp := client.idleSessions
sh, err := sp.take(ctx)
if err != nil {
t.Fatalf("cannot get session from session pool: %v", err)
}
sh.recycle()
// Wait until the health checker has write-prepared the session.
waitUntil := time.After(time.Second)
var numWritePrepared int
// Use a labeled break so that the timeout actually exits the polling loop;
// a plain break would only break out of the select statement.
waitWritePrepared:
for numWritePrepared == 0 {
select {
case <-waitUntil:
break waitWritePrepared
default:
}
sp.mu.Lock()
numWritePrepared = sp.idleWriteList.Len()
sp.mu.Unlock()
}
// The session should now be in the write queue, but take should also return it.
sp.mu.Lock()
if sp.idleWriteList.Len() == 0 {
t.Fatalf("write queue unexpectedly empty")
}
if sp.idleList.Len() != 0 {
t.Fatalf("read queue not empty")
}
sp.mu.Unlock()
sh, err = sp.take(ctx)
if err != nil {
t.Fatalf("cannot get session from session pool: %v", err)
}
sh.recycle()
}
// The session pool should stop trying to create write-prepared sessions if a
// non-transient error occurs while trying to begin a transaction. The
// process for preparing write sessions should automatically be re-enabled if
// a BeginTransaction call initiated by takeWriteSession succeeds.
//
// The only exception to the above is that a 'Session not found' error should
// cause the session to be removed from the session pool, and it should not
// affect the background process of preparing sessions.
func TestErrorOnPrepareSession(t *testing.T) {
t.Parallel()
serverErrors := []error{
status.Errorf(codes.PermissionDenied, "Caller is missing IAM permission spanner.databases.beginOrRollbackReadWriteTransaction on resource"),
status.Errorf(codes.NotFound, `Database not found: projects/<project>/instances/<instance>/databases/<database> resource_type: "type.googleapis.com/google.spanner.admin.database.v1.Database" resource_name: "projects/<project>/instances/<instance>/databases/<database>" description: "Database does not exist."`),
status.Errorf(codes.FailedPrecondition, "Invalid transaction option"),
status.Errorf(codes.Internal, "Unknown server error"),
}
logger := log.New(os.Stderr, "", log.LstdFlags)
for _, serverErr := range serverErrors {
ctx := context.Background()
server, client, teardown := setupMockedTestServerWithConfig(t,
ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MinOpened: 10,
MaxOpened: 10,
WriteSessions: 0.5,
HealthCheckInterval: time.Millisecond,
},
logger: logger,
})
defer teardown()
// Discard logging until trying to prepare sessions has stopped.
logger.SetOutput(ioutil.Discard)
server.TestSpanner.PutExecutionTime(MethodBeginTransaction, SimulatedExecutionTime{
Errors: []error{serverErr},
KeepError: true,
})
sp := client.idleSessions
// Wait until the health checker has tried to write-prepare a session.
// This will cause the session pool to log errors indicating that preparing
// sessions failed.
waitUntil := time.After(time.Second)
var prepareDisabled bool
var numOpened int
waitForPrepare:
for !prepareDisabled || numOpened < 10 {
select {
case <-waitUntil:
break waitForPrepare
default:
}
sp.mu.Lock()
prepareDisabled = sp.disableBackgroundPrepareSessions
numOpened = sp.idleList.Len()
sp.mu.Unlock()
}
// Re-enable logging.
logger.SetOutput(os.Stderr)
// There should be no write-prepared sessions.
sp.mu.Lock()
if sp.idleWriteList.Len() != 0 {
sp.mu.Unlock()
t.Fatalf("write queue unexpectedly not empty")
}
// All sessions should be in the read idle list.
if g, w := sp.idleList.Len(), 10; g != w {
sp.mu.Unlock()
t.Fatalf("session count mismatch:\nWant: %v\nGot: %v", w, g)
}
sp.mu.Unlock()
// Taking a read session should succeed.
sh, err := sp.take(ctx)
if err != nil {
t.Fatalf("cannot get session from session pool: %v", err)
}
sh.recycle()
// Taking a write session should fail with the server error.
_, err = sp.takeWriteSession(ctx)
if ErrCode(err) != ErrCode(serverErr) {
t.Fatalf("take write session failed with unexpected error.\nGot: %v\nWant: %v\n", err, serverErr)
}
// Clearing the error on the server should allow us to take a write
// session.
server.TestSpanner.PutExecutionTime(MethodBeginTransaction, SimulatedExecutionTime{})
sh, err = sp.takeWriteSession(ctx)
if err != nil {
t.Fatalf("cannot get write session from session pool: %v", err)
}
sh.recycle()
// The maintainer should also pick this up and prepare 50% of the sessions.
waitUntil = time.After(time.Second)
var numPrepared int
// Use a labeled break so that the timeout actually exits the polling loop;
// a plain break would only break out of the select statement.
waitPrepared:
for numPrepared < 5 {
select {
case <-waitUntil:
break waitPrepared
default:
}
sp.mu.Lock()
numPrepared = sp.idleWriteList.Len()
sp.mu.Unlock()
}
sp.mu.Lock()
if g, w := sp.idleWriteList.Len(), 5; g != w {
sp.mu.Unlock()
t.Fatalf("write session count mismatch:\nWant: %v\nGot: %v", w, g)
}
sp.mu.Unlock()
}
}
// The session pool should continue to try to create write-prepared sessions if
// a 'Session not found' error occurs. The session that has been deleted by
// the backend should be removed from the pool, and the maintainer should create a
// new session if this causes the number of sessions in the pool to fall below
// MinOpened.
func TestSessionNotFoundOnPrepareSession(t *testing.T) {
t.Parallel()
// The server will return 'Session not found' for the first 8
// BeginTransaction calls.
sessionNotFoundErr := newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")
serverErrors := make([]error, 8)
for i := range serverErrors {
serverErrors[i] = sessionNotFoundErr
}
ctx := context.Background()
logger := log.New(os.Stderr, "", log.LstdFlags)
server, client, teardown := setupMockedTestServerWithConfig(t,
ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MinOpened: 10,
MaxOpened: 10,
WriteSessions: 0.5,
HealthCheckInterval: time.Millisecond,
healthCheckSampleInterval: time.Millisecond,
},
logger: logger,
})
defer teardown()
// Discard logging until trying to prepare sessions has stopped.
logger.SetOutput(ioutil.Discard)
server.TestSpanner.PutExecutionTime(MethodBeginTransaction, SimulatedExecutionTime{
Errors: serverErrors,
})
sp := client.idleSessions
// Wait until the health checker has tried to write-prepare the sessions.
waitUntil := time.After(5 * time.Second)
var numWriteSessions int
var numReadSessions int
waitForPrepare:
for (numWriteSessions+numReadSessions) < 10 || numWriteSessions < 5 {
select {
case <-waitUntil:
break waitForPrepare
default:
}
sp.mu.Lock()
numReadSessions = sp.idleList.Len()
numWriteSessions = sp.idleWriteList.Len()
sp.mu.Unlock()
}
// Re-enable logging.
logger.SetOutput(os.Stderr)
// There should be at least 5 write-prepared sessions.
sp.mu.Lock()
if g, w := sp.idleWriteList.Len(), 5; g < w {
sp.mu.Unlock()
t.Fatalf("write-prepared session count mismatch.\nWant at least: %v\nGot: %v", w, g)
}
// The other sessions should be in the read idle list.
if g, w := sp.idleList.Len()+sp.idleWriteList.Len(), 10; g != w {
sp.mu.Unlock()
t.Fatalf("total session count mismatch:\nWant: %v\nGot: %v", w, g)
}
sp.mu.Unlock()
// Taking a read session should succeed.
sh, err := sp.take(ctx)
if err != nil {
t.Fatalf("cannot get session from session pool: %v", err)
}
sh.recycle()
// Taking a write session should succeed.
sh, err = sp.takeWriteSession(ctx)
if err != nil {
t.Fatalf("take write session failed with unexpected error.\nGot: %v\nWant: %v\n", err, nil)
}
sh.recycle()
}
// TestSessionHealthCheck tests healthchecking cases.
func TestSessionHealthCheck(t *testing.T) {
t.Parallel()
ctx := context.Background()
server, client, teardown := setupMockedTestServerWithConfig(t,
ClientConfig{
SessionPoolConfig: SessionPoolConfig{
HealthCheckInterval: time.Nanosecond,
healthCheckSampleInterval: 10 * time.Millisecond,
incStep: 1,
},
})
defer teardown()
sp := client.idleSessions
// Test pinging sessions.
sh, err := sp.take(ctx)
if err != nil {
t.Fatalf("cannot get session from session pool: %v", err)
}
// Wait for healthchecker to send pings to session.
waitFor(t, func() error {
pings := server.TestSpanner.DumpPings()
if len(pings) == 0 || pings[0] != sh.getID() {
return fmt.Errorf("healthchecker didn't send any ping to session %v", sh.getID())
}
return nil
})
// Test broken session detection.
sh, err = sp.take(ctx)
if err != nil {
t.Fatalf("cannot get session from session pool: %v", err)
}
server.TestSpanner.Freeze()
server.TestSpanner.PutExecutionTime(MethodExecuteSql,
SimulatedExecutionTime{
Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")},
KeepError: true,
})
server.TestSpanner.Unfreeze()
s := sh.session
waitFor(t, func() error {
if sh.session.isValid() {
return fmt.Errorf("session(%v) is still alive, want it to be dropped by healthcheck workers", s)
}
return nil
})
server.TestSpanner.Freeze()
server.TestSpanner.PutExecutionTime(MethodExecuteSql, SimulatedExecutionTime{})
server.TestSpanner.Unfreeze()
// Test garbage collection.
sh, err = sp.take(ctx)
if err != nil {
t.Fatalf("cannot get session from session pool: %v", err)
}
sp.close()
if sh.session.isValid() {
t.Fatalf("session(%v) is still alive, want it to be garbage collected", s)
}
}
// TestStressSessionPool stress tests the session pool with the following concurrent operations:
// 1) Test worker gets a session from the pool.
// 2) Test worker returns a session back to the pool.
// 3) Test worker destroys a session taken from the pool.
// 4) Healthcheck destroys a broken session (because a worker has already destroyed it).
// 5) Test worker closes the session pool.
//
// During the test, the session pool maintainer maintains the number of sessions,
// and it is expected that all sessions taken from the session pool remain valid.
// When all test workers and healthcheck workers exit, mockclient, session pool
// and healthchecker should be in a consistent state.
func TestStressSessionPool(t *testing.T) {
t.Parallel()
// Use concurrent workers to test different session pool built from different configurations.
for ti, cfg := range []SessionPoolConfig{
{},
{MinOpened: 10, MaxOpened: 100},
{MaxBurst: 50},
{MinOpened: 10, MaxOpened: 200, MaxBurst: 5},
{MinOpened: 10, MaxOpened: 200, MaxBurst: 5, WriteSessions: 0.2},
} {
// Create a more aggressive session healthchecker to increase test concurrency.
cfg.HealthCheckInterval = 50 * time.Millisecond
cfg.healthCheckSampleInterval = 10 * time.Millisecond
cfg.HealthCheckWorkers = 50
server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
SessionPoolConfig: cfg,
})
sp := client.idleSessions
// Create a test group for this configuration and schedule 100 sub tests
// within the group.
t.Run(fmt.Sprintf("TestStressSessionPoolGroup%v", ti), func(t *testing.T) {
for i := 0; i < 100; i++ {
idx := i
t.Logf("TestStressSessionPoolWithCfg%dWorker%03d", ti, idx)
testStressSessionPool(t, cfg, ti, idx, sp, client)
}
})
sp.hc.close()
// Here the states of healthchecker, session pool and mockclient are
// stable.
sp.mu.Lock()
idleSessions := map[string]bool{}
hcSessions := map[string]bool{}
mockSessions := server.TestSpanner.DumpSessions()
// Dump session pool's idle list.
for sl := sp.idleList.Front(); sl != nil; sl = sl.Next() {
s := sl.Value.(*session)
if idleSessions[s.getID()] {
t.Fatalf("%v: found duplicated session in idle list: %v", ti, s.getID())
}
idleSessions[s.getID()] = true
}
for sl := sp.idleWriteList.Front(); sl != nil; sl = sl.Next() {
s := sl.Value.(*session)
if idleSessions[s.getID()] {
t.Fatalf("%v: found duplicated session in idle write list: %v", ti, s.getID())
}
idleSessions[s.getID()] = true
}
if int(sp.numOpened) != len(idleSessions) {
t.Fatalf("%v: number of opened sessions (%v) != number of idle sessions (%v)", ti, sp.numOpened, len(idleSessions))
}
if sp.createReqs != 0 {
t.Fatalf("%v: number of pending session creations = %v, want 0", ti, sp.createReqs)
}
// Dump healthcheck queue.
sp.hc.mu.Lock()
for _, s := range sp.hc.queue.sessions {
if hcSessions[s.getID()] {
t.Fatalf("%v: found duplicated session in healthcheck queue: %v", ti, s.getID())
}
hcSessions[s.getID()] = true
}
sp.mu.Unlock()
sp.hc.mu.Unlock()
// Verify that idleSessions == hcSessions == mockSessions.
if !testEqual(idleSessions, hcSessions) {
t.Fatalf("%v: sessions in idle list (%v) != sessions in healthcheck queue (%v)", ti, idleSessions, hcSessions)
}
// The server may contain more sessions than the health check queue.
// This can be caused by a client-side timeout during a CreateSession
// request. The request may still be received and executed by the
// server, but the session pool will not register the session.
for id, b := range hcSessions {
if b && !mockSessions[id] {
t.Fatalf("%v: session in healthcheck queue (%v) was not found on server", ti, id)
}
}
sp.close()
mockSessions = server.TestSpanner.DumpSessions()
for id, b := range hcSessions {
if b && mockSessions[id] {
// We only log a warning for this, as it sometimes happens.
// The exact reason for it is unknown, but in a real life
// situation the session would be garbage collected by the
// server after 60 minutes.
t.Logf("Found session from pool still live on server: %v", id)
}
}
teardown()
}
}
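// testStressSessionPool is a single stress test worker. It repeatedly takes
// read or write sessions from the pool, verifies that the returned sessions
// are valid, and then randomly destroys or recycles them. Selected workers
// also close the pool partway through to exercise the error paths.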
func testStressSessionPool(t *testing.T, cfg SessionPoolConfig, ti int, idx int, pool *sessionPool, client *Client) {
ctx := context.Background()
// Test worker iterates 1K times and tries different
// session / session pool operations.
for j := 0; j < 1000; j++ {
if idx%10 == 0 && j >= 900 {
// Close the pool in a selected set of workers during the middle of the
// test.
pool.close()
}
// Take a write session ~20% of the time.
takeWrite := rand.Intn(5) == 4
var (
sh *sessionHandle
gotErr error
)
wasValid := pool.isValid()
if takeWrite {
sh, gotErr = pool.takeWriteSession(ctx)
} else {
sh, gotErr = pool.take(ctx)
}
if gotErr != nil {
if pool.isValid() {
t.Fatalf("%v.%v: pool.take returns error when pool is still valid: %v", ti, idx, gotErr)
}
// If the session pool was closed when we tried to take a session
// from the pool, then we should have gotten a specific error.
// If the session pool was closed between the take() and now (or
// even during a take()) then an error is ok.
if !wasValid {
if wantErr := errInvalidSessionPool; gotErr != wantErr {
t.Fatalf("%v.%v: got error when pool is closed: %v, want %v", ti, idx, gotErr, wantErr)
}
}
continue
}
// Verify that the session is valid if the session pool is valid. Note that
// if the session pool becomes invalid after sh is taken, then sh might be
// invalidated by healthcheck workers.
if (sh.getID() == "" || sh.session == nil || !sh.session.isValid()) && pool.isValid() {
t.Fatalf("%v.%v.%v: pool.take returns invalid session %v", ti, idx, takeWrite, sh.session)
}
if takeWrite && sh.getTransactionID() == nil {
t.Fatalf("%v.%v: pool.takeWriteSession returns session %v without transaction", ti, idx, sh.session)
}
if rand.Intn(100) < idx {
// Random sleep before destroying/recycling the session,
// to give healthcheck worker a chance to step in.
<-time.After(time.Duration(rand.Int63n(int64(cfg.HealthCheckInterval))))
}
if rand.Intn(100) < idx {
// destroy the session.
sh.destroy()
continue
}
// recycle the session.
sh.recycle()
}
}
// TestMaintainer checks that the session pool maintainer maintains the
// number of sessions in the following cases:
//
// 1. On initialization of session pool, replenish session pool to meet
// MinOpened or MaxIdle.
// 2. On increased session usage, provision extra MaxIdle sessions.
// 3. After the surge passes, scale down the session pool accordingly.
func TestMaintainer(t *testing.T) {
t.Parallel()
ctx := context.Background()
minOpened := uint64(5)
maxIdle := uint64(4)
_, client, teardown := setupMockedTestServerWithConfig(t,
ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MinOpened: minOpened,
MaxIdle: maxIdle,
healthCheckSampleInterval: time.Millisecond,
},
})
defer teardown()
sp := client.idleSessions
waitFor(t, func() error {
sp.mu.Lock()
defer sp.mu.Unlock()
if sp.numOpened != 5 {
return fmt.Errorf("Replenish. Expect %d open, got %d", sp.MinOpened, sp.numOpened)
}
return nil
})
// To save test time, we are not creating many sessions, because the time
// to create sessions will have an impact on the decision on sessionsToKeep.
// We also parallelize the take and recycle process.
shs := make([]*sessionHandle, 20)
for i := 0; i < len(shs); i++ {
var err error
shs[i], err = sp.take(ctx)
if err != nil {
t.Fatalf("cannot get session from session pool: %v", err)
}
}
sp.mu.Lock()
g, w := sp.numOpened, sp.MinOpened+sp.incStep
sp.mu.Unlock()
if g != w {
t.Fatalf("numOpened sessions mismatch\nGot: %d\nWant: %d", g, w)
}
// Return 14 sessions to the pool. There are still 6 sessions checked out.
for _, sh := range shs[:14] {
sh.recycle()
}
// The pool should scale down to sessionsInUse + MaxIdle = 6 + 4 = 10.
waitFor(t, func() error {
sp.mu.Lock()
defer sp.mu.Unlock()
if sp.numOpened != 10 {
return fmt.Errorf("Keep extra MaxIdle sessions. Expect %d open, got %d", 10, sp.numOpened)
}
return nil
})
// Return the remaining 6 sessions.
// The pool should now scale down to minOpened.
for _, sh := range shs[14:] {
sh.recycle()
}
waitFor(t, func() error {
sp.mu.Lock()
defer sp.mu.Unlock()
if sp.numOpened != minOpened {
return fmt.Errorf("Scale down. Expect %d open, got %d", minOpened, sp.numOpened)
}
return nil
})
}
// Tests that the session pool creates up to MinOpened connections.
//
// Historical context: This test also checks that a low
// healthCheckSampleInterval does not prevent it from opening connections.
// The low healthCheckSampleInterval will however sometimes cause session
// creations to time out. That should not be considered a problem, but it
// could cause the test case to fail if it happens too often.
// See: https://github.com/googleapis/google-cloud-go/issues/1259
func TestInit_CreatesSessions(t *testing.T) {
t.Parallel()
spc := SessionPoolConfig{
MinOpened: 10,
MaxIdle: 10,
WriteSessions: 0.0,
healthCheckSampleInterval: 20 * time.Millisecond,
}
server, client, teardown := setupMockedTestServerWithConfig(t,
ClientConfig{
SessionPoolConfig: spc,
NumChannels: 4,
})
defer teardown()
sp := client.idleSessions
timeout := time.After(4 * time.Second)
var numOpened int
loop:
for {
select {
case <-timeout:
t.Fatalf("timed out, got %d session(s), want %d", numOpened, spc.MinOpened)
default:
sp.mu.Lock()
numOpened = sp.idleList.Len() + sp.idleWriteList.Len()
sp.mu.Unlock()
if numOpened == 10 {
break loop
}
}
}
_, err := shouldHaveReceived(server.TestSpanner, []interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.BatchCreateSessionsRequest{},
&sppb.BatchCreateSessionsRequest{},
&sppb.BatchCreateSessionsRequest{},
})
if err != nil {
t.Fatal(err)
}
}
// Tests that a session pool with MinOpened>0 also prepares WriteSessions *
// MinOpened sessions for read/write transactions.
func TestInit_PreparesSessions(t *testing.T) {
t.Parallel()
spc := SessionPoolConfig{
MinOpened: 10,
MaxIdle: 10,
WriteSessions: 0.5,
healthCheckSampleInterval: 20 * time.Millisecond,
}
_, client, teardown := setupMockedTestServerWithConfig(t,
ClientConfig{
SessionPoolConfig: spc,
})
defer teardown()
sp := client.idleSessions
timeoutAmt := 4 * time.Second
timeout := time.After(timeoutAmt)
var numPrepared int
want := int(spc.WriteSessions * float64(spc.MinOpened))
loop:
for {
select {
case <-timeout:
t.Fatalf("timed out after %v, got %d write-prepared session(s), want %d", timeoutAmt, numPrepared, want)
default:
sp.mu.Lock()
numPrepared = sp.idleWriteList.Len()
sp.mu.Unlock()
if numPrepared == want {
break loop
}
}
}
}
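// Equal reports whether s1 and s2 have equal field values.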
func (s1 *session) Equal(s2 *session) bool {
return s1.client == s2.client &&
s1.id == s2.id &&
s1.pool == s2.pool &&
s1.createTime == s2.createTime &&
s1.valid == s2.valid &&
s1.hcIndex == s2.hcIndex &&
s1.idleList == s2.idleList &&
s1.nextCheck.Equal(s2.nextCheck) &&
s1.checkingHealth == s2.checkingHealth &&
testEqual(s1.md, s2.md) &&
bytes.Equal(s1.tx, s2.tx)
}
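// waitFor repeatedly calls assert until it returns nil, retrying every 10
// milliseconds. If assert still fails after a 15 second timeout, the test is
// failed with the last error.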
func waitFor(t *testing.T, assert func() error) {
t.Helper()
timeout := 15 * time.Second
ta := time.After(timeout)
for {
select {
case <-ta:
if err := assert(); err != nil {
t.Fatalf("after %v waiting, got %v", timeout, err)
}
return
default:
}
if err := assert(); err != nil {
// Fail. Let's pause and retry.
time.Sleep(10 * time.Millisecond)
continue
}
return
}
}
// Tests that maintainer only deletes sessions after a full maintenance window
// of 10 cycles has finished.
func TestMaintainer_DeletesSessions(t *testing.T) {
t.Parallel()
ctx := context.Background()
const sampleInterval = time.Millisecond * 10
_, client, teardown := setupMockedTestServerWithConfig(t,
ClientConfig{
SessionPoolConfig: SessionPoolConfig{healthCheckSampleInterval: sampleInterval},
})
defer teardown()
sp := client.idleSessions
// Take two sessions from the pool.
// This will cause max sessions in use to be 2 during this window.
sh1 := takeSession(ctx, t, sp)
sh2 := takeSession(ctx, t, sp)
wantSessions := map[string]bool{}
wantSessions[sh1.getID()] = true
wantSessions[sh2.getID()] = true
// Return the sessions to the pool and then ensure that they are not
// deleted while still within the maintenance window.
sh1.recycle()
sh2.recycle()
// Wait for 20 milliseconds, i.e. approx 2 iterations of the
// maintainer. The sessions should still be in the pool.
<-time.After(sampleInterval * 2)
sh3 := takeSession(ctx, t, sp)
sh4 := takeSession(ctx, t, sp)
// Check that the returned sessions are equal to the sessions that we got
// the first time from the session pool.
gotSessions := map[string]bool{}
gotSessions[sh3.getID()] = true
gotSessions[sh4.getID()] = true
if !testEqual(wantSessions, gotSessions) {
t.Fatalf("sessions mismatch\nGot: %v\nWant: %v", gotSessions, wantSessions)
}
// Return the sessions to the pool.
sh3.recycle()
sh4.recycle()
// Now wait for the maintenance window to finish. This will cause the
// maintainer to enter a new window and reset the max number of sessions in
// use to the current number of checked out sessions. That is 0, as all
// sessions have been returned to the pool. That again will cause the
// maintainer to delete these sessions at the next iteration, unless we
// check out new sessions during the first iteration.
waitFor(t, func() error {
sp.mu.Lock()
defer sp.mu.Unlock()
if sp.numOpened > 0 {
return fmt.Errorf("session pool still contains more than 0 sessions")
}
return nil
})
sh5 := takeSession(ctx, t, sp)
sh6 := takeSession(ctx, t, sp)
// Ensure that these sessions are new sessions.
if gotSessions[sh5.getID()] || gotSessions[sh6.getID()] {
t.Fatal("got unexpected existing session from pool")
}
}
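// takeSession takes a session from the pool and fails the test if the pool
// returns an error.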
func takeSession(ctx context.Context, t *testing.T, sp *sessionPool) *sessionHandle {
sh, err := sp.take(ctx)
if err != nil {
t.Fatalf("cannot get session from session pool: %v", err)
}
return sh
}
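// TestMaintenanceWindow_CycleAndUpdateMaxCheckedOut tests that the
// maintenance window keeps track of the maximum number of sessions checked
// out during the last maintenanceWindowSize cycles, and that starting a new
// cycle shifts the oldest value out of the window.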
func TestMaintenanceWindow_CycleAndUpdateMaxCheckedOut(t *testing.T) {
t.Parallel()
maxOpened := uint64(1000)
mw := newMaintenanceWindow(maxOpened)
for _, m := range mw.maxSessionsCheckedOut {
if m < maxOpened {
t.Fatalf("Max sessions checked out mismatch.\nGot: %v\nWant: %v", m, maxOpened)
}
}
// Do one cycle and simulate that there are currently no sessions checked
// out of the pool.
mw.startNewCycle(0)
if g, w := mw.maxSessionsCheckedOut[0], uint64(0); g != w {
t.Fatalf("Max sessions checked out mismatch.\nGot: %d\nWant: %d", g, w)
}
for _, m := range mw.maxSessionsCheckedOut[1:] {
if m < maxOpened {
t.Fatalf("Max sessions checked out mismatch.\nGot: %v\nWant: %v", m, maxOpened)
}
}
// Check that the max checked out during the entire window is still
// maxOpened.
if g, w := mw.maxSessionsCheckedOutDuringWindow(), maxOpened; g != w {
t.Fatalf("Max sessions checked out during window mismatch.\nGot: %d\nWant: %d", g, w)
}
// Update the max number checked out for the current cycle.
mw.updateMaxSessionsCheckedOutDuringWindow(uint64(10))
if g, w := mw.maxSessionsCheckedOut[0], uint64(10); g != w {
t.Fatalf("Max sessions checked out mismatch.\nGot: %d\nWant: %d", g, w)
}
// The max of the entire window should still not change.
if g, w := mw.maxSessionsCheckedOutDuringWindow(), maxOpened; g != w {
t.Fatalf("Max sessions checked out during window mismatch.\nGot: %d\nWant: %d", g, w)
}
// Now pass enough cycles to complete a maintenance window. Each cycle has
// no sessions checked out. We start at 1, as we have already passed one
// cycle. This should then be the last cycle still in the maintenance
// window, and the only one with a maxSessionsCheckedOut greater than 0.
for i := 1; i < maintenanceWindowSize; i++ {
mw.startNewCycle(0)
}
for _, m := range mw.maxSessionsCheckedOut[:9] {
if m != 0 {
t.Fatalf("Max sessions checked out mismatch.\nGot: %v\nWant: %v", m, 0)
}
}
// The oldest cycle in the window should have max=10.
if g, w := mw.maxSessionsCheckedOut[maintenanceWindowSize-1], uint64(10); g != w {
t.Fatalf("Max sessions checked out mismatch.\nGot: %d\nWant: %d", g, w)
}
// The max of the entire window should now be 10.
if g, w := mw.maxSessionsCheckedOutDuringWindow(), uint64(10); g != w {
t.Fatalf("Max sessions checked out during window mismatch.\nGot: %d\nWant: %d", g, w)
}
// Do another cycle with max=0.
mw.startNewCycle(0)
// The max of the entire window should now be 0.
if g, w := mw.maxSessionsCheckedOutDuringWindow(), uint64(0); g != w {
t.Fatalf("Max sessions checked out during window mismatch.\nGot: %d\nWant: %d", g, w)
}
// Do another cycle with 5 sessions as max. This should now be the new
// window max.
mw.startNewCycle(5)
if g, w := mw.maxSessionsCheckedOutDuringWindow(), uint64(5); g != w {
t.Fatalf("Max sessions checked out during window mismatch.\nGot: %d\nWant: %d", g, w)
}
// Do a couple of cycles so that the only non-zero value is in the middle.
// The max for the entire window should still be 5.
for i := 0; i < maintenanceWindowSize/2; i++ {
mw.startNewCycle(0)
}
if g, w := mw.maxSessionsCheckedOutDuringWindow(), uint64(5); g != w {
t.Fatalf("Max sessions checked out during window mismatch.\nGot: %d\nWant: %d", g, w)
}
}
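// TestSessionCreationIsDistributedOverChannels tests that both the initial
// sessions and any sessions created on demand are evenly distributed over
// the available gRPC channels.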
func TestSessionCreationIsDistributedOverChannels(t *testing.T) {
t.Parallel()
numChannels := 4
spc := SessionPoolConfig{
MinOpened: 12,
WriteSessions: 0.0,
incStep: 2,
}
_, client, teardown := setupMockedTestServerWithConfig(t,
ClientConfig{
SessionPoolConfig: spc,
NumChannels: numChannels,
})
defer teardown()
sp := client.idleSessions
waitFor(t, func() error {
sp.mu.Lock()
// WriteSessions = 0, so we only have to check for read sessions.
numOpened := uint64(sp.idleList.Len())
sp.mu.Unlock()
if numOpened < spc.MinOpened {
return fmt.Errorf("not yet initialized")
}
return nil
})
sessionsPerChannel := getSessionsPerChannel(sp)
if g, w := len(sessionsPerChannel), numChannels; g != w {
t.Errorf("number of channels mismatch\nGot: %d\nWant: %d", g, w)
}
for k, v := range sessionsPerChannel {
if g, w := v, int(sp.MinOpened)/numChannels; g != w {
t.Errorf("number of sessions mismatch for %s:\nGot: %d\nWant: %d", k, g, w)
}
}
// Check out all sessions + incStep * numChannels from the pool. This
// should cause incStep * numChannels additional sessions to be created.
checkedOut := make([]*sessionHandle, sp.MinOpened+sp.incStep*uint64(numChannels))
var err error
for i := 0; i < cap(checkedOut); i++ {
checkedOut[i], err = sp.take(context.Background())
if err != nil {
t.Fatal(err)
}
}
for i := 0; i < cap(checkedOut); i++ {
checkedOut[i].recycle()
}
// The additional sessions should also be distributed over all available
// channels.
sessionsPerChannel = getSessionsPerChannel(sp)
// There should not be any new clients (channels).
if g, w := len(sessionsPerChannel), numChannels; g != w {
t.Errorf("number of channels mismatch\nGot: %d\nWant: %d", g, w)
}
for k, v := range sessionsPerChannel {
if g, w := v, int(sp.MinOpened)/numChannels+int(sp.incStep); g != w {
t.Errorf("number of sessions mismatch for %s:\nGot: %d\nWant: %d", k, g, w)
}
}
}
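// getSessionsPerChannel returns the number of idle sessions per gRPC
// channel, using the pointer of each session client's underlying connection
// pool as the map key.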
func getSessionsPerChannel(sp *sessionPool) map[string]int {
sessionsPerChannel := make(map[string]int)
sp.mu.Lock()
defer sp.mu.Unlock()
el := sp.idleList.Front()
for el != nil {
s, _ := el.Value.(*session)
// Get the pointer to the actual underlying gRPC ClientConn and use
// that as the key in the map.
val := reflect.ValueOf(s.client).Elem()
connPool := val.FieldByName("connPool").Elem().Elem()
conn := connPool.Field(0).Pointer()
key := fmt.Sprintf("%v", conn)
sessionsPerChannel[key] = sessionsPerChannel[key] + 1
el = el.Next()
}
return sessionsPerChannel
}