001: /*
002: * MessageService: The message service daemon
003: * Copyright (C) 2006 Rift IT Contracting
004: *
005: * This library is free software; you can redistribute it and/or
006: * modify it under the terms of the GNU Lesser General Public
007: * License as published by the Free Software Foundation; either
008: * version 2.1 of the License, or (at your option) any later version.
009: *
010: * This library is distributed in the hope that it will be useful,
011: * but WITHOUT ANY WARRANTY; without even the implied warranty of
012: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
013: * Lesser General Public License for more details.
014: *
015: * You should have received a copy of the GNU Lesser General Public
016: * License along with this library; if not, write to the Free Software
017: * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
018: *
019: * MessageServiceManager.java
020: */
021:
022: // package path
023: package com.rift.coad.daemon.messageservice;
024:
025: // java imports
026: import com.rift.coad.lib.thread.pool.PoolException;
027: import java.util.List;
028: import java.util.ArrayList;
029: import java.util.Iterator;
030: import java.rmi.Remote;
031: import java.rmi.RemoteException;
032: import javax.naming.InitialContext;
033: import javax.naming.Context;
034: import javax.transaction.UserTransaction;
035: import javax.transaction.TransactionManager;
036: import javax.transaction.Transaction;
037:
038: // logging import
039: import org.apache.log4j.Logger;
040:
041: // hibernate imports
042: import org.hibernate.*;
043: import org.hibernate.cfg.*;
044:
045: // coadunation imports
046: import com.rift.coad.lib.bean.BeanRunnable;
047: import com.rift.coad.lib.configuration.Configuration;
048: import com.rift.coad.lib.configuration.ConfigurationFactory;
049: import com.rift.coad.lib.thread.ThreadStateMonitor;
050: import com.rift.coad.lib.thread.pool.ThreadPoolManager;
051: import com.rift.coad.util.change.ChangeLog;
052: import com.rift.coad.util.transaction.UserTransactionWrapper;
053: import com.rift.coad.hibernate.util.HibernateUtil;
054: import com.rift.coad.daemon.messageservice.db.*;
055: import com.rift.coad.daemon.messageservice.message.MessageManagerFactory;
056: import com.rift.coad.daemon.messageservice.message.MessageManagerImpl;
057: import com.rift.coad.daemon.messageservice.named.NamedMemoryQueue;
058:
059: /**
060: * The implementation of the Message Service management interface.
061: *
062: * @author Brett Chaldecott
063: */
064: public class MessageServiceManager implements
065: MessageServiceManagerMBean, BeanRunnable {
066:
067: // class constants
068: private final static String THREAD_POOL_SIZE = "thread_pool_size";
069: private final static int DEFAULT_THREAD_POOL_SIZE = 10;
070: private final static String THREAD_POOL_USER = "thread_pool_user";
071:
072: // the logger reference
073: protected Logger log = Logger.getLogger(MessageServiceManager.class
074: .getName());
075:
076: // private member variables
077: private ThreadStateMonitor state = new ThreadStateMonitor();
078: private Configuration config = null;
079: private Context context = null;
080: private ThreadPoolManager threadPoolManager = null;
081: private UserTransactionWrapper utw = null;
082: private List initialEntries = null;
083:
084: /**
085: * Creates a new instance of MessageServiceImpl
086: *
087: * @exception MessageServiceException
088: */
089: public MessageServiceManager() throws MessageServiceException {
090: try {
091: config = ConfigurationFactory.getInstance().getConfig(
092: MessageServiceManager.class);
093: context = new InitialContext();
094: utw = new UserTransactionWrapper();
095: log.info("Reading in and applying change log information, "
096: + "this may take some time");
097: ChangeLog.init(MessageServiceManager.class);
098: initialEntries = getDbMessageList();
099: threadPoolManager = new ThreadPoolManager(
100: (int) config.getLong(THREAD_POOL_SIZE,
101: DEFAULT_THREAD_POOL_SIZE),
102: MessageProcessor.class, config
103: .getString(THREAD_POOL_USER));
104: } catch (Exception ex) {
105: log.error("Failed to instanciate the "
106: + "message service : " + ex.getMessage(), ex);
107: throw new MessageServiceException(
108: "Failed to instanciate the " + "message service : "
109: + ex.getMessage(), ex);
110: }
111:
112: }
113:
114: /**
115: * This method returns the thread pool size.
116: *
117: * @return The size of the thread pool.
118: * @exception RemoteException
119: * @exception MessageServiceException
120: */
121: public int getThreadPoolSize() throws RemoteException,
122: MessageServiceException {
123: return threadPoolManager.getSize();
124: }
125:
126: /**
127: * This method sets the size of the thread pool.
128: *
129: * @param size The new size of the thread pool.
130: * @exception RemoteException
131: * @exception MessageServiceException
132: */
133: public void setThreadPoolSize(int size) throws RemoteException,
134: MessageServiceException {
135: try {
136: threadPoolManager.setSize(size);
137: } catch (Exception ex) {
138: log
139: .error("Failed to set the size : "
140: + ex.getMessage(), ex);
141: throw new MessageServiceException(
142: "Failed to set the size : " + ex.getMessage(), ex);
143: }
144: }
145:
146: /**
147: * This method lists the named queues.
148: *
149: * @return The list of named queues.
150: * @exception RemoteException
151: * @exception MessageServiceException
152: */
153: public List listNamedQueues() throws RemoteException,
154: MessageServiceException {
155: return NamedMemoryQueue.listQueues();
156: }
157:
158: /**
159: * This method returns the list of messages in the named queue.
160: *
161: * @return The list of messages for this queue.
162: * @param queueName The name of the queue to list messages for.
163: * @exception RemoteException
164: * @exception MessageServiceException
165: */
166: public List listMessagesForNamedQueue(String queueName)
167: throws RemoteException, MessageServiceException {
168: List namedQueues = NamedMemoryQueue.listQueues();
169: if (!namedQueues.contains(queueName)) {
170: throw new MessageServiceException("The queue [" + queueName
171: + "] does not exist.");
172: }
173: return NamedMemoryQueue.getInstance(queueName).getMessages();
174: }
175:
176: /**
177: * This purges the messages from the named queue
178: *
179: * @param queueName The name of the queue to purge.
180: * @exception RemoteException
181: * @exception MessageServiceException
182: */
183: public void purgeNamedQueue(String queueName)
184: throws RemoteException, MessageServiceException {
185: List namedQueues = NamedMemoryQueue.listQueues();
186: if (!namedQueues.contains(queueName)) {
187: throw new MessageServiceException("The queue [" + queueName
188: + "] does not exist.");
189: }
190: NamedMemoryQueue.getInstance(queueName).purge();
191: }
192:
193: /**
194: * This method is responsible for performing the processing.
195: */
196: public void process() {
197: try {
198: for (Iterator iter = initialEntries.iterator(); iter
199: .hasNext();) {
200: String messageId = (String) iter.next();
201: try {
202: log.info("Load message : " + messageId);
203: utw.begin();
204: Session session = HibernateUtil.getInstance(
205: MessageServiceManager.class).getSession();
206: com.rift.coad.daemon.messageservice.db.Message message = (com.rift.coad.daemon.messageservice.db.Message) session
207: .get(
208: com.rift.coad.daemon.messageservice.db.Message.class,
209: messageId);
210: MessageManager messageManager = MessageManagerFactory
211: .getInstance().getMessageManager(messageId);
212: MessageQueue messageQueue = MessageQueueManager
213: .getInstance().getQueue(
214: MessageQueueManager.UNSORTED);
215: messageQueue.addMessage(messageManager);
216: utw.commit();
217: ProcessMonitor.getInstance().notifyProcessor();
218: } catch (Exception ex) {
219: log.error("Failed to retrieve the message : "
220: + ex.getMessage(), ex);
221: } finally {
222: utw.release();
223: }
224: }
225: try {
226: ChangeLog.getInstance().start();
227: } catch (Exception ex) {
228: log.error(
229: "Failed to start the change log processing : "
230: + ex.getMessage(), ex);
231: }
232: while (!state.isTerminated()) {
233:
234: // wait indefinitly
235: state.monitor();
236: }
237: } catch (Exception ex) {
238: log.error(
239: "The processing failed in the message service because : "
240: + ex.getMessage(), ex);
241: }
242: }
243:
244: /**
245: * This method is called to terminate the processing of this object.
246: */
247: public void terminate() {
248: try {
249: threadPoolManager.terminate();
250: } catch (PoolException ex) {
251: log.error("Failed to terminate the thread pool : "
252: + ex.getMessage(), ex);
253: }
254: state.terminate(true);
255: try {
256: ProcessMonitor.getInstance().terminate();
257: } catch (Exception ex) {
258: log.error("Failed to terminate the processor: "
259: + ex.getMessage(), ex);
260: }
261: try {
262: log.info("Waiting for all changes to be dumped");
263: ChangeLog.terminate();
264: log.info("Changes have been dumped.");
265: } catch (Exception ex) {
266: log.error("Failed to shut down the change log : "
267: + ex.getMessage(), ex);
268: }
269: }
270:
271: /**
272: * This method returns a list db entries.
273: */
274: private List getDbMessageList() {
275: boolean startedTransaction = false;
276: List dbEntries = new ArrayList();
277: try {
278: utw.begin();
279: startedTransaction = true;
280: Session session = HibernateUtil.getInstance(
281: MessageServiceManager.class).getSession();
282: List messages = session.createQuery(
283: "FROM Message as message").list();
284: for (Iterator iter = messages.iterator(); iter.hasNext();) {
285: com.rift.coad.daemon.messageservice.db.Message msg = (com.rift.coad.daemon.messageservice.db.Message) iter
286: .next();
287: dbEntries.add(msg.getId());
288: }
289:
290: utw.commit();
291: startedTransaction = false;
292: } catch (Exception ex) {
293: log.error(
294: "Failed to load the list of messages from the db : "
295: + ex.getMessage(), ex);
296: } finally {
297: utw.release();
298: }
299: return dbEntries;
300: }
301: }
|