001: /*
002: * BEGIN_HEADER - DO NOT EDIT
003: *
004: * The contents of this file are subject to the terms
005: * of the Common Development and Distribution License
006: * (the "License"). You may not use this file except
007: * in compliance with the License.
008: *
009: * You can obtain a copy of the license at
010: * https://open-esb.dev.java.net/public/CDDLv1.0.html.
011: * See the License for the specific language governing
012: * permissions and limitations under the License.
013: *
014: * When distributing Covered Code, include this CDDL
015: * HEADER in each file and include the License file at
016: * https://open-esb.dev.java.net/public/CDDLv1.0.html.
017: * If applicable add the following below this CDDL HEADER,
018: * with the fields enclosed by brackets "[]" replaced with
019: * your own identifying information: Portions Copyright
020: * [year] [name of copyright owner]
021: */
022:
023: /*
024: * @(#)MEPInputStream.java
025: * Copyright 2004-2007 Sun Microsystems, Inc. All Rights Reserved.
026: *
027: * END_HEADER - DO NOT EDIT
028: */
029: package com.sun.jbi.binding.proxy.util;
030:
031: import com.sun.jbi.binding.proxy.ExchangeEntry;
032: import com.sun.jbi.binding.proxy.LocalStringKeys;
033: import com.sun.jbi.binding.proxy.ProxyBinding;
034:
035: import com.sun.jbi.binding.proxy.connection.ClientConnection;
036:
037: import com.sun.jbi.binding.proxy.util.Translator;
038:
039: import com.sun.jbi.messaging.MessageExchange;
040:
041: import java.io.ByteArrayInputStream;
042: import java.io.ObjectInputStream;
043: import java.io.InputStream;
044: import java.io.OutputStream;
045:
046: import java.net.URI;
047:
048: import java.util.HashMap;
049: import java.util.logging.Logger;
050:
051: import javax.activation.DataHandler;
052:
053: import javax.jbi.messaging.DeliveryChannel;
054: import javax.jbi.messaging.ExchangeStatus;
055: import javax.jbi.messaging.Fault;
056: import javax.jbi.messaging.MessagingException;
057: import javax.jbi.messaging.NormalizedMessage;
058:
059: import javax.jbi.servicedesc.ServiceEndpoint;
060:
061: import javax.xml.namespace.QName;
062:
063: import javax.xml.transform.Source;
064: import javax.xml.transform.Transformer;
065: import javax.xml.transform.TransformerFactory;
066:
067: import javax.xml.transform.dom.DOMSource;
068: import javax.xml.transform.dom.DOMResult;
069:
070: import javax.xml.transform.stream.StreamResult;
071: import javax.xml.transform.stream.StreamSource;
072:
073: /** Implementation of an input stream that can deserialize a MEP.
074: * @author Sun Microsystems, Inc
075: */
076: public class MEPInputStream extends java.io.InputStream {
077: private InputStream mIS;
078: private MEPObjectInputStream mOIS;
079: private int mCount;
080: private int mOffset;
081: private int mStream;
082: private int mNextStream;
083: private int mMsgType;
084: private byte[] mBytes;
085: private String mExchangeId;
086: private Exception mException;
087: private Transformer mTransform;
088: private ProxyBinding mPB;
089: private ExchangeEntry mExchangeEntry;
090: private boolean mMEPOK;
091:
092: static final int BUFFER_SIZE = 1024;
093: public static final int TYPE_MEP = 1;
094: public static final int TYPE_EXCEPTION = 2;
095: public static final int TYPE_ISMEPOK = 3;
096: public static final int TYPE_MEPOK = 4;
097:
098: static private Logger mLog;
099:
100: static final int VERSION_1 = 1;
101: static final int VERSION_CURRRENT = VERSION_1;
102:
103: public MEPInputStream(ProxyBinding proxyBinding)
104: throws javax.jbi.messaging.MessagingException {
105: mPB = proxyBinding;
106: mBytes = new byte[BUFFER_SIZE];
107: if (mLog == null) {
108: mLog = mPB.getLogger("input");
109: }
110:
111: try {
112: // initialize transformer details
113: mTransform = TransformerFactory.newInstance()
114: .newTransformer();
115: } catch (javax.xml.transform.TransformerFactoryConfigurationError tfcEx) {
116: throw new javax.jbi.messaging.MessagingException(
117: Translator
118: .translate(LocalStringKeys.MEPINPUT_TRANSFORMER_ERROR),
119: tfcEx);
120: } catch (javax.xml.transform.TransformerConfigurationException cfgEx) {
121: throw new javax.jbi.messaging.MessagingException(
122: Translator
123: .translate(LocalStringKeys.MEPINPUT_TRANSFORMER_ERROR),
124: cfgEx);
125: }
126: }
127:
128: public int readMessage(ClientConnection cc)
129: throws javax.jbi.messaging.MessagingException {
130: MessageExchange me = null;
131: ExchangeEntry ee = null;
132: byte[] bytes = cc.receive();
133:
134: try {
135: //
136: // Prepare for reading.
137: //
138: mCount = 0;
139: mOffset = 0;
140: mStream = 0;
141: mNextStream = 0;
142: mIS = new ByteArrayInputStream(bytes);
143: mOIS = new MEPObjectInputStream(this );
144: mOIS
145: .setClassLoader(((com.sun.jbi.messaging.DeliveryChannel) mPB
146: .getDeliveryChannel()).getClassLoader());
147:
148: //
149: // Get message type.
150: //
151: mMsgType = mOIS.readByte();
152:
153: //
154: // Read contents of message type.
155: //
156: if (mMsgType == TYPE_EXCEPTION) {
157: readException();
158: } else if (mMsgType == TYPE_MEP) {
159: mExchangeEntry = ee = readMEP();
160: ee.setClientConnection(cc);
161: ee.receivedBytes(bytes.length);
162: } else if (mMsgType == TYPE_ISMEPOK) {
163: mExchangeEntry = ee = readISMEPOK();
164: ee.setClientConnection(cc);
165: ee.receivedBytes(bytes.length);
166: } else if (mMsgType == TYPE_MEPOK) {
167: mMEPOK = readMEPOK();
168: } else {
169:
170: }
171:
172: //
173: // This may reference a large byte[], so kill the reference ASAP.
174: //
175: mIS = null;
176: } catch (java.io.IOException ioEx) {
177: throw new javax.jbi.messaging.MessagingException(Translator
178: .translate(LocalStringKeys.MEPINPUT_IO_ERROR), ioEx);
179: }
180: return (mMsgType);
181: }
182:
183: void readException() throws javax.jbi.messaging.MessagingException,
184: java.io.IOException {
185: if (mOIS.readByte() == VERSION_1) {
186: //
187: // Read the exchangeId.
188: //
189: mExchangeId = mOIS.readUTF();
190:
191: //
192: // Read the exception.
193: //
194: try {
195: mException = (Exception) mOIS.readObject();
196: } catch (java.lang.ClassNotFoundException cnfEx) {
197: throw new javax.jbi.messaging.MessagingException(
198: Translator
199: .translate(LocalStringKeys.MEPINPUT_IO_ERROR),
200: cnfEx);
201: }
202:
203: }
204:
205: }
206:
207: boolean readMEPOK() throws javax.jbi.messaging.MessagingException,
208: java.io.IOException {
209: if (mOIS.readByte() == VERSION_1) {
210: //
211: // Read the exchangeId.
212: //
213: mExchangeId = mOIS.readUTF();
214:
215: //
216: // Read the exception.
217: //
218: return (mOIS.readBoolean());
219: }
220: return (false);
221: }
222:
223: ExchangeEntry readISMEPOK()
224: throws javax.jbi.messaging.MessagingException,
225: java.io.IOException {
226: ExchangeEntry ee = null;
227:
228: //
229: // Check version #.
230: //
231: if (mOIS.readByte() == VERSION_1) {
232: //
233: // Read the exchangeId.
234: //
235: mExchangeId = mOIS.readUTF();
236:
237: //
238: // Read the proposed exchange.
239: //
240: ee = readExchange();
241: } else {
242:
243: }
244:
245: return (ee);
246:
247: }
248:
249: ExchangeEntry readMEP()
250: throws javax.jbi.messaging.MessagingException,
251: java.io.IOException {
252: ExchangeEntry ee = null;
253:
254: //
255: // Check version #.
256: //
257: if (mOIS.readByte() == VERSION_1) {
258: //
259: // Read the exchange contents.
260: //
261: ee = readExchange();
262: } else {
263:
264: }
265:
266: return (ee);
267: }
268:
269: ExchangeEntry readExchange()
270: throws javax.jbi.messaging.MessagingException,
271: java.io.IOException {
272: ExchangeEntry ee = null;
273: MessageExchange me;
274: String endpoint;
275: QName qname;
276: URI pattern;
277: NormalizedMessage nm;
278:
279: try {
280: //
281: // Read the exchangeId.
282: //
283: mExchangeId = mOIS.readUTF();
284:
285: //
286: // Only read the addressing information when sent (typically just the first time.)
287: //
288: if (mOIS.readBoolean()) {
289: ServiceEndpoint se;
290:
291: pattern = new URI(mOIS.readUTF());
292: me = (com.sun.jbi.messaging.MessageExchange) ((com.sun.jbi.messaging.DeliveryChannel) mPB
293: .getDeliveryChannel()).createExchange(pattern,
294: mExchangeId);
295: qname = readQName();
296: endpoint = mOIS.readUTF();
297: se = ((com.sun.jbi.messaging.DeliveryChannel) mPB
298: .getDeliveryChannel()).createEndpoint(qname,
299: endpoint);
300: if (se == null) {
301: throw new javax.jbi.messaging.MessagingException(
302: Translator
303: .translate(
304: LocalStringKeys.MEPINPUT_SERVICEENDPOINT_NOTFOUND,
305: qname, endpoint));
306: }
307: me.setEndpoint(se);
308: me.setOperation(readQName());
309: ee = mPB.trackExchange(mExchangeId, me, false);
310: ee.setStatus(ExchangeEntry.STATUS_FIRSTSENT);
311: ee.setService(qname);
312: } else {
313: ee = mPB.getExchangeEntry(mExchangeId);
314: if (ee != null) {
315: me = ee.getMessageExchange();
316: } else {
317: throw new javax.jbi.messaging.MessagingException(
318: Translator
319: .translate(
320: LocalStringKeys.MEPINPUT_UNKNOWN_EXCHANGE,
321: mExchangeId));
322:
323: }
324: }
325:
326: if (mOIS.readBoolean()) {
327: me.setStatus(ExchangeStatus.valueOf(mOIS.readUTF()));
328: ee.setStatus(ExchangeEntry.STATUS_STATUSSENT);
329: }
330: if (mOIS.readBoolean()) {
331: me.setError((Exception) mOIS.readObject());
332: ee.setStatus(ExchangeEntry.STATUS_ERRORSENT);
333: }
334: if (mOIS.readBoolean()) {
335: nm = readMessage(me);
336: if (nm != null) {
337: me.setFault((Fault) nm);
338: }
339: ee.setStatus(ExchangeEntry.STATUS_FAULTSENT);
340: }
341: if (mOIS.readBoolean()) {
342: nm = readMessage(me);
343: if (nm != null) {
344: me.setMessage(nm, "in");
345: }
346: ee.setStatus(ExchangeEntry.STATUS_INSENT);
347: }
348: if (mOIS.readBoolean()) {
349: nm = readMessage(me);
350: if (nm != null) {
351: me.setMessage(nm, "out");
352: }
353: ee.setStatus(ExchangeEntry.STATUS_OUTSENT);
354:
355: }
356: if (mOIS.readBoolean()) {
357: readMEProperties(me);
358: me.mergeProperties();
359: }
360: } catch (java.lang.ClassNotFoundException cnfEx) {
361: throw new javax.jbi.messaging.MessagingException(Translator
362: .translate(LocalStringKeys.MEPINPUT_IO_ERROR),
363: cnfEx);
364: } catch (java.net.URISyntaxException usEx) {
365: throw new javax.jbi.messaging.MessagingException(Translator
366: .translate(LocalStringKeys.MEPINPUT_IO_ERROR), usEx);
367: }
368:
369: return (ee);
370: }
371:
372: void readMEProperties(MessageExchange me)
373: throws javax.jbi.messaging.MessagingException {
374: int size;
375: String name = null;
376: Object value;
377:
378: try {
379: for (size = mOIS.readInt(); size > 0; size--) {
380: name = mOIS.readUTF();
381: value = mOIS.readObject();
382: me.setProperty(name, value);
383: }
384: } catch (java.io.IOException ioEx) {
385: throw new javax.jbi.messaging.MessagingException(Translator
386: .translate(LocalStringKeys.MEPINPUT_ME_PROP_ERROR,
387: name), ioEx);
388: } catch (java.lang.ClassNotFoundException cnfEx) {
389: throw new javax.jbi.messaging.MessagingException(Translator
390: .translate(LocalStringKeys.MEPINPUT_ME_PROP_ERROR,
391: name), cnfEx);
392: }
393: }
394:
395: NormalizedMessage readMessage(MessageExchange me)
396: throws java.io.IOException,
397: javax.jbi.messaging.MessagingException {
398: NormalizedMessage m = null;
399:
400: if (mOIS.readBoolean()) {
401: DOMResult dr;
402:
403: if (mOIS.readBoolean()) {
404: m = me.createMessage();
405: } else {
406: m = me.createFault();
407: }
408: readNMProperties(m);
409: try {
410: if (mOIS.readBoolean()) {
411: Source source;
412: String systemId = null;
413:
414: if (mOIS.readBoolean()) {
415: systemId = mOIS.readUTF();
416: }
417:
418: startStream();
419: source = new StreamSource(this );
420: mTransform.transform(source, dr = new DOMResult());
421: endStream();
422: m.setContent(source = new DOMSource(dr.getNode()));
423: if (systemId != null) {
424: source.setSystemId(systemId);
425: }
426: }
427: } catch (javax.xml.transform.TransformerException tEx) {
428: throw new javax.jbi.messaging.MessagingException(tEx);
429: }
430: readAttachments(m);
431: }
432:
433: return (m);
434: }
435:
436: void readAttachments(NormalizedMessage nm)
437: throws javax.jbi.messaging.MessagingException {
438: int size;
439: String name = null;
440: Object value;
441: String contentName;
442: String content;
443: DataHandler attach = null;
444:
445: try {
446: for (size = mOIS.readInt(); size > 0; size--) {
447: name = mOIS.readUTF();
448: contentName = mOIS.readUTF();
449: content = mOIS.readUTF();
450: attach = new DataHandler(new StreamDataSource(
451: contentName, content));
452: startStream();
453: writeTo(attach.getOutputStream());
454: endStream();
455: nm.addAttachment(name, attach);
456: }
457: } catch (java.io.IOException ioEx) {
458: throw new javax.jbi.messaging.MessagingException(Translator
459: .translate(
460: LocalStringKeys.MEPINPUT_ME_ATTACH_ERROR,
461: name), ioEx);
462: } catch (javax.jbi.messaging.MessagingException mEx) {
463: throw new javax.jbi.messaging.MessagingException(Translator
464: .translate(
465: LocalStringKeys.MEPINPUT_ME_ATTACH_ERROR,
466: name), mEx);
467: }
468: }
469:
470: void readNMProperties(NormalizedMessage nm)
471: throws javax.jbi.messaging.MessagingException {
472: int size;
473: String name = null;
474: Object value;
475:
476: try {
477: for (size = mOIS.readInt(); size > 0; size--) {
478: name = mOIS.readUTF();
479: value = mOIS.readObject();
480: nm.setProperty(name, value);
481: }
482: } catch (java.io.IOException ioEx) {
483: throw new javax.jbi.messaging.MessagingException(Translator
484: .translate(LocalStringKeys.MEPINPUT_NM_PROP_ERROR,
485: name), ioEx);
486: } catch (java.lang.ClassNotFoundException cnfEx) {
487: throw new javax.jbi.messaging.MessagingException(Translator
488: .translate(LocalStringKeys.MEPINPUT_NM_PROP_ERROR,
489: name), cnfEx);
490: }
491: }
492:
493: QName readQName() throws java.io.IOException {
494: String namespace;
495: String localPart;
496: String prefix;
497:
498: namespace = mOIS.readUTF();
499: localPart = mOIS.readUTF();
500: prefix = mOIS.readUTF();
501: return (new QName(namespace, localPart, prefix));
502: }
503:
504: //
505: // ------------------- Methods to query about message infomation ------------------
506: //
507:
508: public String getExchangeId() {
509: return (mExchangeId);
510: }
511:
512: public Exception getException() {
513: return (mException);
514: }
515:
516: public ExchangeEntry getExchangeEntry() {
517: return (mExchangeEntry);
518: }
519:
520: public boolean getMEPOk() {
521: return (true);
522: }
523:
524: //
525: // ------------------- Methods that implement InputStream ------------------
526: //
527:
528: public int available() throws java.io.IOException {
529: return (mCount);
530: }
531:
532: public void close() throws java.io.IOException {
533:
534: }
535:
536: public void mark(int limit) {
537:
538: }
539:
540: public boolean markSupported() {
541: return (false);
542: }
543:
544: public int read() throws java.io.IOException {
545: if (mCount == 0) {
546: if (fill() == 0)
547: return (-1);
548: }
549:
550: mCount--;
551: return ((int) (mBytes[mOffset++]) & 0xff);
552: }
553:
554: public int read(byte[] b) throws java.io.IOException {
555: int len = b.length;
556:
557: return (read(b, 0, len));
558: }
559:
560: public int read(byte[] b, int off, int len)
561: throws java.io.IOException {
562: int count = 0;
563:
564: if (mNextStream != mStream) {
565: return (-1);
566: }
567: while (len > 0) {
568: if (mCount == 0) {
569: if (fill() == 0) {
570: if (count == 0) {
571: return (-1);
572: }
573: return (count);
574: }
575: }
576: if (len < mCount) {
577: System.arraycopy(mBytes, mOffset, b, off, len);
578: mCount -= len;
579: mOffset += len;
580: count += len;
581: return (count);
582: }
583: System.arraycopy(mBytes, mOffset, b, off, mCount);
584: off += mCount;
585: count += mCount;
586: len -= mCount;
587: mOffset -= mCount;
588: mCount = 0;
589: }
590:
591: return (count);
592: }
593:
594: public void writeTo(OutputStream os) throws java.io.IOException {
595: if (mNextStream == mStream) {
596: byte[] buffer = new byte[1024];
597: int len;
598:
599: while ((len = read(buffer)) > 0) {
600: os.write(buffer, 0, len);
601: }
602: os.close();
603: return;
604: }
605: throw new java.io.IOException(Translator.translate(
606: LocalStringKeys.MEPINPUT_STREAM_SYNC_ERROR,
607: new Integer(mStream), new Integer(mNextStream)));
608: }
609:
610: public void reset() throws java.io.IOException {
611:
612: }
613:
614: //
615: // ------------------- Internal implementation of InputStream ------------------
616: //
617:
618: private void startStream() throws java.io.IOException {
619: int s = 0;
620:
621: if (mCount == 0) {
622: mStream++;
623: if ((s = read()) == mStream) {
624: return;
625: }
626: }
627: throw new java.io.IOException(Translator.translate(
628: LocalStringKeys.MEPINPUT_START_STREAM_ERROR,
629: new Integer(s), new Integer(mStream)));
630: }
631:
632: private void endStream() throws java.io.IOException {
633: if (mStream != mNextStream) {
634: mStream--;
635: return;
636: }
637: throw new java.io.IOException(Translator.translate(
638: LocalStringKeys.MEPINPUT_END_STREAM_ERROR, new Integer(
639: mStream)));
640: }
641:
642: private int fill() throws java.io.IOException {
643: byte stream, len1, len2;
644:
645: mNextStream = (byte) mIS.read();
646: len1 = (byte) mIS.read();
647: len2 = (byte) mIS.read();
648: mCount = (len1 & 0xff) << 8 | (len2 & 0xff);
649: mOffset = 0;
650: if (mCount == 0) {
651: return (0);
652: }
653: if (mIS.read(mBytes, 0, mCount) != mCount) {
654: throw new java.io.IOException(Translator.translate(
655: LocalStringKeys.MEPINPUT_FILL_STREAM_ERROR,
656: new Integer(mCount)));
657: }
658: if (mLog.isLoggable(java.util.logging.Level.FINE)) {
659: dumpBuffer("MEPInputStream(" + mNextStream + ") Len ("
660: + mCount + ")", mBytes, mCount);
661: }
662: if (mNextStream != mStream) {
663: return (0);
664: }
665: return (mCount);
666: }
667:
668: //
669: // ------------------- Debugging stuff ------------------
670: //
671:
672: static void dumpBuffer(String prefix, byte[] buffer, int count) {
673: StringBuffer sb = new StringBuffer();
674: sb.append(prefix);
675: sb.append("\n ");
676: for (int i = 0;; i++) {
677: int c;
678:
679: if (i < count) {
680: c = (char) buffer[i];
681: sb.append("0123456789ABCDEF".charAt((c >> 4) & 0xf));
682: sb.append("0123456789ABCDEF".charAt(c & 0xf));
683: sb.append(" ");
684: } else {
685: sb.append(" ");
686: }
687: if (i != 0 && (i % 16) == 15) {
688: sb.append("|");
689: for (int j = i - 15; j <= i; j++) {
690: if (j < count) {
691: char cc = (char) buffer[j];
692: if (cc >= ' ' && cc <= '~') {
693: sb.append(cc);
694: } else {
695: sb.append(".");
696: }
697: } else {
698: sb.append(" ");
699: }
700: }
701: sb.append("|\n");
702:
703: if (i + 1 >= count) {
704: break;
705: }
706: sb.append(" ");
707: }
708: }
709: mLog.fine(sb.toString());
710:
711: }
712: }
|