001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.jms.serverless;
023:
024: import org.jboss.logging.Logger;
025: import javax.jms.Connection;
026: import javax.jms.JMSException;
027: import javax.jms.ConnectionMetaData;
028: import javax.jms.ExceptionListener;
029: import javax.jms.ConnectionConsumer;
030: import javax.jms.ServerSessionPool;
031: import javax.jms.Destination;
032: import javax.jms.Topic;
033: import javax.jms.Session;
034: import org.jgroups.JChannel;
035: import org.jgroups.ChannelListener;
036: import org.jgroups.Channel;
037: import org.jgroups.Address;
038: import org.jgroups.ChannelException;
039: import java.io.Serializable;
040: import java.net.URL;
041: import javax.jms.Queue;
042: import org.jgroups.SetStateEvent;
043: import org.jgroups.util.Util;
044: import org.jgroups.GetStateEvent;
045: import org.jgroups.View;
046: import org.jgroups.SuspectEvent;
047: import org.jgroups.ChannelClosedException;
048: import org.jgroups.ChannelNotConnectedException;
049:
050: /**
051: * The main piece of the JMS client runtime. Sits in top of a JChannel and mainains the "server
052: * group" state. Delegates the session management to the SessionManager instance. Deals with
053: * message delivery to and from sessions. Implements the Connection interface.
054: *
055: * @author Ovidiu Feodorov <ovidiu@jboss.org>
056: * @version $Revision: 57195 $ $Date: 2006-09-26 08:08:17 -0400 (Tue, 26 Sep 2006) $
057: *
058: **/
059: class GroupConnection implements Connection, Runnable {
060:
061: private static final Logger log = Logger
062: .getLogger(GroupConnection.class);
063:
064: private static final String DEFAULT_SERVER_GROUP_NAME = "serverGroup";
065:
066: private URL serverChannelConfigURL;
067:
068: private SessionManager sessionManager;
069: private org.jgroups.util.Queue deliveryQueue;
070: private ConnectionState connState;
071:
072: // private ChannelState channelState;
073: private GroupState groupState;
074: private Thread connManagementThread;
075: private JChannel serverChannel;
076:
077: /**
078: * The constructor leaves the Connection in a DISCONNECTED state.
079: *
080: * @param serverChannelConfigURL the URL of the XML file containing channel configuration.
081: **/
082: GroupConnection(URL serverChannelConfigURL) {
083:
084: this .serverChannelConfigURL = serverChannelConfigURL;
085:
086: deliveryQueue = new org.jgroups.util.Queue();
087: sessionManager = new SessionManager(this , deliveryQueue);
088: groupState = new GroupState();
089: connManagementThread = new Thread(this ,
090: "Connection Management Thread");
091: connState = new ConnectionState();
092:
093: }
094:
095: /**
096: * Initalizes the connection, by connecting the channel to the server group. Should be called
097: * only once, when the Connection instance is created.
098: **/
099: void connect() throws JMSException {
100:
101: // TO_DO: if is already connected (stopped), just return
102:
103: try {
104:
105: serverChannel = new JChannel(serverChannelConfigURL);
106: serverChannel
107: .setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
108: serverChannel.setChannelListener(new ChannelListener() {
109:
110: public void channelClosed(Channel channel) {
111: log.debug("channelClosed(" + channel + ")");
112: }
113:
114: public void channelConnected(Channel channel) {
115: log.debug("channelConnected() to group ["
116: + channel.getChannelName() + "]");
117: }
118:
119: public void channelDisconnected(Channel channel) {
120: log.debug("channelDisconnected(" + channel + ")");
121: }
122:
123: public void channelReconnected(Address addr) {
124: log.debug("channelReconnected(" + addr + ")");
125: }
126:
127: public void channelShunned() {
128: log.debug("channelShunned()");
129: }
130: });
131:
132: log.debug("channel created");
133: serverChannel.connect(DEFAULT_SERVER_GROUP_NAME);
134: log.debug("channel connected");
135: connState.setStopped();
136: connManagementThread.start();
137: log.debug("Connection Management Thread started");
138: boolean getStateOK = serverChannel.getState(null, 0);
139: log.debug("getState(): " + getStateOK);
140: } catch (ChannelException e) {
141: String msg = "Failed to create an active connection";
142: log.error(msg, e);
143: JMSException jmse = new JMSException(msg);
144: jmse.setLinkedException(e);
145: throw jmse;
146: }
147: }
148:
149: // TO_DO: deal with situation when this method is accessed concurrently from different threads
150: void send(javax.jms.Message m) throws JMSException {
151:
152: try {
153: // the Destination is already set for the message
154: if (m.getJMSDestination() instanceof Topic) {
155: // for topics, multicast
156: serverChannel.send(null, null, (Serializable) m);
157: } else {
158: // for queues, unicast to the coordinator
159:
160: // TO_DO: optimization, if I am the only on in group, don't send the messages
161: // down the stack anymore
162: org.jgroups.Message jgmsg = new org.jgroups.Message(
163: (Address) serverChannel.getView().getMembers()
164: .get(0), null, new QueueCarrier(m));
165: serverChannel.send(jgmsg);
166: }
167: } catch (Exception e) {
168: String msg = "Failed to send message";
169: log.error(msg, e);
170: JMSException jmse = new JMSException(msg);
171: jmse.setLinkedException(e);
172: throw jmse;
173: }
174:
175: }
176:
177: //
178: // Runnable INTERFACE IMPLEMENTATION
179: //
180:
181: /**
182: * Code executed on the Connection Management Thread thread. It synchronously pulls JG
183: * message and events from the channel.
184: **/
185: public void run() {
186:
187: Object incoming = null;
188:
189: while (true) {
190:
191: try {
192: incoming = serverChannel.receive(0);
193: } catch (ChannelClosedException e) {
194: log.debug("Channel closed, exiting");
195: break;
196: } catch (ChannelNotConnectedException e) {
197: log
198: .warn("TO_DO: Channel not connected, I should block the thread ...");
199: continue;
200: } catch (Exception e) {
201: // TO_DO: use a JMS ExceptionListener and do some other things as well ....
202: log
203: .error(
204: "Failed to synchronously read from the channel",
205: e);
206: }
207:
208: try {
209: dispatch(incoming);
210: } catch (Exception e) {
211: // TO_DO: I don't want that poorly written client code (dispatch() ends running
212: // MessageListener code) to throw RuntimeException and terminate this thread
213: // use the ExceptionListener and do some other things as well ....
214: log.error("Dispatching failed", e);
215: }
216: }
217: }
218:
219: //
220: //
221: //
222:
223: private void dispatch(Object o) throws Exception {
224:
225: log.debug("dispatching " + o);
226:
227: if (o instanceof SetStateEvent) {
228: byte[] buffer = ((SetStateEvent) o).getArg();
229: if (buffer == null) {
230: // that's ok if I am the coordinator, just ignore it
231: log.debug("null group state, ignoring ...");
232: } else {
233: // update my group state
234: groupState.fromByteBuffer(buffer);
235: }
236: return;
237: } else if (o instanceof GetStateEvent) {
238: // somebody is requesting the group state
239: serverChannel.returnState(groupState.toByteBuffer());
240: return;
241: } else if (o instanceof View) {
242: // no use for it for the time being
243: return;
244: } else if (o instanceof SuspectEvent) {
245: // no use for it for the time being
246: return;
247: } else if (!(o instanceof org.jgroups.Message)) {
248: // ignore it for the time being
249: log.warn("Ignoring " + o);
250: return;
251: }
252:
253: org.jgroups.Message jgmsg = (org.jgroups.Message) o;
254: Object payload = jgmsg.getObject();
255: if (payload instanceof ServerAdminCommand) {
256: // ADD_QUEUE_RECEIVER, aso
257: handleServerAdminCommand(jgmsg.getSrc(),
258: (ServerAdminCommand) payload);
259: } else if (payload instanceof QueueCarrier) {
260: QueueCarrier qc = (QueueCarrier) payload;
261: String sessionID = qc.getSessionID();
262: // this is either an initial queue carrier that forwards the message from its
263: // source to the coordinator, or a final queue carrier that forwards the message
264: // from the coordinator to its final destination.
265: if (sessionID == null) {
266: queueForward(qc);
267: } else {
268: deliveryQueue.add(qc);
269: }
270: } else if (payload instanceof javax.jms.Message) {
271: // deliver only if the connection is started, discard otherwise
272: if (connState.isStarted()) {
273: deliveryQueue.add((javax.jms.Message) payload);
274: }
275: } else {
276: log
277: .warn("JG Message with a payload something else than a JMS Message: "
278: + (payload == null ? "null" : payload
279: .getClass().getName()));
280: }
281: }
282:
283: private void handleServerAdminCommand(Address src,
284: ServerAdminCommand c) {
285: //log.debug("Handling "+c.getCommand());
286: String comm = c.getCommand();
287: if (ServerAdminCommand.ADD_QUEUE_RECEIVER.equals(comm)) {
288: String queueName = (String) c.get(0);
289: String sessionID = (String) c.get(1);
290: String queueReceiverID = (String) c.get(2);
291: groupState.addQueueReceiver(queueName, src, sessionID,
292: queueReceiverID);
293: } else if (ServerAdminCommand.REMOVE_QUEUE_RECEIVER
294: .equals(comm)) {
295: String queueName = (String) c.get(0);
296: String sessionID = (String) c.get(1);
297: String queueReceiverID = (String) c.get(2);
298: groupState.removeQueueReceiver(queueName, src, sessionID,
299: queueReceiverID);
300: } else {
301: log.error("Unknown server administration command: " + comm);
302: }
303: }
304:
305: void advertiseQueueReceiver(String queueName, String sessionID,
306: String queueReceiverID, boolean isOn)
307: throws ProviderException {
308:
309: try {
310: // multicast the change, this will update my own state as well
311: String cs = isOn ? ServerAdminCommand.ADD_QUEUE_RECEIVER
312: : ServerAdminCommand.REMOVE_QUEUE_RECEIVER;
313: ServerAdminCommand comm = new ServerAdminCommand(cs,
314: queueName, sessionID, queueReceiverID);
315: serverChannel.send(null, null, comm);
316: } catch (ChannelException e) {
317: throw new ProviderException(
318: "Failed to advertise the queue receiver", e);
319: }
320: }
321:
322: private void queueForward(QueueCarrier qc) throws Exception {
323:
324: Queue destQueue = (Queue) qc.getJMSMessage()
325: .getJMSDestination();
326: QueueReceiverAddress ra = groupState.selectReceiver(destQueue
327: .getQueueName());
328: if (ra == null) {
329: // TO_DO: no receivers for this queue, discard it for the time being
330: log.warn("Discarding message for queue "
331: + destQueue.getQueueName() + "!");
332: return;
333: }
334: Address destAddress = ra.getAddress();
335: qc.setSessionID(ra.getSessionID());
336: qc.setReceiverID(ra.getReceiverID());
337:
338: // forward it to the final destination
339: serverChannel.send(destAddress, null, qc);
340:
341: }
342:
343: //
344: // Connection INTERFACE IMPLEMENTATION
345: //
346:
347: public void start() throws JMSException {
348:
349: // makes sense to call it only a connection that is stopped. If called on a started
350: // connection, the call is ignored. If called on a closed connection: TO_DO
351: // TO_DO: throw apropriate exceptions for illegal transitions
352: if (connState.isStarted()) {
353: return;
354: }
355: synchronized (connState) {
356: connState.setStarted();
357: connState.notify();
358: }
359:
360: }
361:
362: public void stop() throws JMSException {
363:
364: // TO_DO: throw apropriate exceptions for illegal transitions
365: connState.setStopped();
366: }
367:
368: public void close() throws JMSException {
369:
370: // TO_DO: throw apropriate exceptions for illegal transitions
371: // TO_DO: read the rest of specs and make sure I comply; tests
372: if (connState.isClosed()) {
373: return;
374: }
375: connState.setClosed();
376: serverChannel.close();
377:
378: }
379:
380: public Session createSession(boolean transacted, int acknowledgeMode)
381: throws JMSException {
382:
383: return sessionManager
384: .createSession(transacted, acknowledgeMode);
385:
386: }
387:
388: public String getClientID() throws JMSException {
389: throw new NotImplementedException();
390: }
391:
392: public void setClientID(String clientID) throws JMSException {
393:
394: // Once the connection has been initialized, the runtime provides a ClientID, that cannot
395: // be changed by the user; according to JMS1.1 specs, the method should throw
396: // IllegalStateException
397: String msg = "ClientID (" + "" + ") cannot be modified";
398: throw new IllegalStateException(msg);
399: }
400:
401: public ConnectionMetaData getMetaData() throws JMSException {
402: throw new NotImplementedException();
403: }
404:
405: public ExceptionListener getExceptionListener() throws JMSException {
406: throw new NotImplementedException();
407: }
408:
409: public void setExceptionListener(ExceptionListener listener)
410: throws JMSException {
411: throw new NotImplementedException();
412: }
413:
414: public ConnectionConsumer createConnectionConsumer(
415: Destination destination, String messageSelector,
416: ServerSessionPool sessionPool, int maxMessages)
417: throws JMSException {
418: throw new NotImplementedException();
419: }
420:
421: public ConnectionConsumer createDurableConnectionConsumer(
422: Topic topic, String subscriptionName,
423: String messageSelector, ServerSessionPool sessionPool,
424: int maxMessages) throws JMSException {
425: throw new NotImplementedException();
426: }
427:
428: //
429: // END OF Connection INTERFACE IMPLEMENTATION
430: //
431:
432: /**
433: * Debugging only
434: **/
435: public static void main(String[] args) throws Exception {
436:
437: GroupConnection c = new GroupConnection(new URL(args[0]));
438: c.connect();
439: }
440:
441: }
|