001: /*
002: * $Id: JdbcMessageReceiver.java 10961 2008-02-22 19:01:02Z dfeist $
003: * --------------------------------------------------------------------------------------
004: * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
005: *
006: * The software in this package is published under the terms of the CPAL v1.0
007: * license, a copy of which has been included with this distribution in the
008: * LICENSE.txt file.
009: */
010:
011: package org.mule.transport.jdbc;
012:
013: import org.mule.DefaultMuleMessage;
014: import org.mule.api.MuleMessage;
015: import org.mule.api.endpoint.InboundEndpoint;
016: import org.mule.api.lifecycle.CreateException;
017: import org.mule.api.service.Service;
018: import org.mule.api.transaction.Transaction;
019: import org.mule.api.transport.Connector;
020: import org.mule.api.transport.MessageAdapter;
021: import org.mule.transaction.TransactionCoordination;
022: import org.mule.transport.ConnectException;
023: import org.mule.transport.TransactedPollingMessageReceiver;
024: import org.mule.util.ArrayUtils;
025:
026: import java.sql.Connection;
027: import java.sql.SQLException;
028: import java.util.ArrayList;
029: import java.util.List;
030:
031: /** TODO */
032: public class JdbcMessageReceiver extends
033: TransactedPollingMessageReceiver {
034:
035: protected JdbcConnector connector;
036: protected String readStmt;
037: protected String ackStmt;
038: protected List readParams;
039: protected List ackParams;
040:
041: public JdbcMessageReceiver(Connector connector, Service service,
042: InboundEndpoint endpoint, String readStmt, String ackStmt)
043: throws CreateException {
044: super (connector, service, endpoint);
045: this .setFrequency(((JdbcConnector) connector)
046: .getPollingFrequency());
047: this .setReceiveMessagesInTransaction(false);
048:
049: this .connector = (JdbcConnector) connector;
050: this .readParams = new ArrayList();
051: this .readStmt = this .connector.parseStatement(readStmt,
052: this .readParams);
053: this .ackParams = new ArrayList();
054: this .ackStmt = this .connector.parseStatement(ackStmt,
055: this .ackParams);
056: }
057:
058: protected void doDispose() {
059: // template method
060: }
061:
062: protected void doConnect() throws Exception {
063: Connection con = null;
064: try {
065: con = this .connector.getConnection();
066: } catch (Exception e) {
067: throw new ConnectException(e, this );
068: } finally {
069: JdbcUtils.close(con);
070: }
071: }
072:
073: protected void doDisconnect() throws ConnectException {
074: // noop
075: }
076:
077: public void processMessage(Object message) throws Exception {
078: Connection con = null;
079: Transaction tx = TransactionCoordination.getInstance()
080: .getTransaction();
081: try {
082: con = this .connector.getConnection();
083: MessageAdapter msgAdapter = this .connector
084: .getMessageAdapter(message);
085: MuleMessage umoMessage = new DefaultMuleMessage(msgAdapter);
086: if (this .ackStmt != null) {
087: Object[] ackParams = connector.getParams(endpoint,
088: this .ackParams, umoMessage, this .endpoint
089: .getEndpointURI().getAddress());
090: if (logger.isDebugEnabled()) {
091: logger.debug("SQL UPDATE: " + ackStmt
092: + ", params = "
093: + ArrayUtils.toString(ackParams));
094: }
095: int nbRows = connector.getQueryRunner().update(con,
096: this .ackStmt, ackParams);
097: if (nbRows != 1) {
098: logger
099: .warn("Row count for ack should be 1 and not "
100: + nbRows);
101: }
102: }
103: routeMessage(umoMessage, tx, tx != null
104: || endpoint.isSynchronous());
105:
106: } catch (Exception ex) {
107: if (tx != null) {
108: tx.setRollbackOnly();
109: }
110:
111: // rethrow
112: throw ex;
113: } finally {
114: if (endpoint.getMuleContext().getTransactionManager() != null
115: || tx == null) {
116: // We are running in an XA transaction.
117: // This call is required here for compatibility with strict XA
118: // DataSources
119: // implementations, as is the case for WebSphere AS and Weblogic.
120: // Failure to do it here may result in a connection leak.
121: // The close() call will NOT close the connection, neither will it
122: // return it to the pool.
123: // It will notify the XA driver's ConnectionEventListener that the XA
124: // connection
125: // is no longer used by the application and is ready for the 2PC
126: // commit.
127: JdbcUtils.close(con);
128: }
129: }
130: }
131:
132: public List getMessages() throws Exception {
133: Connection con = null;
134: try {
135: try {
136: con = this .connector.getConnection();
137: } catch (SQLException e) {
138: throw new ConnectException(e, this );
139: }
140:
141: Object[] readParams = connector.getParams(endpoint,
142: this .readParams, null, this .endpoint
143: .getEndpointURI().getAddress());
144: if (logger.isDebugEnabled()) {
145: logger.debug("SQL QUERY: " + readStmt + ", params = "
146: + ArrayUtils.toString(readParams));
147: }
148: Object results = connector.getQueryRunner().query(con,
149: this .readStmt, readParams,
150: connector.getResultSetHandler());
151: return (List) results;
152: } finally {
153: JdbcUtils.close(con);
154: }
155: }
156:
157: }
|