001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.mts.std;
028:
029: import java.io.FilterInputStream;
030: import java.io.FilterOutputStream;
031: import java.io.IOException;
032: import java.io.InputStream;
033: import java.io.ObjectInput;
034: import java.io.ObjectInputStream;
035: import java.io.ObjectOutput;
036: import java.io.ObjectOutputStream;
037: import java.io.OutputStream;
038:
039: import org.cougaar.core.mts.MessageAddress;
040: import org.cougaar.core.mts.MessageAttributes;
041: import org.cougaar.mts.base.CommFailureException;
042: import org.cougaar.mts.base.DestinationLink;
043: import org.cougaar.mts.base.DestinationLinkDelegateImplBase;
044: import org.cougaar.mts.base.MessageDeliverer;
045: import org.cougaar.mts.base.MessageDelivererDelegateImplBase;
046: import org.cougaar.mts.base.MessageReader;
047: import org.cougaar.mts.base.MessageReaderDelegateImplBase;
048: import org.cougaar.mts.base.MessageWriter;
049: import org.cougaar.mts.base.MessageWriterDelegateImplBase;
050: import org.cougaar.mts.base.MisdeliveredMessageException;
051: import org.cougaar.mts.base.NameLookupException;
052: import org.cougaar.mts.base.StandardAspect;
053: import org.cougaar.mts.base.UnregisteredNameException;
054:
055: /**
056: * This class provides an example of adding trailers to serialized
057: * {@link AttributedMessage}s. The {@link MessageWriter} computes a checksum
058: * (as a long) and sends the eight bytes after the message content.
059: * The {@link MessageReader} computes its own checksum and compares to
060: * the one that was sent. The CHECKSUM_VALID_ATTR records whether or
061: * not they matched. This attribute is stored in the
062: * AttributedMessage and is available to all receive-side Aspects.
063: */
064:
065: public class ChecksumStreamsAspect extends StandardAspect {
066:
067: private static final String CHECKSUM_ENABLE_ATTR = "org.cougaar.core.security.checksum.enable";
068: private static final String CHECKSUM_VALID_ATTR = "org.cougaar.core.security.checksum.valid";
069:
070: public Object getDelegate(Object delegatee, Class type) {
071: if (type == MessageWriter.class) {
072: MessageWriter wtr = (MessageWriter) delegatee;
073: return new ChecksumMessageWriter(wtr);
074: } else if (type == MessageReader.class) {
075: MessageReader rdr = (MessageReader) delegatee;
076: return new ChecksumMessageReader(rdr);
077: } else if (type == DestinationLink.class) {
078: DestinationLink link = (DestinationLink) delegatee;
079: return new ForceChecksum(link);
080: } else if (type == MessageDeliverer.class) {
081: MessageDeliverer deliverer = (MessageDeliverer) delegatee;
082: return new ViewReceiveAttribute(deliverer);
083: } else {
084: return null;
085: }
086: }
087:
088: // Raw i/o of a long. Not used anymore
089: // void writeLong(OutputStream stream, long x) throws IOException {
090: // byte[] bytes = new byte[8];
091: // for (int i=7; i>=0; i--) {
092: // bytes[i] = (byte) (x & 0xFF);
093: // x = x >>> 8;
094: // }
095: // stream.write(bytes);
096: // }
097:
098: // long readLong(InputStream stream) throws IOException {
099:
100: // byte[] bytes = new byte[8];
101: // int count = stream.read(bytes);
102: // long result = 0;
103: // for (int i=0; i<8; i++) {
104: // result = result << 8;
105: // // watch out for sign-extension
106: // result = result | (((long) bytes[i]) & 0xFF);
107: // }
108: // return result;
109:
110: // }
111:
112: private class ViewReceiveAttribute extends
113: MessageDelivererDelegateImplBase {
114: ViewReceiveAttribute(MessageDeliverer delegatee) {
115: super (delegatee);
116: }
117:
118: public MessageAttributes deliverMessage(
119: AttributedMessage message, MessageAddress dest)
120: throws MisdeliveredMessageException {
121:
122: System.out.println("Message Checksum Valid = "
123: + message.getAttribute(CHECKSUM_VALID_ATTR));
124:
125: return super .deliverMessage(message, dest);
126: }
127:
128: }
129:
130: private class ForceChecksum extends DestinationLinkDelegateImplBase {
131: ForceChecksum(DestinationLink delegatee) {
132: super (delegatee);
133: }
134:
135: public MessageAttributes forwardMessage(
136: AttributedMessage message)
137: throws UnregisteredNameException, NameLookupException,
138: CommFailureException, MisdeliveredMessageException {
139: // Register checksum Aspect as a Message Streaming filter
140: message.addFilter(ChecksumStreamsAspect.this );
141: // Force on checksum
142: message.setAttribute(CHECKSUM_ENABLE_ATTR, Boolean.TRUE);
143:
144: return super .forwardMessage(message);
145: }
146:
147: }
148:
149: private class ChecksumMessageWriter extends
150: MessageWriterDelegateImplBase {
151: private ObjectOutputStream stream;
152: private long checksum = 0;
153:
154: private class ChecksumOutputStream extends FilterOutputStream {
155:
156: private ChecksumOutputStream(OutputStream wrapped) {
157: super (wrapped);
158: }
159:
160: public void write(int b) throws IOException {
161: super .write(b);
162: checksum += b;
163: }
164:
165: public void write(byte[] b, int off, int len)
166: throws java.io.IOException {
167: out.write(b, off, len);
168: int end = Math.min(off + len, b.length);
169: for (int i = off; i < end; i++)
170: checksum += b[i];
171: }
172:
173: public void write(byte[] b) throws java.io.IOException {
174: out.write(b);
175: int end = b.length;
176: for (int i = 0; i < end; i++)
177: checksum += b[i];
178: }
179:
180: }
181:
182: ChecksumMessageWriter(MessageWriter delegatee) {
183: super (delegatee);
184: }
185:
186: public OutputStream getObjectOutputStream(ObjectOutput out)
187: throws java.io.IOException {
188: OutputStream raw_os = super .getObjectOutputStream(out);
189: stream = new ObjectOutputStream(new ChecksumOutputStream(
190: raw_os));
191: return stream;
192: }
193:
194: public void finishOutput() throws java.io.IOException {
195: //Send the Checksum as a tailer
196: try {
197: // writeLong(stream, checksum);
198: stream.writeObject(new Long(checksum));
199: } catch (java.io.IOException iox) {
200: throw iox;
201: }
202: System.err.println("Checksum output finished");
203: super .finishOutput();
204:
205: }
206: }
207:
208: private class ChecksumMessageReader extends
209: MessageReaderDelegateImplBase {
210: ObjectInputStream stream;
211: AttributedMessage msg;
212: private long checksum = 0;
213:
214: private class ChecksumInputStream extends FilterInputStream {
215:
216: private ChecksumInputStream(InputStream wrapped) {
217: super (wrapped);
218: }
219:
220: public int read() throws IOException {
221: int b = in.read();
222: checksum += b;
223: return b;
224: }
225:
226: public int read(byte[] b, int off, int len)
227: throws IOException {
228: int count = in.read(b, off, len);
229:
230: // Even though these are bytes rather than ints (as in
231: // read()), we don't need to worry about sign
232: // extension, presumably because the ints as written
233: // are also sign extended. If true, this should
234: // ensure that the reader and writer compute the same
235: // checksum. Otherwise, add code for sign extension,
236: // but do it everywhere, not just here.
237: for (int i = 0; i < count; i++)
238: checksum += b[i + off];
239:
240: return count;
241: }
242:
243: public int read(byte[] b) throws IOException {
244: int count = in.read(b);
245: for (int i = 0; i < count; i++)
246: checksum += b[i];
247:
248: return count;
249: }
250:
251: }
252:
253: ChecksumMessageReader(MessageReader delegatee) {
254: super (delegatee);
255: }
256:
257: public void finalizeAttributes(AttributedMessage msg) {
258: super .finalizeAttributes(msg);
259: this .msg = msg;
260: }
261:
262: public InputStream getObjectInputStream(ObjectInput in)
263: throws java.io.IOException, ClassNotFoundException {
264: InputStream raw_is = super .getObjectInputStream(in);
265: stream = new ObjectInputStream(new ChecksumInputStream(
266: raw_is));
267: return stream;
268: }
269:
270: public void finishInput() throws java.io.IOException {
271: System.err
272: .println("Entering ChecksumStreamsAspect finishInput");
273: // The tailer itself wasn't included in the checksum
274: // computed by the sender. So grab the computed value
275: // before reading the remote value.
276: long sum = checksum;
277: long checksum_as_read = 0; // readLong(stream);
278: try {
279: checksum_as_read = ((Long) stream.readObject())
280: .longValue();
281: } catch (ClassNotFoundException cnf) {
282: cnf.printStackTrace();
283: }
284: System.out.println("Read checksum=" + checksum_as_read
285: + " Computed checksum=" + sum);
286:
287: // added Checksum validity to message attributes
288: if (checksum_as_read == sum)
289: msg.setAttribute(CHECKSUM_VALID_ATTR, Boolean.TRUE);
290: else
291: msg.setAttribute(CHECKSUM_VALID_ATTR, Boolean.FALSE);
292:
293: super.finishInput();
294:
295: }
296: }
297:
298: }
|