001: // $Id: Channel.java,v 1.25 2006/10/26 15:07:34 belaban Exp $
002:
003: package org.jgroups;
004:
005: import org.apache.commons.logging.Log;
006:
007: import java.io.Serializable;
008: import java.util.*;
009:
010: /**
011: A channel represents a group communication endpoint (like BSD datagram sockets). A
012: client joins a group by connecting the channel to a group address and leaves it by
013: disconnecting. Messages sent over the channel are received by all group members that
014: are connected to the same group (that is, all members that have the same group
015: address).<p>
016:
017: The FSM for a channel is roughly as follows: a channel is created
018: (<em>unconnected</em>). The channel is connected to a group
019: (<em>connected</em>). Messages can now be sent and received. The channel is
020: disconnected from the group (<em>unconnected</em>). The channel could now be connected to a
021: different group again. The channel is closed (<em>closed</em>).<p>
022:
023: Only a single sender is allowed to be connected to a channel at a time, but there can be
024: more than one channel in an application.<p>
025:
026: Messages can be sent to the group members using the <em>send</em> method and messages
027: can be received using <em>receive</em> (pull approach).<p>
028:
029: A channel instance is created using either a <em>ChannelFactory</em> or the public
030: constructor. Each implementation of a channel must provide a subclass of
031: <code>Channel</code> and an implementation of <code>ChannelFactory</code>. <p>
032: Various degrees of sophistication in message exchange can be achieved using building
033: blocks on top of channels; e.g., light-weight groups, synchronous message invocation,
034: or remote method calls. Channels are on the same abstraction level as sockets, and
035: should really be simple to use. Higher-level abstractions are all built on top of
036: channels.
037:
038: @author Bela Ban
039: @see java.net.DatagramPacket
040: @see java.net.MulticastSocket
041: */
042: public abstract class Channel implements Transport {
043: public static final int BLOCK = 0;
044: public static final int VIEW = 1;
045: public static final int SUSPECT = 2;
046: public static final int LOCAL = 3;
047: public static final int GET_STATE_EVENTS = 4;
048: public static final int AUTO_RECONNECT = 5;
049: public static final int AUTO_GETSTATE = 6;
050:
051: protected UpHandler up_handler = null; // when set, <em>all</em> events are passed to it !
052: protected ChannelListener channel_listener = null;
053: protected Set channel_listeners = null;
054: protected Receiver receiver = null;
055:
056: protected abstract Log getLog();
057:
058: /**
059: Connects the channel to a group. The client is now able to receive group
060: messages, views and block events (depending on the options set) and to send
061: messages to (all or single) group members. This is a null operation if already
062: connected.<p>
063:
064: All channels with the same name form a group, that means all messages
065: sent to the group will be received by all channels connected to the same
066: channel name.<p>
067:
068: @param cluster_name The name of the chanel to connect to.
069: @exception ChannelException The protocol stack cannot be started
070: @exception ChannelClosedException The channel is closed and therefore cannot be used any longer.
071: A new channel has to be created first.
072: @see Channel#disconnect
073: */
074: abstract public void connect(String cluster_name)
075: throws ChannelException, ChannelClosedException;
076:
077: /**
078: * Connects the channel to a group <em>and</em> fetches the state
079: * @param cluster_name
080: * @param target
081: * @param state_id The ID of a substate. If the full state is to be fetched, set this to null
082: * @param timeout
083: * @return True if the state could be fetched, otherwise false. If true is returned, the state setting method (e.g.
084: * setState() will be called.
085: * @throws ChannelException
086: */
087: abstract public boolean connect(String cluster_name,
088: Address target, String state_id, long timeout)
089: throws ChannelException;
090:
091: /** Disconnects the channel from the current group (if connected), leaving the group.
092: It is a null operation if not connected. It is a null operation if the channel is closed.
093:
094: @see #connect(String) */
095: abstract public void disconnect();
096:
097: /**
098: Destroys the channel and its associated resources (e.g., the protocol stack). After a channel
099: has been closed, invoking methods on it throws the <code>ChannelClosed</code> exception
100: (or results in a null operation). It is a null operation if the channel is already closed.<p>
101: If the channel is connected to a group, <code>disconnec()t</code> will be called first.
102: */
103: abstract public void close();
104:
105: /** Shuts down the channel without disconnecting if connected, stops all the threads */
106: abstract protected void shutdown();
107:
108: /**
109: Re-opens a closed channel. Throws an exception if the channel is already open. After this method
110: returns, connect() may be called to join a group. The address of this member will be different from
111: the previous incarnation.
112: */
113: public void open() throws ChannelException {
114: ;
115: }
116:
117: /**
118: Determines whether the channel is open;
119: i.e., the protocol stack has been created (may not be connected though).
120: */
121: abstract public boolean isOpen();
122:
123: /**
124: Determines whether the channel is connected to a group. This implies it is open. If true is returned,
125: then the channel can be used to send and receive messages.
126: */
127: abstract public boolean isConnected();
128:
129: /**
130: * Returns the number of messages that are waiting. Those messages can be
131: * removed by {@link #receive(long)}. Note that this number could change after
132: * calling this method and before calling <tt>receive()</tt> (e.g. the latter
133: * method might be called by a different thread).
134: * @return The number of messages on the queue, or -1 if the queue/channel
135: * is closed/disconnected.
136: */
137: public int getNumMessages() {
138: return -1;
139: }
140:
141: public String dumpQueue() {
142: return "";
143: }
144:
145: /**
146: * Returns a map of statistics of the various protocols and of the channel itself.
147: * @return Map<String,Map>. A map where the keys are the protocols ("channel" pseudo key is
148: * used for the channel itself") and the values are property maps.
149: */
150: public abstract Map dumpStats();
151:
152: /** Sends a message to a (unicast) destination. The message contains
153: <ol>
154: <li>a destination address (Address). A <code>null</code> address sends the message
155: to all group members.
156: <li>a source address. Can be left empty. Will be filled in by the protocol stack.
157: <li>a byte buffer. The message contents.
158: <li>several additional fields. They can be used by application programs (or patterns). E.g.
159: a message ID, a <code>oneway</code> field which determines whether a response is
160: expected etc.
161: </ol>
162: @param msg The message to be sent. Destination and buffer should be set. A null destination
163: means to send to all group members.
164:
165: @exception ChannelNotConnectedException The channel must be connected to send messages.
166:
167: @exception ChannelClosedException The channel is closed and therefore cannot be used any longer.
168: A new channel has to be created first.
169:
170: */
171: abstract public void send(Message msg)
172: throws ChannelNotConnectedException, ChannelClosedException;
173:
174: /**
175: Helper method. Will create a Message(dst, src, obj) and use send(Message).
176: @param dst Destination address for message. If null, message will be sent to all current group members
177: @param src Source (sender's) address. If null, it will be set by the protocol's transport layer before
178: being put on the wire. Can usually be set to null.
179: @param obj Serializable object. Will be serialized into the byte buffer of the Message. If it is <em>
180: not</em> serializable, the byte buffer will be null.
181: */
182: abstract public void send(Address dst, Address src, Serializable obj)
183: throws ChannelNotConnectedException, ChannelClosedException;
184:
185: /**
186: Access to event mechanism of channels. Enables to send and receive events, used by building
187: blocks to communicate with (building block) specific protocol layers. Currently useful only
188: with JChannel.
189: */
190: public void down(Event evt) {
191: }
192:
193: /** Receives a message, a view change or a block event. By using <code>setOpt</code>, the
194: type of objects to be received can be determined (e.g., not views and blocks, just
195: messages).
196:
197: The possible types returned can be:
198: <ol>
199: <li><code>Message</code>. Normal message
200: <li><code>Event</code>. All other events (used by JChannel)
201: <li><code>View</code>. A view change.
202: <li><code>BlockEvent</code>. A block event indicating that a flush protocol has been started, and we should not
203: send any more messages. This event should be ack'ed by calling {@link org.jgroups.Channel#blockOk()} .
204: Any messages sent after blockOk() returns might get blocked until the flush protocol has completed.
205: <li><code>UnblockEvent</code>. An unblock event indicating that the flush protocol has completed and we can resume
206: sending messages
207: <li><code>SuspectEvent</code>. A notification of a suspected member.
208: <li><code>GetStateEvent</code>. The current state of the application should be
209: returned using <code>ReturnState</code>.
210: <li><code>SetStateEvent</code>. The state of a single/all members as requested previously
211: by having called <code>Channel.getState(s).
212: <li><code>ExitEvent</code>. Signals that this member was forced to leave the group
213: (e.g., caused by the member being suspected.) The member can rejoin the group by calling
214: open(). If the AUTO_RECONNECT is set (see setOpt()), the reconnect will be done automatically.
215: </ol>
216: The <code>instanceof</code> operator can be used to discriminate between different types
217: returned.
218: @param timeout Value in milliseconds. Value <= 0 means wait forever
219: @return A Message, View, BlockEvent, SuspectEvent, GetStateEvent, SetStateEvent or
220: ExitEvent, depending on what is on top of the internal queue.
221:
222: @exception ChannelNotConnectedException The channel must be connected to receive messages.
223:
224: @exception ChannelClosedException The channel is closed and therefore cannot be used any longer.
225: A new channel has to be created first.
226:
227: @exception TimeoutException Thrown when a timeout has occurred.
228: */
229: abstract public Object receive(long timeout)
230: throws ChannelNotConnectedException,
231: ChannelClosedException, TimeoutException;
232:
233: /** Returns the next message, view, block, suspect or other event <em>without removing
234: it from the queue</em>.
235: @param timeout Value in milliseconds. Value <= 0 means wait forever
236: @return A Message, View, BlockEvent, SuspectEvent, GetStateEvent or SetStateEvent object,
237: depending on what is on top of the internal queue.
238:
239: @exception ChannelNotConnectedException The channel must be connected to receive messages.
240:
241: @exception ChannelClosedException The channel is closed and therefore cannot be used any longer.
242: A new channel has to be created first.
243:
244: @exception TimeoutException Thrown when a timeout has occurred.
245:
246: @see #receive(long)
247: */
248: abstract public Object peek(long timeout)
249: throws ChannelNotConnectedException,
250: ChannelClosedException, TimeoutException;
251:
252: /**
253: * Gets the current view. This does <em>not</em> retrieve a new view, use
254: <code>receive()</code> to do so. The view may only be available after a successful
255: <code>connect()</code>. The result of calling this method on an unconnected channel
256: is implementation defined (may return null). Calling it on a channel that is not
257: enabled to receive view events (via <code>setOpt</code>) returns
258: <code>null</code>. Calling this method on a closed channel returns a null view.
259: @return The current view.
260: */
261: abstract public View getView();
262:
263: /**
264: Returns the channel's own address. The result of calling this method on an unconnected
265: channel is implementation defined (may return null). Calling this method on a closed
266: channel returns null.
267:
268: @return The channel's address. Generated by the underlying transport, and opaque.
269: Addresses can be used as destination in the <code>Send</code> operation.
270: */
271: abstract public Address getLocalAddress();
272:
273: /**
274: Returns the group address of the group of which the channel is a member. This is
275: the object that was the argument to <code>connect()</code>. Calling this method on a closed
276: channel returns <code>null</code>.
277: @return The group address
278: @deprecated Use {@link #getClusterName()} instead */
279: abstract public String getChannelName();
280:
281: /**
282: Returns the cluster name of the group of which the channel is a member. This is
283: the object that was the argument to <code>connect()</code>. Calling this method on a closed
284: channel returns <code>null</code>.
285: @return The cluster name */
286: abstract public String getClusterName();
287:
288: /**
289: When up_handler is set, all events will be passed to it directly. These will not be received
290: by the channel (except connect/disconnect, state retrieval and the like). This can be used by
291: building blocks on top of a channel; thus the channel is used as a pass-through medium, and
292: the building blocks take over some of the channel's tasks. However, tasks such as connection
293: management and state transfer is still handled by the channel.
294: */
295: public void setUpHandler(UpHandler up_handler) {
296: this .up_handler = up_handler;
297: }
298:
299: /**
300: Allows to be notified when a channel event such as connect, disconnect or close occurs.
301: E.g. a PullPushAdapter may choose to stop when the channel is closed, or to start when
302: it is opened.
303: @deprecated Use addChannelListener() instead
304: */
305: public void setChannelListener(ChannelListener channel_listener) {
306: addChannelListener(channel_listener);
307: }
308:
309: /**
310: Allows to be notified when a channel event such as connect, disconnect or close occurs.
311: E.g. a PullPushAdapter may choose to stop when the channel is closed, or to start when
312: it is opened.
313: */
314: public synchronized void addChannelListener(ChannelListener listener) {
315: if (listener == null)
316: return;
317: if (channel_listeners == null)
318: channel_listeners = new LinkedHashSet();
319: channel_listeners.add(listener);
320: }
321:
322: public synchronized void removeChannelListener(
323: ChannelListener listener) {
324: if (channel_listeners != null)
325: channel_listeners.remove(listener);
326: }
327:
328: /** Sets the receiver, which will handle all messages, view changes etc */
329: public void setReceiver(Receiver r) {
330: receiver = r;
331: }
332:
333: /**
334: Sets an option. The following options are currently recognized:
335: <ol>
336: <li><code>BLOCK</code>. Turn the reception of BLOCK events on/off (value is Boolean).
337: Default is off
338: <li><code>LOCAL</code>. Receive its own broadcast messages to the group
339: (value is Boolean). Default is on.
340: <li><code>AUTO_RECONNECT</code>. Turn auto-reconnection on/off. If on, when a member if forced out
341: of a group (EXIT event), then we will reconnect.
342: <li><code>AUTO_GETSTATE</code>. Turn automatic fetching of state after an auto-reconnect on/off.
343: This also sets AUTO_RECONNECT to true (if not yet set).
344: </ol>
345: This method can be called on an unconnected channel. Calling this method on a
346: closed channel has no effect.
347: */
348: abstract public void setOpt(int option, Object value);
349:
350: /**
351: Gets an option. This method can be called on an unconnected channel. Calling this
352: method on a closed channel returns <code>null</code>.
353:
354: @param option The option to be returned.
355: @return The object associated with an option.
356: */
357: abstract public Object getOpt(int option);
358:
359: abstract public boolean flushSupported();
360:
361: abstract public boolean startFlush(long timeout,
362: boolean automatic_resume);
363:
364: abstract public void stopFlush();
365:
366: /** Called to acknowledge a block() (callback in <code>MembershipListener</code> or
367: <code>BlockEvent</code> received from call to <code>Receive</code>).
368: After sending BlockOk, no messages should be sent until a new view has been received.
369: Calling this method on a closed channel has no effect.
370: */
371: abstract public void blockOk();
372:
373: /**
374: Retrieve the state of the group. Will usually contact the oldest group member to get
375: the state. When the method returns true, a <code>SetStateEvent</code> will have been
376: added to the channel's queue, causing <code>receive()</code> to return the state in one of
377: the next invocations. If false, no state will be retrieved by <code>receive()</code>.
378: @param target The address of the member from which the state is to be retrieved. If it is
379: null, the coordinator is contacted.
380: @param timeout Milliseconds to wait for the response (0 = wait indefinitely).
381: @return boolean True if the state was retrieved successfully, otherwise false.
382: @exception ChannelNotConnectedException The channel must be connected to receive messages.
383:
384: @exception ChannelClosedException The channel is closed and therefore cannot be used
385: any longer. A new channel has to be created first.
386:
387: */
388: abstract public boolean getState(Address target, long timeout)
389: throws ChannelNotConnectedException, ChannelClosedException;
390:
391: /**
392: * Fetches a partial state identified by state_id.
393: * @param target
394: * @param state_id
395: * @param timeout
396: * @return
397: * @throws ChannelNotConnectedException
398: * @throws ChannelClosedException
399: */
400: abstract public boolean getState(Address target, String state_id,
401: long timeout) throws ChannelNotConnectedException,
402: ChannelClosedException;
403:
404: /**
405: Retrieve all states of the group members. Will contact all group members to get
406: the states. When the method returns true, a <code>SetStateEvent</code> will have been
407: added to the channel's queue, causing <code>Receive</code> to return the states in one of
408: the next invocations. If false, no states will be retrieved by <code>Receive</code>.
409: @param targets A list of members which are contacted for states. If the list is null,
410: all the current members of the group will be contacted.
411: @param timeout Milliseconds to wait for the response (0 = wait indefinitely).
412: @return boolean True if the state was retrieved successfully, otherwise false.
413: @exception ChannelNotConnectedException The channel must be connected to
414: receive messages.
415: @exception ChannelClosedException The channel is closed and therefore cannot be used
416: any longer. A new channel has to be created first.
417: @deprecated Not really needed - we always want to get the state from a single member
418: */
419: abstract public boolean getAllStates(Vector targets, long timeout)
420: throws ChannelNotConnectedException, ChannelClosedException;
421:
422: /**
423: * Called by the application is response to receiving a
424: * <code>getState()</code> object when calling <code>receive()</code>.
425: * @param state The state of the application as a byte buffer
426: * (to send over the network).
427: */
428: public abstract void returnState(byte[] state);
429:
430: /** Returns a given substate (state_id of null means return entire state) */
431: public abstract void returnState(byte[] state, String state_id);
432:
433: public static String option2String(int option) {
434: switch (option) {
435: case BLOCK:
436: return "BLOCK";
437: case VIEW:
438: return "VIEW";
439: case SUSPECT:
440: return "SUSPECT";
441: case LOCAL:
442: return "LOCAL";
443: case GET_STATE_EVENTS:
444: return "GET_STATE_EVENTS";
445: case AUTO_RECONNECT:
446: return "AUTO_RECONNECT";
447: case AUTO_GETSTATE:
448: return "AUTO_GETSTATE";
449: default:
450: return "unknown (" + option + ')';
451: }
452: }
453:
454: protected void notifyChannelConnected(Channel c) {
455: if (channel_listeners == null)
456: return;
457: for (Iterator it = channel_listeners.iterator(); it.hasNext();) {
458: ChannelListener channelListener = (ChannelListener) it
459: .next();
460: try {
461: channelListener.channelConnected(c);
462: } catch (Throwable t) {
463: getLog().error(
464: "exception in channelConnected() callback", t);
465: }
466: }
467: }
468:
469: protected void notifyChannelDisconnected(Channel c) {
470: if (channel_listeners == null)
471: return;
472: for (Iterator it = channel_listeners.iterator(); it.hasNext();) {
473: ChannelListener channelListener = (ChannelListener) it
474: .next();
475: try {
476: channelListener.channelDisconnected(c);
477: } catch (Throwable t) {
478: getLog()
479: .error(
480: "exception in channelDisonnected() callback",
481: t);
482: }
483: }
484: }
485:
486: protected void notifyChannelClosed(Channel c) {
487: if (channel_listeners == null)
488: return;
489: for (Iterator it = channel_listeners.iterator(); it.hasNext();) {
490: ChannelListener channelListener = (ChannelListener) it
491: .next();
492: try {
493: channelListener.channelClosed(c);
494: } catch (Throwable t) {
495: getLog().error("exception in channelClosed() callback",
496: t);
497: }
498: }
499: }
500:
501: protected void notifyChannelShunned() {
502: if (channel_listeners == null)
503: return;
504: for (Iterator it = channel_listeners.iterator(); it.hasNext();) {
505: ChannelListener channelListener = (ChannelListener) it
506: .next();
507: try {
508: channelListener.channelShunned();
509: } catch (Throwable t) {
510: getLog().error(
511: "exception in channelShunned() callback", t);
512: }
513: }
514: }
515:
516: protected void notifyChannelReconnected(Address addr) {
517: if (channel_listeners == null)
518: return;
519: for (Iterator it = channel_listeners.iterator(); it.hasNext();) {
520: ChannelListener channelListener = (ChannelListener) it
521: .next();
522: try {
523: channelListener.channelReconnected(addr);
524: } catch (Throwable t) {
525: getLog()
526: .error(
527: "exception in channelReconnected() callback",
528: t);
529: }
530: }
531: }
532:
533: }
|