Adding use of AsyncDocIdPusher

Adding changes to allow the adaptor so use the AsyncDocIdPusher interface. With
using AsyncDocIdPusher, the adaptor will not need the monitor thread pushes the
changes to the GSA. Instead, when an file incremental change notification is
sent to the adaptor, the adaptor will pushed file Records to the
AsyncDocIdPusher. The library will be managing batching and send the Records to
the GSA.
diff --git a/src/com/google/enterprise/adaptor/fs/BlockingQueueBatcher.java b/src/com/google/enterprise/adaptor/fs/BlockingQueueBatcher.java
deleted file mode 100644
index 393433f..0000000
--- a/src/com/google/enterprise/adaptor/fs/BlockingQueueBatcher.java
+++ /dev/null
@@ -1,102 +0,0 @@
-// Copyright 2013 Google Inc. All Rights Reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//      http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.google.enterprise.adaptor.fs;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import java.util.Collection;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-/** Batches elements from a BlockingQueue. */
-class BlockingQueueBatcher {
-  /** Interface for classes that can provide a relative time stamp. */
-  interface RelativeTimeProvider {
-    /** Returns a time stamp that can be used for calculating elapsed time. */
-    public long relativeTime(TimeUnit timeUnit);
-  }
-  static class SystemRelativeTimeProvider implements RelativeTimeProvider {
-    @Override
-    public long relativeTime(TimeUnit timeUnit) {
-      return timeUnit.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
-    }
-  }
-
-  @VisibleForTesting
-  static RelativeTimeProvider timeProvider = new SystemRelativeTimeProvider();
-
-  // Prevent instantiation.
-  private BlockingQueueBatcher() {}
-
-  /**
-   * Read a batch-worth of elements from {@code queue}, placing them in {@code
-   * batch}, blocking until a batch is ready. No element will be delayed waiting
-   * for the batch to complete longer than {@code maxLatency}. Latency should
-   * not be confused with a timeout for the overall call, since the latency
-   * applies only once an element arrives and begins the moment the first
-   * element arrives.
-   *
-   * <p>At least one element will be added to {@code batch}, except if an
-   * exception is thrown.
-   *
-   * <p>Uses of this method that reuse {@code batch} should not forget to remove
-   * items from the collection after they are consumed. Otherwise, they will
-   * accumulate.
-   *
-   * @return number of elements added to {@code batch}
-   * @throws InterruptedException if interrupted while waiting
-   */
-  public static <T> int take(BlockingQueue<T> queue,
-      Collection<? super T> batch, int maxBatchSize, long maxLatency,
-      TimeUnit maxLatencyUnit) throws InterruptedException {
-    long maxLatencyNanos = maxLatencyUnit.toNanos(maxLatency);
-
-    int curBatchSize = 0;
-    long stopBatchTimeNanos = -1;
-
-    // The loop flow is 1) block, 2) drain queue, 3) possibly consume batch.
-    while (true) {
-      boolean timeout = false;
-      if (stopBatchTimeNanos == -1) {
-        // Start of new batch. Block for the first item of this batch.
-        batch.add(queue.take());
-        curBatchSize++;
-        stopBatchTimeNanos = timeProvider.relativeTime(TimeUnit.NANOSECONDS)
-            + maxLatencyNanos;
-      } else {
-        // Continue existing batch. Block until an item is in the queue or the
-        // batch timeout expires.
-        T element = queue.poll(
-            stopBatchTimeNanos
-                - timeProvider.relativeTime(TimeUnit.NANOSECONDS),
-            TimeUnit.NANOSECONDS);
-        if (element == null) {
-          // Timeout occurred.
-          break;
-        }
-        batch.add(element);
-        curBatchSize++;
-      }
-      curBatchSize += queue.drainTo(batch, maxBatchSize - curBatchSize);
-
-      if (curBatchSize >= maxBatchSize) {
-        // End current batch.
-        break;
-      }
-    }
-
-    return curBatchSize;
-  }
-}
diff --git a/src/com/google/enterprise/adaptor/fs/FileDelegate.java b/src/com/google/enterprise/adaptor/fs/FileDelegate.java
index 0e65917..1bdd715 100644
--- a/src/com/google/enterprise/adaptor/fs/FileDelegate.java
+++ b/src/com/google/enterprise/adaptor/fs/FileDelegate.java
@@ -14,6 +14,7 @@
 
 package com.google.enterprise.adaptor.fs;
 
+import com.google.enterprise.adaptor.AsyncDocIdPusher;
 import com.google.enterprise.adaptor.DocId;
 
 import java.io.IOException;
@@ -23,7 +24,6 @@
 import java.nio.file.attribute.FileTime;
 import java.nio.file.DirectoryStream;
 import java.nio.file.Path;
-import java.util.concurrent.BlockingQueue;
 
 interface FileDelegate {
   /**
@@ -134,7 +134,7 @@
    */
   DocId newDocId(Path doc) throws IOException;
 
-  void startMonitorPath(Path watchPath, BlockingQueue<Path> queue)
+  void startMonitorPath(Path watchPath, AsyncDocIdPusher pusher)
       throws IOException;
 
   void stopMonitorPath();
diff --git a/src/com/google/enterprise/adaptor/fs/FsAdaptor.java b/src/com/google/enterprise/adaptor/fs/FsAdaptor.java
index d4c3099..6c29b17 100644
--- a/src/com/google/enterprise/adaptor/fs/FsAdaptor.java
+++ b/src/com/google/enterprise/adaptor/fs/FsAdaptor.java
@@ -134,9 +134,6 @@
   /** The namespace applied to ACL Principals. */
   private String namespace;
 
-  /** The filesystem change monitor. */
-  private FsMonitor monitor;
-
   private AdaptorContext context;
   private Path rootPath;
   private boolean isDfsUnc;
@@ -174,11 +171,6 @@
     return namespace;
   }
 
-  @VisibleForTesting
-  BlockingQueue<Path> getFsMonitorQueue() {
-    return monitor.getQueue();
-  }
-
   @Override
   public void initConfig(Config config) {
     config.addKey(CONFIG_SRC, null);
@@ -248,30 +240,14 @@
     log.log(Level.CONFIG, "supportedWindowsAccounts: {0}",
         supportedWindowsAccounts);
 
-    int maxFeed = Integer.parseInt(
-        context.getConfig().getValue("feed.maxUrls"));
-    long maxLatencyMillis = 1000L * Integer.parseInt(
-        context.getConfig().getValue(CONFIG_MAX_INCREMENTAL_LATENCY));
-
     rootPathDocId = delegate.newDocId(rootPath);
-    monitor = new FsMonitor(delegate, context.getDocIdPusher(), maxFeed,
-        maxLatencyMillis);
-    delegate.startMonitorPath(rootPath, monitor.getQueue());
-    monitor.start();
-
+    delegate.startMonitorPath(rootPath, context.getAsyncDocIdPusher());
     context.setPollingIncrementalLister(this);
   }
 
   @Override
   public void destroy() {
     delegate.destroy();
-    // TODO (bmj): The check for null monitor is strictly for the tests,
-    // some of which may not have fully initialized the adaptor.  Maybe
-    // look into handling this less obtrusively in the future.
-    if (monitor != null) {
-      monitor.destroy();
-      monitor = null;
-    }
   }
 
   private ShareAcls readShareAcls() throws IOException {
@@ -557,93 +533,6 @@
     return false;
   }
 
-  private class FsMonitor {
-    private final DocIdPusher pusher;
-    private final PushThread pushThread;
-    private final BlockingQueue<Path> queue;
-    private final int maxFeedSize;
-    private final long maxLatencyMillis;
-
-    public FsMonitor(FileDelegate delegate, DocIdPusher pusher,
-        int maxFeedSize, long maxLatencyMillis) {
-      Preconditions.checkNotNull(delegate, "the delegate may not be null");
-      Preconditions.checkNotNull(pusher, "the DocId pusher may not be null");
-      Preconditions.checkArgument(maxFeedSize > 0,
-          "the maxFeedSize must be greater than zero");
-      Preconditions.checkArgument(maxLatencyMillis >= 0,
-          "the maxLatencyMillis must be greater than or equal to zero");
-      this.pusher = pusher;
-      this.maxFeedSize = maxFeedSize;
-      this.maxLatencyMillis = maxLatencyMillis;
-      queue = new LinkedBlockingQueue<Path>(20 * maxFeedSize);
-      pushThread = new PushThread();
-    }
-
-    public BlockingQueue<Path> getQueue() {
-      return queue;
-    }
-
-    public void start() {
-      pushThread.start();
-    }
-
-    public synchronized void destroy() {
-      pushThread.terminate();
-      try {
-        pushThread.join();
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      }
-    }
-
-    private class PushThread extends Thread {
-      public PushThread() {
-      }
-
-      public void terminate() {
-        interrupt();
-      }
-
-      public void run() {
-        log.entering("FsMonitor", "PushThread.run");
-        Set<Path> docs = new HashSet<Path>();
-        Set<Record> records = new HashSet<Record>();
-        while (true) {
-          try {
-            BlockingQueueBatcher.take(queue, docs, maxFeedSize,
-                maxLatencyMillis, TimeUnit.MILLISECONDS);
-            createRecords(records, docs);
-            log.log(Level.FINER, "Sending crawl immediately records: {0}",
-                records);
-            pusher.pushRecords(records);
-            records.clear();
-            docs.clear();
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            break;
-          }
-        }
-        log.exiting("FsMonitor", "PushThread.run");
-      }
-
-      private void createRecords(Set<Record> records, Collection<Path> docs) {
-        for (Path doc : docs) {
-          try {
-            if (isSupportedPath(doc)) {
-              records.add(new DocIdPusher.Record.Builder(delegate.newDocId(doc))
-                  .setCrawlImmediately(true).build());
-            } else {
-              log.log(Level.INFO,
-                  "Skipping path {0}. It is not a supported file type.", doc);
-            }
-          } catch (IOException e) {
-            log.log(Level.WARNING, "Unable to create new DocId for " + doc, e);
-          }
-        }
-      }
-    }
-  }
-
   private class ShareAcls {
     private final Acl shareAcl;
     private final Acl dfsShareAcl;
diff --git a/src/com/google/enterprise/adaptor/fs/WindowsFileDelegate.java b/src/com/google/enterprise/adaptor/fs/WindowsFileDelegate.java
index 07c106d..c1d19ec 100644
--- a/src/com/google/enterprise/adaptor/fs/WindowsFileDelegate.java
+++ b/src/com/google/enterprise/adaptor/fs/WindowsFileDelegate.java
@@ -16,7 +16,9 @@
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.enterprise.adaptor.AsyncDocIdPusher;
 import com.google.enterprise.adaptor.DocId;
+import com.google.enterprise.adaptor.DocIdPusher;
 import com.google.enterprise.adaptor.fs.WinApi.Netapi32Ex;
 import com.google.enterprise.adaptor.fs.WinApi.Shlwapi;
 
@@ -50,7 +52,6 @@
 import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.List;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -221,7 +222,7 @@
   }
 
   @Override
-  public void startMonitorPath(Path watchPath, BlockingQueue<Path> queue)
+  public void startMonitorPath(Path watchPath, AsyncDocIdPusher pusher)
       throws IOException {
     // Stop the current running monitor thread.
     stopMonitorPath();
@@ -233,7 +234,7 @@
 
     CountDownLatch startSignal = new CountDownLatch(1);
     synchronized (monitorThreadLock) {
-      monitorThread = new MonitorThread(watchPath, queue, startSignal);
+      monitorThread = new MonitorThread(watchPath, pusher, startSignal);
       monitorThread.start();
     }
     // Wait for the monitor thread to start watching filesystem.
@@ -254,20 +255,20 @@
     }
   }
 
-  private static class MonitorThread extends Thread {
+  private class MonitorThread extends Thread {
     private final Path watchPath;
-    private final BlockingQueue<Path> queue;
+    private final AsyncDocIdPusher pusher;
     private final CountDownLatch startSignal;
     private final HANDLE stopEvent;
 
-    public MonitorThread(Path watchPath, BlockingQueue<Path> queue,
+    public MonitorThread(Path watchPath, AsyncDocIdPusher pusher,
         CountDownLatch startSignal) {
       Preconditions.checkNotNull(watchPath, "the watchPath may not be null");
-      Preconditions.checkNotNull(queue, "the queue may not be null");
+      Preconditions.checkNotNull(pusher, "the pusher may not be null");
       Preconditions.checkNotNull(startSignal,
                                  "the start signal may not be null");
       this.watchPath = watchPath;
-      this.queue = queue;
+      this.pusher = pusher;
       this.startSignal = startSignal;
       stopEvent = Kernel32.INSTANCE.CreateEvent(null, false, false, null);
     }
@@ -409,18 +410,18 @@
         switch (info.Action) {
           case Kernel32.FILE_ACTION_MODIFIED:
             log.log(Level.FINEST, "Modified: {0}", changePath);
-            offerPath(changePath);
+            pushPath(changePath);
             break;
           case Kernel32.FILE_ACTION_ADDED:
           case Kernel32.FILE_ACTION_RENAMED_NEW_NAME:
             log.log(Level.FINEST, "Added: {0}", changePath);
-            offerPath(changePath.getParent());
+            pushPath(changePath.getParent());
             break;
           case Kernel32.FILE_ACTION_REMOVED:
           case Kernel32.FILE_ACTION_RENAMED_OLD_NAME:
             log.log(Level.FINEST, "Removed: {0}", changePath);
-            offerPath(changePath);
-            offerPath(changePath.getParent());
+            pushPath(changePath);
+            pushPath(changePath.getParent());
             break;
           default:
             // Nothing to do here.
@@ -430,10 +431,21 @@
       } while (info != null);
     }
 
-    private void offerPath(Path path) {
-      if (!queue.offer(path)) {
-        log.log(Level.INFO, "Unable to add path {0} to push queue. " +
-            "Incremental update notification will be lost.", path);
+    private void pushPath(Path doc) {
+      // For deleted, moved or renamed files we want to push the old name
+      // so in this case, feed it if the path does not exists.
+      boolean deletedOrMoved = !Files.exists(doc);
+      try {
+        if (deletedOrMoved || isRegularFile(doc) || isDirectory(doc)) {
+          pusher.pushRecord(new DocIdPusher.Record.Builder(newDocId(doc))
+              .setCrawlImmediately(true).build());
+        } else {
+          log.log(Level.INFO,
+              "Skipping path {0}. It is not a supported file type.", doc);
+        }
+      } catch (IOException e) {
+        log.log(Level.WARNING, "Unable to push the path " + doc +
+            " to the GSA.", e);
       }
     }
   }
diff --git a/test/com/google/enterprise/adaptor/fs/AccumulatingAsyncDocIdPusher.java b/test/com/google/enterprise/adaptor/fs/AccumulatingAsyncDocIdPusher.java
new file mode 100644
index 0000000..dc41fbf
--- /dev/null
+++ b/test/com/google/enterprise/adaptor/fs/AccumulatingAsyncDocIdPusher.java
@@ -0,0 +1,59 @@
+// Copyright 2014 Google Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.enterprise.adaptor.fs;
+
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.enterprise.adaptor.Acl;
+import com.google.enterprise.adaptor.AsyncDocIdPusher;
+import com.google.enterprise.adaptor.DocId;
+import com.google.enterprise.adaptor.DocIdPusher;
+
+import java.util.Collections;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+class AccumulatingAsyncDocIdPusher extends AccumulatingDocIdPusher
+    implements AsyncDocIdPusher {
+  private static final Logger log
+      = Logger.getLogger(AccumulatingAsyncDocIdPusher.class.getName());
+
+  @Override
+  public void pushDocId(DocId docId) {
+    try {
+      pushDocIds(Collections.singleton(docId));
+    } catch (InterruptedException e) {
+      log.warning("Interrupted. Aborted getDocIds");
+      Thread.currentThread().interrupt();
+    }
+  }
+  @Override
+  public void pushRecord(DocIdPusher.Record record) {
+    try {
+      pushRecords(Collections.singleton(record));
+    } catch (InterruptedException e) {
+      log.warning("Interrupted. Aborted getDocIds");
+      Thread.currentThread().interrupt();
+    }
+  }
+  @Override
+  public void pushNamedResource(DocId docId, Acl acl) {
+    try {
+      pushNamedResources(ImmutableSortedMap.of(docId, acl));
+    } catch (InterruptedException e) {
+      log.warning("Interrupted. Aborted getDocIds");
+      Thread.currentThread().interrupt();
+    }
+  }
+}
diff --git a/test/com/google/enterprise/adaptor/fs/BlockingQueueBatcherTest.java b/test/com/google/enterprise/adaptor/fs/BlockingQueueBatcherTest.java
deleted file mode 100644
index 19abc49..0000000
--- a/test/com/google/enterprise/adaptor/fs/BlockingQueueBatcherTest.java
+++ /dev/null
@@ -1,145 +0,0 @@
-// Copyright 2013 Google Inc. All Rights Reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//      http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.google.enterprise.adaptor.fs;
-
-import com.google.enterprise.adaptor.fs.BlockingQueueBatcher.RelativeTimeProvider;
-
-import static org.junit.Assert.*;
-
-import org.junit.*;
-import org.junit.rules.ExpectedException;
-
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-/** Tests for {@link BlockingQueueBatcher}. */
-public class BlockingQueueBatcherTest {
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
-
-  private final RelativeTimeProvider savedTimeProvider;
-
-  public BlockingQueueBatcherTest() {
-    savedTimeProvider = BlockingQueueBatcher.timeProvider;
-  }
-
-  @After
-  public void restoreTimeProvider() {
-    BlockingQueueBatcher.timeProvider = savedTimeProvider;
-  }
-
-  @Test(timeout = 500)
-  public void testAlreadyAvailable() throws Exception {
-    final List<Object> golden = Arrays.asList(
-        new Object(), new Object(), new Object());
-    BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();
-    queue.addAll(golden);
-    List<Object> list = new ArrayList<Object>();
-    // No blocking should occur.
-    assertEquals(golden.size(), BlockingQueueBatcher.take(
-        queue, list, golden.size(), 1, TimeUnit.SECONDS));
-    assertEquals(golden, list);
-  }
-
-  @Test(timeout = 500)
-  public void testBatchFilledWhileWaiting() throws Exception {
-    final List<Object> golden = Arrays.asList(
-        new Object(), new Object(), new Object());
-    final AtomicBoolean addedExtraElements = new AtomicBoolean();
-    BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>() {
-      @Override
-      public Object poll(long timeout, TimeUnit unit) {
-        // This method should only be called once.
-        assertFalse(addedExtraElements.get());
-        addedExtraElements.set(true);
-        assertEquals(0, size());
-        // Add the third for later retrieval; the second will be returned now.
-        add(golden.get(2));
-        return golden.get(1);
-      }
-    };
-    queue.add(golden.get(0));
-    List<Object> list = new ArrayList<Object>();
-    // No blocking should occur.
-    assertEquals(golden.size(), BlockingQueueBatcher.take(
-        queue, list, golden.size(), 1, TimeUnit.SECONDS));
-    assertEquals(golden, list);
-  }
-
-  @Test
-  public void testTimeout() throws Exception {
-    final List<Object> golden = Arrays.asList(new Object(), new Object());
-    final AtomicLong currentTime = new AtomicLong(5);
-    BlockingQueueBatcher.timeProvider = new RelativeTimeProvider() {
-      public long relativeTime(TimeUnit unit) {
-        return unit.convert(currentTime.get(), TimeUnit.MILLISECONDS);
-      }
-    };
-    final AtomicBoolean timedOut = new AtomicBoolean();
-    BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>() {
-      private long timesPolledCalled;
-
-      @Override
-      public Object take() throws InterruptedException {
-        Object o = super.take();
-        // Simulate 100ms passing.
-        currentTime.set(currentTime.get() + 100);
-        return o;
-      }
-
-      @Override
-      public Object poll(long timeout, TimeUnit unit) {
-        assertEquals(0, size());
-        if (timesPolledCalled == 0) {
-          timesPolledCalled++;
-          assertEquals(1000, unit.toMillis(timeout));
-          // Simulate 100ms passing.
-          currentTime.set(currentTime.get() + 100);
-          return golden.get(1);
-        } else if (timesPolledCalled == 1) {
-          timesPolledCalled++;
-          // the last poll() took 100ms, so 900ms is left of the second.
-          assertEquals(900, unit.toMillis(timeout));
-          // Claim that we timed out.
-          currentTime.set(currentTime.get() + 900);
-          timedOut.set(true);
-          return null;
-        } else {
-          fail("poll called more times than expected");
-          throw new AssertionError();
-        }
-      }
-    };
-    queue.add(golden.get(0));
-    List<Object> list = new ArrayList<Object>();
-    // No blocking should occur, because we overrode the queue's implementation.
-    // In normal circumstances it would block until timeout. If blocking did
-    // occur, it is likely the test is out-of-date.
-    assertEquals(golden.size(), BlockingQueueBatcher.take(
-        queue, list, 3, 1, TimeUnit.SECONDS));
-    assertEquals(golden, list);
-    assertTrue(timedOut.get());
-  }
-
-  @Test
-  public void testInterrupt() throws Exception {
-    BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();
-    List<Object> list = new ArrayList<Object>();
-    Thread.currentThread().interrupt();
-    thrown.expect(InterruptedException.class);
-    BlockingQueueBatcher.take(queue, list, 1, 1, TimeUnit.SECONDS);
-  }
-}
diff --git a/test/com/google/enterprise/adaptor/fs/FsAdaptorTest.java b/test/com/google/enterprise/adaptor/fs/FsAdaptorTest.java
index efb317c..a030ffe 100644
--- a/test/com/google/enterprise/adaptor/fs/FsAdaptorTest.java
+++ b/test/com/google/enterprise/adaptor/fs/FsAdaptorTest.java
@@ -847,93 +847,6 @@
   }
 
   @Test
-  public void testMonitorZeroFeedSize() throws Exception {
-    config.overrideKey("feed.maxUrls", "0");
-    thrown.expect(IllegalArgumentException.class);
-    adaptor.init(context);
-  }
-
-  @Test
-  public void testMonitorZeroLatency() throws Exception {
-    config.overrideKey("adaptor.incrementalPollPeriodSecs", "0");
-    adaptor.init(context);
-  }
-
-
-  @Test
-  public void testMonitorSubZeroLatency() throws Exception {
-    config.overrideKey("adaptor.incrementalPollPeriodSecs", "-1");
-    thrown.expect(IllegalArgumentException.class);
-    adaptor.init(context);
-  }
-
-  @Test
-  public void testMonitorUnsupportedPath() throws Exception {
-    root.addChildren(new MockFile("unsuppored").setIsRegularFile(false));
-    adaptor.init(context);
-    BlockingQueue<Path> queue = adaptor.getFsMonitorQueue();
-    queue.add(getPath("unsupported"));
-    Thread.sleep(100L);  // Allow FsMonitor to drain the queue.
-    // Verify it has been processed, but not been fed.
-    assertEquals(0, queue.size());
-    assertEquals(0, pusher.getRecords().size());
-    assertEquals(0, pusher.getNamedResources().size());
-  }
-
-  @Test
-  public void testMonitorOneFile() throws Exception {
-    String name = "test";
-    root.addChildren(new MockFile(name));
-    adaptor.init(context);
-    testMonitor(name);
-  }
-
-  @Test
-  public void testMonitorOneDirectory() throws Exception {
-    String name = "test.dir";
-    root.addChildren(new MockFile(name, true));
-    adaptor.init(context);
-    testMonitor(name);
-  }
-
-  @Test
-  public void testMonitorMultipleItems() throws Exception {
-    String dirname = "subdir";
-    String fname = "test.txt";
-    root.addChildren(new MockFile(dirname, true), new MockFile(fname));
-    adaptor.init(context);
-    testMonitor(dirname, fname);
-  }
-
-  @Test
-  public void testMonitorMultipleBatches() throws Exception {
-    String dirname = "subdir";
-    String fname = "test.txt";
-    root.addChildren(new MockFile(dirname, true), new MockFile(fname));
-    adaptor.init(context);
-    testMonitor(dirname);
-    // Make sure the previous batch does not get fed again.
-    pusher.reset();
-    testMonitor(fname);
-  }
-
-  private void testMonitor(String... names) throws Exception {
-    BlockingQueue<Path> queue = adaptor.getFsMonitorQueue();
-    for (String name : names) {
-      queue.add(getPath(name));
-    }
-    Thread.sleep(100L); // Allow FsMonitor to drain the queue.
-    assertEquals(0, queue.size());
-    assertEquals(0, pusher.getNamedResources().size());
-    Set<Record> expected = Sets.newHashSet();
-    for (String name : names) {
-      expected.add(new Record.Builder(getDocId(name))
-                   .setCrawlImmediately(true).build());
-    }
-    assertEquals(expected, Sets.newHashSet(pusher.getRecords()));
-  }
-
-  @Test
   public void testIncrementalShareAcls() throws Exception {
     adaptor.init(context);
 
diff --git a/test/com/google/enterprise/adaptor/fs/MockAdaptorContext.java b/test/com/google/enterprise/adaptor/fs/MockAdaptorContext.java
index 5ac7c2a..9edd2d5 100644
--- a/test/com/google/enterprise/adaptor/fs/MockAdaptorContext.java
+++ b/test/com/google/enterprise/adaptor/fs/MockAdaptorContext.java
@@ -37,6 +37,8 @@
 class MockAdaptorContext implements AdaptorContext {
   private final Config config = new Config();
   private final DocIdPusher docIdPusher = new AccumulatingDocIdPusher();
+  private final AsyncDocIdPusher asycDocIdPusher =
+      new AccumulatingAsyncDocIdPusher();
   private final DocIdEncoder docIdEncoder = new MockDocIdCodec();
 
   @Override
@@ -50,6 +52,11 @@
   }
 
   @Override
+  public AsyncDocIdPusher getAsyncDocIdPusher() {
+    return asycDocIdPusher;
+  }
+
+  @Override
   public DocIdEncoder getDocIdEncoder() {
     return docIdEncoder;
   }
@@ -108,9 +115,4 @@
   public void setAuthzAuthority(AuthzAuthority authzAuthority) {
     throw new UnsupportedOperationException();
   }
-
-  @Override
-  public AsyncDocIdPusher getAsyncDocIdPusher() {
-    throw new UnsupportedOperationException();
-  }
 }
diff --git a/test/com/google/enterprise/adaptor/fs/MockFileDelegate.java b/test/com/google/enterprise/adaptor/fs/MockFileDelegate.java
index 9a4c6f8..4764a8a 100644
--- a/test/com/google/enterprise/adaptor/fs/MockFileDelegate.java
+++ b/test/com/google/enterprise/adaptor/fs/MockFileDelegate.java
@@ -15,6 +15,7 @@
 package com.google.enterprise.adaptor.fs;
 
 import com.google.common.base.Preconditions;
+import com.google.enterprise.adaptor.AsyncDocIdPusher;
 import com.google.enterprise.adaptor.DocId;
 
 import java.io.FileNotFoundException;
@@ -161,7 +162,7 @@
   }
 
   @Override
-  public void startMonitorPath(Path watchPath, BlockingQueue<Path> queue)
+  public void startMonitorPath(Path watchPath, AsyncDocIdPusher pusher)
     throws IOException {
     // TODO (bmj): implementation
   }
diff --git a/test/com/google/enterprise/adaptor/fs/NioFileDelegateTest.java b/test/com/google/enterprise/adaptor/fs/NioFileDelegateTest.java
index 4db3dce..761fdd9 100644
--- a/test/com/google/enterprise/adaptor/fs/NioFileDelegateTest.java
+++ b/test/com/google/enterprise/adaptor/fs/NioFileDelegateTest.java
@@ -18,6 +18,7 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
 import com.google.common.io.CharStreams;
+import com.google.enterprise.adaptor.AsyncDocIdPusher;
 
 import static org.junit.Assert.*;
 
@@ -199,7 +200,7 @@
     public void destroy() {}
 
     @Override
-    public void startMonitorPath(Path watchPath, BlockingQueue<Path> queue) {
+    public void startMonitorPath(Path watchPath, AsyncDocIdPusher pusher) {
       throw new UnsupportedOperationException();
     }
       
diff --git a/test/com/google/enterprise/adaptor/fs/WindowsFileDelegateTest.java b/test/com/google/enterprise/adaptor/fs/WindowsFileDelegateTest.java
index 0a536eb..fcacfb4 100644
--- a/test/com/google/enterprise/adaptor/fs/WindowsFileDelegateTest.java
+++ b/test/com/google/enterprise/adaptor/fs/WindowsFileDelegateTest.java
@@ -18,6 +18,8 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
 import com.google.common.io.CharStreams;
+import com.google.enterprise.adaptor.Acl;
+import com.google.enterprise.adaptor.DocIdPusher;
 
 import static org.junit.Assert.*;
 import static org.junit.Assume.*;
@@ -39,9 +41,6 @@
 import java.nio.file.attribute.FileTime;
 import java.util.Collections;
 import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 
 /** Tests for {@link WindowsFileDelegate} */
 public class WindowsFileDelegateTest {
@@ -52,7 +51,8 @@
   }
 
   private FileDelegate delegate = new WindowsFileDelegate();
-  private BlockingQueue<Path> queue = new LinkedBlockingQueue<Path>();
+  private AccumulatingAsyncDocIdPusher pusher =
+      new AccumulatingAsyncDocIdPusher();
   private Path tempRoot;
 
   @Rule
@@ -157,12 +157,12 @@
   public void testStartMonitorBadPath() throws Exception {
     Path file = newTempFile("test.txt");
     thrown.expect(IOException.class);
-    delegate.startMonitorPath(file, queue);
+    delegate.startMonitorPath(file, pusher);
   }
 
   @Test
   public void testStartStopMonitor() throws Exception {
-    delegate.startMonitorPath(tempRoot, queue);
+    delegate.startMonitorPath(tempRoot, pusher);
     delegate.stopMonitorPath();
   }
 
@@ -171,30 +171,31 @@
     // These shouldn't show up as new or modified.
     newTempDir("existingDir");
     newTempFile("existingFile");
-    delegate.startMonitorPath(tempRoot, queue);
+    delegate.startMonitorPath(tempRoot, pusher);
     Path file = newTempFile("test.txt");
     // Adding a file shows up as a change to its parent.
-    checkForChanges(Collections.singleton(tempRoot));
+    checkForChanges(Collections.singleton(newRecord(tempRoot)));
   }
 
   @Test
   public void testMonitorDeleteFile() throws Exception {
     Path file = newTempFile("test.txt");
-    delegate.startMonitorPath(tempRoot, queue);
+    delegate.startMonitorPath(tempRoot, pusher);
     Files.delete(file);
     // Deleting a file shows up as a change to itself and its parent.
-    checkForChanges(Sets.newHashSet(tempRoot, file));
+    checkForChanges(Sets.newHashSet(newRecord(tempRoot), newRecord(file)));
   }
 
   @Test
   public void testMonitorRenameFile() throws Exception {
     Path file = newTempFile("test.txt");
     Path newFile = file.resolveSibling("newName.txt");
-    delegate.startMonitorPath(tempRoot, queue);
+    delegate.startMonitorPath(tempRoot, pusher);
     Files.move(file, newFile, StandardCopyOption.ATOMIC_MOVE);
     // Renaming a file shows up as a change to its old name, its new name,
     // and its parent.
-    checkForChanges(Sets.newHashSet(tempRoot, file, newFile));
+    checkForChanges(Sets.newHashSet(newRecord(tempRoot), newRecord(file),
+        newRecord(newFile)));
   }
 
   @Test
@@ -203,42 +204,43 @@
     Path dir2 = newTempDir("dir2");
     Path file1 = newTempFile(dir1, "test.txt");
     Path file2 = dir2.resolve(file1.getFileName());
-    delegate.startMonitorPath(tempRoot, queue);
+    delegate.startMonitorPath(tempRoot, pusher);
     Files.move(file1, file2);
     // Moving a file shows up as a change to its old name, its new name,
     // its old parent, and its new parent.
-    checkForChanges(Sets.newHashSet(file1, file2, dir1, dir2));
+    checkForChanges(Sets.newHashSet(newRecord(file1), newRecord(file2),
+        newRecord(dir1), newRecord(dir2)));
   }
 
   @Test
   public void testMonitorModifyFile() throws Exception {
     Path file = newTempFile("test.txt");
-    delegate.startMonitorPath(tempRoot, queue);
+    delegate.startMonitorPath(tempRoot, pusher);
     Files.write(file, "Hello World".getBytes("UTF-8"));
     // Modifying a file shows up as a change to that file.
-    checkForChanges(Collections.singleton(file));
+    checkForChanges(Collections.singleton(newRecord(file)));
   }
 
   @Test
   public void testMonitorModifyFileAttributes() throws Exception {
     Path file = newTempFile("test.txt");
     FileTime lastModified = Files.getLastModifiedTime(file);
-    delegate.startMonitorPath(tempRoot, queue);
+    delegate.startMonitorPath(tempRoot, pusher);
     Files.setLastModifiedTime(file, 
         FileTime.fromMillis(lastModified.toMillis() + 10000L));
     // Modifying a file shows up as a change to that file.
-    checkForChanges(Collections.singleton(file));
+    checkForChanges(Collections.singleton(newRecord(file)));
   }
 
   @Test
   public void testMonitorRenameDir() throws Exception {
     Path dir = newTempDir("dir1");
     Path newDir = dir.resolveSibling("newName.dir");
-    delegate.startMonitorPath(tempRoot, queue);
+    delegate.startMonitorPath(tempRoot, pusher);
     Files.move(dir, newDir, StandardCopyOption.ATOMIC_MOVE);
     // Renaming a directory shows up as a change to its old name, its new name,
     // and its parent.
-    checkForChanges(Sets.newHashSet(tempRoot, dir));
+    checkForChanges(Sets.newHashSet(newRecord(tempRoot), newRecord(dir)));
   }
 
   @Test
@@ -246,46 +248,62 @@
     Path dir1 = newTempDir("dir1");
     Path dir2 = newTempDir("dir2");
     Path dir1dir2 = dir1.resolve(dir2.getFileName());
-    delegate.startMonitorPath(tempRoot, queue);
+    delegate.startMonitorPath(tempRoot, pusher);
     Files.move(dir2, dir1dir2);
     // Moving a file shows up as a change to its old name, its new name,
     // its old parent, and its new parent.
-    checkForChanges(Sets.newHashSet(tempRoot, dir1, dir2));
+    checkForChanges(Sets.newHashSet(newRecord(tempRoot), newRecord(dir1),
+        newRecord(dir2)));
   }
 
   @Test
   public void testMonitorChangesInSubDirs() throws Exception {
     Path dir = newTempDir("testDir");
     Path file = newTempFile(dir, "test.txt");
-    delegate.startMonitorPath(tempRoot, queue);
+    delegate.startMonitorPath(tempRoot, pusher);
     Files.write(file, "Hello World".getBytes("UTF-8"));
     // Modifying a file shows up as a change to that file.
-    checkForChanges(Collections.singleton(file));
+    checkForChanges(Sets.newHashSet(newRecord(file), newRecord(dir)));
   }
 
-  private void checkForChanges(Set<Path> expected) throws Exception {
-    // Collect up the changes.  Adapted from BlockingQueueBatcher.take(),
-    // but without infinite initial wait.
-    Set<Path> changes = Sets.newHashSet();
-    int maxBatchSize = expected.size();
-    long maxLatencyMillis = 10000;
-    long currentTime = System.currentTimeMillis();
-    long stopBatchTime = currentTime + maxLatencyMillis;
+  private void checkForChanges(Set<DocIdPusher.Record> expected)
+      throws Exception {
+    // Collect up the changes.
+    Set<DocIdPusher.Record> changes = Sets.newHashSet();
+    final long maxLatencyMillis = 10000;
+    long latencyMillis = maxLatencyMillis;
+    long batchLatencyMillis = 500;
+    boolean inFollowup = false;
 
-    while (currentTime < stopBatchTime && changes.size() < maxBatchSize) {
-      // Block until an item is in the queue or the batch timeout expires.
-      Path path =
-          queue.poll(stopBatchTime - currentTime, TimeUnit.MILLISECONDS);
-      if (path == null) {
-        // Timeout occurred.
+    while (latencyMillis > 0) {
+      Thread.sleep(batchLatencyMillis);
+      latencyMillis -= batchLatencyMillis;
+
+      changes.addAll(pusher.getRecords());
+      pusher.reset();
+      if (changes.size() == expected.size()) {
+        // If the changes size is equal to the expected size then
+        // keep listening for changes for the same period of time
+        // that it took to get the current notifications to see if
+        // we find any additional changes.
+        if (!inFollowup) {
+          latencyMillis = maxLatencyMillis - latencyMillis;
+          inFollowup = true;
+        }
+      }
+      if (changes.size() > expected.size()) {
+        // We've found more changes than are expected. Just stop
+        // listening, we'll fail below.
         break;
       }
-      changes.add(path);
-      queue.drainTo(changes);
-      currentTime = System.currentTimeMillis();
     }
 
     // Now verify that the changes we got were the ones that were expected.
     assertEquals(expected, changes);
   }
+
+  private DocIdPusher.Record newRecord(Path path) throws Exception {
+    return new DocIdPusher.Record.Builder(delegate.newDocId(path))
+        .setCrawlImmediately(true).build();
+  }
 }