001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.catalina.tribes.group;
018:
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
023:
024: import org.apache.catalina.tribes.Channel;
025: import org.apache.catalina.tribes.ChannelException;
026: import org.apache.catalina.tribes.ChannelListener;
027: import org.apache.catalina.tribes.Member;
028: import org.apache.catalina.tribes.util.UUIDGenerator;
029:
030: /**
031: * A channel to handle RPC messaging
032: * @author Filip Hanik
033: */
034: public class RpcChannel implements ChannelListener {
035: protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory
036: .getLog(RpcChannel.class);
037:
038: public static final int FIRST_REPLY = 1;
039: public static final int MAJORITY_REPLY = 2;
040: public static final int ALL_REPLY = 3;
041: public static final int NO_REPLY = 4;
042:
043: private Channel channel;
044: private RpcCallback callback;
045: private byte[] rpcId;
046:
047: private HashMap responseMap = new HashMap();
048:
049: /**
050: * Create an RPC channel. You can have several RPC channels attached to a group
051: * all separated out by the uniqueness
052: * @param rpcId - the unique Id for this RPC group
053: * @param channel Channel
054: * @param callback RpcCallback
055: */
056: public RpcChannel(byte[] rpcId, Channel channel,
057: RpcCallback callback) {
058: this .channel = channel;
059: this .callback = callback;
060: this .rpcId = rpcId;
061: channel.addChannelListener(this );
062: }
063:
064: /**
065: * Send a message and wait for the response.
066: * @param destination Member[] - the destination for the message, and the members you request a reply from
067: * @param message Serializable - the message you are sending out
068: * @param options int - FIRST_REPLY, MAJORITY_REPLY or ALL_REPLY
069: * @param timeout long - timeout in milliseconds, if no reply is received within this time null is returned
070: * @return Response[] - an array of response objects.
071: * @throws ChannelException
072: */
073: public Response[] send(Member[] destination, Serializable message,
074: int rpcOptions, int channelOptions, long timeout)
075: throws ChannelException {
076:
077: if (destination == null || destination.length == 0)
078: return new Response[0];
079:
080: //avoid dead lock
081: channelOptions = channelOptions
082: & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK;
083:
084: RpcCollectorKey key = new RpcCollectorKey(UUIDGenerator
085: .randomUUID(false));
086: RpcCollector collector = new RpcCollector(key, rpcOptions,
087: destination.length, timeout);
088: try {
089: synchronized (collector) {
090: if (rpcOptions != NO_REPLY)
091: responseMap.put(key, collector);
092: RpcMessage rmsg = new RpcMessage(rpcId, key.id, message);
093: channel.send(destination, rmsg, channelOptions);
094: if (rpcOptions != NO_REPLY)
095: collector.wait(timeout);
096: }
097: } catch (InterruptedException ix) {
098: Thread.currentThread().interrupted();
099: //throw new ChannelException(ix);
100: } finally {
101: responseMap.remove(key);
102: }
103: return collector.getResponses();
104: }
105:
106: public void messageReceived(Serializable msg, Member sender) {
107: RpcMessage rmsg = (RpcMessage) msg;
108: RpcCollectorKey key = new RpcCollectorKey(rmsg.uuid);
109: if (rmsg.reply) {
110: RpcCollector collector = (RpcCollector) responseMap
111: .get(key);
112: if (collector == null) {
113: callback.leftOver(rmsg.message, sender);
114: } else {
115: synchronized (collector) {
116: //make sure it hasn't been removed
117: if (responseMap.containsKey(key)) {
118: if ((rmsg instanceof RpcMessage.NoRpcChannelReply))
119: collector.destcnt--;
120: else
121: collector.addResponse(rmsg.message, sender);
122: if (collector.isComplete())
123: collector.notifyAll();
124: } else {
125: if (!(rmsg instanceof RpcMessage.NoRpcChannelReply))
126: callback.leftOver(rmsg.message, sender);
127: }
128: }//synchronized
129: }//end if
130: } else {
131: Serializable reply = callback.replyRequest(rmsg.message,
132: sender);
133: rmsg.reply = true;
134: rmsg.message = reply;
135: try {
136: channel.send(new Member[] { sender }, rmsg, 0);
137: } catch (Exception x) {
138: log
139: .error(
140: "Unable to send back reply in RpcChannel.",
141: x);
142: }
143: }//end if
144: }
145:
146: public void breakdown() {
147: channel.removeChannelListener(this );
148: }
149:
150: public void finalize() {
151: breakdown();
152: }
153:
154: public boolean accept(Serializable msg, Member sender) {
155: if (msg instanceof RpcMessage) {
156: RpcMessage rmsg = (RpcMessage) msg;
157: return Arrays.equals(rmsg.rpcId, rpcId);
158: } else
159: return false;
160: }
161:
162: public Channel getChannel() {
163: return channel;
164: }
165:
166: public RpcCallback getCallback() {
167: return callback;
168: }
169:
170: public byte[] getRpcId() {
171: return rpcId;
172: }
173:
174: public void setChannel(Channel channel) {
175: this .channel = channel;
176: }
177:
178: public void setCallback(RpcCallback callback) {
179: this .callback = callback;
180: }
181:
182: public void setRpcId(byte[] rpcId) {
183: this .rpcId = rpcId;
184: }
185:
186: /**
187: *
188: * Class that holds all response.
189: * @author not attributable
190: * @version 1.0
191: */
192: public static class RpcCollector {
193: public ArrayList responses = new ArrayList();
194: public RpcCollectorKey key;
195: public int options;
196: public int destcnt;
197: public long timeout;
198:
199: public RpcCollector(RpcCollectorKey key, int options,
200: int destcnt, long timeout) {
201: this .key = key;
202: this .options = options;
203: this .destcnt = destcnt;
204: this .timeout = timeout;
205: }
206:
207: public void addResponse(Serializable message, Member sender) {
208: Response resp = new Response(sender, message);
209: responses.add(resp);
210: }
211:
212: public boolean isComplete() {
213: if (destcnt <= 0)
214: return true;
215: switch (options) {
216: case ALL_REPLY:
217: return destcnt == responses.size();
218: case MAJORITY_REPLY: {
219: float perc = ((float) responses.size())
220: / ((float) destcnt);
221: return perc >= 0.50f;
222: }
223: case FIRST_REPLY:
224: return responses.size() > 0;
225: default:
226: return false;
227: }
228: }
229:
230: public int hashCode() {
231: return key.hashCode();
232: }
233:
234: public boolean equals(Object o) {
235: if (o instanceof RpcCollector) {
236: RpcCollector r = (RpcCollector) o;
237: return r.key.equals(this .key);
238: } else
239: return false;
240: }
241:
242: public Response[] getResponses() {
243: return (Response[]) responses
244: .toArray(new Response[responses.size()]);
245: }
246: }
247:
248: public static class RpcCollectorKey {
249: byte[] id;
250:
251: public RpcCollectorKey(byte[] id) {
252: this .id = id;
253: }
254:
255: public int hashCode() {
256: return id[0] + id[1] + id[2] + id[3];
257: }
258:
259: public boolean equals(Object o) {
260: if (o instanceof RpcCollectorKey) {
261: RpcCollectorKey r = (RpcCollectorKey) o;
262: return Arrays.equals(id, r.id);
263: } else
264: return false;
265: }
266:
267: }
268:
269: protected static String bToS(byte[] data) {
270: StringBuffer buf = new StringBuffer(4 * 16);
271: buf.append("{");
272: for (int i = 0; data != null && i < data.length; i++)
273: buf.append(String.valueOf(data[i])).append(" ");
274: buf.append("}");
275: return buf.toString();
276: }
277:
278: }
|