001: /*
002: * Copyright (c) 1998-2008 Caucho Technology -- all rights reserved
003: *
004: * This file is part of Resin(R) Open Source
005: *
006: * Each copy or derived work must preserve the copyright notice and this
007: * notice unmodified.
008: *
009: * Resin Open Source is free software; you can redistribute it and/or modify
010: * it under the terms of the GNU General Public License as published by
011: * the Free Software Foundation; either version 2 of the License, or
012: * (at your option) any later version.
013: *
014: * Resin Open Source is distributed in the hope that it will be useful,
015: * but WITHOUT ANY WARRANTY; without even the implied warranty of
016: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty
017: * of NON-INFRINGEMENT. See the GNU General Public License for more
018: * details.
019: *
020: * You should have received a copy of the GNU General Public License
021: * along with Resin Open Source; if not, write to the
022: *
023: * Free Software Foundation, Inc.
024: * 59 Temple Place, Suite 330
025: * Boston, MA 02111-1307 USA
026: *
027: * @author Scott Ferguson
028: */
029:
030: package com.caucho.jms.jdbc;
031:
032: import com.caucho.config.ConfigException;
033: import com.caucho.jdbc.JdbcMetaData;
034: import com.caucho.jdbc.OracleMetaData;
035: import com.caucho.jms.JMSExceptionWrapper;
036: import com.caucho.jms.message.BytesMessageImpl;
037: import com.caucho.jms.message.MapMessageImpl;
038: import com.caucho.jms.message.MessageImpl;
039: import com.caucho.jms.message.ObjectMessageImpl;
040: import com.caucho.jms.message.StreamMessageImpl;
041: import com.caucho.jms.message.TextMessageImpl;
042: import com.caucho.jms.selector.Selector;
043: import com.caucho.util.CharBuffer;
044: import com.caucho.util.L10N;
045: import com.caucho.vfs.*;
046:
047: import javax.jms.*;
048: import javax.sql.DataSource;
049: import java.io.EOFException;
050: import java.io.IOException;
051: import java.io.InputStream;
052: import java.io.ObjectInputStream;
053: import java.io.ObjectOutputStream;
054: import java.sql.Connection;
055: import java.sql.PreparedStatement;
056: import java.sql.ResultSet;
057: import java.sql.SQLException;
058: import java.sql.Statement;
059: import java.sql.Types;
060: import java.util.Enumeration;
061: import java.util.logging.Level;
062: import java.util.logging.Logger;
063:
064: /**
065: * Represents a JDBC message.
066: */
067: public class JdbcMessage {
068: static final Logger log = Logger.getLogger(JdbcMessage.class
069: .getName());
070: static final L10N L = new L10N(JdbcMessage.class);
071:
072: private static final int MESSAGE = 0;
073: private static final int TEXT = 1;
074: private static final int BYTES = 2;
075: private static final int STREAM = 3;
076: private static final int OBJECT = 4;
077: private static final int MAP = 5;
078:
079: private final JdbcManager _jdbcManager;
080: private DataSource _dataSource;
081:
082: private String _messageTable;
083: private String _messageSequence;
084:
085: private boolean _isOracle;
086:
087: public JdbcMessage(JdbcManager jdbcManager) {
088: _jdbcManager = jdbcManager;
089: }
090:
091: /**
092: * Initializes the JdbcMessage
093: */
094: public void init() throws ConfigException, SQLException {
095: _messageTable = _jdbcManager.getMessageTable();
096: _dataSource = _jdbcManager.getDataSource();
097:
098: JdbcMetaData metaData = _jdbcManager.getMetaData();
099:
100: _isOracle = metaData instanceof OracleMetaData;
101:
102: String longType = _jdbcManager.getLongType();
103: String identity = longType + " PRIMARY KEY";
104:
105: if (metaData.supportsIdentity())
106: identity = metaData.createIdentitySQL(identity);
107: else
108: _messageSequence = _messageTable + "_cseq";
109:
110: Connection conn = _dataSource.getConnection();
111: try {
112: Statement stmt = conn.createStatement();
113: String sql = "SELECT 1 FROM " + _messageTable
114: + " WHERE 1=0";
115:
116: try {
117: ResultSet rs = stmt.executeQuery(sql);
118: rs.next();
119: rs.close();
120: stmt.close();
121:
122: return;
123: } catch (SQLException e) {
124: log.finest(e.toString());
125: }
126:
127: String blob = _jdbcManager.getBlob();
128:
129: log.info(L.l("creating JMS message table {0}",
130: _messageTable));
131:
132: sql = ("CREATE TABLE " + _messageTable + " (" + " m_id "
133: + identity + "," + " queue INTEGER NOT NULL,"
134: + " conn VARCHAR(255)," + " consumer " + longType
135: + "," + " delivered INTEGER NOT NULL,"
136: + " msg_type INTEGER NOT NULL," + " expire "
137: + longType + " NOT NULL," + " header " + blob
138: + "," + " body " + blob + ")");
139:
140: if (_isOracle) {
141: String extent = "";
142:
143: if (_jdbcManager.getTablespace() != null) {
144: extent = " tablespace "
145: + _jdbcManager.getTablespace();
146: }
147:
148: // oracle recommends using retention (over pctversion) for performance
149: // Oracle will keep deleted lobs for the retention time before
150: // releasing them (e.g. 900 seconds)
151: sql += (" LOB(header) STORE AS (cache retention"
152: + extent + ")");
153: sql += (" LOB(body) STORE AS (cache retention" + extent + ")");
154: }
155:
156: stmt.executeUpdate(sql);
157:
158: if (_messageSequence != null) {
159: stmt.executeUpdate(metaData.createSequenceSQL(
160: _messageSequence, 1));
161: }
162: } finally {
163: conn.close();
164: }
165: }
166:
167: /**
168: * Sends the message to the queue.
169: */
170: public long send(Message message, int queue, long expireTime)
171: throws SQLException, IOException, JMSException {
172: if (log.isLoggable(Level.FINE))
173: log.fine("jms jdbc queue:" + queue + " send message");
174:
175: TempStream header = new TempStream();
176: header.openWrite();
177:
178: WriteStream ws = new WriteStream(header);
179: writeMessageHeader(ws, message);
180: ws.close();
181:
182: TempStream body = null;
183:
184: int type = MESSAGE;
185:
186: if (message instanceof TextMessage) {
187: TextMessage text = (TextMessage) message;
188:
189: type = TEXT;
190:
191: if (text.getText() != null) {
192: body = new TempStream();
193: body.openWrite();
194:
195: ws = new WriteStream(body);
196: ws.setEncoding("UTF-8");
197: ws.print(text.getText());
198: ws.close();
199: }
200: } else if (message instanceof BytesMessage) {
201: BytesMessage bytes = (BytesMessage) message;
202:
203: type = BYTES;
204:
205: body = writeBytes(bytes);
206: } else if (message instanceof StreamMessage) {
207: StreamMessage stream = (StreamMessage) message;
208:
209: type = STREAM;
210:
211: body = writeStream(stream);
212: } else if (message instanceof ObjectMessage) {
213: ObjectMessage obj = (ObjectMessage) message;
214:
215: type = OBJECT;
216:
217: body = writeObject(obj);
218: } else if (message instanceof MapMessage) {
219: MapMessage obj = (MapMessage) message;
220:
221: type = MAP;
222:
223: body = writeMap(obj);
224: }
225:
226: Connection conn = _dataSource.getConnection();
227: try {
228: String sql;
229:
230: if (_messageSequence != null) {
231: sql = _jdbcManager.getMetaData().selectSequenceSQL(
232: _messageSequence);
233:
234: PreparedStatement pstmt = conn.prepareStatement(sql);
235: ;
236:
237: long mId = -1;
238:
239: ResultSet rs = pstmt.executeQuery();
240: if (rs.next())
241: mId = rs.getLong(1);
242: else
243: throw new RuntimeException("can't create message");
244:
245: sql = ("INSERT INTO "
246: + _messageTable
247: + "(m_id, queue, msg_type, expire, delivered, header, body) " + "VALUES (?,?,?,?,0,?,?)");
248:
249: pstmt = conn.prepareStatement(sql);
250:
251: int i = 1;
252: pstmt.setLong(i++, mId);
253: pstmt.setInt(i++, queue);
254: pstmt.setInt(i++, type);
255: pstmt.setLong(i++, expireTime);
256:
257: if (header.getLength() > 0)
258: pstmt.setBinaryStream(i++, header.openRead(),
259: header.getLength());
260: else
261: pstmt.setNull(i++, Types.BINARY);
262:
263: if (body != null)
264: pstmt.setBinaryStream(i++, body.openRead(), body
265: .getLength());
266: else
267: pstmt.setString(i++, "");
268:
269: pstmt.executeUpdate();
270: } else {
271: sql = ("INSERT INTO "
272: + _messageTable
273: + "(queue, msg_type, expire, delivered, header, body) " + "VALUES (?,?,?,0,?,?)");
274: PreparedStatement pstmt;
275:
276: pstmt = conn.prepareStatement(sql);
277:
278: int i = 1;
279: pstmt.setInt(i++, queue);
280: pstmt.setInt(i++, type);
281: pstmt.setLong(i++, expireTime);
282: pstmt.setBinaryStream(i++, header.openRead(), header
283: .getLength());
284:
285: if (body != null)
286: pstmt.setBinaryStream(i++, body.openRead(), body
287: .getLength());
288: else
289: pstmt.setString(i++, "");
290:
291: pstmt.executeUpdate();
292: }
293:
294: return 0;
295: } finally {
296: conn.close();
297: }
298: }
299:
300: /**
301: * Receives a message from the queue.
302: */
303: MessageImpl receive(int queue, int session) throws SQLException,
304: IOException, JMSException {
305: long minId = -1;
306:
307: Connection conn = _dataSource.getConnection();
308: try {
309: String sql = ("SELECT m_id, msg_type, delivered, body, header"
310: + " FROM "
311: + _messageTable
312: + " WHERE ?<id AND queue=? AND consumer IS NULL" + " ORDER BY id");
313:
314: PreparedStatement selectStmt = conn.prepareStatement(sql);
315:
316: sql = ("UPDATE " + _messageTable
317: + " SET consumer=?, delivered=1 " + "WHERE m_id=? AND consumer IS NULL");
318:
319: PreparedStatement updateStmt = conn.prepareStatement(sql);
320:
321: long id = -1;
322: while (true) {
323: id = -1;
324:
325: selectStmt.setLong(1, minId);
326: selectStmt.setInt(2, queue);
327:
328: MessageImpl msg = null;
329:
330: ResultSet rs = selectStmt.executeQuery();
331: while (rs.next()) {
332: id = rs.getLong(1);
333:
334: minId = id;
335:
336: msg = readMessage(rs);
337: }
338:
339: rs.close();
340:
341: if (msg == null)
342: return null;
343:
344: updateStmt.setInt(1, session);
345: updateStmt.setLong(2, id);
346:
347: int updateCount = updateStmt.executeUpdate();
348:
349: if (updateCount == 1)
350: return msg;
351: else if (log.isLoggable(Level.FINE)) {
352: log.fine("JdbcMessageQueue[" + queue
353: + "] can't update received message " + id
354: + " for session " + session + ".");
355: }
356: }
357: } finally {
358: conn.close();
359: }
360: }
361:
362: /**
363: * Acknowledges all received messages from the session.
364: */
365: void acknowledge(int session) throws SQLException {
366: Connection conn = _dataSource.getConnection();
367:
368: try {
369: String sql = ("DELETE FROM " + _messageTable + " " + "WHERE consumer=?");
370:
371: PreparedStatement pstmt;
372: pstmt = conn.prepareStatement(sql);
373:
374: pstmt.setInt(1, session);
375:
376: pstmt.executeUpdate();
377:
378: pstmt.close();
379: } finally {
380: conn.close();
381: }
382: }
383:
384: /**
385: * Reads the message from the result stream.
386: */
387: MessageImpl readMessage(ResultSet rs) throws SQLException,
388: IOException, JMSException {
389: int msgType = rs.getInt(2);
390: boolean redelivered = rs.getInt(3) == 1;
391:
392: MessageImpl msg;
393:
394: switch (msgType) {
395: case TEXT: {
396: InputStream is = rs.getBinaryStream(4);
397:
398: try {
399: msg = readTextMessage(is);
400: } finally {
401: if (is != null)
402: is.close();
403: }
404: break;
405: }
406:
407: case BYTES: {
408: InputStream is = rs.getBinaryStream(4);
409:
410: try {
411: msg = readBytesMessage(is);
412: } finally {
413: if (is != null)
414: is.close();
415: }
416: break;
417: }
418:
419: case STREAM: {
420: InputStream is = rs.getBinaryStream(4);
421:
422: try {
423: msg = readStreamMessage(is);
424: } finally {
425: if (is != null)
426: is.close();
427: }
428: break;
429: }
430:
431: case OBJECT: {
432: InputStream is = rs.getBinaryStream(4);
433:
434: try {
435: msg = readObjectMessage(is);
436: } finally {
437: if (is != null)
438: is.close();
439: }
440: break;
441: }
442:
443: case MAP: {
444: InputStream is = rs.getBinaryStream(4);
445:
446: try {
447: msg = readMapMessage(is);
448: } finally {
449: if (is != null)
450: is.close();
451: }
452: break;
453: }
454:
455: case MESSAGE:
456: default: {
457: msg = new MessageImpl();
458: break;
459: }
460: }
461:
462: InputStream is = rs.getBinaryStream(5);
463:
464: if (is != null) {
465: try {
466: readMessageHeader(is, msg);
467: } finally {
468: is.close();
469: }
470: }
471:
472: msg.setJMSRedelivered(redelivered);
473:
474: return msg;
475: }
476:
477: /**
478: * Writes the message header for a Resin message.
479: */
480: private void writeMessageHeader(WriteStream ws, Message msg)
481: throws IOException, JMSException {
482: Enumeration names = msg.getPropertyNames();
483: CharBuffer cb = new CharBuffer();
484:
485: while (names.hasMoreElements()) {
486: String name = (String) names.nextElement();
487: writeValue(ws, cb, name);
488:
489: String value = msg.getStringProperty(name);
490: writeValue(ws, cb, value);
491: }
492: }
493:
494: /**
495: * Writes a value to the output stream.
496: */
497: private void writeValue(WriteStream ws, CharBuffer cb, Object value)
498: throws IOException {
499: if (value == null)
500: ws.write('N');
501: else {
502: cb.clear();
503: cb.append(value);
504: int length = cb.length();
505: char[] buf = cb.getBuffer();
506:
507: ws.write('S');
508: ws.write(length >> 24);
509: ws.write(length >> 16);
510: ws.write(length >> 8);
511: ws.write(length);
512:
513: for (int i = 0; i < length; i++) {
514: int ch = buf[i];
515:
516: ws.write(ch >> 8);
517: ws.write(ch);
518: }
519: }
520: }
521:
522: /**
523: * Writes the bytes message.
524: */
525: private TempStream writeBytes(BytesMessage bytes)
526: throws IOException, JMSException {
527: TempStream body = new TempStream();
528: body.openWrite();
529:
530: WriteStream ws = new WriteStream(body);
531:
532: int data;
533: //bytes.reset();
534:
535: TempBuffer tb = TempBuffer.allocate();
536: byte[] buffer = tb.getBuffer();
537: int len;
538:
539: while ((len = bytes.readBytes(buffer, buffer.length)) >= 0) {
540: ws.write(buffer, 0, len);
541: }
542:
543: TempBuffer.free(tb);
544: tb = null;
545:
546: ws.close();
547:
548: return body;
549: }
550:
551: /**
552: * Writes the stream message.
553: */
554: private TempStream writeStream(StreamMessage stream)
555: throws IOException, JMSException {
556: TempStream body = new TempStream();
557: body.openWrite();
558:
559: WriteStream ws = new WriteStream(body);
560: ObjectOutputStream out = new ObjectOutputStream(ws);
561:
562: try {
563: while (true) {
564: Object data = stream.readObject();
565:
566: out.writeObject(data);
567: }
568: } catch (MessageEOFException e) {
569: }
570:
571: out.close();
572: ws.close();
573:
574: return body;
575: }
576:
577: /**
578: * Writes the object message.
579: */
580: private TempStream writeObject(ObjectMessage obj)
581: throws IOException, JMSException {
582: TempStream body = new TempStream();
583: body.openWrite();
584:
585: WriteStream ws = new WriteStream(body);
586: ObjectOutputStream out = new ObjectOutputStream(ws);
587:
588: out.writeObject(obj.getObject());
589:
590: out.close();
591: ws.close();
592:
593: return body;
594: }
595:
596: /**
597: * Writes the map message.
598: */
599: private TempStream writeMap(MapMessage map) throws IOException,
600: JMSException {
601: TempStream body = new TempStream();
602: body.openWrite();
603:
604: WriteStream ws = new WriteStream(body);
605: ObjectOutputStream out = new ObjectOutputStream(ws);
606:
607: try {
608: Enumeration e = map.getMapNames();
609: while (e.hasMoreElements()) {
610: String name = (String) e.nextElement();
611: out.writeUTF(name);
612:
613: Object data = map.getObject(name);
614: out.writeObject(data);
615: }
616: } catch (MessageEOFException e) {
617: }
618:
619: out.close();
620: ws.close();
621:
622: return body;
623: }
624:
625: /**
626: * Writes the message header for a Resin message.
627: */
628: private void readMessageHeader(InputStream is, Message msg)
629: throws IOException, JMSException {
630: CharBuffer cb = new CharBuffer();
631:
632: int type;
633:
634: while ((type = is.read()) > 0) {
635: String name = (String) readValue(is, type, cb);
636: Object value = readValue(is, is.read(), cb);
637:
638: msg.setObjectProperty(name, value);
639: }
640: }
641:
642: /**
643: * Writes the message header for a Resin message.
644: */
645: private TextMessageImpl readTextMessage(InputStream is)
646: throws IOException, JMSException {
647: TextMessageImpl text = new TextMessageImpl();
648:
649: if (is == null)
650: return text;
651:
652: ByteToChar byteToChar = ByteToChar.create();
653:
654: int ch;
655:
656: byteToChar.setEncoding("UTF-8");
657: while ((ch = is.read()) >= 0) {
658: byteToChar.addByte(ch);
659: }
660:
661: text.setText(byteToChar.getConvertedString());
662:
663: return text;
664: }
665:
666: /**
667: * Reads a bytes message.
668: */
669: private BytesMessageImpl readBytesMessage(InputStream is)
670: throws IOException, JMSException {
671: BytesMessageImpl bytes = new BytesMessageImpl();
672:
673: if (is == null) {
674: bytes.reset();
675:
676: return bytes;
677: }
678:
679: int data;
680:
681: while ((data = is.read()) >= 0) {
682: bytes.writeByte((byte) data);
683: }
684:
685: bytes.reset();
686:
687: return bytes;
688: }
689:
690: /**
691: * Reads a stream message.
692: */
693: private StreamMessageImpl readStreamMessage(InputStream is)
694: throws IOException, JMSException {
695: StreamMessageImpl stream = new StreamMessageImpl();
696:
697: if (is == null)
698: return stream;
699:
700: ObjectInputStream in = new ContextLoaderObjectInputStream(is);
701:
702: try {
703: while (true) {
704: Object obj = in.readObject();
705:
706: stream.writeObject(obj);
707: }
708: } catch (EOFException e) {
709: } catch (Exception e) {
710: throw new JMSExceptionWrapper(e);
711: }
712:
713: in.close();
714:
715: stream.reset();
716:
717: return stream;
718: }
719:
720: /**
721: * Reads a map message.
722: */
723: private MapMessageImpl readMapMessage(InputStream is)
724: throws IOException, JMSException {
725: MapMessageImpl map = new MapMessageImpl();
726:
727: if (is == null)
728: return map;
729:
730: ObjectInputStream in = new ContextLoaderObjectInputStream(is);
731:
732: try {
733: while (true) {
734: String name = in.readUTF();
735: Object obj = in.readObject();
736:
737: map.setObject(name, obj);
738: }
739: } catch (EOFException e) {
740: } catch (Exception e) {
741: throw new JMSExceptionWrapper(e);
742: }
743:
744: in.close();
745:
746: return map;
747: }
748:
749: /**
750: * Reads an object message.
751: */
752: private ObjectMessageImpl readObjectMessage(InputStream is)
753: throws IOException, JMSException {
754: ObjectMessageImpl msg = new ObjectMessageImpl();
755:
756: if (is == null)
757: return msg;
758:
759: ObjectInputStream in = new ContextLoaderObjectInputStream(is);
760:
761: try {
762: Object obj = in.readObject();
763: msg.setObject((java.io.Serializable) obj);
764: } catch (IOException e) {
765: throw e;
766: } catch (Exception e) {
767: throw new JMSExceptionWrapper(e);
768: }
769:
770: in.close();
771:
772: return msg;
773: }
774:
775: /**
776: * Writes a value to the output stream.
777: */
778: private Object readValue(InputStream is, int type, CharBuffer cb)
779: throws IOException {
780: switch (type) {
781: case 'N':
782: return null;
783: case 'S': {
784: cb.clear();
785: int length = readInt(is);
786:
787: for (int i = 0; i < length; i++) {
788: char ch = (char) ((is.read() << 8) + is.read());
789:
790: cb.append(ch);
791: }
792:
793: return cb.toString();
794: }
795: default:
796: throw new IOException(L.l("unknown header type"));
797: }
798: }
799:
800: /**
801: * Reads an integer value.
802: */
803: private int readInt(InputStream is) throws IOException {
804: return ((is.read() << 24) + (is.read() << 16)
805: + (is.read() << 8) + (is.read()));
806: }
807:
808: /**
809: * Removes the first message matching the selector.
810: */
811: private boolean hasMessage(Selector selector) throws JMSException {
812: return false;
813: }
814:
815: /**
816: * Returns a printable view of the queue.
817: */
818: public String toString() {
819: return "JdbcMessage[" + _messageTable + "]";
820: }
821: }
|