001: /*
002: * MessageService: The message service daemon
003: * Copyright (C) 2007 Rift IT Contracting
004: *
005: * This library is free software; you can redistribute it and/or
006: * modify it under the terms of the GNU Lesser General Public
007: * License as published by the Free Software Foundation; either
008: * version 2.1 of the License, or (at your option) any later version.
009: *
010: * This library is distributed in the hope that it will be useful,
011: * but WITHOUT ANY WARRANTY; without even the implied warranty of
012: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
013: * Lesser General Public License for more details.
014: *
015: * You should have received a copy of the GNU Lesser General Public
016: * License along with this library; if not, write to the Free Software
017: * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
018: *
019: * NamedMemoryQueue.java
020: */
021:
022: // package path
023: package com.rift.coad.daemon.messageservice.named;
024:
025: // java imports
026: import java.util.ArrayList;
027: import java.util.Date;
028: import java.util.Map;
029: import java.util.HashMap;
030: import java.util.Iterator;
031: import java.util.List;
032: import java.util.Queue;
033: import java.util.concurrent.ConcurrentLinkedQueue;
034: import java.util.concurrent.ConcurrentHashMap;
035: import javax.transaction.xa.XAException;
036: import javax.transaction.xa.XAResource;
037: import javax.transaction.xa.Xid;
038:
039: // hibernate imports
040: import org.hibernate.*;
041: import org.hibernate.cfg.*;
042:
043: // logging import
044: import org.apache.log4j.Logger;
045:
046: // coadunation imports
047: import com.rift.coad.daemon.messageservice.Message;
048: import com.rift.coad.daemon.messageservice.MessageServiceException;
049: import com.rift.coad.daemon.messageservice.MessageManager;
050: import com.rift.coad.daemon.messageservice.MessageServiceManager;
051: import com.rift.coad.daemon.messageservice.ProcessMonitor;
052: import com.rift.coad.daemon.messageservice.message.MessageManagerFactory;
053: import com.rift.coad.daemon.messageservice.db.*;
054: import com.rift.coad.hibernate.util.HibernateUtil;
055: import com.rift.coad.util.transaction.TransactionManager;
056: import com.rift.coad.util.transaction.UserTransactionWrapper;
057:
058: /**
059: * This object is responsible for acting as the none volitile part of the named
060: * queue.
061: *
062: * @author Brett Chaldecott
063: */
064: public class NamedMemoryQueue implements XAResource {
065:
066: /**
067: * This object tracks the changes to a
068: */
069: public class Changes {
070:
071: // private member variables
072: private List removeList = new ArrayList();
073: private List addList = new ArrayList();
074:
075: /**
076: * The changes made in a transaction
077: */
078: public Changes() {
079: }
080:
081: /**
082: * This method adds an id to the list.
083: */
084: public void addRemoveMessage(MessageManager messageManager) {
085: removeList.add(messageManager);
086: }
087:
088: /**
089: * This method returns the ids of the messages.
090: *
091: * @return The list of ids.
092: */
093: public List getRemoveList() {
094: return removeList;
095: }
096:
097: /**
098: * This method add a message to the list of messages
099: */
100: public void addNewMessage(MessageManager messageManager) {
101: addList.add(messageManager);
102: }
103:
104: /**
105: * This method returns the list of message that have been added in the
106: * transaction.
107: *
108: * @return The list of ids.
109: */
110: public List getAddList() {
111: return addList;
112: }
113: }
114:
115: // singleton methods
116: private static Map singletons = new ConcurrentHashMap();
117: private static Map keyIndex = new HashMap();
118:
119: protected static Logger log = Logger
120: .getLogger(NamedMemoryQueue.class.getName());
121:
122: // private member variables
123: private String queueName = null;
124: private Queue queue = new ConcurrentLinkedQueue();
125: private UserTransactionWrapper utw = null;
126: private Map changes = new ConcurrentHashMap();
127: private ThreadLocal currentTransaction = new ThreadLocal();
128:
129: /**
130: * Creates a new instance of QueueMemoryIndex
131: *
132: * @param queueName The name of the queue to instanciate.
133: * @exception MessageServiceException
134: */
135: public NamedMemoryQueue(String queueName)
136: throws MessageServiceException {
137: this .queueName = queueName;
138: }
139:
140: /**
141: * This object is responsible for returning returning an instance of the
142: * named memory queue identified by the queuename. If not found it returns
143: * null.
144: *
145: * @return The reference to the named memory queueu.
146: * @param queueName The name of the queue to retrieve.
147: * @exception MessageServiceException
148: */
149: public static NamedMemoryQueue getInstance(String queueName)
150: throws MessageServiceException {
151: Object syncObj = getSyncObject(queueName);
152: synchronized (syncObj) {
153: NamedMemoryQueue singleton = (NamedMemoryQueue) singletons
154: .get(queueName);
155: if (singleton == null) {
156: singleton = new NamedMemoryQueue(queueName);
157: singletons.put(queueName, singleton);
158: }
159: return singleton;
160: }
161: }
162:
163: /**
164: * This method lists the queues currently in memory.
165: *
166: * @param list The list of named queues.
167: * @exception MessageServiceException
168: */
169: public static List listQueues() throws MessageServiceException {
170: return new ArrayList(singletons.keySet());
171: }
172:
173: /**
174: * This method returns the synchronization key.
175: *
176: * @return The reference to the object that the synchronization can be done
177: * on.
178: * @param queueName The name of the queue.
179: */
180: private static synchronized Object getSyncObject(String queueName) {
181: Object syncObj = keyIndex.get(queueName);
182: if (syncObj == null) {
183: syncObj = new String(queueName);
184: keyIndex.put(queueName, syncObj);
185: }
186: return syncObj;
187: }
188:
189: /**
190: * This method adds a message to the queue.
191: *
192: * @param messageManager The message manager.
193: */
194: public void addMessage(MessageManager messageManager)
195: throws MessageServiceException {
196: try {
197: TransactionManager.getInstance().bindResource(this , false);
198: ((Changes) currentTransaction.get())
199: .addNewMessage(messageManager);
200: } catch (Exception ex) {
201: log.error("Failed to add a message : " + ex.getMessage(),
202: ex);
203: throw new MessageServiceException(
204: "Failed to add a message : " + ex.getMessage(), ex);
205: }
206: }
207:
208: /**
209: * This method returns the first message on the list.
210: */
211: public synchronized Message poll(long delay)
212: throws MessageServiceException {
213: try {
214: Date startTime = new Date();
215: while (queue.size() == 0) {
216: Date currentTime = new Date();
217: long difference = (startTime.getTime() + delay)
218: - currentTime.getTime();
219: if (difference <= 0) {
220: return null;
221: }
222: wait(difference);
223: }
224: TransactionManager.getInstance().bindResource(this , false);
225: MessageManager messageManager = (MessageManager) queue
226: .poll();
227: Message message = messageManager.getMessage();
228: messageManager.remove();
229: ((Changes) currentTransaction.get())
230: .addRemoveMessage(messageManager);
231: log.debug("Return the message : " + message.getMessageId());
232: return message;
233: } catch (MessageServiceException ex) {
234: log.error("Failed to poll for a message : "
235: + ex.getMessage(), ex);
236: throw ex;
237: } catch (Exception ex) {
238: log.error("Failed to poll for a message : "
239: + ex.getMessage(), ex);
240: throw new MessageServiceException(
241: "Failed to poll for a message : " + ex.getMessage(),
242: ex);
243: } catch (Throwable ex) {
244: log.error("Caught an unexpected exception : "
245: + ex.getMessage(), ex);
246: throw new MessageServiceException(
247: "Caught an unexpected exception : "
248: + ex.getMessage(), ex);
249: }
250: }
251:
252: /**
253: * This method returns the list of messages for this queue.
254: *
255: * @return The list of messages in the queue.
256: * @exception MessageServiceException
257: */
258: public synchronized List getMessages()
259: throws MessageServiceException {
260: try {
261: List list = new ArrayList();
262: for (Iterator iter = queue.iterator(); iter.hasNext();) {
263: MessageManager messageManager = (MessageManager) iter
264: .next();
265: list.add(messageManager.getMessage());
266: }
267: return list;
268: } catch (Exception ex) {
269: log.error("Failed to retrieve the list of messages : "
270: + ex.getMessage(), ex);
271: throw new MessageServiceException(
272: "Failed to retrieve the list of messages : "
273: + ex.getMessage(), ex);
274: }
275: }
276:
277: /**
278: * This method is responsible for purging the contents of the named message
279: * queue.
280: *
281: * @exception MessageServiceException
282: */
283: public synchronized void purge() throws MessageServiceException {
284: try {
285: for (Iterator iter = queue.iterator(); iter.hasNext();) {
286: MessageManager messageManager = (MessageManager) iter
287: .next();
288: messageManager.remove();
289: }
290: queue.clear();
291: } catch (Exception ex) {
292: log.error("Failed to purge the queue : " + ex.getMessage(),
293: ex);
294: throw new MessageServiceException(
295: "Failed to purge the queue : " + ex.getMessage(),
296: ex);
297: }
298: }
299:
300: /**
301: * This method is called to commit the specified transaction.
302: *
303: * @param xid The id of the transaction to commit.
304: * @param onePhase If true a one phase commit should be used.
305: * @exception XAException
306: */
307: public void commit(Xid xid, boolean b) throws XAException {
308: Changes changes = (Changes) this .changes.remove(xid);
309: for (Iterator iter = changes.getAddList().iterator(); iter
310: .hasNext();) {
311: queue.add(iter.next());
312: }
313: synchronized (this ) {
314: notifyAll();
315: }
316: }
317:
318: /**
319: * The resource manager has dissociated this object from the transaction.
320: *
321: * @param xid The id of the transaction that is getting ended.
322: * @param flags The flags associated with this operation.
323: * @exception XAException
324: */
325: public void end(Xid xid, int i) throws XAException {
326: }
327:
328: /**
329: * The transaction has been completed and must be forgotten.
330: *
331: * @param xid The id of the transaction to forget.
332: * @exception XAException
333: */
334: public void forget(Xid xid) throws XAException {
335: changes.remove(xid);
336: }
337:
338: /**
339: * This method returns the transaction timeout for this object.
340: *
341: * @return The int containing the transaction timeout.
342: * @exception XAException
343: */
344: public int getTransactionTimeout() throws XAException {
345: return -1;
346: }
347:
348: /**
349: * This method returns true if this object is the resource manager getting
350: * queried.
351: *
352: * @return TRUE if this is the resource manager, FALSE if not.
353: * @param xaResource The resource to perform the check against.
354: * @exception XAException
355: */
356: public boolean isSameRM(XAResource xAResource) throws XAException {
357: return this == xAResource;
358: }
359:
360: /**
361: * This is called before a transaction is committed.
362: *
363: * @return The results of the transaction.
364: * @param xid The id of the transaction to check against.
365: * @exception XAException
366: */
367: public int prepare(Xid xid) throws XAException {
368: return XAResource.XA_OK;
369: }
370:
371: /**
372: * This method returns the list of transaction branches for this resource
373: * manager.
374: *
375: * @return The list of resource branches.
376: * @param flags The flags
377: * @exception XAException
378: */
379: public Xid[] recover(int i) throws XAException {
380: return null;
381: }
382:
383: /**
384: * This method is called to roll back the specified transaction.
385: *
386: * @param xid The id of the transaction to roll back.
387: * @exception XAException
388: */
389: public void rollback(Xid xid) throws XAException {
390: Changes changes = (Changes) this .changes.get(xid);
391: if (changes == null) {
392: return;
393: }
394: for (Iterator iter = changes.getRemoveList().iterator(); iter
395: .hasNext();) {
396: queue.add(iter.next());
397: }
398: synchronized (this ) {
399: notifyAll();
400: }
401: }
402:
403: /**
404: * This method sets the transaction timeout for this resource manager.
405: *
406: * @return TRUE if the transaction timeout can be set successfully.
407: * @param transactionTimeout The new transaction timeout value.
408: * @exception XAException
409: */
410: public boolean setTransactionTimeout(int i) throws XAException {
411: return true;
412: }
413:
414: /**
415: * This method is called to start a transaction on a resource manager.
416: *
417: * @param xid The id of the new transaction.
418: * @param flags The flags associated with the transaction.
419: * @exception XAException
420: */
421: public void start(Xid xid, int i) throws XAException {
422: Changes changes = (Changes) this .changes.get(xid);
423: if (changes == null) {
424: changes = new Changes();
425: this.changes.put(xid, changes);
426: }
427: currentTransaction.set(changes);
428: }
429: }
|