001: /*
002: * Copyright 1999,2004 The Apache Software Foundation.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016:
017: package org.apache.catalina.cluster.tcp;
018:
019: import org.apache.catalina.cluster.io.XByteBuffer;
020: import org.apache.catalina.cluster.Member;
021: import org.apache.catalina.cluster.ClusterSender;
022:
023: public class ReplicationTransmitter implements ClusterSender {
024: private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
025: .getLog(ReplicationTransmitter.class);
026:
027: private java.util.HashMap map = new java.util.HashMap();
028:
029: public ReplicationTransmitter() {
030: }
031:
032: private static long nrOfRequests = 0;
033: private static long totalBytes = 0;
034: private String replicationMode;
035:
036: private static synchronized void addStats(int length) {
037: nrOfRequests++;
038: totalBytes += length;
039: if ((nrOfRequests % 100) == 0) {
040: log.info("Nr of bytes sent=" + totalBytes + " over "
041: + nrOfRequests + " =="
042: + (totalBytes / nrOfRequests) + " bytes/request");
043: }
044:
045: }
046:
047: public void setReplicationMode(String mode) {
048: String msg = IDataSenderFactory.validateMode(mode);
049: if (msg == null) {
050: log.debug("Setting replcation mode to " + mode);
051: this .replicationMode = mode;
052: } else
053: throw new IllegalArgumentException(msg);
054:
055: }
056:
057: public synchronized void add(Member member) {
058: try {
059: IDataSender sender = IDataSenderFactory.getIDataSender(
060: replicationMode, member);
061: String key = sender.getAddress().getHostAddress() + ":"
062: + sender.getPort();
063: if (!map.containsKey(key))
064: map.put(sender.getAddress().getHostAddress() + ":"
065: + sender.getPort(), sender);
066: } catch (java.io.IOException x) {
067: log.error("Unable to create and add a IDataSender object.",
068: x);
069: }
070: }//add
071:
072: public synchronized void remove(Member member) {
073: String key = member.getHost() + ":" + member.getPort();
074: IDataSender toberemoved = (IDataSender) map.get(key);
075: if (toberemoved == null)
076: return;
077: toberemoved.disconnect();
078: map.remove(key);
079: }
080:
081: public void start() throws java.io.IOException {
082: //don't have to do shit, we connect on demand
083: }
084:
085: public synchronized void stop() {
086: java.util.Iterator i = map.entrySet().iterator();
087: while (i.hasNext()) {
088: IDataSender sender = (IDataSender) ((java.util.Map.Entry) i
089: .next()).getValue();
090: try {
091: sender.disconnect();
092: } catch (Exception x) {
093: }
094: }//while
095: }//stop
096:
097: public IDataSender[] getSenders() {
098: java.util.Iterator i = map.entrySet().iterator();
099: java.util.Vector v = new java.util.Vector();
100: while (i.hasNext()) {
101: IDataSender sender = (IDataSender) ((java.util.Map.Entry) i
102: .next()).getValue();
103: if (sender != null)
104: v.addElement(sender);
105: }
106: IDataSender[] result = new IDataSender[v.size()];
107: v.copyInto(result);
108: return result;
109: }
110:
111: protected void sendMessageData(String sessionId, byte[] data,
112: IDataSender sender) throws java.io.IOException {
113: if (sender == null)
114: throw new java.io.IOException(
115: "Sender not available. Make sure sender information is available to the ReplicationTransmitter.");
116: try {
117: if (!sender.isConnected())
118: sender.connect();
119: sender.sendMessage(sessionId, data);
120: sender.setSuspect(false);
121: addStats(data.length);
122: } catch (Exception x) {
123: if (!sender.getSuspect()) {
124: log
125: .warn(
126: "Unable to send replicated message, is server down?",
127: x);
128: }
129: sender.setSuspect(true);
130:
131: }
132:
133: }
134:
135: public void sendMessage(String sessionId, byte[] indata,
136: Member member) throws java.io.IOException {
137: byte[] data = XByteBuffer.createDataPackage(indata);
138: String key = member.getHost() + ":" + member.getPort();
139: IDataSender sender = (IDataSender) map.get(key);
140: sendMessageData(sessionId, data, sender);
141: }
142:
143: public void sendMessage(String sessionId, byte[] indata)
144: throws java.io.IOException {
145: IDataSender[] senders = getSenders();
146: byte[] data = XByteBuffer.createDataPackage(indata);
147: for (int i = 0; i < senders.length; i++) {
148:
149: IDataSender sender = senders[i];
150: try {
151: sendMessageData(sessionId, data, sender);
152: } catch (Exception x) {
153:
154: if (!sender.getSuspect())
155: log.warn("Unable to send replicated message to "
156: + sender + ", is server down?", x);
157: sender.setSuspect(true);
158: }
159: }//while
160: }
161:
162: public String getReplicationMode() {
163: return replicationMode;
164: }
165:
166: public boolean getIsSenderSynchronized() {
167: return IDataSenderFactory.SYNC_MODE.equals(replicationMode)
168: || IDataSenderFactory.POOLED_SYNC_MODE
169: .equals(replicationMode);
170: }
171:
172: }
|