Incremental Crawl Phase 2

This makes incremental crawl more efficient.
diff --git a/src/com/google/enterprise/adaptor/ad/AdAdaptor.java b/src/com/google/enterprise/adaptor/ad/AdAdaptor.java
index b18118f..0a0c899 100644
--- a/src/com/google/enterprise/adaptor/ad/AdAdaptor.java
+++ b/src/com/google/enterprise/adaptor/ad/AdAdaptor.java
@@ -30,6 +30,7 @@
 import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -187,12 +188,14 @@
       clearLastCompleteGroupCatalog();
       GroupCatalog cumulativeCatalog = makeFullCatalog();
       // all servers were able to successfully populate the catalog: do a push
-      cumulativeCatalog.resolveForeignSecurityPrincipals();
+      // TODO(myk): Rework the structure so that a member variable of
+      // cumulativeCatalog isn't passed in as a parameter to its own method.
+      cumulativeCatalog.resolveForeignSecurityPrincipals(
+          cumulativeCatalog.entities);
       Map<GroupPrincipal, List<Principal>> groups =
-          cumulativeCatalog.makeDefs();
+          cumulativeCatalog.makeDefs(cumulativeCatalog.entities);
       pusher.pushGroupDefinitions(groups, CASE_SENSITIVITY);
-      // TODO(myk): clear membership information from cache - retain only the
-      // entities in bySid, byDn, and wellKnownMembership.
+      cumulativeCatalog.members.clear();
       lastCompleteGroupCatalog = cumulativeCatalog;
     } finally {
       mutex.unlock();
@@ -200,8 +203,8 @@
     }
   }
 
-  private GroupCatalog makeFullCatalog() throws InterruptedException,
-      IOException {
+  @VisibleForTesting
+  GroupCatalog makeFullCatalog() throws InterruptedException, IOException {
     GroupCatalog cumulativeCatalog = new GroupCatalog(localizedStrings,
         namespace, feedBuiltinGroups);
     for (AdServer server : servers) {
@@ -209,7 +212,7 @@
         server.ensureConnectionIsCurrent();
         GroupCatalog catalog = new GroupCatalog(localizedStrings, namespace,
               feedBuiltinGroups);
-        catalog.readEverythingFrom(server);
+        catalog.readEverythingFrom(server, /*includeMembers=*/ true);
         cumulativeCatalog.add(catalog);
       } catch (NamingException ne) {
         String host = server.getHostName();
@@ -224,8 +227,7 @@
    * <p>
    * When a server cannot do an incremental push, it does a full crawl without
    * doing a push afterwards -- this sets up its state so that subsequent
-   * incremental pushes can work.  A future version of this method will do the
-   * full crawl ignoring the "member" attribute.
+   * incremental pushes can work.
    */
   @Override
   public void getModifiedDocIds(DocIdPusher pusher) throws InterruptedException,
@@ -237,39 +239,50 @@
     }
     try {
       log.log(Level.FINE, "getModifiedDocIds starting - acquired lock.");
-
-      for (AdServer server : servers) {
-        String previousServiceName = server.getDsServiceName();
-        String previousInvocationId = server.getInvocationID();
-        long previousHighestUSN = server.getHighestCommittedUSN();
-        try {
-          server.ensureConnectionIsCurrent();
-          // TODO(myk): combine each server's resulting Entities into one Set
-          lastCompleteGroupCatalog.readUpdatesFrom(server, previousServiceName,
-              previousInvocationId, previousHighestUSN);
-        } catch (NamingException ne) {
-          // invalidate the saved group catalog
-          clearLastCompleteGroupCatalog();
-          String host = server.getHostName();
-          throw new IOException("could not get entities from " + host, ne);
-        }
-      }
-
-      // all servers were able to successfully update the catalog: do a push
-      lastCompleteGroupCatalog.resolveForeignSecurityPrincipals();
-      Map<GroupPrincipal, List<Principal>> groups =
-          lastCompleteGroupCatalog.makeDefs();
-      // TODO(myk): pass on (only) new/modified entities to resolve/makeDefs,
-      // so that we're only pushing the new/modified groups below.
-      pusher.pushGroupDefinitions(groups, CASE_SENSITIVITY);
-      // TODO(myk): clear membership information from cache - retain only the
-      // entities in bySid, byDn, and wellKnownMembership.
+      getModifiedDocIdsHelper(pusher);
     } finally {
       mutex.unlock();
       log.log(Level.FINE, "getModifiedDocIds ending - lock released.");
     }
   }
 
+  @VisibleForTesting
+  void getModifiedDocIdsHelper(DocIdPusher pusher) throws InterruptedException,
+      IOException {
+    if (lastCompleteGroupCatalog == null) {
+      log.log(Level.FINE, "getModifiedDocIds doing a fetch with no push.");
+      lastCompleteGroupCatalog = makeFullCatalog();
+      return;
+    }
+
+    Set<AdEntity> allNewOrUpdatedEntities = new HashSet<AdEntity>();
+    for (AdServer server : servers) {
+      String previousServiceName = server.getDsServiceName();
+      String previousInvocationId = server.getInvocationID();
+      long previousHighestUSN = server.getHighestCommittedUSN();
+      try {
+        server.ensureConnectionIsCurrent();
+        allNewOrUpdatedEntities.addAll(
+            lastCompleteGroupCatalog.readUpdatesFrom(server,
+                previousServiceName, previousInvocationId,
+                previousHighestUSN));
+      } catch (NamingException ne) {
+        // invalidate the saved group catalog
+        clearLastCompleteGroupCatalog();
+        String host = server.getHostName();
+        throw new IOException("could not get entities from " + host, ne);
+      }
+    }
+
+    // all servers were able to successfully update the catalog: do a push
+    lastCompleteGroupCatalog.resolveForeignSecurityPrincipals(
+        allNewOrUpdatedEntities);
+    Map<GroupPrincipal, List<Principal>> groups =
+        lastCompleteGroupCatalog.makeDefs(allNewOrUpdatedEntities);
+    pusher.pushGroupDefinitions(groups, CASE_SENSITIVITY);
+    lastCompleteGroupCatalog.members.clear();
+  }
+
   // don't expose the <code>lastCompleteGroupCatalog</code> field, but do allow
   // tests to clear it
   @VisibleForTesting
@@ -357,9 +370,14 @@
     }
 
     @VisibleForTesting
-    void readEverythingFrom(AdServer server) throws InterruptedNamingException {
-      // TODO(myk): Phase II: indicate whether or not this search should
-      // include members
+    void readEverythingFrom(AdServer server, boolean includeMembers)
+        throws InterruptedNamingException {
+      final String[] nonMemberAttributes = new String[] { "uSNChanged",
+          "sAMAccountName", "objectGUID;binary", "objectSid;binary",
+          "userPrincipalName", "primaryGroupId", "userAccountControl" };
+      final String[] allAttributes = Arrays.copyOf(nonMemberAttributes,
+          nonMemberAttributes.length + 1);
+      allAttributes[nonMemberAttributes.length] = "member";
       log.log(Level.FINE, "Starting full crawl.");
       // LDAP_MATCHING_RULE_BIT_AND = 1.2.840.113556.1.4.803
       // and ADS_GROUP_TYPE_SECURITY_ENABLED = 2147483648.
@@ -367,9 +385,7 @@
           + "(groupType:1.2.840.113556.1.4.803:=2147483648))"
           + "(&(objectClass=user)(objectCategory=person)))",
           /*deleted=*/ false,
-          new String[] { "uSNChanged", "sAMAccountName", "objectGUID;binary",
-              "objectSid;binary", "userPrincipalName", "primaryGroupId",
-              "member", "userAccountControl" });
+          includeMembers ? allAttributes : nonMemberAttributes);
       // disabled groups handled later, in makeDefs()
       log.log(Level.FINE, "Ending full crawl - now starting processing.");
       processEntities(entities, server.getnETBIOSName());
@@ -392,14 +408,13 @@
      * @param previousHighestUSN last-crawled value of
      *     <code>server.getHighestCommittedUSN()</code>
      * <code>previousHighestUSN</code>.
-     */
-     /* TODO(myk): add
+     *
      * @return all instances of <code>AdEntity</code> that are users/groups that
      *     have a <code>uSNChanged</code> attribute newer than, or
      *     <code>Collections.emptySet()</code> when the cache had been stale.
      */
     @VisibleForTesting
-    void readUpdatesFrom(AdServer server, String previousServiceName,
+    Set<AdEntity> readUpdatesFrom(AdServer server, String previousServiceName,
         String previousInvocationId, long previousHighestUSN)
         throws InterruptedNamingException {
       // TODO(myk): Determine whether adaptors should include code to get/set
@@ -417,26 +432,23 @@
               + "partial updates support.",
               new Object[]{previousServiceName, currentServiceName});
         }
-        readEverythingFrom(server);
-        return;
-        //TODO(myk): return Collections.emptySet();
+        readEverythingFrom(server, /*includeMembers=*/ false);
+        return Collections.emptySet();
       }
       if (!currentInvocationId.equals(previousInvocationId)) {
         log.log(Level.WARNING,
             "Directory Controller {0} has been restored from backup.  "
             + "Performing full recrawl.", currentServiceName);
-        readEverythingFrom(server);
-        return;
-        //TODO(myk): return Collections.emptySet();
+        readEverythingFrom(server, /*includeMembers=*/ false);
+        return Collections.emptySet();
       }
       if (currentHighestUSN == previousHighestUSN) {
         log.log(Level.INFO, "No updates on server {0} -- no crawl invoked.",
             server);
-        return;
-        //TODO(myk): return Collections.emptySet();
+        return Collections.emptySet();
       }
       log.log(Level.INFO, "Attempting incremental crawl.");
-      incrementalCrawl(server, previousHighestUSN, currentHighestUSN);
+      return incrementalCrawl(server, previousHighestUSN, currentHighestUSN);
     }
 
     private void processEntities(Set<AdEntity> entities, String nETBIOSName) {
@@ -459,7 +471,6 @@
       log.log(Level.FINE, "Starting incremental crawl.");
       // LDAP_MATCHING_RULE_BIT_AND = 1.2.840.113556.1.4.803
       // and ADS_GROUP_TYPE_SECURITY_ENABLED = 2147483648.
-      // TODO(myk): Phase II: indicate that this search should exclude members
       Set<AdEntity> newOrModifiedEntities = server.search("(&(uSNChanged>="
           + (previousHighestUSN + 1) + ")(|(&(objectClass=group)"
           + "(groupType:1.2.840.113556.1.4.803:=2147483648))"
@@ -534,9 +545,7 @@
       log.log(Level.FINE, "# users added to all primary groups: {0}", nadds);
     }
 
-    void resolveForeignSecurityPrincipals() {
-      //TODO(myk) Phase II: pass in only the entities just read in (for an
-      // incremental search to only consider those entities)
+    void resolveForeignSecurityPrincipals(Set<AdEntity> entities) {
       int nGroups = 0;
       int nNullSid = 0;
       int nNullResolution = 0;
@@ -572,10 +581,8 @@
       log.log(Level.FINE, "#resolved: {0}", nResolved);
     }
 
-    Map<GroupPrincipal, List<Principal>> makeDefs() {
+    Map<GroupPrincipal, List<Principal>> makeDefs(Set<AdEntity> entities) {
       // Merge members with well known group members
-      //TODO(myk) Phase II: pass in only the entities just read in (for an
-      // incremental search to only consider those entities)
       Map<AdEntity, Set<String>> allMembers
           = new HashMap<AdEntity, Set<String>>(members);
       allMembers.putAll(wellKnownMembership);
diff --git a/test/com/google/enterprise/adaptor/ad/AdAdaptorTest.java b/test/com/google/enterprise/adaptor/ad/AdAdaptorTest.java
index 14b4228..8a3c507 100644
--- a/test/com/google/enterprise/adaptor/ad/AdAdaptorTest.java
+++ b/test/com/google/enterprise/adaptor/ad/AdAdaptorTest.java
@@ -50,10 +50,15 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import javax.naming.CommunicationException;
 import javax.naming.InterruptedNamingException;
+import javax.naming.NamingException;
+
 
 /** Test cases for {@link AdAdaptor}. */
 public class AdAdaptorTest {
+  public static final int BRIEF_DELAY_IN_MILLISECONDS = 25;
+
   @Test
   public void testNoop() {
   }
@@ -116,8 +121,13 @@
     groupCatalog.add(golden);
     groupCatalog.domain.clear();
     assertFalse(golden.equals(groupCatalog));
-
     assertFalse(golden.hashCode() == groupCatalog.hashCode());
+
+    almostClear(groupCatalog);
+    groupCatalog.add(golden);
+    assertEquals(golden, groupCatalog);
+    groupCatalog.wellKnownMembership.get(groupCatalog.everyone).add("fakeDN");
+    assertFalse(golden.equals(groupCatalog));
   }
 
   @Test
@@ -145,7 +155,7 @@
     AdServer adServer = new AdServer("localhost", ldapContext);
     adServer.initialize();
 
-    groupCatalog.readEverythingFrom(adServer);
+    groupCatalog.readEverythingFrom(adServer, /*includeMembers=*/ true);
 
     final AdEntity goldenEntity = new AdEntity("S-1-0-0",
         "cn=name\\ under,DN_for_default_naming_context");
@@ -199,7 +209,7 @@
     AdServer adServer = new AdServer("localhost", ldapContext);
     adServer.initialize();
 
-    groupCatalog.readEverythingFrom(adServer);
+    groupCatalog.readEverythingFrom(adServer, /*includeMembers=*/ true);
 
     AdEntity[] groupEntity = groupCatalog.entities.toArray(new AdEntity[0]);
     final AdEntity goldenEntity = groupEntity[0];
@@ -261,7 +271,7 @@
     adServer.initialize();
 
     groupCatalog.bySid.put("S-1-5-32-users", userGroup);
-    groupCatalog.readEverythingFrom(adServer);
+    groupCatalog.readEverythingFrom(adServer, /*includeMembers=*/ true);
 
     final AdEntity goldenEntity = new AdEntity("S-1-5-32-544",
         "cn=name\\ under,DN_for_default_naming_context", "users", "sam");
@@ -288,7 +298,7 @@
     assertTrue(golden.equals(groupCatalog));
 
     // make sure readEverythingFrom call is idempotent
-    groupCatalog.readEverythingFrom(adServer);
+    groupCatalog.readEverythingFrom(adServer, /*includeMembers=*/ true);
     assertTrue(golden.equals(groupCatalog));
   }
 
@@ -315,7 +325,7 @@
     AdServer adServer = new AdServer("localhost", ldapContext);
     adServer.initialize();
 
-    groupCatalog.readEverythingFrom(adServer);
+    groupCatalog.readEverythingFrom(adServer, /*includeMembers=*/ true);
 
     final AdEntity goldenEntity = new AdEntity("S-1-5-32-544",
         "cn=name\\ under,DN_for_default_naming_context", "users", "sam");
@@ -339,27 +349,24 @@
     assertTrue(golden.equals(groupCatalog));
 
     // make sure readEverythingFrom call is idempotent
-    groupCatalog.readEverythingFrom(adServer);
+    groupCatalog.readEverythingFrom(adServer, /*includeMembers=*/ true);
     assertTrue(golden.equals(groupCatalog));
   }
 
   @Test
   public void testFullCrawlVersusIncrementalCrawlFlow() throws Exception {
-    FakeAdaptor adAdaptor = new FakeAdaptor();
-    FakeCatalog groupCatalog = new FakeCatalog(defaultLocalizedStringMap(),
-        "example.com", false);
+    final FakeAdaptor adAdaptor = new FakeAdaptor();
+    final FakeCatalog groupCatalog = new FakeCatalog(
+        defaultLocalizedStringMap(), "example.com", false);
     MockLdapContext ldapContext = defaultMockLdapContext();
-    AdServer adServer = new AdServer("localhost", ldapContext);
+    final AdServer adServer = new AdServer("localhost", ldapContext);
     adServer.initialize();
-    // the following 3 lines initialize AdAdAptor.
-    AccumulatingDocIdPusher pusher = new AccumulatingDocIdPusher();
-    Map<String, String> configEntries = defaultConfig();
 
     groupCatalog.resetCrawlFlags();
     assertFalse(groupCatalog.ranFullCrawl());
     assertFalse(groupCatalog.ranIncrementalCrawl());
 
-    groupCatalog.readEverythingFrom(adServer);
+    groupCatalog.readEverythingFrom(adServer, /*includeMembers=*/ true);
     assertTrue(groupCatalog.ranFullCrawl());
     assertFalse(groupCatalog.ranIncrementalCrawl());
 
@@ -381,13 +388,44 @@
     assertFalse(groupCatalog.ranFullCrawl());
     assertFalse(groupCatalog.ranIncrementalCrawl());
 
-    // call Incremental call (only) when it's desired and the service name and
-    // invocation ID both match and the HighestUSN does not match.
     groupCatalog.resetCrawlFlags();
     groupCatalog.readUpdatesFrom(adServer, "ds_service_name", "0x0123456789abc",
-        12345677L);
+        12345677L); // earlier USN than previous run: does an incremental run
     assertFalse(groupCatalog.ranFullCrawl());
     assertTrue(groupCatalog.ranIncrementalCrawl());
+
+    // attempt to invoke incremental crawl during a full crawl
+    MoreFakeAdaptor adaptor = new MoreFakeAdaptor();
+    adaptor.resetCrawlFlags();
+    Thread fullThread = new FullCrawlThread(adaptor);
+    Thread incrementalThread = new IncrementalCrawlThread(adaptor);
+    fullThread.start();
+    try {
+      Thread.sleep(BRIEF_DELAY_IN_MILLISECONDS);
+    } catch (InterruptedException ex) {
+      throw new InterruptedNamingException(ex.getMessage());
+    }
+    incrementalThread.start();
+    fullThread.join();
+    incrementalThread.join();
+    assertTrue(adaptor.ranFullCrawl());
+    assertFalse(adaptor.ranIncrementalCrawl());
+
+    // invoke full crawl during an incremental crawl
+    incrementalThread = new IncrementalCrawlThread(adaptor);
+    fullThread = new FullCrawlThread(adaptor);
+    adaptor.resetCrawlFlags();
+    incrementalThread.start();
+    try {
+      Thread.sleep(BRIEF_DELAY_IN_MILLISECONDS);
+    } catch (InterruptedException ex) {
+      throw new InterruptedNamingException(ex.getMessage());
+    }
+    fullThread.start();
+    incrementalThread.join();
+    fullThread.join();
+    assertTrue(adaptor.ranFullCrawl());
+    assertTrue(adaptor.ranIncrementalCrawl());
   }
 
   @Test
@@ -436,16 +474,22 @@
     adServer.initialize();
 
     // first, do a full crawl
-    groupCatalog.readEverythingFrom(adServer);
+    Set<AdEntity> updateResults = groupCatalog.readUpdatesFrom(adServer, null,
+        "0x0123456789abc", 12345677L);
+    Set<AdEntity> goldenResults = Collections.emptySet();
+    assertEquals(goldenResults, updateResults);
 
     // now do an incremental crawl
-    groupCatalog.readUpdatesFrom(adServer, "ds_service_name", "0x0123456789abc",
-        12345677L);
+    updateResults = groupCatalog.readUpdatesFrom(adServer, "ds_service_name",
+        "0x0123456789abc", 12345677L);
 
     // extract incrementally-added user as one golden entity
     Set<AdEntity> incrementalUserSet = adServer.search(incrementalFilter, false,
         new String[] { "member", "objectSid;binary", "objectGUID;binary",
             "primaryGroupId", "sAMAccountName" });
+    goldenResults = incrementalUserSet;
+    assertEquals(goldenResults, updateResults);
+
     assertEquals(1, incrementalUserSet.size());
     for (AdEntity ae : incrementalUserSet) {
       assertFalse(ae.isGroup());
@@ -464,7 +508,6 @@
     AdEntity[] groupEntity = fullyCrawledGroupSet.toArray(new AdEntity[0]);
     final AdEntity goldenGroupEntity = new AdEntity("S-1-0-0",
         "cn=name\\ under,DN_for_default_naming_context");
-    goldenGroupEntity.getMembers().addAll(members);
 
     final Map<AdEntity, Set<String>> goldenMembers =
         new HashMap<AdEntity, Set<String>>();
@@ -487,6 +530,12 @@
       /*bySid*/ goldenSid,
       /*byDn*/ goldenDn,
       /*domain*/ goldenDomain);
+    assertEquals(golden, groupCatalog);
+
+    // do another incremental crawl with same results
+    updateResults = groupCatalog.readUpdatesFrom(adServer, "ds_service_name",
+        "0x0123456789abc", 12345677L);
+    assertEquals(goldenResults, updateResults);
   }
 
   @Test
@@ -526,7 +575,7 @@
     AdServer adServer = new AdServer("localhost", ldapContext);
     adServer.initialize();
 
-    groupCatalog.readEverythingFrom(adServer);
+    groupCatalog.readEverythingFrom(adServer, /*includeMembers=*/ true);
 
     // add two additional entities to test all branches of our method.
     // first -- a user
@@ -545,7 +594,7 @@
     assertTrue(wellKnownEntity.isWellKnown());
     groupCatalog.entities.add(wellKnownEntity);
 
-    groupCatalog.resolveForeignSecurityPrincipals();
+    groupCatalog.resolveForeignSecurityPrincipals(groupCatalog.entities);
 
     // extract original group entity
     Set<AdEntity> groupEntitySet = adServer.search(filter, false,
@@ -592,7 +641,7 @@
     assertTrue(golden.equals(groupCatalog));
 
     // make sure resolveForeignSecurityPrincipals call is idempotent
-    groupCatalog.resolveForeignSecurityPrincipals();
+    groupCatalog.resolveForeignSecurityPrincipals(groupCatalog.entities);
     assertTrue(golden.equals(groupCatalog));
   }
 
@@ -608,7 +657,7 @@
     AdServer adServer = new AdServer("localhost", ldapContext);
     adServer.initialize();
 
-    groupCatalog.readEverythingFrom(adServer);
+    groupCatalog.readEverythingFrom(adServer, /*includeMembers=*/ true);
 
     tweakGroupCatalogForMakeDefs(groupCatalog, adServer, false);
 
@@ -622,7 +671,7 @@
       golden.put(new GroupPrincipal("known_group", "example.com"),
           Collections.<Principal>emptyList());
     }
-    assertEquals(golden, groupCatalog.makeDefs());
+    assertEquals(golden, groupCatalog.makeDefs(groupCatalog.entities));
   }
 
   @Test
@@ -637,7 +686,7 @@
     AdServer adServer = new AdServer("localhost", ldapContext);
     adServer.initialize();
 
-    groupCatalog.readEverythingFrom(adServer);
+    groupCatalog.readEverythingFrom(adServer, /*includeMembers=*/ true);
 
     tweakGroupCatalogForMakeDefs(groupCatalog, adServer, true);
 
@@ -649,7 +698,7 @@
       golden.put(new GroupPrincipal("known_group", "example.com"),
           Collections.<Principal>emptyList());
     }
-    assertEquals(golden, groupCatalog.makeDefs());
+    assertEquals(golden, groupCatalog.makeDefs(groupCatalog.entities));
   }
 
   @Test
@@ -676,7 +725,7 @@
     AdServer adServer = new AdServer("localhost", ldapContext);
     adServer.initialize();
 
-    groupCatalog.readEverythingFrom(adServer);
+    groupCatalog.readEverythingFrom(adServer, /*includeMembers=*/ true);
 
     tweakGroupCatalogForMakeDefs(groupCatalog, adServer, false);
     // now replace the parent group with a well-known one
@@ -687,7 +736,7 @@
       if ("cn=name\\ under,DN_for_default_naming_context".equals(entity.getDn())
           && (entity.getMembers().size() == 4)) {
         formerGroup = entity;
-        for (String member: entity.getMembers()) {
+        for (String member : entity.getMembers()) {
           replacementGroup.getMembers().add(member);
         }
         // trigger the IllegalArgumentException paths by adding empty-named
@@ -726,7 +775,7 @@
       golden.put(new GroupPrincipal("known_group", "example.com"),
           Collections.<Principal>emptyList());
     }
-    assertEquals(golden, groupCatalog.makeDefs());
+    assertEquals(golden, groupCatalog.makeDefs(groupCatalog.entities));
   }
 
   // Tests for the methods of the outer class
@@ -759,7 +808,8 @@
     configEntries.put("ad.ldapReadTimeoutSecs", "");
     configEntries.put("server.port", "5680");
     configEntries.put("server.dashboardPort", "5681");
-    pushGroupDefinitions(adAdaptor, configEntries, pusher, /*fullPush=*/ true);
+    pushGroupDefinitions(adAdaptor, configEntries, pusher, /*fullPush=*/ true,
+        /*init=*/ true);
     Map<GroupPrincipal, Collection<Principal>> results = pusher.getGroups();
     // the above (eventually) calls AdAdaptor.init() with the specified config.
   }
@@ -784,7 +834,8 @@
     configEntries.put("ad.ldapReadTimeoutSecs", "0");
     configEntries.put("server.port", "5680");
     configEntries.put("server.dashboardPort", "5681");
-    pushGroupDefinitions(adAdaptor, configEntries, pusher, /*fullPush=*/ true);
+    pushGroupDefinitions(adAdaptor, configEntries, pusher, /*fullPush=*/ true,
+        /*init=*/ true);
     Map<GroupPrincipal, Collection<Principal>> results = pusher.getGroups();
     // the above (eventually) calls AdAdaptor.init() with the specified config.
   }
@@ -875,7 +926,8 @@
     AdAdaptor adAdaptor = new FakeAdaptor();
     AccumulatingDocIdPusher pusher = new AccumulatingDocIdPusher();
     Map<String, String> configEntries = defaultConfig();
-    pushGroupDefinitions(adAdaptor, configEntries, pusher, /*fullPush=*/ true);
+    pushGroupDefinitions(adAdaptor, configEntries, pusher, /*fullPush=*/ true,
+        /*init=*/ true);
     Map<GroupPrincipal, Collection<Principal>> results = pusher.getGroups();
 
     final Map<GroupPrincipal, Collection<Principal>> goldenGroups =
@@ -893,12 +945,14 @@
     assertEquals(goldenGroups, results);
 
     // make sure pushGroupDefinitions call is idempotent
-    pushGroupDefinitions(adAdaptor, configEntries, pusher, /*fullPush=*/ true);
+    pushGroupDefinitions(adAdaptor, configEntries, pusher, /*fullPush=*/ true,
+        /*init=*/ false);
     results = pusher.getGroups();
     assertEquals(goldenGroups, results);
 
     // even when doing an incremental push
-    pushGroupDefinitions(adAdaptor, configEntries, pusher, /*fullPush=*/ false);
+    pushGroupDefinitions(adAdaptor, configEntries, pusher, /*fullPush=*/ false,
+        /*init=*/ false);
     results = pusher.getGroups();
     assertEquals(goldenGroups, results);
   }
@@ -919,18 +973,25 @@
           fail("Could not create LdapContext:" + e);
         }
         return new AdServer(host, ldapContext) {
+          int timesSearchCalled = 0;
+          int timesEnsureConnectionCalled = 0;
           @Override
           public Set<AdEntity> search(String filter, boolean deleted,
               String[] attributes) throws InterruptedNamingException {
-            if (errorFilter.equals(filter)) {
-              throw new InterruptedNamingException("Catch me if you can!");
+            if (errorFilter.equals(filter) && timesSearchCalled++ == 0) {
+              throw new InterruptedNamingException("First exception");
             } else {
               return super.search(filter, deleted, attributes);
             }
           }
           @Override
-          void recreateLdapContext() {
-            // leave ldapContext unchanged
+          public void ensureConnectionIsCurrent()
+              throws CommunicationException, NamingException {
+            if (timesEnsureConnectionCalled++ < 9) {
+              super.ensureConnectionIsCurrent();
+            } else {
+              throw new InterruptedNamingException("Second exception");
+            }
           }
         };
       }
@@ -939,14 +1000,39 @@
     Map<String, String> configEntries = defaultConfig();
     try {
       pushGroupDefinitions(adAdaptor, configEntries, pusher,
-          /*fullPush=*/ true);
+          /*fullPush=*/ true, /*init=*/ true);
       fail("Did not catch expected IOException.");
     } catch (IOException ioe) {
-      assertTrue(ioe.getCause().getMessage().equals("Catch me if you can!"));
+      assertTrue(ioe.getCause().getMessage().equals("First exception"));
+    }
+    // repeat for getModifiedDocIds
+    try {
+      adAdaptor.clearLastCompleteGroupCatalog();
+      pushGroupDefinitions(adAdaptor, configEntries, pusher,
+          /*fullPush=*/ false, /*init=*/ true);
+      fail("Did not catch expected IOException.");
+    } catch (IOException ioe) {
+      assertTrue(ioe.getCause().getMessage().equals("First exception"));
+      boolean reachedLastCall = false;
+      try {
+        /* second call fills the catalog */
+        pushGroupDefinitions(adAdaptor, configEntries, pusher,
+            /*fullPush=*/ false, /*init=*/ false);
+        /* third call does the push without any exception */
+        pushGroupDefinitions(adAdaptor, configEntries, pusher,
+            /*fullPush=*/ false, /*init=*/ false);
+        reachedLastCall = true;
+        /* last call redoes the push (and catches the second exception) */
+        pushGroupDefinitions(adAdaptor, configEntries, pusher,
+            /*fullPush=*/ false, /*init=*/ false);
+        fail("Did not catch second expected IOException.");
+      } catch (IOException ioe2) {
+        assertTrue(ioe2.getCause().getMessage().equals("Second exception"));
+        assertTrue(reachedLastCall);
+      }
     }
   }
 
-
   public static byte[] hexStringToByteArray(String s) {
     return AdServerTest.hexStringToByteArray(s);
   }
@@ -962,7 +1048,7 @@
     return strings;
   }
 
-  private Map<String, String> defaultConfig() {
+  private static Map<String, String> defaultConfig() {
     Map<String, String> strings = new HashMap<String, String>();
     strings.put("gsa.hostname", "localhost");
     strings.put("ad.servers", "server1");
@@ -1090,8 +1176,10 @@
 
   public static void pushGroupDefinitions(AdAdaptor adaptor,
       Map<String, String> configEntries, final DocIdPusher pusher,
-      boolean fullPush) throws Exception {
-    initializeAdaptorConfig(adaptor, configEntries);
+      boolean fullPush, boolean init) throws Exception {
+    if (init) {
+      initializeAdaptorConfig(adaptor, configEntries);
+    }
     if (fullPush) {
       adaptor.getDocIds(pusher);
     } else {
@@ -1205,12 +1293,23 @@
         fail("Could not create LdapContext:" + e);
       }
       return new AdServer(host, ldapContext) {
+        private long highestCommittedUSN = 12345678;
         @Override
         void recreateLdapContext() {
           // leave ldapContext unchanged
         }
+        @Override
+        public long getHighestCommittedUSN() {
+          // always indicate new items available to sync
+          return ++highestCommittedUSN;
+        }
       };
     }
+    @Override
+    void getModifiedDocIdsHelper(DocIdPusher pusher)
+        throws InterruptedException, IOException {
+      // do nothing
+    }
   };
 
   /** Simple Fake of GroupCatalog that tracks calls to full/incremental crawl */
@@ -1224,15 +1323,24 @@
     }
 
     @Override
-    void readEverythingFrom(AdServer server) throws InterruptedNamingException {
-      ranIncrementalCrawl = false;
+    void readEverythingFrom(AdServer server, boolean unused)
+        throws InterruptedNamingException {
+      try {
+        Thread.sleep(BRIEF_DELAY_IN_MILLISECONDS * 2);
+      } catch (InterruptedException ex) {
+        throw new InterruptedNamingException(ex.getMessage());
+      }
       ranFullCrawl = true;
     }
 
     @Override
     Set<AdEntity> incrementalCrawl(AdServer server, long previousHighestUSN,
         long currentHighestUSN) throws InterruptedNamingException {
-      ranFullCrawl = false;
+      try {
+        Thread.sleep(BRIEF_DELAY_IN_MILLISECONDS * 2);
+      } catch (InterruptedException ex) {
+        throw new InterruptedNamingException(ex.getMessage());
+      }
       ranIncrementalCrawl = true;
       return Collections.emptySet();
     }
@@ -1250,4 +1358,76 @@
       return ranFullCrawl;
     }
   }
+
+  /** An even "faker" version of AdAdaptor that only tests the mutex */
+  public class MoreFakeAdaptor extends FakeAdaptor {
+    private boolean ranFullCrawl;
+    private boolean ranIncrementalCrawl;
+
+    void resetCrawlFlags() {
+      ranFullCrawl = false;
+      ranIncrementalCrawl = false;
+    }
+
+    public boolean ranIncrementalCrawl() {
+      return ranIncrementalCrawl;
+    }
+
+    public boolean ranFullCrawl() {
+      return ranFullCrawl;
+    }
+
+    @Override
+    void getModifiedDocIdsHelper(DocIdPusher pusher)
+        throws InterruptedException, IOException {
+      ranIncrementalCrawl = true;
+    }
+
+    @Override
+    AdAdaptor.GroupCatalog makeFullCatalog() throws InterruptedException,
+        IOException {
+      try {
+        Thread.sleep(BRIEF_DELAY_IN_MILLISECONDS * 2);
+      } catch (InterruptedException ex) {
+        throw new RuntimeException(ex);
+      }
+      ranFullCrawl = true;
+      return new AdAdaptor.GroupCatalog(defaultLocalizedStringMap(),
+          "example.com", /*feedBuiltinGroups=*/ true);
+    }
+  };
+
+  /** generates a thread that invokes a (fake!) full crawl */
+  private static class FullCrawlThread extends Thread {
+    FullCrawlThread (final AdAdaptor adAdaptor) {
+      super((new Runnable() {
+        @Override
+        public void run() {
+          try {
+            AccumulatingDocIdPusher pusher = new AccumulatingDocIdPusher();
+            adAdaptor.getDocIds(pusher);
+          } catch (Exception ex) {
+            throw new RuntimeException(ex);
+          }
+        }
+      }));
+    }
+  }
+
+  /** generates a thread that invokes a (fake!) incremental crawl */
+  private static class IncrementalCrawlThread extends Thread {
+    IncrementalCrawlThread (final AdAdaptor adAdaptor) {
+      super((new Runnable() {
+        @Override
+        public void run() {
+          try {
+            AccumulatingDocIdPusher pusher = new AccumulatingDocIdPusher();
+            adAdaptor.getModifiedDocIds(pusher);
+          } catch (Exception ex) {
+            throw new RuntimeException(ex);
+          }
+        }
+      }));
+    }
+  }
 }