// blob: 3c7d7321b059a39461eb7ce606ecea32425db1f7 [file] [log] [blame]
// Copyright 2011 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.enterprise.adaptor;
import java.io.IOException;
import java.util.*;
import java.util.logging.*;
/**
* All logic for sending DocIds to the GSA from an adaptor.
*/
class DocIdSender extends AbstractDocIdPusher
implements AsyncDocIdSender.ItemPusher {
/** Logger used by every method of this class. */
private static final Logger log
= Logger.getLogger(DocIdSender.class.getName());
/** Builds the XML payloads: metadata-and-url feeds and group definitions. */
private final GsaFeedFileMaker fileMaker;
/** Transmits the XML built by {@code fileMaker} to the GSA. */
private final GsaFeedFileSender fileSender;
/**
 * Notified when full/incremental pushes start, succeed, fail or are
 * interrupted, and of each batch of items that was pushed.
 */
private final Journal journal;
/**
 * Source of the feed name, GSA hostname, compression flag and the maximum
 * number of items placed in one feed.
 */
private final Config config;
/** Adaptor asked for DocIds by the full and incremental push methods. */
private final Adaptor adaptor;
/**
 * Handler substituted by {@code pushItems} and {@code pushGroupDefinitions}
 * when the caller passes {@code null}.
 */
private final ExceptionHandler defaultErrorHandler
= ExceptionHandlers.defaultHandler();
/**
 * Creates a sender wired to its collaborators. The arguments are stored
 * exactly as given; nothing is validated or copied here.
 *
 * @param fileMaker builds the feed XML
 * @param fileSender delivers the feed XML to the GSA
 * @param journal records the progress and outcome of pushes
 * @param config supplies the feed settings read while pushing
 * @param adaptor provides the DocIds for full and incremental pushes
 */
public DocIdSender(GsaFeedFileMaker fileMaker, GsaFeedFileSender fileSender,
    Journal journal, Config config, Adaptor adaptor) {
  this.adaptor = adaptor;
  this.config = config;
  this.journal = journal;
  this.fileSender = fileSender;
  this.fileMaker = fileMaker;
}
/**
 * Runs a full push by invoking {@link Adaptor#getDocIds} with this sender as
 * the pusher. The call blocks until the adaptor returns normally or until
 * {@code handler} declines another attempt. The journal is told when the push
 * starts and whether it ended in success, failure or interruption.
 *
 * @param handler consulted after every failed attempt; returning
 *     {@code false} ends the push
 * @throws NullPointerException if {@code handler} is {@code null}
 * @throws InterruptedException if the adaptor call is interrupted; the
 *     interruption is journaled before being rethrown
 */
public void pushFullDocIdsFromAdaptor(ExceptionHandler handler)
    throws InterruptedException {
  if (null == handler) {
    throw new NullPointerException();
  }
  log.info("Beginning getDocIds");
  journal.recordFullPushStarted();
  int attempt = 1;
  while (true) {
    boolean retry;
    try {
      adaptor.getDocIds(this);
      break; // The adaptor returned without throwing.
    } catch (InterruptedException interrupted) {
      // Never retried: record it and let the caller see the interruption.
      journal.recordFullPushInterrupted();
      log.info("Interrupted. Aborted getDocIds");
      throw interrupted;
    } catch (Exception failure) {
      log.log(Level.WARNING, "Exception during getDocIds", failure);
      retry = handler.handleException(failure, attempt);
    }
    if (!retry) {
      journal.recordFullPushFailed();
      log.warning("Gave up. Failed getDocIds");
      return;
    }
    log.log(Level.INFO, "Trying again... Number of attemps: {0}", attempt);
    attempt++;
  }
  journal.recordFullPushSuccessful();
  log.info("Completed getDocIds");
}
/**
 * Runs an incremental push by casting the adaptor to
 * {@link PollingIncrementalAdaptor} and invoking its
 * {@code getModifiedDocIds} with this sender as the pusher. The call blocks
 * until the adaptor returns normally or until {@code handler} declines
 * another attempt. The journal is told when the push starts and whether it
 * ended in success, failure or interruption.
 *
 * @param handler consulted after every failed attempt; returning
 *     {@code false} ends the push
 * @throws NullPointerException if {@code handler} is {@code null}
 * @throws InterruptedException if the adaptor call is interrupted; the
 *     interruption is journaled before being rethrown
 */
public void pushIncrementalDocIdsFromAdaptor(ExceptionHandler handler)
    throws InterruptedException {
  if (null == handler) {
    throw new NullPointerException();
  }
  log.info("Beginning getModifiedDocIds");
  journal.recordIncrementalPushStarted();
  for (int attempt = 1; ; attempt++) {
    boolean retry;
    try {
      // The cast is inside the try, so a ClassCastException is handed to the
      // handler like any other failure.
      PollingIncrementalAdaptor incremental
          = (PollingIncrementalAdaptor) adaptor;
      incremental.getModifiedDocIds(this);
      break; // The adaptor returned without throwing.
    } catch (InterruptedException interrupted) {
      // Never retried: record it and let the caller see the interruption.
      journal.recordIncrementalPushInterrupted();
      log.info("Interrupted. Aborted getModifiedDocIds");
      throw interrupted;
    } catch (Exception failure) {
      log.log(Level.WARNING, "Exception during getModifiedDocIds", failure);
      retry = handler.handleException(failure, attempt);
    }
    if (!retry) {
      journal.recordIncrementalPushFailed();
      log.warning("Gave up. Failed getModifiedDocIds");
      return;
    }
    log.log(Level.INFO, "Trying again... Number of attemps: {0}", attempt);
  }
  journal.recordIncrementalPushSuccessful();
  log.info("Completed getModifiedDocIds");
}
/**
 * Makes and sends metadata-and-url feed files to the GSA for the given
 * records by handing their iterator to {@link #pushItems}. As the earlier
 * documentation of this method advised, {@code pushDocIds} is generally the
 * better entry point; use this method to push a few records manually. This
 * method blocks until all records are sent or retrying failed.
 *
 * @param items the records to push
 * @param handler retry policy, passed to {@code pushItems} unchanged
 * @return whatever {@code pushItems} returns for these records
 * @throws InterruptedException if {@code pushItems} throws it
 */
@Override
public Record pushRecords(Iterable<Record> items, ExceptionHandler handler)
    throws InterruptedException {
  Iterator<Record> records = items.iterator();
  return pushItems(records, handler);
}
/**
 * Wraps every entry of {@code resources} in an {@code AclItem} and pushes
 * the resulting items through {@link #pushItems}.
 *
 * @param resources ACLs to push, keyed by the DocId they belong to
 * @param handler retry policy, passed to {@code pushItems} unchanged
 * @return {@code null} when {@code pushItems} returns {@code null};
 *     otherwise the DocId of the item it returned
 * @throws InterruptedException if {@code pushItems} throws it
 */
@Override
public DocId pushNamedResources(Map<DocId, Acl> resources,
    ExceptionHandler handler)
    throws InterruptedException {
  List<AclItem> aclItems = new ArrayList<AclItem>(resources.size());
  for (Map.Entry<DocId, Acl> entry : resources.entrySet()) {
    DocId docId = entry.getKey();
    Acl docAcl = entry.getValue();
    aclItems.add(new AclItem(docId, docAcl));
  }
  AclItem failed = pushItems(aclItems.iterator(), handler);
  if (failed == null) {
    return null;
  }
  return failed.getDocId();
}
/**
 * Pushes {@code items} to the GSA in batches. Each batch holds at most
 * {@code config.getFeedMaxUrls()} items and is sent with
 * {@code pushSizedBatchOfRecords}; every batch that was sent is reported to
 * the journal. Pushing stops at the first batch that cannot be sent.
 *
 * <p>A configured limit below 1 is treated as 1. Without that, no item
 * would ever be taken from the iterator and the loop would push empty
 * batches forever.
 *
 * @param items the items to push; consumed as batches are formed
 * @param handler retry policy; {@code null} selects the default handler
 * @return {@code null} when every batch was sent. Otherwise the item
 *     reported by the failed batch push, or, if the thread was interrupted
 *     after at least one batch had been sent, the first item of the batch
 *     being pushed at that time (the interrupt status is restored in that
 *     case).
 * @throws InterruptedException if interrupted before any batch was sent
 */
@Override
public <T extends Item> T pushItems(Iterator<T> items,
    ExceptionHandler handler) throws InterruptedException {
  log.log(Level.INFO, "Pushing items");
  if (handler == null) {
    handler = defaultErrorHandler;
  }
  boolean firstBatch = true;
  final int configuredMax = config.getFeedMaxUrls();
  if (configuredMax < 1) {
    log.log(Level.WARNING,
        "Feed max URLs is {0}, which is not positive; using 1 instead",
        configuredMax);
  }
  // At least one item per batch guarantees the iterator advances, so the
  // loop terminates and batch.get(0) below is always valid.
  final int max = Math.max(1, configuredMax);
  while (items.hasNext()) {
    List<T> batch = new ArrayList<T>();
    for (int j = 0; j < max && items.hasNext(); j++) {
      batch.add(items.next());
    }
    log.log(Level.INFO, "Pushing group of {0} items", batch.size());
    T failedId;
    try {
      failedId = pushSizedBatchOfRecords(batch, handler);
    } catch (InterruptedException ex) {
      if (firstBatch) {
        throw ex;
      } else {
        // If this is not the first batch, then some items have already been
        // sent. Thus, return gracefully instead of throwing an exception so
        // that the caller can discover what was sent.
        Thread.currentThread().interrupt();
        return batch.get(0);
      }
    }
    if (failedId != null) {
      log.info("Failed to push all items. Failed on: " + failedId);
      return failedId;
    }
    firstBatch = false;
    journal.recordDocIdPush(batch);
  }
  log.info("Pushed items");
  return null;
}
/**
 * Builds one groups-definition XML document from {@code defs} and sends it
 * to the GSA, retrying after an {@code IOException} for as long as the
 * handler allows.
 *
 * @param defs group definitions to push; an empty map returns immediately
 * @param caseSensitive passed to the feed file maker when building the XML
 * @param handler retry policy; {@code null} selects the default handler
 * @return {@code null} if {@code defs} is empty or the definitions were
 *     sent; otherwise the first key in the iteration order of {@code defs}
 * @throws InterruptedException propagated from the calls made here
 */
public GroupPrincipal pushGroupDefinitions(
    Map<GroupPrincipal, ? extends Collection<Principal>> defs,
    boolean caseSensitive, ExceptionHandler handler)
    throws InterruptedException {
  if (defs.isEmpty()) {
    return null;
  }
  ExceptionHandler retryPolicy
      = (handler != null) ? handler : defaultErrorHandler;
  String xml = fileMaker.makeGroupsDefinitionsXml(defs, caseSensitive);
  log.log(Level.INFO, "pushing groups");
  boolean sent = false;
  int attempt = 1;
  while (true) {
    try {
      log.info("sending groups to GSA host name: " + config.getGsaHostname());
      fileSender.sendGroups(config.getFeedName(),
          xml, config.isServerToUseCompression());
      sent = true;
      break;
    } catch (IOException sendFailure) {
      log.log(Level.WARNING, "failed to send groups", sendFailure);
      if (!retryPolicy.handleException(sendFailure, attempt)) {
        break; // The handler declined another attempt.
      }
    }
    log.log(Level.INFO, "trying again... number of attemps: {0}", attempt);
    attempt++;
  }
  GroupPrincipal firstUnsent;
  if (sent) {
    log.info("pushing groups succeeded");
    firstUnsent = null;
  } else {
    log.log(Level.WARNING, "gave up pushing groups");
    // defs was checked to be non-empty on entry, so next() is safe.
    firstUnsent = defs.entrySet().iterator().next().getKey();
  }
  log.info("finished pushing groups");
  return firstUnsent;
}
/**
 * Builds one metadata-and-url feed from {@code items} and sends it to the
 * GSA, retrying after an {@code IOException} for as long as {@code handler}
 * allows.
 *
 * @param items the batch to send; its first element is read on failure
 * @param handler consulted after each failed send
 * @return {@code null} if the feed was sent; otherwise the first item of
 *     {@code items}
 * @throws InterruptedException propagated from the calls made here
 */
private <T extends Item> T pushSizedBatchOfRecords(List<T> items,
    ExceptionHandler handler)
    throws InterruptedException {
  String feedSourceName = config.getFeedName();
  String feedXml = fileMaker.makeMetadataAndUrlXml(feedSourceName, items);
  log.log(Level.INFO, "Pushing batch of {0} items to GSA", items.size());
  boolean delivered = false;
  boolean retry;
  int attempt = 0;
  do {
    attempt++;
    try {
      log.info("Sending items to GSA host: " + config.getGsaHostname());
      fileSender.sendMetadataAndUrl(feedSourceName, feedXml,
          config.isServerToUseCompression());
      delivered = true;
      retry = false;
    } catch (IOException sendFailure) {
      log.log(Level.WARNING, "Failed to send items", sendFailure);
      retry = handler.handleException(sendFailure, attempt);
    }
    if (retry) {
      log.log(Level.INFO, "Trying again... Number of attemps: {0}", attempt);
    }
  } while (retry);
  if (delivered) {
    log.info("Pushing batch succeeded");
    log.info("Finished pushing batch of items");
    return null;
  }
  T firstItem = items.get(0);
  log.log(Level.WARNING, "Gave up. First item in list: {0}", firstItem);
  log.info("Finished pushing batch of items");
  return firstItem;
}
/**
 * Marker interface for an item that can exist in a feed. It declares no
 * members. In this file it is implemented by {@code AclItem} and is the
 * upper bound of the item type accepted by {@code pushItems}.
 */
interface Item {}
/**
 * Represents the ACL tag sent in feeds: an ACL attached to a DocId and,
 * optionally, to a fragment of that DocId. Every field is assigned once, in
 * the constructor.
 */
static final class AclItem implements Item {
  private final DocId id;
  private final String docIdFragment;
  private final Acl acl;

  /**
   * Creates an item with no DocId fragment.
   *
   * @param id the DocId the ACL belongs to; must not be {@code null}
   * @param acl the ACL to send; must not be {@code null}
   * @throws NullPointerException if {@code id} or {@code acl} is {@code null}
   */
  public AclItem(DocId id, Acl acl) {
    this(id, null, acl);
  }

  /**
   * Creates an item.
   *
   * @param id the DocId the ACL belongs to; must not be {@code null}
   * @param docIdFragment fragment of the DocId; may be {@code null}
   * @param acl the ACL to send; must not be {@code null}
   * @throws NullPointerException if {@code id} or {@code acl} is {@code null}
   */
  public AclItem(DocId id, String docIdFragment, Acl acl) {
    if (id == null || acl == null) {
      throw new NullPointerException("DocId and Acl must not be null");
    }
    this.id = id;
    this.docIdFragment = docIdFragment;
    this.acl = acl;
  }

  /** Returns the DocId given at construction; never {@code null}. */
  public DocId getDocId() {
    return id;
  }

  /** Returns the fragment given at construction; may be {@code null}. */
  public String getDocIdFragment() {
    return docIdFragment;
  }

  /** Returns the ACL given at construction; never {@code null}. */
  public Acl getAcl() {
    return acl;
  }

  /** Describes the item by its fields, so logs naming an item are readable. */
  @Override
  public String toString() {
    return "AclItem(" + id + "," + docIdFragment + "," + acl + ")";
  }
}
}