// blob: b9e6e52c21cf04b88ef6f8d7558c5f91e3f1c175 [file] [log] [blame]
// Copyright 2011 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.enterprise.adaptor;
import java.util.*;
/**
* Contains registers and stats regarding runtime.
*/
class Journal {
/** How many times each document was pushed; see {@link #recordDocIdPush}. */
private Map<DocId, Integer> timesPushed;
/**
* Sum of the sizes of every list passed to {@link #recordDocIdPush}, so it
* also counts items that are not tracked in {@link #timesPushed}.
*/
private long totalPushes;
/** How many GSA content requests were recorded for each document. */
private Map<DocId, Integer> timesGsaRequested;
/** Total number of GSA content requests recorded. */
private long totalGsaRequests;
/** How many non-GSA content requests were recorded for each document. */
private Map<DocId, Integer> timesNonGsaRequested;
/** Total number of non-GSA content requests recorded. */
private long totalNonGsaRequests;
/** Source of every timestamp recorded by this class. */
private final TimeProvider timeProvider;
/** Time reported by {@link #timeProvider} when this journal was created. */
private final long startedAt;
/**
* Smallest step observed between two different readings of
* {@link #timeProvider}, in milliseconds; measured once at construction.
* With the default provider this is presumably the resolution of
* {@link System#currentTimeMillis()}.
*/
private final long timeResolution;
/**
* Time-based bookkeeping for charts. Each element in the array is for a
* different time period.
*/
private Stats[] timeStats;
/**
* The last element of {@link #timeStats}: one day of history kept in
* half-hour periods.
*/
private Stats dayStatsByHalfHour;
/** Request processing start time storage until processing completion. */
private ThreadLocal<Long> requestProcessingStart = new ThreadLocal<Long>();
/**
* Date in milliseconds of current full push start. If zero, then there is not
* a running full push.
*/
private long currentFullPushStart;
/** Date in milliseconds. Start of the last full push that succeeded. */
private long lastSuccessfulFullPushStart;
/** Date in milliseconds. End of the last full push that succeeded. */
private long lastSuccessfulFullPushEnd;
/** Outcome of the most recently finished full push; initially SUCCESS. */
private CompletionStatus lastFullPushStatus = CompletionStatus.SUCCESS;
/**
* Date in milliseconds of current incremental push start. If zero, then
* there is not a running incremental push.
*/
private long currentIncrementalPushStart;
/** Date in milliseconds. Start of the last incremental push that succeeded. */
private long lastSuccessfulIncrementalPushStart;
/** Date in milliseconds. End of the last incremental push that succeeded. */
private long lastSuccessfulIncrementalPushEnd;
/**
* Outcome of the most recently finished incremental push; initially SUCCESS.
*/
private CompletionStatus lastIncrementalPushStatus = CompletionStatus.SUCCESS;
/**
* Outcome of a full or incremental push, as stored by the
* {@code record*PushSuccessful}, {@code record*PushInterrupted} and
* {@code record*PushFailed} methods.
*/
enum CompletionStatus {
/** The push completed successfully. */
SUCCESS,
/** The push was interrupted before it completed. */
INTERRUPTION,
/** The push failed. */
FAILURE,
}
/**
* Creates a journal that reads time from a new {@code SystemTimeProvider}.
*
* @param reducedMem whether to use a fixed amount of memory, at the expense
* of some statistics being disabled
*/
public Journal(boolean reducedMem) {
this(reducedMem, new SystemTimeProvider());
}
/**
* Same as {@code Journal(false, timeProvider)}, i.e. memory use is not
* reduced.
*
* @param timeProvider source of the timestamps this journal records
*/
protected Journal(TimeProvider timeProvider) {
this(false, timeProvider);
}
/**
 * Creates a journal whose clock is {@code timeProvider}.
 *
 * @param reducedMem when {@code true} the per-document counters are backed
 *     by {@code NegSizeFakeMap} instances (which report a size of
 *     {@code -1}) instead of {@code HashMap}s
 * @param timeProvider source of every timestamp this journal records
 */
protected Journal(boolean reducedMem, TimeProvider timeProvider) {
  this.timeProvider = timeProvider;
  startedAt = timeProvider.currentTimeMillis();
  timeResolution = determineTimeResolution();

  // Every Stats is seeded with the same instant so that the data they hold
  // agrees with each other.
  Stats minuteBySecond = new Stats(60, 1000, startedAt);
  Stats hourByMinute = new Stats(60, 1000 * 60, startedAt);
  Stats dayByHalfHour = new Stats(48, 1000 * 60 * 30, startedAt);
  timeStats = new Stats[] {minuteBySecond, hourByMinute, dayByHalfHour};
  dayStatsByHalfHour = dayByHalfHour;

  if (!reducedMem) {
    timesPushed = new HashMap<DocId, Integer>();
    timesGsaRequested = new HashMap<DocId, Integer>();
    timesNonGsaRequested = new HashMap<DocId, Integer>();
  } else {
    timesPushed = new NegSizeFakeMap<DocId, Integer>();
    timesGsaRequested = new NegSizeFakeMap<DocId, Integer>();
    timesNonGsaRequested = new NegSizeFakeMap<DocId, Integer>();
  }
}
/**
 * Records that a batch of items was pushed. Each
 * {@code DocIdPusher.Record} bumps the push counter of its document;
 * {@code DocIdSender.AclItem}s are accepted but not counted per document.
 * The total push counter grows by the size of the whole list.
 *
 * @param pushed the items that were pushed
 * @throws IllegalArgumentException if an item is neither a record nor an
 *     ACL item
 */
synchronized void recordDocIdPush(List<? extends DocIdSender.Item> pushed) {
  for (Object entry : pushed) {
    if (entry instanceof DocIdPusher.Record) {
      increment(timesPushed, ((DocIdPusher.Record) entry).getDocId());
      continue;
    }
    if (!(entry instanceof DocIdSender.AclItem)) {
      throw new IllegalArgumentException("Unsupported class: "
          + entry.getClass().getName());
    }
    // ACL items: nothing is recorded per document.
  }
  totalPushes += pushed.size();
}
/**
 * Records a GSA content request for {@code docId}: bumps the per-document
 * and total GSA request counters and flags the current period of every time
 * series as having seen a GSA retrieval.
 *
 * @param docId the document that was requested
 */
void recordGsaContentRequest(DocId docId) {
  final long now = timeProvider.currentTimeMillis();
  synchronized (this) {
    increment(timesGsaRequested, docId);
    totalGsaRequests++;
    for (int i = 0; i < timeStats.length; i++) {
      timeStats[i].getCurrentStat(now).gsaRetrievedDocument = true;
    }
  }
}
/**
 * Records a non-GSA content request for {@code requested}: bumps the
 * per-document and total non-GSA request counters.
 *
 * @param requested the document that was requested
 */
void recordNonGsaContentRequest(DocId requested) {
  synchronized (this) {
    increment(timesNonGsaRequested, requested);
    totalNonGsaRequests += 1;
  }
}
/**
 * Marks the beginning of request processing on the calling thread by
 * storing the current time in a thread-local slot. The matching end or
 * failure call reads and clears that slot.
 */
void recordRequestProcessingStart() {
  long now = timeProvider.currentTimeMillis();
  requestProcessingStart.set(now);
}
/**
 * Marks the successful end of the request processing that was started on
 * the calling thread.
 *
 * @param responseSize amount added to the throughput statistic of the
 *     current periods
 * @throws IllegalStateException if no start was recorded on this thread
 */
void recordRequestProcessingEnd(long responseSize) {
  long finishedAt = timeProvider.currentTimeMillis();
  recordRequestProcessingEnd(responseSize, finishedAt);
}
/**
 * Folds one finished request into the current period of every time series:
 * request count, summed and maximal duration, and throughput.
 *
 * @param responseSize amount added to the throughput statistic
 * @param time the instant processing ended, in milliseconds
 * @throws IllegalStateException if no start was recorded on this thread
 */
private void recordRequestProcessingEnd(long responseSize, long time) {
  // Consumes (and clears) the start time stored for this thread.
  final long elapsed = endDuration(requestProcessingStart, time);
  synchronized (this) {
    for (int i = 0; i < timeStats.length; i++) {
      Stat period = timeStats[i].getCurrentStat(time);
      period.requestProcessingsCount += 1;
      period.requestProcessingsDurationSum += elapsed;
      if (elapsed > period.requestProcessingsMaxDuration) {
        period.requestProcessingsMaxDuration = elapsed;
      }
      period.requestProcessingsThroughput += responseSize;
    }
  }
}
/**
 * Marks the end of the request processing that was started on the calling
 * thread, when that processing ended with an error. The request is counted
 * like a completed one with a response size of zero, and additionally
 * bumps the failure counter of every current period.
 *
 * @throws IllegalStateException if no start was recorded on this thread
 */
void recordRequestProcessingFailure() {
  final long now = timeProvider.currentTimeMillis();
  synchronized (this) {
    recordRequestProcessingEnd(0, now);
    for (int i = 0; i < timeStats.length; i++) {
      timeStats[i].getCurrentStat(now).requestProcessingsFailureCount++;
    }
  }
}
/**
 * Computes how long an operation ran and clears its stored start time.
 *
 * @param localStartTime thread-local slot holding the start time; it is
 *     cleared by this call even when no value was present
 * @param endTime the instant the operation ended, in milliseconds
 * @return {@code endTime} minus the stored start time
 * @throws IllegalStateException if the slot held no start time
 */
private long endDuration(ThreadLocal<Long> localStartTime, long endTime) {
  final Long begun = localStartTime.get();
  localStartTime.remove();
  if (begun != null) {
    return endTime - begun;
  }
  throw new IllegalStateException("Record start must be called before "
      + "record end");
}
/**
 * Estimates the resolution of the time provider's clock, in milliseconds,
 * as the smallest result of five {@link #determineTimeResolutionOnce}
 * trials.
 */
private long determineTimeResolution() {
  long smallest = Long.MAX_VALUE;
  int trialsLeft = 5;
  while (trialsLeft-- > 0) {
    long observed = determineTimeResolutionOnce();
    if (observed < smallest) {
      smallest = observed;
    }
  }
  return smallest;
}
/**
 * One trial of measuring the clock resolution: reads the time provider
 * repeatedly until it reports a value different from the first reading and
 * returns the difference, in milliseconds.
 */
private long determineTimeResolutionOnce() {
  final long first = timeProvider.currentTimeMillis();
  long next;
  do {
    next = timeProvider.currentTimeMillis();
  } while (next == first);
  return next - first;
}
/**
 * Adds one to the count stored for {@code id}, starting at one when the map
 * does not yet contain the key.
 *
 * @param counts map of per-document counters to update
 * @param id the document whose counter is bumped
 */
private static void increment(Map<DocId, Integer> counts, DocId id) {
  // The stored value is only read when the key is reported as present.
  int updated = counts.containsKey(id) ? counts.get(id) + 1 : 1;
  counts.put(id, updated);
}
/**
 * Records the current time as the start of a full push. Only one full push
 * is tracked at a time.
 *
 * @throws IllegalStateException if a full push is already recorded as
 *     running
 */
void recordFullPushStarted() {
  synchronized (this) {
    boolean alreadyRunning = currentFullPushStart != 0;
    if (alreadyRunning) {
      throw new IllegalStateException("Full push already started");
    }
    currentFullPushStart = timeProvider.currentTimeMillis();
  }
}
/**
 * Records that the running full push completed successfully: its start and
 * the current time become the last successful start and end, the running
 * marker is cleared, and the last status becomes
 * {@link CompletionStatus#SUCCESS}.
 *
 * @throws IllegalStateException if no full push is recorded as running;
 *     without this check the last successful start would be overwritten
 *     with zero
 */
void recordFullPushSuccessful() {
  // The clock is read before the lock is taken.
  long endTime = timeProvider.currentTimeMillis();
  synchronized (this) {
    // Same precondition as recordFullPushInterrupted/recordFullPushFailed.
    if (currentFullPushStart == 0) {
      throw new IllegalStateException("Full push not already started");
    }
    lastSuccessfulFullPushStart = currentFullPushStart;
    lastSuccessfulFullPushEnd = endTime;
    currentFullPushStart = 0;
    lastFullPushStatus = CompletionStatus.SUCCESS;
  }
}
/**
 * Records that the running full push was interrupted before completing:
 * clears the running marker and sets the last status to
 * {@link CompletionStatus#INTERRUPTION}.
 *
 * @throws IllegalStateException if no full push is recorded as running
 */
void recordFullPushInterrupted() {
  synchronized (this) {
    boolean running = currentFullPushStart != 0;
    if (!running) {
      throw new IllegalStateException("Full push not already started");
    }
    lastFullPushStatus = CompletionStatus.INTERRUPTION;
    currentFullPushStart = 0;
  }
}
/**
 * Records that the running full push failed: clears the running marker and
 * sets the last status to {@link CompletionStatus#FAILURE}.
 *
 * @throws IllegalStateException if no full push is recorded as running
 */
void recordFullPushFailed() {
  synchronized (this) {
    boolean running = currentFullPushStart != 0;
    if (!running) {
      throw new IllegalStateException("Full push not already started");
    }
    lastFullPushStatus = CompletionStatus.FAILURE;
    currentFullPushStart = 0;
  }
}
/**
 * Returns the outcome of the most recently finished full push, or
 * {@link CompletionStatus#SUCCESS} if none has finished yet.
 */
CompletionStatus getLastFullPushStatus() {
  synchronized (this) {
    return lastFullPushStatus;
  }
}
/**
 * Records the current time as the start of an incremental push. Only one
 * incremental push is tracked at a time.
 *
 * @throws IllegalStateException if an incremental push is already recorded
 *     as running
 */
void recordIncrementalPushStarted() {
  synchronized (this) {
    boolean alreadyRunning = currentIncrementalPushStart != 0;
    if (alreadyRunning) {
      throw new IllegalStateException("Incremental push already started");
    }
    currentIncrementalPushStart = timeProvider.currentTimeMillis();
  }
}
/**
 * Records that the running incremental push completed successfully: its
 * start and the current time become the last successful start and end, the
 * running marker is cleared, and the last status becomes
 * {@link CompletionStatus#SUCCESS}.
 *
 * @throws IllegalStateException if no incremental push is recorded as
 *     running; without this check the last successful start would be
 *     overwritten with zero
 */
void recordIncrementalPushSuccessful() {
  // The clock is read before the lock is taken.
  long endTime = timeProvider.currentTimeMillis();
  synchronized (this) {
    // Same precondition as the Interrupted/Failed incremental recorders.
    if (currentIncrementalPushStart == 0) {
      throw new IllegalStateException("Incremental push not already started");
    }
    lastSuccessfulIncrementalPushStart = currentIncrementalPushStart;
    lastSuccessfulIncrementalPushEnd = endTime;
    currentIncrementalPushStart = 0;
    lastIncrementalPushStatus = CompletionStatus.SUCCESS;
  }
}
/**
 * Records that the running incremental push was interrupted before
 * completing: clears the running marker and sets the last status to
 * {@link CompletionStatus#INTERRUPTION}.
 *
 * @throws IllegalStateException if no incremental push is recorded as
 *     running
 */
void recordIncrementalPushInterrupted() {
  synchronized (this) {
    boolean running = currentIncrementalPushStart != 0;
    if (!running) {
      throw new IllegalStateException("Incremental push not already started");
    }
    lastIncrementalPushStatus = CompletionStatus.INTERRUPTION;
    currentIncrementalPushStart = 0;
  }
}
/**
 * Records that the running incremental push failed: clears the running
 * marker and sets the last status to {@link CompletionStatus#FAILURE}.
 *
 * @throws IllegalStateException if no incremental push is recorded as
 *     running
 */
void recordIncrementalPushFailed() {
  synchronized (this) {
    boolean running = currentIncrementalPushStart != 0;
    if (!running) {
      throw new IllegalStateException("Incremental push not already started");
    }
    lastIncrementalPushStatus = CompletionStatus.FAILURE;
    currentIncrementalPushStart = 0;
  }
}
/**
 * Returns the outcome of the most recently finished incremental push, or
 * {@link CompletionStatus#SUCCESS} if none has finished yet.
 */
CompletionStatus getLastIncrementalPushStatus() {
  synchronized (this) {
    return lastIncrementalPushStatus;
  }
}
/**
 * Computes the fraction of processed requests that failed, looking at the
 * half-hour periods of the last day from the most recent one backwards.
 *
 * @param maxCount periods stop being added once the accumulated number of
 *     requests reaches this value
 * @return failures divided by requests over the inspected periods, or
 *     {@code 0} when no requests were counted
 */
double getRetrieverErrorRate(long maxCount) {
  final long now = timeProvider.currentTimeMillis();
  long processed = 0;
  long failed = 0;
  synchronized (this) {
    final Stats day = dayStatsByHalfHour;
    // Bring the circular buffer up to date before reading it.
    day.getCurrentStat(now);
    final int periods = day.stats.length;
    int index = day.currentStat;
    for (int seen = 0; seen < periods && processed < maxCount; seen++) {
      Stat period = day.stats[index];
      processed += period.requestProcessingsCount;
      failed += period.requestProcessingsFailureCount;
      // Step to the previous period, wrapping around the buffer.
      index = (index == 0) ? periods - 1 : index - 1;
    }
  }
  if (processed <= 0) {
    return 0;
  }
  return failed / (double) processed;
}
/**
 * Reports whether any half-hour period of the last day is flagged as having
 * seen a GSA document retrieval.
 */
boolean hasGsaCrawledWithinLastDay() {
  final long now = timeProvider.currentTimeMillis();
  synchronized (this) {
    final Stats day = dayStatsByHalfHour;
    // Bring the circular buffer up to date before reading it.
    day.getCurrentStat(now);
    boolean crawled = false;
    for (int i = 0; i < day.stats.length && !crawled; i++) {
      crawled = day.stats[i].gsaRetrievedDocument;
    }
    return crawled;
  }
}
/**
 * Captures the journal's counters, push timestamps and a copy of every time
 * series, all taken under the journal's lock at one instant. Intended for
 * use in {@link DashboardHandler} only.
 */
JournalSnapshot getSnapshot() {
  synchronized (this) {
    final long now = timeProvider.currentTimeMillis();
    final Stats[] copies = new Stats[timeStats.length];
    int slot = 0;
    for (Stats live : timeStats) {
      // Bring the series up to date before copying it.
      live.getCurrentStat(now);
      copies[slot++] = live.clone();
    }
    return new JournalSnapshot(this, now, copies);
  }
}
/**
 * Read-only copy of a {@link Journal}'s state at one instant. All times are
 * in milliseconds.
 */
static class JournalSnapshot {
  /** Size reported by the journal's per-document push map. */
  final long numUniqueDocIdsPushed;
  /** The journal's total push counter. */
  final long numTotalDocIdsPushed;
  /** Size reported by the journal's per-document GSA request map. */
  final long numUniqueGsaRequests;
  /** The journal's total GSA request counter. */
  final long numTotalGsaRequests;
  /** Size reported by the journal's per-document non-GSA request map. */
  final long numUniqueNonGsaRequests;
  /** The journal's total non-GSA request counter. */
  final long numTotalNonGsaRequests;
  /** Time at which the journal was created. */
  final long whenStarted;
  /** Time at which this snapshot was taken. */
  final long currentTime;
  /** The journal's measured clock resolution. */
  final long timeResolution;
  final long lastSuccessfulFullPushStart;
  final long lastSuccessfulFullPushEnd;
  /** Start of the running full push, or zero when none is running. */
  final long currentFullPushStart;
  final long lastSuccessfulIncrementalPushStart;
  final long lastSuccessfulIncrementalPushEnd;
  /** Start of the running incremental push, or zero when none is running. */
  final long currentIncrementalPushStart;
  /** Copies of the journal's time series. */
  final Stats[] timeStats;

  /**
   * Copies the state of {@code journal}. The fields are read without any
   * locking here, so the caller is expected to hold the journal's lock.
   *
   * @param journal the journal to copy from
   * @param currentTime the instant the snapshot is taken
   * @param timeStatsClone copies of the journal's time series; stored as is
   */
  JournalSnapshot(Journal journal, long currentTime, Stats[] timeStatsClone) {
    // Clock-related values.
    this.whenStarted = journal.startedAt;
    this.currentTime = currentTime;
    this.timeResolution = journal.timeResolution;

    // Per-document and total counters.
    this.numUniqueDocIdsPushed = journal.timesPushed.size();
    this.numTotalDocIdsPushed = journal.totalPushes;
    this.numUniqueGsaRequests = journal.timesGsaRequested.size();
    this.numTotalGsaRequests = journal.totalGsaRequests;
    this.numUniqueNonGsaRequests = journal.timesNonGsaRequested.size();
    this.numTotalNonGsaRequests = journal.totalNonGsaRequests;

    // Full push bookkeeping.
    this.currentFullPushStart = journal.currentFullPushStart;
    this.lastSuccessfulFullPushStart = journal.lastSuccessfulFullPushStart;
    this.lastSuccessfulFullPushEnd = journal.lastSuccessfulFullPushEnd;

    // Incremental push bookkeeping.
    this.currentIncrementalPushStart = journal.currentIncrementalPushStart;
    this.lastSuccessfulIncrementalPushStart
        = journal.lastSuccessfulIncrementalPushStart;
    this.lastSuccessfulIncrementalPushEnd
        = journal.lastSuccessfulIncrementalPushEnd;

    this.timeStats = timeStatsClone;
  }
}
/**
 * A time series of {@link Stat} periods of equal length, kept in a circular
 * buffer that is advanced lazily by {@link #getCurrentStat}.
 */
static class Stats implements Cloneable {
  /**
   * Circular buffer holding every period of this series. When the current
   * {@link Stat} "expires", {@link #currentStat} is advanced by one (modulo
   * {@code stats.length}) and the {@code Stat} it then points at is reset.
   */
  Stat[] stats;
  /**
   * Length of the time span covered by each {@link Stat}, in milliseconds.
   */
  long snapshotDurationMs;
  /**
   * Index within {@link #stats} of the period currently being written. All
   * incoming statistics should update that {@code Stat} object.
   */
  int currentStat;
  /**
   * Time at which the current {@link Stat} expires and {@link #currentStat}
   * is advanced.
   */
  long pendingStatPeriodEnd;

  /**
   * Creates a series of {@code statCount} empty periods.
   *
   * @param statCount number of periods kept in the buffer
   * @param snapshotDuration length of each period, in milliseconds
   * @param currentTime the present instant; the first period ends at the
   *     next multiple of {@code snapshotDuration} after it
   */
  public Stats(int statCount, long snapshotDuration, long currentTime) {
    snapshotDurationMs = snapshotDuration;
    stats = new Stat[statCount];
    // Pre-allocate all Stat objects.
    for (int i = 0; i < statCount; i++) {
      stats[i] = new Stat();
    }
    pendingStatPeriodEnd = periodEndFor(currentTime);
  }

  /**
   * Rounds {@code time} down to a multiple of the period length and adds
   * one period length.
   */
  private long periodEndFor(long time) {
    final long length = snapshotDurationMs;
    return ((time / length) * length) + length;
  }

  /**
   * Retrieves the {@code Stat} object that applies to {@code currentTime},
   * expiring and resetting older periods as needed. {@code currentTime} is
   * expected to be an actual point in time while the caller was holding the
   * lock on {@link Journal}. It may never be less than the previous call.
   */
  public Stat getCurrentStat(long currentTime) {
    if (currentTime >= pendingStatPeriodEnd) {
      // The current period has expired.
      long overdueBy = currentTime - pendingStatPeriodEnd;
      if (overdueBy > snapshotDurationMs * stats.length) {
        // The last update was so long ago that every period is stale:
        // clear them all and restart from the period containing now.
        for (Stat stale : stats) {
          stale.reset();
        }
        pendingStatPeriodEnd = periodEndFor(currentTime);
      }
      // Walk forward one period at a time until now is covered.
      while (pendingStatPeriodEnd <= currentTime) {
        currentStat = (currentStat + 1) % stats.length;
        stats[currentStat].reset();
        pendingStatPeriodEnd += snapshotDurationMs;
      }
    }
    return stats[currentStat];
  }

  /**
   * Returns a copy of this series whose buffer holds copies of the
   * {@link Stat} objects, so later updates here do not show in the copy.
   */
  @Override
  public Stats clone() {
    Stats copy;
    try {
      copy = (Stats) super.clone();
    } catch (CloneNotSupportedException ex) {
      throw new AssertionError();
    }
    Stat[] copiedPeriods = new Stat[stats.length];
    for (int i = 0; i < copiedPeriods.length; i++) {
      copiedPeriods[i] = stats[i].clone();
    }
    copy.stats = copiedPeriods;
    return copy;
  }
}
/**
 * Plain holder of the statistics gathered during one time period. Which
 * period that is, and when the values are cleared for reuse, is controlled
 * by {@link Stats}.
 */
static class Stat implements Cloneable {
  /** How many requests were processed. */
  long requestProcessingsCount;
  /** How many of the processed requests ended with an error. */
  long requestProcessingsFailureCount;
  /** Sum of the durations of all processed requests. */
  long requestProcessingsDurationSum;
  /** Longest duration of any single processed request. */
  long requestProcessingsMaxDuration;
  /** Number of bytes generated by the adaptor. */
  long requestProcessingsThroughput;
  /** Whether the GSA requested a document. */
  boolean gsaRetrievedDocument;

  /** Creates a holder with every statistic cleared. */
  public Stat() {
    reset();
  }

  /** Clears every statistic so the object can be reused. */
  private void reset() {
    gsaRetrievedDocument = false;
    requestProcessingsThroughput = 0;
    requestProcessingsMaxDuration = 0;
    requestProcessingsDurationSum = 0;
    requestProcessingsFailureCount = 0;
    requestProcessingsCount = 0;
  }

  /** Returns a field-by-field copy of this object. */
  @Override
  public Stat clone() {
    Object copy;
    try {
      copy = super.clone();
    } catch (CloneNotSupportedException ex) {
      throw new AssertionError();
    }
    return (Stat) copy;
  }
}
/**
 * A {@code FakeMap} whose {@link #size()} is always {@code -1}. The journal
 * uses it for its per-document counters when constructed with
 * {@code reducedMem}.
 */
private static class NegSizeFakeMap<K, V> extends FakeMap<K, V> {
  /** Value reported in place of a real element count. */
  private static final int REPORTED_SIZE = -1;

  @Override
  public int size() {
    return REPORTED_SIZE;
  }
}
}