Fix calling watchdog from different thread Previously it was assumed that Response would only be accessed on the primary thread for a request. However, our own builtin Command Line Adaptor actually used Response from a different thread. With this change you can now use Response on a different thread than getDocContent(), although the object is still not thread-safe.
diff --git a/src/com/google/enterprise/adaptor/DocumentHandler.java b/src/com/google/enterprise/adaptor/DocumentHandler.java index 813fa87..f72fb8e 100644 --- a/src/com/google/enterprise/adaptor/DocumentHandler.java +++ b/src/com/google/enterprise/adaptor/DocumentHandler.java
@@ -205,7 +205,8 @@ } DocumentRequest request = new DocumentRequest(ex, docId); - DocumentResponse response = new DocumentResponse(ex, docId); + DocumentResponse response + = new DocumentResponse(ex, docId, Thread.currentThread()); journal.recordRequestProcessingStart(); watchdog.processingStarting(headerTimeoutMillis); try { @@ -537,6 +538,7 @@ * need to be very aware of all the different possibilities. */ private class DocumentResponse implements Response { + private Thread workingThread; private State state = State.SETUP; private HttpExchange ex; // Whether ex.getResponseBody().close() has been called while we are in the @@ -562,9 +564,10 @@ private boolean lock; private Map<String, Acl> fragments = new TreeMap<String, Acl>(); - public DocumentResponse(HttpExchange ex, DocId docId) { + public DocumentResponse(HttpExchange ex, DocId docId, Thread thread) { this.ex = ex; this.docId = docId; + this.workingThread = thread; } @Override @@ -841,8 +844,8 @@ } // There are separate timeouts for sending headers and sending content. // Here we stop the headers timer and start the content timer. - watchdog.processingCompleted(); - watchdog.processingStarting(contentTimeoutMillis); + watchdog.processingCompleted(workingThread); + watchdog.processingStarting(workingThread, contentTimeoutMillis); HttpExchanges.startResponse( ex, HttpURLConnection.HTTP_OK, contentType, hasContent); for (Map.Entry<String, Acl> fragment : fragments.entrySet()) {
diff --git a/src/com/google/enterprise/adaptor/Watchdog.java b/src/com/google/enterprise/adaptor/Watchdog.java index 65f4da9..4fee053 100644 --- a/src/com/google/enterprise/adaptor/Watchdog.java +++ b/src/com/google/enterprise/adaptor/Watchdog.java
@@ -33,8 +33,8 @@ */ class Watchdog { private final ScheduledExecutorService executor; - private final ThreadLocal<FutureInfo> inProcess - = new ThreadLocal<FutureInfo>(); + private final ConcurrentMap<Thread, FutureInfo> inProcess + = new ConcurrentHashMap<Thread, FutureInfo>(); /** * @param executor executor to schedule tasks @@ -50,23 +50,36 @@ * @param timeout maximum allowed duration in milliseconds */ public void processingStarting(long timeout) { - if (inProcess.get() != null) { + processingStarting(Thread.currentThread(), timeout); + } + + public void processingStarting(Thread thread, long timeout) { + if (inProcess.get(thread) != null) { throw new IllegalStateException("Processing is already occuring on the " + "thread"); } AtomicBoolean interruptNeeded = new AtomicBoolean(true); - Runnable task = new Interrupter(Thread.currentThread(), interruptNeeded); + Runnable task = new Interrupter(thread, interruptNeeded); Future<?> future = executor.schedule(task, timeout, TimeUnit.MILLISECONDS); - inProcess.set(new FutureInfo(future, interruptNeeded)); + FutureInfo info = new FutureInfo(future, interruptNeeded); + if (inProcess.putIfAbsent(thread, info) != null) { + // Reset state and try again. + future.cancel(false); + processingStarting(thread, timeout); + return; + } } public void processingCompleted() { - FutureInfo info = inProcess.get(); + processingCompleted(Thread.currentThread()); + } + + public void processingCompleted(Thread thread) { + FutureInfo info = inProcess.remove(thread); if (info == null) { throw new IllegalStateException("No processing was started on the " + "thread"); } - inProcess.remove(); // Prevent Interrupter from running if it hasn't started already. It may // still be running after this call and Future doesn't tell us if it is // currently running. @@ -80,7 +93,7 @@ // Interrupter has interrupted this thread. // Clear the interrupt, if not already cleared, since we don't want to // interrupt this thread any further. - Thread.currentThread().interrupted(); + thread.interrupted(); } } }
diff --git a/test/com/google/enterprise/adaptor/MockWatchdog.java b/test/com/google/enterprise/adaptor/MockWatchdog.java index 1b89c5b..a46153e 100644 --- a/test/com/google/enterprise/adaptor/MockWatchdog.java +++ b/test/com/google/enterprise/adaptor/MockWatchdog.java
@@ -37,6 +37,14 @@ } @Override + public void processingStarting(Thread thread, long timout) { + } + + @Override public void processingCompleted() { } + + @Override + public void processingCompleted(Thread thread) { + } }
diff --git a/test/com/google/enterprise/adaptor/WatchdogTest.java b/test/com/google/enterprise/adaptor/WatchdogTest.java index ed6d078..bf2629c 100644 --- a/test/com/google/enterprise/adaptor/WatchdogTest.java +++ b/test/com/google/enterprise/adaptor/WatchdogTest.java
@@ -20,6 +20,7 @@ import org.junit.rules.ExpectedException; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; /** * Tests for {@link Watchdog}. @@ -56,6 +57,27 @@ } @Test + public void testDifferentThread() throws InterruptedException { + watchdog = new Watchdog(executor); + final Thread thread = Thread.currentThread(); + watchdog.processingStarting(1000); + final AtomicBoolean success = new AtomicBoolean(); + Thread thread2 = new Thread() { + @Override + public void run() { + watchdog.processingCompleted(thread); + watchdog.processingStarting(thread, 1000); + success.set(true); + } + }; + thread2.start(); + thread2.join(); + assertTrue(success.get()); + + watchdog.processingCompleted(); + } + + @Test public void testNoInterruption() throws InterruptedException { watchdog = new Watchdog(executor); watchdog.processingStarting(5);