001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.bpmscript.jbi.tasklist;
018:
019: import java.io.Serializable;
020: import java.util.ArrayList;
021: import java.util.Enumeration;
022: import java.util.List;
023: import java.util.UUID;
024:
025: import javax.jbi.messaging.MessageExchange;
026: import javax.jms.MessageConsumer;
027: import javax.jms.MessageProducer;
028: import javax.jms.ObjectMessage;
029: import javax.jms.Queue;
030: import javax.jms.QueueBrowser;
031: import javax.jms.Session;
032:
033: import org.bpmscript.IPagedResult;
034: import org.bpmscript.IQuery;
035: import org.bpmscript.PagedResult;
036: import org.bpmscript.jms.JmsTemplate;
037: import org.bpmscript.jms.JmsTemplateException;
038: import org.bpmscript.jms.SessionCallback;
039: import org.springframework.beans.factory.InitializingBean;
040:
041: public class JmsTaskListStore implements ITaskListStore,
042: InitializingBean {
043:
044: protected final transient org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
045: .getLog(getClass());
046:
047: private JmsTemplate template = null;
048: private String queueName = null;
049:
050: private Queue queue = null;
051:
052: public static final class TaskAndExchange implements Serializable {
053: public TaskAndExchange(ITaskRequest taskRequest,
054: MessageExchange messageExchange) {
055: this .taskRequest = taskRequest;
056: this .messageExchange = messageExchange;
057: }
058:
059: public ITaskRequest taskRequest;
060: public MessageExchange messageExchange;
061: }
062:
063: public ITask getTask(final String taskId) throws TaskListException {
064: try {
065: return (ITask) template.execute(new SessionCallback() {
066:
067: public Object doInJms(Session session) throws Exception {
068: QueueBrowser taskBrowser = session.createBrowser(
069: queue, "taskId = '" + taskId + "'");
070: Enumeration enumeration = taskBrowser
071: .getEnumeration();
072: if (enumeration.hasMoreElements()) {
073: ObjectMessage message = (ObjectMessage) enumeration
074: .nextElement();
075: TaskAndExchange taskAndExchange = (TaskAndExchange) message
076: .getObject();
077: return new Task(
078: message.getStringProperty("taskId"),
079: (ITaskRequest) taskAndExchange.taskRequest,
080: TaskState.valueOf(message
081: .getStringProperty("state")));
082: }
083: return null;
084: }
085:
086: });
087: } catch (JmsTemplateException e) {
088: throw new TaskListException(e);
089: }
090: }
091:
092: public String createTask(final MessageExchange exchange,
093: final ITaskRequest taskRequest) throws TaskListException {
094: final String id = UUID.randomUUID().toString();
095: try {
096: template.execute(new SessionCallback() {
097:
098: public Object doInJms(Session session) throws Exception {
099: MessageProducer producer = session
100: .createProducer(queue);
101: ObjectMessage message = session
102: .createObjectMessage();
103: TaskAndExchange taskAndExchange = new TaskAndExchange(
104: taskRequest, exchange);
105: message.setObject(taskAndExchange);
106: message.setObjectProperty("processInstanceId",
107: taskRequest.getProcessInstanceId());
108: message.setStringProperty("taskId", id);
109: message.setStringProperty("state", TaskState.OPEN
110: .name());
111: producer.send(message);
112: return null;
113: }
114: });
115: } catch (JmsTemplateException e) {
116: throw new TaskListException(e);
117: }
118: return id;
119: }
120:
121: @SuppressWarnings("unchecked")
122: public IPagedResult<ITask> getTasks(final IQuery query)
123: throws TaskListException {
124: try {
125: List<ITask> tasks = (List<ITask>) template
126: .execute(new SessionCallback() {
127:
128: public Object doInJms(Session session)
129: throws Exception {
130: QueueBrowser browser = session
131: .createBrowser(queue);
132: Enumeration enumeration = browser
133: .getEnumeration();
134: List<ITask> tasks = new ArrayList<ITask>();
135: while (enumeration.hasMoreElements()) {
136: ObjectMessage message = (ObjectMessage) enumeration
137: .nextElement();
138: TaskAndExchange taskAndExchange = (TaskAndExchange) message
139: .getObject();
140: tasks
141: .add(new Task(
142: message
143: .getStringProperty("taskId"),
144: (ITaskRequest) taskAndExchange.taskRequest,
145: TaskState
146: .valueOf(message
147: .getStringProperty("state"))));
148: }
149: return tasks;
150: }
151: });
152:
153: // TODO: reuse this logic
154: List<ITask> trimmedResults = null;
155: int lastResult = query.getFirstResult()
156: + query.getMaxResults();
157:
158: if (query.getFirstResult() >= 0
159: && query.getMaxResults() >= 0) {
160: trimmedResults = new ArrayList<ITask>(tasks.subList(
161: query.getFirstResult(), lastResult < tasks
162: .size() ? lastResult : tasks.size()));
163: } else {
164: trimmedResults = tasks;
165: }
166: return new PagedResult<ITask>(trimmedResults,
167: lastResult > 0 && lastResult < tasks.size(), tasks
168: .size());
169: } catch (JmsTemplateException e) {
170: throw new TaskListException(e);
171: }
172: }
173:
174: public MessageExchange getMessageExchange(final String taskId)
175: throws TaskListException {
176: try {
177: return (MessageExchange) template
178: .execute(new SessionCallback() {
179:
180: public Object doInJms(Session session)
181: throws Exception {
182: QueueBrowser browser = session
183: .createBrowser(queue, "taskId = '"
184: + taskId + "'");
185: Enumeration enumeration = browser
186: .getEnumeration();
187: if (enumeration.hasMoreElements()) {
188: ObjectMessage message = (ObjectMessage) enumeration
189: .nextElement();
190: TaskAndExchange taskAndExchange = (TaskAndExchange) message
191: .getObject();
192: return taskAndExchange.messageExchange;
193: }
194: return null;
195: }
196: });
197: } catch (JmsTemplateException e) {
198: throw new TaskListException(e);
199: }
200: }
201:
202: public void closeTask(final String taskId) throws TaskListException {
203: try {
204: template.execute(true, Session.AUTO_ACKNOWLEDGE,
205: new SessionCallback() {
206:
207: public Object doInJms(Session session)
208: throws Exception {
209: try {
210: MessageConsumer consumer = session
211: .createConsumer(
212: queue,
213: "taskId = '"
214: + taskId
215: + "' and state = '"
216: + TaskState.OPEN
217: .name()
218: + "'");
219: ObjectMessage openMessage = (ObjectMessage) consumer
220: .receiveNoWait();
221: // TODO: check that there's a message
222: // TODO: consider a split between open and closed tasks
223: // TODO: add taskresponse into api and closed message
224: ObjectMessage closedMessage = session
225: .createObjectMessage();
226: TaskAndExchange taskAndExchange = (TaskAndExchange) openMessage
227: .getObject();
228: closedMessage
229: .setObject(taskAndExchange);
230: closedMessage
231: .setStringProperty(
232: "taskId",
233: openMessage
234: .getStringProperty("taskId"));
235: closedMessage.setStringProperty(
236: "state", TaskState.CLOSED
237: .name());
238: closedMessage
239: .setObjectProperty(
240: "processInstanceId",
241: openMessage
242: .getStringProperty("processInstanceId"));
243: MessageProducer producer = session
244: .createProducer(queue);
245: producer.send(closedMessage);
246: session.commit();
247: } catch (Throwable e) {
248: session.rollback();
249: }
250: return null;
251: }
252: });
253: } catch (JmsTemplateException e) {
254: throw new TaskListException(e);
255: }
256: }
257:
258: @SuppressWarnings("unchecked")
259: public List<ITask> getTasksForInstance(
260: final String processInstanceId) throws TaskListException {
261: try {
262: return (List<ITask>) template
263: .execute(new SessionCallback() {
264:
265: public Object doInJms(Session session)
266: throws Exception {
267: QueueBrowser browser = session
268: .createBrowser(queue,
269: "processInstanceId = '"
270: + processInstanceId
271: + "'");
272: Enumeration enumeration = browser
273: .getEnumeration();
274: List<ITask> tasks = new ArrayList<ITask>();
275: while (enumeration.hasMoreElements()) {
276: ObjectMessage message = (ObjectMessage) enumeration
277: .nextElement();
278: TaskAndExchange taskAndExchange = (TaskAndExchange) message
279: .getObject();
280: tasks
281: .add(new Task(
282: message
283: .getStringProperty("taskId"),
284: (ITaskRequest) taskAndExchange.taskRequest,
285: TaskState
286: .valueOf(message
287: .getStringProperty("state"))));
288: }
289: return tasks;
290: }
291: });
292: } catch (JmsTemplateException e) {
293: throw new TaskListException(e);
294: }
295: }
296:
297: @SuppressWarnings("unchecked")
298: public List<ITask> clearTasks() throws TaskListException {
299: try {
300: return (List<ITask>) template
301: .execute(new SessionCallback() {
302:
303: public Object doInJms(Session session)
304: throws Exception {
305: List<ITask> tasks = new ArrayList<ITask>();
306: MessageConsumer consumer = session
307: .createConsumer(queue);
308: ObjectMessage message = null;
309: while ((message = (ObjectMessage) consumer
310: .receiveNoWait()) != null) {
311: TaskAndExchange taskAndExchange = (TaskAndExchange) message
312: .getObject();
313: tasks
314: .add(new Task(
315: message
316: .getStringProperty("taskId"),
317: (ITaskRequest) taskAndExchange.taskRequest,
318: TaskState
319: .valueOf(message
320: .getStringProperty("state"))));
321: }
322: return tasks;
323: }
324: });
325: } catch (JmsTemplateException e) {
326: throw new TaskListException(e);
327: }
328: }
329:
330: public void afterPropertiesSet() throws Exception {
331: queue = template.getQueue(queueName);
332: }
333:
334: public void setQueueName(String queueName) {
335: this .queueName = queueName;
336: }
337:
338: public void setTemplate(JmsTemplate template) {
339: this.template = template;
340: }
341:
342: }
|