001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.mts.std;
028:
029: import java.io.ByteArrayInputStream;
030: import java.io.ByteArrayOutputStream;
031: import java.io.InputStream;
032: import java.io.ObjectInput;
033: import java.io.ObjectInputStream;
034: import java.io.ObjectOutput;
035: import java.io.ObjectOutputStream;
036: import java.io.OutputStream;
037:
038: import org.cougaar.core.mts.MessageAttributes;
039: import org.cougaar.mts.base.CommFailureException;
040: import org.cougaar.mts.base.DestinationLink;
041: import org.cougaar.mts.base.DestinationLinkDelegateImplBase;
042: import org.cougaar.mts.base.MessageReader;
043: import org.cougaar.mts.base.MessageReaderDelegateImplBase;
044: import org.cougaar.mts.base.MessageWriter;
045: import org.cougaar.mts.base.MessageWriterDelegateImplBase;
046: import org.cougaar.mts.base.MisdeliveredMessageException;
047: import org.cougaar.mts.base.NameLookupException;
048: import org.cougaar.mts.base.RPCLinkProtocol;
049: import org.cougaar.mts.base.StandardAspect;
050: import org.cougaar.mts.base.UnregisteredNameException;
051:
052: /**
053: * This test Aspect preserializes messages into a byte array, sends
054: * the array instead of the message over the RMI stream, and then
055: * deserializes on the other end.
056: */
057: public class PreserializingStreamsAspect extends StandardAspect {
058:
059: // Return delegates for MessageReader, MessageWriter and
060: // DestinationLink.
061: public Object getDelegate(Object delegatee, Class type) {
062: if (type == MessageWriter.class) {
063: MessageWriter wtr = (MessageWriter) delegatee;
064: return new PSMessageWriter(wtr);
065: } else if (type == MessageReader.class) {
066: MessageReader rdr = (MessageReader) delegatee;
067: return new PSMessageReader(rdr);
068: } else if (type == DestinationLink.class) {
069: DestinationLink link = (DestinationLink) delegatee;
070: // Only RPC is relevant here
071: Class cls = link.getProtocolClass();
072: if (RPCLinkProtocol.class.isAssignableFrom(cls))
073: return new PSDestinationLink(link);
074: }
075:
076: return null;
077: }
078:
079: // The DestinationLink delegate
080: private class PSDestinationLink extends
081: DestinationLinkDelegateImplBase {
082: PSDestinationLink(DestinationLink delegatee) {
083: super (delegatee);
084: }
085:
086: public MessageAttributes forwardMessage(
087: AttributedMessage message) throws NameLookupException,
088: UnregisteredNameException, CommFailureException,
089: MisdeliveredMessageException {
090: // Register Aspect as a Message Streaming filter
091: message.addFilter(PreserializingStreamsAspect.this );
092:
093: return super .forwardMessage(message);
094: }
095: }
096:
097: private class PSMessageWriter extends MessageWriterDelegateImplBase {
098:
099: private ByteArrayOutputStream byte_os;
100: private OutputStream next;
101: private byte[] bytes;
102:
103: PSMessageWriter(MessageWriter delegatee) {
104: super (delegatee);
105: }
106:
107: // Cache the next stream in the chain and return a standalone
108: // ByteArrayOutputStream. Nothing downstream will see any
109: // data at all until the byte-stream is closed at
110: // finishOutput.
111: public OutputStream getObjectOutputStream(ObjectOutput out)
112: throws java.io.IOException {
113: next = super .getObjectOutputStream(out);
114: byte_os = new ByteArrayOutputStream();
115: return byte_os;
116: }
117:
118: // Done writing to the ByteArrayOutputStream. Extract the
119: // byte array and write it to the next filter.
120: public void finishOutput() throws java.io.IOException {
121: byte_os.flush();
122: byte_os.close();
123: bytes = byte_os.toByteArray();
124:
125: System.err.println("Preserialized " + bytes.length
126: + " bytes");
127:
128: ObjectOutputStream object_out = null;
129: // 'out' should be an ObjectOutputStream but might just be an
130: // OutputStream. In the latter case, wrap it here.
131: if (next instanceof ObjectOutputStream)
132: object_out = (ObjectOutputStream) next;
133: else
134: object_out = new ObjectOutputStream(next);
135:
136: object_out.writeObject(bytes);
137: super .finishOutput();
138: }
139:
140: }
141:
142: // MessageReader delegate.
143: private class PSMessageReader extends MessageReaderDelegateImplBase {
144:
145: PSMessageReader(MessageReader delegatee) {
146: super (delegatee);
147: }
148:
149: // At this point we should get a byte array from the next
150: // stream in the chain. Make a ByteArrayInputStream out of
151: // it. Earlier filters will be reading from that.
152: public InputStream getObjectInputStream(ObjectInput in)
153: throws java.io.IOException, ClassNotFoundException {
154: InputStream raw_is = super .getObjectInputStream(in);
155: ObjectInputStream object_in = null;
156: if (raw_is instanceof ObjectInputStream)
157: object_in = (ObjectInputStream) raw_is;
158: else
159: object_in = new ObjectInputStream(raw_is);
160: byte[] bytes = (byte[]) object_in.readObject();
161:
162: // for (int i=0; i<bytes.length; i++) {
163: // System.out.print(' ');
164: // System.out.print(Integer.toHexString(0xFF & bytes[i]));
165: // }
166: // System.out.println("");
167:
168: System.err.println("Read " + bytes.length + " bytes");
169:
170: return new ByteArrayInputStream(bytes);
171: }
172: }
173:
174: }
|