001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.mts.std;
028:
029: import java.io.ByteArrayInputStream;
030: import java.io.ByteArrayOutputStream;
031: import java.io.Externalizable;
032: import java.io.InputStream;
033: import java.io.ObjectInput;
034: import java.io.ObjectInputStream;
035: import java.io.ObjectOutput;
036: import java.io.ObjectOutputStream;
037: import java.io.OutputStream;
038: import java.security.GeneralSecurityException;
039: import java.util.ArrayList;
040:
041: import org.cougaar.core.mts.Attributes;
042: import org.cougaar.core.mts.AttributeConstants;
043: import org.cougaar.core.mts.Message;
044: import org.cougaar.core.mts.MessageAddress;
045: import org.cougaar.core.mts.MessageAttributes;
046: import org.cougaar.core.mts.SimpleMessageAttributes;
047: import org.cougaar.core.service.MessageProtectionService;
048: import org.cougaar.util.log.Logger;
049: import org.cougaar.util.log.Logging;
050:
051: import org.cougaar.mts.base.MessageReader;
052: import org.cougaar.mts.base.MessageWriter;
053: import org.cougaar.mts.base.MessageSecurityException;
054: import org.cougaar.mts.base.MessageSerializationException;
055: import org.cougaar.mts.base.MessageStreamsFactory;
056:
057: /**
058: * An AttributedMessage is a Message with metadata, the latter
059: * represented as HashMap with String keys. When a Message enters the
060: * MTS it will be wrapped in an AttributedMessage, passed through the
061: * MTS in that form, and then unwrapped and delivered at the point of
062: * final delivery. AttributedMessages should not be construed as
063: * envelopes. In particular, it's not appropriate to introduce
064: * further levels of 'wrapping' beyond the initial one.
065: */
066: public class AttributedMessage extends Message implements
067: Externalizable, MessageAttributes, AttributeConstants {
068:
069: private transient Logger logger = Logging.getLogger(getClass()
070: .getName());
071:
072: private String FILTERS_ATTRIBUTE = "Filters";
073:
074: private Message contents;
075: private MessageAttributes attributes;
076:
077: // We control the externalization, so the 'transient' tag here is
078: // really just documentation.
079: private transient MessageAttributes snapshot;
080:
081: /**
082: * Only invoked by server-side RMI when it's creating one of these
083: * to correspond to one that was sent as data. */
084: public AttributedMessage() {
085: }
086:
087: /**
088: * Make an AttributedMessage whose content is the given message
089: * and whose source and destination are those of the contents.
090: * The resulting AttributedMessage will have no attributes.
091: */
092: public AttributedMessage(Message contents) {
093: super (contents.getOriginator(), contents.getTarget());
094: this .contents = contents;
095: attributes = new SimpleMessageAttributes();
096: attributes.setAttribute(FILTERS_ATTRIBUTE, new ArrayList());
097: }
098:
099: public AttributedMessage(Message contents,
100: MessageAttributes initialAttributes) {
101: super (contents == null ? null : contents.getOriginator(),
102: contents == null ? null : contents.getTarget());
103: this .contents = contents;
104: if (initialAttributes != null)
105: attributes = (MessageAttributes) initialAttributes
106: .cloneAttributes();
107: else
108: attributes = new SimpleMessageAttributes();
109: attributes.setAttribute(FILTERS_ATTRIBUTE, new ArrayList());
110: }
111:
112: /**
113: * Make an AttributedMessage whose content, source and destination
114: * are copied from the argument, and whose initial set of
115: * attributes is a snapshot of the argument's current set of
116: * attributes.
117: */
118: public AttributedMessage(AttributedMessage msg) {
119: super (msg.getOriginator(), msg.getTarget());
120: this .contents = msg.contents;
121: attributes = (MessageAttributes) msg.attributes
122: .cloneAttributes();
123: }
124:
125: // Should only be used by MessageReply. The second argument is
126: // only there to distinguish the constructor signature. It's not
127: // used for anything. Since this is a reply, flip the addresses.
128: public AttributedMessage(AttributedMessage source, Class msgClass) {
129: super (source.getTarget(), source.getOriginator());
130: this .contents = null;
131: attributes = (MessageAttributes) source.attributes
132: .cloneAttributes();
133: }
134:
135: /**
136: * Make an AttributedMessage whose content, source and destination
137: * are copied from the first argument, and whose initial set of
138: * attributes is a snapshot of the second argument's current set
139: * of attributes.
140: */
141: public AttributedMessage(Message contents,
142: AttributedMessage initialAttributes) {
143: super (contents == null ? null : contents.getOriginator(),
144: contents == null ? null : contents.getTarget());
145: this .contents = contents;
146: attributes = (MessageAttributes) initialAttributes.attributes
147: .cloneAttributes();
148: }
149:
150: public synchronized void snapshotAttributes() {
151: snapshot = (MessageAttributes) attributes.cloneAttributes();
152: }
153:
154: public synchronized void restoreSnapshot() {
155: if (snapshot != null)
156: attributes = (MessageAttributes) snapshot.cloneAttributes();
157: }
158:
159: // aspect was previously typed StandardAspect, but this presumes
160: // that we are using mtsstd.
161: public void addFilter(Object aspect) {
162: String name = aspect.getClass().getName();
163: if (logger.isDebugEnabled()) {
164: Object old = getAttribute(FILTERS_ATTRIBUTE);
165: if (old != null) {
166: if (old instanceof ArrayList) {
167: ArrayList list = (ArrayList) old;
168: if (list.contains(name))
169: logger.debug("Duplicated filter " + name);
170: } else {
171: logger.debug("Filters attribute is not a list!");
172: }
173: }
174: }
175: pushValue(FILTERS_ATTRIBUTE, name);
176:
177: }
178:
179: protected boolean replyOnly() {
180: return false;
181: }
182:
183: /**
184: * Returns the raw (unattributed) message.
185: */
186: public Message getRawMessage() {
187: return contents;
188: }
189:
190: // MessageAttributes interface
191: // Delegate all calls
192:
193: public String getAttributesAsString() {
194: return attributes.getAttributesAsString();
195: }
196:
197: public Attributes cloneAttributes() {
198: return attributes.cloneAttributes();
199: }
200:
201: public Object getAttribute(String attribute) {
202: return attributes.getAttribute(attribute);
203: }
204:
205: public void setAttribute(String attribute, Object value) {
206: attributes.setAttribute(attribute, value);
207: }
208:
209: public void removeAttribute(String attribute) {
210: attributes.removeAttribute(attribute);
211: }
212:
213: public void addValue(String attribute, Object value) {
214: attributes.addValue(attribute, value);
215: }
216:
217: public void pushValue(String attribute, Object value) {
218: attributes.pushValue(attribute, value);
219: }
220:
221: public void removeValue(String attribute, Object value) {
222: attributes.removeValue(attribute, value);
223: }
224:
225: public void setLocalAttribute(String attribute, Object value) {
226: attributes.setLocalAttribute(attribute, value);
227: }
228:
229: public void removeLocalAttribute(String attribute) {
230: attributes.removeLocalAttribute(attribute);
231: }
232:
233: public void addLocalValue(String attribute, Object value) {
234: attributes.addLocalValue(attribute, value);
235: }
236:
237: public void pushLocalValue(String attribute, Object value) {
238: attributes.pushLocalValue(attribute, value);
239: }
240:
241: public void removeLocalValue(String attribute, Object value) {
242: attributes.removeLocalValue(attribute, value);
243: }
244:
245: public void clearAttributes() {
246: attributes.clearAttributes();
247: }
248:
249: public void mergeAttributes(Attributes attributes) {
250: this .attributes.mergeAttributes(attributes);
251: }
252:
253: private byte[] serializeObject(Object object) {
254: ByteArrayOutputStream bos = new ByteArrayOutputStream();
255: try {
256: ObjectOutputStream oos = new ObjectOutputStream(bos);
257: oos.writeObject(object);
258: oos.close();
259: } catch (java.io.IOException ex) {
260: logger.error(null, ex);
261: }
262: return bos.toByteArray();
263: }
264:
265: private Object deserializeObject(byte[] bytes) {
266: Object result = null;
267: try {
268: ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
269: ObjectInputStream ois = new ObjectInputStream(bis);
270: result = ois.readObject();
271: ois.close();
272: } catch (Exception ex) {
273: logger.error(null, ex);
274: }
275:
276: return result;
277: }
278:
279: private void sendAttributes(ObjectOutput out)
280: throws java.io.IOException, GeneralSecurityException {
281: MessageProtectionService svc = MessageProtectionAspect
282: .getMessageProtectionService();
283: byte[] bytes = null;
284: if (svc != null) {
285: bytes = svc.protectHeader(attributes, getOriginator(),
286: getTarget());
287: } else {
288: bytes = serializeObject(attributes);
289: }
290: out.writeObject(bytes);
291: // save HeaderLength as local attribute (not sent remotely)
292: attributes.setLocalAttribute(HEADER_BYTES_ATTRIBUTE,
293: new Integer(bytes.length));
294: }
295:
296: private void readAttributes(ObjectInput in)
297: throws java.io.IOException, GeneralSecurityException,
298: ClassNotFoundException {
299: MessageProtectionService svc = MessageProtectionAspect
300: .getMessageProtectionService();
301: byte[] rawData = (byte[]) in.readObject();
302: if (svc != null) {
303: attributes = svc.unprotectHeader(rawData, getOriginator(),
304: getTarget());
305: } else {
306: attributes = (SimpleMessageAttributes) deserializeObject(rawData);
307: }
308: attributes.setLocalAttribute(HEADER_BYTES_ATTRIBUTE,
309: new Integer(rawData.length));
310: }
311:
312: // Externalizable interface
313:
314: /**
315: * First, write special metadata directly to the output stream.
316: * Currently the only special metadata is the list of Aspect
317: * classes for the filtering streams. Next, generate the nested
318: * filtering streams, using those Aspects. Next, allow the Aspect
319: * delegates to perform any preprocessing they might need to do.
320: * Next, write the raw message and the raw attributes through the
321: * nested filtering streams. Finally, allow the Attributes to do
322: * postprocessing.
323: */
324: public void writeExternal(ObjectOutput rawOut)
325: throws java.io.IOException {
326: try {
327: // Source and Destination MUST be in the clear
328: rawOut.writeObject(getOriginator());
329: rawOut.writeObject(getTarget());
330:
331: MessageStreamsFactory factory = MessageStreamsFactory
332: .getFactory();
333: ArrayList aspectNames = (ArrayList) attributes
334: .getAttribute(FILTERS_ATTRIBUTE);
335: MessageWriter writer = factory
336: .getMessageWriter(aspectNames);
337:
338: writer.finalizeAttributes(this );
339:
340: sendAttributes(rawOut);
341:
342: if (replyOnly())
343: return;
344:
345: writer.preProcess();
346:
347: OutputStream out = writer.getObjectOutputStream(rawOut);
348: ObjectOutputStream object_out = null;
349: // 'out' should be an ObjectOutputStream but might just be an
350: // OutputStream. In the latter case, wrap it here.
351: if (out instanceof ObjectOutputStream)
352: object_out = (ObjectOutputStream) out;
353: else
354: object_out = new ObjectOutputStream(out);
355:
356: object_out.writeObject(contents);
357:
358: writer.finishOutput();
359: writer.postProcess();
360: } catch (java.io.NotSerializableException ex1) {
361: throw new MessageSerializationException(ex1);
362: } catch (GeneralSecurityException ex2) {
363: throw new MessageSecurityException(ex2);
364: }
365: }
366:
367: /**
368: * First, read special metadata directly from the output stream.
369: * Currently the only special metadata is the list of Aspect
370: * classes for the filtering streams. Next, generate the nested
371: * filtering streams, using those Aspects. Next, allow the Aspect
372: * delegates to perform any preprocessing they might need to do.
373: * Next, read the raw message and the raw attributes through the
374: * nested filtering streams. Finally, allow the Attributes to do
375: * postprocessing.
376: */
377: public void readExternal(ObjectInput rawIn)
378: throws java.io.IOException, ClassNotFoundException {
379: try {
380: setOriginator((MessageAddress) rawIn.readObject());
381: setTarget((MessageAddress) rawIn.readObject());
382:
383: readAttributes(rawIn);
384:
385: if (replyOnly())
386: return;
387:
388: MessageStreamsFactory factory = MessageStreamsFactory
389: .getFactory();
390: ArrayList aspectNames = (ArrayList) attributes
391: .getAttribute(FILTERS_ATTRIBUTE);
392: MessageReader reader = factory
393: .getMessageReader(aspectNames);
394:
395: reader.finalizeAttributes(this );
396:
397: reader.preProcess();
398: InputStream in = reader.getObjectInputStream(rawIn);
399: ObjectInputStream object_in = null;
400: if (in instanceof ObjectInputStream)
401: object_in = (ObjectInputStream) in;
402: else
403: object_in = new ObjectInputStream(in);
404:
405: contents = (Message) object_in.readObject();
406:
407: reader.finishInput();
408: reader.postProcess();
409: } catch (java.io.NotSerializableException ex1) {
410: throwDelayedException(new MessageSerializationException(ex1));
411: } catch (GeneralSecurityException ex2) {
412: throwDelayedException(new MessageSecurityException(ex2));
413: }
414: }
415:
416: private void throwDelayedException(java.io.IOException ex)
417: throws java.io.IOException {
418: if (logger != null) {
419: MessageAddress src = getOriginator();
420: MessageAddress dst = getTarget();
421: String msg = "Receiver Exception " + src + "->" + dst;
422: logger.error(msg, ex);
423: }
424:
425: // There's a problem here. If we throw the exception
426: // right away, the sender might still be streaming the
427: // data. In that case it will see a SocketClosed error,
428: // which it won't recognize as a security exception, and
429: // it will retry the send. There's no good solution to
430: // this, so use a bad solution: give the sender a second
431: // to get the rest of the data out. If that's not long
432: // enough, we lose. In addition, preserialized
433: // notification will be delayed for a second for no
434: // reason. Bad,
435: try {
436: Thread.sleep(1000);
437: } catch (InterruptedException xxx) {
438: }
439:
440: throw ex;
441: }
442:
443: String logString() {
444: return "<From: " + getOriginator().getAddress() + " To: "
445: + getTarget().getAddress() + " Hash: " + hashCode()
446: + " Id: " + getContentsId() + ">";
447: }
448:
449: public String toString() {
450: String s = getClass().getName();
451: s = s.substring(s.lastIndexOf('.') + 1);
452: return "(" + s + " from=" + getOriginator().getAddress()
453: + " to=" + getTarget().getAddress() + " content="
454: + contents + " attributes=" + attributes + ")";
455: }
456: }
|