001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017:
018: package org.apache.catalina.tribes.transport.nio;
019:
020: import java.io.IOException;
021: import java.net.ServerSocket;
022: import java.nio.channels.SelectableChannel;
023: import java.nio.channels.SelectionKey;
024: import java.nio.channels.Selector;
025: import java.nio.channels.ServerSocketChannel;
026: import java.nio.channels.SocketChannel;
027: import java.util.Iterator;
028:
029: import org.apache.catalina.tribes.ChannelReceiver;
030: import org.apache.catalina.tribes.io.ListenCallback;
031: import org.apache.catalina.tribes.io.ObjectReader;
032: import org.apache.catalina.tribes.transport.Constants;
033: import org.apache.catalina.tribes.transport.ReceiverBase;
034: import org.apache.catalina.tribes.transport.RxTaskPool;
035: import org.apache.catalina.tribes.transport.AbstractRxTask;
036: import org.apache.catalina.tribes.util.StringManager;
037: import java.util.LinkedList;
038: import java.util.Set;
039: import java.nio.channels.CancelledKeyException;
040:
041: /**
042: * @author Filip Hanik
043: * @version $Revision: 538977 $ $Date: 2007-05-17 17:43:49 +0200 (jeu., 17 mai 2007) $
044: */
045: public class NioReceiver extends ReceiverBase implements Runnable,
046: ChannelReceiver, ListenCallback {
047:
048: protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory
049: .getLog(NioReceiver.class);
050:
051: /**
052: * The string manager for this package.
053: */
054: protected StringManager sm = StringManager
055: .getManager(Constants.Package);
056:
057: /**
058: * The descriptive information about this implementation.
059: */
060: private static final String info = "NioReceiver/1.0";
061:
062: private Selector selector = null;
063: private ServerSocketChannel serverChannel = null;
064:
065: protected LinkedList events = new LinkedList();
066:
067: // private Object interestOpsMutex = new Object();
068:
069: public NioReceiver() {
070: }
071:
072: /**
073: * Return descriptive information about this implementation and the
074: * corresponding version number, in the format
075: * <code><description>/<version></code>.
076: */
077: public String getInfo() {
078: return (info);
079: }
080:
081: // public Object getInterestOpsMutex() {
082: // return interestOpsMutex;
083: // }
084:
085: public void stop() {
086: this .stopListening();
087: super .stop();
088: }
089:
090: /**
091: * start cluster receiver
092: * @throws Exception
093: * @see org.apache.catalina.tribes.ClusterReceiver#start()
094: */
095: public void start() throws IOException {
096: super .start();
097: try {
098: setPool(new RxTaskPool(getMaxThreads(), getMinThreads(),
099: this ));
100: } catch (Exception x) {
101: log.fatal("ThreadPool can initilzed. Listener not started",
102: x);
103: if (x instanceof IOException)
104: throw (IOException) x;
105: else
106: throw new IOException(x.getMessage());
107: }
108: try {
109: getBind();
110: bind();
111: Thread t = new Thread(this , "NioReceiver");
112: t.setDaemon(true);
113: t.start();
114: } catch (Exception x) {
115: log.fatal("Unable to start cluster receiver", x);
116: if (x instanceof IOException)
117: throw (IOException) x;
118: else
119: throw new IOException(x.getMessage());
120: }
121: }
122:
123: public AbstractRxTask createRxTask() {
124: NioReplicationTask thread = new NioReplicationTask(this , this );
125: thread.setUseBufferPool(this .getUseBufferPool());
126: thread.setRxBufSize(getRxBufSize());
127: thread.setOptions(getWorkerThreadOptions());
128: return thread;
129: }
130:
131: protected void bind() throws IOException {
132: // allocate an unbound server socket channel
133: serverChannel = ServerSocketChannel.open();
134: // Get the associated ServerSocket to bind it with
135: ServerSocket serverSocket = serverChannel.socket();
136: // create a new Selector for use below
137: selector = Selector.open();
138: // set the port the server channel will listen to
139: //serverSocket.bind(new InetSocketAddress(getBind(), getTcpListenPort()));
140: bind(serverSocket, getTcpListenPort(), getAutoBind());
141: // set non-blocking mode for the listening socket
142: serverChannel.configureBlocking(false);
143: // register the ServerSocketChannel with the Selector
144: serverChannel.register(selector, SelectionKey.OP_ACCEPT);
145:
146: }
147:
148: public void addEvent(Runnable event) {
149: if (selector != null) {
150: synchronized (events) {
151: events.add(event);
152: }
153: if (log.isTraceEnabled())
154: log.trace("Adding event to selector:" + event);
155: if (isListening() && selector != null)
156: selector.wakeup();
157: }
158: }
159:
160: public void events() {
161: if (events.size() == 0)
162: return;
163: synchronized (events) {
164: Runnable r = null;
165: while ((events.size() > 0)
166: && (r = (Runnable) events.removeFirst()) != null) {
167: try {
168: if (log.isTraceEnabled())
169: log.trace("Processing event in selector:" + r);
170: r.run();
171: } catch (Exception x) {
172: log.error("", x);
173: }
174: }
175: events.clear();
176: }
177: }
178:
179: public static void cancelledKey(SelectionKey key) {
180: ObjectReader reader = (ObjectReader) key.attachment();
181: if (reader != null) {
182: reader.setCancelled(true);
183: reader.finish();
184: }
185: key.cancel();
186: key.attach(null);
187: try {
188: ((SocketChannel) key.channel()).socket().close();
189: } catch (IOException e) {
190: if (log.isDebugEnabled())
191: log.debug("", e);
192: }
193: try {
194: key.channel().close();
195: } catch (IOException e) {
196: if (log.isDebugEnabled())
197: log.debug("", e);
198: }
199:
200: }
201:
202: protected long lastCheck = System.currentTimeMillis();
203:
204: protected void socketTimeouts() {
205: long now = System.currentTimeMillis();
206: if ((now - lastCheck) < getSelectorTimeout())
207: return;
208: //timeout
209: Selector tmpsel = selector;
210: Set keys = (isListening() && tmpsel != null) ? tmpsel.keys()
211: : null;
212: if (keys == null)
213: return;
214: for (Iterator iter = keys.iterator(); iter.hasNext();) {
215: SelectionKey key = (SelectionKey) iter.next();
216: try {
217: // if (key.interestOps() == SelectionKey.OP_READ) {
218: // //only timeout sockets that we are waiting for a read from
219: // ObjectReader ka = (ObjectReader) key.attachment();
220: // long delta = now - ka.getLastAccess();
221: // if (delta > (long) getTimeout()) {
222: // cancelledKey(key);
223: // }
224: // }
225: // else
226: if (key.interestOps() == 0) {
227: //check for keys that didn't make it in.
228: ObjectReader ka = (ObjectReader) key.attachment();
229: if (ka != null) {
230: long delta = now - ka.getLastAccess();
231: if (delta > (long) getTimeout()
232: && (!ka.isAccessed())) {
233: log
234: .warn("Channel key is registered, but has had no interest ops for the last "
235: + getTimeout()
236: + " ms. (cancelled:"
237: + ka.isCancelled()
238: + "):"
239: + key
240: + " last access:"
241: + new java.sql.Timestamp(ka
242: .getLastAccess()));
243: // System.out.println("Interest:"+key.interestOps());
244: // System.out.println("Ready Ops:"+key.readyOps());
245: // System.out.println("Valid:"+key.isValid());
246: ka.setLastAccess(now);
247: //key.interestOps(SelectionKey.OP_READ);
248: }//end if
249: } else {
250: cancelledKey(key);
251: }//end if
252: }//end if
253: } catch (CancelledKeyException ckx) {
254: cancelledKey(key);
255: }
256: }
257: lastCheck = System.currentTimeMillis();
258: }
259:
260: /**
261: * get data from channel and store in byte array
262: * send it to cluster
263: * @throws IOException
264: * @throws java.nio.channels.ClosedChannelException
265: */
266: protected void listen() throws Exception {
267: if (doListen()) {
268: log.warn("ServerSocketChannel already started");
269: return;
270: }
271:
272: setListen(true);
273:
274: while (doListen() && selector != null) {
275: // this may block for a long time, upon return the
276: // selected set contains keys of the ready channels
277: try {
278: events();
279: socketTimeouts();
280: int n = selector.select(getTcpSelectorTimeout());
281: if (n == 0) {
282: //there is a good chance that we got here
283: //because the TcpReplicationThread called
284: //selector wakeup().
285: //if that happens, we must ensure that that
286: //thread has enough time to call interestOps
287: // synchronized (interestOpsMutex) {
288: //if we got the lock, means there are no
289: //keys trying to register for the
290: //interestOps method
291: // }
292: continue; // nothing to do
293: }
294: // get an iterator over the set of selected keys
295: Iterator it = selector.selectedKeys().iterator();
296: // look at each key in the selected set
297: while (it.hasNext()) {
298: SelectionKey key = (SelectionKey) it.next();
299: // Is a new connection coming in?
300: if (key.isAcceptable()) {
301: ServerSocketChannel server = (ServerSocketChannel) key
302: .channel();
303: SocketChannel channel = server.accept();
304: channel.socket().setReceiveBufferSize(
305: getRxBufSize());
306: channel.socket().setSendBufferSize(
307: getTxBufSize());
308: channel.socket().setTcpNoDelay(getTcpNoDelay());
309: channel.socket().setKeepAlive(getSoKeepAlive());
310: channel.socket().setOOBInline(getOoBInline());
311: channel.socket().setReuseAddress(
312: getSoReuseAddress());
313: channel.socket().setSoLinger(getSoLingerOn(),
314: getSoLingerTime());
315: channel.socket().setTrafficClass(
316: getSoTrafficClass());
317: channel.socket().setSoTimeout(getTimeout());
318: Object attach = new ObjectReader(channel);
319: registerChannel(selector, channel,
320: SelectionKey.OP_READ, attach);
321: }
322: // is there data to read on this channel?
323: if (key.isReadable()) {
324: readDataFromSocket(key);
325: } else {
326: key.interestOps(key.interestOps()
327: & (~SelectionKey.OP_WRITE));
328: }
329:
330: // remove key from selected set, it's been handled
331: it.remove();
332: }
333: } catch (java.nio.channels.ClosedSelectorException cse) {
334: // ignore is normal at shutdown or stop listen socket
335: } catch (java.nio.channels.CancelledKeyException nx) {
336: log
337: .warn("Replication client disconnected, error when polling key. Ignoring client.");
338: } catch (Throwable x) {
339: try {
340: log.error(
341: "Unable to process request in NioReceiver",
342: x);
343: } catch (Throwable tx) {
344: //in case an out of memory error, will affect the logging framework as well
345: tx.printStackTrace();
346: }
347: }
348:
349: }
350: serverChannel.close();
351: if (selector != null)
352: selector.close();
353: }
354:
355: /**
356: * Close Selector.
357: *
358: * @see org.apache.catalina.tribes.transport.ClusterReceiverBase#stopListening()
359: */
360: protected void stopListening() {
361: setListen(false);
362: if (selector != null) {
363: try {
364: selector.wakeup();
365: selector.close();
366: } catch (Exception x) {
367: log.error("Unable to close cluster receiver selector.",
368: x);
369: } finally {
370: selector = null;
371: }
372: }
373: }
374:
375: // ----------------------------------------------------------
376:
377: /**
378: * Register the given channel with the given selector for
379: * the given operations of interest
380: */
381: protected void registerChannel(Selector selector,
382: SelectableChannel channel, int ops, Object attach)
383: throws Exception {
384: if (channel == null)
385: return; // could happen
386: // set the new channel non-blocking
387: channel.configureBlocking(false);
388: // register it with the selector
389: channel.register(selector, ops, attach);
390: }
391:
392: /**
393: * Start thread and listen
394: */
395: public void run() {
396: try {
397: listen();
398: } catch (Exception x) {
399: log.error("Unable to run replication listener.", x);
400: }
401: }
402:
403: // ----------------------------------------------------------
404:
405: /**
406: * Sample data handler method for a channel with data ready to read.
407: * @param key A SelectionKey object associated with a channel
408: * determined by the selector to be ready for reading. If the
409: * channel returns an EOF condition, it is closed here, which
410: * automatically invalidates the associated key. The selector
411: * will then de-register the channel on the next select call.
412: */
413: protected void readDataFromSocket(SelectionKey key)
414: throws Exception {
415: NioReplicationTask task = (NioReplicationTask) getTaskPool()
416: .getRxTask();
417: if (task == null) {
418: // No threads/tasks available, do nothing, the selection
419: // loop will keep calling this method until a
420: // thread becomes available, the thread pool itself has a waiting mechanism
421: // so we will not wait here.
422: if (log.isDebugEnabled())
423: log.debug("No TcpReplicationThread available");
424: } else {
425: // invoking this wakes up the worker thread then returns
426: //add task to thread pool
427: task.serviceChannel(key);
428: getExecutor().execute(task);
429: }
430: }
431:
432: }
|