001: /*
002: * Copyright 1999,2004 The Apache Software Foundation.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016:
017: package org.apache.catalina.cluster.tcp;
018:
019: import java.nio.ByteBuffer;
020: import java.nio.channels.SelectionKey;
021: import java.nio.channels.Selector;
022: import java.util.List;
023: import java.io.IOException;
024: import java.nio.channels.SocketChannel;
025: import org.apache.catalina.cluster.io.ObjectReader;
026:
027: import java.util.LinkedList;
028:
029: /**
030: * A worker thread class which can drain channels and echo-back
031: * the input. Each instance is constructed with a reference to
032: * the owning thread pool object. When started, the thread loops
033: * forever waiting to be awakened to service the channel associated
034: * with a SelectionKey object.
035: * The worker is tasked by calling its serviceChannel() method
036: * with a SelectionKey object. The serviceChannel() method stores
037: * the key reference in the thread object then calls notify()
038: * to wake it up. When the channel has been drained, the worker
039: * thread returns itself to its parent pool.
040: */
041: public class TcpReplicationThread extends WorkerThread {
042: private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
043: .getLog(TcpReplicationThread.class);
044: private ByteBuffer buffer = ByteBuffer.allocate(1024);
045: private SelectionKey key;
046: private boolean synchronous = false;
047:
048: TcpReplicationThread() {
049: }
050:
051: // loop forever waiting for work to do
052: public synchronized void run() {
053: while (doRun) {
054: try {
055: // sleep and release object lock
056: this .wait();
057: } catch (InterruptedException e) {
058: log.info("TCP worker thread interrupted in cluster", e);
059: // clear interrupt status
060: this .interrupted();
061: }
062: if (key == null) {
063: continue; // just in case
064: }
065: try {
066: drainChannel(key);
067: } catch (Exception e) {
068: log.error("TCP Worker thread in cluster caught '" + e
069: + "' closing channel", e);
070:
071: // close channel and nudge selector
072: try {
073: key.channel().close();
074: } catch (IOException ex) {
075: log.error("Unable to close channel.", ex);
076: }
077: key.selector().wakeup();
078: }
079: key = null;
080: // done, ready for more, return to pool
081: this .pool.returnWorker(this );
082: }
083: }
084:
085: /**
086: * Called to initiate a unit of work by this worker thread
087: * on the provided SelectionKey object. This method is
088: * synchronized, as is the run() method, so only one key
089: * can be serviced at a given time.
090: * Before waking the worker thread, and before returning
091: * to the main selection loop, this key's interest set is
092: * updated to remove OP_READ. This will cause the selector
093: * to ignore read-readiness for this channel while the
094: * worker thread is servicing it.
095: */
096: synchronized void serviceChannel(SelectionKey key,
097: boolean synchronous) {
098: this .key = key;
099: this .synchronous = synchronous;
100: key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
101: key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
102: this .notify(); // awaken the thread
103: }
104:
105: /**
106: * The actual code which drains the channel associated with
107: * the given key. This method assumes the key has been
108: * modified prior to invocation to turn off selection
109: * interest in OP_READ. When this method completes it
110: * re-enables OP_READ and calls wakeup() on the selector
111: * so the selector will resume watching this channel.
112: */
113: private void drainChannel(SelectionKey key) throws Exception {
114: boolean packetReceived = false;
115: SocketChannel channel = (SocketChannel) key.channel();
116: int count;
117: buffer.clear(); // make buffer empty
118: ObjectReader reader = (ObjectReader) key.attachment();
119: // loop while data available, channel is non-blocking
120: while ((count = channel.read(buffer)) > 0) {
121: buffer.flip(); // make buffer readable
122: int pkgcnt = reader.append(buffer.array(), 0, count);
123: buffer.clear(); // make buffer empty
124: }
125: //check to see if any data is available
126: int pkgcnt = reader.execute();
127: while (pkgcnt > 0) {
128: if (synchronous) {
129: sendAck(key, channel);
130: } //end if
131: pkgcnt--;
132: }
133: if (count < 0) {
134: // close channel on EOF, invalidates the key
135: channel.close();
136: return;
137: }
138:
139: //acquire the interestOps mutex
140: Object mutex = this .getPool().getInterestOpsMutex();
141: synchronized (mutex) {
142: // cycle the selector so this key is active again
143: key.selector().wakeup();
144: // resume interest in OP_READ, OP_WRITE
145: int resumeOps = key.interestOps() | SelectionKey.OP_READ;
146: key.interestOps(resumeOps);
147: }
148:
149: }
150:
151: private void sendAck(SelectionKey key, SocketChannel channel) {
152: //send a reply-acknowledgement
153: try {
154: channel.write(ByteBuffer.wrap(new byte[] { 6, 2, 3 }));
155: } catch (java.io.IOException x) {
156: log
157: .warn("Unable to send ACK back through channel, channel disconnected?: "
158: + x.getMessage());
159: }
160: }
161: }
|