001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.catalina.tribes.group;
018:
019: import java.io.Serializable;
020: import java.util.ArrayList;
021: import java.util.Iterator;
022:
023: import org.apache.catalina.tribes.ByteMessage;
024: import org.apache.catalina.tribes.Channel;
025: import org.apache.catalina.tribes.ChannelException;
026: import org.apache.catalina.tribes.ChannelInterceptor;
027: import org.apache.catalina.tribes.ChannelListener;
028: import org.apache.catalina.tribes.ChannelMessage;
029: import org.apache.catalina.tribes.ChannelReceiver;
030: import org.apache.catalina.tribes.ChannelSender;
031: import org.apache.catalina.tribes.ErrorHandler;
032: import org.apache.catalina.tribes.ManagedChannel;
033: import org.apache.catalina.tribes.Member;
034: import org.apache.catalina.tribes.MembershipListener;
035: import org.apache.catalina.tribes.MembershipService;
036: import org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor;
037: import org.apache.catalina.tribes.io.ChannelData;
038: import org.apache.catalina.tribes.io.XByteBuffer;
039: import org.apache.catalina.tribes.UniqueId;
040: import org.apache.catalina.tribes.Heartbeat;
041: import org.apache.catalina.tribes.io.BufferPool;
042: import org.apache.catalina.tribes.RemoteProcessException;
043: import org.apache.catalina.tribes.util.Logs;
044: import org.apache.catalina.tribes.util.Arrays;
045:
046: /**
047: * The default implementation of a Channel.<br>
048: * The GroupChannel manages the replication channel. It coordinates
049: * message being sent and received with membership announcements.
050: * The channel has an chain of interceptors that can modify the message or perform other logic.<br>
051: * It manages a complete group, both membership and replication.
052: * @author Filip Hanik
053: * @version $Revision: 500684 $, $Date: 2007-01-28 00:27:18 +0100 (dim., 28 janv. 2007) $
054: */
055: public class GroupChannel extends ChannelInterceptorBase implements
056: ManagedChannel {
057: /**
058: * Flag to determine if the channel manages its own heartbeat
059: * If set to true, the channel will start a local thread for the heart beat.
060: */
061: protected boolean heartbeat = true;
062: /**
063: * If <code>heartbeat == true</code> then how often do we want this
064: * heartbeat to run. default is one minute
065: */
066: protected long heartbeatSleeptime = 5 * 1000;//every 5 seconds
067:
068: /**
069: * Internal heartbeat thread
070: */
071: protected HeartbeatThread hbthread = null;
072:
073: /**
074: * The <code>ChannelCoordinator</code> coordinates the bottom layer components:<br>
075: * - MembershipService<br>
076: * - ChannelSender <br>
077: * - ChannelReceiver<br>
078: */
079: protected ChannelCoordinator coordinator = new ChannelCoordinator();
080:
081: /**
082: * The first interceptor in the inteceptor stack.
083: * The interceptors are chained in a linked list, so we only need a reference to the
084: * first one
085: */
086: protected ChannelInterceptor interceptors = null;
087:
088: /**
089: * A list of membership listeners that subscribe to membership announcements
090: */
091: protected ArrayList membershipListeners = new ArrayList();
092:
093: /**
094: * A list of channel listeners that subscribe to incoming messages
095: */
096: protected ArrayList channelListeners = new ArrayList();
097:
098: /**
099: * If set to true, the GroupChannel will check to make sure that
100: */
101: protected boolean optionCheck = false;
102:
103: /**
104: * Creates a GroupChannel. This constructor will also
105: * add the first interceptor in the GroupChannel.<br>
106: * The first interceptor is always the channel itself.
107: */
108: public GroupChannel() {
109: addInterceptor(this );
110: }
111:
112: /**
113: * Adds an interceptor to the stack for message processing<br>
114: * Interceptors are ordered in the way they are added.<br>
115: * <code>channel.addInterceptor(A);</code><br>
116: * <code>channel.addInterceptor(C);</code><br>
117: * <code>channel.addInterceptor(B);</code><br>
118: * Will result in a interceptor stack like this:<br>
119: * <code>A -> C -> B</code><br>
120: * The complete stack will look like this:<br>
121: * <code>Channel -> A -> C -> B -> ChannelCoordinator</code><br>
122: * @param interceptor ChannelInterceptorBase
123: */
124: public void addInterceptor(ChannelInterceptor interceptor) {
125: if (interceptors == null) {
126: interceptors = interceptor;
127: interceptors.setNext(coordinator);
128: interceptors.setPrevious(null);
129: coordinator.setPrevious(interceptors);
130: } else {
131: ChannelInterceptor last = interceptors;
132: while (last.getNext() != coordinator) {
133: last = last.getNext();
134: }
135: last.setNext(interceptor);
136: interceptor.setNext(coordinator);
137: interceptor.setPrevious(last);
138: coordinator.setPrevious(interceptor);
139: }
140: }
141:
142: /**
143: * Sends a heartbeat through the interceptor stack.<br>
144: * Invoke this method from the application on a periodic basis if
145: * you have turned off internal heartbeats <code>channel.setHeartbeat(false)</code>
146: */
147: public void heartbeat() {
148: super .heartbeat();
149: Iterator i = membershipListeners.iterator();
150: while (i.hasNext()) {
151: Object o = i.next();
152: if (o instanceof Heartbeat)
153: ((Heartbeat) o).heartbeat();
154: }
155: i = channelListeners.iterator();
156: while (i.hasNext()) {
157: Object o = i.next();
158: if (o instanceof Heartbeat)
159: ((Heartbeat) o).heartbeat();
160: }
161:
162: }
163:
164: /**
165: * Send a message to the destinations specified
166: * @param destination Member[] - destination.length > 1
167: * @param msg Serializable - the message to send
168: * @param options int - sender options, options can trigger guarantee levels and different interceptors to
169: * react to the message see class documentation for the <code>Channel</code> object.<br>
170: * @return UniqueId - the unique Id that was assigned to this message
171: * @throws ChannelException - if an error occurs processing the message
172: * @see org.apache.catalina.tribes.Channel
173: */
174: public UniqueId send(Member[] destination, Serializable msg,
175: int options) throws ChannelException {
176: return send(destination, msg, options, null);
177: }
178:
179: /**
180: *
181: * @param destination Member[] - destination.length > 1
182: * @param msg Serializable - the message to send
183: * @param options int - sender options, options can trigger guarantee levels and different interceptors to
184: * react to the message see class documentation for the <code>Channel</code> object.<br>
185: * @param handler - callback object for error handling and completion notification, used when a message is
186: * sent asynchronously using the <code>Channel.SEND_OPTIONS_ASYNCHRONOUS</code> flag enabled.
187: * @return UniqueId - the unique Id that was assigned to this message
188: * @throws ChannelException - if an error occurs processing the message
189: * @see org.apache.catalina.tribes.Channel
190: */
191: public UniqueId send(Member[] destination, Serializable msg,
192: int options, ErrorHandler handler) throws ChannelException {
193: if (msg == null)
194: throw new ChannelException("Cant send a NULL message");
195: XByteBuffer buffer = null;
196: try {
197: if (destination == null || destination.length == 0)
198: throw new ChannelException("No destination given");
199: ChannelData data = new ChannelData(true);//generates a unique Id
200: data.setAddress(getLocalMember(false));
201: data.setTimestamp(System.currentTimeMillis());
202: byte[] b = null;
203: if (msg instanceof ByteMessage) {
204: b = ((ByteMessage) msg).getMessage();
205: options = options | SEND_OPTIONS_BYTE_MESSAGE;
206: } else {
207: b = XByteBuffer.serialize(msg);
208: options = options & (~SEND_OPTIONS_BYTE_MESSAGE);
209: }
210: data.setOptions(options);
211: //XByteBuffer buffer = new XByteBuffer(b.length+128,false);
212: buffer = BufferPool.getBufferPool().getBuffer(
213: b.length + 128, false);
214: buffer.append(b, 0, b.length);
215: data.setMessage(buffer);
216: InterceptorPayload payload = null;
217: if (handler != null) {
218: payload = new InterceptorPayload();
219: payload.setErrorHandler(handler);
220: }
221: getFirstInterceptor().sendMessage(destination, data,
222: payload);
223: if (Logs.MESSAGES.isTraceEnabled()) {
224: Logs.MESSAGES.trace("GroupChannel - Sent msg:"
225: + new UniqueId(data.getUniqueId())
226: + " at "
227: + new java.sql.Timestamp(System
228: .currentTimeMillis()) + " to "
229: + Arrays.toNameString(destination));
230: Logs.MESSAGES.trace("GroupChannel - Send Message:"
231: + new UniqueId(data.getUniqueId()) + " is "
232: + msg);
233: }
234:
235: return new UniqueId(data.getUniqueId());
236: } catch (Exception x) {
237: if (x instanceof ChannelException)
238: throw (ChannelException) x;
239: throw new ChannelException(x);
240: } finally {
241: if (buffer != null)
242: BufferPool.getBufferPool().returnBuffer(buffer);
243: }
244: }
245:
246: /**
247: * Callback from the interceptor stack. <br>
248: * When a message is received from a remote node, this method will be invoked by
249: * the previous interceptor.<br>
250: * This method can also be used to send a message to other components within the same application,
251: * but its an extreme case, and you're probably better off doing that logic between the applications itself.
252: * @param msg ChannelMessage
253: */
254: public void messageReceived(ChannelMessage msg) {
255: if (msg == null)
256: return;
257: try {
258: if (Logs.MESSAGES.isTraceEnabled()) {
259: Logs.MESSAGES.trace("GroupChannel - Received msg:"
260: + new UniqueId(msg.getUniqueId())
261: + " at "
262: + new java.sql.Timestamp(System
263: .currentTimeMillis()) + " from "
264: + msg.getAddress().getName());
265: }
266:
267: Serializable fwd = null;
268: if ((msg.getOptions() & SEND_OPTIONS_BYTE_MESSAGE) == SEND_OPTIONS_BYTE_MESSAGE) {
269: fwd = new ByteMessage(msg.getMessage().getBytes());
270: } else {
271: fwd = XByteBuffer.deserialize(msg.getMessage()
272: .getBytesDirect(), 0, msg.getMessage()
273: .getLength());
274: }
275: if (Logs.MESSAGES.isTraceEnabled()) {
276: Logs.MESSAGES.trace("GroupChannel - Receive Message:"
277: + new UniqueId(msg.getUniqueId()) + " is "
278: + fwd);
279: }
280:
281: //get the actual member with the correct alive time
282: Member source = msg.getAddress();
283: boolean rx = false;
284: boolean delivered = false;
285: for (int i = 0; i < channelListeners.size(); i++) {
286: ChannelListener channelListener = (ChannelListener) channelListeners
287: .get(i);
288: if (channelListener != null
289: && channelListener.accept(fwd, source)) {
290: channelListener.messageReceived(fwd, source);
291: delivered = true;
292: //if the message was accepted by an RPC channel, that channel
293: //is responsible for returning the reply, otherwise we send an absence reply
294: if (channelListener instanceof RpcChannel)
295: rx = true;
296: }
297: }//for
298: if ((!rx) && (fwd instanceof RpcMessage)) {
299: //if we have a message that requires a response,
300: //but none was given, send back an immediate one
301: sendNoRpcChannelReply((RpcMessage) fwd, source);
302: }
303: if (Logs.MESSAGES.isTraceEnabled()) {
304: Logs.MESSAGES.trace("GroupChannel delivered["
305: + delivered + "] id:"
306: + new UniqueId(msg.getUniqueId()));
307: }
308:
309: } catch (Exception x) {
310: if (log.isDebugEnabled())
311: log.error("Unable to process channel:IOException.", x);
312: throw new RemoteProcessException("IOException:"
313: + x.getMessage(), x);
314: }
315: }
316:
317: /**
318: * Sends a <code>NoRpcChannelReply</code> message to a member<br>
319: * This method gets invoked by the channel if a RPC message comes in
320: * and no channel listener accepts the message. This avoids timeout
321: * @param msg RpcMessage
322: * @param destination Member - the destination for the reply
323: */
324: protected void sendNoRpcChannelReply(RpcMessage msg,
325: Member destination) {
326: try {
327: //avoid circular loop
328: if (msg instanceof RpcMessage.NoRpcChannelReply)
329: return;
330: RpcMessage.NoRpcChannelReply reply = new RpcMessage.NoRpcChannelReply(
331: msg.rpcId, msg.uuid);
332: send(new Member[] { destination }, reply,
333: Channel.SEND_OPTIONS_ASYNCHRONOUS);
334: } catch (Exception x) {
335: log
336: .error(
337: "Unable to find rpc channel, failed to send NoRpcChannelReply.",
338: x);
339: }
340: }
341:
342: /**
343: * memberAdded gets invoked by the interceptor below the channel
344: * and the channel will broadcast it to the membership listeners
345: * @param member Member - the new member
346: */
347: public void memberAdded(Member member) {
348: //notify upwards
349: for (int i = 0; i < membershipListeners.size(); i++) {
350: MembershipListener membershipListener = (MembershipListener) membershipListeners
351: .get(i);
352: if (membershipListener != null)
353: membershipListener.memberAdded(member);
354: }
355: }
356:
357: /**
358: * memberDisappeared gets invoked by the interceptor below the channel
359: * and the channel will broadcast it to the membership listeners
360: * @param member Member - the member that left or crashed
361: */
362: public void memberDisappeared(Member member) {
363: //notify upwards
364: for (int i = 0; i < membershipListeners.size(); i++) {
365: MembershipListener membershipListener = (MembershipListener) membershipListeners
366: .get(i);
367: if (membershipListener != null)
368: membershipListener.memberDisappeared(member);
369: }
370: }
371:
372: /**
373: * Sets up the default implementation interceptor stack
374: * if no interceptors have been added
375: * @throws ChannelException
376: */
377: protected synchronized void setupDefaultStack()
378: throws ChannelException {
379:
380: if (getFirstInterceptor() != null
381: && ((getFirstInterceptor().getNext() instanceof ChannelCoordinator))) {
382: ChannelInterceptor interceptor = null;
383: Class clazz = null;
384: try {
385: clazz = Class
386: .forName(
387: "org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor",
388: true, GroupChannel.class
389: .getClassLoader());
390: clazz.newInstance();
391: } catch (Throwable x) {
392: clazz = MessageDispatchInterceptor.class;
393: }//catch
394: try {
395: interceptor = (ChannelInterceptor) clazz.newInstance();
396: } catch (Exception x) {
397: throw new ChannelException(
398: "Unable to add MessageDispatchInterceptor to interceptor chain.",
399: x);
400: }
401: this .addInterceptor(interceptor);
402: }
403: }
404:
405: /**
406: * Validates the option flags that each interceptor is using and reports
407: * an error if two interceptor share the same flag.
408: * @throws ChannelException
409: */
410: protected void checkOptionFlags() throws ChannelException {
411: StringBuffer conflicts = new StringBuffer();
412: ChannelInterceptor first = interceptors;
413: while (first != null) {
414: int flag = first.getOptionFlag();
415: if (flag != 0) {
416: ChannelInterceptor next = first.getNext();
417: while (next != null) {
418: int nflag = next.getOptionFlag();
419: if (nflag != 0
420: && (((flag & nflag) == flag) || ((flag & nflag) == nflag))) {
421: conflicts.append("[");
422: conflicts.append(first.getClass().getName());
423: conflicts.append(":");
424: conflicts.append(flag);
425: conflicts.append(" == ");
426: conflicts.append(next.getClass().getName());
427: conflicts.append(":");
428: conflicts.append(nflag);
429: conflicts.append("] ");
430: }//end if
431: next = next.getNext();
432: }//while
433: }//end if
434: first = first.getNext();
435: }//while
436: if (conflicts.length() > 0)
437: throw new ChannelException(
438: "Interceptor option flag conflict: "
439: + conflicts.toString());
440:
441: }
442:
443: /**
444: * Starts the channel
445: * @param svc int - what service to start
446: * @throws ChannelException
447: * @see org.apache.catalina.tribes.Channel#start(int)
448: */
449: public synchronized void start(int svc) throws ChannelException {
450: setupDefaultStack();
451: if (optionCheck)
452: checkOptionFlags();
453: super .start(svc);
454: if (hbthread == null && heartbeat) {
455: hbthread = new HeartbeatThread(this , heartbeatSleeptime);
456: hbthread.start();
457: }
458: }
459:
460: /**
461: * Stops the channel
462: * @param svc int
463: * @throws ChannelException
464: * @see org.apache.catalina.tribes.Channel#stop(int)
465: */
466: public synchronized void stop(int svc) throws ChannelException {
467: if (hbthread != null) {
468: hbthread.stopHeartbeat();
469: hbthread = null;
470: }
471: super .stop(svc);
472: }
473:
474: /**
475: * Returns the first interceptor of the stack. Useful for traversal.
476: * @return ChannelInterceptor
477: */
478: public ChannelInterceptor getFirstInterceptor() {
479: if (interceptors != null)
480: return interceptors;
481: else
482: return coordinator;
483: }
484:
485: /**
486: * Returns the channel receiver component
487: * @return ChannelReceiver
488: */
489: public ChannelReceiver getChannelReceiver() {
490: return coordinator.getClusterReceiver();
491: }
492:
493: /**
494: * Returns the channel sender component
495: * @return ChannelSender
496: */
497: public ChannelSender getChannelSender() {
498: return coordinator.getClusterSender();
499: }
500:
501: /**
502: * Returns the membership service component
503: * @return MembershipService
504: */
505: public MembershipService getMembershipService() {
506: return coordinator.getMembershipService();
507: }
508:
509: /**
510: * Sets the channel receiver component
511: * @param clusterReceiver ChannelReceiver
512: */
513: public void setChannelReceiver(ChannelReceiver clusterReceiver) {
514: coordinator.setClusterReceiver(clusterReceiver);
515: }
516:
517: /**
518: * Sets the channel sender component
519: * @param clusterSender ChannelSender
520: */
521: public void setChannelSender(ChannelSender clusterSender) {
522: coordinator.setClusterSender(clusterSender);
523: }
524:
525: /**
526: * Sets the membership component
527: * @param membershipService MembershipService
528: */
529: public void setMembershipService(MembershipService membershipService) {
530: coordinator.setMembershipService(membershipService);
531: }
532:
533: /**
534: * Adds a membership listener to the channel.<br>
535: * Membership listeners are uniquely identified using the equals(Object) method
536: * @param membershipListener MembershipListener
537: */
538: public void addMembershipListener(
539: MembershipListener membershipListener) {
540: if (!this .membershipListeners.contains(membershipListener))
541: this .membershipListeners.add(membershipListener);
542: }
543:
544: /**
545: * Removes a membership listener from the channel.<br>
546: * Membership listeners are uniquely identified using the equals(Object) method
547: * @param membershipListener MembershipListener
548: */
549:
550: public void removeMembershipListener(
551: MembershipListener membershipListener) {
552: membershipListeners.remove(membershipListener);
553: }
554:
555: /**
556: * Adds a channel listener to the channel.<br>
557: * Channel listeners are uniquely identified using the equals(Object) method
558: * @param channelListener ChannelListener
559: */
560: public void addChannelListener(ChannelListener channelListener) {
561: if (!this .channelListeners.contains(channelListener)) {
562: this .channelListeners.add(channelListener);
563: } else {
564: throw new IllegalArgumentException(
565: "Listener already exists:" + channelListener + "["
566: + channelListener.getClass().getName()
567: + "]");
568: }
569: }
570:
571: /**
572: *
573: * Removes a channel listener from the channel.<br>
574: * Channel listeners are uniquely identified using the equals(Object) method
575: * @param channelListener ChannelListener
576: */
577: public void removeChannelListener(ChannelListener channelListener) {
578: channelListeners.remove(channelListener);
579: }
580:
581: /**
582: * Returns an iterator of all the interceptors in this stack
583: * @return Iterator
584: */
585: public Iterator getInterceptors() {
586: return new InterceptorIterator(this .getNext(), this .coordinator);
587: }
588:
589: /**
590: * Enables/disables the option check<br>
591: * Setting this to true, will make the GroupChannel perform a conflict check
592: * on the interceptors. If two interceptors are using the same option flag
593: * and throw an error upon start.
594: * @param optionCheck boolean
595: */
596: public void setOptionCheck(boolean optionCheck) {
597: this .optionCheck = optionCheck;
598: }
599:
600: /**
601: * Configure local heartbeat sleep time<br>
602: * Only used when <code>getHeartbeat()==true</code>
603: * @param heartbeatSleeptime long - time in milliseconds to sleep between heartbeats
604: */
605: public void setHeartbeatSleeptime(long heartbeatSleeptime) {
606: this .heartbeatSleeptime = heartbeatSleeptime;
607: }
608:
609: /**
610: * Enables or disables local heartbeat.
611: * if <code>setHeartbeat(true)</code> is invoked then the channel will start an internal
612: * thread to invoke <code>Channel.heartbeat()</code> every <code>getHeartbeatSleeptime</code> milliseconds
613: * @param heartbeat boolean
614: */
615: public void setHeartbeat(boolean heartbeat) {
616: this .heartbeat = heartbeat;
617: }
618:
619: /**
620: * @see #setOptionCheck(boolean)
621: * @return boolean
622: */
623: public boolean getOptionCheck() {
624: return optionCheck;
625: }
626:
627: /**
628: * @see #setHeartbeat(boolean)
629: * @return boolean
630: */
631: public boolean getHeartbeat() {
632: return heartbeat;
633: }
634:
635: /**
636: * Returns the sleep time in milliseconds that the internal heartbeat will
637: * sleep in between invokations of <code>Channel.heartbeat()</code>
638: * @return long
639: */
640: public long getHeartbeatSleeptime() {
641: return heartbeatSleeptime;
642: }
643:
644: /**
645: *
646: * <p>Title: Interceptor Iterator</p>
647: *
648: * <p>Description: An iterator to loop through the interceptors in a channel</p>
649: *
650: * @version 1.0
651: */
652: public static class InterceptorIterator implements Iterator {
653: private ChannelInterceptor end;
654: private ChannelInterceptor start;
655:
656: public InterceptorIterator(ChannelInterceptor start,
657: ChannelInterceptor end) {
658: this .end = end;
659: this .start = start;
660: }
661:
662: public boolean hasNext() {
663: return start != null && start != end;
664: }
665:
666: public Object next() {
667: Object result = null;
668: if (hasNext()) {
669: result = start;
670: start = start.getNext();
671: }
672: return result;
673: }
674:
675: public void remove() {
676: //empty operation
677: }
678: }
679:
680: /**
681: *
682: * <p>Title: Internal heartbeat thread</p>
683: *
684: * <p>Description: if <code>Channel.getHeartbeat()==true</code> then a thread of this class
685: * is created</p>
686: *
687: * @version 1.0
688: */
689: public static class HeartbeatThread extends Thread {
690: protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory
691: .getLog(HeartbeatThread.class);
692: protected static int counter = 1;
693:
694: protected static synchronized int inc() {
695: return counter++;
696: }
697:
698: protected boolean doRun = true;
699: protected GroupChannel channel;
700: protected long sleepTime;
701:
702: public HeartbeatThread(GroupChannel channel, long sleepTime) {
703: super ();
704: this .setPriority(MIN_PRIORITY);
705: setName("GroupChannel-Heartbeat-" + inc());
706: setDaemon(true);
707: this .channel = channel;
708: this .sleepTime = sleepTime;
709: }
710:
711: public void stopHeartbeat() {
712: doRun = false;
713: interrupt();
714: }
715:
716: public void run() {
717: while (doRun) {
718: try {
719: Thread.sleep(sleepTime);
720: channel.heartbeat();
721: } catch (InterruptedException x) {
722: interrupted();
723: } catch (Exception x) {
724: log
725: .error(
726: "Unable to send heartbeat through Tribes interceptor stack. Will try to sleep again.",
727: x);
728: }//catch
729: }//while
730: }//run
731: }//HeartbeatThread
732:
733: }
|