001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017:
018: package org.apache.catalina.tribes.transport.nio;
019:
020: import java.io.IOException;
021: import java.nio.ByteBuffer;
022: import java.nio.channels.SelectionKey;
023: import java.nio.channels.SocketChannel;
024:
025: import org.apache.catalina.tribes.io.ObjectReader;
026: import org.apache.catalina.tribes.transport.Constants;
027: import org.apache.catalina.tribes.transport.AbstractRxTask;
028: import org.apache.catalina.tribes.ChannelMessage;
029: import org.apache.catalina.tribes.io.ListenCallback;
030: import org.apache.catalina.tribes.io.ChannelData;
031: import org.apache.catalina.tribes.io.BufferPool;
032: import java.nio.channels.CancelledKeyException;
033: import org.apache.catalina.tribes.UniqueId;
034: import org.apache.catalina.tribes.RemoteProcessException;
035: import org.apache.catalina.tribes.util.Logs;
036:
037: /**
038: * A worker thread class which can drain channels and echo-back the input. Each
039: * instance is constructed with a reference to the owning thread pool object.
040: * When started, the thread loops forever waiting to be awakened to service the
041: * channel associated with a SelectionKey object. The worker is tasked by
042: * calling its serviceChannel() method with a SelectionKey object. The
043: * serviceChannel() method stores the key reference in the thread object then
044: * calls notify() to wake it up. When the channel has been drained, the worker
045: * thread returns itself to its parent pool.
046: *
047: * @author Filip Hanik
048: *
049: * @version $Revision: 546955 $, $Date: 2007-06-13 19:00:21 +0200 (mer., 13 juin 2007) $
050: */
051: public class NioReplicationTask extends AbstractRxTask {
052:
053: private static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory
054: .getLog(NioReplicationTask.class);
055:
056: private ByteBuffer buffer = null;
057: private SelectionKey key;
058: private int rxBufSize;
059: private NioReceiver receiver;
060:
061: public NioReplicationTask(ListenCallback callback,
062: NioReceiver receiver) {
063: super (callback);
064: this .receiver = receiver;
065: }
066:
067: // loop forever waiting for work to do
068: public synchronized void run() {
069: if (buffer == null) {
070: if ((getOptions() & OPTION_DIRECT_BUFFER) == OPTION_DIRECT_BUFFER) {
071: buffer = ByteBuffer.allocateDirect(getRxBufSize());
072: } else {
073: buffer = ByteBuffer.allocate(getRxBufSize());
074: }
075: } else {
076: buffer.clear();
077: }
078: if (key == null) {
079: return; // just in case
080: }
081: if (log.isTraceEnabled())
082: log.trace("Servicing key:" + key);
083:
084: try {
085: ObjectReader reader = (ObjectReader) key.attachment();
086: if (reader == null) {
087: if (log.isTraceEnabled())
088: log.trace("No object reader, cancelling:" + key);
089: cancelKey(key);
090: } else {
091: if (log.isTraceEnabled())
092: log.trace("Draining channel:" + key);
093:
094: drainChannel(key, reader);
095: }
096: } catch (Exception e) {
097: //this is common, since the sockets on the other
098: //end expire after a certain time.
099: if (e instanceof CancelledKeyException) {
100: //do nothing
101: } else if (e instanceof IOException) {
102: //dont spew out stack traces for IO exceptions unless debug is enabled.
103: if (log.isDebugEnabled())
104: log
105: .debug(
106: "IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["
107: + e.getMessage() + "].", e);
108: else
109: log
110: .warn("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["
111: + e.getMessage() + "].");
112: } else if (log.isErrorEnabled()) {
113: //this is a real error, log it.
114: log
115: .error(
116: "Exception caught in TcpReplicationThread.drainChannel.",
117: e);
118: }
119: cancelKey(key);
120: } finally {
121:
122: }
123: key = null;
124: // done, ready for more, return to pool
125: getTaskPool().returnWorker(this );
126: }
127:
128: /**
129: * Called to initiate a unit of work by this worker thread
130: * on the provided SelectionKey object. This method is
131: * synchronized, as is the run() method, so only one key
132: * can be serviced at a given time.
133: * Before waking the worker thread, and before returning
134: * to the main selection loop, this key's interest set is
135: * updated to remove OP_READ. This will cause the selector
136: * to ignore read-readiness for this channel while the
137: * worker thread is servicing it.
138: */
139: public synchronized void serviceChannel(SelectionKey key) {
140: if (log.isTraceEnabled())
141: log.trace("About to service key:" + key);
142: ObjectReader reader = (ObjectReader) key.attachment();
143: if (reader != null)
144: reader.setLastAccess(System.currentTimeMillis());
145: this .key = key;
146: key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
147: key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
148: }
149:
150: /**
151: * The actual code which drains the channel associated with
152: * the given key. This method assumes the key has been
153: * modified prior to invocation to turn off selection
154: * interest in OP_READ. When this method completes it
155: * re-enables OP_READ and calls wakeup() on the selector
156: * so the selector will resume watching this channel.
157: */
158: protected void drainChannel(final SelectionKey key,
159: ObjectReader reader) throws Exception {
160: reader.setLastAccess(System.currentTimeMillis());
161: reader.access();
162: SocketChannel channel = (SocketChannel) key.channel();
163: int count;
164: buffer.clear(); // make buffer empty
165:
166: // loop while data available, channel is non-blocking
167: while ((count = channel.read(buffer)) > 0) {
168: buffer.flip(); // make buffer readable
169: if (buffer.hasArray())
170: reader.append(buffer.array(), 0, count, false);
171: else
172: reader.append(buffer, count, false);
173: buffer.clear(); // make buffer empty
174: //do we have at least one package?
175: if (reader.hasPackage())
176: break;
177: }
178:
179: int pkgcnt = reader.count();
180:
181: if (count < 0 && pkgcnt == 0) {
182: //end of stream, and no more packages to process
183: remoteEof(key);
184: return;
185: }
186:
187: ChannelMessage[] msgs = pkgcnt == 0 ? ChannelData.EMPTY_DATA_ARRAY
188: : reader.execute();
189:
190: registerForRead(key, reader);//register to read new data, before we send it off to avoid dead locks
191:
192: for (int i = 0; i < msgs.length; i++) {
193: /**
194: * Use send ack here if you want to ack the request to the remote
195: * server before completing the request
196: * This is considered an asynchronized request
197: */
198: if (ChannelData.sendAckAsync(msgs[i].getOptions()))
199: sendAck(key, channel, Constants.ACK_COMMAND);
200: try {
201: if (Logs.MESSAGES.isTraceEnabled()) {
202: try {
203: Logs.MESSAGES
204: .trace("NioReplicationThread - Received msg:"
205: + new UniqueId(msgs[i]
206: .getUniqueId())
207: + " at "
208: + new java.sql.Timestamp(System
209: .currentTimeMillis()));
210: } catch (Throwable t) {
211: }
212: }
213: //process the message
214: getCallback().messageDataReceived(msgs[i]);
215: /**
216: * Use send ack here if you want the request to complete on this
217: * server before sending the ack to the remote server
218: * This is considered a synchronized request
219: */
220: if (ChannelData.sendAckSync(msgs[i].getOptions()))
221: sendAck(key, channel, Constants.ACK_COMMAND);
222: } catch (RemoteProcessException e) {
223: if (log.isDebugEnabled())
224: log.error("Processing of cluster message failed.",
225: e);
226: if (ChannelData.sendAckSync(msgs[i].getOptions()))
227: sendAck(key, channel, Constants.FAIL_ACK_COMMAND);
228: } catch (Exception e) {
229: log.error("Processing of cluster message failed.", e);
230: if (ChannelData.sendAckSync(msgs[i].getOptions()))
231: sendAck(key, channel, Constants.FAIL_ACK_COMMAND);
232: }
233: if (getUseBufferPool()) {
234: BufferPool.getBufferPool().returnBuffer(
235: msgs[i].getMessage());
236: msgs[i].setMessage(null);
237: }
238: }
239:
240: if (count < 0) {
241: remoteEof(key);
242: return;
243: }
244: }
245:
246: private void remoteEof(SelectionKey key) {
247: // close channel on EOF, invalidates the key
248: if (log.isDebugEnabled())
249: log
250: .debug("Channel closed on the remote end, disconnecting");
251: cancelKey(key);
252: }
253:
254: protected void registerForRead(final SelectionKey key,
255: ObjectReader reader) {
256: if (log.isTraceEnabled())
257: log.trace("Adding key for read event:" + key);
258: reader.finish();
259: //register our OP_READ interest
260: Runnable r = new Runnable() {
261: public void run() {
262: try {
263: if (key.isValid()) {
264: // cycle the selector so this key is active again
265: key.selector().wakeup();
266: // resume interest in OP_READ, OP_WRITE
267: int resumeOps = key.interestOps()
268: | SelectionKey.OP_READ;
269: key.interestOps(resumeOps);
270: if (log.isTraceEnabled())
271: log
272: .trace("Registering key for read:"
273: + key);
274: }
275: } catch (CancelledKeyException ckx) {
276: NioReceiver.cancelledKey(key);
277: if (log.isTraceEnabled())
278: log.trace("CKX Cancelling key:" + key);
279:
280: } catch (Exception x) {
281: log.error("Error registering key for read:" + key,
282: x);
283: }
284: }
285: };
286: receiver.addEvent(r);
287: }
288:
289: private void cancelKey(final SelectionKey key) {
290: if (log.isTraceEnabled())
291: log.trace("Adding key for cancel event:" + key);
292:
293: ObjectReader reader = (ObjectReader) key.attachment();
294: if (reader != null) {
295: reader.setCancelled(true);
296: reader.finish();
297: }
298: Runnable cx = new Runnable() {
299: public void run() {
300: if (log.isTraceEnabled())
301: log.trace("Cancelling key:" + key);
302:
303: NioReceiver.cancelledKey(key);
304: }
305: };
306: receiver.addEvent(cx);
307: }
308:
309: /**
310: * send a reply-acknowledgement (6,2,3)
311: * @param key
312: * @param channel
313: */
314: protected void sendAck(SelectionKey key, SocketChannel channel,
315: byte[] command) {
316:
317: try {
318: ByteBuffer buf = ByteBuffer.wrap(command);
319: int total = 0;
320: while (total < command.length) {
321: total += channel.write(buf);
322: }
323: if (log.isTraceEnabled()) {
324: log.trace("ACK sent to " + channel.socket().getPort());
325: }
326: } catch (java.io.IOException x) {
327: log
328: .warn("Unable to send ACK back through channel, channel disconnected?: "
329: + x.getMessage());
330: }
331: }
332:
333: public void setRxBufSize(int rxBufSize) {
334: this .rxBufSize = rxBufSize;
335: }
336:
337: public int getRxBufSize() {
338: return rxBufSize;
339: }
340: }
|