| /* |
| Copyright 2017 Google LLC |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package spanner |
| |
| import ( |
| "container/heap" |
| "container/list" |
| "context" |
| "fmt" |
| "log" |
| "math" |
| "math/rand" |
| "strings" |
| "sync" |
| "time" |
| |
| "cloud.google.com/go/internal/trace" |
| vkit "cloud.google.com/go/spanner/apiv1" |
| sppb "google.golang.org/genproto/googleapis/spanner/v1" |
| "google.golang.org/grpc/codes" |
| "google.golang.org/grpc/metadata" |
| ) |
| |
| // sessionHandle is an interface for transactions to access Cloud Spanner |
| // sessions safely. It is generated by sessionPool.take(). |
| type sessionHandle struct { |
| // mu guarantees that the inner session object is returned / destroyed only |
| // once. |
| mu sync.Mutex |
| // session is a pointer to a session object. Transactions never need to |
| // access it directly. |
| session *session |
| } |
| |
| // recycle gives the inner session object back to its home session pool. It is |
| // safe to call recycle multiple times but only the first one would take effect. |
| func (sh *sessionHandle) recycle() { |
| sh.mu.Lock() |
| defer sh.mu.Unlock() |
| if sh.session == nil { |
| // sessionHandle has already been recycled. |
| return |
| } |
| sh.session.recycle() |
| sh.session = nil |
| } |
| |
| // getID gets the Cloud Spanner session ID from the internal session object. |
| // getID returns empty string if the sessionHandle is nil or the inner session |
| // object has been released by recycle / destroy. |
| func (sh *sessionHandle) getID() string { |
| sh.mu.Lock() |
| defer sh.mu.Unlock() |
| if sh.session == nil { |
| // sessionHandle has already been recycled/destroyed. |
| return "" |
| } |
| return sh.session.getID() |
| } |
| |
| // getClient gets the Cloud Spanner RPC client associated with the session ID |
| // in sessionHandle. |
| func (sh *sessionHandle) getClient() *vkit.Client { |
| sh.mu.Lock() |
| defer sh.mu.Unlock() |
| if sh.session == nil { |
| return nil |
| } |
| return sh.session.client |
| } |
| |
| // getMetadata returns the metadata associated with the session in sessionHandle. |
| func (sh *sessionHandle) getMetadata() metadata.MD { |
| sh.mu.Lock() |
| defer sh.mu.Unlock() |
| if sh.session == nil { |
| return nil |
| } |
| return sh.session.md |
| } |
| |
| // getTransactionID returns the transaction id in the session if available. |
| func (sh *sessionHandle) getTransactionID() transactionID { |
| sh.mu.Lock() |
| defer sh.mu.Unlock() |
| if sh.session == nil { |
| return nil |
| } |
| return sh.session.tx |
| } |
| |
| // destroy destroys the inner session object. It is safe to call destroy |
| // multiple times and only the first call would attempt to |
| // destroy the inner session object. |
| func (sh *sessionHandle) destroy() { |
| sh.mu.Lock() |
| s := sh.session |
| sh.session = nil |
| sh.mu.Unlock() |
| if s == nil { |
| // sessionHandle has already been destroyed.. |
| return |
| } |
| s.destroy(false) |
| } |
| |
| // session wraps a Cloud Spanner session ID through which transactions are |
| // created and executed. |
// session wraps a Cloud Spanner session ID through which transactions are
// created and executed.
type session struct {
	// client is the RPC channel to Cloud Spanner. It is set only once during
	// session's creation.
	client *vkit.Client
	// id is the unique id of the session in Cloud Spanner. It is set only once
	// during session's creation.
	id string
	// pool is the session's home session pool where it was created. It is set
	// only once during session's creation.
	pool *sessionPool
	// createTime is the timestamp of the session's creation. It is set only
	// once during session's creation.
	createTime time.Time

	// mu protects the following fields from concurrent access: both
	// healthcheck workers and transactions can modify them.
	mu sync.Mutex
	// valid marks the validity of a session.
	valid bool
	// hcIndex is the index of the session inside the global healthcheck queue.
	// If hcIndex < 0, session has been unregistered from the queue.
	hcIndex int
	// idleList is the linked-list node which links the session to its home
	// session pool's idle list. If idleList == nil, the
	// session is not in idle list.
	idleList *list.Element
	// nextCheck is the timestamp of next scheduled healthcheck of the session.
	// It is maintained by the global health checker.
	nextCheck time.Time
	// checkingHealth is true if currently this session is being processed by
	// health checker. Must be modified under health checker lock.
	checkingHealth bool
	// md is the Metadata to be sent with each request.
	md metadata.MD
	// tx contains the transaction id if the session has been prepared for
	// write.
	tx transactionID
}
| |
| // isValid returns true if the session is still valid for use. |
| func (s *session) isValid() bool { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| return s.valid |
| } |
| |
| // isWritePrepared returns true if the session is prepared for write. |
| func (s *session) isWritePrepared() bool { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| return s.tx != nil |
| } |
| |
| // String implements fmt.Stringer for session. |
| func (s *session) String() string { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| return fmt.Sprintf("<id=%v, hcIdx=%v, idleList=%p, valid=%v, create=%v, nextcheck=%v>", |
| s.id, s.hcIndex, s.idleList, s.valid, s.createTime, s.nextCheck) |
| } |
| |
| // ping verifies if the session is still alive in Cloud Spanner. |
| func (s *session) ping() error { |
| ctx, cancel := context.WithTimeout(context.Background(), time.Second) |
| defer cancel() |
| // s.getID is safe even when s is invalid. |
| _, err := s.client.GetSession(contextWithOutgoingMetadata(ctx, s.md), &sppb.GetSessionRequest{Name: s.getID()}) |
| return err |
| } |
| |
| // setHcIndex atomically sets the session's index in the healthcheck queue and |
| // returns the old index. |
| func (s *session) setHcIndex(i int) int { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| oi := s.hcIndex |
| s.hcIndex = i |
| return oi |
| } |
| |
| // setIdleList atomically sets the session's idle list link and returns the old |
| // link. |
| func (s *session) setIdleList(le *list.Element) *list.Element { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| old := s.idleList |
| s.idleList = le |
| return old |
| } |
| |
| // invalidate marks a session as invalid and returns the old validity. |
| func (s *session) invalidate() bool { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| ov := s.valid |
| s.valid = false |
| return ov |
| } |
| |
| // setNextCheck sets the timestamp for next healthcheck on the session. |
| func (s *session) setNextCheck(t time.Time) { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| s.nextCheck = t |
| } |
| |
| // setTransactionID sets the transaction id in the session |
| func (s *session) setTransactionID(tx transactionID) { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| s.tx = tx |
| } |
| |
| // getID returns the session ID which uniquely identifies the session in Cloud |
| // Spanner. |
| func (s *session) getID() string { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| return s.id |
| } |
| |
| // getHcIndex returns the session's index into the global healthcheck priority |
| // queue. |
| func (s *session) getHcIndex() int { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| return s.hcIndex |
| } |
| |
| // getIdleList returns the session's link in its home session pool's idle list. |
| func (s *session) getIdleList() *list.Element { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| return s.idleList |
| } |
| |
| // getNextCheck returns the timestamp for next healthcheck on the session. |
| func (s *session) getNextCheck() time.Time { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| return s.nextCheck |
| } |
| |
| // recycle turns the session back to its home session pool. |
| func (s *session) recycle() { |
| s.setTransactionID(nil) |
| if !s.pool.recycle(s) { |
| // s is rejected by its home session pool because it expired and the |
| // session pool currently has enough open sessions. |
| s.destroy(false) |
| } |
| } |
| |
| // destroy removes the session from its home session pool, healthcheck queue |
| // and Cloud Spanner service. |
| func (s *session) destroy(isExpire bool) bool { |
| // Remove s from session pool. |
| if !s.pool.remove(s, isExpire) { |
| return false |
| } |
| // Unregister s from healthcheck queue. |
| s.pool.hc.unregister(s) |
| // Remove s from Cloud Spanner service. |
| ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) |
| defer cancel() |
| s.delete(ctx) |
| return true |
| } |
| |
// delete removes the session from the Cloud Spanner service by calling
// DeleteSession.
func (s *session) delete(ctx context.Context) {
	// An error is logged but otherwise not acted upon, because even if we
	// fail to explicitly destroy the session, it will eventually be garbage
	// collected by Cloud Spanner.
	err := s.client.DeleteSession(ctx, &sppb.DeleteSessionRequest{Name: s.getID()})
	if err != nil {
		log.Printf("Failed to delete session %v. Error: %v", s.getID(), err)
	}
}
| |
| // prepareForWrite prepares the session for write if it is not already in that |
| // state. |
| func (s *session) prepareForWrite(ctx context.Context) error { |
| if s.isWritePrepared() { |
| return nil |
| } |
| tx, err := beginTransaction(contextWithOutgoingMetadata(ctx, s.md), s.getID(), s.client) |
| if err != nil { |
| return err |
| } |
| s.setTransactionID(tx) |
| return nil |
| } |
| |
| // SessionPoolConfig stores configurations of a session pool. |
// SessionPoolConfig stores configurations of a session pool.
type SessionPoolConfig struct {
	// MaxOpened is the maximum number of opened sessions allowed by the session
	// pool. If the client tries to open a session and there are already
	// MaxOpened sessions, it will block until one becomes available or the
	// context passed to the client method is canceled or times out.
	//
	// Defaults to NumChannels * 100.
	MaxOpened uint64

	// MinOpened is the minimum number of opened sessions that the session pool
	// tries to maintain. Session pool won't continue to expire sessions if
	// number of opened connections drops below MinOpened. However, if a session
	// is found to be broken, it will still be evicted from the session pool,
	// therefore it is possible that the number of opened sessions drops below
	// MinOpened.
	//
	// Defaults to 100.
	MinOpened uint64

	// MaxIdle is the maximum number of idle sessions the pool is allowed to
	// keep.
	//
	// Defaults to 0.
	MaxIdle uint64

	// MaxBurst is the maximum number of concurrent session creation requests.
	//
	// Defaults to 10.
	MaxBurst uint64

	// WriteSessions is the fraction of sessions we try to keep prepared for
	// write.
	//
	// Defaults to 0.2.
	WriteSessions float64

	// HealthCheckWorkers is number of workers used by health checker for this
	// pool.
	//
	// Defaults to 10.
	HealthCheckWorkers int

	// HealthCheckInterval is how often the health checker pings a session.
	//
	// Defaults to 5m.
	HealthCheckInterval time.Duration

	// healthCheckSampleInterval is how often the health checker samples live
	// session (for use in maintaining session pool size).
	//
	// Defaults to 1m.
	healthCheckSampleInterval time.Duration

	// sessionLabels for the sessions created in the session pool.
	sessionLabels map[string]string
}
| |
| // DefaultSessionPoolConfig is the default configuration for the session pool |
| // that will be used for a Spanner client, unless the user supplies a specific |
| // session pool config. |
// DefaultSessionPoolConfig is the default configuration for the session pool
// that will be used for a Spanner client, unless the user supplies a specific
// session pool config. The values here mirror the per-field defaults
// documented on SessionPoolConfig.
var DefaultSessionPoolConfig = SessionPoolConfig{
	MinOpened:           100,
	MaxOpened:           numChannels * 100,
	MaxBurst:            10,
	WriteSessions:       0.2,
	HealthCheckWorkers:  10,
	HealthCheckInterval: 5 * time.Minute,
}
| |
| // errMinOpenedGTMapOpened returns error for SessionPoolConfig.MaxOpened < SessionPoolConfig.MinOpened when SessionPoolConfig.MaxOpened is set. |
// errMinOpenedGTMaxOpened returns the error for
// SessionPoolConfig.MaxOpened < SessionPoolConfig.MinOpened when
// SessionPoolConfig.MaxOpened is set.
func errMinOpenedGTMaxOpened(maxOpened, minOpened uint64) error {
	return spannerErrorf(codes.InvalidArgument,
		"require SessionPoolConfig.MaxOpened >= SessionPoolConfig.MinOpened, got %d and %d", maxOpened, minOpened)
}
| |
| // errWriteFractionOutOfRange returns error for |
| // SessionPoolConfig.WriteFraction < 0 or SessionPoolConfig.WriteFraction > 1 |
// errWriteFractionOutOfRange returns the error for
// SessionPoolConfig.WriteSessions < 0 or SessionPoolConfig.WriteSessions > 1.
func errWriteFractionOutOfRange(writeFraction float64) error {
	return spannerErrorf(codes.InvalidArgument,
		"require SessionPoolConfig.WriteSessions >= 0.0 && SessionPoolConfig.WriteSessions <= 1.0, got %.2f", writeFraction)
}
| |
| // errHealthCheckWorkersNegative returns error for |
| // SessionPoolConfig.HealthCheckWorkers < 0 |
// errHealthCheckWorkersNegative returns the error for
// SessionPoolConfig.HealthCheckWorkers < 0.
func errHealthCheckWorkersNegative(workers int) error {
	return spannerErrorf(codes.InvalidArgument,
		"require SessionPoolConfig.HealthCheckWorkers >= 0, got %d", workers)
}
| |
| // errHealthCheckIntervalNegative returns error for |
| // SessionPoolConfig.HealthCheckInterval < 0 |
// errHealthCheckIntervalNegative returns the error for
// SessionPoolConfig.HealthCheckInterval < 0.
func errHealthCheckIntervalNegative(interval time.Duration) error {
	return spannerErrorf(codes.InvalidArgument,
		"require SessionPoolConfig.HealthCheckInterval >= 0, got %v", interval)
}
| |
| // validate verifies that the SessionPoolConfig is good for use. |
| func (spc *SessionPoolConfig) validate() error { |
| if spc.MinOpened > spc.MaxOpened && spc.MaxOpened > 0 { |
| return errMinOpenedGTMaxOpened(spc.MaxOpened, spc.MinOpened) |
| } |
| if spc.WriteSessions < 0.0 || spc.WriteSessions > 1.0 { |
| return errWriteFractionOutOfRange(spc.WriteSessions) |
| } |
| if spc.HealthCheckWorkers < 0 { |
| return errHealthCheckWorkersNegative(spc.HealthCheckWorkers) |
| } |
| if spc.HealthCheckInterval < 0 { |
| return errHealthCheckIntervalNegative(spc.HealthCheckInterval) |
| } |
| return nil |
| } |
| |
| // sessionPool creates and caches Cloud Spanner sessions. |
| type sessionPool struct { |
| // mu protects sessionPool from concurrent access. |
| mu sync.Mutex |
| // valid marks the validity of the session pool. |
| valid bool |
| // sc is used to create the sessions for the pool. |
| sc *sessionClient |
| // idleList caches idle session IDs. Session IDs in this list can be |
| // allocated for use. |
| idleList list.List |
| // idleWriteList caches idle sessions which have been prepared for write. |
| idleWriteList list.List |
| // mayGetSession is for broadcasting that session retrival/creation may |
| // proceed. |
| mayGetSession chan struct{} |
| // numOpened is the total number of open sessions from the session pool. |
| numOpened uint64 |
| // createReqs is the number of ongoing session creation requests. |
| createReqs uint64 |
| // prepareReqs is the number of ongoing session preparation request. |
| prepareReqs uint64 |
| // configuration of the session pool. |
| SessionPoolConfig |
| // hc is the health checker |
| hc *healthChecker |
| |
| // mw is the maintenance window containing statistics for the max number of |
| // sessions checked out of the pool during the last 10 minutes. |
| mw *maintenanceWindow |
| } |
| |
| // newSessionPool creates a new session pool. |
| func newSessionPool(sc *sessionClient, config SessionPoolConfig) (*sessionPool, error) { |
| if err := config.validate(); err != nil { |
| return nil, err |
| } |
| pool := &sessionPool{ |
| sc: sc, |
| valid: true, |
| mayGetSession: make(chan struct{}), |
| SessionPoolConfig: config, |
| mw: newMaintenanceWindow(), |
| } |
| if config.HealthCheckWorkers == 0 { |
| // With 10 workers and assuming average latency of 5ms for |
| // BeginTransaction, we will be able to prepare 2000 tx/sec in advance. |
| // If the rate of takeWriteSession is more than that, it will degrade to |
| // doing BeginTransaction inline. |
| // |
| // TODO: consider resizing the worker pool dynamically according to the load. |
| config.HealthCheckWorkers = 10 |
| } |
| if config.HealthCheckInterval == 0 { |
| config.HealthCheckInterval = 5 * time.Minute |
| } |
| if config.healthCheckSampleInterval == 0 { |
| config.healthCheckSampleInterval = time.Minute |
| } |
| // On GCE VM, within the same region an healthcheck ping takes on average |
| // 10ms to finish, given a 5 minutes interval and 10 healthcheck workers, a |
| // healthChecker can effectively mantain |
| // 100 checks_per_worker/sec * 10 workers * 300 seconds = 300K sessions. |
| pool.hc = newHealthChecker(config.HealthCheckInterval, config.HealthCheckWorkers, config.healthCheckSampleInterval, pool) |
| |
| // First initialize the pool before we indicate that the healthchecker is |
| // ready. This prevents the maintainer from starting before the pool has |
| // been initialized, which means that we guarantee that the initial |
| // sessions are created using BatchCreateSessions. |
| if config.MinOpened > 0 { |
| numSessions := minUint64(config.MinOpened, math.MaxInt32) |
| if err := pool.initPool(int32(numSessions)); err != nil { |
| return nil, err |
| } |
| } |
| close(pool.hc.ready) |
| return pool, nil |
| } |
| |
| func (p *sessionPool) initPool(numSessions int32) error { |
| p.mu.Lock() |
| // Take budget before the actual session creation. |
| p.numOpened += uint64(numSessions) |
| recordStat(context.Background(), OpenSessionCount, int64(p.numOpened)) |
| p.createReqs += uint64(numSessions) |
| p.mu.Unlock() |
| // Asynchronously create the initial sessions for the pool. |
| return p.sc.batchCreateSessions(numSessions, p) |
| } |
| |
| // sessionReady is executed by the SessionClient when a session has been |
| // created and is ready to use. This method will add the new session to the |
| // pool and decrease the number of sessions that is being created. |
| func (p *sessionPool) sessionReady(s *session) { |
| p.mu.Lock() |
| defer p.mu.Unlock() |
| // Set this pool as the home pool of the session and register it with the |
| // health checker. |
| s.pool = p |
| p.hc.register(s) |
| p.createReqs-- |
| // Insert the session at a random position in the pool to prevent all |
| // sessions affiliated with a channel to be placed at sequentially in the |
| // pool. |
| if p.idleList.Len() > 0 { |
| pos := rand.Intn(p.idleList.Len()) |
| before := p.idleList.Front() |
| for i := 0; i < pos; i++ { |
| before = before.Next() |
| } |
| s.setIdleList(p.idleList.InsertBefore(s, before)) |
| } else { |
| s.setIdleList(p.idleList.PushBack(s)) |
| } |
| // Notify other waiters blocking on session creation. |
| close(p.mayGetSession) |
| p.mayGetSession = make(chan struct{}) |
| } |
| |
| // sessionCreationFailed is called by the SessionClient when the creation of one |
| // or more requested sessions finished with an error. sessionCreationFailed will |
| // decrease the number of sessions being created and notify any waiters that |
| // the session creation failed. |
| func (p *sessionPool) sessionCreationFailed(err error, numSessions int32) { |
| p.mu.Lock() |
| defer p.mu.Unlock() |
| p.createReqs -= uint64(numSessions) |
| p.numOpened -= uint64(numSessions) |
| recordStat(context.Background(), OpenSessionCount, int64(p.numOpened)) |
| // Notify other waiters blocking on session creation. |
| close(p.mayGetSession) |
| p.mayGetSession = make(chan struct{}) |
| } |
| |
| // isValid checks if the session pool is still valid. |
| func (p *sessionPool) isValid() bool { |
| if p == nil { |
| return false |
| } |
| p.mu.Lock() |
| defer p.mu.Unlock() |
| return p.valid |
| } |
| |
| // close marks the session pool as closed. |
| func (p *sessionPool) close() { |
| if p == nil { |
| return |
| } |
| p.mu.Lock() |
| if !p.valid { |
| p.mu.Unlock() |
| return |
| } |
| p.valid = false |
| p.mu.Unlock() |
| p.hc.close() |
| // destroy all the sessions |
| p.hc.mu.Lock() |
| allSessions := make([]*session, len(p.hc.queue.sessions)) |
| copy(allSessions, p.hc.queue.sessions) |
| p.hc.mu.Unlock() |
| for _, s := range allSessions { |
| s.destroy(false) |
| } |
| } |
| |
| // errInvalidSessionPool returns error for using an invalid session pool. |
| func errInvalidSessionPool() error { |
| return spannerErrorf(codes.InvalidArgument, "invalid session pool") |
| } |
| |
| // errGetSessionTimeout returns error for context timeout during |
| // sessionPool.take(). |
| func errGetSessionTimeout() error { |
| return spannerErrorf(codes.Canceled, "timeout / context canceled during getting session") |
| } |
| |
| // shouldPrepareWriteLocked returns true if we should prepare more sessions for write. |
| func (p *sessionPool) shouldPrepareWriteLocked() bool { |
| return float64(p.numOpened)*p.WriteSessions > float64(p.idleWriteList.Len()+int(p.prepareReqs)) |
| } |
| |
// createSession creates a single new session through the session client. The
// caller must already have taken creation budget (numOpened / createReqs);
// createSession releases that budget — and gives the numOpened slot back on
// failure — before returning.
func (p *sessionPool) createSession(ctx context.Context) (*session, error) {
	trace.TracePrintf(ctx, nil, "Creating a new session")
	// doneCreate releases the creation budget and wakes up waiters. done
	// indicates whether the session was actually created.
	doneCreate := func(done bool) {
		p.mu.Lock()
		if !done {
			// Session creation failed, give budget back.
			p.numOpened--
			recordStat(ctx, OpenSessionCount, int64(p.numOpened))
		}
		p.createReqs--
		// Notify other waiters blocking on session creation.
		close(p.mayGetSession)
		p.mayGetSession = make(chan struct{})
		p.mu.Unlock()
	}
	s, err := p.sc.createSession(ctx)
	if err != nil {
		doneCreate(false)
		// Should return error directly because of the previous retries on
		// CreateSession RPC.
		// If the error is a timeout, there is a chance that the session was
		// created on the server but is not known to the session pool. This
		// session will then be garbage collected by the server after 1 hour.
		return nil, err
	}
	// Adopt the session into this pool and register it for health checks
	// before releasing the budget, so it is never visible half-initialized.
	s.pool = p
	p.hc.register(s)
	doneCreate(true)
	return s, nil
}
| |
// isHealthy checks whether a session that is being handed out is still
// usable. If the session has missed roughly two healthcheck windows, it is
// pinged inline; a ping failure that indicates a broken session destroys the
// session and returns false so the caller can fetch/create another one.
func (p *sessionPool) isHealthy(s *session) bool {
	if s.getNextCheck().Add(2 * p.hc.getInterval()).Before(time.Now()) {
		// TODO: figure out if we need to schedule a new healthcheck worker here.
		if err := s.ping(); shouldDropSession(err) {
			// The session is already bad, continue to fetch/create a new one.
			s.destroy(false)
			return false
		}
		// The ping succeeded (or failed benignly); put the session back on
		// the healthcheck schedule.
		p.hc.scheduledHC(s)
	}
	return true
}
| |
| // take returns a cached session if there are available ones; if there isn't |
| // any, it tries to allocate a new one. Session returned by take should be used |
| // for read operations. |
| func (p *sessionPool) take(ctx context.Context) (*sessionHandle, error) { |
| trace.TracePrintf(ctx, nil, "Acquiring a read-only session") |
| for { |
| var ( |
| s *session |
| err error |
| ) |
| |
| p.mu.Lock() |
| if !p.valid { |
| p.mu.Unlock() |
| return nil, errInvalidSessionPool() |
| } |
| if p.idleList.Len() > 0 { |
| // Idle sessions are available, get one from the top of the idle |
| // list. |
| s = p.idleList.Remove(p.idleList.Front()).(*session) |
| trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()}, |
| "Acquired read-only session") |
| } else if p.idleWriteList.Len() > 0 { |
| s = p.idleWriteList.Remove(p.idleWriteList.Front()).(*session) |
| trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()}, |
| "Acquired read-write session") |
| } |
| if s != nil { |
| s.setIdleList(nil) |
| numCheckedOut := p.currSessionsCheckedOutLocked() |
| p.mu.Unlock() |
| p.mw.updateMaxSessionsCheckedOutDuringWindow(numCheckedOut) |
| // From here, session is no longer in idle list, so healthcheck |
| // workers won't destroy it. If healthcheck workers failed to |
| // schedule healthcheck for the session timely, do the check here. |
| // Because session check is still much cheaper than session |
| // creation, they should be reused as much as possible. |
| if !p.isHealthy(s) { |
| continue |
| } |
| return &sessionHandle{session: s}, nil |
| } |
| |
| // Idle list is empty, block if session pool has reached max session |
| // creation concurrency or max number of open sessions. |
| if (p.MaxOpened > 0 && p.numOpened >= p.MaxOpened) || (p.MaxBurst > 0 && p.createReqs >= p.MaxBurst) { |
| mayGetSession := p.mayGetSession |
| p.mu.Unlock() |
| trace.TracePrintf(ctx, nil, "Waiting for read-only session to become available") |
| select { |
| case <-ctx.Done(): |
| trace.TracePrintf(ctx, nil, "Context done waiting for session") |
| return nil, errGetSessionTimeout() |
| case <-mayGetSession: |
| } |
| continue |
| } |
| |
| // Take budget before the actual session creation. |
| p.numOpened++ |
| // Creating a new session that will be returned directly to the client |
| // means that the max number of sessions in use also increases. |
| numCheckedOut := p.currSessionsCheckedOutLocked() |
| recordStat(ctx, OpenSessionCount, int64(p.numOpened)) |
| p.createReqs++ |
| p.mu.Unlock() |
| p.mw.updateMaxSessionsCheckedOutDuringWindow(numCheckedOut) |
| if s, err = p.createSession(ctx); err != nil { |
| trace.TracePrintf(ctx, nil, "Error creating session: %v", err) |
| return nil, toSpannerError(err) |
| } |
| trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()}, |
| "Created session") |
| return &sessionHandle{session: s}, nil |
| } |
| } |
| |
| // takeWriteSession returns a write prepared cached session if there are |
| // available ones; if there isn't any, it tries to allocate a new one. Session |
| // returned should be used for read write transactions. |
| func (p *sessionPool) takeWriteSession(ctx context.Context) (*sessionHandle, error) { |
| trace.TracePrintf(ctx, nil, "Acquiring a read-write session") |
| for { |
| var ( |
| s *session |
| err error |
| ) |
| |
| p.mu.Lock() |
| if !p.valid { |
| p.mu.Unlock() |
| return nil, errInvalidSessionPool() |
| } |
| if p.idleWriteList.Len() > 0 { |
| // Idle sessions are available, get one from the top of the idle |
| // list. |
| s = p.idleWriteList.Remove(p.idleWriteList.Front()).(*session) |
| trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()}, "Acquired read-write session") |
| } else if p.idleList.Len() > 0 { |
| s = p.idleList.Remove(p.idleList.Front()).(*session) |
| trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()}, "Acquired read-only session") |
| } |
| if s != nil { |
| s.setIdleList(nil) |
| numCheckedOut := p.currSessionsCheckedOutLocked() |
| p.mu.Unlock() |
| p.mw.updateMaxSessionsCheckedOutDuringWindow(numCheckedOut) |
| // From here, session is no longer in idle list, so healthcheck |
| // workers won't destroy it. If healthcheck workers failed to |
| // schedule healthcheck for the session timely, do the check here. |
| // Because session check is still much cheaper than session |
| // creation, they should be reused as much as possible. |
| if !p.isHealthy(s) { |
| continue |
| } |
| } else { |
| // Idle list is empty, block if session pool has reached max session |
| // creation concurrency or max number of open sessions. |
| if (p.MaxOpened > 0 && p.numOpened >= p.MaxOpened) || (p.MaxBurst > 0 && p.createReqs >= p.MaxBurst) { |
| mayGetSession := p.mayGetSession |
| p.mu.Unlock() |
| trace.TracePrintf(ctx, nil, "Waiting for read-write session to become available") |
| select { |
| case <-ctx.Done(): |
| trace.TracePrintf(ctx, nil, "Context done waiting for session") |
| return nil, errGetSessionTimeout() |
| case <-mayGetSession: |
| } |
| continue |
| } |
| |
| // Take budget before the actual session creation. |
| p.numOpened++ |
| // Creating a new session that will be returned directly to the client |
| // means that the max number of sessions in use also increases. |
| numCheckedOut := p.currSessionsCheckedOutLocked() |
| recordStat(ctx, OpenSessionCount, int64(p.numOpened)) |
| p.createReqs++ |
| p.mu.Unlock() |
| p.mw.updateMaxSessionsCheckedOutDuringWindow(numCheckedOut) |
| if s, err = p.createSession(ctx); err != nil { |
| trace.TracePrintf(ctx, nil, "Error creating session: %v", err) |
| return nil, toSpannerError(err) |
| } |
| trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()}, |
| "Created session") |
| } |
| if !s.isWritePrepared() { |
| if err = s.prepareForWrite(ctx); err != nil { |
| s.recycle() |
| trace.TracePrintf(ctx, map[string]interface{}{"sessionID": s.getID()}, |
| "Error preparing session for write") |
| return nil, toSpannerError(err) |
| } |
| } |
| return &sessionHandle{session: s}, nil |
| } |
| } |
| |
| // recycle puts session s back to the session pool's idle list, it returns true |
| // if the session pool successfully recycles session s. |
| func (p *sessionPool) recycle(s *session) bool { |
| p.mu.Lock() |
| defer p.mu.Unlock() |
| if !s.isValid() || !p.valid { |
| // Reject the session if session is invalid or pool itself is invalid. |
| return false |
| } |
| // Put session at the back of the list to round robin for load balancing |
| // across channels. |
| if s.isWritePrepared() { |
| s.setIdleList(p.idleWriteList.PushBack(s)) |
| } else { |
| s.setIdleList(p.idleList.PushBack(s)) |
| } |
| // Broadcast that a session has been returned to idle list. |
| close(p.mayGetSession) |
| p.mayGetSession = make(chan struct{}) |
| return true |
| } |
| |
| // remove atomically removes session s from the session pool and invalidates s. |
| // If isExpire == true, the removal is triggered by session expiration and in |
| // such cases, only idle sessions can be removed. |
| func (p *sessionPool) remove(s *session, isExpire bool) bool { |
| p.mu.Lock() |
| defer p.mu.Unlock() |
| if isExpire && (p.numOpened <= p.MinOpened || s.getIdleList() == nil) { |
| // Don't expire session if the session is not in idle list (in use), or |
| // if number of open sessions is going below p.MinOpened. |
| return false |
| } |
| ol := s.setIdleList(nil) |
| // If the session is in the idlelist, remove it. |
| if ol != nil { |
| // Remove from whichever list it is in. |
| p.idleList.Remove(ol) |
| p.idleWriteList.Remove(ol) |
| } |
| if s.invalidate() { |
| // Decrease the number of opened sessions. |
| p.numOpened-- |
| recordStat(context.Background(), OpenSessionCount, int64(p.numOpened)) |
| // Broadcast that a session has been destroyed. |
| close(p.mayGetSession) |
| p.mayGetSession = make(chan struct{}) |
| return true |
| } |
| return false |
| } |
| |
| func (p *sessionPool) currSessionsCheckedOutLocked() uint64 { |
| return p.numOpened - uint64(p.idleList.Len()) - uint64(p.idleWriteList.Len()) |
| } |
| |
| // hcHeap implements heap.Interface. It is used to create the priority queue for |
| // session healthchecks. |
// hcHeap implements heap.Interface. It is used to create the priority queue
// for session healthchecks, ordered by each session's next scheduled check.
type hcHeap struct {
	// sessions is the backing slice of the heap.
	sessions []*session
}
| |
| // Len implements heap.Interface.Len. |
| func (h hcHeap) Len() int { |
| return len(h.sessions) |
| } |
| |
| // Less implements heap.Interface.Less. |
| func (h hcHeap) Less(i, j int) bool { |
| return h.sessions[i].getNextCheck().Before(h.sessions[j].getNextCheck()) |
| } |
| |
| // Swap implements heap.Interface.Swap. |
| func (h hcHeap) Swap(i, j int) { |
| h.sessions[i], h.sessions[j] = h.sessions[j], h.sessions[i] |
| h.sessions[i].setHcIndex(i) |
| h.sessions[j].setHcIndex(j) |
| } |
| |
| // Push implements heap.Interface.Push. |
| func (h *hcHeap) Push(s interface{}) { |
| ns := s.(*session) |
| ns.setHcIndex(len(h.sessions)) |
| h.sessions = append(h.sessions, ns) |
| } |
| |
| // Pop implements heap.Interface.Pop. |
| func (h *hcHeap) Pop() interface{} { |
| old := h.sessions |
| n := len(old) |
| s := old[n-1] |
| h.sessions = old[:n-1] |
| s.setHcIndex(-1) |
| return s |
| } |
| |
// maintenanceWindowSize specifies the number of health check cycles that
// defines a maintenance window. The maintenance window keeps track of a
// rolling set of numbers for the number of maximum checked out sessions during
// the maintenance window. This is used by the maintainer to determine the
// number of sessions to create or delete at the end of each health check
// cycle. A new cycle is started each time the maintainer calls
// maintenanceWindow.startNewCycle.
const maintenanceWindowSize = 10
| |
// maintenanceWindow contains the statistics that are gathered during a health
// check maintenance window.
type maintenanceWindow struct {
	// mu protects maxSessionsCheckedOut against concurrent access by the
	// maintainer and by code recording checkout counts.
	mu sync.Mutex
	// maxSessionsCheckedOut contains the maximum number of sessions that was
	// checked out of the session pool during a health check cycle. This number
	// indicates the number of sessions that was actually needed by the pool to
	// serve the load during that cycle. The values are kept as a rolling set
	// containing the values for the past 10 cycles (minutes). The maintainer
	// uses these values to determine the number of sessions to keep at the end
	// of each cycle. Index 0 is the current cycle; higher indexes are older.
	maxSessionsCheckedOut [maintenanceWindowSize]uint64
}
| |
| // maxSessionsCheckedOutDuringWindow returns the maximum number of sessions |
| // that has been checked out during the last maintenance window of 10 cycles |
| // (minutes). |
| func (mw *maintenanceWindow) maxSessionsCheckedOutDuringWindow() uint64 { |
| mw.mu.Lock() |
| defer mw.mu.Unlock() |
| var max uint64 |
| for _, cycleMax := range mw.maxSessionsCheckedOut { |
| max = maxUint64(max, cycleMax) |
| } |
| return max |
| } |
| |
| // updateMaxSessionsCheckedOutDuringWindow updates the maximum number of |
| // sessions that has been checked out of the pool during the current |
| // cycle of the maintenance window. A maintenance window consists of 10 |
| // maintenance cycles. Each cycle keeps track of the max number of sessions in |
| // use during that cycle. The rolling maintenance window of 10 cycles is used |
| // to determine the number of sessions to keep at the end of a cycle by |
| // calculating the max in use during the last 10 cycles. |
| func (mw *maintenanceWindow) updateMaxSessionsCheckedOutDuringWindow(currNumSessionsCheckedOut uint64) { |
| mw.mu.Lock() |
| defer mw.mu.Unlock() |
| mw.maxSessionsCheckedOut[0] = maxUint64(currNumSessionsCheckedOut, mw.maxSessionsCheckedOut[0]) |
| } |
| |
| // startNewCycle starts a new health check cycle with the specified number of |
| // checked out sessions as its initial value. |
| func (mw *maintenanceWindow) startNewCycle(currNumSessionsCheckedOut uint64) { |
| mw.mu.Lock() |
| defer mw.mu.Unlock() |
| copy(mw.maxSessionsCheckedOut[1:], mw.maxSessionsCheckedOut[:9]) |
| mw.maxSessionsCheckedOut[0] = currNumSessionsCheckedOut |
| } |
| |
| // newMaintenanceWindow creates a new maintenance window with all values for |
| // maxSessionsCheckedOut set to math.MaxUint64. This ensures that a complete |
| // maintenance window must pass before the maintainer will start to delete any |
| // sessions. |
| func newMaintenanceWindow() *maintenanceWindow { |
| mw := &maintenanceWindow{} |
| // Initialize the rolling window with max values to prevent the maintainer |
| // from deleting sessions before a complete window of 10 cycles has |
| // finished. |
| for i := 0; i < maintenanceWindowSize; i++ { |
| mw.maxSessionsCheckedOut[i] = math.MaxUint64 |
| } |
| return mw |
| } |
| |
// healthChecker performs periodical healthchecks on registered sessions.
type healthChecker struct {
	// mu protects concurrent access to healthChecker.
	mu sync.Mutex
	// queue is the priority queue for session healthchecks. Sessions with lower
	// nextCheck rank higher in the queue.
	queue hcHeap
	// interval is the average interval between two healthchecks on a session.
	interval time.Duration
	// workers is the number of concurrent healthcheck workers.
	workers int
	// waitWorkers waits for all healthcheck workers to exit.
	waitWorkers sync.WaitGroup
	// pool is the underlying session pool.
	pool *sessionPool
	// sampleInterval is the interval of sampling by the maintainer.
	sampleInterval time.Duration
	// ready is used to signal that maintainer can start running.
	ready chan struct{}
	// done is used to signal that health checker should be closed.
	done chan struct{}
	// once is used for closing channel done only once.
	once sync.Once
	// maintainerCancel cancels the maintainer's current sample-interval
	// context. It is replaced at the start of every maintainer cycle (under
	// mu) and invoked by close() to abort an in-progress grow/shrink.
	maintainerCancel func()
}
| |
| // newHealthChecker initializes new instance of healthChecker. |
| func newHealthChecker(interval time.Duration, workers int, sampleInterval time.Duration, pool *sessionPool) *healthChecker { |
| if workers <= 0 { |
| workers = 1 |
| } |
| hc := &healthChecker{ |
| interval: interval, |
| workers: workers, |
| pool: pool, |
| sampleInterval: sampleInterval, |
| ready: make(chan struct{}), |
| done: make(chan struct{}), |
| maintainerCancel: func() {}, |
| } |
| hc.waitWorkers.Add(1) |
| go hc.maintainer() |
| for i := 1; i <= hc.workers; i++ { |
| hc.waitWorkers.Add(1) |
| go hc.worker(i) |
| } |
| return hc |
| } |
| |
// close closes the healthChecker and waits for all healthcheck workers to exit.
//
// The statement order matters: first the maintainer's current cycle context
// is cancelled (under mu, since the maintainer replaces maintainerCancel
// under the same lock), then done is closed — guarded by once so close is
// safe to call multiple times — and finally we block until the maintainer
// and every worker have called waitWorkers.Done.
func (hc *healthChecker) close() {
	hc.mu.Lock()
	hc.maintainerCancel()
	hc.mu.Unlock()
	hc.once.Do(func() { close(hc.done) })
	hc.waitWorkers.Wait()
}
| |
| // isClosing checks if a healthChecker is already closing. |
| func (hc *healthChecker) isClosing() bool { |
| select { |
| case <-hc.done: |
| return true |
| default: |
| return false |
| } |
| } |
| |
| // getInterval gets the healthcheck interval. |
| func (hc *healthChecker) getInterval() time.Duration { |
| hc.mu.Lock() |
| defer hc.mu.Unlock() |
| return hc.interval |
| } |
| |
| // scheduledHCLocked schedules next healthcheck on session s with the assumption |
| // that hc.mu is being held. |
| func (hc *healthChecker) scheduledHCLocked(s *session) { |
| // The next healthcheck will be scheduled after |
| // [interval*0.5, interval*1.5) ns. |
| nsFromNow := rand.Int63n(int64(hc.interval)) + int64(hc.interval)/2 |
| s.setNextCheck(time.Now().Add(time.Duration(nsFromNow))) |
| if hi := s.getHcIndex(); hi != -1 { |
| // Session is still being tracked by healthcheck workers. |
| heap.Fix(&hc.queue, hi) |
| } |
| } |
| |
| // scheduledHC schedules next healthcheck on session s. It is safe to be called |
| // concurrently. |
| func (hc *healthChecker) scheduledHC(s *session) { |
| hc.mu.Lock() |
| defer hc.mu.Unlock() |
| hc.scheduledHCLocked(s) |
| } |
| |
| // register registers a session with healthChecker for periodical healthcheck. |
| func (hc *healthChecker) register(s *session) { |
| hc.mu.Lock() |
| defer hc.mu.Unlock() |
| hc.scheduledHCLocked(s) |
| heap.Push(&hc.queue, s) |
| } |
| |
| // unregister unregisters a session from healthcheck queue. |
| func (hc *healthChecker) unregister(s *session) { |
| hc.mu.Lock() |
| defer hc.mu.Unlock() |
| oi := s.setHcIndex(-1) |
| if oi >= 0 { |
| heap.Remove(&hc.queue, oi) |
| } |
| } |
| |
| // markDone marks that health check for session has been performed. |
| func (hc *healthChecker) markDone(s *session) { |
| hc.mu.Lock() |
| defer hc.mu.Unlock() |
| s.checkingHealth = false |
| } |
| |
| // healthCheck checks the health of the session and pings it if needed. |
| func (hc *healthChecker) healthCheck(s *session) { |
| defer hc.markDone(s) |
| if !s.pool.isValid() { |
| // Session pool is closed, perform a garbage collection. |
| s.destroy(false) |
| return |
| } |
| if err := s.ping(); shouldDropSession(err) { |
| // Ping failed, destroy the session. |
| s.destroy(false) |
| } |
| } |
| |
// worker performs the healthcheck on sessions in healthChecker's priority
// queue.
//
// Each worker loops until hc.done is closed, alternating between two tasks:
// preparing an idle read session for read/write transactions and pinging the
// session whose next check time has arrived. Lock ordering is always
// hc.pool.mu before hc.mu; both closures below follow that order.
func (hc *healthChecker) worker(i int) {
	// Returns a session which we should ping to keep it alive.
	getNextForPing := func() *session {
		hc.pool.mu.Lock()
		defer hc.pool.mu.Unlock()
		hc.mu.Lock()
		defer hc.mu.Unlock()
		if hc.queue.Len() <= 0 {
			// Queue is empty.
			return nil
		}
		// The heap root is the session with the earliest nextCheck.
		s := hc.queue.sessions[0]
		if s.getNextCheck().After(time.Now()) && hc.pool.valid {
			// All sessions have been checked recently. Note that when the
			// pool is no longer valid, the check time is ignored so the
			// remaining sessions are processed without delay.
			return nil
		}
		// Reschedule before handing the session out so other workers see an
		// updated nextCheck and do not pick this session again.
		hc.scheduledHCLocked(s)
		if !s.checkingHealth {
			s.checkingHealth = true
			return s
		}
		// Another worker is already checking this session.
		return nil
	}

	// Returns a session which we should prepare for write.
	getNextForTx := func() *session {
		hc.pool.mu.Lock()
		defer hc.pool.mu.Unlock()
		if hc.pool.shouldPrepareWriteLocked() {
			if hc.pool.idleList.Len() > 0 && hc.pool.valid {
				hc.mu.Lock()
				defer hc.mu.Unlock()
				if hc.pool.idleList.Front().Value.(*session).checkingHealth {
					// The front idle session is already being checked; skip
					// this round rather than searching deeper into the list.
					return nil
				}
				session := hc.pool.idleList.Remove(hc.pool.idleList.Front()).(*session)
				session.checkingHealth = true
				hc.pool.prepareReqs++
				return session
			}
		}
		return nil
	}

	for {
		if hc.isClosing() {
			// Exit when the pool has been closed and all sessions have been
			// destroyed or when health checker has been closed.
			hc.waitWorkers.Done()
			return
		}
		ws := getNextForTx()
		if ws != nil {
			// Prepare the session for read/write with a short deadline;
			// failures are tolerated because the session can be prepared
			// again in a later cycle.
			ctx, cancel := context.WithTimeout(context.Background(), time.Second)
			err := ws.prepareForWrite(ctx)
			cancel()
			if err != nil {
				// Skip handling prepare error, session can be prepared in next
				// cycle.
				log.Printf("Failed to prepare session, error: %v", toSpannerError(err))
			}
			hc.pool.recycle(ws)
			hc.pool.mu.Lock()
			hc.pool.prepareReqs--
			hc.pool.mu.Unlock()
			hc.markDone(ws)
		}
		rs := getNextForPing()
		if rs == nil {
			if ws == nil {
				// No work to be done so sleep to avoid burning CPU.
				// The pause is jittered into [pause/2, 1.5*pause) and capped
				// at the healthcheck interval.
				// NOTE(review): hc.interval is read here without holding
				// hc.mu; safe only as long as it is never mutated after
				// construction — confirm.
				pause := int64(100 * time.Millisecond)
				if pause > int64(hc.interval) {
					pause = int64(hc.interval)
				}
				select {
				case <-time.After(time.Duration(rand.Int63n(pause) + pause/2)):
				case <-hc.done:
				}

			}
			continue
		}
		hc.healthCheck(rs)
	}
}
| |
| // maintainer maintains the number of sessions in the pool based on the session |
| // pool configuration and the current and historical number of sessions checked |
| // out of the pool. The maintainer will: |
| // 1. Ensure that the session pool contains at least MinOpened sessions. |
| // 2. If the current number of sessions in the pool exceeds the greatest number |
| // of checked out sessions (=sessions in use) during the last 10 minutes, |
| // and the delta is larger than MaxIdleSessions, the maintainer will reduce |
| // the number of sessions to maxSessionsInUseDuringWindow+MaxIdleSessions. |
| func (hc *healthChecker) maintainer() { |
| // Wait until the pool is ready. |
| <-hc.ready |
| |
| for iteration := uint64(0); ; iteration++ { |
| if hc.isClosing() { |
| hc.waitWorkers.Done() |
| return |
| } |
| |
| hc.pool.mu.Lock() |
| currSessionsOpened := hc.pool.numOpened |
| maxIdle := hc.pool.MaxIdle |
| minOpened := hc.pool.MinOpened |
| hc.pool.mu.Unlock() |
| // Get the maximum number of sessions in use during the current |
| // maintenance window. |
| maxSessionsInUseDuringWindow := hc.pool.mw.maxSessionsCheckedOutDuringWindow() |
| hc.mu.Lock() |
| ctx, cancel := context.WithTimeout(context.Background(), hc.sampleInterval) |
| hc.maintainerCancel = cancel |
| hc.mu.Unlock() |
| |
| // Grow or shrink pool if needed. |
| // The number of sessions in the pool should be in the range |
| // [Config.MinOpened, Config.MaxIdle+maxSessionsInUseDuringWindow] |
| if currSessionsOpened < minOpened { |
| hc.growPool(ctx, minOpened) |
| } else if maxIdle+maxSessionsInUseDuringWindow > currSessionsOpened { |
| hc.shrinkPool(ctx, maxIdle+maxSessionsInUseDuringWindow) |
| } |
| |
| select { |
| case <-ctx.Done(): |
| case <-hc.done: |
| cancel() |
| } |
| // Cycle the maintenance window. This will remove the oldest cycle and |
| // add a new cycle at the beginning of the maintenance window with the |
| // currently checked out number of sessions as the max number of |
| // sessions in use in this cycle. This value will be increased during |
| // the next cycle if it increases. |
| hc.pool.mu.Lock() |
| currSessionsInUse := hc.pool.currSessionsCheckedOutLocked() |
| hc.pool.mu.Unlock() |
| hc.pool.mw.startNewCycle(currSessionsInUse) |
| } |
| } |
| |
// growPool grows the number of sessions in the pool to the specified number of
// sessions. It times out when the passed-in context does (the maintainer
// hands it a context bounded by sampleInterval).
func (hc *healthChecker) growPool(ctx context.Context, growToNumSessions uint64) {
	// Calculate the max number of sessions to create as a safeguard against
	// other processes that could be deleting sessions concurrently.
	hc.pool.mu.Lock()
	maxSessionsToCreate := int(growToNumSessions - hc.pool.numOpened)
	hc.pool.mu.Unlock()
	var created int
	for {
		if ctx.Err() != nil {
			return
		}

		p := hc.pool
		p.mu.Lock()
		// Take budget before the actual session creation.
		// Stop when the target has been reached (possibly with the help of
		// other processes creating sessions) or this call's budget is spent.
		if growToNumSessions <= p.numOpened || created >= maxSessionsToCreate {
			p.mu.Unlock()
			break
		}
		// Claim the slot up front so concurrent code sees the in-flight
		// creation reflected in numOpened.
		p.numOpened++
		recordStat(ctx, OpenSessionCount, int64(p.numOpened))
		p.createReqs++
		shouldPrepareWrite := p.shouldPrepareWriteLocked()
		p.mu.Unlock()
		var (
			s *session
			err error
		)
		// A fresh one-minute timeout is used instead of ctx so that an
		// almost-expired sample interval does not cut off an in-flight RPC.
		createContext, cancel := context.WithTimeout(context.Background(), time.Minute)
		if s, err = p.createSession(createContext); err != nil {
			cancel()
			log.Printf("Failed to create session, error: %v", toSpannerError(err))
			// NOTE(review): this assumes createSession rolls back numOpened
			// and createReqs on failure — confirm, since this loop does not.
			continue
		}
		cancel()
		created++
		if shouldPrepareWrite {
			prepareContext, cancel := context.WithTimeout(context.Background(), time.Minute)
			if err = s.prepareForWrite(prepareContext); err != nil {
				cancel()
				// The read-only session is still usable; return it to the
				// pool and let it be prepared for write in a later cycle.
				p.recycle(s)
				log.Printf("Failed to prepare session, error: %v", toSpannerError(err))
				continue
			}
			cancel()
		}
		// Hand the newly created session over to the pool's idle lists.
		p.recycle(s)
	}
}
| |
| // shrinkPool scales down the session pool. The method will stop deleting |
| // sessions when shrinkToNumSessions number of sessions in the pool has |
| // been reached. The method will also stop deleting sessions if it detects that |
| // another process has started creating sessions for the pool again, for |
| // example through the take() method. |
| func (hc *healthChecker) shrinkPool(ctx context.Context, shrinkToNumSessions uint64) { |
| hc.pool.mu.Lock() |
| maxSessionsToDelete := int(hc.pool.numOpened - shrinkToNumSessions) |
| hc.pool.mu.Unlock() |
| var deleted int |
| var prevNumOpened uint64 = math.MaxUint64 |
| for { |
| if ctx.Err() != nil { |
| return |
| } |
| |
| p := hc.pool |
| p.mu.Lock() |
| // Check if the number of open sessions has increased. If it has, we |
| // should stop deleting sessions, as the load has increased and |
| // additional sessions are needed. |
| if p.numOpened >= prevNumOpened { |
| break |
| } |
| prevNumOpened = p.numOpened |
| |
| // Check on both whether we have reached the number of open sessions as |
| // well as the number of sessions to delete, in case sessions have been |
| // deleted by other methods because they have expired or deemed |
| // invalid. |
| if shrinkToNumSessions >= p.numOpened || deleted >= maxSessionsToDelete { |
| p.mu.Unlock() |
| break |
| } |
| |
| var s *session |
| if p.idleList.Len() > 0 { |
| s = p.idleList.Front().Value.(*session) |
| } else if p.idleWriteList.Len() > 0 { |
| s = p.idleWriteList.Front().Value.(*session) |
| } |
| p.mu.Unlock() |
| if s != nil { |
| deleted++ |
| // destroy session as expire. |
| s.destroy(true) |
| } else { |
| break |
| } |
| } |
| } |
| |
| // shouldDropSession returns true if a particular error leads to the removal of |
| // a session |
| func shouldDropSession(err error) bool { |
| if err == nil { |
| return false |
| } |
| // If a Cloud Spanner can no longer locate the session (for example, if |
| // session is garbage collected), then caller should not try to return the |
| // session back into the session pool. |
| // |
| // TODO: once gRPC can return auxiliary error information, stop parsing the error message. |
| if ErrCode(err) == codes.NotFound && strings.Contains(ErrDesc(err), "Session not found") { |
| return true |
| } |
| return false |
| } |
| |
// maxUint64 returns the maximum of two uint64.
func maxUint64(a, b uint64) uint64 {
	if b > a {
		return b
	}
	return a
}
| |
// minUint64 returns the minimum of two uint64.
func minUint64(a, b uint64) uint64 {
	if b < a {
		return b
	}
	return a
}