blob: 7756ef2cccc50d5349bea3f7598bfc4d4adaca05 [file] [log] [blame]
/*
* Copyright (c) 2003, 2013, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package com.sun.corba.se.impl.transport;
import java.io.IOException;
import java.net.ServerSocket;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ClosedSelectorException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Iterator;
import java.util.List;
import com.sun.corba.se.pept.broker.Broker;
import com.sun.corba.se.pept.transport.Acceptor;
import com.sun.corba.se.pept.transport.Connection;
import com.sun.corba.se.pept.transport.EventHandler;
import com.sun.corba.se.pept.transport.ListenerThread;
import com.sun.corba.se.pept.transport.ReaderThread;
import com.sun.corba.se.spi.logging.CORBALogDomains;
import com.sun.corba.se.spi.orb.ORB;
import com.sun.corba.se.spi.orbutil.threadpool.Work;
import com.sun.corba.se.spi.orbutil.threadpool.NoSuchThreadPoolException;
import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException;
import com.sun.corba.se.impl.logging.ORBUtilSystemException;
import com.sun.corba.se.impl.orbutil.ORBUtility;
/**
* @author Harold Carr
*/
class SelectorImpl
extends
Thread
implements
com.sun.corba.se.pept.transport.Selector
{
private ORB orb;
private Selector selector;
private long timeout;
private List deferredRegistrations;
private List interestOpsList;
private HashMap listenerThreads;
private Map readerThreads;
private boolean selectorStarted;
private volatile boolean closed;
private ORBUtilSystemException wrapper;
public SelectorImpl(ORB orb)
{
this.orb = orb;
selector = null;
selectorStarted = false;
timeout = 60000;
deferredRegistrations = new ArrayList();
interestOpsList = new ArrayList();
listenerThreads = new HashMap();
readerThreads = java.util.Collections.synchronizedMap(new HashMap());
closed = false;
wrapper = ORBUtilSystemException.get(orb,CORBALogDomains.RPC_TRANSPORT);
}
public void setTimeout(long timeout)
{
this.timeout = timeout;
}
public long getTimeout()
{
return timeout;
}
public void registerInterestOps(EventHandler eventHandler)
{
if (orb.transportDebugFlag) {
dprint(".registerInterestOps:-> " + eventHandler);
}
SelectionKey selectionKey = eventHandler.getSelectionKey();
if (selectionKey.isValid()) {
int ehOps = eventHandler.getInterestOps();
SelectionKeyAndOp keyAndOp = new SelectionKeyAndOp(selectionKey, ehOps);
synchronized(interestOpsList) {
interestOpsList.add(keyAndOp);
}
// tell Selector Thread there's an update to a SelectorKey's Ops
try {
if (selector != null) {
// wakeup Selector thread to process close request
selector.wakeup();
}
} catch (Throwable t) {
if (orb.transportDebugFlag) {
dprint(".registerInterestOps: selector.wakeup: ", t);
}
}
}
else {
wrapper.selectionKeyInvalid(eventHandler.toString());
if (orb.transportDebugFlag) {
dprint(".registerInterestOps: EventHandler SelectionKey not valid " + eventHandler);
}
}
if (orb.transportDebugFlag) {
dprint(".registerInterestOps:<- ");
}
}
public void registerForEvent(EventHandler eventHandler)
{
if (orb.transportDebugFlag) {
dprint(".registerForEvent: " + eventHandler);
}
if (isClosed()) {
if (orb.transportDebugFlag) {
dprint(".registerForEvent: closed: " + eventHandler);
}
return;
}
if (eventHandler.shouldUseSelectThreadToWait()) {
synchronized (deferredRegistrations) {
deferredRegistrations.add(eventHandler);
}
if (! selectorStarted) {
startSelector();
}
selector.wakeup();
return;
}
switch (eventHandler.getInterestOps()) {
case SelectionKey.OP_ACCEPT :
createListenerThread(eventHandler);
break;
case SelectionKey.OP_READ :
createReaderThread(eventHandler);
break;
default:
if (orb.transportDebugFlag) {
dprint(".registerForEvent: default: " + eventHandler);
}
throw new RuntimeException(
"SelectorImpl.registerForEvent: unknown interest ops");
}
}
public void unregisterForEvent(EventHandler eventHandler)
{
if (orb.transportDebugFlag) {
dprint(".unregisterForEvent: " + eventHandler);
}
if (isClosed()) {
if (orb.transportDebugFlag) {
dprint(".unregisterForEvent: closed: " + eventHandler);
}
return;
}
if (eventHandler.shouldUseSelectThreadToWait()) {
SelectionKey selectionKey ;
synchronized(deferredRegistrations) {
selectionKey = eventHandler.getSelectionKey();
}
if (selectionKey != null) {
selectionKey.cancel();
}
if (selector != null) {
selector.wakeup();
}
return;
}
switch (eventHandler.getInterestOps()) {
case SelectionKey.OP_ACCEPT :
destroyListenerThread(eventHandler);
break;
case SelectionKey.OP_READ :
destroyReaderThread(eventHandler);
break;
default:
if (orb.transportDebugFlag) {
dprint(".unregisterForEvent: default: " + eventHandler);
}
throw new RuntimeException(
"SelectorImpl.uregisterForEvent: unknown interest ops");
}
}
public void close()
{
if (orb.transportDebugFlag) {
dprint(".close");
}
if (isClosed()) {
if (orb.transportDebugFlag) {
dprint(".close: already closed");
}
return;
}
setClosed(true);
Iterator i;
// Kill listeners.
i = listenerThreads.values().iterator();
while (i.hasNext()) {
ListenerThread listenerThread = (ListenerThread) i.next();
listenerThread.close();
}
// Kill readers.
i = readerThreads.values().iterator();
while (i.hasNext()) {
ReaderThread readerThread = (ReaderThread) i.next();
readerThread.close();
}
clearDeferredRegistrations();
// Selector
try {
if (selector != null) {
// wakeup Selector thread to process close request
selector.wakeup();
}
} catch (Throwable t) {
if (orb.transportDebugFlag) {
dprint(".close: selector.wakeup: ", t);
}
}
}
///////////////////////////////////////////////////
//
// Thread methods.
//
public void run()
{
setName("SelectorThread");
while (!closed) {
try {
int n = 0;
if (timeout == 0 && orb.transportDebugFlag) {
dprint(".run: Beginning of selection cycle");
}
handleDeferredRegistrations();
enableInterestOps();
try {
n = selector.select(timeout);
} catch (IOException e) {
if (orb.transportDebugFlag) {
dprint(".run: selector.select: ", e);
}
} catch (ClosedSelectorException csEx) {
if (orb.transportDebugFlag) {
dprint(".run: selector.select: ", csEx);
}
break;
}
if (closed) {
break;
}
/*
if (timeout == 0 && orb.transportDebugFlag) {
dprint(".run: selector.select() returned: " + n);
}
if (n == 0) {
continue;
}
*/
Iterator iterator = selector.selectedKeys().iterator();
if (orb.transportDebugFlag) {
if (iterator.hasNext()) {
dprint(".run: n = " + n);
}
}
while (iterator.hasNext()) {
SelectionKey selectionKey = (SelectionKey) iterator.next();
iterator.remove();
EventHandler eventHandler = (EventHandler)
selectionKey.attachment();
try {
eventHandler.handleEvent();
} catch (Throwable t) {
if (orb.transportDebugFlag) {
dprint(".run: eventHandler.handleEvent", t);
}
}
}
if (timeout == 0 && orb.transportDebugFlag) {
dprint(".run: End of selection cycle");
}
} catch (Throwable t) {
// IMPORTANT: ignore all errors so the select thread keeps running.
// Otherwise a guaranteed hang.
if (orb.transportDebugFlag) {
dprint(".run: ignoring", t);
}
}
}
try {
if (selector != null) {
if (orb.transportDebugFlag) {
dprint(".run: selector.close ");
}
selector.close();
}
} catch (Throwable t) {
if (orb.transportDebugFlag) {
dprint(".run: selector.close: ", t);
}
}
}
/////////////////////////////////////////////////////
//
// Implementation.
//
private void clearDeferredRegistrations() {
synchronized (deferredRegistrations) {
int deferredListSize = deferredRegistrations.size();
if (orb.transportDebugFlag) {
dprint(".clearDeferredRegistrations:deferred list size == " + deferredListSize);
}
for (int i = 0; i < deferredListSize; i++) {
EventHandler eventHandler =
(EventHandler)deferredRegistrations.get(i);
if (orb.transportDebugFlag) {
dprint(".clearDeferredRegistrations: " + eventHandler);
}
SelectableChannel channel = eventHandler.getChannel();
SelectionKey selectionKey = null;
try {
if (orb.transportDebugFlag) {
dprint(".clearDeferredRegistrations:close channel == "
+ channel);
dprint(".clearDeferredRegistrations:close channel class == "
+ channel.getClass().getName());
}
channel.close();
selectionKey = eventHandler.getSelectionKey();
if (selectionKey != null) {
selectionKey.cancel();
selectionKey.attach(null);
}
} catch (IOException ioEx) {
if (orb.transportDebugFlag) {
dprint(".clearDeferredRegistrations: ", ioEx);
}
}
}
deferredRegistrations.clear();
}
}
private synchronized boolean isClosed ()
{
return closed;
}
private synchronized void setClosed(boolean closed)
{
this.closed = closed;
}
private void startSelector()
{
try {
selector = Selector.open();
} catch (IOException e) {
if (orb.transportDebugFlag) {
dprint(".startSelector: Selector.open: IOException: ", e);
}
// REVISIT - better handling/reporting
RuntimeException rte =
new RuntimeException(".startSelector: Selector.open exception");
rte.initCause(e);
throw rte;
}
setDaemon(true);
start();
selectorStarted = true;
if (orb.transportDebugFlag) {
dprint(".startSelector: selector.start completed.");
}
}
private void handleDeferredRegistrations()
{
synchronized (deferredRegistrations) {
int deferredListSize = deferredRegistrations.size();
for (int i = 0; i < deferredListSize; i++) {
EventHandler eventHandler =
(EventHandler)deferredRegistrations.get(i);
if (orb.transportDebugFlag) {
dprint(".handleDeferredRegistrations: " + eventHandler);
}
SelectableChannel channel = eventHandler.getChannel();
SelectionKey selectionKey = null;
try {
selectionKey =
channel.register(selector,
eventHandler.getInterestOps(),
(Object)eventHandler);
} catch (ClosedChannelException e) {
if (orb.transportDebugFlag) {
dprint(".handleDeferredRegistrations: ", e);
}
}
eventHandler.setSelectionKey(selectionKey);
}
deferredRegistrations.clear();
}
}
private void enableInterestOps()
{
synchronized (interestOpsList) {
int listSize = interestOpsList.size();
if (listSize > 0) {
if (orb.transportDebugFlag) {
dprint(".enableInterestOps:->");
}
SelectionKey selectionKey = null;
SelectionKeyAndOp keyAndOp = null;
int keyOp, selectionKeyOps = 0;
for (int i = 0; i < listSize; i++) {
keyAndOp = (SelectionKeyAndOp)interestOpsList.get(i);
selectionKey = keyAndOp.selectionKey;
// Need to check if the SelectionKey is valid because a
// connection's SelectionKey could be put on the list to
// have its OP enabled and before it's enabled be reclaimed.
// Otherwise, the enabling of the OP will throw an exception
// here and exit this method an potentially not enable all
// registered ops.
//
// So, we ignore SelectionKeys that are invalid. They will get
// cleaned up on the next Selector.select() call.
if (selectionKey.isValid()) {
if (orb.transportDebugFlag) {
dprint(".enableInterestOps: " + keyAndOp);
}
keyOp = keyAndOp.keyOp;
selectionKeyOps = selectionKey.interestOps();
selectionKey.interestOps(selectionKeyOps | keyOp);
}
}
interestOpsList.clear();
if (orb.transportDebugFlag) {
dprint(".enableInterestOps:<-");
}
}
}
}
private void createListenerThread(EventHandler eventHandler)
{
if (orb.transportDebugFlag) {
dprint(".createListenerThread: " + eventHandler);
}
Acceptor acceptor = eventHandler.getAcceptor();
ListenerThread listenerThread =
new ListenerThreadImpl(orb, acceptor, this);
listenerThreads.put(eventHandler, listenerThread);
Throwable throwable = null;
try {
orb.getThreadPoolManager().getThreadPool(0)
.getWorkQueue(0).addWork((Work)listenerThread);
} catch (NoSuchThreadPoolException e) {
throwable = e;
} catch (NoSuchWorkQueueException e) {
throwable = e;
}
if (throwable != null) {
RuntimeException rte = new RuntimeException(throwable.toString());
rte.initCause(throwable);
throw rte;
}
}
private void destroyListenerThread(EventHandler eventHandler)
{
if (orb.transportDebugFlag) {
dprint(".destroyListenerThread: " + eventHandler);
}
ListenerThread listenerThread = (ListenerThread)
listenerThreads.get(eventHandler);
if (listenerThread == null) {
if (orb.transportDebugFlag) {
dprint(".destroyListenerThread: cannot find ListenerThread - ignoring.");
}
return;
}
listenerThreads.remove(eventHandler);
listenerThread.close();
}
private void createReaderThread(EventHandler eventHandler)
{
if (orb.transportDebugFlag) {
dprint(".createReaderThread: " + eventHandler);
}
Connection connection = eventHandler.getConnection();
ReaderThread readerThread =
new ReaderThreadImpl(orb, connection, this);
readerThreads.put(eventHandler, readerThread);
Throwable throwable = null;
try {
orb.getThreadPoolManager().getThreadPool(0)
.getWorkQueue(0).addWork((Work)readerThread);
} catch (NoSuchThreadPoolException e) {
throwable = e;
} catch (NoSuchWorkQueueException e) {
throwable = e;
}
if (throwable != null) {
RuntimeException rte = new RuntimeException(throwable.toString());
rte.initCause(throwable);
throw rte;
}
}
private void destroyReaderThread(EventHandler eventHandler)
{
if (orb.transportDebugFlag) {
dprint(".destroyReaderThread: " + eventHandler);
}
ReaderThread readerThread = (ReaderThread)
readerThreads.get(eventHandler);
if (readerThread == null) {
if (orb.transportDebugFlag) {
dprint(".destroyReaderThread: cannot find ReaderThread - ignoring.");
}
return;
}
readerThreads.remove(eventHandler);
readerThread.close();
}
private void dprint(String msg)
{
ORBUtility.dprint("SelectorImpl", msg);
}
protected void dprint(String msg, Throwable t)
{
dprint(msg);
t.printStackTrace(System.out);
}
// Private class to contain a SelectionKey and a SelectionKey op.
// Used only by SelectorImpl to register and enable SelectionKey
// Op.
// REVISIT - Could do away with this class and use the EventHanlder
// directly.
private class SelectionKeyAndOp
{
// A SelectionKey.[OP_READ|OP_WRITE|OP_ACCEPT|OP_CONNECT]
public int keyOp;
public SelectionKey selectionKey;
// constructor
public SelectionKeyAndOp(SelectionKey selectionKey, int keyOp) {
this.selectionKey = selectionKey;
this.keyOp = keyOp;
}
}
// End of file.
}