001: /*
002: * MessageService: The message service daemon
003: * Copyright (C) 2006 Rift IT Contracting
004: *
005: * This library is free software; you can redistribute it and/or
006: * modify it under the terms of the GNU Lesser General Public
007: * License as published by the Free Software Foundation; either
008: * version 2.1 of the License, or (at your option) any later version.
009: *
010: * This library is distributed in the hope that it will be useful,
011: * but WITHOUT ANY WARRANTY; without even the implied warranty of
012: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
013: * Lesser General Public License for more details.
014: *
015: * You should have received a copy of the GNU Lesser General Public
016: * License along with this library; if not, write to the Free Software
017: * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
018: *
019: * MessageQueue.java
020: */
021:
022: // package path
023: package com.rift.coad.daemon.messageservice;
024:
025: // java imports
026: import java.lang.ThreadLocal;
027: import java.util.Date;
028: import java.util.Queue;
029: import java.util.PriorityQueue;
030: import java.util.HashMap;
031: import java.util.Map;
032: import java.util.ArrayList;
033: import java.util.concurrent.ConcurrentHashMap;
034: import javax.transaction.xa.XAException;
035: import javax.transaction.xa.XAResource;
036: import javax.transaction.xa.Xid;
037:
038: // logging import
039: import org.apache.log4j.Logger;
040:
041: // coadunation imports
042: import com.rift.coad.util.transaction.TransactionManager;
043: import com.rift.coad.util.lock.LockRef;
044: import com.rift.coad.util.lock.ObjectLockFactory;
045:
046: /**
047: * The message queue object is responsible for queuing messages in memory.
048: *
049: * @author Brett Chaldecott
050: */
051: public class MessageQueue implements XAResource {
052:
053: /**
054: * This object tracks the changes made by a transaction.
055: */
056: public class TransactionChange {
057:
058: // private member variables
059: private ArrayList addList = new ArrayList();
060: private ArrayList updateList = new ArrayList();
061: private ArrayList removeList = new ArrayList();
062:
063: /**
064: * The constructor of the transaction change object.
065: */
066: public TransactionChange() {
067:
068: }
069:
070: /**
071: * This method adds to the add list
072: *
073: * @param message The message to add to the add list.
074: */
075: public void add(MessageManager messageManager) {
076: addList.add(messageManager);
077: }
078:
079: /**
080: * This method returns the list of added entries.
081: *
082: * @return The list of added messages.
083: */
084: public ArrayList getAddList() {
085: return addList;
086: }
087:
088: /**
089: * This method add an entry to the update list.
090: *
091: * @param message The message to add.
092: */
093: public void update(MessageManager messageManager) {
094: updateList.add(messageManager);
095: }
096:
097: /**
098: * This method returns an array list containing all the updated messages.
099: *
100: * @return The array list of all the updated messages.
101: */
102: public ArrayList getUpdateList() {
103: return updateList;
104: }
105:
106: /**
107: * This method marks a message within a transaction as being removed
108: * by that transaction.
109: *
110: * @param message The message to remove.
111: */
112: public void remove(MessageManager messageManager) {
113: removeList.add(messageManager);
114: }
115:
116: /**
117: * This method returns list of removed entries.
118: *
119: */
120: public ArrayList getRemoveList() {
121: return removeList;
122: }
123: }
124:
125: /**
126: * This is the id index.
127: */
128: public class IDIndex {
129: private Map baseIndex = new HashMap();
130: private ThreadLocal threadIndex = new ThreadLocal();
131: private Map transactionIndex = new HashMap();
132:
133: /**
134: * The constructor of the ID index.
135: */
136: public IDIndex() {
137:
138: }
139:
140: /**
141: * This method is called to commit the specified transaction.
142: *
143: * @param xid The id of the transaction to commit.
144: * @exception MessageServiceException
145: */
146: public synchronized void commit(Xid xid)
147: throws MessageServiceException {
148: try {
149: TransactionChange changes = (TransactionChange) transactionChange
150: .get(xid);
151: // add entries
152: ArrayList addedEntries = changes.getAddList();
153: for (int index = 0; index < addedEntries.size(); index++) {
154: MessageManager messageManager = (MessageManager) addedEntries
155: .get(index);
156: baseIndex.put(messageManager.getID(),
157: messageManager);
158: }
159:
160: // remove entries
161: ArrayList removedEntries = changes.getRemoveList();
162: for (int index = 0; index < removedEntries.size(); index++) {
163: MessageManager messageManager = (MessageManager) removedEntries
164: .get(index);
165: baseIndex.remove(messageManager.getID());
166: }
167: transactionIndex.remove(xid);
168: } catch (Exception ex) {
169: log.error("Failed to commit the transaction : "
170: + ex.getMessage(), ex);
171: throw new MessageServiceException(
172: "Failed to commit the " + "transaction : "
173: + ex.getMessage());
174: }
175: }
176:
177: /**
178: * The transaction has been completed and must be forgotten.
179: *
180: * @param xid The id of the transaction to forget.
181: * @exception MessageServiceException
182: */
183: public synchronized void forget(Xid xid)
184: throws MessageServiceException {
185: transactionIndex.remove(xid);
186: }
187:
188: /**
189: * This method is called to roll back the specified transaction.
190: *
191: * @param xid The id of the transaction to roll back.
192: * @exception MessageServiceException
193: */
194: public synchronized void rollback(Xid xid)
195: throws MessageServiceException {
196: transactionIndex.remove(xid);
197: }
198:
199: /**
200: * This method is called to start a transaction on a resource manager.
201: *
202: * @param xid The id of the new transaction.
203: * @exception MessageServiceException
204: */
205: public synchronized void start(Xid xid)
206: throws MessageServiceException {
207: Map transactionScopedIndex = null;
208: if (transactionIndex.containsKey(xid)) {
209: transactionScopedIndex = (Map) transactionIndex
210: .get(xid);
211: } else {
212: transactionScopedIndex = new HashMap(baseIndex);
213: transactionIndex.put(xid, transactionScopedIndex);
214: }
215: threadIndex.set(transactionScopedIndex);
216: }
217:
218: /**
219: * This method adds a message to the message queue.
220: *
221: * @param message The message to add to the message queue.
222: * @exception MessageServiceException
223: */
224: public void addMessage(MessageManager message)
225: throws MessageServiceException {
226: Map index = (Map) threadIndex.get();
227: index.put(message.getID(), message);
228: }
229:
230: /**
231: * This method removes the specified message from the list.
232: *
233: * @param messageId The id of the message to remove.
234: * @exception MessageServiceException
235: */
236: public void removeMessage(String messageId)
237: throws MessageServiceException {
238: try {
239: Map index = (Map) threadIndex.get();
240: if (index.containsKey(messageId)) {
241: MessageManager message = (MessageManager) index
242: .get(messageId);
243: TransactionChange change = (TransactionChange) transactionChange
244: .get(transactionId.get());
245: change.remove(message);
246: index.remove(messageId);
247: } else {
248: throw new MessageServiceException("The message ["
249: + messageId + "] was not found to remove");
250: }
251: } catch (MessageServiceException ex) {
252: throw ex;
253: } catch (Exception ex) {
254: log.error("Failed to remove the message : "
255: + ex.getMessage(), ex);
256: throw new MessageServiceException(
257: "Failed to remove the message : "
258: + ex.getMessage(), ex);
259: }
260: }
261:
262: /**
263: * This method retrieves the specified message from the list.
264: *
265: * @param messageId The id of the message to retrieve.
266: * @exception MessageServiceException
267: */
268: public MessageManager getMessage(String messageId)
269: throws MessageServiceException {
270: Map index = (Map) threadIndex.get();
271: if (index.containsKey(messageId)) {
272: return (MessageManager) index.get(messageId);
273: } else {
274: throw new MessageServiceException("The message ["
275: + messageId + "] was not found.");
276: }
277: }
278:
279: }
280:
281: /**
282: * This class is responsible for managing the queue of entries.
283: */
284: public class QueueIndex {
285: // private member variables
286: private PriorityQueue baseQueue = new PriorityQueue();
287: private Map processingEntries = new HashMap();
288:
289: /**
290: * The constructor of the
291: */
292: public QueueIndex() {
293:
294: }
295:
296: /**
297: * This method is called to commit the specified transaction.
298: *
299: * @param xid The id of the transaction to commit.
300: * @exception MessageServiceException
301: */
302: public synchronized void commit(Xid xid)
303: throws MessageServiceException {
304: try {
305: TransactionChange changes = (TransactionChange) transactionChange
306: .get(xid);
307: // add entries
308: ArrayList addedEntries = changes.getAddList();
309: for (int index = 0; index < addedEntries.size(); index++) {
310: MessageManager messageManager = (MessageManager) addedEntries
311: .get(index);
312: baseQueue.add(messageManager);
313: }
314:
315: // remove entries
316: ArrayList removedEntries = changes.getRemoveList();
317: for (int index = 0; index < removedEntries.size(); index++) {
318: MessageManager messageManager = (MessageManager) removedEntries
319: .get(index);
320: if (baseQueue.contains(messageManager)) {
321: baseQueue.remove(messageManager);
322: } else if (processingEntries
323: .containsKey(messageManager)) {
324: LockRef lockRef = (LockRef) processingEntries
325: .get(messageManager);
326: processingEntries.remove(messageManager);
327: lockRef.release();
328: }
329: }
330: } catch (Exception ex) {
331: log.error("Failed to commit the transaction : "
332: + ex.getMessage(), ex);
333: throw new MessageServiceException(
334: "Failed to commit the " + "transaction : "
335: + ex.getMessage());
336: }
337: }
338:
339: /**
340: * The transaction has been completed and must be forgotten.
341: *
342: * @param xid The id of the transaction to forget.
343: * @exception MessageServiceException
344: */
345: public synchronized void forget(Xid xid)
346: throws MessageServiceException {
347:
348: }
349:
350: /**
351: * This method is called to roll back the specified transaction.
352: *
353: * @param xid The id of the transaction to roll back.
354: * @exception MessageServiceException
355: */
356: public synchronized void rollback(Xid xid)
357: throws MessageServiceException {
358:
359: }
360:
361: /**
362: * This method is called to start a transaction on a resource manager.
363: *
364: * @param xid The id of the new transaction.
365: * @exception MessageServiceException
366: */
367: public synchronized void start(Xid xid)
368: throws MessageServiceException {
369:
370: }
371:
372: /**
373: * This method returns the next message manager or null.
374: *
375: * @return The reference to message manager.
376: * @param nextRunTime The date wrapper object.
377: * @exception MessageServiceException
378: */
379: public synchronized MessageManager popFrontMessage(
380: Date nextRunTime) throws MessageServiceException {
381: LockRef lockRef = null;
382: try {
383: MessageManager messageManager = (MessageManager) baseQueue
384: .peek();
385: if (messageManager == null) {
386: return null;
387: }
388: try {
389: lockRef = ObjectLockFactory.getInstance()
390: .acquireWriteLock(messageManager,
391: ObjectLockFactory.WAIT_ON_THREAD);
392: } catch (Exception ex) {
393: log.debug(
394: "Cannot aquire a lock on this object because : "
395: + ex.getMessage(), ex);
396: return null;
397: }
398: Date currentDate = new Date();
399: Date nextProcessDate = messageManager.nextProcessTime();
400: if (nextProcessDate == null) {
401: throw new MessageServiceException(
402: "The next process date is invalid cannot be null");
403: } else if (nextProcessDate.getTime() <= currentDate
404: .getTime()) {
405: baseQueue.poll();
406: processingEntries.put(messageManager, lockRef);
407: lockRef = null;
408: return messageManager;
409: }
410: nextRunTime.setTime(nextProcessDate.getTime());
411: return null;
412: } catch (Exception ex) {
413: log.error("Failed to pop a message off the queue : "
414: + ex.getMessage(), ex);
415: throw new MessageServiceException(
416: "Failed to pop a message off the queue : "
417: + ex.getMessage(), ex);
418: } finally {
419: try {
420: if (lockRef != null) {
421: lockRef.release();
422: lockRef = null;
423: }
424: } catch (Exception ex2) {
425: log.error("Failed to release the lock :"
426: + ex2.getMessage(), ex2);
427: }
428: }
429: }
430:
431: /**
432: * This method returns the next message manager or null.
433: *
434: * @return The reference to message manager.
435: * @param nextRunTime The date wrapper object.
436: * @exception MessageServiceException
437: */
438: public synchronized void pushBackMessage(
439: MessageManager messageManager)
440: throws MessageServiceException {
441: try {
442: LockRef lockRef = (LockRef) processingEntries
443: .get(messageManager);
444: if (lockRef == null) {
445: log.error("This message is not locked : "
446: + messageManager.getID());
447: throw new MessageServiceException(
448: "This message is not locked : "
449: + messageManager.getID());
450: }
451: baseQueue.add(messageManager);
452: processingEntries.remove(messageManager);
453: lockRef.release();
454: } catch (MessageServiceException ex) {
455: throw ex;
456: } catch (Exception ex) {
457: log
458: .error(
459: "Failed to push a message back in the queue for "
460: + "processing : "
461: + ex.getMessage(), ex);
462: throw new MessageServiceException(
463: "Failed to push a message back in the queue for "
464: + "processing : " + ex.getMessage(), ex);
465: }
466: }
467:
468: }
469:
470: // the logger reference
471: protected static Logger log = Logger.getLogger(MessageQueue.class
472: .getName());
473:
474: // private member variables
475: private ThreadLocal transactionId = new ThreadLocal();
476: private Map transactionChange = new ConcurrentHashMap();
477: private IDIndex idIndex = new IDIndex();
478: private QueueIndex queueIndex = new QueueIndex();
479: private String name = null;
480:
/**
 * Creates a new message queue.
 *
 * @param name The name of the queue, as returned by getName().
 */
public MessageQueue(String name) {
    this.name = name;
}
487:
488: /**
489: * This method is called to commit the specified transaction.
490: *
491: * @param xid The id of the transaction to commit.
492: * @param onePhase If true a one phase commit should be used.
493: * @exception XAException
494: */
495: public synchronized void commit(Xid xid, boolean onePhase)
496: throws XAException {
497: try {
498: idIndex.commit(xid);
499: queueIndex.commit(xid);
500: transactionChange.remove(xid);
501: ProcessMonitor.getInstance().notifyProcessor();
502: } catch (Exception ex) {
503: log.error("Failed to commit the changes : "
504: + ex.getMessage(), ex);
505: throw new XAException("Failed to commit the changes : "
506: + ex.getMessage());
507: }
508: }
509:
510: /**
511: * The resource manager has dissociated this object from the transaction.
512: *
513: * @param xid The id of the transaction that is getting ended.
514: * @param flags The flags associated with this operation.
515: * @exception XAException
516: */
517: public void end(Xid xid, int flags) throws XAException {
518: }
519:
520: /**
521: * The transaction has been completed and must be forgotten.
522: *
523: * @param xid The id of the transaction to forget.
524: * @exception XAException
525: */
526: public void forget(Xid xid) throws XAException {
527: try {
528: idIndex.forget(xid);
529: queueIndex.forget(xid);
530: transactionChange.remove(xid);
531: } catch (Exception ex) {
532: log.error("Failed to forget the changes : "
533: + ex.getMessage(), ex);
534: throw new XAException("Failed to forget the changes : "
535: + ex.getMessage());
536: }
537: }
538:
539: /**
540: * This method returns the transaction timeout for this object.
541: *
542: * @return The int containing the transaction timeout.
543: * @exception XAException
544: */
545: public int getTransactionTimeout() throws XAException {
546: return -1;
547: }
548:
549: /**
550: * This method returns true if this object is the resource manager getting
551: * queried.
552: *
553: * @return TRUE if this is the resource manager, FALSE if not.
554: * @param xaResource The resource to perform the check against.
555: * @exception XAException
556: */
557: public boolean isSameRM(XAResource xAResource) throws XAException {
558: return this == xAResource;
559: }
560:
561: /**
562: * This is called before a transaction is committed.
563: *
564: * @return The results of the transaction.
565: * @param xid The id of the transaction to check against.
566: * @exception XAException
567: */
568: public int prepare(Xid xid) throws XAException {
569: return XAResource.XA_OK;
570: }
571:
572: /**
573: * This method returns the list of transaction branches for this resource
574: * manager.
575: *
576: * @return The list of resource branches.
577: * @param flags The flags
578: * @exception XAException
579: */
580: public Xid[] recover(int flags) throws XAException {
581: return null;
582: }
583:
584: /**
585: * This method is called to roll back the specified transaction.
586: *
587: * @param xid The id of the transaction to roll back.
588: * @exception XAException
589: */
590: public void rollback(Xid xid) throws XAException {
591: try {
592: idIndex.rollback(xid);
593: queueIndex.rollback(xid);
594: transactionChange.remove(xid);
595: } catch (Exception ex) {
596: log.error("Failed to rollback the changes : "
597: + ex.getMessage(), ex);
598: throw new XAException("Failed to rollback the changes : "
599: + ex.getMessage());
600: }
601: }
602:
603: /**
604: * This method sets the transaction timeout for this resource manager.
605: *
606: * @return TRUE if the transaction timeout can be set successfully.
607: * @param transactionTimeout The new transaction timeout value.
608: * @exception XAException
609: */
610: public boolean setTransactionTimeout(int transactionTimeout)
611: throws XAException {
612: return true;
613: }
614:
615: /**
616: * This method is called to start a transaction on a resource manager.
617: *
618: * @param xid The id of the new transaction.
619: * @param flags The flags associated with the transaction.
620: * @exception XAException
621: */
622: public void start(Xid xid, int flags) throws XAException {
623: try {
624: if (!transactionChange.containsKey(xid)) {
625: transactionChange.put(xid, new TransactionChange());
626: }
627: transactionId.set(xid);
628: idIndex.start(xid);
629: queueIndex.start(xid);
630: } catch (Exception ex) {
631: log.error("Failed to start the transaction : "
632: + ex.getMessage(), ex);
633: throw new XAException("Failed to start the transaction : "
634: + ex.getMessage());
635: }
636: }
637:
638: /**
639: * This method returns the name of the message queue.
640: *
641: * @return The name of the message queue.
642: */
643: public String getName() {
644: return name;
645: }
646:
647: /**
648: * This method adds a message to the message queue.
649: *
650: * @param message The message to add to the message queue.
651: */
652: public void addMessage(MessageManager message)
653: throws MessageServiceException {
654: try {
655: TransactionManager.getInstance().bindResource(this , false);
656: TransactionChange change = (TransactionChange) transactionChange
657: .get(transactionId.get());
658: change.add(message);
659: idIndex.addMessage(message);
660: } catch (Exception ex) {
661: log.error("Failed to add a message : " + ex.getMessage(),
662: ex);
663: throw new MessageServiceException(
664: "Failed to add a message : " + ex.getMessage());
665: }
666: }
667:
668: /**
669: * This method removes the specified message from the list.
670: *
671: * @param messageId The id of the message to remove.
672: */
673: public void removeMessage(String messageId)
674: throws MessageServiceException {
675: try {
676: TransactionManager.getInstance().bindResource(this , false);
677: idIndex.removeMessage(messageId);
678: } catch (Exception ex) {
679: log.error(
680: "Failed to remove a message : " + ex.getMessage(),
681: ex);
682: throw new MessageServiceException(
683: "Failed to remove a message : " + ex.getMessage());
684: }
685: }
686:
687: /**
688: * This method retrieves the specified message from the list.
689: *
690: * @param messageId The id of the message to retrieve.
691: */
692: public MessageManager getMessage(String messageId)
693: throws MessageServiceException {
694: try {
695: TransactionManager.getInstance().bindResource(this , false);
696: return idIndex.getMessage(messageId);
697: } catch (Exception ex) {
698: log.error("Failed to get a message : " + ex.getMessage(),
699: ex);
700: throw new MessageServiceException(
701: "Failed to get a message : " + ex.getMessage());
702: }
703: }
704:
705: /**
706: * This method returns the next message manager or null.
707: *
708: * @return The reference to message manager.
709: * @param nextRunTime The date wrapper object.
710: * @exception MessageServiceException
711: */
712: public MessageManager popFrontMessage(Date nextRunTime)
713: throws MessageServiceException {
714: return queueIndex.popFrontMessage(nextRunTime);
715: }
716:
717: /**
718: * This method returns the next message manager or null.
719: *
720: * @return The reference to message manager.
721: * @param nextRunTime The date wrapper object.
722: * @exception MessageServiceException
723: */
724: public void pushBackMessage(MessageManager messageManager)
725: throws MessageServiceException {
726: queueIndex.pushBackMessage(messageManager);
727: }
728: }
|