blob: f98880bf68df32809671911e31a8fb982fa6353a [file] [log] [blame]
/*
* Copyright (c) 2003, 2012, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package com.sun.corba.se.impl.orbutil.threadpool;
import java.io.IOException;
import java.io.Closeable;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException;
import com.sun.corba.se.spi.orbutil.threadpool.ThreadPool;
import com.sun.corba.se.spi.orbutil.threadpool.Work;
import com.sun.corba.se.spi.orbutil.threadpool.WorkQueue;
import com.sun.corba.se.impl.orbutil.ORBConstants;
import com.sun.corba.se.impl.orbutil.threadpool.WorkQueueImpl;
import com.sun.corba.se.spi.monitoring.MonitoringConstants;
import com.sun.corba.se.spi.monitoring.MonitoredObject;
import com.sun.corba.se.spi.monitoring.MonitoringFactories;
import com.sun.corba.se.spi.orb.ORB;
import com.sun.corba.se.spi.monitoring.LongMonitoredAttributeBase;
import com.sun.corba.se.impl.logging.ORBUtilSystemException;
import com.sun.corba.se.impl.orbutil.ORBConstants;
import com.sun.corba.se.spi.logging.CORBALogDomains;
public class ThreadPoolImpl implements ThreadPool
{
// serial counter useful for debugging
private static AtomicInteger threadCounter = new AtomicInteger(0);
private static final ORBUtilSystemException wrapper =
ORBUtilSystemException.get(CORBALogDomains.RPC_TRANSPORT);
// Any time currentThreadCount and/or availableWorkerThreads is updated
// or accessed this ThreadPool's WorkQueue must be locked. And, it is
// expected that this ThreadPool's WorkQueue is the only object that
// updates and accesses these values directly and indirectly though a
// call to a method in this ThreadPool. If any call to update or access
// those values must synchronized on this ThreadPool's WorkQueue.
private WorkQueue workQueue;
// Stores the number of available worker threads
private int availableWorkerThreads = 0;
// Stores the number of threads in the threadpool currently
private int currentThreadCount = 0;
// Minimum number of worker threads created at instantiation of the threadpool
private int minWorkerThreads = 0;
// Maximum number of worker threads in the threadpool
private int maxWorkerThreads = 0;
// Inactivity timeout value for worker threads to exit and stop running
private long inactivityTimeout;
// Indicates if the threadpool is bounded or unbounded
private boolean boundedThreadPool = false;
// Running count of the work items processed
// Set the value to 1 so that divide by zero is avoided in
// averageWorkCompletionTime()
private AtomicLong processedCount = new AtomicLong(1);
// Running aggregate of the time taken in millis to execute work items
// processed by the threads in the threadpool
private AtomicLong totalTimeTaken = new AtomicLong(0);
// Name of the ThreadPool
private String name;
// MonitoredObject for ThreadPool
private MonitoredObject threadpoolMonitoredObject;
// ThreadGroup in which threads should be created
private ThreadGroup threadGroup;
Object workersLock = new Object();
List<WorkerThread> workers = new ArrayList<>();
/**
* This constructor is used to create an unbounded threadpool
*/
public ThreadPoolImpl(ThreadGroup tg, String threadpoolName) {
inactivityTimeout = ORBConstants.DEFAULT_INACTIVITY_TIMEOUT;
maxWorkerThreads = Integer.MAX_VALUE;
workQueue = new WorkQueueImpl(this);
threadGroup = tg;
name = threadpoolName;
initializeMonitoring();
}
/**
* This constructor is used to create an unbounded threadpool
* in the ThreadGroup of the current thread
*/
public ThreadPoolImpl(String threadpoolName) {
this( Thread.currentThread().getThreadGroup(), threadpoolName ) ;
}
/**
* This constructor is used to create bounded threadpool
*/
public ThreadPoolImpl(int minSize, int maxSize, long timeout,
String threadpoolName)
{
minWorkerThreads = minSize;
maxWorkerThreads = maxSize;
inactivityTimeout = timeout;
boundedThreadPool = true;
workQueue = new WorkQueueImpl(this);
name = threadpoolName;
for (int i = 0; i < minWorkerThreads; i++) {
createWorkerThread();
}
initializeMonitoring();
}
// Note that this method should not return until AFTER all threads have died.
public void close() throws IOException {
// Copy to avoid concurrent modification problems.
List<WorkerThread> copy = null;
synchronized (workersLock) {
copy = new ArrayList<>(workers);
}
for (WorkerThread wt : copy) {
wt.close();
while (wt.getState() != Thread.State.TERMINATED) {
try {
wt.join();
} catch (InterruptedException exc) {
wrapper.interruptedJoinCallWhileClosingThreadPool(exc, wt, this);
}
}
}
threadGroup = null;
}
// Setup monitoring for this threadpool
private void initializeMonitoring() {
// Get root monitored object
MonitoredObject root = MonitoringFactories.getMonitoringManagerFactory().
createMonitoringManager(MonitoringConstants.DEFAULT_MONITORING_ROOT, null).
getRootMonitoredObject();
// Create the threadpool monitoring root
MonitoredObject threadPoolMonitoringObjectRoot = root.getChild(
MonitoringConstants.THREADPOOL_MONITORING_ROOT);
if (threadPoolMonitoringObjectRoot == null) {
threadPoolMonitoringObjectRoot = MonitoringFactories.
getMonitoredObjectFactory().createMonitoredObject(
MonitoringConstants.THREADPOOL_MONITORING_ROOT,
MonitoringConstants.THREADPOOL_MONITORING_ROOT_DESCRIPTION);
root.addChild(threadPoolMonitoringObjectRoot);
}
threadpoolMonitoredObject = MonitoringFactories.
getMonitoredObjectFactory().
createMonitoredObject(name,
MonitoringConstants.THREADPOOL_MONITORING_DESCRIPTION);
threadPoolMonitoringObjectRoot.addChild(threadpoolMonitoredObject);
LongMonitoredAttributeBase b1 = new
LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS,
MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS_DESCRIPTION) {
public Object getValue() {
return new Long(ThreadPoolImpl.this.currentNumberOfThreads());
}
};
threadpoolMonitoredObject.addAttribute(b1);
LongMonitoredAttributeBase b2 = new
LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_NUMBER_OF_AVAILABLE_THREADS,
MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS_DESCRIPTION) {
public Object getValue() {
return new Long(ThreadPoolImpl.this.numberOfAvailableThreads());
}
};
threadpoolMonitoredObject.addAttribute(b2);
LongMonitoredAttributeBase b3 = new
LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_NUMBER_OF_BUSY_THREADS,
MonitoringConstants.THREADPOOL_NUMBER_OF_BUSY_THREADS_DESCRIPTION) {
public Object getValue() {
return new Long(ThreadPoolImpl.this.numberOfBusyThreads());
}
};
threadpoolMonitoredObject.addAttribute(b3);
LongMonitoredAttributeBase b4 = new
LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_AVERAGE_WORK_COMPLETION_TIME,
MonitoringConstants.THREADPOOL_AVERAGE_WORK_COMPLETION_TIME_DESCRIPTION) {
public Object getValue() {
return new Long(ThreadPoolImpl.this.averageWorkCompletionTime());
}
};
threadpoolMonitoredObject.addAttribute(b4);
LongMonitoredAttributeBase b5 = new
LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_CURRENT_PROCESSED_COUNT,
MonitoringConstants.THREADPOOL_CURRENT_PROCESSED_COUNT_DESCRIPTION) {
public Object getValue() {
return new Long(ThreadPoolImpl.this.currentProcessedCount());
}
};
threadpoolMonitoredObject.addAttribute(b5);
// Add the monitored object for the WorkQueue
threadpoolMonitoredObject.addChild(
((WorkQueueImpl)workQueue).getMonitoredObject());
}
// Package private method to get the monitored object for this
// class
MonitoredObject getMonitoredObject() {
return threadpoolMonitoredObject;
}
public WorkQueue getAnyWorkQueue()
{
return workQueue;
}
public WorkQueue getWorkQueue(int queueId)
throws NoSuchWorkQueueException
{
if (queueId != 0)
throw new NoSuchWorkQueueException();
return workQueue;
}
/**
* To be called from the workqueue when work is added to the
* workQueue. This method would create new threads if required
* or notify waiting threads on the queue for available work
*/
void notifyForAvailableWork(WorkQueue aWorkQueue) {
synchronized (aWorkQueue) {
if (availableWorkerThreads < aWorkQueue.workItemsInQueue()) {
createWorkerThread();
} else {
aWorkQueue.notify();
}
}
}
private Thread createWorkerThreadHelper( String name ) {
// Thread creation needs to be in a doPrivileged block
// if there is a non-null security manager for two reasons:
// 1. The creation of a thread in a specific ThreadGroup
// is a privileged operation. Lack of a doPrivileged
// block here causes an AccessControlException
// (see bug 6268145).
// 2. We want to make sure that the permissions associated
// with this thread do NOT include the permissions of
// the current thread that is calling this method.
// This leads to problems in the app server where
// some threads in the ThreadPool randomly get
// bad permissions, leading to unpredictable
// permission errors (see bug 6021011).
//
// A Java thread contains a stack of call frames,
// one for each method called that has not yet returned.
// Each method comes from a particular class. The class
// was loaded by a ClassLoader which has an associated
// CodeSource, and this determines the Permissions
// for all methods in that class. The current
// Permissions for the thread are the intersection of
// all Permissions for the methods on the stack.
// This is part of the Security Context of the thread.
//
// When a thread creates a new thread, the new thread
// inherits the security context of the old thread.
// This is bad in a ThreadPool, because different
// creators of threads may have different security contexts.
// This leads to occasional unpredictable errors when
// a thread is re-used in a different security context.
//
// Avoiding this problem is simple: just do the thread
// creation in a doPrivileged block. This sets the
// inherited security context to that of the code source
// for the ORB code itself, which contains all permissions
// in either Java SE or Java EE.
WorkerThread thread = new WorkerThread(threadGroup, name);
synchronized (workersLock) {
workers.add(thread);
}
// The thread must be set to a daemon thread so the
// VM can exit if the only threads left are PooledThreads
// or other daemons. We don't want to rely on the
// calling thread always being a daemon.
// Note that no exception is possible here since we
// are inside the doPrivileged block.
thread.setDaemon(true);
wrapper.workerThreadCreated(thread, thread.getContextClassLoader());
thread.start();
return null;
}
/**
* To be called from the workqueue to create worker threads when none
* available.
*/
void createWorkerThread() {
final String name = getName();
synchronized (workQueue) {
try {
if (System.getSecurityManager() == null) {
createWorkerThreadHelper(name);
} else {
// If we get here, we need to create a thread.
AccessController.doPrivileged(
new PrivilegedAction() {
public Object run() {
return createWorkerThreadHelper(name);
}
}
);
}
} catch (Throwable t) {
// Decrementing the count of current worker threads.
// But, it will be increased in the finally block.
decrementCurrentNumberOfThreads();
wrapper.workerThreadCreationFailure(t);
} finally {
incrementCurrentNumberOfThreads();
}
}
}
public int minimumNumberOfThreads() {
return minWorkerThreads;
}
public int maximumNumberOfThreads() {
return maxWorkerThreads;
}
public long idleTimeoutForThreads() {
return inactivityTimeout;
}
public int currentNumberOfThreads() {
synchronized (workQueue) {
return currentThreadCount;
}
}
void decrementCurrentNumberOfThreads() {
synchronized (workQueue) {
currentThreadCount--;
}
}
void incrementCurrentNumberOfThreads() {
synchronized (workQueue) {
currentThreadCount++;
}
}
public int numberOfAvailableThreads() {
synchronized (workQueue) {
return availableWorkerThreads;
}
}
public int numberOfBusyThreads() {
synchronized (workQueue) {
return (currentThreadCount - availableWorkerThreads);
}
}
public long averageWorkCompletionTime() {
synchronized (workQueue) {
return (totalTimeTaken.get() / processedCount.get());
}
}
public long currentProcessedCount() {
synchronized (workQueue) {
return processedCount.get();
}
}
public String getName() {
return name;
}
/**
* This method will return the number of WorkQueues serviced by the threadpool.
*/
public int numberOfWorkQueues() {
return 1;
}
private static synchronized int getUniqueThreadId() {
return ThreadPoolImpl.threadCounter.incrementAndGet();
}
/**
* This method will decrement the number of available threads
* in the threadpool which are waiting for work. Called from
* WorkQueueImpl.requestWork()
*/
void decrementNumberOfAvailableThreads() {
synchronized (workQueue) {
availableWorkerThreads--;
}
}
/**
* This method will increment the number of available threads
* in the threadpool which are waiting for work. Called from
* WorkQueueImpl.requestWork()
*/
void incrementNumberOfAvailableThreads() {
synchronized (workQueue) {
availableWorkerThreads++;
}
}
private class WorkerThread extends Thread implements Closeable
{
private Work currentWork;
private int threadId = 0; // unique id for the thread
private volatile boolean closeCalled = false;
private String threadPoolName;
// name seen by Thread.getName()
private StringBuffer workerThreadName = new StringBuffer();
WorkerThread(ThreadGroup tg, String threadPoolName) {
super(tg, "Idle");
this.threadId = ThreadPoolImpl.getUniqueThreadId();
this.threadPoolName = threadPoolName;
setName(composeWorkerThreadName(threadPoolName, "Idle"));
}
public synchronized void close() {
closeCalled = true;
interrupt();
}
private void resetClassLoader() {
}
private void performWork() {
long start = System.currentTimeMillis();
try {
currentWork.doWork();
} catch (Throwable t) {
wrapper.workerThreadDoWorkThrowable(this, t);
}
long elapsedTime = System.currentTimeMillis() - start;
totalTimeTaken.addAndGet(elapsedTime);
processedCount.incrementAndGet();
}
public void run() {
try {
while (!closeCalled) {
try {
currentWork = ((WorkQueueImpl)workQueue).requestWork(
inactivityTimeout);
if (currentWork == null)
continue;
} catch (InterruptedException exc) {
wrapper.workQueueThreadInterrupted( exc, getName(),
Boolean.valueOf(closeCalled));
continue ;
} catch (Throwable t) {
wrapper.workerThreadThrowableFromRequestWork(this, t,
workQueue.getName());
continue;
}
performWork();
// set currentWork to null so that the work item can be
// garbage collected without waiting for the next work item.
currentWork = null;
resetClassLoader();
}
} catch (Throwable e) {
// This should not be possible
wrapper.workerThreadCaughtUnexpectedThrowable(this,e);
} finally {
synchronized (workersLock) {
workers.remove(this);
}
}
}
private String composeWorkerThreadName(String poolName, String workerName) {
workerThreadName.setLength(0);
workerThreadName.append("p: ").append(poolName);
workerThreadName.append("; w: ").append(workerName);
return workerThreadName.toString();
}
} // End of WorkerThread class
}
// End of file.