001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.core.blackboard;
028:
029: import java.io.Externalizable;
030: import java.io.IOException;
031: import java.io.ObjectInput;
032: import java.io.ObjectInputStream;
033: import java.io.ObjectOutput;
034: import java.io.ObjectOutputStream;
035: import java.util.Collection;
036:
037: import org.cougaar.core.agent.ClusterContextTable;
038: import org.cougaar.core.agent.ClusterMessage;
039: import org.cougaar.core.mts.MessageAddress;
040: import org.cougaar.core.persist.PersistenceInputStream;
041: import org.cougaar.core.persist.PersistenceOutputStream;
042: import org.cougaar.util.StringUtility;
043: import org.cougaar.util.log.Logging;
044:
045: /**
046: * A {@link org.cougaar.core.mts.Message} containing {@link Directive}s.
047: */
048: public class DirectiveMessage extends ClusterMessage implements
049: Externalizable {
050: private transient Directive[] directives;
051:
052: /**
053: * This signals that all messages prior to this have been acked.
054: * Used in keep alive messages to detect out-of-sync condition.
055: */
056: private boolean allMessagesAcknowledged = false;
057:
058: public DirectiveMessage() {
059: super ();
060: }
061:
062: /**
063: * constructor that takes multiple directives
064: * @param someDirectives to send
065: */
066: public DirectiveMessage(Directive[] someDirectives) {
067: directives = someDirectives;
068: }
069:
070: /**
071: * constructor that takes source, destination and some directives
072: */
073: public DirectiveMessage(MessageAddress source,
074: MessageAddress destination, long incarnationNumber,
075: Directive[] someDirectives) {
076: super (source, destination, incarnationNumber);
077: directives = someDirectives;
078: }
079:
080: /**
081: * @return the directives in the message
082: */
083: public Directive[] getDirectives() {
084: return directives;
085: }
086:
087: /**
088: * Sets the directives in this message.
089: */
090: public void setDirectives(Directive[] someDirectives) {
091: directives = someDirectives;
092: }
093:
094: public void setAllMessagesAcknowledged(boolean val) {
095: allMessagesAcknowledged = val;
096: }
097:
098: public boolean areAllMessagesAcknowledged() {
099: return allMessagesAcknowledged;
100: }
101:
102: public String toString() {
103: StringBuffer buf = new StringBuffer();
104: buf.append("<DirectiveMessage " + getSource() + " - "
105: + getDestination());
106: if (directives == null) {
107: buf.append("(Null directives)");
108: } else {
109: StringUtility.appendArray(buf, directives);
110: }
111: buf.append(">");
112: return buf.substring(0);
113: }
114:
115: private void withContext(MessageAddress ma, Runnable thunk)
116: throws IOException {
117: try {
118: ClusterContextTable.withMessageContext(ma, getSource(),
119: getDestination(), thunk);
120: } catch (RuntimeException re) {
121: Throwable t = re.getCause();
122: if (t == null) {
123: throw re;
124: } else if (t instanceof IOException) {
125: throw (IOException) t;
126: } else {
127: Logging.getLogger(DirectiveMessage.class).error(
128: "Serialization of " + this
129: + " caught exception", t);
130: throw new IOException("Serialization exception: " + t);
131: }
132: }
133: }
134:
135: /**
136: */
137: private void writeObject(final ObjectOutputStream stream)
138: throws IOException {
139: stream.defaultWriteObject();
140: Runnable thunk = new Runnable() {
141: public void run() {
142: try {
143: stream.writeInt(directives.length);
144: for (int i = 0; i < directives.length; i++) {
145: stream.writeObject(directives[i]);
146: }
147: } catch (Exception e) {
148: throw new RuntimeException("Thunk", e);
149: }
150: }
151: };
152: if (stream instanceof PersistenceOutputStream) {
153: thunk.run();
154: } else {
155: withContext(getSource(), thunk);
156: }
157: }
158:
159: /**
160: * when we deserialize, note the message context with the
161: * ClusterContextTable so that lower-level objects can
162: * reattach to the agent.
163: * @see ClusterContextTable
164: */
165: private void readObject(final ObjectInputStream stream)
166: throws IOException, ClassNotFoundException {
167: stream.defaultReadObject();
168:
169: Runnable thunk = new Runnable() {
170: public void run() {
171: try {
172: directives = new Directive[stream.readInt()];
173: for (int i = 0; i < directives.length; i++) {
174: directives[i] = (Directive) stream.readObject();
175: }
176: } catch (Exception e) {
177: throw new RuntimeException("Thunk", e);
178: }
179: }
180: };
181: if (stream instanceof PersistenceInputStream) {
182: thunk.run();
183: } else {
184: withContext(getDestination(), thunk);
185: }
186: }
187:
188: // Externalizable support
189: /*
190: */
191: public void writeExternal(final ObjectOutput out)
192: throws IOException {
193: super .writeExternal(out); // Message
194:
195: out.writeBoolean(allMessagesAcknowledged);
196:
197: Runnable thunk = new Runnable() {
198: public void run() {
199: try {
200: out.writeInt(directives.length);
201: for (int i = 0; i < directives.length; i++) {
202: out.writeObject(directives[i]);
203: }
204: } catch (Exception e) {
205: throw new RuntimeException("Thunk", e);
206: }
207: }
208: };
209: if (out instanceof PersistenceOutputStream) {
210: thunk.run();
211: } else {
212: withContext(getSource(), thunk);
213: }
214: }
215:
216: /**
217: * when we deserialize, note the message context with the
218: * ClusterContextTable so that lower-level objects can
219: * reattach to the agent.
220: * @see ClusterContextTable
221: */
222: public void readExternal(final ObjectInput in) throws IOException,
223: ClassNotFoundException {
224: super .readExternal(in); // Message
225:
226: allMessagesAcknowledged = in.readBoolean();
227:
228: Runnable thunk = new Runnable() {
229: public void run() {
230: try {
231: directives = new Directive[in.readInt()];
232: for (int i = 0; i < directives.length; i++) {
233: directives[i] = (Directive) in.readObject();
234: }
235: } catch (Exception e) {
236: throw new RuntimeException("Thunk", e);
237: }
238: }
239: };
240: if (in instanceof PersistenceInputStream) {
241: thunk.run();
242: } else {
243: withContext(getDestination(), thunk);
244: }
245: }
246:
247: /**
248: * A {@link Directive} with associated {@link ChangeReport}s.
249: */
250: public static final class DirectiveWithChangeReports implements
251: Directive {
252: private final Directive real;
253: private final Collection changes;
254:
255: public DirectiveWithChangeReports(Directive d, Collection cc) {
256: real = d;
257: changes = cc;
258: }
259:
260: public Directive getDirective() {
261: return real;
262: }
263:
264: public Collection getChangeReports() {
265: return changes;
266: }
267:
268: public MessageAddress getSource() {
269: return real.getSource();
270: }
271:
272: public MessageAddress getDestination() {
273: return real.getDestination();
274: }
275:
276: public String toString() {
277: return real.toString();
278: }
279: }
280:
281: }
|