001: /*
002: * MessageService: The message service daemon
003: * Copyright (C) 2006 Rift IT Contracting
004: *
005: * This library is free software; you can redistribute it and/or
006: * modify it under the terms of the GNU Lesser General Public
007: * License as published by the Free Software Foundation; either
008: * version 2.1 of the License, or (at your option) any later version.
009: *
010: * This library is distributed in the hope that it will be useful,
011: * but WITHOUT ANY WARRANTY; without even the implied warranty of
012: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
013: * Lesser General Public License for more details.
014: *
015: * You should have received a copy of the GNU Lesser General Public
016: * License along with this library; if not, write to the Free Software
017: * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
018: *
019: * MessageQueueManager.java
020: */
021:
022: // package path
023: package com.rift.coad.daemon.messageservice;
024:
025: // java imports
026: import java.util.Date;
027: import java.util.List;
028: import java.util.ArrayList;
029: import java.util.Map;
030: import java.util.HashMap;
031: import java.util.Vector;
032: import java.util.concurrent.ConcurrentHashMap;
033: import javax.transaction.xa.XAException;
034: import javax.transaction.xa.XAResource;
035: import javax.transaction.xa.Xid;
036:
037: // logging import
038: import org.apache.log4j.Logger;
039:
040: // hibernate imports
041: import org.hibernate.*;
042: import org.hibernate.cfg.*;
043:
044: // coadunation imports
045: import com.rift.coad.util.transaction.TransactionManager;
046: import com.rift.coad.util.lock.LockRef;
047: import com.rift.coad.util.lock.ObjectLockFactory;
048: import com.rift.coad.daemon.messageservice.db.*;
049: import com.rift.coad.hibernate.util.HibernateUtil;
050:
051: /**
052: * This object is responsible for managing the message queues.
053: *
054: * @author Brett Chaldecott
055: */
056: public class MessageQueueManager implements XAResource {
057:
058: /**
059: * This object tracks the changes made by a transaction
060: */
061: public class Changes {
062: // the class private member variables
063: private Xid transactionId = null;
064: private List queues = new ArrayList();
065: private List locks = new ArrayList();
066:
067: /**
068: * The constructor of the changes object.
069: *
070: * @param transactionId The id of the current transaction
071: */
072: public Changes(Xid transactionId) {
073: this .transactionId = transactionId;
074: }
075:
076: /**
077: * This method adds a new message queue to the list of changes.
078: *
079: * @param messageQueue
080: */
081: public void add(MessageQueue messageQueue, LockRef lockRef)
082: throws MessageServiceException {
083: try {
084: lockRef.setLockName(transactionId);
085: locks.add(lockRef);
086: queues.add(messageQueue);
087: } catch (Exception ex) {
088: log.error("Failed to add the " + "change entries : "
089: + ex.getMessage(), ex);
090: throw new MessageServiceException("Failed to add the "
091: + "change entries + " + ex.getMessage(), ex);
092: }
093: }
094:
095: /**
096: * This method returns the list of queues.
097: *
098: * @return The list of queues.
099: */
100: public List getQueues() {
101: return queues;
102: }
103:
104: /**
105: * This method returns the list of locks.
106: *
107: * @return The list of locks.
108: */
109: public List getLocks() {
110: return locks;
111: }
112: }
113:
114: // class constants
115: public final static String UNSORTED = "UNSORTED";
116: public final static String DEAD_LETTER = "DEAD_LETTER";
117:
118: // private singleton methods
119: private static MessageQueueManager singleton = null;
120:
121: // the logger reference
122: protected Logger log = Logger.getLogger(MessageQueueManager.class
123: .getName());
124:
125: // private member variables
126: private ThreadLocal currentTransaction = new ThreadLocal();
127: private Map keyLockMap = new HashMap();
128: private Map messageQueues = new ConcurrentHashMap();
129: private Map transactionChanges = new ConcurrentHashMap();
130: private Vector listIndex = new Vector();
131: private int pos = 0;
132:
133: /**
134: * Creates a new instance of MessageQueueManager
135: */
136: private MessageQueueManager() {
137: }
138:
139: /**
140: * This method returns an instance of the MessageQueueManager singleton.
141: *
142: * @return An instance of the message queue manager.
143: */
144: public static synchronized MessageQueueManager getInstance() {
145: if (singleton == null) {
146: singleton = new MessageQueueManager();
147: }
148: return singleton;
149: }
150:
151: /**
152: * This method returns a reference to the message queue in questions.
153: *
154: * @return The reference to the message queue.
155: * @param name The name of the message queue to return.
156: * @exception MessageServiceException
157: */
158: public MessageQueue getQueue(String name)
159: throws MessageServiceException {
160: LockRef lockRef = null;
161: try {
162: lockRef = getLock(name);
163: if (messageQueues.containsKey(name)) {
164: MessageQueue messageQueue = (MessageQueue) messageQueues
165: .get(name);
166: lockRef.release();
167: return messageQueue;
168: }
169: //MessageTransactionLock.getInstance().lock();
170: Session session = HibernateUtil.getInstance(
171: MessageServiceManager.class).getSession();
172: List list = session.createQuery(
173: "FROM MessageQueue AS queue "
174: + "WHERE queue.messageQueueName = ?")
175: .setString(0, name).list();
176: MessageQueue queue = new MessageQueue(name);
177: if (list.size() == 1) {
178: com.rift.coad.daemon.messageservice.db.MessageQueue dbQueue = (com.rift.coad.daemon.messageservice.db.MessageQueue) list
179: .get(0);
180: if ((dbQueue.getNamed() != null)
181: && (dbQueue.getNamed() == 1)) {
182: log.error("This is a named queue [" + name
183: + "] and cannot be loaded into memory.");
184: throw new MessageServiceException(
185: "This is a named queue ["
186: + name
187: + "] and cannot be loaded into memory.");
188: }
189: messageQueues.put(name, queue);
190: addQueueToIndex(queue);
191: return queue;
192: }
193: TransactionManager.getInstance().bindResource(this , false);
194: com.rift.coad.daemon.messageservice.db.MessageQueue dbQueue = new com.rift.coad.daemon.messageservice.db.MessageQueue(
195: name);
196: session.persist(dbQueue);
197: Changes changes = (Changes) currentTransaction.get();
198: changes.add(queue, lockRef);
199: lockRef = null;
200: return queue;
201: } catch (MessageServiceException ex) {
202: throw ex;
203: } catch (Exception ex) {
204: log.error("Failed to retrieve th message queue [" + name
205: + "] : " + ex.getMessage(), ex);
206: throw new MessageServiceException(
207: "Failed to retrieve th message queue [" + name
208: + "] : " + ex.getMessage(), ex);
209: } finally {
210: if (lockRef != null) {
211: try {
212: lockRef.release();
213: } catch (Exception ex2) {
214: log.error("Failed to unlock the queue [" + name
215: + "] : " + ex2.getMessage(), ex2);
216: }
217: }
218: }
219: }
220:
221: /**
222: * This method returns the next message to process.
223: *
224: * @return NULL if no message is found, A message process object otherwise.
225: * @param nextRunTime The next run time.
226: * @exception MessageServiceException
227: */
228: public synchronized MessageProcessInfo getNextMessage(
229: Date nextRunTime) throws MessageServiceException {
230: Vector index = cloneIndex();
231: int currentPos = pos;
232: Date currentDate = nextRunTime;
233: MessageManager result = null;
234: while (index.size() > 0) {
235: currentPos++;
236: if (currentPos >= index.size()) {
237: currentPos = 0;
238: }
239: MessageQueue messageQueue = (MessageQueue) index
240: .get(currentPos);
241: Date nextDate = new Date();
242: result = messageQueue.popFrontMessage(nextDate);
243: if (result != null) {
244: MessageProcessInfo messageProcessInfo = new MessageProcessInfo(
245: messageQueue, result);
246: return messageProcessInfo;
247: }
248: if ((currentDate == nextRunTime)
249: || (currentDate.getTime() > nextDate.getTime())) {
250: currentDate = nextDate;
251: }
252: if (currentPos == pos) {
253: break;
254: }
255: }
256: // set the next runtime delay
257: nextRunTime.setTime(currentDate.getTime());
258:
259: // reset the pos
260: pos = currentPos;
261:
262: // return the result
263: return null;
264: }
265:
266: /**
267: * This method is called to commit the specified transaction.
268: *
269: * @param xid The id of the transaction to commit.
270: * @param onePhase If true a one phase commit should be used.
271: * @exception XAException
272: */
273: public void commit(Xid xid, boolean b) throws XAException {
274: try {
275: Changes changes = (Changes) transactionChanges.get(xid);
276: transactionChanges.remove(xid);
277: List queues = changes.getQueues();
278: List locks = changes.getLocks();
279: for (int index = 0; index < queues.size(); index++) {
280: MessageQueue queue = (MessageQueue) queues.get(index);
281: messageQueues.put(queue.getName(), queue);
282: addQueueToIndex(queue);
283: }
284: for (int index = 0; index < locks.size(); index++) {
285: LockRef lockRef = (LockRef) locks.get(index);
286: lockRef.release();
287: }
288: } catch (Exception ex) {
289: log.error("Failed to commit the changes : "
290: + ex.getMessage(), ex);
291: throw new XAException("Failed to commit the changes : "
292: + ex.getMessage());
293: }
294: }
295:
296: /**
297: * The resource manager has dissociated this object from the transaction.
298: *
299: * @param xid The id of the transaction that is getting ended.
300: * @param flags The flags associated with this operation.
301: * @exception XAException
302: */
303: public void end(Xid xid, int i) throws XAException {
304: }
305:
306: /**
307: * The transaction has been completed and must be forgotten.
308: *
309: * @param xid The id of the transaction to forget.
310: * @exception XAException
311: */
312: public void forget(Xid xid) throws XAException {
313: try {
314: Changes changes = (Changes) transactionChanges.get(xid);
315: transactionChanges.remove(xid);
316: List locks = changes.getLocks();
317: for (int index = 0; index < locks.size(); index++) {
318: LockRef lockRef = (LockRef) locks.get(index);
319: lockRef.release();
320: }
321: } catch (Exception ex) {
322: log.error("Failed to forget the changes : "
323: + ex.getMessage(), ex);
324: throw new XAException("Failed to forget the changes : "
325: + ex.getMessage());
326: }
327: }
328:
329: /**
330: * This method returns the transaction timeout for this object.
331: *
332: * @return The int containing the transaction timeout.
333: * @exception XAException
334: */
335: public int getTransactionTimeout() throws XAException {
336: return -1;
337: }
338:
339: /**
340: * This method returns true if this object is the resource manager getting
341: * queried.
342: *
343: * @return TRUE if this is the resource manager, FALSE if not.
344: * @param xaResource The resource to perform the check against.
345: * @exception XAException
346: */
347: public boolean isSameRM(XAResource xAResource) throws XAException {
348: return this == xAResource;
349: }
350:
351: /**
352: * This is called before a transaction is committed.
353: *
354: * @return The results of the transaction.
355: * @param xid The id of the transaction to check against.
356: * @exception XAException
357: */
358: public int prepare(Xid xid) throws XAException {
359: return XAResource.XA_OK;
360: }
361:
362: /**
363: * This method returns the list of transaction branches for this resource
364: * manager.
365: *
366: * @return The list of resource branches.
367: * @param flags The flags
368: * @exception XAException
369: */
370: public Xid[] recover(int i) throws XAException {
371: return null;
372: }
373:
374: /**
375: * This method is called to roll back the specified transaction.
376: *
377: * @param xid The id of the transaction to roll back.
378: * @exception XAException
379: */
380: public void rollback(Xid xid) throws XAException {
381: try {
382: Changes changes = (Changes) transactionChanges.get(xid);
383: transactionChanges.remove(xid);
384: List locks = changes.getLocks();
385: for (int index = 0; index < locks.size(); index++) {
386: LockRef lockRef = (LockRef) locks.get(index);
387: lockRef.release();
388: }
389: } catch (Exception ex) {
390: log.error("Failed to rollback the changes : "
391: + ex.getMessage(), ex);
392: throw new XAException("Failed to rollback the changes : "
393: + ex.getMessage());
394: }
395: }
396:
397: /**
398: * This method sets the transaction timeout for this resource manager.
399: *
400: * @return TRUE if the transaction timeout can be set successfully.
401: * @param transactionTimeout The new transaction timeout value.
402: * @exception XAException
403: */
404: public boolean setTransactionTimeout(int i) throws XAException {
405: return true;
406: }
407:
408: /**
409: * This method is called to start a transaction on a resource manager.
410: *
411: * @param xid The id of the new transaction.
412: * @param flags The flags associated with the transaction.
413: * @exception XAException
414: */
415: public void start(Xid xid, int i) throws XAException {
416: if (transactionChanges.containsKey(xid)) {
417: currentTransaction.set(transactionChanges.get(xid));
418: } else {
419: Changes changes = new Changes(xid);
420: transactionChanges.put(xid, changes);
421: currentTransaction.set(changes);
422: }
423: }
424:
425: /**
426: * This method returns the named lock
427: *
428: * @return The reference to the lock.
429: * @param The name of the queue that must be locked.
430: * @exception MessageServiceException
431: */
432: private LockRef getLock(String name) throws MessageServiceException {
433: try {
434: Object key = null;
435: synchronized (keyLockMap) {
436: if (keyLockMap.containsKey(name)) {
437: key = keyLockMap.get(name);
438: } else {
439: key = new String(name);
440: keyLockMap.put(name, key);
441: }
442: }
443: LockRef lockRef = ObjectLockFactory.getInstance()
444: .acquireWriteLock(key);
445: Changes changes = (Changes) currentTransaction.get();
446:
447: return lockRef;
448: } catch (Exception ex) {
449: log.error(
450: "Failed to retrieve a lock on the message queue : "
451: + ex.getMessage(), ex);
452: throw new MessageServiceException(
453: "Failed to retrieve a lock on the message queue : "
454: + ex.getMessage(), ex);
455: }
456: }
457:
458: /**
459: * This method is called to add an entry to the messsage queue
460: */
461: private void addQueueToIndex(MessageQueue messageQueue) {
462: synchronized (listIndex) {
463: listIndex.add(messageQueue);
464: }
465: }
466:
467: /**
468: * This method is called to clone the index.
469: *
470: * @return The cloned index
471: */
472: private Vector cloneIndex() {
473: synchronized (listIndex) {
474: return (Vector) listIndex.clone();
475: }
476: }
477: }
|