001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.jms.serverless;
023:
024: import org.jboss.logging.Logger;
025: import javax.jms.Session;
026: import java.util.List;
027: import java.util.ArrayList;
028: import javax.jms.JMSException;
029: import java.util.Iterator;
030: import javax.jms.Message;
031:
032: /**
033: * The main reason for this class to exist is to insure synchronized access to the connection's
034: * session list. It also handles message delivery from the group to sessions and vice-versa.
035: *
036: * @author Ovidiu Feodorov <ovidiu@jboss.org>
037: * @version $Revision: 57195 $ $Date: 2006-09-26 08:08:17 -0400 (Tue, 26 Sep 2006) $
038: *
039: **/
040: class SessionManager implements Runnable {
041:
042: private static final Logger log = Logger
043: .getLogger(SessionManager.class);
044:
045: private GroupConnection connection;
046: private org.jgroups.util.Queue deliveryQueue;
047: private Thread deliveryThread;
048: private List sessions;
049: private int sessionCounter = 0;
050:
051: SessionManager(GroupConnection connection,
052: org.jgroups.util.Queue deliveryQueue) {
053:
054: this .connection = connection;
055: this .deliveryQueue = deliveryQueue;
056: sessions = new ArrayList();
057: deliveryThread = new Thread(this , "Session Delivery Thread");
058: deliveryThread.start();
059: }
060:
061: public Session createSession(boolean transacted, int acknowledgeMode)
062: throws JMSException {
063:
064: Session s = new SessionImpl(this , generateSessionID(),
065: transacted, acknowledgeMode);
066: synchronized (sessions) {
067: sessions.add(s);
068: }
069: return s;
070: }
071:
072: /**
073: * The only way for the managed Session to access the Connection instance. If a session
074: * needs to access the connection directly, that's the way it gets the instance.
075: **/
076: GroupConnection getConnection() {
077: return connection;
078: }
079:
080: // TO_DO: acknowledgement, deal with failed deliveries
081: private void deliver(Message m) {
082:
083: // TO_DO: single threaded access for sessions
084: // So far, the only thread that accesses dispatch() is the connection's puller thread and
085: // this will be the unique thread that accesses the Sessions. This may not be sufficient
086: // for high load, consider the possiblity to (dynamically) add new threads to handle
087: // delivery, possibly a thread per session.
088:
089: synchronized (sessions) {
090: for (Iterator i = sessions.iterator(); i.hasNext();) {
091: ((SessionImpl) i.next()).deliver(m);
092: }
093: }
094: }
095:
096: // TO_DO: acknowledgement, deal with failed deliveries
097: private void deliver(Message m, String sessionID,
098: String queueReceiverID) {
099:
100: // TO_DO: single threaded access for sessions
101: // So far, the only thread that accesses dispatch() is the connection's puller thread and
102: // this will be the unique thread that accesses the Sessions. This may not be sufficient
103: // for high load, consider the possiblity to (dynamically) add new threads to handle
104: // delivery, possibly a thread per session.
105:
106: SessionImpl session = null;
107: synchronized (sessions) {
108: for (Iterator i = sessions.iterator(); i.hasNext();) {
109: SessionImpl crts = (SessionImpl) i.next();
110: if (crts.getID().equals(sessionID)) {
111: session = crts;
112: break;
113: }
114: }
115: }
116: if (session == null) {
117: log.error("No such session: " + sessionID
118: + ". Delivery failed!");
119: } else {
120: session.deliver(m, queueReceiverID);
121: }
122: }
123:
124: /**
125: * Method called by a managed sessions when a new queue receiver is created or removed.
126: * The queue receiver has to be advertised to the group, to update the queue section of the
127: * group state.
128: **/
129: void advertiseQueueReceiver(String sessionID, QueueReceiverImpl qr,
130: boolean isOn) throws JMSException {
131: try {
132: connection.advertiseQueueReceiver(qr.getQueue()
133: .getQueueName(), sessionID, qr.getID(), isOn);
134: } catch (ProviderException e) {
135: // the multicast failed, the queue receiver is invalid
136: String msg = "Cannot advertise queue receiver";
137: JMSException jmse = new JMSException(msg);
138: jmse.setLinkedException(e);
139: throw jmse;
140: }
141: }
142:
143: //
144: //
145: //
146:
147: /**
148: * Generate a session ID that is quaranteed to be unique for the life time of a SessionManager
149: * instance.
150: **/
151: private synchronized String generateSessionID() {
152: return Integer.toString(sessionCounter++);
153: }
154:
155: //
156: // Runnable INTERFACE IMPLEMENTATION
157: //
158:
159: public void run() {
160:
161: while (true) {
162:
163: try {
164: Object o = deliveryQueue.remove();
165: if (o instanceof javax.jms.Message) {
166: deliver((javax.jms.Message) o);
167: } else if (o instanceof QueueCarrier) {
168: QueueCarrier qc = (QueueCarrier) o;
169: deliver(qc.getJMSMessage(), qc.getSessionID(), qc
170: .getReceiverID());
171: } else {
172: log.warn("Unknown delivery object: "
173: + (o == null ? "null" : o.getClass()
174: .getName()));
175: }
176: } catch (Exception e) {
177: log
178: .warn(
179: "Failed to remove element from the delivery queue",
180: e);
181: }
182: }
183: }
184:
185: //
186: // END Runnable INTERFACE IMPLEMENTATION
187: //
188:
189: }
|