001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017:
018: package org.apache.catalina.tribes.transport.bio;
019:
020: import org.apache.catalina.tribes.io.ObjectReader;
021: import org.apache.catalina.tribes.transport.Constants;
022: import org.apache.catalina.tribes.transport.AbstractRxTask;
023: import java.net.Socket;
024: import java.io.InputStream;
025: import java.io.OutputStream;
026: import org.apache.catalina.tribes.io.ListenCallback;
027: import org.apache.catalina.tribes.ChannelMessage;
028: import org.apache.catalina.tribes.io.ChannelData;
029: import org.apache.catalina.tribes.io.BufferPool;
030:
031: /**
032: * A worker thread class which can drain channels and echo-back the input. Each
033: * instance is constructed with a reference to the owning thread pool object.
034: * When started, the thread loops forever waiting to be awakened to service the
035: * channel associated with a SelectionKey object. The worker is tasked by
036: * calling its serviceChannel() method with a SelectionKey object. The
037: * serviceChannel() method stores the key reference in the thread object then
038: * calls notify() to wake it up. When the channel has been drained, the worker
039: * thread returns itself to its parent pool.
040: *
041: * @author Filip Hanik
042: *
043: * @version $Revision: 500684 $, $Date: 2007-01-28 00:27:18 +0100 (dim., 28 janv. 2007) $
044: */
045: public class BioReplicationTask extends AbstractRxTask {
046:
047: protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory
048: .getLog(BioReplicationTask.class);
049:
050: protected Socket socket;
051: protected ObjectReader reader;
052:
053: public BioReplicationTask(ListenCallback callback) {
054: super (callback);
055: }
056:
057: // loop forever waiting for work to do
058: public synchronized void run() {
059: if (socket == null)
060: return;
061: try {
062: drainSocket();
063: } catch (Exception x) {
064: log.error("Unable to service bio socket");
065: } finally {
066: try {
067: socket.close();
068: } catch (Exception ignore) {
069: }
070: try {
071: reader.close();
072: } catch (Exception ignore) {
073: }
074: reader = null;
075: socket = null;
076: }
077: // done, ready for more, return to pool
078: if (getTaskPool() != null)
079: getTaskPool().returnWorker(this );
080: }
081:
082: public synchronized void serviceSocket(Socket socket,
083: ObjectReader reader) {
084: this .socket = socket;
085: this .reader = reader;
086: this .notify(); // awaken the thread
087: }
088:
089: protected void execute(ObjectReader reader) throws Exception {
090: int pkgcnt = reader.count();
091:
092: if (pkgcnt > 0) {
093: ChannelMessage[] msgs = reader.execute();
094: for (int i = 0; i < msgs.length; i++) {
095: /**
096: * Use send ack here if you want to ack the request to the remote
097: * server before completing the request
098: * This is considered an asynchronized request
099: */
100: if (ChannelData.sendAckAsync(msgs[i].getOptions()))
101: sendAck(Constants.ACK_COMMAND);
102: try {
103: //process the message
104: getCallback().messageDataReceived(msgs[i]);
105: /**
106: * Use send ack here if you want the request to complete on this
107: * server before sending the ack to the remote server
108: * This is considered a synchronized request
109: */
110: if (ChannelData.sendAckSync(msgs[i].getOptions()))
111: sendAck(Constants.ACK_COMMAND);
112: } catch (Exception x) {
113: if (ChannelData.sendAckSync(msgs[i].getOptions()))
114: sendAck(Constants.FAIL_ACK_COMMAND);
115: log.error("Error thrown from messageDataReceived.",
116: x);
117: }
118: if (getUseBufferPool()) {
119: BufferPool.getBufferPool().returnBuffer(
120: msgs[i].getMessage());
121: msgs[i].setMessage(null);
122: }
123: }
124: }
125:
126: }
127:
128: /**
129: * The actual code which drains the channel associated with
130: * the given key. This method assumes the key has been
131: * modified prior to invocation to turn off selection
132: * interest in OP_READ. When this method completes it
133: * re-enables OP_READ and calls wakeup() on the selector
134: * so the selector will resume watching this channel.
135: */
136: protected void drainSocket() throws Exception {
137: InputStream in = socket.getInputStream();
138: // loop while data available, channel is non-blocking
139: byte[] buf = new byte[1024];
140: int length = in.read(buf);
141: while (length >= 0) {
142: int count = reader.append(buf, 0, length, true);
143: if (count > 0)
144: execute(reader);
145: length = in.read(buf);
146: }
147: }
148:
149: /**
150: * send a reply-acknowledgement (6,2,3)
151: * @param key
152: * @param channel
153: */
154: protected void sendAck(byte[] command) {
155: try {
156: OutputStream out = socket.getOutputStream();
157: out.write(command);
158: out.flush();
159: if (log.isTraceEnabled()) {
160: log.trace("ACK sent to " + socket.getPort());
161: }
162: } catch (java.io.IOException x) {
163: log
164: .warn("Unable to send ACK back through channel, channel disconnected?: "
165: + x.getMessage());
166: }
167: }
168:
169: public void close() {
170: setDoRun(false);
171: try {
172: socket.close();
173: } catch (Exception ignore) {
174: }
175: try {
176: reader.close();
177: } catch (Exception ignore) {
178: }
179: reader = null;
180: socket = null;
181: super.close();
182: }
183: }
|