001: /*
002: * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003: * notice. All rights reserved.
004: */
005: package com.tc.net.core;
006:
007: import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
008:
009: import com.tc.exception.TCInternalError;
010: import com.tc.logging.TCLogger;
011: import com.tc.logging.TCLogging;
012: import com.tc.net.NIOWorkarounds;
013: import com.tc.net.core.event.TCListenerEvent;
014: import com.tc.net.core.event.TCListenerEventListener;
015: import com.tc.util.Assert;
016: import com.tc.util.Util;
017: import com.tc.util.runtime.Os;
018:
019: import java.io.IOException;
020: import java.net.Socket;
021: import java.nio.channels.CancelledKeyException;
022: import java.nio.channels.Channel;
023: import java.nio.channels.ClosedChannelException;
024: import java.nio.channels.GatheringByteChannel;
025: import java.nio.channels.ScatteringByteChannel;
026: import java.nio.channels.SelectableChannel;
027: import java.nio.channels.SelectionKey;
028: import java.nio.channels.Selector;
029: import java.nio.channels.ServerSocketChannel;
030: import java.nio.channels.SocketChannel;
031: import java.util.HashSet;
032: import java.util.Iterator;
033: import java.util.Random;
034: import java.util.Set;
035:
036: /**
037: * JDK 1.4 (NIO) version of TCComm. Uses a single internal thread and a selector to manage channels associated with
038: * <code>TCConnection</code>'s
039: *
040: * @author teck
041: */
042: class TCCommJDK14 implements TCComm, TCListenerEventListener {
043:
044: protected static final TCLogger logger = TCLogging
045: .getLogger(TCComm.class);
046: private final SocketParams params;
047:
048: TCCommJDK14(SocketParams params) {
049: this .params = params;
050: }
051:
052: protected void startImpl() {
053: this .selector = null;
054:
055: final int tries = 3;
056:
057: for (int i = 0; i < tries; i++) {
058: try {
059: this .selector = Selector.open();
060: break;
061: } catch (IOException ioe) {
062: throw new RuntimeException(ioe);
063: } catch (NullPointerException npe) {
064: if (i < tries && NIOWorkarounds.selectorOpenRace(npe)) {
065: System.err
066: .println("Attempting to work around sun bug 6427854 (attempt "
067: + (i + 1) + " of " + tries + ")");
068: try {
069: Thread.sleep(new Random().nextInt(20) + 5);
070: } catch (InterruptedException ie) {
071: //
072: }
073: continue;
074: }
075: throw npe;
076: }
077: }
078:
079: if (this .selector == null) {
080: throw new RuntimeException("Could not start selector");
081: }
082:
083: commThread = new TCCommThread(this );
084: commThread.start();
085: }
086:
087: protected void stopImpl() {
088: try {
089: if (selector != null) {
090: selector.wakeup();
091: }
092: } catch (Exception e) {
093: logger.error("Exception trying to stop TCComm", e);
094: }
095: }
096:
097: void addSelectorTask(final Runnable task) {
098: Assert.eval(!isCommThread());
099: boolean isInterrupted = false;
100:
101: try {
102: while (true) {
103: try {
104: selectorTasks.put(task);
105: break;
106: } catch (InterruptedException e) {
107: logger.warn(e);
108: isInterrupted = true;
109: }
110: }
111: } finally {
112: selector.wakeup();
113: }
114: Util.selfInterruptIfNeeded(isInterrupted);
115: }
116:
117: void stopListener(final ServerSocketChannel ssc,
118: final Runnable callback) {
119: if (!isCommThread()) {
120: Runnable task = new Runnable() {
121: public void run() {
122: TCCommJDK14.this .stopListener(ssc, callback);
123: }
124: };
125: addSelectorTask(task);
126: return;
127: }
128:
129: try {
130: cleanupChannel(ssc, null);
131: } catch (Exception e) {
132: logger.error(e);
133: } finally {
134: try {
135: callback.run();
136: } catch (Exception e) {
137: logger.error(e);
138: }
139: }
140: }
141:
142: void unregister(SelectableChannel channel) {
143: Assert.assertTrue(isCommThread());
144: SelectionKey key = channel.keyFor(selector);
145: if (key != null) {
146: key.cancel();
147: key.attach(null);
148: }
149: }
150:
151: void cleanupChannel(final Channel ch, final Runnable callback) {
152: if (null == ch) {
153: // not expected
154: logger.warn("null channel passed to cleanupChannel()",
155: new Throwable());
156: return;
157: }
158:
159: if (!isCommThread()) {
160: if (logger.isDebugEnabled()) {
161: logger.debug("queue'ing channel close operation");
162: }
163:
164: addSelectorTask(new Runnable() {
165: public void run() {
166: TCCommJDK14.this .cleanupChannel(ch, callback);
167: }
168: });
169: return;
170: }
171:
172: try {
173: if (ch instanceof SelectableChannel) {
174: SelectableChannel sc = (SelectableChannel) ch;
175:
176: try {
177: SelectionKey sk = sc.keyFor(selector);
178: if (sk != null) {
179: sk.attach(null);
180: sk.cancel();
181: }
182: } catch (Exception e) {
183: logger.warn(
184: "Exception trying to clear selection key",
185: e);
186: }
187: }
188:
189: if (ch instanceof SocketChannel) {
190: SocketChannel sc = (SocketChannel) ch;
191:
192: Socket s = sc.socket();
193:
194: if (null != s) {
195: synchronized (s) {
196:
197: if (s.isConnected()) {
198: try {
199: if (!s.isOutputShutdown()) {
200: s.shutdownOutput();
201: }
202: } catch (Exception e) {
203: logger
204: .warn("Exception trying to shutdown socket output: "
205: + e.getMessage());
206: }
207:
208: try {
209: if (!s.isClosed()) {
210: s.close();
211: }
212: } catch (Exception e) {
213: logger
214: .warn("Exception trying to close() socket: "
215: + e.getMessage());
216: }
217: }
218: }
219: }
220: } else if (ch instanceof ServerSocketChannel) {
221: ServerSocketChannel ssc = (ServerSocketChannel) ch;
222:
223: try {
224: ssc.close();
225: } catch (Exception e) {
226: logger
227: .warn("Exception trying to close() server socket"
228: + e.getMessage());
229: }
230: }
231:
232: try {
233: ch.close();
234: } catch (Exception e) {
235: logger.warn("Exception trying to close channel", e);
236: }
237: } catch (Exception e) {
238: // this is just a catch all to make sure that no exceptions will be thrown by this method, please do not remove
239: logger.error("Unhandled exception in cleanupChannel()", e);
240: } finally {
241: try {
242: if (callback != null) {
243: callback.run();
244: }
245: } catch (Throwable t) {
246: logger
247: .error(
248: "Unhandled exception in cleanupChannel callback.",
249: t);
250: }
251: }
252:
253: }
254:
255: void requestConnectInterest(TCConnectionJDK14 conn, SocketChannel sc) {
256: handleRequest(InterestRequest.createSetInterestRequest(sc,
257: conn, SelectionKey.OP_CONNECT));
258: }
259:
260: void requestReadInterest(TCJDK14ChannelReader reader,
261: ScatteringByteChannel channel) {
262: handleRequest(InterestRequest.createAddInterestRequest(
263: (SelectableChannel) channel, reader,
264: SelectionKey.OP_READ));
265: }
266:
267: void requestWriteInterest(TCJDK14ChannelWriter writer,
268: GatheringByteChannel channel) {
269: handleRequest(InterestRequest.createAddInterestRequest(
270: (SelectableChannel) channel, writer,
271: SelectionKey.OP_WRITE));
272: }
273:
274: void requestAcceptInterest(TCListenerJDK14 lsnr,
275: ServerSocketChannel ssc) {
276: handleRequest(InterestRequest.createSetInterestRequest(ssc,
277: lsnr, SelectionKey.OP_ACCEPT));
278: }
279:
280: void removeWriteInterest(TCConnectionJDK14 conn,
281: SelectableChannel channel) {
282: handleRequest(InterestRequest.createRemoveInterestRequest(
283: channel, conn, SelectionKey.OP_WRITE));
284: }
285:
286: void removeReadInterest(TCConnectionJDK14 conn,
287: SelectableChannel channel) {
288: handleRequest(InterestRequest.createRemoveInterestRequest(
289: channel, conn, SelectionKey.OP_READ));
290: }
291:
292: public void closeEvent(TCListenerEvent event) {
293: commThread.listenerAdded(event.getSource());
294: }
295:
296: void listenerAdded(TCListener listener) {
297: commThread.listenerAdded(listener);
298: }
299:
300: private void handleRequest(final InterestRequest req) {
301: // ignore the request if we are stopped/stopping
302: if (isStopped()) {
303: return;
304: }
305:
306: if (isCommThread()) {
307: modifyInterest(req);
308: } else {
309: addSelectorTask(new Runnable() {
310: public void run() {
311: TCCommJDK14.this .handleRequest(req);
312: }
313: });
314: return;
315: }
316: }
317:
318: void selectLoop() throws IOException {
319: Assert.assertNotNull("selector", selector);
320: Assert.eval("Not started", isStarted());
321:
322: while (true) {
323: final int numKeys;
324: try {
325: numKeys = selector.select();
326: } catch (IOException ioe) {
327: if (NIOWorkarounds.linuxSelectWorkaround(ioe)) {
328: logger.warn("working around Sun bug 4504001");
329: continue;
330: }
331: throw ioe;
332: }
333:
334: if (isStopped()) {
335: if (logger.isDebugEnabled()) {
336: logger.debug("Select loop terminating");
337: }
338: return;
339: }
340:
341: boolean isInterrupted = false;
342: // run any pending selector tasks
343: while (true) {
344: Runnable task = null;
345: while (true) {
346: try {
347: task = (Runnable) selectorTasks.poll(0);
348: break;
349: } catch (InterruptedException ie) {
350: logger.error(
351: "Error getting task from task queue",
352: ie);
353: isInterrupted = true;
354: }
355: }
356:
357: if (null == task) {
358: break;
359: }
360:
361: try {
362: task.run();
363: } catch (Exception e) {
364: logger.error("error running selector task", e);
365: }
366: }
367: Util.selfInterruptIfNeeded(isInterrupted);
368:
369: final Set selectedKeys = selector.selectedKeys();
370: if ((0 == numKeys) && (0 == selectedKeys.size())) {
371: continue;
372: }
373:
374: for (Iterator iter = selectedKeys.iterator(); iter
375: .hasNext();) {
376: SelectionKey key = (SelectionKey) iter.next();
377: iter.remove();
378:
379: if (null == key) {
380: logger.error("Selection key is null");
381: continue;
382: }
383:
384: try {
385: if (key.isAcceptable()) {
386: doAccept(key);
387: continue;
388: }
389:
390: if (key.isConnectable()) {
391: doConnect(key);
392: continue;
393: }
394:
395: if (key.isReadable()) {
396: ((TCJDK14ChannelReader) key.attachment())
397: .doRead((ScatteringByteChannel) key
398: .channel());
399: }
400:
401: if (key.isValid() && key.isWritable()) {
402: ((TCJDK14ChannelWriter) key.attachment())
403: .doWrite((GatheringByteChannel) key
404: .channel());
405: }
406: } catch (CancelledKeyException cke) {
407: logger.warn(cke.getClass().getName() + " occured");
408: }
409: } // for
410: } // while (true)
411: }
412:
413: private void dispose() {
414: if (selector != null) {
415:
416: for (Iterator keys = selector.keys().iterator(); keys
417: .hasNext();) {
418: try {
419: SelectionKey key = (SelectionKey) keys.next();
420: cleanupChannel(key.channel(), null);
421: }
422:
423: catch (Exception e) {
424: logger.warn("Exception trying to close channel", e);
425: }
426: }
427:
428: try {
429: selector.close();
430: } catch (Exception e) {
431: if ((Os.isMac())
432: && (Os.isUnix())
433: && (e.getMessage()
434: .equals("Bad file descriptor"))) {
435: // I can't find a specific bug about this, but I also can't seem to prevent the exception on the Mac.
436: // So just logging this as warning.
437: logger.warn("Exception trying to close selector: "
438: + e.getMessage());
439: } else {
440: logger.error("Exception trying to close selector",
441: e);
442: }
443: }
444: }
445:
446: // drop any old selector tasks
447: selectorTasks = new LinkedQueue();
448: }
449:
450: private boolean isCommThread() {
451: return isCommThread(Thread.currentThread());
452: }
453:
454: private boolean isCommThread(Thread thread) {
455: if (thread == null) {
456: return false;
457: }
458: return thread == commThread;
459: }
460:
461: private void doConnect(SelectionKey key) {
462: SocketChannel sc = (SocketChannel) key.channel();
463: TCConnectionJDK14 conn = (TCConnectionJDK14) key.attachment();
464:
465: try {
466: if (sc.finishConnect()) {
467: sc.register(selector, SelectionKey.OP_READ, conn);
468: conn.finishConnect();
469: } else {
470: String errMsg = "finishConnect() returned false, but no exception thrown";
471:
472: if (logger.isInfoEnabled()) {
473: logger.info(errMsg);
474: }
475:
476: conn.fireErrorEvent(new Exception(errMsg), null);
477: }
478: } catch (IOException ioe) {
479: if (logger.isInfoEnabled()) {
480: logger
481: .info(
482: "IOException attempting to finish socket connection",
483: ioe);
484: }
485:
486: conn.fireErrorEvent(ioe, null);
487: }
488: }
489:
490: private void modifyInterest(InterestRequest request) {
491: Assert.eval(isCommThread());
492:
493: try {
494: final int existingOps;
495:
496: SelectionKey key = request.channel.keyFor(selector);
497: if (key != null) {
498: existingOps = key.interestOps();
499: } else {
500: existingOps = 0;
501: }
502:
503: if (logger.isDebugEnabled()) {
504: logger.debug(request);
505: }
506:
507: if (request.add) {
508: request.channel.register(selector, existingOps
509: | request.interestOps, request.attachment);
510: } else if (request.set) {
511: request.channel.register(selector, request.interestOps,
512: request.attachment);
513: } else if (request.remove) {
514: request.channel.register(selector, existingOps
515: ^ request.interestOps, request.attachment);
516: } else {
517: throw new TCInternalError();
518: }
519: } catch (ClosedChannelException cce) {
520: logger
521: .warn("Exception trying to process interest request: "
522: + cce);
523:
524: } catch (CancelledKeyException cke) {
525: logger
526: .warn("Exception trying to process interest request: "
527: + cke);
528: }
529: }
530:
531: private void doAccept(final SelectionKey key) {
532: Assert.eval(isCommThread());
533:
534: SocketChannel sc = null;
535:
536: TCListenerJDK14 lsnr = (TCListenerJDK14) key.attachment();
537:
538: try {
539: final ServerSocketChannel ssc = (ServerSocketChannel) key
540: .channel();
541: sc = ssc.accept();
542: sc.configureBlocking(false);
543:
544: TCConnectionJDK14 conn = lsnr.createConnection(sc, params);
545: sc.register(selector, SelectionKey.OP_READ
546: | SelectionKey.OP_WRITE, conn);
547: } catch (IOException ioe) {
548: if (logger.isInfoEnabled()) {
549: logger.info("IO Exception accepting new connection",
550: ioe);
551: }
552:
553: cleanupChannel(sc, null);
554: }
555: }
556:
557: public final boolean isStarted() {
558: return started;
559: }
560:
561: public final boolean isStopped() {
562: return !started;
563: }
564:
565: public final synchronized void start() {
566: if (!started) {
567: started = true;
568: if (logger.isDebugEnabled()) {
569: logger.debug("Start requested");
570: }
571:
572: startImpl();
573: }
574: }
575:
576: private volatile boolean started = false;
577:
578: public final synchronized void stop() {
579: if (started) {
580: started = false;
581: if (logger.isDebugEnabled()) {
582: logger.debug("Stop requested");
583: }
584: stopImpl();
585: }
586: }
587:
588: private Selector selector;
589: private TCCommThread commThread = null;
590: private LinkedQueue selectorTasks = new LinkedQueue();
591:
592: private static class InterestRequest {
593: final SelectableChannel channel;
594: final Object attachment;
595: final boolean set;
596: final boolean add;
597: final boolean remove;
598: final int interestOps;
599:
600: static InterestRequest createAddInterestRequest(
601: SelectableChannel channel, Object attachment,
602: int interestOps) {
603: return new InterestRequest(channel, attachment,
604: interestOps, false, true, false);
605: }
606:
607: static InterestRequest createSetInterestRequest(
608: SelectableChannel channel, Object attachment,
609: int interestOps) {
610: return new InterestRequest(channel, attachment,
611: interestOps, true, false, false);
612: }
613:
614: static InterestRequest createRemoveInterestRequest(
615: SelectableChannel channel, Object attachment,
616: int interestOps) {
617: return new InterestRequest(channel, attachment,
618: interestOps, false, false, true);
619: }
620:
621: private InterestRequest(SelectableChannel channel,
622: Object attachment, int interestOps, boolean set,
623: boolean add, boolean remove) {
624: Assert.eval(remove ^ set ^ add);
625: Assert.eval(channel != null);
626:
627: this .channel = channel;
628: this .attachment = attachment;
629: this .set = set;
630: this .add = add;
631: this .remove = remove;
632: this .interestOps = interestOps;
633: }
634:
635: public String toString() {
636: StringBuffer buf = new StringBuffer();
637:
638: buf.append("Interest modify request: ").append(
639: channel.toString()).append("\n");
640: buf.append("Ops: ").append(
641: Constants.interestOpsToString(interestOps)).append(
642: "\n");
643: buf.append("Set: ").append(set).append(", Remove: ")
644: .append(remove).append(", Add: ").append(add)
645: .append("\n");
646: buf.append("Attachment: ");
647:
648: if (attachment != null) {
649: buf.append(attachment.toString());
650: } else {
651: buf.append("null");
652: }
653:
654: buf.append("\n");
655:
656: return buf.toString();
657: }
658:
659: }
660:
661: // Little helper class to drive the selector. The main point of this class
662: // is to isolate the try/finally block around the entire selection process
663: private static class TCCommThread extends Thread {
664: final TCCommJDK14 commInstance;
665: final Set listeners = new HashSet();
666: final int number = getNextCounter();
667: final String baseName = "TCComm Selector Thread " + number;
668:
669: private static int counter = 1;
670:
671: private static synchronized int getNextCounter() {
672: return counter++;
673: }
674:
675: TCCommThread(TCCommJDK14 comm) {
676: commInstance = comm;
677: setDaemon(true);
678: setName(baseName);
679:
680: if (logger.isDebugEnabled()) {
681: logger.debug("Creating a new selector thread ("
682: + toString() + ")", new Throwable());
683: }
684: }
685:
686: String makeListenString(TCListener listener) {
687: StringBuffer buf = new StringBuffer();
688: buf.append("(listen ");
689: buf.append(listener.getBindAddress().getHostAddress());
690: buf.append(':');
691: buf.append(listener.getBindPort());
692: buf.append(')');
693: return buf.toString();
694: }
695:
696: synchronized void listenerRemoved(TCListener listener) {
697: listeners.remove(makeListenString(listener));
698: updateThreadName();
699: }
700:
701: synchronized void listenerAdded(TCListener listener) {
702: listeners.add(makeListenString(listener));
703: updateThreadName();
704: }
705:
706: private void updateThreadName() {
707: StringBuffer buf = new StringBuffer(baseName);
708: for (final Iterator iter = listeners.iterator(); iter
709: .hasNext();) {
710: buf.append(' ');
711: buf.append(iter.next());
712: }
713:
714: setName(buf.toString());
715: }
716:
717: public void run() {
718: try {
719: commInstance.selectLoop();
720: } catch (Throwable t) {
721: logger.error("Unhandled exception from selectLoop", t);
722: t.printStackTrace();
723: } finally {
724: commInstance.dispose();
725: }
726: }
727: }
728:
729: }
|