001: /*
002: * Copyright 2004 Outerthought bvba and Schaubroeck nv
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016: package org.outerj.daisy.event;
017:
018: import org.apache.avalon.framework.configuration.Configuration;
019: import org.apache.avalon.framework.configuration.ConfigurationException;
020: import org.apache.commons.logging.Log;
021: import org.apache.commons.logging.LogFactory;
022: import org.outerj.daisy.jms.JmsClient;
023: import org.outerj.daisy.jms.Sender;
024:
025: import javax.jms.*;
026: import javax.sql.DataSource;
027: import javax.annotation.PreDestroy;
028: import java.sql.Connection;
029: import java.sql.PreparedStatement;
030: import java.sql.ResultSet;
031: import java.util.ArrayList;
032: import java.util.List;
033:
034: public class EventDispatcherImpl implements EventDispatcher {
035: private EventDispatchThread eventDispatchThread;
036: private DataSource dataSource;
037: private String jmsTopicName;
038: private JmsClient jmsClient;
039: private Sender topicSender;
040: private final Log log = LogFactory.getLog(getClass());
041:
042: public EventDispatcherImpl(Configuration configuration,
043: DataSource dataSource, JmsClient jmsClient)
044: throws Exception {
045: this .dataSource = dataSource;
046: this .jmsClient = jmsClient;
047: this .configure(configuration);
048: this .initialize();
049: this .start();
050: }
051:
052: @PreDestroy
053: public void destroy() throws Exception {
054: this .stop();
055: }
056:
057: private void configure(Configuration configuration)
058: throws ConfigurationException {
059: this .jmsTopicName = configuration.getChild("jmsTopic")
060: .getValue();
061: }
062:
063: public void notifyNewEvents() {
064: eventDispatchThread.notify();
065: }
066:
067: private void initialize() throws Exception {
068: topicSender = jmsClient.getSender(jmsTopicName);
069: }
070:
071: private void start() throws Exception {
072: eventDispatchThread = new EventDispatchThread();
073: eventDispatchThread.setDaemon(true);
074: eventDispatchThread.start();
075: }
076:
077: private void stop() throws Exception {
078: log.info("Waiting for event dispatcher thread to end.");
079: eventDispatchThread.interrupt();
080: try {
081: eventDispatchThread.join();
082: } catch (InterruptedException e) {
083: // ignore
084: }
085: jmsClient.unregisterSender(topicSender);
086: }
087:
088: private class EventDispatchThread extends Thread {
089: public EventDispatchThread() {
090: super ("Daisy event dispatcher");
091: }
092:
093: public synchronized void run() {
094: try {
095: while (true) {
096: Connection conn = null;
097: PreparedStatement stmt = null;
098: PreparedStatement messageStmt = null;
099: PreparedStatement removeEventStmt = null;
100: try {
101: conn = dataSource.getConnection();
102:
103: stmt = conn
104: .prepareStatement("select seqnr from events order by seqnr");
105: ResultSet rs = stmt.executeQuery();
106: List<Long> seqnrsToProcess = new ArrayList<Long>();
107:
108: while (rs.next()) {
109: seqnrsToProcess
110: .add(new Long(rs.getLong(1)));
111: }
112: stmt.close();
113:
114: messageStmt = conn
115: .prepareStatement("select message_type, message from events where seqnr = ?");
116: removeEventStmt = conn
117: .prepareStatement("delete from events where seqnr = ?");
118:
119: for (Long seqnrsToProces : seqnrsToProcess) {
120: // Check if we don't want to stop
121: if (Thread.interrupted())
122: return;
123: long seqnr = seqnrsToProces.longValue();
124:
125: messageStmt.setLong(1, seqnr);
126: rs = messageStmt.executeQuery();
127: rs.next();
128: String messageType = rs.getString(1);
129: String message = rs.getString(2);
130: rs.close();
131:
132: if (log.isDebugEnabled())
133: log.debug("Will forward message "
134: + seqnr + " to JMS.");
135:
136: Message jmsMessage = topicSender
137: .createTextMessage(message);
138: jmsMessage.setStringProperty("type",
139: messageType);
140:
141: // Again check if we don't want to stop, in an attempt to avoid a forever-wait
142: // condition in ActiveMQ when trying to send a message while the VM is shutting down.
143: if (Thread.interrupted())
144: return;
145: topicSender.send(jmsMessage);
146:
147: removeEventStmt.setLong(1, seqnr);
148: removeEventStmt.execute();
149: }
150: } catch (Throwable e) {
151: if (e instanceof InterruptedException) {
152: return;
153: } else {
154: log.error("Exception in event dispatcher.",
155: e);
156: }
157: } finally {
158: closeStatement(stmt);
159: closeStatement(messageStmt);
160: closeStatement(removeEventStmt);
161: try {
162: if (conn != null)
163: conn.close();
164: } catch (Throwable e) {
165: log
166: .error(
167: "Failed to close database connection.",
168: e);
169: }
170: }
171: if (Thread.interrupted())
172: return;
173: wait(5000);
174: }
175: } catch (InterruptedException e) {
176: // ignore
177: } finally {
178: log.info("Event dispatcher thread ended.");
179: }
180: }
181:
182: private void closeStatement(PreparedStatement stmt) {
183: try {
184: if (stmt != null)
185: stmt.close();
186: } catch (Throwable e) {
187: log.error("Failed to close JDBC statement.", e);
188: }
189: }
190: }
191: }
|