Adding AsyncDocIdPusher interface
Adding the interface AsyncDocIdPusher that exposes the functionality
of the AsyncPusher interface to adaptors.
diff --git a/src/com/google/enterprise/adaptor/AdaptorContext.java b/src/com/google/enterprise/adaptor/AdaptorContext.java
index 295c469..9392dc6 100644
--- a/src/com/google/enterprise/adaptor/AdaptorContext.java
+++ b/src/com/google/enterprise/adaptor/AdaptorContext.java
@@ -35,6 +35,12 @@
public DocIdPusher getDocIdPusher();
/**
+ * Callback object for asynchronously pushing {@code DocId}s to the GSA
+ * at any time.
+ */
+ public AsyncDocIdPusher getAsyncDocIdPusher();
+
+ /**
* A way to construct URIs from DocIds.
*/
public DocIdEncoder getDocIdEncoder();
diff --git a/src/com/google/enterprise/adaptor/AsyncDocIdPusher.java b/src/com/google/enterprise/adaptor/AsyncDocIdPusher.java
new file mode 100644
index 0000000..8b310db
--- /dev/null
+++ b/src/com/google/enterprise/adaptor/AsyncDocIdPusher.java
@@ -0,0 +1,46 @@
+// Copyright 2014 Google Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.enterprise.adaptor;
+
+/**
+ * Interface that allows asynchronous at-will pushing of {@code DocId}s
+ * to the GSA.
+ */
+public interface AsyncDocIdPusher {
+ /**
+ * Push a {@code DocId} asynchronously to the GSA. The {@code DocId} is
+ * enqueued and sent in the next batch to the GSA. If the queue is full,
+ * then the item will be dropped and a warning will be logged.
+ */
+ public void pushDocId(DocId docId);
+
+ /**
+ * Push a {@code Record} asynchronously to the GSA. The {@code Record}
+ * is enqueued and sent in the next batch to the GSA. If the queue is full,
+ * then the item will be dropped and a warning will be logged.
+ */
+ public void pushRecord(DocIdPusher.Record record);
+
+ /**
+ * Push a named resource asynchronously to the GSA. The named resource is
+ * enqueued and sent in the next batch to the GSA. If the queue is full,
+ * then the item will be dropped and a warning will be logged.
+ *
+ * <p>Named resources are {@code DocId}s without any content or metadata,
+ * that only exist for ACL inheritance. These {@code DocId} will never be
+ * visible to the user and have no meaning outside of ACL processing.
+ */
+ public void pushNamedResource(DocId docId, Acl acl);
+}
diff --git a/src/com/google/enterprise/adaptor/AsyncDocIdSender.java b/src/com/google/enterprise/adaptor/AsyncDocIdSender.java
index 5ca176c..ce59de9 100644
--- a/src/com/google/enterprise/adaptor/AsyncDocIdSender.java
+++ b/src/com/google/enterprise/adaptor/AsyncDocIdSender.java
@@ -23,7 +23,8 @@
* Asynchronous sender of feed items. {@code worker()} must be started by client
* and running for items to be sent.
*/
-class AsyncDocIdSender implements DocumentHandler.AsyncPusher {
+class AsyncDocIdSender implements AsyncDocIdPusher,
+ DocumentHandler.AsyncPusher {
private static final Logger log
= Logger.getLogger(AsyncDocIdSender.class.getName());
@@ -67,6 +68,21 @@
}
}
+ @Override
+ public void pushDocId(DocId docId) {
+ asyncPushItem(new DocIdPusher.Record.Builder(docId).build());
+ }
+
+ @Override
+ public void pushRecord(DocIdPusher.Record record) {
+ asyncPushItem(record);
+ }
+
+ @Override
+ public void pushNamedResource(DocId docId, Acl acl) {
+ asyncPushItem(new DocIdSender.AclItem(docId, null, acl));
+ }
+
public Runnable worker() {
return worker;
}
diff --git a/src/com/google/enterprise/adaptor/DocIdSender.java b/src/com/google/enterprise/adaptor/DocIdSender.java
index 38ad69b..bf1414f 100644
--- a/src/com/google/enterprise/adaptor/DocIdSender.java
+++ b/src/com/google/enterprise/adaptor/DocIdSender.java
@@ -391,7 +391,25 @@
public Acl getAcl() {
return acl;
}
-
+
+ @Override
+ public boolean equals(Object o) {
+ boolean same = false;
+ if (null != o && this.getClass().equals(o.getClass())) {
+ AclItem other = (AclItem) o;
+ same = id.equals(other.id) && acl.equals(other.acl)
+ && (docIdFragment == null ? other.docIdFragment == null :
+ docIdFragment.equals(other.docIdFragment));
+ }
+ return same;
+ }
+
+ @Override
+ public int hashCode() {
+ Object members[] = new Object[] { id, acl, docIdFragment };
+ return Arrays.hashCode(members);
+ }
+
@Override
public String toString() {
return "AclItem(" + id + "," + docIdFragment + "," + acl + ")";
diff --git a/src/com/google/enterprise/adaptor/GsaCommunicationHandler.java b/src/com/google/enterprise/adaptor/GsaCommunicationHandler.java
index fabb141..a02dac2 100644
--- a/src/com/google/enterprise/adaptor/GsaCommunicationHandler.java
+++ b/src/com/google/enterprise/adaptor/GsaCommunicationHandler.java
@@ -90,6 +90,7 @@
private ExecutorService backgroundExecutor;
private DocIdCodec docIdCodec;
private DocIdSender docIdSender;
+ private AsyncDocIdSender asyncDocIdSender;
private HttpServerScope dashboardScope;
private Dashboard dashboard;
private SensitiveValueCodec secureValueCodec;
@@ -192,6 +193,10 @@
config.isGsa70AuthMethodWorkaroundEnabled());
docIdSender
= new DocIdSender(fileMaker, fileSender, journal, config, adaptor);
+ asyncDocIdSender = new AsyncDocIdSender(docIdSender,
+ config.getFeedMaxUrls() /* batch size */,
+ 5 /* max latency */, TimeUnit.MINUTES,
+ 2 * config.getFeedMaxUrls() /* queue size */);
// Could be done during start(), but then we would have to save
// dashboardServer and contextPrefix.
@@ -252,10 +257,6 @@
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("schedule")
.build());
Watchdog watchdog = new Watchdog(scheduleExecutor);
- AsyncDocIdSender asyncDocIdSender = new AsyncDocIdSender(docIdSender,
- config.getFeedMaxUrls() /* batch size */,
- 5 /* max latency */, TimeUnit.MINUTES,
- 2 * config.getFeedMaxUrls() /* queue size */);
// The cachedThreadPool implementation created here is considerably better
// than using ThreadPoolExecutor. ThreadPoolExecutor does not create threads
@@ -806,6 +807,11 @@
}
@Override
+ public AsyncDocIdPusher getAsyncDocIdPusher() {
+ return asyncDocIdSender;
+ }
+
+ @Override
public DocIdEncoder getDocIdEncoder() {
return docIdCodec;
}
diff --git a/src/com/google/enterprise/adaptor/WrapperAdaptor.java b/src/com/google/enterprise/adaptor/WrapperAdaptor.java
index 9bdfb64..04decfd 100644
--- a/src/com/google/enterprise/adaptor/WrapperAdaptor.java
+++ b/src/com/google/enterprise/adaptor/WrapperAdaptor.java
@@ -427,6 +427,11 @@
}
@Override
+ public AsyncDocIdPusher getAsyncDocIdPusher() {
+ return context.getAsyncDocIdPusher();
+ }
+
+ @Override
public DocIdEncoder getDocIdEncoder() {
return context.getDocIdEncoder();
}
diff --git a/test/com/google/enterprise/adaptor/AsyncDocIdSenderTest.java b/test/com/google/enterprise/adaptor/AsyncDocIdSenderTest.java
index 3b8bb69..445961d 100644
--- a/test/com/google/enterprise/adaptor/AsyncDocIdSenderTest.java
+++ b/test/com/google/enterprise/adaptor/AsyncDocIdSenderTest.java
@@ -115,6 +115,55 @@
assertEquals(golden, pusher.getItems());
}
+ @Test
+ public void testPushDocId() throws Exception {
+ AsyncDocIdSender sender = new AsyncDocIdSender(pusher, 3, 1,
+ TimeUnit.SECONDS, 3);
+ DocId docId = new DocId("1");
+ final List<DocIdPusher.Record> golden = Arrays.asList(
+ new DocIdPusher.Record.Builder(docId).build());
+ sender.pushDocId(docId);
+ verifyPushedItems(sender, golden);
+ }
+
+ @Test
+ public void testPushRecord() throws Exception {
+ AsyncDocIdSender sender = new AsyncDocIdSender(pusher, 3, 1,
+ TimeUnit.SECONDS, 3);
+ DocIdPusher.Record record =
+ new DocIdPusher.Record.Builder(new DocId("1")).build();
+ final List<DocIdPusher.Record> golden = Arrays.asList(record);
+ sender.pushRecord(record);
+ verifyPushedItems(sender, golden);
+ }
+
+ @Test
+ public void testPushNamedResource() throws Exception {
+ AsyncDocIdSender sender = new AsyncDocIdSender(pusher, 3, 1,
+ TimeUnit.SECONDS, 3);
+ DocId docId = new DocId("1");
+ Acl acl = new Acl.Builder().setInheritFrom(new DocId("2")).build();
+ final List<DocIdSender.AclItem> golden = Arrays.asList(
+ new DocIdSender.AclItem(docId, null, acl));
+ sender.pushNamedResource(docId, acl);
+ verifyPushedItems(sender, golden);
+ }
+
+ private void verifyPushedItems(AsyncDocIdSender sender,
+ List<? extends DocIdPusher.Item> expected) throws Exception {
+ final Runnable worker = sender.worker();
+ Thread workerThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ Thread.currentThread().interrupt();
+ worker.run();
+ }
+ });
+ workerThread.start();
+ workerThread.join();
+ assertEquals(expected, pusher.getItems());
+ }
+
private static class AccumulatingPusher
implements AsyncDocIdSender.ItemPusher {
private final List<DocIdPusher.Item> items
diff --git a/test/com/google/enterprise/adaptor/GsaCommunicationHandlerTest.java b/test/com/google/enterprise/adaptor/GsaCommunicationHandlerTest.java
index a673980..5e5b239 100644
--- a/test/com/google/enterprise/adaptor/GsaCommunicationHandlerTest.java
+++ b/test/com/google/enterprise/adaptor/GsaCommunicationHandlerTest.java
@@ -77,6 +77,7 @@
AdaptorContext context = gsa.setup(mockServer, mockServer, null);
assertSame(config, context.getConfig());
assertNotNull(context.getDocIdPusher());
+ assertNotNull(context.getAsyncDocIdPusher());
assertNotNull(context.getDocIdEncoder());
assertNotNull(context.getSensitiveValueDecoder());
ExceptionHandler originalHandler