001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.mq;
023:
024: import java.io.Serializable;
025: import java.util.Map;
026: import java.util.ArrayList;
027:
028: import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
029:
030: import javax.jms.JMSException;
031: import javax.transaction.xa.XAException;
032: import javax.transaction.xa.XAResource;
033: import javax.transaction.xa.Xid;
034:
035: import org.jboss.logging.Logger;
036:
037: /**
* This class implements the ResourceManager used for the XAResources used in
* JBossMQ.
040: *
041: * @author Hiram Chirino (Cojonudo14@hotmail.com)
042: * @author <a href="mailto:adrian@jboss.org">Adrian Brock</a>
043: * @version $Revision: 57198 $
044: */
045: public class SpyXAResourceManager implements Serializable {
046: /** The serialVersionUID */
047: static final long serialVersionUID = -6268132972627753772L;
048: /** The log */
049: private static final Logger log = Logger
050: .getLogger(SpyXAResourceManager.class);
051: /** Whether trace is enabled */
052: private static boolean trace = log.isTraceEnabled();
053:
054: //Valid tx states:
055: private final static byte TX_OPEN = 0;
056: private final static byte TX_ENDED = 1;
057: private final static byte TX_PREPARED = 3;
058: private final static byte TX_COMMITED = 4;
059: private final static byte TX_ROLLEDBACK = 5;
060: private final static byte TX_READONLY = 6;
061:
062: /** The connection */
063: private Connection connection;
064: /** The transactions */
065: private Map transactions = new ConcurrentReaderHashMap();
066: /** The next xid */
067: private long nextInternalXid = Long.MIN_VALUE;
068:
069: /**
070: * Create a new SpyXAResourceManager
071: *
072: * @param conn the connection
073: */
074: public SpyXAResourceManager(Connection conn) {
075: super ();
076: connection = conn;
077: }
078:
079: /**
080: * Acknowledge a message
081: *
082: * @param xid the xid
083: * @param msg the message
084: * @throws JMSException for any error
085: */
086: public void ackMessage(Object xid, SpyMessage msg)
087: throws JMSException {
088: if (xid == null) {
089: if (trace)
090: log.trace("No Xid, acking message "
091: + msg.header.jmsMessageID);
092: msg.doAcknowledge();
093: return;
094: }
095:
096: if (trace)
097: log.trace("Adding acked message xid=" + xid + " "
098: + msg.header.jmsMessageID);
099:
100: TXState state = (TXState) transactions.get(xid);
101: if (state == null)
102: throw new JMSException("Invalid transaction id.");
103: AcknowledgementRequest item = msg
104: .getAcknowledgementRequest(true);
105: state.ackedMessages.add(item);
106: }
107:
108: public void addMessage(Object xid, SpyMessage msg)
109: throws JMSException {
110: if (xid == null) {
111: if (trace)
112: log.trace("No Xid, sending message to server "
113: + msg.header.jmsMessageID);
114: connection.sendToServer(msg);
115: return;
116: }
117:
118: if (trace)
119: log.trace("Adding message xid=" + xid + ", message="
120: + msg.header.jmsMessageID);
121:
122: TXState state = (TXState) transactions.get(xid);
123: if (trace)
124: log.trace("TXState=" + state);
125:
126: if (state == null)
127: throw new JMSException("Invalid transaction id.");
128:
129: state.sentMessages.add(msg);
130: }
131:
132: public void commit(Object xid, boolean onePhase)
133: throws XAException, JMSException {
134: if (trace)
135: log
136: .trace("Commiting xid=" + xid + ", onePhase="
137: + onePhase);
138:
139: TXState state = (TXState) transactions.remove(xid);
140: if (state == null) {
141: XAException e = new XAException(
142: "Unknown transaction during commit " + xid);
143: e.errorCode = XAException.XAER_NOTA;
144: throw e;
145: }
146:
147: if (onePhase) {
148: if (state.isReadOnly()) {
149: if (trace)
150: log.trace("Nothing to do for " + xid);
151: }
152:
153: TransactionRequest transaction = new TransactionRequest();
154: transaction.requestType = TransactionRequest.ONE_PHASE_COMMIT_REQUEST;
155: transaction.xid = null;
156: if (state.sentMessages.size() != 0) {
157: SpyMessage job[] = new SpyMessage[state.sentMessages
158: .size()];
159: job = (SpyMessage[]) state.sentMessages.toArray(job);
160: transaction.messages = job;
161: }
162: if (state.ackedMessages.size() != 0) {
163: AcknowledgementRequest job[] = new AcknowledgementRequest[state.ackedMessages
164: .size()];
165: job = (AcknowledgementRequest[]) state.ackedMessages
166: .toArray(job);
167: transaction.acks = job;
168: }
169: connection.send(transaction);
170: } else {
171: if (state.txState == TX_READONLY) {
172: if (trace)
173: log.trace("Nothing to do for " + xid);
174: return;
175: }
176: if (state.txState != TX_PREPARED) {
177: XAException e = new XAException(
178: "Cannot complete 2 phase commit, the transaction has not been prepared "
179: + xid);
180: e.errorCode = XAException.XAER_PROTO;
181: throw e;
182: }
183: TransactionRequest transaction = new TransactionRequest();
184: transaction.xid = xid;
185: transaction.requestType = TransactionRequest.TWO_PHASE_COMMIT_COMMIT_REQUEST;
186: connection.send(transaction);
187: }
188: state.txState = TX_COMMITED;
189: }
190:
191: public void endTx(Object xid, boolean success) throws XAException {
192: if (trace)
193: log.trace("Ending xid=" + xid + ", success=" + success);
194:
195: TXState state = (TXState) transactions.get(xid);
196: if (state == null) {
197: XAException e = new XAException(
198: "Unknown transaction during delist " + xid);
199: e.errorCode = XAException.XAER_NOTA;
200: throw e;
201: }
202: state.txState = TX_ENDED;
203: }
204:
205: public Object joinTx(Xid xid) throws XAException {
206: if (trace)
207: log.trace("Joining tx xid=" + xid);
208:
209: if (!transactions.containsKey(xid)) {
210: XAException e = new XAException(
211: "Unknown transaction during join " + xid);
212: e.errorCode = XAException.XAER_NOTA;
213: throw e;
214: }
215: return xid;
216: }
217:
218: public int prepare(Object xid) throws XAException, JMSException {
219: if (trace)
220: log.trace("Preparing xid=" + xid);
221:
222: TXState state = (TXState) transactions.get(xid);
223: if (state == null) {
224: XAException e = new XAException(
225: "Unknown transaction during prepare " + xid);
226: e.errorCode = XAException.XAER_NOTA;
227: throw e;
228: }
229:
230: if (state.isReadOnly()) {
231: if (trace)
232: log.trace("Vote read only for " + xid);
233: state.txState = TX_READONLY;
234: return XAResource.XA_RDONLY;
235: }
236:
237: TransactionRequest transaction = new TransactionRequest();
238: transaction.requestType = TransactionRequest.TWO_PHASE_COMMIT_PREPARE_REQUEST;
239: transaction.xid = xid;
240: if (state.sentMessages.size() != 0) {
241: SpyMessage job[] = new SpyMessage[state.sentMessages.size()];
242: job = (SpyMessage[]) state.sentMessages.toArray(job);
243: transaction.messages = job;
244: }
245: if (state.ackedMessages.size() != 0) {
246: AcknowledgementRequest job[] = new AcknowledgementRequest[state.ackedMessages
247: .size()];
248: job = (AcknowledgementRequest[]) state.ackedMessages
249: .toArray(job);
250: transaction.acks = job;
251: }
252: connection.send(transaction);
253: state.txState = TX_PREPARED;
254: return XAResource.XA_OK;
255: }
256:
257: public Object resumeTx(Xid xid) throws XAException {
258: if (trace)
259: log.trace("Resuming tx xid=" + xid);
260:
261: if (!transactions.containsKey(xid)) {
262: XAException e = new XAException(
263: "Unknown transaction during resume " + xid);
264: e.errorCode = XAException.XAER_NOTA;
265: throw e;
266: }
267: return xid;
268: }
269:
270: public void rollback(Object xid) throws XAException, JMSException {
271: if (trace)
272: log.trace("Rolling back xid=" + xid);
273:
274: TXState state = (TXState) transactions.remove(xid);
275: if (state == null) {
276: XAException e = new XAException(
277: "Unknown transaction during rollback " + xid);
278: e.errorCode = XAException.XAER_NOTA;
279: throw e;
280: }
281: if (state.txState == TX_READONLY) {
282: if (trace)
283: log.trace("Nothing to do for " + xid);
284: return;
285: }
286: if (state.txState != TX_PREPARED) {
287: TransactionRequest transaction = new TransactionRequest();
288: transaction.requestType = TransactionRequest.ONE_PHASE_COMMIT_REQUEST;
289: transaction.xid = null;
290: if (state.ackedMessages.size() != 0) {
291: AcknowledgementRequest job[] = new AcknowledgementRequest[state.ackedMessages
292: .size()];
293: job = (AcknowledgementRequest[]) state.ackedMessages
294: .toArray(job);
295: transaction.acks = job;
296: //Neg Acknowlege all consumed messages
297: for (int i = 0; i < transaction.acks.length; i++) {
298: transaction.acks[i].isAck = false;
299: }
300: }
301: connection.send(transaction);
302: } else {
303: TransactionRequest transaction = new TransactionRequest();
304: transaction.xid = xid;
305: transaction.requestType = TransactionRequest.TWO_PHASE_COMMIT_ROLLBACK_REQUEST;
306: connection.send(transaction);
307: }
308: state.txState = TX_ROLLEDBACK;
309: }
310:
311: public Xid[] recover(int arg) throws XAException, JMSException {
312: if (trace)
313: log.trace("Recover arg=" + arg);
314:
315: Xid[] xids = connection.recover(arg);
316:
317: // Make sure we have a reference to each xid
318: for (int i = 0; i < xids.length; ++i) {
319: if (transactions.containsKey(xids[i]) == false) {
320: TXState state = new TXState();
321: state.txState = TX_PREPARED;
322: transactions.put(xids[i], state);
323: }
324: }
325: return xids;
326: }
327:
328: public void forget(Xid xid) throws XAException, JMSException {
329: if (trace)
330: log.trace("Forget xid=" + xid);
331:
332: TXState state = (TXState) transactions.get(xid);
333: if (state == null)
334: return;
335: if (state.txState != TX_PREPARED)
336: transactions.remove(xid);
337: rollback(xid);
338: }
339:
340: public synchronized Long getNewXid() {
341: return new Long(nextInternalXid++);
342: }
343:
344: public Object startTx() {
345: Long newXid = getNewXid();
346: transactions.put(newXid, new TXState());
347:
348: if (trace)
349: log.trace("Starting tx with new xid=" + newXid);
350:
351: return newXid;
352: }
353:
354: public Object startTx(Xid xid) throws XAException {
355: if (trace)
356: log.trace("Starting tx xid=" + xid);
357:
358: if (transactions.containsKey(xid)) {
359: XAException e = new XAException(
360: "Duplicate transaction id during enlist " + xid);
361: e.errorCode = XAException.XAER_DUPID;
362: throw e;
363: }
364: transactions.put(xid, new TXState());
365: return xid;
366: }
367:
368: public Object suspendTx(Xid xid) throws XAException {
369: if (trace)
370: log.trace("Suppending tx xid=" + xid);
371:
372: if (!transactions.containsKey(xid)) {
373: XAException e = new XAException(
374: "Unknown transaction during suspend " + xid);
375: e.errorCode = XAException.XAER_NOTA;
376: throw e;
377: }
378: return xid;
379: }
380:
381: public Object convertTx(Long anonXid, Xid xid) throws XAException {
382: if (trace)
383: log.trace("Converting tx anonXid=" + anonXid + ", xid="
384: + xid);
385:
386: if (!transactions.containsKey(anonXid)) {
387: XAException e = new XAException(
388: "Unknown transaction during convert " + anonXid);
389: e.errorCode = XAException.XAER_NOTA;
390: throw e;
391: }
392: if (transactions.containsKey(xid)) {
393: XAException e = new XAException(
394: "Duplicate transaction during convert " + xid);
395: e.errorCode = XAException.XAER_DUPID;
396: throw e;
397: }
398: TXState s = (TXState) transactions.remove(anonXid);
399:
400: transactions.put(xid, s);
401: return xid;
402: }
403:
404: /**
405: * The transaction state
406: */
407: static class TXState {
408: byte txState = TX_OPEN;
409: ArrayList sentMessages = new ArrayList();
410: ArrayList ackedMessages = new ArrayList();
411:
412: public boolean isReadOnly() {
413: return sentMessages.size() == 0
414: && ackedMessages.size() == 0;
415: }
416:
417: public String toString() {
418: StringBuffer buffer = new StringBuffer(100);
419: buffer.append("TxState txState=").append(txState);
420: buffer.append(" sent=").append(sentMessages);
421: buffer.append(" acks=").append(ackedMessages);
422: return buffer.toString();
423: }
424: }
425: }
|