001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.catalina.tribes.transport.nio;
018:
019: import java.io.IOException;
020: import java.nio.channels.SelectionKey;
021: import java.nio.channels.Selector;
022: import java.util.HashMap;
023: import java.util.Iterator;
024: import java.util.Map;
025:
026: import org.apache.catalina.tribes.ChannelException;
027: import org.apache.catalina.tribes.ChannelMessage;
028: import org.apache.catalina.tribes.Member;
029: import org.apache.catalina.tribes.io.ChannelData;
030: import org.apache.catalina.tribes.io.XByteBuffer;
031: import org.apache.catalina.tribes.transport.MultiPointSender;
032: import org.apache.catalina.tribes.transport.SenderState;
033: import org.apache.catalina.tribes.transport.AbstractSender;
034: import java.net.UnknownHostException;
035: import org.apache.catalina.tribes.Channel;
036: import org.apache.catalina.tribes.util.Logs;
037: import org.apache.catalina.tribes.UniqueId;
038:
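/**
 * Sends one message to several destination members in parallel, driving one
 * {@link NioSender} per destination through a single shared {@link Selector}.
 *
 * <p>Senders are created the first time a member is addressed, cached per
 * {@link Member}, and reused for later messages until they are removed,
 * disconnected, or dropped by {@link #keepalive()}.</p>
 *
 * @author not attributable
 * @version 1.0
 */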
049: public class ParallelNioSender extends AbstractSender implements
050: MultiPointSender {
051:
052: protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory
053: .getLog(ParallelNioSender.class);
054: protected long selectTimeout = 5000; //default 5 seconds, same as send timeout
055: protected Selector selector;
056: protected HashMap nioSenders = new HashMap();
057:
/**
 * Creates a sender with a freshly opened selector and marks it as connected.
 *
 * @throws IOException if the selector cannot be opened
 */
public ParallelNioSender() throws IOException {
    this.selector = Selector.open();
    this.setConnected(true);
}
062:
063: public synchronized void sendMessage(Member[] destination,
064: ChannelMessage msg) throws ChannelException {
065: long start = System.currentTimeMillis();
066: byte[] data = XByteBuffer.createDataPackage((ChannelData) msg);
067: NioSender[] senders = setupForSend(destination);
068: connect(senders);
069: setData(senders, data);
070:
071: int remaining = senders.length;
072: ChannelException cx = null;
073: try {
074: //loop until complete, an error happens, or we timeout
075: long delta = System.currentTimeMillis() - start;
076: boolean waitForAck = (Channel.SEND_OPTIONS_USE_ACK & msg
077: .getOptions()) == Channel.SEND_OPTIONS_USE_ACK;
078: while ((remaining > 0) && (delta < getTimeout())) {
079: try {
080: remaining -= doLoop(selectTimeout,
081: getMaxRetryAttempts(), waitForAck, msg);
082: } catch (Exception x) {
083: int faulty = (cx == null) ? 0 : cx
084: .getFaultyMembers().length;
085: if (cx == null) {
086: if (x instanceof ChannelException)
087: cx = (ChannelException) x;
088: else
089: cx = new ChannelException(
090: "Parallel NIO send failed.", x);
091: } else {
092: if (x instanceof ChannelException)
093: cx.addFaultyMember(((ChannelException) x)
094: .getFaultyMembers());
095: }
096: //count down the remaining on an error
097: if (faulty < cx.getFaultyMembers().length)
098: remaining -= (cx.getFaultyMembers().length - faulty);
099: }
100: //bail out if all remaining senders are failing
101: if (cx != null
102: && cx.getFaultyMembers().length == remaining)
103: throw cx;
104: delta = System.currentTimeMillis() - start;
105: }
106: if (remaining > 0) {
107: //timeout has occured
108: ChannelException cxtimeout = new ChannelException(
109: "Operation has timed out(" + getTimeout()
110: + " ms.).");
111: if (cx == null)
112: cx = new ChannelException(
113: "Operation has timed out(" + getTimeout()
114: + " ms.).");
115: for (int i = 0; i < senders.length; i++) {
116: if (!senders[i].isComplete())
117: cx.addFaultyMember(senders[i].getDestination(),
118: cxtimeout);
119: }
120: throw cx;
121: } else if (cx != null) {
122: //there was an error
123: throw cx;
124: }
125: } catch (Exception x) {
126: try {
127: this .disconnect();
128: } catch (Exception ignore) {
129: }
130: if (x instanceof ChannelException)
131: throw (ChannelException) x;
132: else
133: throw new ChannelException(x);
134: }
135:
136: }
137:
138: private int doLoop(long selectTimeOut, int maxAttempts,
139: boolean waitForAck, ChannelMessage msg) throws IOException,
140: ChannelException {
141: int completed = 0;
142: int selectedKeys = selector.select(selectTimeOut);
143:
144: if (selectedKeys == 0) {
145: return 0;
146: }
147:
148: Iterator it = selector.selectedKeys().iterator();
149: while (it.hasNext()) {
150: SelectionKey sk = (SelectionKey) it.next();
151: it.remove();
152: int readyOps = sk.readyOps();
153: sk.interestOps(sk.interestOps() & ~readyOps);
154: NioSender sender = (NioSender) sk.attachment();
155: try {
156: if (sender.process(sk, waitForAck)) {
157: completed++;
158: sender.setComplete(true);
159: if (Logs.MESSAGES.isTraceEnabled()) {
160: Logs.MESSAGES
161: .trace("ParallelNioSender - Sent msg:"
162: + new UniqueId(msg
163: .getUniqueId())
164: + " at "
165: + new java.sql.Timestamp(System
166: .currentTimeMillis())
167: + " to "
168: + sender.getDestination()
169: .getName());
170: }
171: SenderState.getSenderState(sender.getDestination())
172: .setReady();
173: }//end if
174: } catch (Exception x) {
175: SenderState state = SenderState.getSenderState(sender
176: .getDestination());
177: int attempt = sender.getAttempt() + 1;
178: boolean retry = (sender.getAttempt() <= maxAttempts && maxAttempts > 0);
179: synchronized (state) {
180:
181: //sk.cancel();
182: if (state.isSuspect())
183: state.setFailing();
184: if (state.isReady()) {
185: state.setSuspect();
186: if (retry)
187: log
188: .warn("Member send is failing for:"
189: + sender.getDestination()
190: .getName()
191: + " ; Setting to suspect and retrying.");
192: else
193: log.warn("Member send is failing for:"
194: + sender.getDestination().getName()
195: + " ; Setting to suspect.", x);
196: }
197: }
198: if (!isConnected()) {
199: log.warn("Not retrying send for:"
200: + sender.getDestination().getName()
201: + "; Sender is disconnected.");
202: ChannelException cx = new ChannelException(
203: "Send failed, and sender is disconnected. Not retrying.",
204: x);
205: cx.addFaultyMember(sender.getDestination(), x);
206: throw cx;
207: }
208:
209: byte[] data = sender.getMessage();
210: if (retry) {
211: try {
212: sender.disconnect();
213: sender.connect();
214: sender.setAttempt(attempt);
215: sender.setMessage(data);
216: } catch (Exception ignore) {
217: state.setFailing();
218: }
219: } else {
220: ChannelException cx = new ChannelException(
221: "Send failed, attempt:"
222: + sender.getAttempt() + " max:"
223: + maxAttempts, x);
224: cx.addFaultyMember(sender.getDestination(), x);
225: throw cx;
226: }//end if
227: }
228: }
229: return completed;
230:
231: }
232:
233: private void connect(NioSender[] senders) throws ChannelException {
234: ChannelException x = null;
235: for (int i = 0; i < senders.length; i++) {
236: try {
237: if (!senders[i].isConnected())
238: senders[i].connect();
239: } catch (IOException io) {
240: if (x == null)
241: x = new ChannelException(io);
242: x.addFaultyMember(senders[i].getDestination(), io);
243: }
244: }
245: if (x != null)
246: throw x;
247: }
248:
249: private void setData(NioSender[] senders, byte[] data)
250: throws ChannelException {
251: ChannelException x = null;
252: for (int i = 0; i < senders.length; i++) {
253: try {
254: senders[i].setMessage(data);
255: } catch (IOException io) {
256: if (x == null)
257: x = new ChannelException(io);
258: x.addFaultyMember(senders[i].getDestination(), io);
259: }
260: }
261: if (x != null)
262: throw x;
263: }
264:
265: private NioSender[] setupForSend(Member[] destination)
266: throws ChannelException {
267: ChannelException cx = null;
268: NioSender[] result = new NioSender[destination.length];
269: for (int i = 0; i < destination.length; i++) {
270: NioSender sender = (NioSender) nioSenders
271: .get(destination[i]);
272: try {
273:
274: if (sender == null) {
275: sender = new NioSender();
276: sender.transferProperties(this , sender);
277: nioSenders.put(destination[i], sender);
278: }
279: if (sender != null) {
280: sender.reset();
281: sender.setDestination(destination[i]);
282: sender.setSelector(selector);
283: result[i] = sender;
284: }
285: } catch (UnknownHostException x) {
286: if (cx == null)
287: cx = new ChannelException(
288: "Unable to setup NioSender.", x);
289: cx.addFaultyMember(destination[i], x);
290: }
291: }
292: if (cx != null)
293: throw cx;
294: else
295: return result;
296: }
297:
298: public void connect() {
299: //do nothing, we connect on demand
300: setConnected(true);
301: }
302:
303: private synchronized void close() throws ChannelException {
304: ChannelException x = null;
305: Object[] members = nioSenders.keySet().toArray();
306: for (int i = 0; i < members.length; i++) {
307: Member mbr = (Member) members[i];
308: try {
309: NioSender sender = (NioSender) nioSenders.get(mbr);
310: sender.disconnect();
311: } catch (Exception e) {
312: if (x == null)
313: x = new ChannelException(e);
314: x.addFaultyMember(mbr, e);
315: }
316: nioSenders.remove(mbr);
317: }
318: if (x != null)
319: throw x;
320: }
321:
322: public void add(Member member) {
323:
324: }
325:
326: public void remove(Member member) {
327: //disconnect senders
328: NioSender sender = (NioSender) nioSenders.remove(member);
329: if (sender != null)
330: sender.disconnect();
331: }
332:
333: public synchronized void disconnect() {
334: setConnected(false);
335: try {
336: close();
337: } catch (Exception x) {
338: }
339:
340: }
341:
342: public void finalize() {
343: try {
344: disconnect();
345: } catch (Exception ignore) {
346: }
347: }
348:
349: public boolean keepalive() {
350: boolean result = false;
351: for (Iterator i = nioSenders.entrySet().iterator(); i.hasNext();) {
352: Map.Entry entry = (Map.Entry) i.next();
353: NioSender sender = (NioSender) entry.getValue();
354: if (sender.keepalive()) {
355: //nioSenders.remove(entry.getKey());
356: i.remove();
357: result = true;
358: } else {
359: try {
360: sender.read(null);
361: } catch (IOException x) {
362: sender.disconnect();
363: sender.reset();
364: //nioSenders.remove(entry.getKey());
365: i.remove();
366: result = true;
367: } catch (Exception x) {
368: log.warn("Error during keepalive test for sender:"
369: + sender, x);
370: }
371: }
372: }
373: //clean up any cancelled keys
374: if (result)
375: try {
376: selector.selectNow();
377: } catch (Exception ignore) {
378: }
379: return result;
380: }
381:
382: }
|