Send async records during shutdown
Currently, the display URLs and ACLs that are sent via AsyncDocIdSender
are lost if the Adaptor is shutdown before they were sent, which is
likely because we sleep waiting for a batch. Now we push those lingering
records on shutdown.
diff --git a/src/com/google/enterprise/adaptor/AsyncDocIdSender.java b/src/com/google/enterprise/adaptor/AsyncDocIdSender.java
index 3f49a08..d1342ad 100644
--- a/src/com/google/enterprise/adaptor/AsyncDocIdSender.java
+++ b/src/com/google/enterprise/adaptor/AsyncDocIdSender.java
@@ -74,8 +74,8 @@
private class WorkerRunnable implements Runnable {
@Override
public void run() {
+ List<DocIdSender.Item> list = new ArrayList<DocIdSender.Item>();
try {
- List<DocIdSender.Item> list = new ArrayList<DocIdSender.Item>();
while (true) {
BlockingQueueBatcher.take(
queue, list, maxBatchSize, maxLatency, maxLatencyUnit);
@@ -83,8 +83,24 @@
list.clear();
}
} catch (InterruptedException ex) {
- log.log(Level.FINE, "AsyncDocIdSender worker shutdown", ex);
- Thread.currentThread().interrupt();
+ log.log(Level.FINE, "AsyncDocIdSender worker shutting down", ex);
+ try {
+ // We are shutting down, but there are likely items that haven't been
+ // sent because of maxLatency, so we try to send those now.
+ // If we were interrupted between calls to take(), then take() may
+ // have interrupted itself before draining the queue; might as well
+ // send everything that was put on the queue.
+ queue.drainTo(list);
+ itemPusher.pushItems(list.iterator(),
+ ExceptionHandlers.noRetryHandler());
+ } catch (InterruptedException ex2) {
+ // Ignore, because we are going to interrupt anyway. This should
+ // actually not happen because of the ExceptionHandler we are using,
+ // but the precise behavior of pushItems() may change in the future.
+ } finally {
+ log.log(Level.FINE, "AsyncDocIdSender worker shutdown", ex);
+ Thread.currentThread().interrupt();
+ }
}
}
}
diff --git a/src/com/google/enterprise/adaptor/ExceptionHandlers.java b/src/com/google/enterprise/adaptor/ExceptionHandlers.java
index 69125b5..85d6cb9 100644
--- a/src/com/google/enterprise/adaptor/ExceptionHandlers.java
+++ b/src/com/google/enterprise/adaptor/ExceptionHandlers.java
@@ -22,6 +22,8 @@
public class ExceptionHandlers {
private static final ExceptionHandler defaultHandler
= exponentialBackoffHandler(12, 5, TimeUnit.SECONDS);
+ private static final ExceptionHandler noRetryHandler
+ = exponentialBackoffHandler(-1, 0, TimeUnit.SECONDS);
// Prevent instantiation.
private ExceptionHandlers() {}
@@ -47,6 +49,13 @@
maximumTries, initialSleepDuration, initialSleepUnit);
}
+ /**
+ * Create a handler that always returns {@code false}, causing no retries.
+ */
+ public static ExceptionHandler noRetryHandler() {
+ return noRetryHandler;
+ }
+
private static class ExponentialBackoffExceptionHandler
implements ExceptionHandler {
private final int maximumTries;
@@ -69,5 +78,11 @@
sleepUnit.sleep(sleepDuration * ntries);
return true;
}
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + "(" + maximumTries + ","
+ + sleepDuration + " " + sleepUnit + ")";
+ }
}
}
diff --git a/test/com/google/enterprise/adaptor/AsyncDocIdSenderTest.java b/test/com/google/enterprise/adaptor/AsyncDocIdSenderTest.java
index 5b109b3..81e1318 100644
--- a/test/com/google/enterprise/adaptor/AsyncDocIdSenderTest.java
+++ b/test/com/google/enterprise/adaptor/AsyncDocIdSenderTest.java
@@ -56,7 +56,7 @@
}
@Test(timeout = 100)
- public void testPush() throws Exception {
+ public void testPushOfFullBatch() throws Exception {
AsyncDocIdSender sender = new AsyncDocIdSender(pusher, 3 /* maxBatchSize */,
1, TimeUnit.SECONDS, 3 /* queueCapacity */);
final List<DocIdPusher.Record> golden = Arrays.asList(
@@ -79,6 +79,42 @@
assertEquals(golden, pusher.getItems());
}
+ @Test(timeout = 100)
+ public void testPushOnInterrupt() throws Exception {
+ AsyncDocIdSender sender = new AsyncDocIdSender(pusher, 2 /* maxBatchSize */,
+ 1, TimeUnit.SECONDS, 1 /* queueCapacity */);
+ final List<DocIdPusher.Record> golden = Arrays.asList(
+ new DocIdPusher.Record.Builder(new DocId("1")).build());
+ sender.asyncPushItem(golden.get(0));
+ Thread workerThread = new Thread(sender.worker());
+ workerThread.start();
+ // We want to trigger the code that waits on maxLatency.
+ Thread.sleep(10);
+ workerThread.interrupt();
+ workerThread.join();
+ assertEquals(golden, pusher.getItems());
+ }
+
+ @Test(timeout = 100)
+ public void testPushOnInterruptDrainQueue() throws Exception {
+ AsyncDocIdSender sender = new AsyncDocIdSender(pusher, 2 /* maxBatchSize */,
+ 1, TimeUnit.SECONDS, 1 /* queueCapacity */);
+ final List<DocIdPusher.Record> golden = Arrays.asList(
+ new DocIdPusher.Record.Builder(new DocId("1")).build());
+ sender.asyncPushItem(golden.get(0));
+ final Runnable worker = sender.worker();
+ Thread workerThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ Thread.currentThread().interrupt();
+ worker.run();
+ }
+ });
+ workerThread.start();
+ workerThread.join();
+ assertEquals(golden, pusher.getItems());
+ }
+
private static class AccumulatingPusher
implements AsyncDocIdSender.ItemPusher {
private final List<DocIdSender.Item> items
diff --git a/test/com/google/enterprise/adaptor/ExceptionHandlersTest.java b/test/com/google/enterprise/adaptor/ExceptionHandlersTest.java
new file mode 100644
index 0000000..72fad80
--- /dev/null
+++ b/test/com/google/enterprise/adaptor/ExceptionHandlersTest.java
@@ -0,0 +1,42 @@
+// Copyright 2013 Google Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.enterprise.adaptor;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+/** Tests for {@link ExceptionHandlers}. */
+public class ExceptionHandlersTest {
+ @Test
+ public void testBackoffToString() {
+ ExceptionHandler handler
+ = ExceptionHandlers.exponentialBackoffHandler(1, 2, TimeUnit.MINUTES);
+ assertEquals("ExponentialBackoffExceptionHandler(1,2 MINUTES)",
+ handler.toString());
+ }
+
+ @Test
+ public void testNoRetry() throws Exception {
+ ExceptionHandler handler = ExceptionHandlers.noRetryHandler();
+ // Make sure it doesn't throw InterruptedException
+ Thread.currentThread().interrupt();
+ assertFalse(handler.handleException(new RuntimeException(), 1));
+ // Clear flag
+ Thread.currentThread().interrupted();
+ }
+}