001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.invocation.pooled.interfaces;
023:
024: import java.io.IOException;
025: import java.io.Externalizable;
026: import java.io.ObjectInput;
027: import java.io.ObjectOutput;
028: import java.io.BufferedOutputStream;
029: import java.io.BufferedInputStream;
030: import java.io.ObjectInputStream;
031: import java.io.ObjectOutputStream;
032: import java.io.EOFException;
033: import java.io.OptionalDataException;
034: import java.io.UnsupportedEncodingException;
035: import java.io.InterruptedIOException;
036: import java.net.Socket;
037: import java.net.SocketException;
038: import java.rmi.MarshalledObject;
039: import java.rmi.NoSuchObjectException;
040: import java.rmi.ServerException;
041: import java.rmi.ConnectException;
042: import java.util.Iterator;
043: import java.util.Map;
044: import java.util.List;
045: import java.util.LinkedList;
046:
047: import javax.transaction.TransactionRolledbackException;
048: import javax.transaction.SystemException;
049: import javax.net.ssl.SSLSocket;
050: import javax.net.ssl.HandshakeCompletedListener;
051: import javax.net.ssl.HandshakeCompletedEvent;
052: import javax.net.ssl.SSLException;
053:
054: import org.jboss.invocation.Invocation;
055: import org.jboss.invocation.Invoker;
056: import org.jboss.tm.TransactionPropagationContextFactory;
057: import org.jboss.logging.Logger;
058: import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
059:
060: /**
061: * Client socket connections are pooled to avoid the overhead of
062: * making a connection. RMI seems to do a new connection with each
063: * request.
064: *
065: * @author <a href="mailto:bill@jboss.org">Bill Burke</a>
066: * @author Scott.Stark@jboss.org
067: * @version $Revision: 60313 $
068: */
069: public class PooledInvokerProxy implements Invoker, Externalizable {
070: // Attributes ----------------------------------------------------
071: private static final Logger log = Logger
072: .getLogger(PooledInvokerProxy.class);
073: /** The serialVersionUID @since 1.1.4.3 */
074: private static final long serialVersionUID = -1456509931095566410L;
075: /** The current wire format we write */
076: private static final int WIRE_VERSION = 1;
077:
078: /**
079: * Factory for transaction propagation contexts.
080: *
081: * @todo marcf remove all transaction spill from here
082: *
083: * When set to a non-null value, it is used to get transaction
084: * propagation contexts for remote method invocations.
085: * If <code>null</code>, transactions are not propagated on
086: * remote method invocations.
087: */
088: protected static TransactionPropagationContextFactory tpcFactory = null;
089:
090: // @todo: MOVE TO TRANSACTION
091: //
092: // TPC factory
093: public static void setTPCFactory(
094: TransactionPropagationContextFactory tpcf) {
095: tpcFactory = tpcf;
096: }
097:
098: // Simple performance measurements, not thread safe
099: public static long getSocketTime = 0;
100: public static long readTime = 0;
101: public static long writeTime = 0;
102: public static long serializeTime = 0;
103: public static long deserializeTime = 0;
104: /** The number of times a connection has been obtained from a pool */
105: public static long usedPooled = 0;
106: /** The number of connections in use */
107: private static int inUseCount = 0;
108: /** The number of socket connections made */
109: private static long socketConnectCount = 0;
110: /** The number of socket close calls made */
111: private static long socketCloseCount = 0;
112:
113: /**
114: * Set number of retries in getSocket method
115: */
116: public static int MAX_RETRIES = 10;
117:
118: /** A class wide pool Map<ServerAddres, LinkedList<ClientSocket>> */
119: protected static final Map connectionPools = new ConcurrentReaderHashMap();
120:
121: /**
122: * connection information
123: */
124: protected ServerAddress address;
125:
126: /**
127: * Pool for this invoker. This is shared between all
128: * instances of proxies attached to a specific invoker
129: * This should not be serializable, but is for backward compatibility.
130: */
131: protected LinkedList pool = null;
132: /** */
133: protected int maxPoolSize;
134: /** The number of times to retry after seeing a ConnectionException */
135: protected int retryCount = 1;
136: /** The logging trace flag */
137: private transient boolean trace;
138:
139: /**
140: * An encapsulation of a client connection
141: */
142: protected static class ClientSocket implements
143: HandshakeCompletedListener {
144: public ObjectOutputStream out;
145: public ObjectInputStream in;
146: public Socket socket;
147: public int timeout;
148: public String sessionID;
149: private boolean handshakeComplete = false;
150: private boolean trace;
151:
152: public ClientSocket(Socket socket, int timeout)
153: throws Exception {
154: this .socket = socket;
155: trace = log.isTraceEnabled();
156: boolean needHandshake = false;
157:
158: if (socket instanceof SSLSocket) {
159: SSLSocket ssl = (SSLSocket) socket;
160: ssl.addHandshakeCompletedListener(this );
161: if (trace)
162: log.trace("Starting SSL handshake");
163: needHandshake = true;
164: handshakeComplete = false;
165: ssl.startHandshake();
166: }
167: socket.setSoTimeout(timeout);
168: this .timeout = timeout;
169: out = new OptimizedObjectOutputStream(
170: new BufferedOutputStream(socket.getOutputStream()));
171: out.flush();
172: in = new OptimizedObjectInputStream(
173: new BufferedInputStream(socket.getInputStream()));
174: if (needHandshake) {
175: // Loop waiting for the handshake to complete
176: socket.setSoTimeout(1000);
177: for (int n = 0; handshakeComplete == false && n < 60; n++) {
178: try {
179: int b = in.read();
180: } catch (SSLException e) {
181: if (trace)
182: log
183: .trace(
184: "Error while waiting for handshake to complete",
185: e);
186: throw e;
187: } catch (IOException e) {
188: if (trace)
189: log.trace("Handshaked read()", e);
190: }
191: }
192: if (handshakeComplete == false)
193: throw new SSLException(
194: "Handshaked failed to complete in 60 seconds");
195: // Restore the original timeout
196: socket.setSoTimeout(timeout);
197: }
198:
199: }
200:
201: public void handshakeCompleted(HandshakeCompletedEvent event) {
202: handshakeComplete = true;
203: byte[] id = event.getSession().getId();
204: try {
205: sessionID = new String(id, "UTF-8");
206: } catch (UnsupportedEncodingException e) {
207: log
208: .warn(
209: "Failed to create session id using UTF-8, using default",
210: e);
211: sessionID = new String(id);
212: }
213: if (trace) {
214: log.trace("handshakeCompleted, event=" + event
215: + ", sessionID=" + sessionID);
216: }
217: }
218:
219: public String toString() {
220: StringBuffer tmp = new StringBuffer("ClientSocket@");
221: tmp.append(System.identityHashCode(this ));
222: tmp.append('[');
223: tmp.append("socket=");
224: tmp.append(socket.toString());
225: tmp.append(']');
226: return tmp.toString();
227: }
228:
229: /**
230: * @todo should this be handled with weak references as this should
231: * work better with gc
232: */
233: protected void finalize() {
234: if (socket != null) {
235: if (trace)
236: log.trace("Closing socket in finalize: " + socket);
237: try {
238: socketCloseCount--;
239: socket.close();
240: } catch (Exception ignored) {
241: } finally {
242: socket = null;
243: }
244: }
245: }
246: }
247:
248: /**
249: * Clear all class level stats
250: */
251: public static void clearStats() {
252: getSocketTime = 0;
253: readTime = 0;
254: writeTime = 0;
255: serializeTime = 0;
256: deserializeTime = 0;
257: usedPooled = 0;
258: }
259:
260: /**
261: * @return the active number of client connections
262: */
263: public static long getInUseCount() {
264: return inUseCount;
265: }
266:
267: /**
268: * @return the number of times a connection was returned from a pool
269: */
270: public static long getUsedPooled() {
271: return usedPooled;
272: }
273:
274: public static long getSocketConnectCount() {
275: return socketConnectCount;
276: }
277:
278: public static long getSocketCloseCount() {
279: return socketCloseCount;
280: }
281:
282: /**
283: * @return the total number of pooled connections across all ServerAddresses
284: */
285: public static int getTotalPoolCount() {
286: int count = 0;
287: Iterator iter = connectionPools.values().iterator();
288: while (iter.hasNext()) {
289: List pool = (List) iter.next();
290: if (pool != null)
291: count += pool.size();
292: }
293: return count;
294: }
295:
296: /**
297: * @return the proxy local pool count
298: */
299: public long getPoolCount() {
300: return pool.size();
301: }
302:
303: /**
304: * Exposed for externalization.
305: */
306: public PooledInvokerProxy() {
307: super ();
308: trace = log.isTraceEnabled();
309: }
310:
311: /**
312: * Create a new Proxy.
313: *
314: */
315: public PooledInvokerProxy(ServerAddress sa, int maxPoolSize) {
316: this (sa, maxPoolSize, MAX_RETRIES);
317: }
318:
319: public PooledInvokerProxy(ServerAddress sa, int maxPoolSize,
320: int retryCount) {
321: this .address = sa;
322: this .maxPoolSize = maxPoolSize;
323: this .retryCount = retryCount;
324: }
325:
326: /**
327: * Close all sockets in a specific pool.
328: */
329: public static void clearPool(ServerAddress sa) {
330: boolean trace = log.isTraceEnabled();
331: if (trace)
332: log.trace("clearPool, sa: " + sa);
333: try {
334: LinkedList thepool = (LinkedList) connectionPools.get(sa);
335: if (thepool == null)
336: return;
337: synchronized (thepool) {
338: int size = thepool.size();
339: for (int i = 0; i < size; i++) {
340: ClientSocket cs = null;
341: try {
342: ClientSocket socket = (ClientSocket) thepool
343: .removeFirst();
344: cs = socket;
345: if (trace)
346: log.trace("Closing, ClientSocket: "
347: + socket);
348: socketCloseCount--;
349: socket.socket.close();
350: } catch (Exception ignored) {
351: } finally {
352: if (cs != null)
353: cs.socket = null;
354: }
355: }
356: }
357: } catch (Exception ex) {
358: // ignored
359: }
360: }
361:
362: /**
363: * Close all sockets in all pools
364: */
365: public static void clearPools() {
366: synchronized (connectionPools) {
367: Iterator it = connectionPools.keySet().iterator();
368: while (it.hasNext()) {
369: ServerAddress sa = (ServerAddress) it.next();
370: clearPool(sa);
371: }
372: }
373: }
374:
375: protected void initPool() {
376: synchronized (connectionPools) {
377: pool = (LinkedList) connectionPools.get(address);
378: if (pool == null) {
379: pool = new LinkedList();
380: connectionPools.put(address, pool);
381: }
382: }
383: }
384:
385: protected ClientSocket getConnection() throws Exception {
386: Socket socket = null;
387: ClientSocket cs = null;
388:
389: //
390: // Need to retry a few times
391: // on socket connection because, at least on Windoze,
392: // if too many concurrent threads try to connect
393: // at same time, you get ConnectionRefused
394: //
395: // Retrying seems to be the most performant.
396: //
397: // This problem always happens with RMI and seems to
398: // have nothing to do with backlog or number of threads
399: // waiting in accept() on the server.
400: //
401: for (int i = 0; i < retryCount; i++) {
402: ClientSocket pooled = getPooledConnection();
403: if (pooled != null) {
404: usedPooled++;
405: inUseCount++;
406: return pooled;
407: }
408:
409: try {
410: if (trace) {
411: log.trace("Connecting to addr: " + address.address
412: + ", port: " + address.port
413: + ",clientSocketFactory: "
414: + address.clientSocketFactory
415: + ",enableTcpNoDelay: "
416: + address.enableTcpNoDelay + ",timeout: "
417: + address.timeout);
418: }
419: if (address.clientSocketFactory != null)
420: socket = address.clientSocketFactory.createSocket(
421: address.address, address.port);
422: else
423: socket = new Socket(address.address, address.port);
424: socketConnectCount++;
425: if (trace)
426: log.trace("Connected, socket=" + socket);
427:
428: socket.setTcpNoDelay(address.enableTcpNoDelay);
429: cs = new ClientSocket(socket, address.timeout);
430: inUseCount++;
431: if (trace) {
432: log.trace("New ClientSocket: " + cs
433: + ", usedPooled=" + usedPooled
434: + ", inUseCount=" + inUseCount
435: + ", socketConnectCount="
436: + socketConnectCount
437: + ", socketCloseCount=" + socketCloseCount);
438: }
439: break;
440: } catch (Exception ex) {
441: if (ex instanceof InterruptedIOException
442: || ex instanceof SocketException) {
443: if (trace)
444: log.trace("Connect failed", ex);
445: if (i + 1 < retryCount) {
446: Thread.sleep(1);
447: continue;
448: }
449: }
450: throw ex;
451: }
452: }
453: // Should not happen
454: if (cs == null)
455: throw new ConnectException(
456: "Failed to obtain a socket, tries=" + retryCount);
457: return cs;
458: }
459:
460: protected ClientSocket firstConnection() {
461: synchronized (pool) {
462: if (pool.size() > 0)
463: return (ClientSocket) pool.removeFirst();
464: }
465: return null;
466: }
467:
468: protected ClientSocket getPooledConnection() {
469: ClientSocket socket = null;
470: while ((socket = firstConnection()) != null) {
471: try {
472: // Test to see if socket is alive by send ACK message
473: if (trace)
474: log.trace("Checking pooled socket: " + socket
475: + ", address: "
476: + socket.socket.getLocalSocketAddress());
477: final byte ACK = 1;
478: socket.out.writeByte(ACK);
479: socket.out.flush();
480: socket.in.readByte();
481: if (trace) {
482: log.trace("Using pooled ClientSocket: " + socket
483: + ", usedPooled=" + usedPooled
484: + ", inUseCount=" + inUseCount
485: + ", socketConnectCount="
486: + socketConnectCount
487: + ", socketCloseCount=" + socketCloseCount);
488: }
489: return socket;
490: } catch (Exception ex) {
491: if (trace)
492: log.trace("Failed to validate pooled socket: "
493: + socket, ex);
494: try {
495: if (socket != null) {
496: socketCloseCount--;
497: socket.socket.close();
498: }
499: } catch (Exception ignored) {
500: } finally {
501: if (socket != null)
502: socket.socket = null;
503: }
504: }
505: }
506: return null;
507: }
508:
509: /**
510: * Return a socket to the pool
511: * @param socket
512: * @return true if socket was added to the pool, false if the pool
513: * was full
514: */
515: protected boolean returnConnection(ClientSocket socket) {
516: boolean pooled = false;
517: synchronized (pool) {
518: if (pool.size() < maxPoolSize) {
519: pool.add(socket);
520: inUseCount--;
521: pooled = true;
522: }
523: }
524: return pooled;
525: }
526:
527: /**
528: * The name of of the server.
529: */
530: public String getServerHostName() throws Exception {
531: return address.address;
532: }
533:
534: /**
535: * ???
536: *
537: * @todo MOVE TO TRANSACTION
538: *
539: * @return the transaction propagation context of the transaction
540: * associated with the current thread.
541: * Returns <code>null</code> if the transaction manager was never
542: * set, or if no transaction is associated with the current thread.
543: */
544: public Object getTransactionPropagationContext()
545: throws SystemException {
546: return (tpcFactory == null) ? null : tpcFactory
547: .getTransactionPropagationContext();
548: }
549:
550: /**
551: * The invocation on the delegate, calls the right invoker. Remote if we are remote,
552: * local if we are local.
553: */
554: public Object invoke(Invocation invocation) throws Exception {
555: boolean trace = log.isTraceEnabled();
556: // We are going to go through a Remote invocation, switch to a Marshalled Invocation
557: PooledMarshalledInvocation mi = new PooledMarshalledInvocation(
558: invocation);
559:
560: // Set the transaction propagation context
561: // @todo: MOVE TO TRANSACTION
562: mi
563: .setTransactionPropagationContext(getTransactionPropagationContext());
564:
565: Object response = null;
566: long start = System.currentTimeMillis();
567: ClientSocket socket = getConnection();
568: long end = System.currentTimeMillis() - start;
569: getSocketTime += end;
570: // Add the socket session if it exists
571: if (socket.sessionID != null) {
572: mi.setValue("SESSION_ID", socket.sessionID);
573: if (trace)
574: log.trace("Added SESSION_ID to invocation");
575: }
576:
577: try {
578: if (trace)
579: log.trace("Sending invocation to: "
580: + mi.getObjectName());
581: socket.out.writeObject(mi);
582: socket.out.reset();
583: socket.out.writeObject(Boolean.TRUE); // for stupid ObjectInputStream reset
584: socket.out.flush();
585: socket.out.reset();
586: end = System.currentTimeMillis() - start;
587: writeTime += end;
588: start = System.currentTimeMillis();
589: response = socket.in.readObject();
590: // to make sure stream gets reset
591: // Stupid ObjectInputStream holds object graph
592: // can only be set by the client/server sending a TC_RESET
593: socket.in.readObject();
594: end = System.currentTimeMillis() - start;
595: readTime += end;
596: } catch (Exception ex) {
597: if (trace)
598: log.trace("Failure during invoke", ex);
599: try {
600: socketCloseCount--;
601: socket.socket.close();
602: } catch (Exception ignored) {
603: } finally {
604: socket.socket = null;
605: }
606: throw new java.rmi.ConnectException(
607: "Failure during invoke", ex);
608: }
609:
610: // Put socket back in pool for reuse
611: if (returnConnection(socket) == false) {
612: // Failed, close the socket
613: if (trace)
614: log.trace("Closing unpooled socket: " + socket);
615: try {
616: socketCloseCount--;
617: socket.socket.close();
618: } catch (Exception ignored) {
619: } finally {
620: socket.socket = null;
621: }
622: }
623:
624: // Return response
625:
626: try {
627: if (response instanceof Exception) {
628: throw ((Exception) response);
629: }
630: if (response instanceof MarshalledObject) {
631: return ((MarshalledObject) response).get();
632: }
633: return response;
634: } catch (ServerException ex) {
635: // Suns RMI implementation wraps NoSuchObjectException in
636: // a ServerException. We cannot have that if we want
637: // to comply with the spec, so we unwrap here.
638: if (ex.detail instanceof NoSuchObjectException) {
639: throw (NoSuchObjectException) ex.detail;
640: }
641: //likewise
642: if (ex.detail instanceof TransactionRolledbackException) {
643: throw (TransactionRolledbackException) ex.detail;
644: }
645: throw ex;
646: }
647: }
648:
649: /**
650: * Write out the serializable data
651: * @serialData address ServerAddress
652: * @serialData maxPoolSize int
653: * @serialData WIRE_VERSION int version
654: * @serialData retryCount int
655: * @param out
656: * @throws IOException
657: */
658: public void writeExternal(final ObjectOutput out)
659: throws IOException {
660: // The legacy wire format is address, maxPoolSize
661: out.writeObject(address);
662: out.writeInt(maxPoolSize);
663: // Write out the current version format and its data
664: out.writeInt(WIRE_VERSION);
665: out.writeInt(retryCount);
666: }
667:
668: public void readExternal(final ObjectInput in) throws IOException,
669: ClassNotFoundException {
670: trace = log.isTraceEnabled();
671: address = (ServerAddress) in.readObject();
672: maxPoolSize = in.readInt();
673: int version = 0;
674: try {
675: version = in.readInt();
676: } catch (EOFException e) {
677: // No version written and there is no more data
678: } catch (OptionalDataException e) {
679: // No version written and there is data from other objects
680: }
681:
682: switch (version) {
683: case 0:
684: // This has no retryCount, default it to the hard-coded value
685: retryCount = MAX_RETRIES;
686: break;
687: case 1:
688: readVersion1(in);
689: break;
690: default:
691: /* Assume a newer version that only adds defaultable values.
692: The alternative would be to thrown an exception
693: */
694: break;
695: }
696: initPool();
697: }
698:
699: private void readVersion1(final ObjectInput in) throws IOException {
700: retryCount = in.readInt();
701: }
702: }
|