Fix calling watchdog from different thread

Previously it was assumed that Response would only be accessed on the
primary thread for a request. However, our own builtin Command Line
Adaptor actually used Response from a different thread. With this change
you can now use Response on a different thread than getDocContent(),
although the object is still not thread-safe.
diff --git a/src/com/google/enterprise/adaptor/DocumentHandler.java b/src/com/google/enterprise/adaptor/DocumentHandler.java
index 813fa87..f72fb8e 100644
--- a/src/com/google/enterprise/adaptor/DocumentHandler.java
+++ b/src/com/google/enterprise/adaptor/DocumentHandler.java
@@ -205,7 +205,8 @@
       }
 
       DocumentRequest request = new DocumentRequest(ex, docId);
-      DocumentResponse response = new DocumentResponse(ex, docId);
+      DocumentResponse response
+          = new DocumentResponse(ex, docId, Thread.currentThread());
       journal.recordRequestProcessingStart();
       watchdog.processingStarting(headerTimeoutMillis);
       try {
@@ -537,6 +538,7 @@
    * need to be very aware of all the different possibilities.
    */
   private class DocumentResponse implements Response {
+    private Thread workingThread;
     private State state = State.SETUP;
     private HttpExchange ex;
     // Whether ex.getResponseBody().close() has been called while we are in the
@@ -562,9 +564,10 @@
     private boolean lock;
     private Map<String, Acl> fragments = new TreeMap<String, Acl>();
 
-    public DocumentResponse(HttpExchange ex, DocId docId) {
+    public DocumentResponse(HttpExchange ex, DocId docId, Thread thread) {
       this.ex = ex;
       this.docId = docId;
+      this.workingThread = thread;
     }
 
     @Override
@@ -841,8 +844,8 @@
       }
       // There are separate timeouts for sending headers and sending content.
       // Here we stop the headers timer and start the content timer.
-      watchdog.processingCompleted();
-      watchdog.processingStarting(contentTimeoutMillis);
+      watchdog.processingCompleted(workingThread);
+      watchdog.processingStarting(workingThread, contentTimeoutMillis);
       HttpExchanges.startResponse(
           ex, HttpURLConnection.HTTP_OK, contentType, hasContent);
       for (Map.Entry<String, Acl> fragment : fragments.entrySet()) {
diff --git a/src/com/google/enterprise/adaptor/Watchdog.java b/src/com/google/enterprise/adaptor/Watchdog.java
index 65f4da9..4fee053 100644
--- a/src/com/google/enterprise/adaptor/Watchdog.java
+++ b/src/com/google/enterprise/adaptor/Watchdog.java
@@ -33,8 +33,8 @@
  */
 class Watchdog {
   private final ScheduledExecutorService executor;
-  private final ThreadLocal<FutureInfo> inProcess
-      = new ThreadLocal<FutureInfo>();
+  private final ConcurrentMap<Thread, FutureInfo> inProcess
+      = new ConcurrentHashMap<Thread, FutureInfo>();
 
   /**
    * @param executor executor to schedule tasks
@@ -50,23 +50,36 @@
    * @param timeout maximum allowed duration in milliseconds
    */
   public void processingStarting(long timeout) {
-    if (inProcess.get() != null) {
+    processingStarting(Thread.currentThread(), timeout);
+  }
+
+  public void processingStarting(Thread thread, long timeout) {
+    if (inProcess.get(thread) != null) {
       throw new IllegalStateException("Processing is already occuring on the "
           + "thread");
     }
     AtomicBoolean interruptNeeded = new AtomicBoolean(true);
-    Runnable task = new Interrupter(Thread.currentThread(), interruptNeeded);
+    Runnable task = new Interrupter(thread, interruptNeeded);
     Future<?> future = executor.schedule(task, timeout, TimeUnit.MILLISECONDS);
-    inProcess.set(new FutureInfo(future, interruptNeeded));
+    FutureInfo info = new FutureInfo(future, interruptNeeded);
+    if (inProcess.putIfAbsent(thread, info) != null) {
+      // Reset state and try again.
+      future.cancel(false);
+      processingStarting(thread, timeout);
+      return;
+    }
   }
 
   public void processingCompleted() {
-    FutureInfo info = inProcess.get();
+    processingCompleted(Thread.currentThread());
+  }
+
+  public void processingCompleted(Thread thread) {
+    FutureInfo info = inProcess.remove(thread);
     if (info == null) {
       throw new IllegalStateException("No processing was started on the "
           + "thread");
     }
-    inProcess.remove();
     // Prevent Interrupter from running if it hasn't started already. It may
     // still be running after this call and Future doesn't tell us if it is
     // currently running.
@@ -80,7 +93,7 @@
         // Interrupter has interrupted this thread.
         // Clear the interrupt, if not already cleared, since we don't want to
         // interrupt this thread any further.
-        Thread.currentThread().interrupted();
+        thread.interrupted();
       }
     }
   }
diff --git a/test/com/google/enterprise/adaptor/MockWatchdog.java b/test/com/google/enterprise/adaptor/MockWatchdog.java
index 1b89c5b..a46153e 100644
--- a/test/com/google/enterprise/adaptor/MockWatchdog.java
+++ b/test/com/google/enterprise/adaptor/MockWatchdog.java
@@ -37,6 +37,14 @@
   }
 
   @Override
+  public void processingStarting(Thread thread, long timout) {
+  }
+
+  @Override
   public void processingCompleted() {
   }
+
+  @Override
+  public void processingCompleted(Thread thread) {
+  }
 }
diff --git a/test/com/google/enterprise/adaptor/WatchdogTest.java b/test/com/google/enterprise/adaptor/WatchdogTest.java
index ed6d078..bf2629c 100644
--- a/test/com/google/enterprise/adaptor/WatchdogTest.java
+++ b/test/com/google/enterprise/adaptor/WatchdogTest.java
@@ -20,6 +20,7 @@
 import org.junit.rules.ExpectedException;
 
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Tests for {@link Watchdog}.
@@ -56,6 +57,27 @@
   }
 
   @Test
+  public void testDifferentThread() throws InterruptedException {
+    watchdog = new Watchdog(executor);
+    final Thread thread = Thread.currentThread();
+    watchdog.processingStarting(1000);
+    final AtomicBoolean success = new AtomicBoolean();
+    Thread thread2 = new Thread() {
+      @Override
+      public void run() {
+        watchdog.processingCompleted(thread);
+        watchdog.processingStarting(thread, 1000);
+        success.set(true);
+      }
+    };
+    thread2.start();
+    thread2.join();
+    assertTrue(success.get());
+
+    watchdog.processingCompleted();
+  }
+
+  @Test
   public void testNoInterruption() throws InterruptedException {
     watchdog = new Watchdog(executor);
     watchdog.processingStarting(5);