blob: f7c1e314eec97ae1dd54e9a593e9f4ea64905d68 [file] [log] [blame]
/*
* Copyright (c) 2001, 2013, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package com.sun.corba.se.impl.transport;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Collections;
import java.util.Hashtable;
import java.util.HashMap;
import java.util.Map;
import org.omg.CORBA.COMM_FAILURE;
import org.omg.CORBA.CompletionStatus;
import org.omg.CORBA.DATA_CONVERSION;
import org.omg.CORBA.INTERNAL;
import org.omg.CORBA.MARSHAL;
import org.omg.CORBA.OBJECT_NOT_EXIST;
import org.omg.CORBA.SystemException;
import com.sun.org.omg.SendingContext.CodeBase;
import com.sun.corba.se.pept.broker.Broker;
import com.sun.corba.se.pept.encoding.InputObject;
import com.sun.corba.se.pept.encoding.OutputObject;
import com.sun.corba.se.pept.protocol.MessageMediator;
import com.sun.corba.se.pept.transport.Acceptor;
import com.sun.corba.se.pept.transport.Connection;
import com.sun.corba.se.pept.transport.ConnectionCache;
import com.sun.corba.se.pept.transport.ContactInfo;
import com.sun.corba.se.pept.transport.EventHandler;
import com.sun.corba.se.pept.transport.InboundConnectionCache;
import com.sun.corba.se.pept.transport.OutboundConnectionCache;
import com.sun.corba.se.pept.transport.ResponseWaitingRoom;
import com.sun.corba.se.pept.transport.Selector;
import com.sun.corba.se.spi.ior.IOR;
import com.sun.corba.se.spi.ior.iiop.GIOPVersion;
import com.sun.corba.se.spi.logging.CORBALogDomains;
import com.sun.corba.se.spi.orb.ORB ;
import com.sun.corba.se.spi.orbutil.threadpool.NoSuchThreadPoolException;
import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException;
import com.sun.corba.se.spi.orbutil.threadpool.Work;
import com.sun.corba.se.spi.protocol.CorbaMessageMediator;
import com.sun.corba.se.spi.transport.CorbaContactInfo;
import com.sun.corba.se.spi.transport.CorbaConnection;
import com.sun.corba.se.spi.transport.CorbaResponseWaitingRoom;
import com.sun.corba.se.spi.transport.ReadTimeouts;
import com.sun.corba.se.impl.encoding.CachedCodeBase;
import com.sun.corba.se.impl.encoding.CDRInputStream_1_0;
import com.sun.corba.se.impl.encoding.CDROutputObject;
import com.sun.corba.se.impl.encoding.CDROutputStream_1_0;
import com.sun.corba.se.impl.encoding.CodeSetComponentInfo;
import com.sun.corba.se.impl.encoding.OSFCodeSetRegistry;
import com.sun.corba.se.impl.logging.ORBUtilSystemException;
import com.sun.corba.se.impl.orbutil.ORBConstants;
import com.sun.corba.se.impl.orbutil.ORBUtility;
import com.sun.corba.se.impl.protocol.giopmsgheaders.Message;
import com.sun.corba.se.impl.protocol.giopmsgheaders.MessageBase;
import com.sun.corba.se.impl.transport.CorbaResponseWaitingRoomImpl;
/**
* @author Harold Carr
*/
public class SocketOrChannelConnectionImpl
extends
EventHandlerBase
implements
CorbaConnection,
Work
{
public static boolean dprintWriteLocks = false;
//
// New transport.
//
protected long enqueueTime;
protected SocketChannel socketChannel;
public SocketChannel getSocketChannel()
{
return socketChannel;
}
// REVISIT:
// protected for test: genericRPCMSGFramework.IIOPConnection constructor.
protected CorbaContactInfo contactInfo;
protected Acceptor acceptor;
protected ConnectionCache connectionCache;
//
// From iiop.Connection.java
//
protected Socket socket; // The socket used for this connection.
protected long timeStamp = 0;
protected boolean isServer = false;
// Start at some value other than zero since this is a magic
// value in some protocols.
protected int requestId = 5;
protected CorbaResponseWaitingRoom responseWaitingRoom;
protected int state;
protected java.lang.Object stateEvent = new java.lang.Object();
protected java.lang.Object writeEvent = new java.lang.Object();
protected boolean writeLocked;
protected int serverRequestCount = 0;
// Server request map: used on the server side of Connection
// Maps request ID to IIOPInputStream.
Map serverRequestMap = null;
// This is a flag associated per connection telling us if the
// initial set of sending contexts were sent to the receiver
// already...
protected boolean postInitialContexts = false;
// Remote reference to CodeBase server (supplies
// FullValueDescription, among other things)
protected IOR codeBaseServerIOR;
// CodeBase cache for this connection. This will cache remote operations,
// handle connecting, and ensure we don't do any remote operations until
// necessary.
protected CachedCodeBase cachedCodeBase = new CachedCodeBase(this);
protected ORBUtilSystemException wrapper ;
// transport read timeout values
protected ReadTimeouts readTimeouts;
protected boolean shouldReadGiopHeaderOnly;
// A message mediator used when shouldReadGiopHeaderOnly is
// true to maintain request message state across execution in a
// SelectorThread and WorkerThread.
protected CorbaMessageMediator partialMessageMediator = null;
// Used in genericRPCMSGFramework test.
protected SocketOrChannelConnectionImpl(ORB orb)
{
this.orb = orb;
wrapper = ORBUtilSystemException.get( orb,
CORBALogDomains.RPC_TRANSPORT ) ;
setWork(this);
responseWaitingRoom = new CorbaResponseWaitingRoomImpl(orb, this);
setReadTimeouts(orb.getORBData().getTransportTCPReadTimeouts());
}
// Both client and servers.
protected SocketOrChannelConnectionImpl(ORB orb,
boolean useSelectThreadToWait,
boolean useWorkerThread)
{
this(orb) ;
setUseSelectThreadToWait(useSelectThreadToWait);
setUseWorkerThreadForEvent(useWorkerThread);
}
// Client constructor.
public SocketOrChannelConnectionImpl(ORB orb,
CorbaContactInfo contactInfo,
boolean useSelectThreadToWait,
boolean useWorkerThread,
String socketType,
String hostname,
int port)
{
this(orb, useSelectThreadToWait, useWorkerThread);
this.contactInfo = contactInfo;
try {
socket = orb.getORBData().getSocketFactory()
.createSocket(socketType,
new InetSocketAddress(hostname, port));
socketChannel = socket.getChannel();
if (socketChannel != null) {
boolean isBlocking = !useSelectThreadToWait;
socketChannel.configureBlocking(isBlocking);
} else {
// IMPORTANT: non-channel-backed sockets must use
// dedicated reader threads.
setUseSelectThreadToWait(false);
}
if (orb.transportDebugFlag) {
dprint(".initialize: connection created: " + socket);
}
} catch (Throwable t) {
throw wrapper.connectFailure(t, socketType, hostname,
Integer.toString(port));
}
state = OPENING;
}
// Client-side convenience.
public SocketOrChannelConnectionImpl(ORB orb,
CorbaContactInfo contactInfo,
String socketType,
String hostname,
int port)
{
this(orb, contactInfo,
orb.getORBData().connectionSocketUseSelectThreadToWait(),
orb.getORBData().connectionSocketUseWorkerThreadForEvent(),
socketType, hostname, port);
}
// Server-side constructor.
public SocketOrChannelConnectionImpl(ORB orb,
Acceptor acceptor,
Socket socket,
boolean useSelectThreadToWait,
boolean useWorkerThread)
{
this(orb, useSelectThreadToWait, useWorkerThread);
this.socket = socket;
socketChannel = socket.getChannel();
if (socketChannel != null) {
// REVISIT
try {
boolean isBlocking = !useSelectThreadToWait;
socketChannel.configureBlocking(isBlocking);
} catch (IOException e) {
RuntimeException rte = new RuntimeException();
rte.initCause(e);
throw rte;
}
}
this.acceptor = acceptor;
serverRequestMap = Collections.synchronizedMap(new HashMap());
isServer = true;
state = ESTABLISHED;
}
// Server-side convenience
public SocketOrChannelConnectionImpl(ORB orb,
Acceptor acceptor,
Socket socket)
{
this(orb, acceptor, socket,
(socket.getChannel() == null
? false
: orb.getORBData().connectionSocketUseSelectThreadToWait()),
(socket.getChannel() == null
? false
: orb.getORBData().connectionSocketUseWorkerThreadForEvent()));
}
////////////////////////////////////////////////////
//
// framework.transport.Connection
//
public boolean shouldRegisterReadEvent()
{
return true;
}
public boolean shouldRegisterServerReadEvent()
{
return true;
}
public boolean read()
{
try {
if (orb.transportDebugFlag) {
dprint(".read->: " + this);
}
CorbaMessageMediator messageMediator = readBits();
if (messageMediator != null) {
// Null can happen when client closes stream
// causing purgecalls.
return dispatch(messageMediator);
}
return true;
} finally {
if (orb.transportDebugFlag) {
dprint(".read<-: " + this);
}
}
}
protected CorbaMessageMediator readBits()
{
try {
if (orb.transportDebugFlag) {
dprint(".readBits->: " + this);
}
MessageMediator messageMediator;
// REVISIT - use common factory base class.
if (contactInfo != null) {
messageMediator =
contactInfo.createMessageMediator(orb, this);
} else if (acceptor != null) {
messageMediator = acceptor.createMessageMediator(orb, this);
} else {
throw
new RuntimeException("SocketOrChannelConnectionImpl.readBits");
}
return (CorbaMessageMediator) messageMediator;
} catch (ThreadDeath td) {
if (orb.transportDebugFlag) {
dprint(".readBits: " + this + ": ThreadDeath: " + td, td);
}
try {
purgeCalls(wrapper.connectionAbort(td), false, false);
} catch (Throwable t) {
if (orb.transportDebugFlag) {
dprint(".readBits: " + this + ": purgeCalls: Throwable: " + t, t);
}
}
throw td;
} catch (Throwable ex) {
if (orb.transportDebugFlag) {
dprint(".readBits: " + this + ": Throwable: " + ex, ex);
}
try {
if (ex instanceof INTERNAL) {
sendMessageError(GIOPVersion.DEFAULT_VERSION);
}
} catch (IOException e) {
if (orb.transportDebugFlag) {
dprint(".readBits: " + this +
": sendMessageError: IOException: " + e, e);
}
}
// REVISIT - make sure reader thread is killed.
Selector selector = orb.getTransportManager().getSelector(0);
if (selector != null) {
selector.unregisterForEvent(this);
}
// Notify anyone waiting.
purgeCalls(wrapper.connectionAbort(ex), true, false);
// REVISIT
//keepRunning = false;
// REVISIT - if this is called after purgeCalls then
// the state of the socket is ABORT so the writeLock
// in close throws an exception. It is ignored but
// causes IBM (screen scraping) tests to fail.
//close();
} finally {
if (orb.transportDebugFlag) {
dprint(".readBits<-: " + this);
}
}
return null;
}
protected CorbaMessageMediator finishReadingBits(MessageMediator messageMediator)
{
try {
if (orb.transportDebugFlag) {
dprint(".finishReadingBits->: " + this);
}
// REVISIT - use common factory base class.
if (contactInfo != null) {
messageMediator =
contactInfo.finishCreatingMessageMediator(orb, this, messageMediator);
} else if (acceptor != null) {
messageMediator =
acceptor.finishCreatingMessageMediator(orb, this, messageMediator);
} else {
throw
new RuntimeException("SocketOrChannelConnectionImpl.finishReadingBits");
}
return (CorbaMessageMediator) messageMediator;
} catch (ThreadDeath td) {
if (orb.transportDebugFlag) {
dprint(".finishReadingBits: " + this + ": ThreadDeath: " + td, td);
}
try {
purgeCalls(wrapper.connectionAbort(td), false, false);
} catch (Throwable t) {
if (orb.transportDebugFlag) {
dprint(".finishReadingBits: " + this + ": purgeCalls: Throwable: " + t, t);
}
}
throw td;
} catch (Throwable ex) {
if (orb.transportDebugFlag) {
dprint(".finishReadingBits: " + this + ": Throwable: " + ex, ex);
}
try {
if (ex instanceof INTERNAL) {
sendMessageError(GIOPVersion.DEFAULT_VERSION);
}
} catch (IOException e) {
if (orb.transportDebugFlag) {
dprint(".finishReadingBits: " + this +
": sendMessageError: IOException: " + e, e);
}
}
// REVISIT - make sure reader thread is killed.
orb.getTransportManager().getSelector(0).unregisterForEvent(this);
// Notify anyone waiting.
purgeCalls(wrapper.connectionAbort(ex), true, false);
// REVISIT
//keepRunning = false;
// REVISIT - if this is called after purgeCalls then
// the state of the socket is ABORT so the writeLock
// in close throws an exception. It is ignored but
// causes IBM (screen scraping) tests to fail.
//close();
} finally {
if (orb.transportDebugFlag) {
dprint(".finishReadingBits<-: " + this);
}
}
return null;
}
protected boolean dispatch(CorbaMessageMediator messageMediator)
{
try {
if (orb.transportDebugFlag) {
dprint(".dispatch->: " + this);
}
//
// NOTE:
//
// This call is the transition from the tranport block
// to the protocol block.
//
boolean result =
messageMediator.getProtocolHandler()
.handleRequest(messageMediator);
return result;
} catch (ThreadDeath td) {
if (orb.transportDebugFlag) {
dprint(".dispatch: ThreadDeath", td );
}
try {
purgeCalls(wrapper.connectionAbort(td), false, false);
} catch (Throwable t) {
if (orb.transportDebugFlag) {
dprint(".dispatch: purgeCalls: Throwable", t);
}
}
throw td;
} catch (Throwable ex) {
if (orb.transportDebugFlag) {
dprint(".dispatch: Throwable", ex ) ;
}
try {
if (ex instanceof INTERNAL) {
sendMessageError(GIOPVersion.DEFAULT_VERSION);
}
} catch (IOException e) {
if (orb.transportDebugFlag) {
dprint(".dispatch: sendMessageError: IOException", e);
}
}
purgeCalls(wrapper.connectionAbort(ex), false, false);
// REVISIT
//keepRunning = false;
} finally {
if (orb.transportDebugFlag) {
dprint(".dispatch<-: " + this);
}
}
return true;
}
public boolean shouldUseDirectByteBuffers()
{
return getSocketChannel() != null;
}
public ByteBuffer read(int size, int offset, int length, long max_wait_time)
throws IOException
{
if (shouldUseDirectByteBuffers()) {
ByteBuffer byteBuffer =
orb.getByteBufferPool().getByteBuffer(size);
if (orb.transportDebugFlag) {
// print address of ByteBuffer gotten from pool
int bbAddress = System.identityHashCode(byteBuffer);
StringBuffer sb = new StringBuffer(80);
sb.append(".read: got ByteBuffer id (");
sb.append(bbAddress).append(") from ByteBufferPool.");
String msgStr = sb.toString();
dprint(msgStr);
}
byteBuffer.position(offset);
byteBuffer.limit(size);
readFully(byteBuffer, length, max_wait_time);
return byteBuffer;
}
byte[] buf = new byte[size];
readFully(getSocket().getInputStream(), buf,
offset, length, max_wait_time);
ByteBuffer byteBuffer = ByteBuffer.wrap(buf);
byteBuffer.limit(size);
return byteBuffer;
}
public ByteBuffer read(ByteBuffer byteBuffer, int offset,
int length, long max_wait_time)
throws IOException
{
int size = offset + length;
if (shouldUseDirectByteBuffers()) {
if (! byteBuffer.isDirect()) {
throw wrapper.unexpectedNonDirectByteBufferWithChannelSocket();
}
if (size > byteBuffer.capacity()) {
if (orb.transportDebugFlag) {
// print address of ByteBuffer being released
int bbAddress = System.identityHashCode(byteBuffer);
StringBuffer bbsb = new StringBuffer(80);
bbsb.append(".read: releasing ByteBuffer id (")
.append(bbAddress).append(") to ByteBufferPool.");
String bbmsg = bbsb.toString();
dprint(bbmsg);
}
orb.getByteBufferPool().releaseByteBuffer(byteBuffer);
byteBuffer = orb.getByteBufferPool().getByteBuffer(size);
}
byteBuffer.position(offset);
byteBuffer.limit(size);
readFully(byteBuffer, length, max_wait_time);
byteBuffer.position(0);
byteBuffer.limit(size);
return byteBuffer;
}
if (byteBuffer.isDirect()) {
throw wrapper.unexpectedDirectByteBufferWithNonChannelSocket();
}
byte[] buf = new byte[size];
readFully(getSocket().getInputStream(), buf,
offset, length, max_wait_time);
return ByteBuffer.wrap(buf);
}
public void readFully(ByteBuffer byteBuffer, int size, long max_wait_time)
throws IOException
{
int n = 0;
int bytecount = 0;
long time_to_wait = readTimeouts.get_initial_time_to_wait();
long total_time_in_wait = 0;
// The reading of data incorporates a strategy to detect a
// rogue client. The strategy is implemented as follows. As
// long as data is being read, at least 1 byte or more, we
// assume we have a well behaved client. If no data is read,
// then we sleep for a time to wait, re-calculate a new time to
// wait which is lengthier than the previous time spent waiting.
// Then, if the total time spent waiting does not exceed a
// maximum time we are willing to wait, we attempt another
// read. If the maximum amount of time we are willing to
// spend waiting for more data is exceeded, we throw an
// IOException.
// NOTE: Reading of GIOP headers are treated with a smaller
// maximum time to wait threshold. Based on extensive
// performance testing, all GIOP headers are being
// read in 1 read access.
do {
bytecount = getSocketChannel().read(byteBuffer);
if (bytecount < 0) {
throw new IOException("End-of-stream");
}
else if (bytecount == 0) {
try {
Thread.sleep(time_to_wait);
total_time_in_wait += time_to_wait;
time_to_wait =
(long)(time_to_wait*readTimeouts.get_backoff_factor());
}
catch (InterruptedException ie) {
// ignore exception
if (orb.transportDebugFlag) {
dprint("readFully(): unexpected exception "
+ ie.toString());
}
}
}
else {
n += bytecount;
}
}
while (n < size && total_time_in_wait < max_wait_time);
if (n < size && total_time_in_wait >= max_wait_time)
{
// failed to read entire message
throw wrapper.transportReadTimeoutExceeded(new Integer(size),
new Integer(n), new Long(max_wait_time),
new Long(total_time_in_wait));
}
getConnectionCache().stampTime(this);
}
// To support non-channel connections.
public void readFully(java.io.InputStream is, byte[] buf,
int offset, int size, long max_wait_time)
throws IOException
{
int n = 0;
int bytecount = 0;
long time_to_wait = readTimeouts.get_initial_time_to_wait();
long total_time_in_wait = 0;
// The reading of data incorporates a strategy to detect a
// rogue client. The strategy is implemented as follows. As
// long as data is being read, at least 1 byte or more, we
// assume we have a well behaved client. If no data is read,
// then we sleep for a time to wait, re-calculate a new time to
// wait which is lengthier than the previous time spent waiting.
// Then, if the total time spent waiting does not exceed a
// maximum time we are willing to wait, we attempt another
// read. If the maximum amount of time we are willing to
// spend waiting for more data is exceeded, we throw an
// IOException.
// NOTE: Reading of GIOP headers are treated with a smaller
// maximum time to wait threshold. Based on extensive
// performance testing, all GIOP headers are being
// read in 1 read access.
do {
bytecount = is.read(buf, offset + n, size - n);
if (bytecount < 0) {
throw new IOException("End-of-stream");
}
else if (bytecount == 0) {
try {
Thread.sleep(time_to_wait);
total_time_in_wait += time_to_wait;
time_to_wait =
(long)(time_to_wait*readTimeouts.get_backoff_factor());
}
catch (InterruptedException ie) {
// ignore exception
if (orb.transportDebugFlag) {
dprint("readFully(): unexpected exception "
+ ie.toString());
}
}
}
else {
n += bytecount;
}
}
while (n < size && total_time_in_wait < max_wait_time);
if (n < size && total_time_in_wait >= max_wait_time)
{
// failed to read entire message
throw wrapper.transportReadTimeoutExceeded(new Integer(size),
new Integer(n), new Long(max_wait_time),
new Long(total_time_in_wait));
}
getConnectionCache().stampTime(this);
}
public void write(ByteBuffer byteBuffer)
throws IOException
{
if (shouldUseDirectByteBuffers()) {
/* NOTE: cannot perform this test. If one ask for a
ByteBuffer from the pool which is bigger than the size
of ByteBuffers managed by the pool, then the pool will
return a HeapByteBuffer.
if (byteBuffer.hasArray()) {
throw wrapper.unexpectedNonDirectByteBufferWithChannelSocket();
}
*/
// IMPORTANT: For non-blocking SocketChannels, there's no guarantee
// all bytes are written on first write attempt.
do {
getSocketChannel().write(byteBuffer);
}
while (byteBuffer.hasRemaining());
} else {
if (! byteBuffer.hasArray()) {
throw wrapper.unexpectedDirectByteBufferWithNonChannelSocket();
}
byte[] tmpBuf = byteBuffer.array();
getSocket().getOutputStream().write(tmpBuf, 0, byteBuffer.limit());
getSocket().getOutputStream().flush();
}
// TimeStamp connection to indicate it has been used
// Note granularity of connection usage is assumed for
// now to be that of a IIOP packet.
getConnectionCache().stampTime(this);
}
/**
* Note:it is possible for this to be called more than once
*/
public synchronized void close()
{
try {
if (orb.transportDebugFlag) {
dprint(".close->: " + this);
}
writeLock();
// REVISIT It will be good to have a read lock on the reader thread
// before we proceed further, to avoid the reader thread (server side)
// from processing requests. This avoids the risk that a new request
// will be accepted by ReaderThread while the ListenerThread is
// attempting to close this connection.
if (isBusy()) { // we are busy!
writeUnlock();
if (orb.transportDebugFlag) {
dprint(".close: isBusy so no close: " + this);
}
return;
}
try {
try {
sendCloseConnection(GIOPVersion.V1_0);
} catch (Throwable t) {
wrapper.exceptionWhenSendingCloseConnection(t);
}
synchronized ( stateEvent ){
state = CLOSE_SENT;
stateEvent.notifyAll();
}
// stop the reader without causing it to do purgeCalls
//Exception ex = new Exception();
//reader.stop(ex); // REVISIT
// NOTE: !!!!!!
// This does writeUnlock().
purgeCalls(wrapper.connectionRebind(), false, true);
} catch (Exception ex) {
if (orb.transportDebugFlag) {
dprint(".close: exception: " + this, ex);
}
}
try {
Selector selector = orb.getTransportManager().getSelector(0);
if (selector != null) {
selector.unregisterForEvent(this);
}
if (socketChannel != null) {
socketChannel.close();
}
socket.close();
} catch (IOException e) {
if (orb.transportDebugFlag) {
dprint(".close: " + this, e);
}
}
closeConnectionResources();
} finally {
if (orb.transportDebugFlag) {
dprint(".close<-: " + this);
}
}
}
public void closeConnectionResources() {
if (orb.transportDebugFlag) {
dprint(".closeConnectionResources->: " + this);
}
Selector selector = orb.getTransportManager().getSelector(0);
if (selector != null) {
selector.unregisterForEvent(this);
}
try {
if (socketChannel != null)
socketChannel.close() ;
if (socket != null && !socket.isClosed())
socket.close() ;
} catch (IOException e) {
if (orb.transportDebugFlag) {
dprint( ".closeConnectionResources: " + this, e ) ;
}
}
if (orb.transportDebugFlag) {
dprint(".closeConnectionResources<-: " + this);
}
}
public Acceptor getAcceptor()
{
return acceptor;
}
public ContactInfo getContactInfo()
{
return contactInfo;
}
public EventHandler getEventHandler()
{
return this;
}
public OutputObject createOutputObject(MessageMediator messageMediator)
{
// REVISIT - remove this method from Connection and all it subclasses.
throw new RuntimeException("*****SocketOrChannelConnectionImpl.createOutputObject - should not be called.");
}
// This is used by the GIOPOutputObject in order to
// throw the correct error when handling code sets.
// Can we determine if we are on the server side by
// other means? XREVISIT
public boolean isServer()
{
return isServer;
}
public boolean isBusy()
{
if (serverRequestCount > 0 ||
getResponseWaitingRoom().numberRegistered() > 0)
{
return true;
} else {
return false;
}
}
public long getTimeStamp()
{
return timeStamp;
}
public void setTimeStamp(long time)
{
timeStamp = time;
}
public void setState(String stateString)
{
synchronized (stateEvent) {
if (stateString.equals("ESTABLISHED")) {
state = ESTABLISHED;
stateEvent.notifyAll();
} else {
// REVISIT: ASSERT
}
}
}
/**
* Sets the writeLock for this connection.
* If the writeLock is already set by someone else, block till the
* writeLock is released and can set by us.
* IMPORTANT: this connection's lock must be acquired before
* setting the writeLock and must be unlocked after setting the writeLock.
*/
public void writeLock()
{
try {
if (dprintWriteLocks && orb.transportDebugFlag) {
dprint(".writeLock->: " + this);
}
// Keep looping till we can set the writeLock.
while ( true ) {
int localState = state;
switch ( localState ) {
case OPENING:
synchronized (stateEvent) {
if (state != OPENING) {
// somebody has changed 'state' so be careful
break;
}
try {
stateEvent.wait();
} catch (InterruptedException ie) {
if (orb.transportDebugFlag) {
dprint(".writeLock: OPENING InterruptedException: " + this);
}
}
}
// Loop back
break;
case ESTABLISHED:
synchronized (writeEvent) {
if (!writeLocked) {
writeLocked = true;
return;
}
try {
// do not stay here too long if state != ESTABLISHED
// Bug 4752117
while (state == ESTABLISHED && writeLocked) {
writeEvent.wait(100);
}
} catch (InterruptedException ie) {
if (orb.transportDebugFlag) {
dprint(".writeLock: ESTABLISHED InterruptedException: " + this);
}
}
}
// Loop back
break;
//
// XXX
// Need to distinguish between client and server roles
// here probably.
//
case ABORT:
synchronized ( stateEvent ){
if (state != ABORT) {
break;
}
throw wrapper.writeErrorSend() ;
}
case CLOSE_RECVD:
// the connection has been closed or closing
// ==> throw rebind exception
synchronized ( stateEvent ){
if (state != CLOSE_RECVD) {
break;
}
throw wrapper.connectionCloseRebind() ;
}
default:
if (orb.transportDebugFlag) {
dprint(".writeLock: default: " + this);
}
// REVISIT
throw new RuntimeException(".writeLock: bad state");
}
}
} finally {
if (dprintWriteLocks && orb.transportDebugFlag) {
dprint(".writeLock<-: " + this);
}
}
}
public void writeUnlock()
{
try {
if (dprintWriteLocks && orb.transportDebugFlag) {
dprint(".writeUnlock->: " + this);
}
synchronized (writeEvent) {
writeLocked = false;
writeEvent.notify(); // wake up one guy waiting to write
}
} finally {
if (dprintWriteLocks && orb.transportDebugFlag) {
dprint(".writeUnlock<-: " + this);
}
}
}
// Assumes the caller handles writeLock and writeUnlock
public void sendWithoutLock(OutputObject outputObject)
{
// Don't we need to check for CloseConnection
// here? REVISIT
// XREVISIT - Shouldn't the MessageMediator
// be the one to handle writing the data here?
try {
// Write the fragment/message
CDROutputObject cdrOutputObject = (CDROutputObject) outputObject;
cdrOutputObject.writeTo(this);
// REVISIT - no flush?
//socket.getOutputStream().flush();
} catch (IOException e1) {
/*
* ADDED(Ram J) 10/13/2000 In the event of an IOException, try
* sending a CancelRequest for regular requests / locate requests
*/
// Since IIOPOutputStream's msgheader is set only once, and not
// altered during sending multiple fragments, the original
// msgheader will always have the requestId.
// REVISIT This could be optimized to send a CancelRequest only
// if any fragments had been sent already.
/* REVISIT: MOVE TO SUBCONTRACT
Message msg = os.getMessage();
if (msg.getType() == Message.GIOPRequest ||
msg.getType() == Message.GIOPLocateRequest) {
GIOPVersion requestVersion = msg.getGIOPVersion();
int requestId = MessageBase.getRequestId(msg);
try {
sendCancelRequest(requestVersion, requestId);
} catch (IOException e2) {
// most likely an abortive connection closure.
// ignore, since nothing more can be done.
if (orb.transportDebugFlag) {
}
}
*/
// REVISIT When a send failure happens, purgeCalls() need to be
// called to ensure that the connection is properly removed from
// further usage (ie., cancelling pending requests with COMM_FAILURE
// with an appropriate minor_code CompletionStatus.MAY_BE).
// Relying on the IIOPOutputStream (as noted below) is not
// sufficient as it handles COMM_FAILURE only for the final
// fragment (during invoke processing). Note that COMM_FAILURE could
// happen while sending the initial fragments.
// Also the IIOPOutputStream does not properly close the connection.
// It simply removes the connection from the table. An orderly
// closure is needed (ie., cancel pending requests on the connection
// COMM_FAILURE as well.
// IIOPOutputStream will cleanup the connection info when it
// sees this exception.
SystemException exc = wrapper.writeErrorSend(e1);
purgeCalls(exc, false, true);
throw exc;
}
}
public void registerWaiter(MessageMediator messageMediator)
{
responseWaitingRoom.registerWaiter(messageMediator);
}
public void unregisterWaiter(MessageMediator messageMediator)
{
responseWaitingRoom.unregisterWaiter(messageMediator);
}
public InputObject waitForResponse(MessageMediator messageMediator)
{
return responseWaitingRoom.waitForResponse(messageMediator);
}
public void setConnectionCache(ConnectionCache connectionCache)
{
this.connectionCache = connectionCache;
}
public ConnectionCache getConnectionCache()
{
return connectionCache;
}
////////////////////////////////////////////////////
//
// EventHandler methods
//
public void setUseSelectThreadToWait(boolean x)
{
useSelectThreadToWait = x;
// REVISIT - Reading of a GIOP header only is information
// that should be passed into the constructor
// from the SocketOrChannelConnection factory.
setReadGiopHeaderOnly(shouldUseSelectThreadToWait());
}
public void handleEvent()
{
if (orb.transportDebugFlag) {
dprint(".handleEvent->: " + this);
}
getSelectionKey().interestOps(getSelectionKey().interestOps() &
(~ getInterestOps()));
if (shouldUseWorkerThreadForEvent()) {
Throwable throwable = null;
try {
int poolToUse = 0;
if (shouldReadGiopHeaderOnly()) {
partialMessageMediator = readBits();
poolToUse =
partialMessageMediator.getThreadPoolToUse();
}
if (orb.transportDebugFlag) {
dprint(".handleEvent: addWork to pool: " + poolToUse);
}
orb.getThreadPoolManager().getThreadPool(poolToUse)
.getWorkQueue(0).addWork(getWork());
} catch (NoSuchThreadPoolException e) {
throwable = e;
} catch (NoSuchWorkQueueException e) {
throwable = e;
}
// REVISIT: need to close connection.
if (throwable != null) {
if (orb.transportDebugFlag) {
dprint(".handleEvent: " + throwable);
}
INTERNAL i = new INTERNAL("NoSuchThreadPoolException");
i.initCause(throwable);
throw i;
}
} else {
if (orb.transportDebugFlag) {
dprint(".handleEvent: doWork");
}
getWork().doWork();
}
if (orb.transportDebugFlag) {
dprint(".handleEvent<-: " + this);
}
}
public SelectableChannel getChannel()
{
return socketChannel;
}
public int getInterestOps()
{
return SelectionKey.OP_READ;
}
// public Acceptor getAcceptor() - already defined above.
public Connection getConnection()
{
return this;
}
////////////////////////////////////////////////////
//
// Work methods.
//
public String getName()
{
return this.toString();
}
public void doWork()
{
try {
if (orb.transportDebugFlag) {
dprint(".doWork->: " + this);
}
// IMPORTANT: Sanity checks on SelectionKeys such as
// SelectorKey.isValid() should not be done
// here.
//
if (!shouldReadGiopHeaderOnly()) {
read();
}
else {
// get the partialMessageMediator
// created by SelectorThread
CorbaMessageMediator messageMediator =
this.getPartialMessageMediator();
// read remaining info needed in a MessageMediator
messageMediator = finishReadingBits(messageMediator);
if (messageMediator != null) {
// Null can happen when client closes stream
// causing purgecalls.
dispatch(messageMediator);
}
}
} catch (Throwable t) {
if (orb.transportDebugFlag) {
dprint(".doWork: ignoring Throwable: "
+ t
+ " " + this);
}
} finally {
if (orb.transportDebugFlag) {
dprint(".doWork<-: " + this);
}
}
}
public void setEnqueueTime(long timeInMillis)
{
enqueueTime = timeInMillis;
}
public long getEnqueueTime()
{
return enqueueTime;
}
////////////////////////////////////////////////////
//
// spi.transport.CorbaConnection.
//
// IMPORTANT: Reader Threads must NOT read Giop header only.
public boolean shouldReadGiopHeaderOnly() {
return shouldReadGiopHeaderOnly;
}
protected void setReadGiopHeaderOnly(boolean shouldReadHeaderOnly) {
shouldReadGiopHeaderOnly = shouldReadHeaderOnly;
}
public ResponseWaitingRoom getResponseWaitingRoom()
{
return responseWaitingRoom;
}
// REVISIT - inteface defines isServer but already defined in
// higher interface.
public void serverRequestMapPut(int requestId,
CorbaMessageMediator messageMediator)
{
serverRequestMap.put(new Integer(requestId), messageMediator);
}
public CorbaMessageMediator serverRequestMapGet(int requestId)
{
return (CorbaMessageMediator)
serverRequestMap.get(new Integer(requestId));
}
public void serverRequestMapRemove(int requestId)
{
serverRequestMap.remove(new Integer(requestId));
}
// REVISIT: this is also defined in:
// com.sun.corba.se.spi.legacy.connection.Connection
public java.net.Socket getSocket()
{
return socket;
}
/** It is possible for a Close Connection to have been
** sent here, but we will not check for this. A "lazy"
** Exception will be thrown in the Worker thread after the
** incoming request has been processed even though the connection
** is closed before the request is processed. This is o.k because
** it is a boundary condition. To prevent it we would have to add
** more locks which would reduce performance in the normal case.
**/
public synchronized void serverRequestProcessingBegins()
{
serverRequestCount++;
}
public synchronized void serverRequestProcessingEnds()
{
serverRequestCount--;
}
//
//
//
public synchronized int getNextRequestId()
{
return requestId++;
}
// Negotiated code sets for char and wchar data
protected CodeSetComponentInfo.CodeSetContext codeSetContext = null;
public ORB getBroker()
{
return orb;
}
public CodeSetComponentInfo.CodeSetContext getCodeSetContext() {
// Needs to be synchronized for the following case when the client
// doesn't send the code set context twice, and we have two threads
// in ServerRequestDispatcher processCodeSetContext.
//
// Thread A checks to see if there is a context, there is none, so
// it calls setCodeSetContext, getting the synch lock.
// Thread B checks to see if there is a context. If we didn't synch,
// it might decide to outlaw wchar/wstring.
if (codeSetContext == null) {
synchronized(this) {
return codeSetContext;
}
}
return codeSetContext;
}
public synchronized void setCodeSetContext(CodeSetComponentInfo.CodeSetContext csc) {
// Double check whether or not we need to do this
if (codeSetContext == null) {
if (OSFCodeSetRegistry.lookupEntry(csc.getCharCodeSet()) == null ||
OSFCodeSetRegistry.lookupEntry(csc.getWCharCodeSet()) == null) {
// If the client says it's negotiated a code set that
// isn't a fallback and we never said we support, then
// it has a bug.
throw wrapper.badCodesetsFromClient() ;
}
codeSetContext = csc;
}
}
//
// from iiop.IIOPConnection.java
//
// Map request ID to an InputObject.
// This is so the client thread can start unmarshaling
// the reply and remove it from the out_calls map while the
// ReaderThread can still obtain the input stream to give
// new fragments. Only the ReaderThread touches the clientReplyMap,
// so it doesn't incur synchronization overhead.
public MessageMediator clientRequestMapGet(int requestId)
{
return responseWaitingRoom.getMessageMediator(requestId);
}
protected MessageMediator clientReply_1_1;
public void clientReply_1_1_Put(MessageMediator x)
{
clientReply_1_1 = x;
}
public MessageMediator clientReply_1_1_Get()
{
return clientReply_1_1;
}
public void clientReply_1_1_Remove()
{
clientReply_1_1 = null;
}
protected MessageMediator serverRequest_1_1;
public void serverRequest_1_1_Put(MessageMediator x)
{
serverRequest_1_1 = x;
}
public MessageMediator serverRequest_1_1_Get()
{
return serverRequest_1_1;
}
public void serverRequest_1_1_Remove()
{
serverRequest_1_1 = null;
}
protected String getStateString( int state )
{
synchronized ( stateEvent ){
switch (state) {
case OPENING : return "OPENING" ;
case ESTABLISHED : return "ESTABLISHED" ;
case CLOSE_SENT : return "CLOSE_SENT" ;
case CLOSE_RECVD : return "CLOSE_RECVD" ;
case ABORT : return "ABORT" ;
default : return "???" ;
}
}
}
public synchronized boolean isPostInitialContexts() {
return postInitialContexts;
}
// Can never be unset...
public synchronized void setPostInitialContexts(){
postInitialContexts = true;
}
/**
* Wake up the outstanding requests on the connection, and hand them
* COMM_FAILURE exception with a given minor code.
*
* Also, delete connection from connection table and
* stop the reader thread.
* Note that this should only ever be called by the Reader thread for
* this connection.
*
* @param minor_code The minor code for the COMM_FAILURE major code.
* @param die Kill the reader thread (this thread) before exiting.
*/
public void purgeCalls(SystemException systemException,
boolean die, boolean lockHeld)
{
int minor_code = systemException.minor;
try{
if (orb.transportDebugFlag) {
dprint(".purgeCalls->: "
+ minor_code + "/" + die + "/" + lockHeld
+ " " + this);
}
// If this invocation is a result of ThreadDeath caused
// by a previous execution of this routine, just exit.
synchronized ( stateEvent ){
if ((state == ABORT) || (state == CLOSE_RECVD)) {
if (orb.transportDebugFlag) {
dprint(".purgeCalls: exiting since state is: "
+ getStateString(state)
+ " " + this);
}
return;
}
}
// Grab the writeLock (freeze the calls)
try {
if (!lockHeld) {
writeLock();
}
} catch (SystemException ex) {
if (orb.transportDebugFlag)
dprint(".purgeCalls: SystemException" + ex
+ "; continuing " + this);
}
// Mark the state of the connection
// and determine the request status
org.omg.CORBA.CompletionStatus completion_status;
synchronized ( stateEvent ){
if (minor_code == ORBUtilSystemException.CONNECTION_REBIND) {
state = CLOSE_RECVD;
systemException.completed = CompletionStatus.COMPLETED_NO;
} else {
state = ABORT;
systemException.completed = CompletionStatus.COMPLETED_MAYBE;
}
stateEvent.notifyAll();
}
try {
socket.getInputStream().close();
socket.getOutputStream().close();
socket.close();
} catch (Exception ex) {
if (orb.transportDebugFlag) {
dprint(".purgeCalls: Exception closing socket: " + ex
+ " " + this);
}
}
// Signal all threads with outstanding requests on this
// connection and give them the SystemException;
responseWaitingRoom.signalExceptionToAllWaiters(systemException);
} finally {
if (contactInfo != null) {
((OutboundConnectionCache)getConnectionCache()).remove(contactInfo);
} else if (acceptor != null) {
((InboundConnectionCache)getConnectionCache()).remove(this);
}
//
// REVISIT: Stop the reader thread
//
// Signal all the waiters of the writeLock.
// There are 4 types of writeLock waiters:
// 1. Send waiters:
// 2. SendReply waiters:
// 3. cleanUp waiters:
// 4. purge_call waiters:
//
writeUnlock();
if (orb.transportDebugFlag) {
dprint(".purgeCalls<-: "
+ minor_code + "/" + die + "/" + lockHeld
+ " " + this);
}
}
}
/*************************************************************************
* The following methods are for dealing with Connection cleaning for
* better scalability of servers in high network load conditions.
**************************************************************************/
public void sendCloseConnection(GIOPVersion giopVersion)
throws IOException
{
Message msg = MessageBase.createCloseConnection(giopVersion);
sendHelper(giopVersion, msg);
}
public void sendMessageError(GIOPVersion giopVersion)
throws IOException
{
Message msg = MessageBase.createMessageError(giopVersion);
sendHelper(giopVersion, msg);
}
/**
* Send a CancelRequest message. This does not lock the connection, so the
* caller needs to ensure this method is called appropriately.
* @exception IOException - could be due to abortive connection closure.
*/
public void sendCancelRequest(GIOPVersion giopVersion, int requestId)
throws IOException
{
Message msg = MessageBase.createCancelRequest(giopVersion, requestId);
sendHelper(giopVersion, msg);
}
protected void sendHelper(GIOPVersion giopVersion, Message msg)
throws IOException
{
// REVISIT: See comments in CDROutputObject constructor.
CDROutputObject outputObject =
sun.corba.OutputStreamFactory.newCDROutputObject((ORB)orb, null, giopVersion,
this, msg, ORBConstants.STREAM_FORMAT_VERSION_1);
msg.write(outputObject);
outputObject.writeTo(this);
}
public void sendCancelRequestWithLock(GIOPVersion giopVersion,
int requestId)
throws IOException
{
writeLock();
try {
sendCancelRequest(giopVersion, requestId);
} finally {
writeUnlock();
}
}
// Begin Code Base methods ---------------------------------------
//
// Set this connection's code base IOR. The IOR comes from the
// SendingContext. This is an optional service context, but all
// JavaSoft ORBs send it.
//
// The set and get methods don't need to be synchronized since the
// first possible get would occur during reading a valuetype, and
// that would be after the set.
// Sets this connection's code base IOR. This is done after
// getting the IOR out of the SendingContext service context.
// Our ORBs always send this, but it's optional in CORBA.
public final void setCodeBaseIOR(IOR ior) {
codeBaseServerIOR = ior;
}
public final IOR getCodeBaseIOR() {
return codeBaseServerIOR;
}
// Get a CodeBase stub to use in unmarshaling. The CachedCodeBase
// won't connect to the remote codebase unless it's necessary.
public final CodeBase getCodeBase() {
return cachedCodeBase;
}
// End Code Base methods -----------------------------------------
// set transport read thresholds
protected void setReadTimeouts(ReadTimeouts readTimeouts) {
this.readTimeouts = readTimeouts;
}
protected void setPartialMessageMediator(CorbaMessageMediator messageMediator) {
partialMessageMediator = messageMediator;
}
protected CorbaMessageMediator getPartialMessageMediator() {
return partialMessageMediator;
}
public String toString()
{
synchronized ( stateEvent ){
return
"SocketOrChannelConnectionImpl[" + " "
+ (socketChannel == null ?
socket.toString() : socketChannel.toString()) + " "
+ getStateString( state ) + " "
+ shouldUseSelectThreadToWait() + " "
+ shouldUseWorkerThreadForEvent() + " "
+ shouldReadGiopHeaderOnly()
+ "]" ;
}
}
// Must be public - used in encoding.
public void dprint(String msg)
{
ORBUtility.dprint("SocketOrChannelConnectionImpl", msg);
}
protected void dprint(String msg, Throwable t)
{
dprint(msg);
t.printStackTrace(System.out);
}
}
// End of file.