Better manage GsaCommHandler's lifecycle
We now manage GsaCommunicationHandler's lifecycle more precisely. We now
setup the bare minimum before Adaptor.init() and teardown as much as
possible before Adaptor.destroy().
There are many thread-safety and object reachability fixes, primarily
made obvious by the handling of most Adaptor references within
AdaptorContextImpl.
diff --git a/src/com/google/enterprise/adaptor/GsaCommunicationHandler.java b/src/com/google/enterprise/adaptor/GsaCommunicationHandler.java
index fa66cbc..2bc3425 100644
--- a/src/com/google/enterprise/adaptor/GsaCommunicationHandler.java
+++ b/src/com/google/enterprise/adaptor/GsaCommunicationHandler.java
@@ -46,10 +46,9 @@
private final Adaptor adaptor;
private final Config config;
private final Journal journal;
- private boolean afterInit;
- private PollingIncrementalLister pollingIncrementalLister;
- private AuthnAuthority authnAuthority;
- private AuthzAuthority authzAuthority;
+ private final GsaConfigModListener gsaConfigModListener
+ = new GsaConfigModListener();
+ private AdaptorContextImpl adaptorContext;
/**
* Cron-style scheduler. Available for other uses, but necessary for
* scheduling {@link docIdFullPusher}. Tasks should execute quickly, to allow
@@ -61,8 +60,7 @@
* permits one invocation at a time. If multiple simultaneous invocations
* occur, all but the first will log a warning and return immediately.
*/
- private final OneAtATimeRunnable docIdFullPusher = new OneAtATimeRunnable(
- new PushRunnable(), new AlreadyRunningRunnable());
+ private OneAtATimeRunnable docIdFullPusher;
/**
* Runnable to be called for doing incremental feed pushes. It is only
* set if the Adaptor supports incremental updates. Otherwise, it's null.
@@ -96,10 +94,7 @@
private DocIdSender docIdSender;
private HttpServerScope dashboardScope;
private Dashboard dashboard;
- private final List<StatusSource> statusSources
- = new CopyOnWriteArrayList<StatusSource>();
private SensitiveValueCodec secureValueCodec;
- private SamlIdentityProvider samlIdentityProvider;
/**
* Used to stop startup prematurely. This allows cancelling an already-running
* start(). If start fails, a stale shuttingDownLatch can remain, thus it does
@@ -164,9 +159,9 @@
if (secure != config.isServerSecure()) {
config.setValue("server.secure", "" + secure);
}
- KeyPair key = null;
+ KeyPair keyPair = null;
try {
- key = getKeyPair(config.getServerKeyAlias());
+ keyPair = getKeyPair(config.getServerKeyAlias());
} catch (IOException ex) {
// The exception is only fatal if we are in secure mode.
if (secure) {
@@ -178,7 +173,7 @@
throw ex;
}
}
- secureValueCodec = new SensitiveValueCodec(key);
+ secureValueCodec = new SensitiveValueCodec(keyPair);
int port = server.getAddress().getPort();
if (port != config.getServerPort()) {
@@ -190,25 +185,14 @@
}
scope = new HttpServerScope(server, contextPrefix);
- dashboardScope = new HttpServerScope(dashboardServer, contextPrefix);
waiter = new ShutdownWaiter();
- scheduleExecutor = Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder().setDaemon(true).setNameFormat("schedule")
- .build());
- // The cachedThreadPool implementation created here is considerably better
- // than using ThreadPoolExecutor. ThreadPoolExecutor does not create threads
- // as would be expected from a thread pool.
- backgroundExecutor = Executors.newCachedThreadPool(
- new ThreadFactoryBuilder().setDaemon(true).setNameFormat("background")
- .build());
-
sessionManager = new SessionManager<HttpExchange>(
new SessionManager.HttpExchangeClientStore("sessid_" + port, secure),
30 * 60 * 1000 /* session lifetime: 30 minutes */,
5 * 60 * 1000 /* max cleanup frequency: 5 minutes */);
- config.addConfigModificationListener(new GsaConfigModListener());
+ config.addConfigModificationListener(gsaConfigModListener);
URI baseUri = config.getServerBaseUri();
URI docUri;
@@ -248,9 +232,10 @@
while (true) {
try {
tryToPutVersionIntoConfig(secure);
+ adaptorContext = new AdaptorContextImpl();
String adaptorType = adaptor.getClass().getName();
log.log(Level.INFO, "about to init {0}", adaptorType);
- adaptor.init(new AdaptorContextImpl());
+ adaptor.init(adaptorContext);
break;
} catch (InterruptedException ex) {
throw ex;
@@ -269,7 +254,7 @@
// Since the Adaptor has been started, we can now issue other calls to it.
// Usages of 'adaptor' are completely safe after this point.
- afterInit = true;
+ adaptorContext.freeze();
// Since we are white-listing particular keys for auto-update, things aren't
// ready enough to expose to adaptors.
@@ -278,6 +263,8 @@
(ConfigModificationListener) adaptor);
}*/
+ dashboardScope = new HttpServerScope(dashboardServer, contextPrefix);
+
SamlServiceProvider samlServiceProvider = null;
if (secure) {
bootstrapOpenSaml();
@@ -285,35 +272,45 @@
config.getServerPort(), config.getGsaHostname(),
config.getGsaSamlEntityId(), config.getServerSamlEntityId());
- if (authnAuthority != null) {
+ if (adaptorContext.authnAuthority != null) {
log.config("Adaptor-based authentication supported");
- samlIdentityProvider = new SamlIdentityProvider(
- authnAuthority, metadata, key);
+ SamlIdentityProvider samlIdentityProvider = new SamlIdentityProvider(
+ adaptorContext.authnAuthority, metadata, keyPair);
addFilters(scope.createContext("/samlip",
samlIdentityProvider.getSingleSignOnHandler()));
} else {
log.config("Adaptor-based authentication not supported");
}
samlServiceProvider
- = new SamlServiceProvider(sessionManager, metadata, key);
+ = new SamlServiceProvider(sessionManager, metadata, keyPair);
addFilters(scope.createContext("/samlassertionconsumer",
samlServiceProvider.getAssertionConsumer()));
- if (authzAuthority != null) {
+ if (adaptorContext.authzAuthority != null) {
log.config("Adaptor-based authorization supported");
addFilters(scope.createContext("/saml-authz", new SamlBatchAuthzHandler(
- authzAuthority, docIdCodec, metadata)));
+ adaptorContext.authzAuthority, docIdCodec, metadata)));
} else {
log.config("Adaptor-based authorization not supported");
}
}
+ scheduleExecutor = Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("schedule")
+ .build());
Watchdog watchdog = new Watchdog(scheduleExecutor);
AsyncDocIdSender asyncDocIdSender = new AsyncDocIdSender(docIdSender,
config.getFeedMaxUrls() /* batch size */,
5 /* max latency */, TimeUnit.MINUTES,
2 * config.getFeedMaxUrls() /* queue size */);
+
+ // The cachedThreadPool implementation created here is considerably better
+ // than using ThreadPoolExecutor. ThreadPoolExecutor does not create threads
+ // as would be expected from a thread pool.
+ backgroundExecutor = Executors.newCachedThreadPool(
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("background")
+ .build());
backgroundExecutor.execute(waiter.runnable(asyncDocIdSender.worker()));
DocumentHandler docHandler = new DocumentHandler(
- docIdCodec, docIdCodec, journal, adaptor, authzAuthority,
+ docIdCodec, docIdCodec, journal, adaptor, adaptorContext.authzAuthority,
config.getGsaHostname(),
config.getServerFullAccessHosts(),
samlServiceProvider, createTransformPipeline(), aclTransform,
@@ -334,6 +331,9 @@
// have been started yet.
scheduler = new CronScheduler(scheduleExecutor);
+ docIdFullPusher = new OneAtATimeRunnable(
+ new PushRunnable(adaptorContext.fullExceptionHandler),
+ new AlreadyRunningRunnable());
sendDocIdsFuture = scheduler.schedule(
config.getAdaptorFullListingSchedule(),
waiter.runnable(new BackgroundRunnable(docIdFullPusher)));
@@ -342,9 +342,10 @@
checkAndScheduleImmediatePushOfDocIds();
}
- if (pollingIncrementalLister != null) {
+ if (adaptorContext.pollingIncrementalLister != null) {
docIdIncrementalPusher = new OneAtATimeRunnable(
- new IncrementalPushRunnable(pollingIncrementalLister),
+ new IncrementalPushRunnable(adaptorContext.pollingIncrementalLister,
+ adaptorContext.incrExceptionHandler),
new AlreadyRunningRunnable());
scheduleExecutor.scheduleAtFixedRate(
@@ -355,7 +356,7 @@
}
dashboard = new Dashboard(config, this, journal, sessionManager,
- secureValueCodec, adaptor, statusSources);
+ secureValueCodec, adaptor, adaptorContext.statusSources);
dashboard.start(dashboardScope);
shuttingDownLatch = null;
@@ -615,45 +616,55 @@
}
private synchronized void realStop(long time, TimeUnit unit) {
+ if (adaptorContext != null) {
+ adaptorContext.freeze();
+ }
+ config.removeConfigModificationListener(gsaConfigModListener);
if (scope != null) {
scope.close();
scope = null;
-
+ }
+ if (dashboardScope != null) {
+ // Post-Adaptor.init() resources need to be stopped.
dashboardScope.close();
dashboardScope = null;
+ scheduleExecutor.shutdownNow();
+ scheduleExecutor = null;
+
+ backgroundExecutor.shutdownNow();
+ backgroundExecutor = null;
+
+ scheduler = null;
+ sendDocIdsFuture = null;
+
+ docIdIncrementalPusher = null;
+
dashboard.stop();
dashboard = null;
- }
- if (scheduleExecutor != null) {
- scheduleExecutor.shutdownNow();
- }
- if (backgroundExecutor != null) {
- backgroundExecutor.shutdownNow();
- }
- if (waiter != null) {
+
+ // Clear references set by Adaptor via AdaptorContext.
+ docIdFullPusher = null;
+ docIdIncrementalPusher = null;
+
try {
waiter.shutdown(time, unit);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
+ waiter = null;
}
try {
adaptor.destroy();
} finally {
- // Wait until after adaptor.destroy() to set things to null, so that the
- // AdaptorContext is usable until the very end.
- sendDocIdsFuture = null;
- scheduler = null;
- scheduleExecutor = null;
- backgroundExecutor = null;
- waiter = null;
+ // Wait until after adaptor.destroy() to shutdown things accessible by
+ // AdaptorContext, so that the AdaptorContext is usable until the very
+ // end.
+ secureValueCodec = null;
sessionManager = null;
docIdCodec = null;
- afterInit = false;
- pollingIncrementalLister = null;
- authnAuthority = null;
- authzAuthority = null;
+ docIdSender = null;
+ adaptorContext = null;
}
}
@@ -694,7 +705,10 @@
}
boolean isAdaptorIncremental() {
- return pollingIncrementalLister != null;
+ if (adaptorContext == null || adaptorContext.mutable) {
+ throw new IllegalStateException("Can only be used after init()");
+ }
+ return adaptorContext.pollingIncrementalLister != null;
}
boolean ensureLatestConfigLoaded() {
@@ -722,8 +736,11 @@
* Runnable that calls {@link DocIdSender#pushDocIds}.
*/
private class PushRunnable implements Runnable {
- private volatile ExceptionHandler handler
- = ExceptionHandlers.defaultHandler();
+ private final ExceptionHandler handler;
+
+ public PushRunnable(ExceptionHandler handler) {
+ this.handler = handler;
+ }
@Override
public void run() {
@@ -733,54 +750,33 @@
Thread.currentThread().interrupt();
}
}
-
- public void setGetDocIdsErrorHandler(ExceptionHandler handler) {
- if (handler == null) {
- throw new NullPointerException();
- }
- this.handler = handler;
- }
-
- public ExceptionHandler getGetDocIdsErrorHandler() {
- return handler;
- }
}
/**
* Runnable that performs incremental feed push.
*/
private class IncrementalPushRunnable implements Runnable {
- private volatile ExceptionHandler handler
- = ExceptionHandlers.defaultHandler();
- private PollingIncrementalLister adaptor;
+ private final ExceptionHandler handler;
+ private final PollingIncrementalLister incrementalLister;
- public IncrementalPushRunnable(PollingIncrementalLister adaptor) {
- this.adaptor = adaptor;
+ public IncrementalPushRunnable(PollingIncrementalLister incrementalLister,
+ ExceptionHandler handler) {
+ this.incrementalLister = incrementalLister;
+ this.handler = handler;
}
@Override
public void run() {
try {
docIdSender.pushIncrementalDocIdsFromAdaptor(
- pollingIncrementalLister, handler);
+ incrementalLister, handler);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
} catch (Exception ex) {
log.log(Level.WARNING, "Exception during incremental polling", ex);
}
}
-
- public void setGetDocIdsErrorHandler(ExceptionHandler handler) {
- if (handler == null) {
- throw new NullPointerException();
- }
- this.handler = handler;
- }
-
- public ExceptionHandler getGetDocIdsErrorHandler() {
- return handler;
- }
-}
+ }
/**
* Runnable that logs an error that {@link PushRunnable} is already executing.
@@ -859,6 +855,21 @@
* This class is thread-safe.
*/
private class AdaptorContextImpl implements AdaptorContext {
+ private boolean mutable = true;
+ private ExceptionHandler fullExceptionHandler
+ = ExceptionHandlers.defaultHandler();
+ private ExceptionHandler incrExceptionHandler
+ = ExceptionHandlers.defaultHandler();
+ private final List<StatusSource> statusSources
+ = new ArrayList<StatusSource>();
+ private PollingIncrementalLister pollingIncrementalLister;
+ private AuthnAuthority authnAuthority;
+ private AuthzAuthority authzAuthority;
+
+ private synchronized void freeze() {
+ mutable = false;
+ }
+
@Override
public Config getConfig() {
return config;
@@ -875,42 +886,45 @@
}
@Override
- public void addStatusSource(StatusSource source) {
- if (afterInit) {
+ public synchronized void addStatusSource(StatusSource source) {
+ if (!mutable) {
throw new IllegalStateException("After init()");
}
statusSources.add(source);
}
@Override
- public void setGetDocIdsFullErrorHandler(ExceptionHandler handler) {
- if (afterInit) {
- throw new IllegalStateException("After init()");
- }
- ((PushRunnable) docIdFullPusher.getRunnable())
- .setGetDocIdsErrorHandler(handler);
- }
-
- @Override
- public ExceptionHandler getGetDocIdsFullErrorHandler() {
- return ((PushRunnable) docIdFullPusher.getRunnable())
- .getGetDocIdsErrorHandler();
- }
-
- @Override
- public void setGetDocIdsIncrementalErrorHandler(
+ public synchronized void setGetDocIdsFullErrorHandler(
ExceptionHandler handler) {
- if (afterInit) {
+ if (!mutable) {
throw new IllegalStateException("After init()");
}
- ((PushRunnable) docIdFullPusher.getRunnable())
- .setGetDocIdsErrorHandler(handler);
+ if (handler == null) {
+ throw new NullPointerException();
+ }
+ fullExceptionHandler = handler;
}
@Override
- public ExceptionHandler getGetDocIdsIncrementalErrorHandler() {
- return ((PushRunnable) docIdFullPusher.getRunnable())
- .getGetDocIdsErrorHandler();
+ public synchronized ExceptionHandler getGetDocIdsFullErrorHandler() {
+ return fullExceptionHandler;
+ }
+
+ @Override
+ public synchronized void setGetDocIdsIncrementalErrorHandler(
+ ExceptionHandler handler) {
+ if (!mutable) {
+ throw new IllegalStateException("After init()");
+ }
+ if (handler == null) {
+ throw new NullPointerException();
+ }
+ incrExceptionHandler = handler;
+ }
+
+ @Override
+ public synchronized ExceptionHandler getGetDocIdsIncrementalErrorHandler() {
+ return incrExceptionHandler;
}
@Override
@@ -919,8 +933,9 @@
}
@Override
- public HttpContext createHttpContext(String path, HttpHandler handler) {
- if (afterInit) {
+ public synchronized HttpContext createHttpContext(String path,
+ HttpHandler handler) {
+ if (!mutable) {
throw new IllegalStateException("After init()");
}
return addFilters(scope.createContext(path, handler));
@@ -945,27 +960,28 @@
}
@Override
- public void setPollingIncrementalLister(PollingIncrementalLister lister) {
- if (afterInit) {
+ public synchronized void setPollingIncrementalLister(
+ PollingIncrementalLister lister) {
+ if (!mutable) {
throw new IllegalStateException("After init()");
}
pollingIncrementalLister = lister;
}
@Override
- public void setAuthnAuthority(AuthnAuthority authnAuthority) {
- if (afterInit) {
+ public synchronized void setAuthnAuthority(AuthnAuthority authnAuthority) {
+ if (!mutable) {
throw new IllegalStateException("After init()");
}
- GsaCommunicationHandler.this.authnAuthority = authnAuthority;
+ this.authnAuthority = authnAuthority;
}
@Override
- public void setAuthzAuthority(AuthzAuthority authzAuthority) {
- if (afterInit) {
+ public synchronized void setAuthzAuthority(AuthzAuthority authzAuthority) {
+ if (!mutable) {
throw new IllegalStateException("After init()");
}
- GsaCommunicationHandler.this.authzAuthority = authzAuthority;
+ this.authzAuthority = authzAuthority;
}
}
}
diff --git a/test/com/google/enterprise/adaptor/GsaCommunicationHandlerTest.java b/test/com/google/enterprise/adaptor/GsaCommunicationHandlerTest.java
index 3ede388..7db5785 100644
--- a/test/com/google/enterprise/adaptor/GsaCommunicationHandlerTest.java
+++ b/test/com/google/enterprise/adaptor/GsaCommunicationHandlerTest.java
@@ -17,6 +17,7 @@
import static org.junit.Assert.*;
import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
import org.junit.*;
import org.junit.rules.ExpectedException;
@@ -93,6 +94,11 @@
StatusSource source = new MockStatusSource("test",
new MockStatus(Status.Code.NORMAL));
context.addStatusSource(source);
+
+ assertNotNull(context.createHttpContext("/test", new HttpHandler() {
+ @Override
+ public void handle(HttpExchange ex) {}
+ }));
} catch (Throwable t) {
error.set(t);
}