blob: f3f779bc8b11baa87783295b206b68ae68781145 [file] [log] [blame]
/*
Copyright 2017 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package spanner
import (
"bytes"
"container/heap"
"context"
"fmt"
"math/rand"
"testing"
"time"
vkit "cloud.google.com/go/spanner/apiv1"
"cloud.google.com/go/spanner/internal/testutil"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// TestSessionPoolConfigValidation tests session pool config validation.
func TestSessionPoolConfigValidation(t *testing.T) {
t.Parallel()
server, client := newSpannerInMemTestServer(t)
defer server.teardown(client)
for _, test := range []struct {
spc SessionPoolConfig
err error
}{
{
SessionPoolConfig{},
errNoRPCGetter(),
},
{
SessionPoolConfig{
getRPCClient: func() (*vkit.Client, error) {
return client.clients[0], nil
},
MinOpened: 10,
MaxOpened: 5,
},
errMinOpenedGTMaxOpened(5, 10),
},
} {
if _, err := newSessionPool("mockdb", test.spc, nil); !testEqual(err, test.err) {
t.Fatalf("want %v, got %v", test.err, err)
}
}
}
// TestSessionCreation tests session creation during sessionPool.Take().
func TestSessionCreation(t *testing.T) {
t.Parallel()
ctx := context.Background()
server, client := newSpannerInMemTestServer(t)
defer server.teardown(client)
sp := client.idleSessions
// Take three sessions from session pool, this should trigger session pool
// to create three new sessions.
shs := make([]*sessionHandle, 3)
// gotDs holds the unique sessions taken from session pool.
gotDs := map[string]bool{}
for i := 0; i < len(shs); i++ {
var err error
shs[i], err = sp.take(ctx)
if err != nil {
t.Fatalf("failed to get session(%v): %v", i, err)
}
gotDs[shs[i].getID()] = true
}
if len(gotDs) != len(shs) {
t.Fatalf("session pool created %v sessions, want %v", len(gotDs), len(shs))
}
if wantDs := server.testSpanner.DumpSessions(); !testEqual(gotDs, wantDs) {
t.Fatalf("session pool creates sessions %v, want %v", gotDs, wantDs)
}
// Verify that created sessions are recorded correctly in session pool.
sp.mu.Lock()
if int(sp.numOpened) != len(shs) {
t.Fatalf("session pool reports %v open sessions, want %v", sp.numOpened, len(shs))
}
if sp.createReqs != 0 {
t.Fatalf("session pool reports %v session create requests, want 0", int(sp.createReqs))
}
sp.mu.Unlock()
// Verify that created sessions are tracked correctly by healthcheck queue.
hc := sp.hc
hc.mu.Lock()
if hc.queue.Len() != len(shs) {
t.Fatalf("healthcheck queue length = %v, want %v", hc.queue.Len(), len(shs))
}
for _, s := range hc.queue.sessions {
if !gotDs[s.getID()] {
t.Fatalf("session %v is in healthcheck queue, but it is not created by session pool", s.getID())
}
}
hc.mu.Unlock()
}
// TestTakeFromIdleList tests taking sessions from session pool's idle list.
func TestTakeFromIdleList(t *testing.T) {
t.Parallel()
ctx := context.Background()
// Make sure maintainer keeps the idle sessions.
server, client := newSpannerInMemTestServerWithConfig(t,
ClientConfig{
SessionPoolConfig: SessionPoolConfig{MaxIdle: 10},
})
defer server.teardown(client)
sp := client.idleSessions
// Take ten sessions from session pool and recycle them.
shs := make([]*sessionHandle, 10)
for i := 0; i < len(shs); i++ {
var err error
shs[i], err = sp.take(ctx)
if err != nil {
t.Fatalf("failed to get session(%v): %v", i, err)
}
}
// Make sure it's sampled once before recycling, otherwise it will be
// cleaned up.
<-time.After(sp.SessionPoolConfig.healthCheckSampleInterval)
for i := 0; i < len(shs); i++ {
shs[i].recycle()
}
// Further session requests from session pool won't cause mockclient to
// create more sessions.
wantSessions := server.testSpanner.DumpSessions()
// Take ten sessions from session pool again, this time all sessions should
// come from idle list.
gotSessions := map[string]bool{}
for i := 0; i < len(shs); i++ {
sh, err := sp.take(ctx)
if err != nil {
t.Fatalf("cannot take session from session pool: %v", err)
}
gotSessions[sh.getID()] = true
}
if len(gotSessions) != 10 {
t.Fatalf("got %v unique sessions, want 10", len(gotSessions))
}
if !testEqual(gotSessions, wantSessions) {
t.Fatalf("got sessions: %v, want %v", gotSessions, wantSessions)
}
}
// TesttakeWriteSessionFromIdleList tests taking write sessions from session
// pool's idle list.
func TestTakeWriteSessionFromIdleList(t *testing.T) {
t.Parallel()
ctx := context.Background()
// Make sure maintainer keeps the idle sessions.
server, client := newSpannerInMemTestServerWithConfig(t,
ClientConfig{
SessionPoolConfig: SessionPoolConfig{MaxIdle: 20},
})
defer server.teardown(client)
sp := client.idleSessions
// Take ten sessions from session pool and recycle them.
shs := make([]*sessionHandle, 10)
for i := 0; i < len(shs); i++ {
var err error
shs[i], err = sp.takeWriteSession(ctx)
if err != nil {
t.Fatalf("failed to get session(%v): %v", i, err)
}
}
// Make sure it's sampled once before recycling, otherwise it will be
// cleaned up.
<-time.After(sp.SessionPoolConfig.healthCheckSampleInterval)
for i := 0; i < len(shs); i++ {
shs[i].recycle()
}
// Further session requests from session pool won't cause mockclient to
// create more sessions.
wantSessions := server.testSpanner.DumpSessions()
// Take ten sessions from session pool again, this time all sessions should
// come from idle list.
gotSessions := map[string]bool{}
for i := 0; i < len(shs); i++ {
sh, err := sp.takeWriteSession(ctx)
if err != nil {
t.Fatalf("cannot take session from session pool: %v", err)
}
gotSessions[sh.getID()] = true
}
if len(gotSessions) != 10 {
t.Fatalf("got %v unique sessions, want 10", len(gotSessions))
}
if !testEqual(gotSessions, wantSessions) {
t.Fatalf("got sessions: %v, want %v", gotSessions, wantSessions)
}
}
// TestTakeFromIdleListChecked tests taking sessions from session pool's idle
// list, but with a extra ping check.
func TestTakeFromIdleListChecked(t *testing.T) {
t.Parallel()
ctx := context.Background()
// Make sure maintainer keeps the idle sessions.
server, client := newSpannerInMemTestServerWithConfig(t,
ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MaxIdle: 1,
HealthCheckInterval: 50 * time.Millisecond,
healthCheckSampleInterval: 10 * time.Millisecond,
},
})
defer server.teardown(client)
sp := client.idleSessions
// Stop healthcheck workers to simulate slow pings.
sp.hc.close()
// Create a session and recycle it.
sh, err := sp.take(ctx)
if err != nil {
t.Fatalf("failed to get session: %v", err)
}
// Make sure it's sampled once before recycling, otherwise it will be
// cleaned up.
<-time.After(sp.SessionPoolConfig.healthCheckSampleInterval)
wantSid := sh.getID()
sh.recycle()
// TODO(deklerk): get rid of this
<-time.After(time.Second)
// Two back-to-back session requests, both of them should return the same
// session created before and none of them should trigger a session ping.
for i := 0; i < 2; i++ {
// Take the session from the idle list and recycle it.
sh, err = sp.take(ctx)
if err != nil {
t.Fatalf("%v - failed to get session: %v", i, err)
}
if gotSid := sh.getID(); gotSid != wantSid {
t.Fatalf("%v - got session id: %v, want %v", i, gotSid, wantSid)
}
// The two back-to-back session requests shouldn't trigger any session
// pings because sessionPool.Take
// reschedules the next healthcheck.
if got, want := server.testSpanner.DumpPings(), ([]string{wantSid}); !testEqual(got, want) {
t.Fatalf("%v - got ping session requests: %v, want %v", i, got, want)
}
sh.recycle()
}
// Inject session error to server stub, and take the session from the
// session pool, the old session should be destroyed and the session pool
// will create a new session.
server.testSpanner.PutExecutionTime(testutil.MethodGetSession,
testutil.SimulatedExecutionTime{
Errors: []error{status.Errorf(codes.NotFound, "Session not found")},
})
// Delay to trigger sessionPool.Take to ping the session.
// TODO(deklerk): get rid of this
<-time.After(time.Second)
// take will take the idle session. Then it will send a GetSession request
// to check if it's healthy. It'll discover that it's not healthy
// (NotFound), drop it, and create a new session.
sh, err = sp.take(ctx)
if err != nil {
t.Fatalf("failed to get session: %v", err)
}
ds := server.testSpanner.DumpSessions()
if len(ds) != 1 {
t.Fatalf("dumped sessions from mockclient: %v, want %v", ds, sh.getID())
}
if sh.getID() == wantSid {
t.Fatalf("sessionPool.Take still returns the same session %v, want it to create a new one", wantSid)
}
}
// TestTakeFromIdleWriteListChecked tests taking sessions from session pool's
// idle list, but with a extra ping check.
func TestTakeFromIdleWriteListChecked(t *testing.T) {
t.Parallel()
ctx := context.Background()
// Make sure maintainer keeps the idle sessions.
server, client := newSpannerInMemTestServerWithConfig(t,
ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MaxIdle: 1,
HealthCheckInterval: 50 * time.Millisecond,
healthCheckSampleInterval: 10 * time.Millisecond,
},
})
defer server.teardown(client)
sp := client.idleSessions
// Stop healthcheck workers to simulate slow pings.
sp.hc.close()
// Create a session and recycle it.
sh, err := sp.takeWriteSession(ctx)
if err != nil {
t.Fatalf("failed to get session: %v", err)
}
wantSid := sh.getID()
// Make sure it's sampled once before recycling, otherwise it will be
// cleaned up.
<-time.After(sp.SessionPoolConfig.healthCheckSampleInterval)
sh.recycle()
// TODO(deklerk): get rid of this
<-time.After(time.Second)
// Two back-to-back session requests, both of them should return the same
// session created before and none of them should trigger a session ping.
for i := 0; i < 2; i++ {
// Take the session from the idle list and recycle it.
sh, err = sp.takeWriteSession(ctx)
if err != nil {
t.Fatalf("%v - failed to get session: %v", i, err)
}
if gotSid := sh.getID(); gotSid != wantSid {
t.Fatalf("%v - got session id: %v, want %v", i, gotSid, wantSid)
}
// The two back-to-back session requests shouldn't trigger any session
// pings because sessionPool.Take reschedules the next healthcheck.
if got, want := server.testSpanner.DumpPings(), ([]string{wantSid}); !testEqual(got, want) {
t.Fatalf("%v - got ping session requests: %v, want %v", i, got, want)
}
sh.recycle()
}
// Inject session error to mockclient, and take the session from the
// session pool, the old session should be destroyed and the session pool
// will create a new session.
server.testSpanner.PutExecutionTime(testutil.MethodGetSession,
testutil.SimulatedExecutionTime{
Errors: []error{status.Errorf(codes.NotFound, "Session not found")},
})
// Delay to trigger sessionPool.Take to ping the session.
// TOOD(deklerk) get rid of this
<-time.After(time.Second)
sh, err = sp.takeWriteSession(ctx)
if err != nil {
t.Fatalf("failed to get session: %v", err)
}
ds := server.testSpanner.DumpSessions()
if len(ds) != 1 {
t.Fatalf("dumped sessions from mockclient: %v, want %v", ds, sh.getID())
}
if sh.getID() == wantSid {
t.Fatalf("sessionPool.Take still returns the same session %v, want it to create a new one", wantSid)
}
}
// TestMaxOpenedSessions tests max open sessions constraint.
func TestMaxOpenedSessions(t *testing.T) {
t.Parallel()
ctx := context.Background()
server, client := newSpannerInMemTestServerWithConfig(t,
ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MaxOpened: 1,
},
})
defer server.teardown(client)
sp := client.idleSessions
sh1, err := sp.take(ctx)
if err != nil {
t.Fatalf("cannot take session from session pool: %v", err)
}
// Session request will timeout due to the max open sessions constraint.
ctx2, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
_, gotErr := sp.take(ctx2)
if wantErr := errGetSessionTimeout(); !testEqual(gotErr, wantErr) {
t.Fatalf("the second session retrival returns error %v, want %v", gotErr, wantErr)
}
go func() {
// TODO(deklerk): remove this
<-time.After(time.Second)
// Destroy the first session to allow the next session request to
// proceed.
sh1.destroy()
}()
// Now session request can be processed because the first session will be
// destroyed.
sh2, err := sp.take(ctx)
if err != nil {
t.Fatalf("after the first session is destroyed, session retrival still returns error %v, want nil", err)
}
if !sh2.session.isValid() || sh2.getID() == "" {
t.Fatalf("got invalid session: %v", sh2.session)
}
}
// TestMinOpenedSessions tests min open session constraint.
func TestMinOpenedSessions(t *testing.T) {
t.Parallel()
ctx := context.Background()
server, client := newSpannerInMemTestServerWithConfig(t,
ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MinOpened: 1,
},
})
defer server.teardown(client)
sp := client.idleSessions
// Take ten sessions from session pool and recycle them.
var ss []*session
var shs []*sessionHandle
for i := 0; i < 10; i++ {
sh, err := sp.take(ctx)
if err != nil {
t.Fatalf("failed to get session(%v): %v", i, err)
}
ss = append(ss, sh.session)
shs = append(shs, sh)
sh.recycle()
}
for _, sh := range shs {
sh.recycle()
}
// Simulate session expiration.
for _, s := range ss {
s.destroy(true)
}
sp.mu.Lock()
defer sp.mu.Unlock()
// There should be still one session left in idle list due to the min open
// sessions constraint.
if sp.idleList.Len() != 1 {
t.Fatalf("got %v sessions in idle list, want 1 %d", sp.idleList.Len(), sp.numOpened)
}
}
// TestMaxBurst tests max burst constraint.
func TestMaxBurst(t *testing.T) {
t.Parallel()
ctx := context.Background()
server, client := newSpannerInMemTestServerWithConfig(t,
ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MaxBurst: 1,
},
})
defer server.teardown(client)
sp := client.idleSessions
// Will cause session creation RPC to be retried forever.
server.testSpanner.PutExecutionTime(testutil.MethodCreateSession,
testutil.SimulatedExecutionTime{
Errors: []error{status.Errorf(codes.Unavailable, "try later")},
KeepError: true,
})
// This session request will never finish until the injected error is
// cleared.
go sp.take(ctx)
// Poll for the execution of the first session request.
for {
sp.mu.Lock()
cr := sp.createReqs
sp.mu.Unlock()
if cr == 0 {
<-time.After(time.Second)
continue
}
// The first session request is being executed.
break
}
ctx2, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
_, gotErr := sp.take(ctx2)
// Since MaxBurst == 1, the second session request should block.
if wantErr := errGetSessionTimeout(); !testEqual(gotErr, wantErr) {
t.Fatalf("session retrival returns error %v, want %v", gotErr, wantErr)
}
// Let the first session request succeed.
server.testSpanner.Freeze()
server.testSpanner.PutExecutionTime(testutil.MethodCreateSession, testutil.SimulatedExecutionTime{})
//close(allowRequests)
server.testSpanner.Unfreeze()
// Now new session request can proceed because the first session request will eventually succeed.
sh, err := sp.take(ctx)
if err != nil {
t.Fatalf("session retrival returns error %v, want nil", err)
}
if !sh.session.isValid() || sh.getID() == "" {
t.Fatalf("got invalid session: %v", sh.session)
}
}
// TestSessionRecycle tests recycling sessions.
func TestSessionRecycle(t *testing.T) {
t.Parallel()
ctx := context.Background()
server, client := newSpannerInMemTestServerWithConfig(t,
ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MinOpened: 1,
MaxIdle: 5,
},
})
defer server.teardown(client)
sp := client.idleSessions
// Test session is correctly recycled and reused.
for i := 0; i < 20; i++ {
s, err := sp.take(ctx)
if err != nil {
t.Fatalf("cannot get the session %v: %v", i, err)
}
s.recycle()
}
sp.mu.Lock()
defer sp.mu.Unlock()
// Ideally it should only be 1, because the session should be recycled and
// re-used each time. However, sometimes the pool maintainer might increase
// the pool size by 1 right around the time we take (which also increases
// the pool size by 1), so this assertion is OK with either 1 or 2. We
// expect never to see more than 2, though, even when MaxIdle is quite high:
// each session should be recycled and re-used.
if sp.numOpened != 1 && sp.numOpened != 2 {
t.Fatalf("Expect session pool size 1 or 2, got %d", sp.numOpened)
}
}
// TODO(deklerk): Investigate why s.destroy(true) is flakey.
// TestSessionDestroy tests destroying sessions.
func TestSessionDestroy(t *testing.T) {
t.Skip("s.destroy(true) is flakey")
t.Parallel()
ctx := context.Background()
server, client := newSpannerInMemTestServerWithConfig(t,
ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MinOpened: 1,
},
})
defer server.teardown(client)
sp := client.idleSessions
<-time.After(10 * time.Millisecond) // maintainer will create one session, we wait for it create session to avoid flakiness in test
sh, err := sp.take(ctx)
if err != nil {
t.Fatalf("cannot get session from session pool: %v", err)
}
s := sh.session
sh.recycle()
if d := s.destroy(true); d || !s.isValid() {
// Session should be remaining because of min open sessions constraint.
t.Fatalf("session %s invalid, want it to stay alive. (destroy in expiration mode, success: %v)", s.id, d)
}
if d := s.destroy(false); !d || s.isValid() {
// Session should be destroyed.
t.Fatalf("failed to destroy session %s. (destroy in default mode, success: %v)", s.id, d)
}
}
// TestHcHeap tests heap operation on top of hcHeap.
func TestHcHeap(t *testing.T) {
in := []*session{
{nextCheck: time.Unix(10, 0)},
{nextCheck: time.Unix(0, 5)},
{nextCheck: time.Unix(1, 8)},
{nextCheck: time.Unix(11, 7)},
{nextCheck: time.Unix(6, 3)},
}
want := []*session{
{nextCheck: time.Unix(1, 8), hcIndex: 0},
{nextCheck: time.Unix(6, 3), hcIndex: 1},
{nextCheck: time.Unix(8, 2), hcIndex: 2},
{nextCheck: time.Unix(10, 0), hcIndex: 3},
{nextCheck: time.Unix(11, 7), hcIndex: 4},
}
hh := hcHeap{}
for _, s := range in {
heap.Push(&hh, s)
}
// Change top of the heap and do a adjustment.
hh.sessions[0].nextCheck = time.Unix(8, 2)
heap.Fix(&hh, 0)
for idx := 0; hh.Len() > 0; idx++ {
got := heap.Pop(&hh).(*session)
want[idx].hcIndex = -1
if !testEqual(got, want[idx]) {
t.Fatalf("%v: heap.Pop returns %v, want %v", idx, got, want[idx])
}
}
}
// TestHealthCheckScheduler tests if healthcheck workers can schedule and
// perform healthchecks properly.
func TestHealthCheckScheduler(t *testing.T) {
t.Parallel()
ctx := context.Background()
server, client := newSpannerInMemTestServerWithConfig(t,
ClientConfig{
SessionPoolConfig: SessionPoolConfig{
HealthCheckInterval: 50 * time.Millisecond,
healthCheckSampleInterval: 10 * time.Millisecond,
},
})
defer server.teardown(client)
sp := client.idleSessions
// Create 50 sessions.
for i := 0; i < 50; i++ {
_, err := sp.take(ctx)
if err != nil {
t.Fatalf("cannot get session from session pool: %v", err)
}
}
// Make sure we start with a ping history to avoid that the first
// sessions that were created have not already exceeded the maximum
// number of pings.
server.testSpanner.ClearPings()
// Wait for 10-30 pings per session.
waitFor(t, func() error {
// Only check actually live sessions and ignore any sessions the
// session pool may have deleted in the meantime.
liveSessions := server.testSpanner.DumpSessions()
dp := server.testSpanner.DumpPings()
gotPings := map[string]int64{}
for _, p := range dp {
gotPings[p]++
}
for s := range liveSessions {
want := int64(20)
if got := gotPings[s]; got < want/2 || got > want+want/2 {
// This is an unnacceptable amount of pings.
return fmt.Errorf("got %v healthchecks on session %v, want it between (%v, %v)", got, s, want/2, want+want/2)
}
}
return nil
})
}
// Tests that a fractions of sessions are prepared for write by health checker.
func TestWriteSessionsPrepared(t *testing.T) {
t.Parallel()
ctx := context.Background()
server, client := newSpannerInMemTestServerWithConfig(t,
ClientConfig{
SessionPoolConfig: SessionPoolConfig{
WriteSessions: 0.5,
MaxIdle: 20,
},
})
defer server.teardown(client)
sp := client.idleSessions
shs := make([]*sessionHandle, 10)
var err error
for i := 0; i < 10; i++ {
shs[i], err = sp.take(ctx)
if err != nil {
t.Fatalf("cannot get session from session pool: %v", err)
}
}
// Now there are 10 sessions in the pool. Release them.
for _, sh := range shs {
sh.recycle()
}
// Sleep for 1s, allowing healthcheck workers to invoke begin transaction.
// TODO(deklerk): get rid of this
<-time.After(time.Second)
wshs := make([]*sessionHandle, 5)
for i := 0; i < 5; i++ {
wshs[i], err = sp.takeWriteSession(ctx)
if err != nil {
t.Fatalf("cannot get session from session pool: %v", err)
}
if wshs[i].getTransactionID() == nil {
t.Fatalf("got nil transaction id from session pool")
}
}
for _, sh := range wshs {
sh.recycle()
}
// TODO(deklerk): get rid of this
<-time.After(time.Second)
// Now force creation of 10 more sessions.
shs = make([]*sessionHandle, 20)
for i := 0; i < 20; i++ {
shs[i], err = sp.take(ctx)
if err != nil {
t.Fatalf("cannot get session from session pool: %v", err)
}
}
// Now there are 20 sessions in the pool. Release them.
for _, sh := range shs {
sh.recycle()
}
// TODO(deklerk): get rid of this
<-time.After(time.Second)
if sp.idleWriteList.Len() != 10 {
t.Fatalf("Expect 10 write prepared session, got: %d", sp.idleWriteList.Len())
}
}
// TestTakeFromWriteQueue tests that sessionPool.take() returns write prepared
// sessions as well.
func TestTakeFromWriteQueue(t *testing.T) {
t.Parallel()
ctx := context.Background()
server, client := newSpannerInMemTestServerWithConfig(t,
ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MaxOpened: 1,
WriteSessions: 1.0,
MaxIdle: 1,
},
})
defer server.teardown(client)
sp := client.idleSessions
sh, err := sp.take(ctx)
if err != nil {
t.Fatalf("cannot get session from session pool: %v", err)
}
sh.recycle()
// TODO(deklerk): get rid of this
<-time.After(time.Second)
// The session should now be in write queue but take should also return it.
if sp.idleWriteList.Len() == 0 {
t.Fatalf("write queue unexpectedly empty")
}
if sp.idleList.Len() != 0 {
t.Fatalf("read queue not empty")
}
sh, err = sp.take(ctx)
if err != nil {
t.Fatalf("cannot get session from session pool: %v", err)
}
sh.recycle()
}
// TestSessionHealthCheck tests healthchecking cases.
func TestSessionHealthCheck(t *testing.T) {
t.Parallel()
ctx := context.Background()
server, client := newSpannerInMemTestServerWithConfig(t,
ClientConfig{
SessionPoolConfig: SessionPoolConfig{
HealthCheckInterval: 50 * time.Millisecond,
healthCheckSampleInterval: 10 * time.Millisecond,
},
})
defer server.teardown(client)
sp := client.idleSessions
// Test pinging sessions.
sh, err := sp.take(ctx)
if err != nil {
t.Fatalf("cannot get session from session pool: %v", err)
}
// Wait for healthchecker to send pings to session.
waitFor(t, func() error {
pings := server.testSpanner.DumpPings()
if len(pings) == 0 || pings[0] != sh.getID() {
return fmt.Errorf("healthchecker didn't send any ping to session %v", sh.getID())
}
return nil
})
// Test broken session detection.
sh, err = sp.take(ctx)
if err != nil {
t.Fatalf("cannot get session from session pool: %v", err)
}
server.testSpanner.Freeze()
server.testSpanner.PutExecutionTime(testutil.MethodGetSession,
testutil.SimulatedExecutionTime{
Errors: []error{status.Errorf(codes.NotFound, "Session not found")},
KeepError: true,
})
server.testSpanner.Unfreeze()
//atomic.SwapInt64(&requestShouldErr, 1)
// Wait for healthcheck workers to find the broken session and tear it down.
// TODO(deklerk): get rid of this
<-time.After(1 * time.Second)
s := sh.session
if sh.session.isValid() {
t.Fatalf("session(%v) is still alive, want it to be dropped by healthcheck workers", s)
}
server.testSpanner.Freeze()
server.testSpanner.PutExecutionTime(testutil.MethodGetSession, testutil.SimulatedExecutionTime{})
server.testSpanner.Unfreeze()
// Test garbage collection.
sh, err = sp.take(ctx)
if err != nil {
t.Fatalf("cannot get session from session pool: %v", err)
}
sp.close()
if sh.session.isValid() {
t.Fatalf("session(%v) is still alive, want it to be garbage collected", s)
}
}
// TestStressSessionPool does stress test on session pool by the following concurrent operations:
// 1) Test worker gets a session from the pool.
// 2) Test worker turns a session back into the pool.
// 3) Test worker destroys a session got from the pool.
// 4) Healthcheck destroys a broken session (because a worker has already destroyed it).
// 5) Test worker closes the session pool.
//
// During the test, the session pool maintainer maintains the number of sessions,
// and it is expected that all sessions that are taken from session pool remains valid.
// When all test workers and healthcheck workers exit, mockclient, session pool
// and healthchecker should be in consistent state.
func TestStressSessionPool(t *testing.T) {
t.Parallel()
// Use concurrent workers to test different session pool built from different configurations.
for ti, cfg := range []SessionPoolConfig{
{},
{MinOpened: 10, MaxOpened: 100},
{MaxBurst: 50},
{MinOpened: 10, MaxOpened: 200, MaxBurst: 5},
{MinOpened: 10, MaxOpened: 200, MaxBurst: 5, WriteSessions: 0.2},
} {
// Create a more aggressive session healthchecker to increase test concurrency.
cfg.HealthCheckInterval = 50 * time.Millisecond
cfg.healthCheckSampleInterval = 10 * time.Millisecond
cfg.HealthCheckWorkers = 50
server, client := newSpannerInMemTestServerWithConfig(t,
ClientConfig{
SessionPoolConfig: cfg,
})
sp := client.idleSessions
// Create a test group for this configuration and schedule 100 sub
// sub tests within the group.
t.Run(fmt.Sprintf("TestStressSessionPoolGroup%v", ti), func(t *testing.T) {
for i := 0; i < 100; i++ {
idx := i
t.Run(fmt.Sprintf("TestStressSessionPoolWithCfg%dWorker%03d", ti, idx),
func(t *testing.T) {
testStressSessionPool(t, cfg, ti, idx, sp, client)
})
}
})
sp.hc.close()
// Here the states of healthchecker, session pool and mockclient are
// stable.
idleSessions := map[string]bool{}
hcSessions := map[string]bool{}
mockSessions := server.testSpanner.DumpSessions()
// Dump session pool's idle list.
for sl := sp.idleList.Front(); sl != nil; sl = sl.Next() {
s := sl.Value.(*session)
if idleSessions[s.getID()] {
t.Fatalf("%v: found duplicated session in idle list: %v", ti, s.getID())
}
idleSessions[s.getID()] = true
}
for sl := sp.idleWriteList.Front(); sl != nil; sl = sl.Next() {
s := sl.Value.(*session)
if idleSessions[s.getID()] {
t.Fatalf("%v: found duplicated session in idle write list: %v", ti, s.getID())
}
idleSessions[s.getID()] = true
}
sp.mu.Lock()
if int(sp.numOpened) != len(idleSessions) {
t.Fatalf("%v: number of opened sessions (%v) != number of idle sessions (%v)", ti, sp.numOpened, len(idleSessions))
}
if sp.createReqs != 0 {
t.Fatalf("%v: number of pending session creations = %v, want 0", ti, sp.createReqs)
}
// Dump healthcheck queue.
for _, s := range sp.hc.queue.sessions {
if hcSessions[s.getID()] {
t.Fatalf("%v: found duplicated session in healthcheck queue: %v", ti, s.getID())
}
hcSessions[s.getID()] = true
}
sp.mu.Unlock()
// Verify that idleSessions == hcSessions == mockSessions.
if !testEqual(idleSessions, hcSessions) {
t.Fatalf("%v: sessions in idle list (%v) != sessions in healthcheck queue (%v)", ti, idleSessions, hcSessions)
}
// The server may contain more sessions than the health check queue.
// This can be caused by a timeout client side during a CreateSession
// request. The request may still be received and executed by the
// server, but the session pool will not register the session.
for id, b := range hcSessions {
if b && !mockSessions[id] {
t.Fatalf("%v: session in healthcheck queue (%v) was not found on server", ti, id)
}
}
sp.close()
mockSessions = server.testSpanner.DumpSessions()
for id, b := range hcSessions {
if b && mockSessions[id] {
t.Fatalf("Found session from pool still live on server: %v", id)
}
}
server.teardown(client)
}
}
func testStressSessionPool(t *testing.T, cfg SessionPoolConfig, ti int, idx int, pool *sessionPool, client *Client) {
t.Parallel()
ctx := context.Background()
// Test worker iterates 1K times and tries different
// session / session pool operations.
for j := 0; j < 1000; j++ {
if idx%10 == 0 && j >= 900 {
// Close the pool in selected set of workers during the
// middle of the test.
pool.close()
}
// Take a write sessions ~ 20% of the times.
takeWrite := rand.Intn(5) == 4
var (
sh *sessionHandle
gotErr error
)
wasValid := pool.isValid()
if takeWrite {
sh, gotErr = pool.takeWriteSession(ctx)
} else {
sh, gotErr = pool.take(ctx)
}
if gotErr != nil {
if pool.isValid() {
t.Fatalf("%v.%v: pool.take returns error when pool is still valid: %v", ti, idx, gotErr)
}
// If the session pool was closed when we tried to take a session
// from the pool, then we should have gotten a specific error.
// If the session pool was closed between the take() and now (or
// even during a take()) then an error is ok.
if !wasValid {
if wantErr := errInvalidSessionPool(); !testEqual(gotErr, wantErr) {
t.Fatalf("%v.%v: got error when pool is closed: %v, want %v", ti, idx, gotErr, wantErr)
}
}
continue
}
// Verify if session is valid when session pool is valid.
// Note that if session pool is invalid after sh is taken,
// then sh might be invalidated by healthcheck workers.
if (sh.getID() == "" || sh.session == nil || !sh.session.isValid()) && pool.isValid() {
t.Fatalf("%v.%v.%v: pool.take returns invalid session %v", ti, idx, takeWrite, sh.session)
}
if takeWrite && sh.getTransactionID() == nil {
t.Fatalf("%v.%v: pool.takeWriteSession returns session %v without transaction", ti, idx, sh.session)
}
if rand.Intn(100) < idx {
// Random sleep before destroying/recycling the session,
// to give healthcheck worker a chance to step in.
<-time.After(time.Duration(rand.Int63n(int64(cfg.HealthCheckInterval))))
}
if rand.Intn(100) < idx {
// destroy the session.
sh.destroy()
continue
}
// recycle the session.
sh.recycle()
}
}
// TODO(deklerk): Investigate why this test is flakey, even with waitFor. Example
// flakey failure: session_test.go:946: after 15s waiting, got Scale down.
// Expect 5 open, got 6
//
// TestMaintainer checks the session pool maintainer maintains the number of
// sessions in the following cases:
//
// 1. On initialization of session pool, replenish session pool to meet
// MinOpened or MaxIdle.
// 2. On increased session usage, provision extra MaxIdle sessions.
// 3. After the surge passes, scale down the session pool accordingly.
func TestMaintainer(t *testing.T) {
t.Skip("asserting session state seems flakey")
t.Parallel()
ctx := context.Background()
minOpened := uint64(5)
maxIdle := uint64(4)
server, client := newSpannerInMemTestServerWithConfig(t,
ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MinOpened: minOpened,
MaxIdle: maxIdle,
},
})
defer server.teardown(client)
sp := client.idleSessions
sampleInterval := sp.SessionPoolConfig.healthCheckSampleInterval
waitFor(t, func() error {
sp.mu.Lock()
defer sp.mu.Unlock()
if sp.numOpened != 5 {
return fmt.Errorf("Replenish. Expect %d open, got %d", sp.MinOpened, sp.numOpened)
}
return nil
})
// To save test time, we are not creating many sessions, because the time
// to create sessions will have impact on the decision on sessionsToKeep.
// We also parallelize the take and recycle process.
shs := make([]*sessionHandle, 10)
for i := 0; i < len(shs); i++ {
var err error
shs[i], err = sp.take(ctx)
if err != nil {
t.Fatalf("cannot get session from session pool: %v", err)
}
}
sp.mu.Lock()
if sp.numOpened != 10 {
t.Fatalf("Scale out from normal use. Expect %d open, got %d", 10, sp.numOpened)
}
sp.mu.Unlock()
<-time.After(sampleInterval)
for _, sh := range shs[:7] {
sh.recycle()
}
waitFor(t, func() error {
sp.mu.Lock()
defer sp.mu.Unlock()
if sp.numOpened != 7 {
return fmt.Errorf("Keep extra MaxIdle sessions. Expect %d open, got %d", 7, sp.numOpened)
}
return nil
})
for _, sh := range shs[7:] {
sh.recycle()
}
waitFor(t, func() error {
sp.mu.Lock()
defer sp.mu.Unlock()
if sp.numOpened != minOpened {
return fmt.Errorf("Scale down. Expect %d open, got %d", minOpened, sp.numOpened)
}
return nil
})
}
// Tests that maintainer creates up to MinOpened connections.
//
// Historical context: This test also checks that a low
// healthCheckSampleInterval does not prevent it from opening connections.
// The low healthCheckSampleInterval will however sometimes cause session
// creations to time out. That should not be considered a problem, but it
// could cause the test case to fail if it happens too often.
// See: https://github.com/googleapis/google-cloud-go/issues/1259
func TestMaintainer_CreatesSessions(t *testing.T) {
t.Parallel()
spc := SessionPoolConfig{
MinOpened: 10,
MaxIdle: 10,
healthCheckSampleInterval: 20 * time.Millisecond,
}
server, client := newSpannerInMemTestServerWithConfig(t,
ClientConfig{
SessionPoolConfig: spc,
})
defer server.teardown(client)
sp := client.idleSessions
timeoutAmt := 4 * time.Second
timeout := time.After(timeoutAmt)
var numOpened uint64
loop:
for {
select {
case <-timeout:
t.Fatalf("timed out after %v, got %d session(s), want %d", timeoutAmt, numOpened, spc.MinOpened)
default:
sp.mu.Lock()
numOpened = sp.numOpened
sp.mu.Unlock()
if numOpened == 10 {
break loop
}
}
}
}
func (s1 *session) Equal(s2 *session) bool {
return s1.client == s2.client &&
s1.id == s2.id &&
s1.pool == s2.pool &&
s1.createTime == s2.createTime &&
s1.valid == s2.valid &&
s1.hcIndex == s2.hcIndex &&
s1.idleList == s2.idleList &&
s1.nextCheck.Equal(s2.nextCheck) &&
s1.checkingHealth == s2.checkingHealth &&
testEqual(s1.md, s2.md) &&
bytes.Equal(s1.tx, s2.tx)
}
func waitFor(t *testing.T, assert func() error) {
t.Helper()
timeout := 15 * time.Second
ta := time.After(timeout)
for {
select {
case <-ta:
if err := assert(); err != nil {
t.Fatalf("after %v waiting, got %v", timeout, err)
}
return
default:
}
if err := assert(); err != nil {
// Fail. Let's pause and retry.
time.Sleep(10 * time.Millisecond)
continue
}
return
}
}