spanner: add session pool maintainer to maintain the session pool size

The session pool maintainer computes sessionsToKeep from the
SessionPoolConfig and from the maximum number of sessions in use
observed over a sampling window, and tries to keep the session pool at
that size.
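
Roughly, each sampling iteration computes the target pool size as
(simplified from maintainer() in session.go below):

    sessionsToKeep := maxUint64(MinOpened, minUint64(numOpened, MaxIdle+maxSessionsInUse))

and then replenishes or shrinks the pool toward that target.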

Breaking:
- Remove MaxSessionAge from the public API; it is deprecated in favor of
the session pool maintainer.
Misc:
- Use idle sessions in round-robin order.
- Set default maxSessionAge.
- Set tests to run in parallel.
- Add a done channel for faster teardown.
Fix:
- Update session creation time on refresh.

Change-Id: I7ffeb8f2e9a6ddfdaaf5a62ff634556f379103f7
Reviewed-on: https://code-review.googlesource.com/14210
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
Reviewed-by: Vikas Kedia <vikask@google.com>
diff --git a/spanner/client.go b/spanner/client.go
index 13b5a75..221e261 100644
--- a/spanner/client.go
+++ b/spanner/client.go
@@ -138,6 +138,10 @@
 	if config.MaxBurst == 0 {
 		config.MaxBurst = 10
 	}
+	// Default MaxSessionAge
+	if config.maxSessionAge == 0 {
+		config.maxSessionAge = time.Minute * 30
+	}
 	for i := 0; i < config.NumChannels; i++ {
 		conn, err := gtransport.Dial(ctx, allOpts...)
 		if err != nil {
diff --git a/spanner/session.go b/spanner/session.go
index 6930a3a..ae60d85 100644
--- a/spanner/session.go
+++ b/spanner/session.go
@@ -201,6 +201,7 @@
 	if s.valid && s.idleList != nil {
 		// session is in idle list, refresh its session id.
 		sid, s.id = s.id, sid
+		s.createTime = time.Now()
 		if s.tx != nil {
 			s.tx = nil
 			s.pool.idleWriteList.Remove(s.idleList)
@@ -351,17 +352,23 @@
 	// to be broken, it will still be evicted from session pool, therefore it is
 	// posssible that the number of opened sessions drops below MinOpened.
 	MinOpened uint64
-	// MaxSessionAge is the maximum duration that a session can be reused, zero
+	// maxSessionAge is the maximum duration that a session can be reused, zero
 	// means session pool will never expire sessions.
-	MaxSessionAge time.Duration
+	maxSessionAge time.Duration
+	// MaxIdle is the maximum number of idle sessions that the pool is allowed to keep. Defaults to 0.
+	MaxIdle uint64
 	// MaxBurst is the maximum number of concurrent session creation requests. Defaults to 10.
 	MaxBurst uint64
 	// WriteSessions is the fraction of sessions we try to keep prepared for write.
 	WriteSessions float64
 	// HealthCheckWorkers is number of workers used by health checker for this pool.
 	HealthCheckWorkers int
-	// HealthCheckInterval is how often the health checker pings a session.
+	// HealthCheckInterval is how often the health checker pings a session. Defaults to 5 min.
 	HealthCheckInterval time.Duration
+	// healthCheckMaintainerEnabled enables the session pool maintainer.
+	healthCheckMaintainerEnabled bool
+	// healthCheckSampleInterval is how often the health checker samples live sessions (for use in maintaining session pool size). Defaults to 1 min.
+	healthCheckSampleInterval time.Duration
 }
 
 // errNoRPCGetter returns error for SessionPoolConfig missing getRPCClient method.
@@ -436,9 +443,13 @@
 	if config.HealthCheckInterval == 0 {
 		config.HealthCheckInterval = 5 * time.Minute
 	}
+	if config.healthCheckSampleInterval == 0 {
+		config.healthCheckSampleInterval = time.Minute
+	}
 	// On GCE VM, within the same region an healthcheck ping takes on average 10ms to finish, given a 5 minutes interval and
 	// 10 healthcheck workers, a healthChecker can effectively mantain 100 checks_per_worker/sec * 10 workers * 300 seconds = 300K sessions.
-	pool.hc = newHealthChecker(config.HealthCheckInterval, config.HealthCheckWorkers, pool)
+	pool.hc = newHealthChecker(config.HealthCheckInterval, config.HealthCheckWorkers, config.healthCheckSampleInterval, pool)
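+	// The pool is ready; signal the maintainer so it can start running.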
+	close(pool.hc.ready)
 	return pool, nil
 }
 
@@ -666,15 +677,15 @@
 		// Reject the session if session is invalid or pool itself is invalid.
 		return false
 	}
-	if p.MaxSessionAge != 0 && s.createTime.Add(p.MaxSessionAge).Before(time.Now()) && p.numOpened > p.MinOpened {
-		// session expires and number of opened sessions exceeds MinOpened, let the session destroy itself.
+	if p.maxSessionAge != 0 && s.createTime.Add(p.maxSessionAge).Before(time.Now()) && p.numOpened > p.MinOpened {
+		// session expires and number of opened sessions exceeds MinOpened, let the session destroy itself.
 		return false
 	}
-	// Hot sessions will be converging at the front of the list, cold sessions will be evicted by healthcheck workers.
+	// Put the session at the back of the list so sessions are used in round-robin order, for load balancing across channels.
 	if s.isWritePrepared() {
-		s.setIdleList(p.idleWriteList.PushFront(s))
+		s.setIdleList(p.idleWriteList.PushBack(s))
 	} else {
-		s.setIdleList(p.idleList.PushFront(s))
+		s.setIdleList(p.idleList.PushBack(s))
 	}
 	// Broadcast that a session has been returned to idle list.
 	close(p.mayGetSession)
@@ -762,21 +773,34 @@
 	waitWorkers sync.WaitGroup
 	// pool is the underlying session pool.
 	pool *sessionPool
-	// closed marks if a healthChecker has been closed.
-	closed bool
+	// sampleInterval is the interval of sampling by the maintainer.
+	sampleInterval time.Duration
+	// ready is used to signal that maintainer can start running.
+	ready chan struct{}
+	// done is used to signal that health checker should be closed.
+	done chan struct{}
+	// once is used for closing channel done only once.
+	once sync.Once
 }
 
 // newHealthChecker initializes new instance of healthChecker.
-func newHealthChecker(interval time.Duration, workers int, pool *sessionPool) *healthChecker {
+func newHealthChecker(interval time.Duration, workers int, sampleInterval time.Duration, pool *sessionPool) *healthChecker {
 	if workers <= 0 {
 		workers = 1
 	}
 	hc := &healthChecker{
-		interval: interval,
-		workers:  workers,
-		pool:     pool,
+		interval:       interval,
+		workers:        workers,
+		pool:           pool,
+		sampleInterval: sampleInterval,
+		ready:          make(chan struct{}),
+		done:           make(chan struct{}),
 	}
-	for i := 0; i < hc.workers; i++ {
+	if hc.pool.healthCheckMaintainerEnabled {
+		hc.waitWorkers.Add(1)
+		go hc.maintainer()
+	}
+	for i := 1; i <= hc.workers; i++ {
 		hc.waitWorkers.Add(1)
 		go hc.worker(i)
 	}
@@ -785,17 +809,18 @@
 
 // close closes the healthChecker and waits for all healthcheck workers to exit.
 func (hc *healthChecker) close() {
-	hc.mu.Lock()
-	hc.closed = true
-	hc.mu.Unlock()
+	hc.once.Do(func() { close(hc.done) })
 	hc.waitWorkers.Wait()
 }
 
 // isClosing checks if a healthChecker is already closing.
 func (hc *healthChecker) isClosing() bool {
-	hc.mu.Lock()
-	defer hc.mu.Unlock()
-	return hc.closed
+	select {
+	case <-hc.done:
+		return true
+	default:
+		return false
+	}
 }
 
 // getInterval gets the healthcheck interval.
@@ -856,7 +881,7 @@
 		s.destroy(false)
 		return
 	}
-	if s.pool.MaxSessionAge != 0 && s.createTime.Add(s.pool.MaxSessionAge).Before(time.Now()) {
+	if s.pool.maxSessionAge != 0 && s.createTime.Add(s.pool.maxSessionAge).Before(time.Now()) {
 		// Session reaches its maximum age, retire it. Failing that try to refresh it.
 		if s.destroy(true) || !s.refreshIdle() {
 			return
@@ -870,9 +895,6 @@
 
 // worker performs the healthcheck on sessions in healthChecker's priority queue.
 func (hc *healthChecker) worker(i int) {
-	if log.V(2) {
-		log.Infof("Starting health check worker %v", i)
-	}
 	// Returns a session which we should ping to keep it alive.
 	getNextForPing := func() *session {
 		hc.pool.mu.Lock()
@@ -918,9 +940,6 @@
 
 	for {
 		if hc.isClosing() {
-			if log.V(2) {
-				log.Infof("Closing health check worker %v", i)
-			}
 			// Exit when the pool has been closed and all sessions have been destroyed
 			// or when health checker has been closed.
 			hc.waitWorkers.Done()
@@ -945,7 +964,13 @@
 				if pause > int64(hc.interval) {
 					pause = int64(hc.interval)
 				}
-				<-time.After(time.Duration(rand.Int63n(pause) + pause/2))
+				select {
+				case <-time.After(time.Duration(rand.Int63n(pause) + pause/2)):
+					break
+				case <-hc.done:
+					break
+				}
+
 			}
 			continue
 		}
@@ -953,6 +978,135 @@
 	}
 }
 
+// maintainer tracks maxSessionsInUse over a window of kWindowSize * sampleInterval.
+// Based on this information, the health checker replenishes or shrinks the pool to keep the desired number of sessions.
+func (hc *healthChecker) maintainer() {
+	// Wait until the pool is ready.
+	<-hc.ready
+
+	var (
+		kWindowSize uint64 = 10
+		iteration   uint64 = 0
+		timeout     <-chan time.Time
+	)
+
+	// replenishPool is run when numOpened is less than sessionsToKeep; it creates sessions until the target is reached or the sample interval elapses.
+	replenishPool := func(sessionsToKeep uint64) {
+		ctx, _ := context.WithTimeout(context.Background(), hc.sampleInterval)
+		for {
+			select {
+			case <-timeout:
+				return
+			default:
+				break
+			}
+
+			p := hc.pool
+			p.mu.Lock()
+			// Take budget before the actual session creation.
+			if sessionsToKeep <= p.numOpened {
+				p.mu.Unlock()
+				break
+			}
+			p.numOpened++
+			p.createReqs++
+			shouldPrepareWrite := p.shouldPrepareWrite()
+			p.mu.Unlock()
+			var (
+				s   *session
+				err error
+			)
+			if s, err = p.createSession(ctx); err != nil {
+				log.Warningf("Failed to create session, error: %v", toSpannerError(err))
+				continue
+			}
+			if shouldPrepareWrite {
+				if err = s.prepareForWrite(ctx); err != nil {
+					log.Warningf("Failed to prepare session, error: %v", toSpannerError(err))
+					continue
+				}
+			}
+			p.recycle(s)
+		}
+	}
+
+	// shrinkPool scales down the session pool by destroying idle sessions until the target is reached or the sample interval elapses.
+	shrinkPool := func(sessionsToKeep uint64) {
+		for {
+			select {
+			case <-timeout:
+				return
+			default:
+				break
+			}
+
+			p := hc.pool
+			p.mu.Lock()
+
+			if sessionsToKeep >= p.numOpened {
+				p.mu.Unlock()
+				break
+			}
+
+			var s *session
+			if p.idleList.Len() > 0 {
+				s = p.idleList.Front().Value.(*session)
+			} else if p.idleWriteList.Len() > 0 {
+				s = p.idleWriteList.Front().Value.(*session)
+			}
+			p.mu.Unlock()
+			if s != nil {
+				// Destroy the session as if it had expired.
+				s.destroy(true)
+			} else {
+				break
+			}
+		}
+	}
+
+	for {
+		if hc.isClosing() {
+			hc.waitWorkers.Done()
+			return
+		}
+
+		// maxSessionsInUse is the maximum number of sessions in use concurrently over a period of time.
+		var maxSessionsInUse uint64
+
+		// Updates metrics.
+		hc.pool.mu.Lock()
+		currSessionsInUse := hc.pool.numOpened - uint64(hc.pool.idleList.Len()) - uint64(hc.pool.idleWriteList.Len())
+		currSessionsOpened := hc.pool.numOpened
+		hc.pool.mu.Unlock()
+
+		hc.mu.Lock()
+		if iteration%kWindowSize == 0 || maxSessionsInUse < currSessionsInUse {
+			maxSessionsInUse = currSessionsInUse
+		}
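+		// Target maxSessionsInUse + MaxIdle sessions, capped at the number of sessions currently opened, but never below MinOpened.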
+		sessionsToKeep := maxUint64(hc.pool.MinOpened,
+			minUint64(currSessionsOpened, hc.pool.MaxIdle+maxSessionsInUse))
+		hc.mu.Unlock()
+
+		timeout = time.After(hc.sampleInterval)
+		// Replenish or shrink the pool if needed.
+		// Note: we don't need to worry about pending session creation requests; we only need to sample the sessions currently in use,
+		// and these routines will not create extra sessions or delete sessions that are still being created.
+		if sessionsToKeep > currSessionsOpened {
+			replenishPool(sessionsToKeep)
+		} else {
+			shrinkPool(sessionsToKeep)
+		}
+
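+		// Wait until the sample interval elapses or the health checker is closed.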
+		select {
+		case <-timeout:
+			break
+		case <-hc.done:
+			break
+		}
+		iteration++
+	}
+}
+
 // shouldDropSession returns true if a particular error leads to the removal of a session
 func shouldDropSession(err error) bool {
 	if err == nil {
diff --git a/spanner/session_test.go b/spanner/session_test.go
index 7c3d4f8..f3dc674 100644
--- a/spanner/session_test.go
+++ b/spanner/session_test.go
@@ -38,7 +38,13 @@
 	spc.getRPCClient = func() (sppb.SpannerClient, error) {
 		return sc, nil
 	}
-	spc.HealthCheckInterval = 50 * time.Millisecond
+	if spc.HealthCheckInterval == 0 {
+		spc.HealthCheckInterval = 50 * time.Millisecond
+	}
+	if spc.healthCheckSampleInterval == 0 {
+		spc.healthCheckSampleInterval = 10 * time.Millisecond
+	}
+	spc.healthCheckMaintainerEnabled = true
 	sp, err := newSessionPool("mockdb", spc, nil)
 	if err != nil {
 		t.Fatalf("cannot create session pool: %v", err)
@@ -51,6 +57,7 @@
 
 // TestSessionCreation tests session creation during sessionPool.Take().
 func TestSessionCreation(t *testing.T) {
+	t.Parallel()
 	sp, sc, cancel := setup(t, SessionPoolConfig{})
 	defer cancel()
 	// Take three sessions from session pool, this should trigger session pool to create three new sessions.
@@ -96,7 +103,8 @@
 
 // TestTakeFromIdleList tests taking sessions from session pool's idle list.
 func TestTakeFromIdleList(t *testing.T) {
-	sp, sc, cancel := setup(t, SessionPoolConfig{})
+	t.Parallel()
+	sp, sc, cancel := setup(t, SessionPoolConfig{MaxIdle: 10}) // make sure maintainer keeps the idle sessions
 	defer cancel()
 	// Take ten sessions from session pool and recycle them.
 	shs := make([]*sessionHandle, 10)
@@ -107,6 +115,8 @@
 			t.Errorf("failed to get session(%v): %v", i, err)
 		}
 	}
+	// Make sure it's sampled once before recycling, otherwise it will be cleaned up.
+	<-time.After(sp.SessionPoolConfig.healthCheckSampleInterval)
 	for i := 0; i < len(shs); i++ {
 		shs[i].recycle()
 	}
@@ -131,7 +141,8 @@
 
 // TesttakeWriteSessionFromIdleList tests taking write sessions from session pool's idle list.
 func TestTakeWriteSessionFromIdleList(t *testing.T) {
-	sp, sc, cancel := setup(t, SessionPoolConfig{})
+	t.Parallel()
+	sp, sc, cancel := setup(t, SessionPoolConfig{MaxIdle: 20}) // make sure maintainer keeps the idle sessions
 	defer cancel()
 	act := testutil.NewAction("Begin", nil)
 	acts := make([]testutil.Action, 20)
@@ -148,6 +159,8 @@
 			t.Errorf("failed to get session(%v): %v", i, err)
 		}
 	}
+	// Make sure it's sampled once before recycling, otherwise it will be cleaned up.
+	<-time.After(sp.SessionPoolConfig.healthCheckSampleInterval)
 	for i := 0; i < len(shs); i++ {
 		shs[i].recycle()
 	}
@@ -172,10 +185,11 @@
 
 // TestTakeFromIdleListChecked tests taking sessions from session pool's idle list, but with a extra ping check.
 func TestTakeFromIdleListChecked(t *testing.T) {
+	t.Parallel()
 	if testing.Short() {
 		t.SkipNow()
 	}
-	sp, sc, cancel := setup(t, SessionPoolConfig{})
+	sp, sc, cancel := setup(t, SessionPoolConfig{MaxIdle: 1}) // make sure maintainer keeps the idle sessions
 	defer cancel()
 	// Stop healthcheck workers to simulate slow pings.
 	sp.hc.close()
@@ -184,6 +198,8 @@
 	if err != nil {
 		t.Errorf("failed to get session: %v", err)
 	}
+	// Make sure it's sampled once before recycling, otherwise it will be cleaned up.
+	<-time.After(sp.SessionPoolConfig.healthCheckSampleInterval)
 	wantSid := sh.getID()
 	sh.recycle()
 	<-time.After(time.Second)
@@ -225,10 +241,11 @@
 
 // TestTakeFromIdleWriteListChecked tests taking sessions from session pool's idle list, but with a extra ping check.
 func TestTakeFromIdleWriteListChecked(t *testing.T) {
+	t.Parallel()
 	if testing.Short() {
 		t.SkipNow()
 	}
-	sp, sc, cancel := setup(t, SessionPoolConfig{})
+	sp, sc, cancel := setup(t, SessionPoolConfig{MaxIdle: 1}) // make sure maintainer keeps the idle sessions
 	defer cancel()
 	sc.MakeNice()
 	// Stop healthcheck workers to simulate slow pings.
@@ -239,6 +256,8 @@
 		t.Errorf("failed to get session: %v", err)
 	}
 	wantSid := sh.getID()
+	// Make sure it's sampled once before recycling, otherwise it will be cleaned up.
+	<-time.After(sp.SessionPoolConfig.healthCheckSampleInterval)
 	sh.recycle()
 	<-time.After(time.Second)
 	// Two back-to-back session requests, both of them should return the same session created before and
@@ -279,6 +298,7 @@
 
 // TestMaxOpenedSessions tests max open sessions constraint.
 func TestMaxOpenedSessions(t *testing.T) {
+	t.Parallel()
 	if testing.Short() {
 		t.SkipNow()
 	}
@@ -337,12 +357,13 @@
 	defer sp.mu.Unlock()
 	// There should be still one session left in idle list due to the min open sessions constraint.
 	if sp.idleList.Len() != 1 {
-		t.Errorf("got %v sessions in idle list, want 1", sp.idleList.Len())
+		t.Errorf("got %v sessions in idle list (numOpened = %d), want 1", sp.idleList.Len(), sp.numOpened)
 	}
 }
 
 // TestMaxBurst tests max burst constraint.
 func TestMaxBurst(t *testing.T) {
+	t.Parallel()
 	if testing.Short() {
 		t.SkipNow()
 	}
@@ -385,13 +406,29 @@
 
 // TestSessionrecycle tests recycling sessions.
 func TestSessionRecycle(t *testing.T) {
+	t.Parallel()
 	if testing.Short() {
 		t.SkipNow()
 	}
-	sp, _, cancel := setup(t, SessionPoolConfig{MaxSessionAge: 100 * time.Millisecond, MinOpened: 1})
+	sp, _, cancel := setup(t, SessionPoolConfig{maxSessionAge: 100 * time.Millisecond, MinOpened: 1, MaxIdle: 2})
+	// MaxIdle is set to ensure shs[0] is not destroyed by the scale-down.
 	// Healthcheck is explicitly turned off in this test because it might aggressively expire sessions in idle list.
 	sp.hc.close()
 	defer cancel()
+
+	// Test session is correctly recycled and reused.
+	for i := 0; i < 20; i++ {
+		s, err := sp.take(context.Background())
+		if err != nil {
+			t.Errorf("cannot get the session %v: %v", i, err)
+		}
+		s.recycle()
+	}
+	if sp.numOpened != 1 {
+		t.Errorf("Expect session pool size %d, got %d", 1, sp.numOpened)
+	}
+
+	// Test recycling expired session.
 	var ss []*session
 	shs := make([]*sessionHandle, 2)
 	for i := 0; i < len(shs); i++ {
@@ -419,6 +456,7 @@
 
 // TestSessionDestroy tests destroying sessions.
 func TestSessionDestroy(t *testing.T) {
+	t.Parallel()
 	sp, _, cancel := setup(t, SessionPoolConfig{MinOpened: 1})
 	defer cancel()
 	sh, err := sp.take(context.Background())
@@ -471,6 +509,7 @@
 
 // TestHealthCheckScheduler tests if healthcheck workers can schedule and perform healthchecks properly.
 func TestHealthCheckScheduler(t *testing.T) {
+	t.Parallel()
 	if testing.Short() {
 		t.SkipNow()
 	}
@@ -506,7 +545,7 @@
 	if testing.Short() {
 		t.SkipNow()
 	}
-	sp, sc, cancel := setup(t, SessionPoolConfig{WriteSessions: 0.5})
+	sp, sc, cancel := setup(t, SessionPoolConfig{WriteSessions: 0.5, MaxIdle: 20})
 	sc.MakeNice()
 	defer cancel()
 	shs := make([]*sessionHandle, 10)
@@ -557,10 +596,11 @@
 
 // TestTakeFromWriteQueue tests that sessionPool.take() returns write prepared sessions as well.
 func TestTakeFromWriteQueue(t *testing.T) {
+	t.Parallel()
 	if testing.Short() {
 		t.SkipNow()
 	}
-	sp, sc, cancel := setup(t, SessionPoolConfig{MaxOpened: 1, WriteSessions: 1.0})
+	sp, sc, cancel := setup(t, SessionPoolConfig{MaxOpened: 1, WriteSessions: 1.0, MaxIdle: 1})
 	sc.MakeNice()
 	defer cancel()
 	sh, err := sp.take(context.Background())
@@ -585,10 +625,11 @@
 
 // TestSessionHealthCheck tests healthchecking cases.
 func TestSessionHealthCheck(t *testing.T) {
+	t.Parallel()
 	if testing.Short() {
 		t.SkipNow()
 	}
-	sp, sc, cancel := setup(t, SessionPoolConfig{MaxSessionAge: 2 * time.Second})
+	sp, sc, cancel := setup(t, SessionPoolConfig{maxSessionAge: 2 * time.Second})
 	defer cancel()
 	// Test pinging sessions.
 	sh, err := sp.take(context.Background())
@@ -632,7 +673,7 @@
 	// Test session id refresh.
 	// Recreate the session pool with min open sessions constraint.
 	sp, err = newSessionPool("mockdb", SessionPoolConfig{
-		MaxSessionAge: time.Second,
+		maxSessionAge: time.Second,
 		MinOpened:     1,
 		getRPCClient: func() (sppb.SpannerClient, error) {
 			return sc, nil
@@ -667,21 +708,23 @@
 // During the test, it is expected that all sessions that are taken from session pool remains valid and
 // when all test workers and healthcheck workers exit, mockclient, session pool and healthchecker should be in consistent state.
 func TestStressSessionPool(t *testing.T) {
+	t.Parallel()
 	// Use concurrent workers to test different session pool built from different configurations.
 	if testing.Short() {
 		t.SkipNow()
 	}
 	for ti, cfg := range []SessionPoolConfig{
 		SessionPoolConfig{},
-		SessionPoolConfig{MaxSessionAge: 20 * time.Millisecond},
+		SessionPoolConfig{maxSessionAge: 20 * time.Millisecond},
 		SessionPoolConfig{MinOpened: 10, MaxOpened: 100},
 		SessionPoolConfig{MaxBurst: 50},
-		SessionPoolConfig{MaxSessionAge: 20 * time.Millisecond, MinOpened: 10, MaxOpened: 200, MaxBurst: 5},
-		SessionPoolConfig{MaxSessionAge: 20 * time.Millisecond, MinOpened: 10, MaxOpened: 200, MaxBurst: 5, WriteSessions: 0.2},
+		SessionPoolConfig{maxSessionAge: 20 * time.Millisecond, MinOpened: 10, MaxOpened: 200, MaxBurst: 5},
+		SessionPoolConfig{maxSessionAge: 20 * time.Millisecond, MinOpened: 10, MaxOpened: 200, MaxBurst: 5, WriteSessions: 0.2},
 	} {
 		var wg sync.WaitGroup
 		// Create a more aggressive session healthchecker to increase test concurrency.
 		cfg.HealthCheckInterval = 50 * time.Millisecond
+		cfg.healthCheckSampleInterval = 10 * time.Millisecond
 		cfg.HealthCheckWorkers = 50
 		sc := testutil.NewMockCloudSpannerClient(t)
 		sc.MakeNice()
@@ -728,9 +771,9 @@
 					if takeWrite && sh.getTransactionID() == nil {
 						t.Errorf("%v.%v: pool.takeWriteSession returns session %v without transaction", ti, idx, sh.session)
 					}
-					if int64(cfg.MaxSessionAge) > 0 && rand.Intn(100) < idx {
+					if int64(cfg.maxSessionAge) > 0 && rand.Intn(100) < idx {
 						// Random sleep before destroying/recycling the session, to give healthcheck worker a chance to step in.
-						<-time.After(time.Duration(rand.Int63n(int64(cfg.MaxSessionAge))))
+						<-time.After(time.Duration(rand.Int63n(int64(cfg.maxSessionAge))))
 					}
 					if rand.Intn(100) < idx {
 						// destroy the session.
@@ -790,3 +833,64 @@
 		}
 	}
 }
+
+// TestMaintainer checks that the session pool maintainer maintains the number of sessions in the following cases:
+// 1. On initialization of session pool, replenish session pool to meet MinOpened or MaxIdle.
+// 2. On increased session usage, provision extra MaxIdle sessions.
+// 3. After the surge passes, scale down the session pool accordingly.
+func TestMaintainer(t *testing.T) {
+	t.Parallel()
+	if testing.Short() {
+		t.SkipNow()
+	}
+	var (
+		kMinOpened uint64 = 5
+		kMaxIdle   uint64 = 4
+	)
+	sp, _, cancel := setup(t, SessionPoolConfig{MinOpened: kMinOpened, MaxIdle: kMaxIdle})
+	sampleInterval := sp.SessionPoolConfig.healthCheckSampleInterval
+	hcInterval := sp.SessionPoolConfig.HealthCheckInterval
+	defer cancel()
+
+	<-time.After(sampleInterval * 1)
+	if sp.numOpened != 5 {
+		t.Errorf("Replenish. Expect %d open, got %d", sp.MinOpened, sp.numOpened)
+	}
+
+	// To save test time, we do not create many sessions here, because the time it takes to create sessions affects the decision on sessionsToKeep. We also parallelize the take and recycle process.
+	shs := make([]*sessionHandle, 10)
+	for i := 0; i < len(shs); i++ {
+		var err error
+		shs[i], err = sp.take(context.Background())
+		if err != nil {
+			t.Errorf("cannot get session from session pool: %v", err)
+		}
+	}
+	sp.mu.Lock()
+	if sp.numOpened != 10 {
+		t.Errorf("Scale out from normal use. Expect %d open, got %d", 10, sp.numOpened)
+	}
+	sp.mu.Unlock()
+
+	<-time.After(sampleInterval)
+	for _, sh := range shs[:7] {
+		sh.recycle()
+	}
+
+	<-time.After(sampleInterval * 2)
+	sp.mu.Lock()
+	if sp.numOpened != 7 {
+		t.Errorf("Keep extra MaxIdle sessions. Expect %d open, got %d", 7, sp.numOpened)
+	}
+	sp.mu.Unlock()
+
+	for _, sh := range shs[7:] {
+		sh.recycle()
+	}
+	<-time.After(sampleInterval*10 + hcInterval)
+	sp.mu.Lock()
+	if sp.numOpened != kMinOpened {
+		t.Errorf("Scale down. Expect %d open, got %d", kMinOpened, sp.numOpened)
+	}
+	sp.mu.Unlock()
+}
diff --git a/spanner/util.go b/spanner/util.go
new file mode 100644
index 0000000..d35fec2
--- /dev/null
+++ b/spanner/util.go
@@ -0,0 +1,33 @@
+/*
+Copyright 2017 Google Inc. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package spanner
+
+// maxUint64 returns the maximum of two uint64 values.
+func maxUint64(a, b uint64) uint64 {
+	if a > b {
+		return a
+	}
+	return b
+}
+
+// minUint64 returns the minimum of two uint64 values.
+func minUint64(a, b uint64) uint64 {
+	if a > b {
+		return b
+	}
+	return a
+}