001: /*
002: * JBoss, Home of Professional Open Source
003: * Copyright 2005, JBoss Inc., and individual contributors as indicated
004: * by the @authors tag. See the copyright.txt in the distribution for a
005: * full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jbpm.msg.jms;
023:
024: import javax.jms.Connection;
025: import javax.jms.Destination;
026: import javax.jms.JMSException;
027: import javax.jms.Message;
028: import javax.jms.MessageProducer;
029: import javax.jms.Session;
030:
031: import org.jbpm.JbpmContext;
032: import org.jbpm.JbpmException;
033: import org.jbpm.db.JobSession;
034: import org.jbpm.graph.exe.Token;
035: import org.jbpm.job.Job;
036: import org.jbpm.msg.MessageService;
037:
038: public class JmsMessageServiceImpl implements MessageService {
039:
040: private static final long serialVersionUID = 1L;
041:
042: JobSession jobSession = null;
043: Connection connection = null;
044: Session session = null;
045: Destination destination = null;
046: MessageProducer messageProducer = null;
047: boolean isCommitEnabled = true;
048:
049: public JmsMessageServiceImpl(Connection connection,
050: Session session, Destination destination,
051: boolean isCommitEnabled) {
052: this .connection = connection;
053: this .session = session;
054: this .destination = destination;
055: this .isCommitEnabled = isCommitEnabled;
056:
057: JbpmContext jbpmContext = JbpmContext.getCurrentJbpmContext();
058: if (jbpmContext == null) {
059: throw new JbpmException(
060: "instantiation of the JmsMessageService requires a current JbpmContext");
061: }
062: this .jobSession = jbpmContext.getJobSession();
063: }
064:
065: public void send(Job job) {
066: try {
067: jobSession.saveJob(job);
068:
069: Message message = session.createMessage();
070: message.setLongProperty("jobId", job.getId());
071: if (job.getToken() != null) {
072: message.setLongProperty("tokenId", job.getToken()
073: .getId());
074: }
075: if (job.getProcessInstance() != null) {
076: message.setLongProperty("processInstanceId", job
077: .getProcessInstance().getId());
078: }
079: if (job.getTaskInstance() != null) {
080: message.setLongProperty("taskInstanceId", job
081: .getTaskInstance().getId());
082: }
083: getMessageProducer().send(destination, message);
084: } catch (JMSException e) {
085: throw new JbpmException("couldn't send jms message", e);
086: }
087: }
088:
089: public void close() {
090: Throwable exception = null;
091: try {
092: if (messageProducer != null)
093: messageProducer.close();
094: } catch (Exception e) {
095: // NOTE that Error's are not caught because that might halt the JVM and mask the original Error.
096: exception = e;
097: }
098: try {
099: if ((session != null) && (isCommitEnabled)) {
100: session.commit();
101: }
102: } catch (Exception e) {
103: if (exception != null) {
104: exception = new JbpmException(
105: "couldn't commit JMS session", e);
106: } else {
107: exception = e;
108: }
109: }
110: try {
111: if (session != null)
112: session.close();
113: } catch (Exception e) {
114: if (exception != null) {
115: exception = new JbpmException(
116: "couldn't close JMS session", e);
117: } else {
118: exception = e;
119: }
120: }
121: try {
122: if (connection != null)
123: connection.close();
124: } catch (Exception e) {
125: if (exception != null) {
126: exception = new JbpmException(
127: "couldn't close JMS connection", e);
128: } else {
129: exception = e;
130: }
131: }
132:
133: if (exception != null) {
134: throw new JbpmException(
135: "couldn't close one of the jms objects", exception);
136: }
137: }
138:
139: protected MessageProducer getMessageProducer() throws JMSException {
140: if (messageProducer == null) {
141: messageProducer = session.createProducer(destination);
142: }
143: return messageProducer;
144: }
145: }
|