Fix calling watchdog from different thread
Previously it was assumed that Response would only be accessed on the
primary thread for a request. However, our own builtin Command Line
Adaptor actually used Response from a different thread. With this change
you can now use Response on a different thread than getDocContent(),
although the object is still not thread-safe.
diff --git a/src/com/google/enterprise/adaptor/DocumentHandler.java b/src/com/google/enterprise/adaptor/DocumentHandler.java
index 813fa87..f72fb8e 100644
--- a/src/com/google/enterprise/adaptor/DocumentHandler.java
+++ b/src/com/google/enterprise/adaptor/DocumentHandler.java
@@ -205,7 +205,8 @@
}
DocumentRequest request = new DocumentRequest(ex, docId);
- DocumentResponse response = new DocumentResponse(ex, docId);
+ DocumentResponse response
+ = new DocumentResponse(ex, docId, Thread.currentThread());
journal.recordRequestProcessingStart();
watchdog.processingStarting(headerTimeoutMillis);
try {
@@ -537,6 +538,7 @@
* need to be very aware of all the different possibilities.
*/
private class DocumentResponse implements Response {
+ private Thread workingThread;
private State state = State.SETUP;
private HttpExchange ex;
// Whether ex.getResponseBody().close() has been called while we are in the
@@ -562,9 +564,10 @@
private boolean lock;
private Map<String, Acl> fragments = new TreeMap<String, Acl>();
- public DocumentResponse(HttpExchange ex, DocId docId) {
+ public DocumentResponse(HttpExchange ex, DocId docId, Thread thread) {
this.ex = ex;
this.docId = docId;
+ this.workingThread = thread;
}
@Override
@@ -841,8 +844,8 @@
}
// There are separate timeouts for sending headers and sending content.
// Here we stop the headers timer and start the content timer.
- watchdog.processingCompleted();
- watchdog.processingStarting(contentTimeoutMillis);
+ watchdog.processingCompleted(workingThread);
+ watchdog.processingStarting(workingThread, contentTimeoutMillis);
HttpExchanges.startResponse(
ex, HttpURLConnection.HTTP_OK, contentType, hasContent);
for (Map.Entry<String, Acl> fragment : fragments.entrySet()) {
diff --git a/src/com/google/enterprise/adaptor/Watchdog.java b/src/com/google/enterprise/adaptor/Watchdog.java
index 65f4da9..4fee053 100644
--- a/src/com/google/enterprise/adaptor/Watchdog.java
+++ b/src/com/google/enterprise/adaptor/Watchdog.java
@@ -33,8 +33,8 @@
*/
class Watchdog {
private final ScheduledExecutorService executor;
- private final ThreadLocal<FutureInfo> inProcess
- = new ThreadLocal<FutureInfo>();
+ private final ConcurrentMap<Thread, FutureInfo> inProcess
+ = new ConcurrentHashMap<Thread, FutureInfo>();
/**
* @param executor executor to schedule tasks
@@ -50,23 +50,36 @@
* @param timeout maximum allowed duration in milliseconds
*/
public void processingStarting(long timeout) {
- if (inProcess.get() != null) {
+ processingStarting(Thread.currentThread(), timeout);
+ }
+
+ public void processingStarting(Thread thread, long timeout) {
+ if (inProcess.get(thread) != null) {
throw new IllegalStateException("Processing is already occuring on the "
+ "thread");
}
AtomicBoolean interruptNeeded = new AtomicBoolean(true);
- Runnable task = new Interrupter(Thread.currentThread(), interruptNeeded);
+ Runnable task = new Interrupter(thread, interruptNeeded);
Future<?> future = executor.schedule(task, timeout, TimeUnit.MILLISECONDS);
- inProcess.set(new FutureInfo(future, interruptNeeded));
+ FutureInfo info = new FutureInfo(future, interruptNeeded);
+ if (inProcess.putIfAbsent(thread, info) != null) {
+ // Reset state and try again.
+ future.cancel(false);
+ processingStarting(thread, timeout);
+ return;
+ }
}
public void processingCompleted() {
- FutureInfo info = inProcess.get();
+ processingCompleted(Thread.currentThread());
+ }
+
+ public void processingCompleted(Thread thread) {
+ FutureInfo info = inProcess.remove(thread);
if (info == null) {
throw new IllegalStateException("No processing was started on the "
+ "thread");
}
- inProcess.remove();
// Prevent Interrupter from running if it hasn't started already. It may
// still be running after this call and Future doesn't tell us if it is
// currently running.
@@ -80,7 +93,7 @@
// Interrupter has interrupted this thread.
// Clear the interrupt, if not already cleared, since we don't want to
// interrupt this thread any further.
- Thread.currentThread().interrupted();
+ thread.interrupted();
}
}
}
diff --git a/test/com/google/enterprise/adaptor/MockWatchdog.java b/test/com/google/enterprise/adaptor/MockWatchdog.java
index 1b89c5b..a46153e 100644
--- a/test/com/google/enterprise/adaptor/MockWatchdog.java
+++ b/test/com/google/enterprise/adaptor/MockWatchdog.java
@@ -37,6 +37,14 @@
}
@Override
+ public void processingStarting(Thread thread, long timout) {
+ }
+
+ @Override
public void processingCompleted() {
}
+
+ @Override
+ public void processingCompleted(Thread thread) {
+ }
}
diff --git a/test/com/google/enterprise/adaptor/WatchdogTest.java b/test/com/google/enterprise/adaptor/WatchdogTest.java
index ed6d078..bf2629c 100644
--- a/test/com/google/enterprise/adaptor/WatchdogTest.java
+++ b/test/com/google/enterprise/adaptor/WatchdogTest.java
@@ -20,6 +20,7 @@
import org.junit.rules.ExpectedException;
import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* Tests for {@link Watchdog}.
@@ -56,6 +57,27 @@
}
@Test
+ public void testDifferentThread() throws InterruptedException {
+ watchdog = new Watchdog(executor);
+ final Thread thread = Thread.currentThread();
+ watchdog.processingStarting(1000);
+ final AtomicBoolean success = new AtomicBoolean();
+ Thread thread2 = new Thread() {
+ @Override
+ public void run() {
+ watchdog.processingCompleted(thread);
+ watchdog.processingStarting(thread, 1000);
+ success.set(true);
+ }
+ };
+ thread2.start();
+ thread2.join();
+ assertTrue(success.get());
+
+ watchdog.processingCompleted();
+ }
+
+ @Test
public void testNoInterruption() throws InterruptedException {
watchdog = new Watchdog(executor);
watchdog.processingStarting(5);