001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.catalina.tribes.group;
018:
019: import org.apache.catalina.tribes.ChannelException;
020: import org.apache.catalina.tribes.ChannelMessage;
021: import org.apache.catalina.tribes.ChannelReceiver;
022: import org.apache.catalina.tribes.ChannelSender;
023:
024: import org.apache.catalina.tribes.Member;
025: import org.apache.catalina.tribes.MembershipService;
026: import org.apache.catalina.tribes.MessageListener;
027: import org.apache.catalina.tribes.transport.SenderState;
028: import org.apache.catalina.tribes.transport.ReplicationTransmitter;
029: import org.apache.catalina.tribes.membership.McastService;
030: import org.apache.catalina.tribes.transport.nio.NioReceiver;
031: import org.apache.catalina.tribes.Channel;
032: import org.apache.catalina.tribes.util.Logs;
033: import org.apache.catalina.tribes.UniqueId;
034: import org.apache.catalina.tribes.util.Arrays;
035:
036: /**
037: * The channel coordinator object coordinates the membership service,
038: * the sender and the receiver.
039: * This is the last interceptor in the chain.
040: * @author Filip Hanik
041: * @version $Revision: 532943 $, $Date: 2007-04-27 05:14:58 +0200 (ven., 27 avr. 2007) $
042: */
043: public class ChannelCoordinator extends ChannelInterceptorBase
044: implements MessageListener {
045: private ChannelReceiver clusterReceiver = new NioReceiver();
046: private ChannelSender clusterSender = new ReplicationTransmitter();
047: private MembershipService membershipService = new McastService();
048:
049: //override optionflag
050: protected int optionFlag = Channel.SEND_OPTIONS_BYTE_MESSAGE
051: | Channel.SEND_OPTIONS_USE_ACK
052: | Channel.SEND_OPTIONS_SYNCHRONIZED_ACK;
053:
054: public int getOptionFlag() {
055: return optionFlag;
056: }
057:
058: public void setOptionFlag(int flag) {
059: optionFlag = flag;
060: }
061:
062: private int startLevel = 0;
063:
064: public ChannelCoordinator() {
065:
066: }
067:
068: public ChannelCoordinator(ChannelReceiver receiver,
069: ChannelSender sender, MembershipService service) {
070: this ();
071: this .setClusterReceiver(receiver);
072: this .setClusterSender(sender);
073: this .setMembershipService(service);
074: }
075:
076: /**
077: * Send a message to one or more members in the cluster
078: * @param destination Member[] - the destinations, null or zero length means all
079: * @param msg ClusterMessage - the message to send
080: * @param options int - sender options, see class documentation
081: * @return ClusterMessage[] - the replies from the members, if any.
082: */
083: public void sendMessage(Member[] destination, ChannelMessage msg,
084: InterceptorPayload payload) throws ChannelException {
085: if (destination == null)
086: destination = membershipService.getMembers();
087: clusterSender.sendMessage(msg, destination);
088: if (Logs.MESSAGES.isTraceEnabled()) {
089: Logs.MESSAGES
090: .trace("ChannelCoordinator - Sent msg:"
091: + new UniqueId(msg.getUniqueId())
092: + " at "
093: + new java.sql.Timestamp(System
094: .currentTimeMillis()) + " to "
095: + Arrays.toNameString(destination));
096: }
097: }
098:
099: /**
100: * Starts up the channel. This can be called multiple times for individual services to start
101: * The svc parameter can be the logical or value of any constants
102: * @param svc int value of <BR>
103: * DEFAULT - will start all services <BR>
104: * MBR_RX_SEQ - starts the membership receiver <BR>
105: * MBR_TX_SEQ - starts the membership broadcaster <BR>
106: * SND_TX_SEQ - starts the replication transmitter<BR>
107: * SND_RX_SEQ - starts the replication receiver<BR>
108: * @throws ChannelException if a startup error occurs or the service is already started.
109: */
110: public void start(int svc) throws ChannelException {
111: this .internalStart(svc);
112: }
113:
114: /**
115: * Shuts down the channel. This can be called multiple times for individual services to shutdown
116: * The svc parameter can be the logical or value of any constants
117: * @param svc int value of <BR>
118: * DEFAULT - will shutdown all services <BR>
119: * MBR_RX_SEQ - stops the membership receiver <BR>
120: * MBR_TX_SEQ - stops the membership broadcaster <BR>
121: * SND_TX_SEQ - stops the replication transmitter<BR>
122: * SND_RX_SEQ - stops the replication receiver<BR>
123: * @throws ChannelException if a startup error occurs or the service is already started.
124: */
125: public void stop(int svc) throws ChannelException {
126: this .internalStop(svc);
127: }
128:
129: /**
130: * Starts up the channel. This can be called multiple times for individual services to start
131: * The svc parameter can be the logical or value of any constants
132: * @param svc int value of <BR>
133: * DEFAULT - will start all services <BR>
134: * MBR_RX_SEQ - starts the membership receiver <BR>
135: * MBR_TX_SEQ - starts the membership broadcaster <BR>
136: * SND_TX_SEQ - starts the replication transmitter<BR>
137: * SND_RX_SEQ - starts the replication receiver<BR>
138: * @throws ChannelException if a startup error occurs or the service is already started.
139: */
140: protected synchronized void internalStart(int svc)
141: throws ChannelException {
142: try {
143: boolean valid = false;
144: //make sure we don't pass down any flags that are unrelated to the bottom layer
145: svc = svc & Channel.DEFAULT;
146:
147: if (startLevel == Channel.DEFAULT)
148: return; //we have already started up all components
149: if (svc == 0)
150: return;//nothing to start
151:
152: if (svc == (svc & startLevel))
153: throw new ChannelException(
154: "Channel already started for level:" + svc);
155:
156: //must start the receiver first so that we can coordinate the port it
157: //listens to with the local membership settings
158: if (Channel.SND_RX_SEQ == (svc & Channel.SND_RX_SEQ)) {
159: clusterReceiver.setMessageListener(this );
160: clusterReceiver.start();
161: //synchronize, big time FIXME
162: membershipService.setLocalMemberProperties(
163: getClusterReceiver().getHost(),
164: getClusterReceiver().getPort());
165: valid = true;
166: }
167: if (Channel.SND_TX_SEQ == (svc & Channel.SND_TX_SEQ)) {
168: clusterSender.start();
169: valid = true;
170: }
171:
172: if (Channel.MBR_RX_SEQ == (svc & Channel.MBR_RX_SEQ)) {
173: membershipService.setMembershipListener(this );
174: membershipService.start(MembershipService.MBR_RX);
175: valid = true;
176: }
177: if (Channel.MBR_TX_SEQ == (svc & Channel.MBR_TX_SEQ)) {
178: membershipService.start(MembershipService.MBR_TX);
179: valid = true;
180: }
181:
182: if (!valid) {
183: throw new IllegalArgumentException(
184: "Invalid start level, valid levels are:SND_RX_SEQ,SND_TX_SEQ,MBR_TX_SEQ,MBR_RX_SEQ");
185: }
186: startLevel = (startLevel | svc);
187: } catch (ChannelException cx) {
188: throw cx;
189: } catch (Exception x) {
190: throw new ChannelException(x);
191: }
192: }
193:
194: /**
195: * Shuts down the channel. This can be called multiple times for individual services to shutdown
196: * The svc parameter can be the logical or value of any constants
197: * @param svc int value of <BR>
198: * DEFAULT - will shutdown all services <BR>
199: * MBR_RX_SEQ - starts the membership receiver <BR>
200: * MBR_TX_SEQ - starts the membership broadcaster <BR>
201: * SND_TX_SEQ - starts the replication transmitter<BR>
202: * SND_RX_SEQ - starts the replication receiver<BR>
203: * @throws ChannelException if a startup error occurs or the service is already started.
204: */
205: protected synchronized void internalStop(int svc)
206: throws ChannelException {
207: try {
208: //make sure we don't pass down any flags that are unrelated to the bottom layer
209: svc = svc & Channel.DEFAULT;
210:
211: if (startLevel == 0)
212: return; //we have already stopped up all components
213: if (svc == 0)
214: return;//nothing to stop
215:
216: boolean valid = false;
217: if (Channel.SND_RX_SEQ == (svc & Channel.SND_RX_SEQ)) {
218: clusterReceiver.stop();
219: clusterReceiver.setMessageListener(null);
220: valid = true;
221: }
222: if (Channel.SND_TX_SEQ == (svc & Channel.SND_TX_SEQ)) {
223: clusterSender.stop();
224: valid = true;
225: }
226:
227: if (Channel.MBR_RX_SEQ == (svc & Channel.MBR_RX_SEQ)) {
228: membershipService.stop(MembershipService.MBR_RX);
229: membershipService.setMembershipListener(null);
230: valid = true;
231:
232: }
233: if (Channel.MBR_TX_SEQ == (svc & Channel.MBR_TX_SEQ)) {
234: valid = true;
235: membershipService.stop(MembershipService.MBR_TX);
236: }
237: if (!valid) {
238: throw new IllegalArgumentException(
239: "Invalid start level, valid levels are:SND_RX_SEQ,SND_TX_SEQ,MBR_TX_SEQ,MBR_RX_SEQ");
240: }
241:
242: startLevel = (startLevel & (~svc));
243:
244: } catch (Exception x) {
245: throw new ChannelException(x);
246: } finally {
247:
248: }
249:
250: }
251:
252: public void memberAdded(Member member) {
253: SenderState.getSenderState(member);
254: super .memberAdded(member);
255: }
256:
257: public void memberDisappeared(Member member) {
258: SenderState.removeSenderState(member);
259: super .memberDisappeared(member);
260: }
261:
262: public void messageReceived(ChannelMessage msg) {
263: if (Logs.MESSAGES.isTraceEnabled()) {
264: Logs.MESSAGES
265: .trace("ChannelCoordinator - Received msg:"
266: + new UniqueId(msg.getUniqueId())
267: + " at "
268: + new java.sql.Timestamp(System
269: .currentTimeMillis()) + " from "
270: + msg.getAddress().getName());
271: }
272: super .messageReceived(msg);
273: }
274:
275: public ChannelReceiver getClusterReceiver() {
276: return clusterReceiver;
277: }
278:
279: public ChannelSender getClusterSender() {
280: return clusterSender;
281: }
282:
283: public MembershipService getMembershipService() {
284: return membershipService;
285: }
286:
287: public void setClusterReceiver(ChannelReceiver clusterReceiver) {
288: if (clusterReceiver != null) {
289: this .clusterReceiver = clusterReceiver;
290: this .clusterReceiver.setMessageListener(this );
291: } else {
292: if (this .clusterReceiver != null)
293: this .clusterReceiver.setMessageListener(null);
294: this .clusterReceiver = null;
295: }
296: }
297:
298: public void setClusterSender(ChannelSender clusterSender) {
299: this .clusterSender = clusterSender;
300: }
301:
302: public void setMembershipService(MembershipService membershipService) {
303: this .membershipService = membershipService;
304: this .membershipService.setMembershipListener(this );
305: }
306:
307: public void heartbeat() {
308: if (clusterSender != null)
309: clusterSender.heartbeat();
310: super .heartbeat();
311: }
312:
313: /**
314: * has members
315: */
316: public boolean hasMembers() {
317: return this .getMembershipService().hasMembers();
318: }
319:
320: /**
321: * Get all current cluster members
322: * @return all members or empty array
323: */
324: public Member[] getMembers() {
325: return this .getMembershipService().getMembers();
326: }
327:
328: /**
329: *
330: * @param mbr Member
331: * @return Member
332: */
333: public Member getMember(Member mbr) {
334: return this .getMembershipService().getMember(mbr);
335: }
336:
337: /**
338: * Return the member that represents this node.
339: *
340: * @return Member
341: */
342: public Member getLocalMember(boolean incAlive) {
343: return this.getMembershipService().getLocalMember(incAlive);
344: }
345:
346: }
|