001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.net.core;
006:
007: import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
008: import EDU.oswego.cs.dl.util.concurrent.Latch;
009: import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
010: import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong;
011:
012: import com.tc.bytes.TCByteBuffer;
013: import com.tc.logging.TCLogger;
014: import com.tc.logging.TCLogging;
015: import com.tc.net.NIOWorkarounds;
016: import com.tc.net.TCSocketAddress;
017: import com.tc.net.core.event.TCConnectionEventCaller;
018: import com.tc.net.core.event.TCConnectionEventListener;
019: import com.tc.net.protocol.TCNetworkMessage;
020: import com.tc.net.protocol.TCProtocolAdaptor;
021: import com.tc.util.Assert;
022: import com.tc.util.TCTimeoutException;
023: import com.tc.util.concurrent.SetOnceFlag;
024: import com.tc.util.concurrent.SetOnceRef;
025: import com.tc.util.concurrent.ThreadUtil;
026:
027: import java.io.IOException;
028: import java.net.InetSocketAddress;
029: import java.net.Socket;
030: import java.net.SocketException;
031: import java.net.SocketTimeoutException;
032: import java.nio.ByteBuffer;
033: import java.nio.channels.ClosedSelectorException;
034: import java.nio.channels.GatheringByteChannel;
035: import java.nio.channels.ScatteringByteChannel;
036: import java.nio.channels.SocketChannel;
037: import java.util.Date;
038: import java.util.LinkedList;
039: import java.util.List;
040:
041: /**
042: * JDK14 (nio) implementation of TCConnection
043: *
044: * @author teck
045: */
046: final class TCConnectionJDK14 implements TCConnection,
047: TCJDK14ChannelReader, TCJDK14ChannelWriter {
048:
049: private static final long NO_CONNECT_TIME = -1L;
050: private static final TCLogger logger = TCLogging
051: .getLogger(TCConnection.class);
052: private static final long WARN_THRESHOLD = 0x400000L; // 4MB
053:
054: private final LinkedList writeContexts = new LinkedList();
055: private final TCCommJDK14 comm;
056: private final TCConnectionManagerJDK14 parent;
057: private final TCConnectionEventCaller eventCaller = new TCConnectionEventCaller(
058: logger);
059: private final SynchronizedLong lastActivityTime = new SynchronizedLong(
060: System.currentTimeMillis());
061: private final SynchronizedLong connectTime = new SynchronizedLong(
062: NO_CONNECT_TIME);
063: private final List eventListeners = new CopyOnWriteArrayList();
064: private final TCProtocolAdaptor protocolAdaptor;
065: private final SynchronizedBoolean isSocketEndpoint = new SynchronizedBoolean(
066: false);
067: private final SetOnceFlag closed = new SetOnceFlag();
068: private final SynchronizedBoolean connected = new SynchronizedBoolean(
069: false);
070: private final SetOnceRef localSocketAddress = new SetOnceRef();
071: private final SetOnceRef remoteSocketAddress = new SetOnceRef();
072: private volatile SocketChannel channel;
073: private final SocketParams socketParams;
074:
075: // for creating unconnected client connections
076: TCConnectionJDK14(TCConnectionEventListener listener,
077: TCCommJDK14 comm, TCProtocolAdaptor adaptor,
078: TCConnectionManagerJDK14 managerJDK14,
079: SocketParams socketParams) {
080: this (listener, comm, adaptor, null, managerJDK14, socketParams);
081: }
082:
083: TCConnectionJDK14(TCConnectionEventListener listener,
084: TCCommJDK14 comm, TCProtocolAdaptor adaptor,
085: SocketChannel ch, TCConnectionManagerJDK14 parent,
086: SocketParams socketParams) {
087: Assert.assertNotNull(parent);
088: Assert.assertNotNull(adaptor);
089:
090: this .parent = parent;
091: this .protocolAdaptor = adaptor;
092:
093: if (listener != null)
094: addListener(listener);
095:
096: Assert.assertNotNull(comm);
097: this .comm = comm;
098: this .channel = ch;
099:
100: if (ch != null) {
101: socketParams.applySocketParams(ch.socket());
102: }
103:
104: this .socketParams = socketParams;
105: }
106:
107: private void closeImpl(Runnable callback) {
108: Assert.assertTrue(closed.isSet());
109: try {
110: if (channel != null) {
111: comm.cleanupChannel(channel, callback);
112: } else {
113: callback.run();
114: }
115: } finally {
116: synchronized (writeContexts) {
117: writeContexts.clear();
118: }
119: }
120: }
121:
122: protected void finishConnect() {
123: Assert.assertNotNull("channel", channel);
124: recordSocketAddress(channel.socket());
125: setConnected(true);
126: eventCaller.fireConnectEvent(eventListeners, this );
127: }
128:
129: private void connectImpl(TCSocketAddress addr, int timeout)
130: throws IOException, TCTimeoutException {
131: SocketChannel newSocket = null;
132: InetSocketAddress inetAddr = new InetSocketAddress(addr
133: .getAddress(), addr.getPort());
134: for (int i = 1; i <= 3; i++) {
135: try {
136: newSocket = createChannel();
137: newSocket.configureBlocking(true);
138: newSocket.socket().connect(inetAddr, timeout);
139: break;
140: } catch (SocketTimeoutException ste) {
141: comm.cleanupChannel(newSocket, null);
142: throw new TCTimeoutException("Timeout of " + timeout
143: + "ms occured connecting to " + addr, ste);
144: } catch (ClosedSelectorException cse) {
145: if (NIOWorkarounds.windowsConnectWorkaround(cse)) {
146: logger.warn("Retrying connect to " + addr
147: + ", attempt " + i);
148: ThreadUtil.reallySleep(500);
149: continue;
150: }
151: throw cse;
152: }
153: }
154:
155: channel = newSocket;
156: newSocket.configureBlocking(false);
157: comm.requestReadInterest(this , newSocket);
158: }
159:
160: private SocketChannel createChannel() throws IOException,
161: SocketException {
162: SocketChannel rv = SocketChannel.open();
163: Socket s = rv.socket();
164: socketParams.applySocketParams(s);
165: return rv;
166: }
167:
168: private Socket detachImpl() throws IOException {
169: comm.unregister(channel);
170: channel.configureBlocking(true);
171: return channel.socket();
172: }
173:
174: private boolean asynchConnectImpl(TCSocketAddress address)
175: throws IOException {
176: SocketChannel newSocket = createChannel();
177: newSocket.configureBlocking(false);
178:
179: InetSocketAddress inetAddr = new InetSocketAddress(address
180: .getAddress(), address.getPort());
181: final boolean rv = newSocket.connect(inetAddr);
182: setConnected(rv);
183:
184: channel = newSocket;
185:
186: if (!rv) {
187: comm.requestConnectInterest(this , newSocket);
188: }
189:
190: return rv;
191: }
192:
193: public void doRead(ScatteringByteChannel sbc) {
194: final boolean debug = logger.isDebugEnabled();
195: final TCByteBuffer[] readBuffers = getReadBuffers();
196:
197: int bytesRead = 0;
198: boolean readEOF = false;
199: try {
200: // Do the read in a loop, instead of calling read(ByteBuffer[]).
201: // This seems to avoid memory leaks on sun's 1.4.2 JDK
202: for (int i = 0, n = readBuffers.length; i < n; i++) {
203: ByteBuffer buf = extractNioBuffer(readBuffers[i]);
204:
205: if (buf.hasRemaining()) {
206: final int read = sbc.read(buf);
207:
208: if (-1 == read) {
209: // Normal EOF
210: readEOF = true;
211: break;
212: }
213:
214: if (0 == read) {
215: break;
216: }
217:
218: bytesRead += read;
219:
220: if (buf.hasRemaining()) {
221: // don't move on to the next buffer if we didn't fill the current one
222: break;
223: }
224: }
225: }
226: } catch (IOException ioe) {
227: if (logger.isInfoEnabled()) {
228: logger.info("error reading from channel "
229: + channel.toString() + ": " + ioe.getMessage());
230: }
231:
232: eventCaller.fireErrorEvent(eventListeners, this , ioe, null);
233: return;
234: }
235:
236: if (readEOF) {
237: if (bytesRead > 0) {
238: addNetworkData(readBuffers, bytesRead);
239: }
240:
241: if (debug)
242: logger.debug("EOF read on connection "
243: + channel.toString());
244:
245: eventCaller.fireEndOfFileEvent(eventListeners, this );
246: return;
247: }
248:
249: Assert.eval(bytesRead >= 0);
250:
251: if (debug)
252: logger.debug("Read " + bytesRead + " bytes on connection "
253: + channel.toString());
254:
255: addNetworkData(readBuffers, bytesRead);
256: }
257:
258: public void doWrite(GatheringByteChannel gbc) {
259: final boolean debug = logger.isDebugEnabled();
260:
261: // get a copy of the current write contexts. Since we call out to event/error handlers in the write
262: // loop below, we don't want to be holding the lock on the writeContexts queue
263: final WriteContext contextsToWrite[];
264: synchronized (writeContexts) {
265: if (closed.isSet()) {
266: return;
267: }
268: contextsToWrite = (WriteContext[]) writeContexts
269: .toArray(new WriteContext[writeContexts.size()]);
270: }
271:
272: int contextsToRemove = 0;
273: for (int index = 0, n = contextsToWrite.length; index < n; index++) {
274: final WriteContext context = contextsToWrite[index];
275: final ByteBuffer[] buffers = context.clonedData;
276:
277: long bytesWritten = 0;
278: try {
279: // Do the write in a loop, instead of calling write(ByteBuffer[]).
280: // This seems to avoid memory leaks on sun's 1.4.2 JDK
281: for (int i = context.index, nn = buffers.length; i < nn; i++) {
282: final ByteBuffer buf = buffers[i];
283: final int written = gbc.write(buf);
284:
285: if (written == 0) {
286: break;
287: }
288:
289: bytesWritten += written;
290:
291: if (buf.hasRemaining()) {
292: break;
293: } else {
294: context.incrementIndex();
295: }
296: }
297: } catch (IOException ioe) {
298: if (NIOWorkarounds.windowsWritevWorkaround(ioe)) {
299: break;
300: }
301:
302: eventCaller.fireErrorEvent(eventListeners, this , ioe,
303: context.message);
304: }
305:
306: if (debug)
307: logger.debug("Wrote " + bytesWritten
308: + " bytes on connection " + channel.toString());
309:
310: if (context.done()) {
311: contextsToRemove++;
312: if (debug)
313: logger.debug("Complete message sent on connection "
314: + channel.toString());
315: context.writeComplete();
316: } else {
317: if (debug)
318: logger
319: .debug("Message not yet completely sent on connection "
320: + channel.toString());
321: break;
322: }
323: }
324:
325: synchronized (writeContexts) {
326: if (closed.isSet()) {
327: return;
328: }
329:
330: for (int i = 0; i < contextsToRemove; i++) {
331: writeContexts.removeFirst();
332: }
333:
334: if (writeContexts.isEmpty()) {
335: comm.removeWriteInterest(this , channel);
336: }
337: }
338: }
339:
340: static private long bytesRemaining(ByteBuffer[] buffers) {
341: long rv = 0;
342: for (int i = 0, n = buffers.length; i < n; i++) {
343: rv += buffers[i].remaining();
344: }
345: return rv;
346: }
347:
348: static private ByteBuffer[] extractNioBuffers(TCByteBuffer[] src) {
349: ByteBuffer[] rv = new ByteBuffer[src.length];
350: for (int i = 0, n = src.length; i < n; i++) {
351: rv[i] = src[i].getNioBuffer();
352: }
353:
354: return rv;
355: }
356:
357: static private ByteBuffer extractNioBuffer(TCByteBuffer buffer) {
358: return buffer.getNioBuffer();
359: }
360:
361: private void putMessageImpl(TCNetworkMessage message) {
362: // ??? Does the message queue and the WriteContext belong in the base connection class?
363: final boolean debug = logger.isDebugEnabled();
364:
365: final WriteContext context = new WriteContext(message);
366:
367: final long bytesToWrite = bytesRemaining(context.clonedData);
368: if (bytesToWrite >= TCConnectionJDK14.WARN_THRESHOLD) {
369: logger
370: .warn("Warning: Attempting to send a messaage of size "
371: + bytesToWrite + " bytes");
372: }
373:
374: // TODO: outgoing queue should not be unbounded size!
375: final boolean newData;
376: final int msgCount;
377: synchronized (writeContexts) {
378: if (closed.isSet()) {
379: return;
380: }
381:
382: writeContexts.addLast(context);
383: msgCount = writeContexts.size();
384: newData = (msgCount == 1);
385: }
386:
387: if (debug) {
388: logger.debug("Connection (" + channel.toString() + ") has "
389: + msgCount + " messages queued");
390: }
391:
392: if (newData) {
393: if (debug) {
394: logger
395: .debug("New message on connection, registering for write interest");
396: }
397:
398: // NOTE: this might be the very first message on the socket and
399: // given the current implementation, it isn't necessarily
400: // safe to assume one can write to the channel. Long story
401: // short, always enqueue the message and wait until it is selected
402: // for write interest.
403:
404: // If you're trying to optimize for performance by letting the calling thread do the
405: // write, we need to add more logic to connection setup. Specifically, you need register
406: // for, as well as actually be selected for, write interest immediately
407: // after finishConnect(). Only after this selection occurs it is always safe to try
408: // to write.
409:
410: comm.requestWriteInterest(this , channel);
411: }
412: }
413:
414: public final void asynchClose() {
415: if (closed.attemptSet()) {
416: closeImpl(createCloseCallback(null));
417: }
418: }
419:
420: public final boolean close(final long timeout) {
421: if (timeout <= 0) {
422: throw new IllegalArgumentException(
423: "timeout cannot be less than or equal to zero");
424: }
425:
426: if (closed.attemptSet()) {
427: final Latch latch = new Latch();
428: closeImpl(createCloseCallback(latch));
429: try {
430: return latch.attempt(timeout);
431: } catch (InterruptedException e) {
432: logger.warn("close interrupted");
433: return isConnected();
434: }
435: }
436:
437: return isClosed();
438: }
439:
440: private final Runnable createCloseCallback(final Latch latch) {
441: final boolean fireClose = isConnected();
442:
443: return new Runnable() {
444: public void run() {
445: setConnected(false);
446: parent.connectionClosed(TCConnectionJDK14.this );
447:
448: if (fireClose) {
449: eventCaller.fireCloseEvent(eventListeners,
450: TCConnectionJDK14.this );
451: }
452:
453: if (latch != null)
454: latch.release();
455: }
456: };
457: }
458:
459: public final boolean isClosed() {
460: return closed.isSet();
461: }
462:
463: public final boolean isConnected() {
464: return connected.get();
465: }
466:
467: public final String toString() {
468: StringBuffer buf = new StringBuffer();
469:
470: buf.append(getClass().getName()).append('@').append(hashCode())
471: .append(":");
472:
473: buf.append(" connected: ").append(isConnected());
474: buf.append(", closed: ").append(isClosed());
475:
476: if (isSocketEndpoint.get()) {
477: buf.append(" local=");
478: if (localSocketAddress.isSet()) {
479: buf.append(((TCSocketAddress) localSocketAddress.get())
480: .getStringForm());
481: } else {
482: buf.append("[unknown]");
483: }
484:
485: buf.append(" remote=");
486: if (remoteSocketAddress.isSet()) {
487: buf
488: .append(((TCSocketAddress) remoteSocketAddress
489: .get()).getStringForm());
490: } else {
491: buf.append("[unknown");
492: }
493: }
494:
495: buf.append(" connect=[");
496: final long connect = getConnectTime();
497:
498: if (connect != NO_CONNECT_TIME) {
499: buf.append(new Date(connect));
500: } else {
501: buf.append("no connect time");
502: }
503: buf.append(']');
504:
505: buf.append(" idle=").append(getIdleTime()).append("ms");
506:
507: return buf.toString();
508: }
509:
510: public final void addListener(TCConnectionEventListener listener) {
511: if (listener == null) {
512: return;
513: }
514: eventListeners.add(listener); // don't need sync
515: }
516:
517: public final void removeListener(TCConnectionEventListener listener) {
518: if (listener == null) {
519: return;
520: }
521: eventListeners.remove(listener); // don't need sync
522: }
523:
524: public final long getConnectTime() {
525: return connectTime.get();
526: }
527:
528: public final long getIdleTime() {
529: return System.currentTimeMillis() - lastActivityTime.get();
530: }
531:
532: public final synchronized void connect(TCSocketAddress addr,
533: int timeout) throws IOException, TCTimeoutException {
534: if (closed.isSet() || connected.get()) {
535: throw new IllegalStateException(
536: "Connection closed or already connected");
537: }
538: connectImpl(addr, timeout);
539: finishConnect();
540: }
541:
542: public final synchronized boolean asynchConnect(TCSocketAddress addr)
543: throws IOException {
544: if (closed.isSet() || connected.get()) {
545: throw new IllegalStateException(
546: "Connection closed or already connected");
547: }
548:
549: boolean rv = asynchConnectImpl(addr);
550:
551: if (rv) {
552: finishConnect();
553: }
554:
555: return rv;
556: }
557:
558: public final void putMessage(TCNetworkMessage message) {
559: lastActivityTime.set(System.currentTimeMillis());
560:
561: // if (!isConnected() || isClosed()) {
562: // logger.warn("Ignoring message sent to non-connected connection");
563: // return;
564: // }
565:
566: putMessageImpl(message);
567: }
568:
569: public final TCSocketAddress getLocalAddress() {
570: return (TCSocketAddress) localSocketAddress.get();
571: }
572:
573: public final TCSocketAddress getRemoteAddress() {
574: return (TCSocketAddress) remoteSocketAddress.get();
575: }
576:
577: private final void setConnected(boolean connected) {
578: if (connected) {
579: this .connectTime.set(System.currentTimeMillis());
580: }
581: this .connected.set(connected);
582: }
583:
584: private final void recordSocketAddress(Socket socket) {
585: if (socket != null) {
586: isSocketEndpoint.set(true);
587: localSocketAddress.set(new TCSocketAddress(socket
588: .getLocalAddress(), socket.getLocalPort()));
589: remoteSocketAddress.set(new TCSocketAddress(socket
590: .getInetAddress(), socket.getPort()));
591: }
592: }
593:
594: private final void addNetworkData(TCByteBuffer[] data, int length) {
595: lastActivityTime.set(System.currentTimeMillis());
596:
597: try {
598: protocolAdaptor.addReadData(this , data, length);
599: } catch (Exception e) {
600: eventCaller.fireErrorEvent(eventListeners, this , e, null);
601: return;
602: }
603: }
604:
605: protected final TCByteBuffer[] getReadBuffers() {
606: // TODO: Hook in some form of read throttle. To throttle how much data is read from the network,
607: // only return a subset of the buffers that the protocolAdaptor advises to be used.
608:
609: // TODO: should also support a way to de-register read interest temporarily
610:
611: return protocolAdaptor.getReadBuffers();
612: }
613:
614: protected final void fireErrorEvent(Exception e,
615: TCNetworkMessage context) {
616: eventCaller.fireErrorEvent(eventListeners, this , e, context);
617: }
618:
619: public final Socket detach() throws IOException {
620: this .parent.removeConnection(this );
621: return detachImpl();
622: }
623:
624: private static class WriteContext {
625: private final TCNetworkMessage message;
626: private final ByteBuffer[] clonedData;
627: private int index = 0;
628:
629: WriteContext(TCNetworkMessage message) {
630: this .message = message;
631:
632: final ByteBuffer[] msgData = extractNioBuffers(message
633: .getEntireMessageData());
634: this .clonedData = new ByteBuffer[msgData.length];
635:
636: for (int i = 0; i < msgData.length; i++) {
637: clonedData[i] = msgData[i].duplicate()
638: .asReadOnlyBuffer();
639: }
640: }
641:
642: boolean done() {
643: for (int i = index, n = clonedData.length; i < n; i++) {
644: if (clonedData[i].hasRemaining()) {
645: return false;
646: }
647: }
648:
649: return true;
650: }
651:
652: void incrementIndex() {
653: clonedData[index] = null;
654: index++;
655: }
656:
657: void writeComplete() {
658: this.message.wasSent();
659: }
660: }
661:
662: }
|