001: /*
002: * Copyright 1999,2004 The Apache Software Foundation.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016:
017: package org.apache.catalina.cluster.tcp;
018:
019: import java.net.InetAddress;
020: import java.net.Socket;
021: import java.io.IOException;
022: import org.apache.catalina.cluster.util.SmartQueue;
023:
024: public class AsyncSocketSender implements IDataSender {
025: private static int threadCounter = 1;
026:
027: private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
028: .getLog(AsyncSocketSender.class);
029:
030: private InetAddress address;
031: private int port;
032: private Socket sc = null;
033: private boolean isSocketConnected = false;
034: private SmartQueue queue = new SmartQueue();
035: private boolean suspect;
036:
037: private QueueThread queueThread = null;
038:
039: public AsyncSocketSender(InetAddress host, int port) {
040: this .address = host;
041: this .port = port;
042: checkThread();
043: log.info("Started async sender thread for TCP replication.");
044: }
045:
046: public InetAddress getAddress() {
047: return address;
048: }
049:
050: public int getPort() {
051: return port;
052: }
053:
054: public void connect() throws java.io.IOException {
055: sc = new Socket(getAddress(), getPort());
056: isSocketConnected = true;
057: checkThread();
058:
059: }
060:
061: protected void checkThread() {
062: if (queueThread == null) {
063: queueThread = new QueueThread(this );
064: queueThread.setDaemon(true);
065: queueThread.start();
066: }
067: }
068:
069: public void disconnect() {
070: try {
071: sc.close();
072: } catch (Exception x) {
073: }
074: isSocketConnected = false;
075: if (queueThread != null) {
076: queueThread.stopRunning();
077: queueThread = null;
078: }
079:
080: }
081:
082: public boolean isConnected() {
083: return isSocketConnected;
084: }
085:
086: /**
087: * Blocking send
088: * @param data
089: * @throws java.io.IOException
090: */
091: private synchronized void sendMessage(byte[] data)
092: throws java.io.IOException {
093: if (!isConnected())
094: connect();
095: try {
096: sc.getOutputStream().write(data);
097: } catch (java.io.IOException x) {
098: disconnect();
099: connect();
100: sc.getOutputStream().write(data);
101: }
102: }
103:
104: public synchronized void sendMessage(String sessionId, byte[] data)
105: throws java.io.IOException {
106: SmartQueue.SmartEntry entry = new SmartQueue.SmartEntry(
107: sessionId, data);
108: queue.add(entry);
109: }
110:
111: public String toString() {
112: StringBuffer buf = new StringBuffer("SocketSender[");
113: buf.append(getAddress()).append(":").append(getPort()).append(
114: "]");
115: return buf.toString();
116: }
117:
118: public boolean isSuspect() {
119: return suspect;
120: }
121:
122: public boolean getSuspect() {
123: return suspect;
124: }
125:
126: public void setSuspect(boolean suspect) {
127: this .suspect = suspect;
128: }
129:
130: private class QueueThread extends Thread {
131: AsyncSocketSender sender;
132: private boolean keepRunning = true;
133:
134: public QueueThread(AsyncSocketSender sender) {
135: this .sender = sender;
136: setName("Cluster-AsyncSocketSender-" + (threadCounter++));
137: }
138:
139: public void stopRunning() {
140: keepRunning = false;
141: }
142:
143: public void run() {
144: while (keepRunning) {
145: SmartQueue.SmartEntry entry = sender.queue.remove(5000);
146: if (entry != null) {
147: try {
148: byte[] data = (byte[]) entry.getValue();
149: sender.sendMessage(data);
150: } catch (Exception x) {
151: log
152: .warn("Unable to asynchronously send session w/ id="
153: + entry.getKey()
154: + " message will be ignored.");
155: }
156: }
157: }
158: }
159: }
160: }
|