001: /*
002: * $Id: VMMessageRequester.java 10961 2008-02-22 19:01:02Z dfeist $
003: * --------------------------------------------------------------------------------------
004: * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
005: *
006: * The software in this package is published under the terms of the CPAL v1.0
007: * license, a copy of which has been included with this distribution in the
008: * LICENSE.txt file.
009: */
010:
011: package org.mule.transport.vm;
012:
013: import org.mule.api.MuleMessage;
014: import org.mule.api.endpoint.InboundEndpoint;
015: import org.mule.transport.AbstractMessageRequester;
016: import org.mule.util.queue.Queue;
017: import org.mule.util.queue.QueueSession;
018:
019: /**
020: * <code>VMMessageDispatcher</code> is used for providing in memory interaction
021: * between components.
022: */
023: public class VMMessageRequester extends AbstractMessageRequester {
024:
025: private final VMConnector connector;
026:
027: public VMMessageRequester(InboundEndpoint endpoint) {
028: super (endpoint);
029: this .connector = (VMConnector) endpoint.getConnector();
030: }
031:
032: /**
033: * Make a specific request to the underlying transport
034: *
035: * @param timeout the maximum time the operation should block before returning.
036: * The call should return immediately if there is data available. If
037: * no data becomes available before the timeout elapses, null will be
038: * returned
039: * @return the result of the request wrapped in a MuleMessage object. Null will be
040: * returned if no data was available
041: * @throws Exception if the call to the underlying protocol causes an exception
042: */
043: protected MuleMessage doRequest(long timeout) throws Exception {
044: if (!connector.isQueueEvents()) {
045: throw new UnsupportedOperationException(
046: "Receive requested on VM Connector, but queueEvents is false");
047: }
048:
049: try {
050: QueueSession queueSession = connector.getQueueSession();
051: Queue queue = queueSession.getQueue(endpoint
052: .getEndpointURI().getAddress());
053:
054: if (queue == null) {
055: if (logger.isDebugEnabled()) {
056: logger.debug("No queue with name "
057: + endpoint.getEndpointURI().getAddress());
058: }
059: return null;
060: } else {
061: MuleMessage message = null;
062: if (logger.isDebugEnabled()) {
063: logger.debug("Waiting for a message on "
064: + endpoint.getEndpointURI().getAddress());
065: }
066: try {
067: message = (MuleMessage) queue.poll(timeout);
068: } catch (InterruptedException e) {
069: logger
070: .error("Failed to receive message from queue: "
071: + endpoint.getEndpointURI());
072: }
073: if (message != null) {
074: if (logger.isDebugEnabled()) {
075: logger.debug("Message received: " + message);
076: }
077: return message;
078: } else {
079: if (logger.isDebugEnabled()) {
080: logger.debug("No event received after "
081: + timeout + " ms");
082: }
083: return null;
084: }
085: }
086: } catch (Exception e) {
087: throw e;
088: }
089: }
090:
091: protected void doDispose() {
092: // template method
093: }
094:
095: protected void doConnect() throws Exception {
096: if (connector.isQueueEvents()) {
097: // use the default queue profile to configure this queue.
098: connector.getQueueProfile().configureQueue(
099: endpoint.getEndpointURI().getAddress(),
100: connector.getQueueManager());
101: }
102: }
103:
104: protected void doDisconnect() throws Exception {
105: // template method
106: }
107:
108: }
|