001: /*
002: * <Add library description here>
003: * Copyright (C) 2006 Rift IT Contracting
004: *
005: * This library is free software; you can redistribute it and/or
006: * modify it under the terms of the GNU Lesser General Public
007: * License as published by the Free Software Foundation; either
008: * version 2.1 of the License, or (at your option) any later version.
009: *
010: * This library is distributed in the hope that it will be useful,
011: * but WITHOUT ANY WARRANTY; without even the implied warranty of
012: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
013: * Lesser General Public License for more details.
014: *
015: * You should have received a copy of the GNU Lesser General Public
016: * License along with this library; if not, write to the Free Software
017: * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
018: *
019: * NamedQueueImpl.java
020: */
021:
022: package com.rift.coad.daemon.messageservice.named;
023:
024: // java imports
025: import java.rmi.Remote;
026: import java.rmi.RemoteException;
027: import java.util.ArrayList;
028: import java.util.Date;
029: import java.util.List;
030: import java.util.HashMap;
031: import java.util.Map;
032: import java.util.Vector;
033: import java.util.concurrent.ConcurrentHashMap;
034: import java.util.concurrent.ConcurrentLinkedQueue;
035: import javax.naming.Context;
036: import javax.naming.InitialContext;
037: import javax.transaction.SystemException;
038: import javax.transaction.UserTransaction;
039: import javax.transaction.Status;
040: import javax.transaction.xa.XAException;
041: import javax.transaction.xa.XAResource;
042: import javax.transaction.xa.Xid;
043:
044: // logging import
045: import org.apache.log4j.Logger;
046:
047: // hibernate imports
048: import org.hibernate.*;
049: import org.hibernate.cfg.*;
050:
051: // coadunation imports
052: import com.rift.coad.daemon.messageservice.db.*;
053: import com.rift.coad.daemon.messageservice.Message;
054: import com.rift.coad.daemon.messageservice.MessageManager;
055: import com.rift.coad.daemon.messageservice.MessageQueue;
056: import com.rift.coad.daemon.messageservice.MessageQueueManager;
057: import com.rift.coad.daemon.messageservice.MessageServiceManagerMBean;
058: import com.rift.coad.daemon.messageservice.MessageServiceException;
059: import com.rift.coad.daemon.messageservice.MessageServiceManager;
060: import com.rift.coad.daemon.messageservice.NamedQueue;
061: import com.rift.coad.daemon.messageservice.TimeoutException;
062: import com.rift.coad.daemon.messageservice.message.MessageImpl;
063: import com.rift.coad.daemon.messageservice.message.MessageManagerFactory;
064: import com.rift.coad.daemon.messageservice.message.MessageManagerImpl;
065: import com.rift.coad.daemon.servicebroker.ServiceBroker;
066: import com.rift.coad.hibernate.util.HibernateUtil;
067: import com.rift.coad.lib.configuration.Configuration;
068: import com.rift.coad.lib.configuration.ConfigurationFactory;
069: import com.rift.coad.lib.Resource;
070: import com.rift.coad.lib.ResourceIndex;
071: import com.rift.coad.util.transaction.TransactionManager;
072: import com.rift.coad.util.lock.LockRef;
073: import com.rift.coad.util.lock.ObjectLockFactory;
074: import com.rift.coad.util.connection.ConnectionManager;
075:
076: /**
077: * The implementation of the Queue. This object represents a named database
078: * queue.
079: *
080: * @author Brett Chaldecott
081: */
082: public class NamedQueueImpl implements NamedQueue, ResourceIndex,
083: Resource {
084:
085: // class constants
086: private final static String TIMEOUT = "QUEUE_TIMEOUT";
087: private final static long DEFAULT_TIMEOUT = 30000;
088:
089: // the logger reference
090: protected Logger log = Logger.getLogger(NamedQueueImpl.class
091: .getName());
092:
093: // private member variables
094: private String queueName = null;
095: private long maxTimeout = DEFAULT_TIMEOUT;
096:
097: /**
098: * The constructor of the queue.
099: *
100: * @param queueName The name of the queue.
101: * @exception MessageServiceException
102: */
103: public NamedQueueImpl(String queueName)
104: throws MessageServiceException {
105: try {
106: this .queueName = queueName;
107: Configuration config = ConfigurationFactory.getInstance()
108: .getConfig(this .getClass());
109: maxTimeout = config.getLong(TIMEOUT, DEFAULT_TIMEOUT);
110: } catch (Exception ex) {
111: log.error("Failed to init the Named Queue : "
112: + ex.getMessage(), ex);
113: throw new MessageServiceException(
114: "Failed to init the Named Queue : "
115: + ex.getMessage(), ex);
116: }
117: }
118:
119: /**
120: * This method returns a message for processing. If that message is not
121: * acknowledged by the target with in a configured time it will be made
122: * available for processing again.
123: *
124: * @return The reference to the Message for processing.
125: * @param delay The delay before returning a null reference.
126: * @exception RemoteException
127: * @exception MessageServiceException
128: * @exception TimeoutException
129: */
130: public Message receive(long delay) throws RemoteException,
131: MessageServiceException, TimeoutException {
132: try {
133: return NamedMemoryQueue.getInstance(queueName).poll(delay);
134: } catch (Throwable ex) {
135: log.error(
136: "Failed to retrieve message : " + ex.getMessage(),
137: ex);
138: throw new MessageServiceException(
139: "Failed to retrieve message : " + ex.getMessage(),
140: ex);
141: }
142: }
143:
144: /**
145: * This method adds a service to the list of services used to identify this
146: * queue, by the service broker.
147: *
148: * @param service The string containing the service name.
149: * @exception RemoteException
150: * @exception MessageServiceException
151: */
152: public void addService(String service) throws RemoteException,
153: MessageServiceException {
154: try {
155: Session session = HibernateUtil.getInstance(
156: MessageServiceManager.class).getSession();
157: com.rift.coad.daemon.messageservice.db.MessageQueue mq = (com.rift.coad.daemon.messageservice.db.MessageQueue) session
158: .get(
159: com.rift.coad.daemon.messageservice.db.MessageQueue.class,
160: queueName);
161: MessageQueueService messageQueueService = new MessageQueueService(
162: service, mq);
163: session.persist(messageQueueService);
164: ServiceBroker broker = (ServiceBroker) ConnectionManager
165: .getInstance().getConnection(ServiceBroker.class,
166: "ServiceBroker");
167: List serviceList = new ArrayList();
168: serviceList.add(service);
169: broker.registerService(MessageServiceManagerMBean.JNDI_URL,
170: serviceList);
171: } catch (Exception ex) {
172: log.error("Failed to add a service : " + ex.getMessage(),
173: ex);
174: throw new MessageServiceException(
175: "Failed to add a service : " + ex.getMessage(), ex);
176: }
177: }
178:
179: /**
180: * This method returns a list of services used to identify this queue to the
181: * service broker.
182: *
183: * @return The list of service used to identify this queue to the service
184: * broker.
185: * @exception RemoteException
186: * @exception MessageServiceException
187: */
188: public List listServices() throws RemoteException,
189: MessageServiceException {
190: try {
191: Session session = HibernateUtil.getInstance(
192: MessageServiceManager.class).getSession();
193: List entries = session.createQuery(
194: "SELECT mqs.service FROM MessageQueueService as mqs "
195: + "WHERE mqs.messageQueue.named = ?")
196: .setString(0, this .queueName).list();
197: List result = new ArrayList();
198: for (int index = 0; index < entries.size(); index++) {
199: result.add(((Object[]) entries.get(index))[index]);
200: }
201: return result;
202: } catch (Exception ex) {
203: log.error("Failed to add a service : " + ex.getMessage(),
204: ex);
205: throw new MessageServiceException(
206: "Failed to add a service : " + ex.getMessage(), ex);
207: }
208: }
209:
210: /**
211: * This method removes a service from the list of services.
212: *
213: * @param service The name of the service to remove.
214: * @exception RemoteException
215: * @exception MessageServiceException
216: */
217: public void removeService(String service) throws RemoteException,
218: MessageServiceException {
219: try {
220: Session session = HibernateUtil.getInstance(
221: MessageServiceManager.class).getSession();
222: session
223: .createQuery(
224: "DELETE FROM MessageQueueService as mqs "
225: + "WHERE mqs.service = ? AND mqs.messageQueue.named = ?")
226: .setString(0, service).setString(1, this .queueName)
227: .executeUpdate();
228: ServiceBroker broker = (ServiceBroker) ConnectionManager
229: .getInstance().getConnection(ServiceBroker.class,
230: "ServiceBroker");
231: List mqs = session.createQuery(
232: "FROM MessageQueueService as mqs "
233: + "WHERE mqs.service = ?").setString(0,
234: service).list();
235: if (mqs.size() == 0) {
236: List serviceList = new ArrayList();
237: serviceList.add(service);
238: broker.removeServiceProviders(
239: MessageServiceManagerMBean.JNDI_URL,
240: serviceList);
241: }
242: } catch (Exception ex) {
243: log.error(
244: "Failed to remove a service : " + ex.getMessage(),
245: ex);
246: throw new MessageServiceException(
247: "Failed to remove a service : " + ex.getMessage(),
248: ex);
249: }
250: }
251:
252: /**
253: * This method returns the primary key of this resource to enable
254: * indexing.
255: *
256: * @return The primary key of this object.
257: */
258: public Object getPrimaryKey() {
259: return queueName;
260: }
261:
262: /**
263: * This method returns the name of the resource.
264: *
265: * @return The string containing the name of the resource.
266: */
267: public String getResourceName() {
268: return queueName;
269: }
270:
271: /**
272: * This method will be called to release the resources controlled by
273: * this object.
274: *
275: * @param This method adds a new resource.
276: */
277: public void releaseResource() {
278:
279: }
280:
281: }
|