001: package org.objectweb.celtix.bus.ws.rm.persistence.jdbc;
002:
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.objectweb.celtix.bus.ws.rm.DestinationSequence;
import org.objectweb.celtix.bus.ws.rm.RMMessageImpl;
import org.objectweb.celtix.bus.ws.rm.RMUtils;
import org.objectweb.celtix.bus.ws.rm.SourceSequence;
import org.objectweb.celtix.bus.ws.rm.persistence.RMStoreException;
import org.objectweb.celtix.common.i18n.Message;
import org.objectweb.celtix.common.logging.LogUtils;
import org.objectweb.celtix.ws.addressing.v200408.EndpointReferenceType;
import org.objectweb.celtix.ws.rm.Identifier;
import org.objectweb.celtix.ws.rm.SequenceAcknowledgement;
import org.objectweb.celtix.ws.rm.persistence.RMDestinationSequence;
import org.objectweb.celtix.ws.rm.persistence.RMMessage;
import org.objectweb.celtix.ws.rm.persistence.RMSourceSequence;
import org.objectweb.celtix.ws.rm.persistence.RMStore;
037:
038: public class RMTxStore implements RMStore {
039:
040: public static final String DRIVER_CLASS_NAME_PROPERTY = "org.objectweb.celtix.rm.persistence.jdbc.driver";
041: public static final String CONNECTION_URL_PROPERTY = "org.objectweb.celtix.rm.persistence.jdbc.url";
042: public static final String USER_NAME_PROPERTY = "org.objectweb.celtix.rm.persistence.jdbc.user";
043: public static final String PASSWORD_PROPERTY = "org.objectweb.celtix.rm.persistence.jdbc.password";
044:
045: private static final String CREATE_DEST_SEQUENCES_TABLE_STMT = "CREATE TABLE CELTIX_RM_DEST_SEQUENCES "
046: + "(SEQ_ID VARCHAR(256) NOT NULL, "
047: + "ACKS_TO VARCHAR(1024) NOT NULL, "
048: + "LAST_MSG_NO DECIMAL(31, 0), "
049: + "ENDPOINT_ID VARCHAR(1024), "
050: + "ACKNOWLEDGED BLOB, "
051: + "PRIMARY KEY (SEQ_ID))";
052: private static final String CREATE_SRC_SEQUENCES_TABLE_STMT = "CREATE TABLE CELTIX_RM_SRC_SEQUENCES "
053: + "(SEQ_ID VARCHAR(256) NOT NULL, "
054: + "CUR_MSG_NO DECIMAL(31, 0) NOT NULL DEFAULT 1, "
055: + "LAST_MSG CHAR(1), "
056: + "EXPIRY BIGINT, "
057: + "OFFERING_SEQ_ID VARCHAR(256), "
058: + "ENDPOINT_ID VARCHAR(1024), " + "PRIMARY KEY (SEQ_ID))";
059: private static final String CREATE_MESSAGES_TABLE_STMT = "CREATE TABLE {0} "
060: + "(SEQ_ID VARCHAR(256) NOT NULL, "
061: + "MSG_NO DECIMAL(31, 0) NOT NULL, "
062: + "CONTEXT BLOB, "
063: + "PRIMARY KEY (SEQ_ID, MSG_NO))";
064: private static final String INBOUND_MSGS_TABLE_NAME = "CELTIX_RM_INBOUND_MESSAGES";
065: private static final String OUTBOUND_MSGS_TABLE_NAME = "CELTIX_RM_OUTBOUND_MESSAGES";
066:
067: private static final String CREATE_DEST_SEQUENCE_STMT_STR = "INSERT INTO CELTIX_RM_DEST_SEQUENCES (SEQ_ID, ACKS_TO, ENDPOINT_ID) VALUES(?, ?, ?)";
068: private static final String CREATE_SRC_SEQUENCE_STMT_STR = "INSERT INTO CELTIX_RM_SRC_SEQUENCES VALUES(?, 1, '0', ?, ?, ?)";
069: private static final String DELETE_DEST_SEQUENCE_STMT_STR = "DELETE FROM CELTIX_RM_DEST_SEQUENCES WHERE SEQ_ID = ?";
070: private static final String DELETE_SRC_SEQUENCE_STMT_STR = "DELETE FROM CELTIX_RM_SRC_SEQUENCES WHERE SEQ_ID = ?";
071: private static final String UPDATE_DEST_SEQUENCE_STMT_STR = "UPDATE CELTIX_RM_DEST_SEQUENCES SET LAST_MSG_NO = ?, ACKNOWLEDGED = ? WHERE SEQ_ID = ?";
072: private static final String UPDATE_SRC_SEQUENCE_STMT_STR = "UPDATE CELTIX_RM_SRC_SEQUENCES SET CUR_MSG_NO = ?, LAST_MSG = ? WHERE SEQ_ID = ?";
073: private static final String CREATE_MESSAGE_STMT_STR = "INSERT INTO {0} VALUES(?, ?, ?)";
074: private static final String DELETE_MESSAGE_STMT_STR = "DELETE FROM {0} WHERE SEQ_ID = ? AND MSG_NO = ?";
075:
076: private static final String SELECT_DEST_SEQUENCES_STMT_STR = "SELECT SEQ_ID, ACKS_TO, LAST_MSG_NO, ACKNOWLEDGED FROM CELTIX_RM_DEST_SEQUENCES "
077: + "WHERE ENDPOINT_ID = ?";
078: private static final String SELECT_SRC_SEQUENCES_STMT_STR = "SELECT SEQ_ID, CUR_MSG_NO, LAST_MSG, EXPIRY, OFFERING_SEQ_ID FROM CELTIX_RM_SRC_SEQUENCES "
079: + "WHERE ENDPOINT_ID = ?";
080: private static final String SELECT_MESSAGES_STMT_STR = "SELECT MSG_NO, CONTEXT FROM {0} WHERE SEQ_ID = ?";
081:
082: private static final Logger LOG = LogUtils
083: .getL7dLogger(RMTxStore.class);
084:
085: private static Map<String, Connection> connectionMap;
086: private Connection connection;
087: private PreparedStatement createDestSequenceStmt;
088: private PreparedStatement createSrcSequenceStmt;
089: private PreparedStatement deleteDestSequenceStmt;
090: private PreparedStatement deleteSrcSequenceStmt;
091: private PreparedStatement updateDestSequenceStmt;
092: private PreparedStatement updateSrcSequenceStmt;
093: private PreparedStatement selectDestSequencesStmt;
094: private PreparedStatement selectSrcSequencesStmt;
095:
096: private PreparedStatement createInboundMessageStmt;
097: private PreparedStatement createOutboundMessageStmt;
098: private PreparedStatement deleteInboundMessageStmt;
099: private PreparedStatement deleteOutboundMessageStmt;
100: private PreparedStatement selectInboundMessagesStmt;
101: private PreparedStatement selectOutboundMessagesStmt;
102:
103: // RMStore interface
104:
105: public void init(Map<String, String> params) {
106: connect(params);
107: }
108:
109: public void createSourceSequence(RMSourceSequence seq) {
110: String sequenceIdentifier = seq.getIdentifier().getValue();
111: String endpointIdentifier = seq.getEndpointIdentifier();
112: if (LOG.isLoggable(Level.FINE)) {
113: LOG.info("Creating source sequence: " + sequenceIdentifier
114: + ", (endpoint: " + endpointIdentifier + ")");
115: }
116:
117: try {
118: beginTransaction();
119:
120: if (null == createSrcSequenceStmt) {
121: createSrcSequenceStmt = connection
122: .prepareStatement(CREATE_SRC_SEQUENCE_STMT_STR);
123: }
124: assert null != createSrcSequenceStmt;
125: createSrcSequenceStmt.setString(1, sequenceIdentifier);
126: Date expiry = seq.getExpiry();
127: createSrcSequenceStmt.setLong(2, expiry == null ? 0
128: : expiry.getTime());
129: Identifier osid = seq.getOfferingSequenceIdentifier();
130: createSrcSequenceStmt.setString(3, osid == null ? null
131: : osid.getValue());
132: createSrcSequenceStmt.setString(4, endpointIdentifier);
133: createSrcSequenceStmt.execute();
134:
135: commit();
136:
137: } catch (SQLException ex) {
138: abort();
139: throw new RMStoreException(ex);
140: }
141: }
142:
143: public void createDestinationSequence(RMDestinationSequence seq) {
144: String sequenceIdentifier = seq.getIdentifier().getValue();
145: String endpointIdentifier = seq.getEndpointIdentifier();
146: if (LOG.isLoggable(Level.FINE)) {
147: LOG.info("Creating destination sequence: "
148: + sequenceIdentifier + ", (endpoint: "
149: + endpointIdentifier + ")");
150: }
151: try {
152: beginTransaction();
153:
154: if (null == createDestSequenceStmt) {
155: createDestSequenceStmt = connection
156: .prepareStatement(CREATE_DEST_SEQUENCE_STMT_STR);
157: }
158: createDestSequenceStmt.setString(1, sequenceIdentifier);
159: String addr = seq.getAcksTo().getAddress().getValue();
160: createDestSequenceStmt.setString(2, addr);
161: createDestSequenceStmt.setString(3, endpointIdentifier);
162:
163: createDestSequenceStmt.execute();
164:
165: commit();
166:
167: } catch (SQLException ex) {
168: abort();
169: throw new RMStoreException(ex);
170: }
171: }
172:
173: public void removeDestinationSequence(Identifier sid) {
174: try {
175: beginTransaction();
176:
177: if (null == deleteDestSequenceStmt) {
178: deleteDestSequenceStmt = connection
179: .prepareStatement(DELETE_DEST_SEQUENCE_STMT_STR);
180: }
181: deleteDestSequenceStmt.setString(1, sid.getValue());
182: deleteDestSequenceStmt.execute();
183:
184: commit();
185:
186: } catch (SQLException ex) {
187: abort();
188: throw new RMStoreException(ex);
189: }
190: }
191:
192: public void removeSourceSequence(Identifier sid) {
193: try {
194: beginTransaction();
195:
196: if (null == deleteSrcSequenceStmt) {
197: deleteSrcSequenceStmt = connection
198: .prepareStatement(DELETE_SRC_SEQUENCE_STMT_STR);
199: }
200: deleteSrcSequenceStmt.setString(1, sid.getValue());
201: deleteSrcSequenceStmt.execute();
202:
203: commit();
204:
205: } catch (SQLException ex) {
206: abort();
207: throw new RMStoreException(ex);
208: }
209: }
210:
211: public Collection<RMDestinationSequence> getDestinationSequences(
212: String endpointIdentifier) {
213: if (LOG.isLoggable(Level.FINE)) {
214: LOG.info("Getting destination sequences for endpoint: "
215: + endpointIdentifier);
216: }
217: Collection<RMDestinationSequence> seqs = new ArrayList<RMDestinationSequence>();
218: try {
219: if (null == selectDestSequencesStmt) {
220: selectDestSequencesStmt = connection
221: .prepareStatement(SELECT_DEST_SEQUENCES_STMT_STR);
222: }
223: selectDestSequencesStmt.setString(1, endpointIdentifier);
224:
225: ResultSet res = selectDestSequencesStmt.executeQuery();
226: while (res.next()) {
227: // do something
228: Identifier sid = RMUtils.getWSRMFactory()
229: .createIdentifier();
230: sid.setValue(res.getString(1));
231: EndpointReferenceType acksTo = RMUtils
232: .createReference(res.getString(2));
233: BigDecimal lm = res.getBigDecimal(3);
234: InputStream is = res.getBinaryStream(4);
235: SequenceAcknowledgement ack = null;
236: if (null != is) {
237: ack = RMUtils.getPersistenceUtils()
238: .getSequenceAcknowledgment(is);
239: }
240: DestinationSequence seq = new DestinationSequence(sid,
241: acksTo, lm == null ? null : lm.toBigInteger(),
242: ack);
243: seqs.add(seq);
244: }
245: } catch (SQLException ex) {
246: LOG.log(Level.WARNING, new Message(
247: "SELECT_DEST_SEQ_FAILED_MSG", LOG).toString(), ex);
248: }
249: return seqs;
250: }
251:
252: public Collection<RMSourceSequence> getSourceSequences(
253: String endpointIdentifier) {
254: if (LOG.isLoggable(Level.FINE)) {
255: LOG.info("Getting source sequences for endpoint: "
256: + endpointIdentifier);
257: }
258: Collection<RMSourceSequence> seqs = new ArrayList<RMSourceSequence>();
259: try {
260: if (null == selectSrcSequencesStmt) {
261: selectSrcSequencesStmt = connection
262: .prepareStatement(SELECT_SRC_SEQUENCES_STMT_STR);
263: }
264: selectSrcSequencesStmt.setString(1, endpointIdentifier);
265: ResultSet res = selectSrcSequencesStmt.executeQuery();
266:
267: while (res.next()) {
268: Identifier sid = RMUtils.getWSRMFactory()
269: .createIdentifier();
270: sid.setValue(res.getString(1));
271: BigInteger cmn = res.getBigDecimal(2).toBigInteger();
272: boolean lm = res.getBoolean(3);
273: long lval = res.getLong(4);
274: Date expiry = 0 == lval ? null : new Date(lval);
275: String oidValue = res.getString(5);
276: Identifier oi = null;
277: if (null != oidValue) {
278: oi = RMUtils.getWSRMFactory().createIdentifier();
279: oi.setValue(oidValue);
280: }
281: SourceSequence seq = new SourceSequence(sid, expiry,
282: oi, cmn, lm);
283: seqs.add(seq);
284: }
285: } catch (SQLException ex) {
286: // ignore
287: LOG.log(Level.WARNING, new Message(
288: "SELECT_SRC_SEQ_FAILED_MSG", LOG).toString(), ex);
289: }
290: return seqs;
291: }
292:
293: public Collection<RMMessage> getMessages(Identifier sid,
294: boolean outbound) {
295: Collection<RMMessage> msgs = new ArrayList<RMMessage>();
296: try {
297: PreparedStatement stmt = outbound ? selectOutboundMessagesStmt
298: : selectInboundMessagesStmt;
299: if (null == stmt) {
300: stmt = connection.prepareStatement(MessageFormat
301: .format(SELECT_MESSAGES_STMT_STR,
302: outbound ? OUTBOUND_MSGS_TABLE_NAME
303: : INBOUND_MSGS_TABLE_NAME));
304: if (outbound) {
305: selectOutboundMessagesStmt = stmt;
306: } else {
307: selectInboundMessagesStmt = stmt;
308: }
309: }
310:
311: stmt.setString(1, sid.getValue());
312: ResultSet res = stmt.executeQuery();
313: while (res.next()) {
314: BigInteger mn = res.getBigDecimal(1).toBigInteger();
315: InputStream is = res.getBinaryStream(2);
316: RMMessageImpl msg = new RMMessageImpl(mn, is);
317: msgs.add(msg);
318: }
319: } catch (SQLException ex) {
320: LOG.log(Level.WARNING, new Message(
321: outbound ? "SELECT_OUTBOUND_MSGS_FAILED_MSG"
322: : "SELECT_INBOUND_MSGS_FAILED_MSG", LOG)
323: .toString(), ex);
324: }
325: return msgs;
326: }
327:
328: public void persistIncoming(RMDestinationSequence seq, RMMessage msg) {
329: try {
330: beginTransaction();
331:
332: updateDestinationSequence(seq);
333:
334: storeMessage(seq.getIdentifier(), msg, false);
335:
336: commit();
337:
338: } catch (SQLException ex) {
339: abort();
340: throw new RMStoreException(ex);
341: } catch (IOException ex) {
342: abort();
343: throw new RMStoreException(ex);
344: }
345: }
346:
347: public void persistOutgoing(RMSourceSequence seq, RMMessage msg) {
348: try {
349: beginTransaction();
350:
351: updateSourceSequence(seq);
352:
353: storeMessage(seq.getIdentifier(), msg, true);
354:
355: commit();
356:
357: } catch (SQLException ex) {
358: abort();
359: throw new RMStoreException(ex);
360: } catch (IOException ex) {
361: abort();
362: throw new RMStoreException(ex);
363: }
364:
365: }
366:
367: public void removeMessages(Identifier sid,
368: Collection<BigInteger> messageNrs, boolean outbound) {
369: try {
370: beginTransaction();
371: PreparedStatement stmt = outbound ? deleteOutboundMessageStmt
372: : deleteInboundMessageStmt;
373: if (null == stmt) {
374: stmt = connection.prepareStatement(MessageFormat
375: .format(DELETE_MESSAGE_STMT_STR,
376: outbound ? OUTBOUND_MSGS_TABLE_NAME
377: : INBOUND_MSGS_TABLE_NAME));
378: if (outbound) {
379: deleteOutboundMessageStmt = stmt;
380: } else {
381: deleteInboundMessageStmt = stmt;
382: }
383: }
384:
385: stmt.setString(1, sid.getValue());
386:
387: for (BigInteger messageNr : messageNrs) {
388: stmt.setBigDecimal(2, new BigDecimal(messageNr));
389: stmt.execute();
390: }
391:
392: commit();
393:
394: } catch (SQLException ex) {
395: abort();
396: throw new RMStoreException(ex);
397: }
398: }
399:
400: // transaction demarcation
401:
402: protected void beginTransaction() {
403: // no-op
404: }
405:
406: protected void commit() throws SQLException {
407: connection.commit();
408: }
409:
410: protected void abort() {
411: try {
412: connection.rollback();
413: } catch (SQLException ex) {
414: LOG.log(Level.SEVERE, new Message("ABORT_FAILED_MSG", LOG)
415: .toString(), ex);
416: }
417: }
418:
419: // helpers
420:
421: protected void storeMessage(Identifier sid, RMMessage msg,
422: boolean outbound) throws IOException, SQLException {
423: PreparedStatement stmt = outbound ? createOutboundMessageStmt
424: : createInboundMessageStmt;
425: if (null == stmt) {
426: stmt = connection.prepareStatement(MessageFormat.format(
427: CREATE_MESSAGE_STMT_STR,
428: outbound ? OUTBOUND_MSGS_TABLE_NAME
429: : INBOUND_MSGS_TABLE_NAME));
430: if (outbound) {
431: createOutboundMessageStmt = stmt;
432: } else {
433: createInboundMessageStmt = stmt;
434: }
435: }
436:
437: int i = 1;
438: stmt.setString(i++, sid.getValue());
439: stmt.setBigDecimal(i++, new BigDecimal(msg.getMessageNr()));
440: InputStream is = msg.getContextAsStream();
441: stmt.setBinaryStream(i++, is, is.available());
442: stmt.execute();
443: }
444:
445: protected void updateSourceSequence(RMSourceSequence seq)
446: throws SQLException {
447: if (null == updateSrcSequenceStmt) {
448: updateSrcSequenceStmt = connection
449: .prepareStatement(UPDATE_SRC_SEQUENCE_STMT_STR);
450: }
451: updateSrcSequenceStmt.setBigDecimal(1, new BigDecimal(seq
452: .getCurrentMessageNr()));
453: updateSrcSequenceStmt.setBoolean(2, seq.isLastMessage());
454: updateSrcSequenceStmt.setString(3, seq.getIdentifier()
455: .getValue());
456: updateSrcSequenceStmt.execute();
457: }
458:
459: protected void updateDestinationSequence(RMDestinationSequence seq)
460: throws SQLException, IOException {
461: if (null == updateDestSequenceStmt) {
462: updateDestSequenceStmt = connection
463: .prepareStatement(UPDATE_DEST_SEQUENCE_STMT_STR);
464: }
465: BigInteger lastMessageNr = seq.getLastMessageNr();
466: updateDestSequenceStmt.setBigDecimal(1,
467: lastMessageNr == null ? null : new BigDecimal(
468: lastMessageNr));
469: InputStream is = seq.getAcknowledgmentAsStream();
470: updateDestSequenceStmt.setBinaryStream(2, is, is.available());
471: updateDestSequenceStmt.setString(3, seq.getIdentifier()
472: .getValue());
473: updateDestSequenceStmt.execute();
474: }
475:
476: protected void createTables() throws SQLException {
477:
478: Statement stmt = null;
479:
480: stmt = connection.createStatement();
481: try {
482: stmt.executeUpdate(CREATE_SRC_SEQUENCES_TABLE_STMT);
483: } catch (SQLException ex) {
484: if (!"X0Y32".equals(ex.getSQLState())) {
485: throw ex;
486: } else {
487: LOG
488: .fine("Table CELTIX_RM_SRC_SEQUENCES already exists.");
489: }
490: }
491: stmt.close();
492:
493: stmt = connection.createStatement();
494: try {
495: stmt.executeUpdate(CREATE_DEST_SEQUENCES_TABLE_STMT);
496: } catch (SQLException ex) {
497: if (!"X0Y32".equals(ex.getSQLState())) {
498: throw ex;
499: } else {
500: LOG
501: .fine("Table CELTIX_RM_DEST_SEQUENCES already exists.");
502: }
503: }
504: stmt.close();
505:
506: for (String tableName : new String[] {
507: OUTBOUND_MSGS_TABLE_NAME, INBOUND_MSGS_TABLE_NAME }) {
508: stmt = connection.createStatement();
509: try {
510: stmt.executeUpdate(MessageFormat.format(
511: CREATE_MESSAGES_TABLE_STMT, tableName));
512: } catch (SQLException ex) {
513: if (!"X0Y32".equals(ex.getSQLState())) {
514: throw ex;
515: } else {
516: if (LOG.isLoggable(Level.FINE)) {
517: LOG.fine("Table " + tableName
518: + " already exists.");
519: }
520: }
521: }
522: stmt.close();
523: }
524: }
525:
526: synchronized void connect(Map<String, String> params) {
527:
528: if (null == connectionMap) {
529: connectionMap = new HashMap<String, Connection>();
530: }
531: String url = params.get(CONNECTION_URL_PROPERTY);
532: assert null != url;
533: connection = connectionMap.get(url);
534: if (null != connection) {
535: return;
536: }
537:
538: String driverClassName = params.get(DRIVER_CLASS_NAME_PROPERTY);
539: assert null != driverClassName;
540: try {
541: Class.forName(driverClassName);
542: } catch (ClassNotFoundException ex) {
543: throw new RMStoreException(ex);
544: }
545:
546: // assert null != params.get(USER_NAME_PROPERTY);
547:
548: try {
549: connection = DriverManager.getConnection(url, params
550: .get(USER_NAME_PROPERTY), params
551: .get(PASSWORD_PROPERTY));
552: connection.setAutoCommit(false);
553: createTables();
554:
555: } catch (SQLException ex) {
556: throw new RMStoreException(ex);
557: }
558:
559: connectionMap.put(url, connection);
560: assert connection == connectionMap.get(url);
561: }
562:
563: /**
564: * Accessor for connection - used in tests only.
565: * @return the connection
566: */
567: Connection getConnection() {
568: return connection;
569: }
570:
571: public static void deleteDatabaseFiles(String path, boolean now) {
572: File root = null;
573: String dsh = System.getProperty("derby.system.home");
574: if (null == dsh) {
575: File log = new File("derby.log");
576: if (log.exists()) {
577: if (now) {
578: log.delete();
579: } else {
580: log.deleteOnExit();
581: }
582: }
583: root = new File(path);
584: } else {
585: root = new File(dsh);
586: }
587: if (root.exists()) {
588: recursiveDelete(root, now);
589: }
590:
591: }
592:
593: private static void recursiveDelete(File dir, boolean now) {
594: for (File f : dir.listFiles()) {
595: if (f.isDirectory()) {
596: recursiveDelete(f, now);
597: } else {
598: if (now) {
599: f.delete();
600: } else {
601: f.deleteOnExit();
602: }
603: }
604: }
605: if (now) {
606: dir.delete();
607: } else {
608: dir.deleteOnExit();
609: }
610: }
611: }
|