blob: 4fee0533fbe7c6c49be9865f8256faa4f025a0b9 [file] [log] [blame]
// Copyright 2011 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.enterprise.adaptor;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Calls Thread.interrupt() when a thread takes too long to complete a task.
* You must ensure that a processingCompleted() is executed on the same thread
* after each processingStarted(); using try-finally is highly encouraged:
*
* <code>
* watchdog.processingStarted();
* try {
* doWork();
* } finally {
* watchdog.processingCompleted();
* }
* </code>
*/
class Watchdog {
private final ScheduledExecutorService executor;
private final ConcurrentMap<Thread, FutureInfo> inProcess
= new ConcurrentHashMap<Thread, FutureInfo>();
/**
* @param executor executor to schedule tasks
*/
public Watchdog(ScheduledExecutorService executor) {
if (executor == null) {
throw new NullPointerException();
}
this.executor = executor;
}
/**
* @param timeout maximum allowed duration in milliseconds
*/
public void processingStarting(long timeout) {
processingStarting(Thread.currentThread(), timeout);
}
public void processingStarting(Thread thread, long timeout) {
if (inProcess.get(thread) != null) {
throw new IllegalStateException("Processing is already occuring on the "
+ "thread");
}
AtomicBoolean interruptNeeded = new AtomicBoolean(true);
Runnable task = new Interrupter(thread, interruptNeeded);
Future<?> future = executor.schedule(task, timeout, TimeUnit.MILLISECONDS);
FutureInfo info = new FutureInfo(future, interruptNeeded);
if (inProcess.putIfAbsent(thread, info) != null) {
// Reset state and try again.
future.cancel(false);
processingStarting(thread, timeout);
return;
}
}
public void processingCompleted() {
processingCompleted(Thread.currentThread());
}
public void processingCompleted(Thread thread) {
FutureInfo info = inProcess.remove(thread);
if (info == null) {
throw new IllegalStateException("No processing was started on the "
+ "thread");
}
// Prevent Interrupter from running if it hasn't started already. It may
// still be running after this call and Future doesn't tell us if it is
// currently running.
info.future.cancel(false);
synchronized (info.interruptNeeded) {
if (info.interruptNeeded.get()) {
// Interrupter hasn't interrupted this thread.
// Prevent the Interrupter from interrupting this thread in the future.
info.interruptNeeded.set(false);
} else {
// Interrupter has interrupted this thread.
// Clear the interrupt, if not already cleared, since we don't want to
// interrupt this thread any further.
thread.interrupted();
}
}
}
private static class Interrupter implements Runnable {
private final Thread thread;
/**
* Denotes the interrupter has responsibility to interrupt the thread. It
* must be cleared after the thread has been interrupted.
*/
private AtomicBoolean interruptNeeded;
public Interrupter(Thread thread, AtomicBoolean interruptNeeded) {
this.thread = thread;
this.interruptNeeded = interruptNeeded;
}
public void run() {
// Must synchronize to prevent processingCompleted() from attempting to
// clear the interrupt before interrupt() is called here.
synchronized (interruptNeeded) {
if (interruptNeeded.get()) {
thread.interrupt();
interruptNeeded.set(false);
}
}
}
}
private static class FutureInfo {
public final Future<?> future;
/**
* Denotes that the interrupter has responibility to interrupt the thread.
*/
public final AtomicBoolean interruptNeeded;
public FutureInfo(Future<?> future, AtomicBoolean interruptNeeded) {
this.future = future;
this.interruptNeeded = interruptNeeded;
}
}
}