001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.core.persist;
028:
029: import java.io.FilterInputStream;
030: import java.io.IOException;
031: import java.io.ObjectInputStream;
032: import java.io.ObjectStreamClass;
033: import java.lang.reflect.InvocationTargetException;
034: import java.lang.reflect.Method;
035:
036: import org.cougaar.core.mts.MessageAddress;
037: import org.cougaar.util.log.Logger;
038:
039: /**
040: * Read persisted objects from a stream. Detects objects that have
041: * been wrapped in a PersistenceAssociation and resolves those to the
 * current definition of the object. The first occurrence of any object
043: * must be inside a PersistenceAssociation. This defining instance is
044: * stored in the identityTable. Thereafter, its values are updated
045: * from later versions of the same object.
046: */
047: public class PersistenceInputStream extends ObjectInputStream implements
048: PersistenceStream {
049: private Logger logger;
050:
051: public MessageAddress getOriginator() {
052: return null;
053: }
054:
055: public MessageAddress getTarget() {
056: return null;
057: }
058:
059: public void close() throws IOException {
060: super .close();
061: }
062:
063: /**
064: * The array of object references that are expected during the
065: * decoding of an object.
066: */
067: private PersistenceReference[] references;
068:
069: /**
070: * Steps through the references during decoding.
071: */
072: private int nextReadIndex;
073:
074: /**
075: * InputStream implementation that extracts a segment of bytes from
076: * an ObjectInputStream. This is the input counterpart of the
077: * PersistenceOutputStream.writeBytes
078: */
079: private static class Substream extends FilterInputStream {
080: private int bytes;
081:
082: public Substream(ObjectInputStream ois) throws IOException {
083: super (ois);
084: bytes = ois.readInt();
085: }
086:
087: public int read(byte[] buf) throws IOException {
088: return read(buf, 0, buf.length);
089: }
090:
091: public int read() throws IOException {
092: if (bytes == 0)
093: return -1;
094: bytes--;
095: return super .read();
096: }
097:
098: public int read(byte[] buf, int offset, int nb)
099: throws IOException {
100: if (nb > bytes)
101: nb = bytes;
102: nb = super .read(buf, offset, nb);
103: if (nb >= 0)
104: bytes -= nb;
105: return nb;
106: }
107: }
108:
/**
 * Build a persistence stream on top of a segment of the given object
 * stream. The segment is delimited by a Substream, which reads the
 * segment length from <code>ois</code> as part of construction.
 * Object resolution is enabled so that resolveObject is consulted for
 * every object read.
 * @param ois ObjectInputStream supplying the segment
 * @param logger Logger to use for progress
 * @throws IOException if the Substream or the ObjectInputStream
 * superclass cannot be initialized
 */
public PersistenceInputStream(ObjectInputStream ois, Logger logger)
        throws IOException {
    super(new Substream(ois));
    this.logger = logger;
    enableResolveObject(true);
}
120:
121: static void checkSuperclass() {
122: try {
123: PersistenceInputStream.class.getSuperclass()
124: .getDeclaredMethod("newInstanceFromDesc",
125: new Class[] { ObjectStreamClass.class });
126: } catch (Exception e) {
127: System.err.println("Fatal error " + e.toString());
128: System.err
129: .println("Incorrect boot class path does not contain modified java/io/ObjectInputStream.class");
130: System.err.println("class loader is "
131: + PersistenceInputStream.class.getClassLoader()
132: .getClass().getName());
133: System.exit(13);
134: }
135: }
136:
137: /**
138: * Read the association for one object. This is the inverse of
139: * PersistenceOutputStream.writeAssociation. The active state of the
140: * PersistenceAssociation is set according to whether it was active
141: * when the persistence delta was generated.
142: * @param references the array of references for objects that were
143: * written when this association was written. This allows us to know
144: * in advance the identity of each object as it is read.
145: */
146: public PersistenceAssociation readAssociation(
147: PersistenceReference[] references) throws IOException,
148: ClassNotFoundException {
149: this .references = references;
150: nextReadIndex = 0;
151: int active = readInt();
152: PersistenceIdentity clientId = (PersistenceIdentity) readObject();
153: Object object = readObject();
154: if (object == null) {
155: String msg = "Rehydrated object is null. nextReadIndex is "
156: + nextReadIndex + "/" + references.length;
157: logger.error(msg);
158: return null;
159: }
160: if (object instanceof ActivePersistenceObject) {
161: ((ActivePersistenceObject) object).checkRehydration(logger);
162: }
163: this .references = null;
164: PersistenceAssociation pAssoc = identityTable.find(object);
165: if (pAssoc == null) {
166: logger.error("Null PersistenceAssociation found for "
167: + object.getClass().getName() + ": " + object);
168: } else {
169: pAssoc.setActive(active);
170: pAssoc.setClientId(clientId);
171: }
172: if (logger.isDetailEnabled())
173: logger.detail("read association " + pAssoc);
174: return pAssoc;
175: }
176:
177: // Use reflection to avoid calling super.newInstanceFromDesc. Don't want to
178: // force installation of javaiopatch.jar for compilation if persistence not
179: // involved.
180: private static Method _ano = null;
181: private static Object _anoLock = new Object();
182:
183: private static Object callNewInstanceFromDesc(
184: ObjectInputStream stream, ObjectStreamClass desc)
185: throws InstantiationException, IllegalAccessException {
186: Method m;
187: synchronized (_anoLock) {
188: if ((m = _ano) == null) {
189: try {
190: Class c = ObjectInputStream.class;
191: Class[] argp = new Class[] { ObjectStreamClass.class };
192: m = c.getDeclaredMethod("real_newInstanceFromDesc",
193: argp);
194: _ano = m;
195: } catch (Exception e) {
196: e.printStackTrace();
197: System.err
198: .println("javaiopatch is not installed properly!");
199: throw new RuntimeException(
200: "javaiopatch not installed");
201: }
202: }
203: }
204: try {
205: Object[] args = new Object[] { desc };
206: return m.invoke(stream, args);
207: } catch (Exception e) {
208: e.printStackTrace();
209: if (e instanceof InvocationTargetException) {
210: Throwable t = ((InvocationTargetException) e)
211: .getTargetException();
212: if (t instanceof RuntimeException) {
213: throw (RuntimeException) t;
214: } else if (t instanceof InstantiationException) {
215: throw (InstantiationException) t;
216: } else if (t instanceof IllegalAccessException) {
217: throw (IllegalAccessException) t;
218: }
219: }
220: throw new RuntimeException("javaiopatch not installed");
221: }
222: }
223:
224: /**
225: * Allocate an object to be filled in from the serialized
226: * stream. This is a hook provided by the ObjectInputStream class
227: * for obtaining an object whose fields can be filled in. Normally,
228: * this returns a brand new object, but during rehydration we need
229: * to update the values of objects that already exist so we override
230: * this method and return existing objects corresponding to the
231: * reference ids we expect to encounter.
232: * @param desc description of class to create
233: * @return the object to be filled in.
234: */
235: protected Object newInstanceFromDesc(ObjectStreamClass desc)
236: throws InstantiationException, IllegalAccessException,
237: java.lang.reflect.InvocationTargetException {
238: Class clazz = desc.forClass();
239: if (references != null && clazz != PersistenceReference.class
240: && !clazz.isArray() && clazz != String.class) {
241: PersistenceReference reference = references[nextReadIndex++];
242: if (reference != null) {
243: PersistenceAssociation pAssoc = identityTable
244: .get(reference);
245: if (pAssoc == null) {
246: Object object = callNewInstanceFromDesc(this , desc);
247: pAssoc = identityTable.create(object, reference);
248: if (logger.isDetailEnabled())
249: logger.detail("Allocating "
250: + (nextReadIndex - 1)
251: + " "
252: + PersistenceServiceComponent
253: .getObjectName(object) + " @ "
254: + reference);
255: return object;
256: }
257: Object result = pAssoc.getObject();
258: if (result == null)
259: throw new InstantiationException("no object @ "
260: + reference);
261: if (result.getClass() != clazz)
262: throw new InstantiationException("wrong object @ "
263: + reference);
264: if (logger.isDetailEnabled())
265: logger.detail("Overwriting "
266: + (nextReadIndex - 1)
267: + " "
268: + PersistenceServiceComponent
269: .getObjectName(result) + " @ "
270: + reference);
271: return result;
272: } else {
273: Object result = callNewInstanceFromDesc(this , desc);
274: if (logger.isDetailEnabled())
275: logger.detail("Allocating "
276: + (nextReadIndex - 1)
277: + " "
278: + PersistenceServiceComponent
279: .getObjectName(result));
280: return result;
281: }
282: }
283: Object result = callNewInstanceFromDesc(this , desc);
284: if (logger.isDetailEnabled())
285: logger
286: .detail("Allocating "
287: + PersistenceServiceComponent
288: .getObjectName(result));
289: return result;
290: }
291:
292: /**
293: * Resolve an object just read from the stream into the actual
294: * result object. We replace PersistenceReference objects with the
295: * object to which they refer.
296: * @param o the object to resolve.
297: * @return the replacement.
298: */
299: protected Object resolveObject(Object o) throws IOException {
300: if (o instanceof PersistenceReference) {
301: PersistenceReference pRef = (PersistenceReference) o;
302: PersistenceAssociation pAssoc = identityTable.get(pRef);
303: if (pAssoc == null) {
304: logger.error("Reference to non-existent object id = "
305: + pRef);
306: for (int i = 0; i < identityTable.size(); i++) {
307: logger.error(i + ": " + identityTable.get(i));
308: }
309: throw new IOException(
310: "Reference to non-existent object id = " + pRef);
311: // return null;
312: }
313: Object result = pAssoc.getObject();
314: if (logger.isDetailEnabled())
315: logger.detail("Resolving "
316: + PersistenceServiceComponent
317: .getObjectName(result) + " @ " + pRef);
318: return result;
319: } else {
320: if (logger.isDetailEnabled())
321: logger.detail("Passing "
322: + PersistenceServiceComponent.getObjectName(o));
323: return o;
324: }
325: }
326:
327: /**
328: * Object identity table. This is supplied by the creator of this stream.
329: */
330: private IdentityTable identityTable;
331:
332: /**
333: * Get the IdentityTable being used by this stream. This is not
334: * normally used since the IdentityTable is usually maintained by
335: * the creator of this stream.
336: * @return the IdentityTable being used by this stream.
337: */
338: public IdentityTable getIdentityTable() {
339: return identityTable;
340: }
341:
342: /**
343: * Set the IdentityTable to be used by this stream. The
344: * IdentityTable contains assocations of objects to earlier
345: * persistence deltas. References to these earlier objects are
346: * replaced with reference objects to save space.
347: * @param identityTable the new IdentityTable to use.
348: */
349: public void setIdentityTable(IdentityTable identityTable) {
350: this.identityTable = identityTable;
351: }
352: }
|