// You can redistribute this software and/or modify it under the terms of
// the Ozone Core License version 1 published by ozone-db.org.
//
// The original code and portions created by SMB are
// Copyright (C) 1997-@year@ by SMB GmbH. All rights reserved.
//
// $Id: Cluster.java,v 1.1 2001/12/18 10:31:30 per_nyfelt Exp $

package org.ozoneDB.core.classicStore;

import java.io.*;
import org.ozoneDB.core.*;
import org.ozoneDB.util.*;
import org.ozoneDB.DxLib.*;

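/**
 * A cluster of the classic store. On disk a cluster lives in a file named
 * after its ClusterID plus the ".cl" suffix and, as written by
 * writeHeader() and appendObject(), consists of a header followed by one
 * entry per stored object:
 *
 * <pre>
 *     header: int   cluster format version (VERSION)
 *             long  ClusterID
 *     entry:  long  ObjectID
 *             long  TransactionID
 *             chunk DATA_HEADER_CHUNK (streamed ObjectContainer)
 *             chunk DATA_CHUNK        (the object data itself)
 * </pre>
 *
 * A minimal write sequence, sketched from the methods below (transaction
 * and cluster-space handling omitted; argument values are illustrative):
 *
 * <pre>
 *     Cluster cluster = new Cluster(env, classicStore, cid);
 *     cluster.open();
 *     cluster.appendObject(dobj, tid, true, false);
 *     cluster.close();
 * </pre>
 */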
public class Cluster extends Object {
    public final static int VERSION = 1;
    public static int MAX_SIZE = 64 * 1024;
    public final static String CLUSTER_FILE_SUFF = ".cl";
    public final static String LEAK_FILE_SUFF = ".lk";
    public final static String RECOVERY_SUFF = ".rec";
    public final static double LEAK_WEIGHT = 0.5;

    public final static int DATA = 1;
    public final static int STATE = 2;
    public final static int TRANS = 4;

    // chunk IDs
    public final static byte CL_HEADER_CHUNK = 1;
    public final static byte DATA_OID_CHUNK = 2;
    public final static byte DATA_HEADER_CHUNK = 3;
    public final static byte DATA_CHUNK = 4;

    // recovery modes
    public final static byte NONE = 0;
    public final static byte OBJECTS = 1;
    public final static byte LEAKS = 2;

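    /**
     * A tagged block of bytes inside a cluster file. The wire format, as
     * written by writeChunk() and read by readChunk(), is:
     * [id: 1 byte][dataLength: 4 bytes][data: dataLength bytes].
     * A chunk may instead carry an Object, which writeChunk() serializes
     * into data before writing.
     */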
    class Chunk extends Object {
        public byte id;
        public byte[] data;
        public int dataLength;
        public Object object;

        public Chunk() {
        }

        public Chunk(byte _id, byte[] _data) {
            id = _id;
            data = _data;
            dataLength = data.length;
        }

        public Chunk(byte _id, Object obj) {
            id = _id;
            object = obj;
        }
    }

    /** The ID of this cluster. */
    ClusterID cid;
    /** Current recovery mode: NONE, OBJECTS or LEAKS. */
    byte recoveryMode = NONE;
    /** Cluster size; see size() for its two meanings. */
    long clusterSize = 0;
    /** Cached total size of leaked objects; -1 until read from the leak file. */
    long leakSize = -1;
    /** Output stream of the open cluster file; null while closed. */
    DataOutputStream stream;

    /** The objects read by readObjects(). */
    DxCollection objects;

    /** The server environment. */
    Env env;
    ClassicStore classicStore;

    /**
     * Creates a cluster whose ClusterID is determined later (e.g. by
     * readObjects()).
     */
    public Cluster(Env _env, ClassicStore _classicStore) {
        env = _env;
        classicStore = _classicStore;
    }

    /**
     * Creates a cluster for the given ClusterID.
     */
    public Cluster(Env _env, ClassicStore _classicStore, ClusterID _cid) {
        env = _env;
        classicStore = _classicStore;
        cid = _cid;
    }

    protected void finalize() throws Throwable {
        // make sure super.finalize() runs even if close() throws
        try {
            close();
        } finally {
            super.finalize();
        }
    }

    /**
     * Returns the ID of this cluster.
     */
    public final ClusterID cluID() {
        return cid;
    }

    /**
     * Returns the objects read by readObjects().
     */
    public final DxCollection objects() {
        return objects;
    }

    /**
     * Switches this cluster into the given recovery mode; while in
     * recovery mode the file handles point to the .rec shadow files.
     */
    public final void beginRecovery(byte mode) {
        recoveryMode = mode;
    }

    /**
     * Leaves recovery mode and renames the recovery file to the regular
     * cluster or leak file.
     */
    public final void endRecovery(byte mode) {
        // fetch the handle of the .rec file before resetting the mode,
        // then rename it to the regular file name
        File file = mode == OBJECTS ? fileHandle() : leakFileHandle();
        recoveryMode = NONE;
        file.renameTo(mode == OBJECTS ? fileHandle() : leakFileHandle());
    }

    /** Returns the handle of the cluster file (.cl). */
    public final File fileHandle() {
        return new File(env.dir + Env.DATA_DIR,
                cid.toString() + Cluster.CLUSTER_FILE_SUFF
                + (recoveryMode == OBJECTS ? Cluster.RECOVERY_SUFF : ""));
    }

    /** Returns the handle of the leak file (.lk). */
    public final File leakFileHandle() {
        return new File(env.dir + Env.DATA_DIR,
                cid.toString() + Cluster.LEAK_FILE_SUFF
                + (recoveryMode == LEAKS ? Cluster.RECOVERY_SUFF : ""));
    }

    /**
     * The cluster size has two different meanings:
     * - while writing, it is the size of the cluster file, so that the
     *   file can be kept as close as possible to the user-defined
     *   maximum cluster size
     * - while reading, it is the sum of the sizes of all the cluster's
     *   DeathObjects, so that the space needed in the object buffer of
     *   the cluster space can be determined exactly
     */
    public final long size() {
        return clusterSize;
    }

    /** Opens the cluster file for appending; new files get a header. */
    public void open() throws IOException {
        //env.logWriter.newEntry ("Cluster.open: " + cid, LogWriter.DEBUG);
        if (stream != null) {
            return;
        }

        File file = fileHandle();
        boolean newCluster = !file.exists();
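        // the stream is opened in append mode (FileOutputStream with
        // append = true), so objects are always added at the end of the
        // cluster file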
        stream = new DataOutputStream(new BufferedOutputStream(
                new FileOutputStream(file.toString(), true), 4 * 1024));

        if (newCluster) {
            writeHeader();
        }
        clusterSize = file.length();
    }

    /** Closes the cluster file if it is open. */
    public void close() throws IOException {
        //env.logWriter.newEntry ("Cluster.close: " + cid, LogWriter.DEBUG);
        if (stream != null) {
            stream.close();
            stream = null;
        }
    }

    /** Writes the cluster file header: format version and ClusterID. */
    public void writeHeader() throws IOException {
        //env.logWriter.newEntry ("Cluster.writeHeader: " + cid, LogWriter.DEBUG);
        // write cluster version and id
        stream.writeInt(Cluster.VERSION);
        stream.writeLong(cid.value());
    }

    /**
     * Size of an object entry:
     *     ObjectID                 : 8 bytes
     *     TransactionID            : 8 bytes
     *     streamed ObjectContainer : comes from DeathObject.stateSize
     *     ChunkID                  : 1 byte
     *     data length              : 4 bytes
     *     data itself              : n bytes
     * -> data overhead: 8 + 8 + 1 + 4 = 21 bytes + object state
     */
    private final long entrySize(DeathObject dobj) {
        return dobj.size() + dobj.stateSize + 21;
    }

    /**
     * Appends the given object to the cluster file.
     */
    public void appendObject(DeathObject dobj, TransactionID tid,
            boolean serialize, boolean useClone) throws Exception {
        env.logWriter.newEntry(this, "Cluster " + cid + " appendObject: "
                + dobj.objID() + ", " + tid + ", " + serialize + ", "
                + useClone, LogWriter.DEBUG3);
        open();

        // write the object id
        stream.writeLong(dobj.objID().value());

        // write the transaction id
        stream.writeLong(tid.value());

        // write the object's container
        Chunk chunk = new Chunk(DATA_HEADER_CHUNK, dobj.container());
        writeChunk(stream, chunk);
        dobj.stateSize = chunk.dataLength;

        // write the object itself; when not serializing (recovery mode),
        // dobj.data() is used directly because it is already set
        if (serialize) {
            chunk = new Chunk(DATA_CHUNK, useClone
                    ? dobj.container().targetShadow()
                    : dobj.container().target());
        } else {
            chunk = new Chunk(DATA_CHUNK, dobj.data());
        }

        writeChunk(stream, chunk);
        dobj.setSize(chunk.data.length);

        clusterSize = fileHandle().length();
    }

    /**
     * Reads all objects from the cluster, dropping leaks in normal mode
     * and objects of the broken transaction in recovery mode;
     * returns false if objects of the broken transaction were dropped.
     */
    public boolean readObjects(int whatToRead, TransactionID rollBackTid)
            throws Exception {
        //env.logWriter.newEntry ("Cluster " + cid + " readObjects: " + whatToRead + ", " + rollBackTid, LogWriter.DEBUG);
        boolean result = true;

        DataInputStream fi = new DataInputStream(new FileInputStream(
                fileHandle()));
        // read the header: format version and ClusterID
        int version = fi.readInt();
        cid = new ClusterID(fi.readLong());

        DxMultiMap leaks = (DxMultiMap) readLeaks(rollBackTid, true);
        //env.logWriter.newEntry ("leaks for " + cid + ": " + leaks.count(), LogWriter.DEBUG);

        objects = new DxArrayBag();

        while (fi.available() != 0) {
            TransactionID tid = null;
            ClassicObjectContainer os = null;
            DeathObject dobj = null;
            boolean isLeak = false;
            boolean rollBack = false;

            try {
                // read the object id
                ObjectID oid = new ObjectID(fi.readLong());
                //env.logWriter.newEntry ("\tnext object: " + oid, LogWriter.DEBUG);

                DxDeque oidLeaks = (DxDeque) leaks.elementsForKey(oid);
                if (oidLeaks != null) {
                    isLeak = true;
                    //env.logWriter.newEntry ("\t" + oid + " is a leak", LogWriter.DEBUG);
                    if (oidLeaks.count() == 1) {
                        leaks.removeForKey(oid);
                    } else {
                        oidLeaks.popBottom();
                    }
                }

                // read the TransactionID
                tid = new TransactionID(fi.readLong());
                // check if this is an object of the broken transaction
                if (rollBackTid != null && rollBackTid.equals(tid)) {
                    rollBack = true;
                }
                if (rollBack || isLeak || (whatToRead & TRANS) == 0) {
                    tid = null;
                }

                // read the object state, if necessary
                Chunk stateChunk = readChunk(fi,
                        (whatToRead & STATE) == 0 || rollBack || isLeak);
                if (stateChunk.data != null) {
                    ObjectInputStream in = new ObjectInputStream(
                            new ByteArrayInputStream(stateChunk.data));
                    os = new ClassicObjectContainer();
                    os.loadExternal(in);
                    in.close();
                }

                // create a new DeathObject and read the data
                Chunk dataChunk = readChunk(fi,
                        (whatToRead & DATA) == 0 || rollBack || isLeak);
                if (dataChunk.data != null) {
                    dobj = new DeathObject(oid);
                    dobj.stateSize = stateChunk.dataLength;
                    dobj.setData(dataChunk.data);
                    clusterSize += dobj.stateSize;
                    clusterSize += dobj.size();
                }

            } catch (Exception e) {
                env.fatalError(this,
                        "exception in readObjects() of cluster " + cid, e);
                break;
            }

            // if everything was fine, add the collected information;
            // this has to be done last, because only complete objects
            // should be added
            if (tid != null) {
                objects.add(tid);
            }
            if (os != null) {
                objects.add(os);
            }
            if (dobj != null) {
                objects.add(dobj);
            }

            result &= !rollBack;
        }

        fi.close();
        return result;
    }

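    /*
     * Leak file format (see writeLeak() and readLeaks() below): after the
     * usual header (int version, long ClusterID), each entry consists of
     *     long ObjectID
     *     long TransactionID
     *     long running total of leaked bytes
     * Because the running total is rewritten with every entry, leakSize()
     * only has to read the last 8 bytes of the file.
     */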
    /** Returns the total size of this cluster's leaked objects. */
    public long leakSize() {
        if (leakSize != -1) {
            return leakSize;
        }

        File file = new File(env.dir + Env.DATA_DIR, cid
                + Cluster.LEAK_FILE_SUFF);
        if (!file.exists()) {
            return 0;
        }

        try {
            DataInputStream leakStream = new DataInputStream(
                    new FileInputStream(file));
            // the running total written with the last entry is the
            // current leak size
            leakStream.skip(leakStream.available() - 8);
            leakSize = leakStream.readLong();
            leakStream.close();
            return leakSize;
        } catch (IOException e) {
            return 0;
        }
    }

    /** Records the given object as a leak in the leak file. */
    public void writeLeak(DeathObject dobj, TransactionID tid)
            throws Exception {
        writeLeak(dobj.objID(), tid, entrySize(dobj));
    }

    /** Appends a leak entry and the updated leak total to the leak file. */
    public void writeLeak(ObjectID oid, TransactionID tid, long objSize)
            throws Exception {
        //env.logWriter.newEntry ("Cluster " + cid + " writeLeak: " + oid + ", " + tid + ", " + objSize, LogWriter.DEBUG);
        File file = leakFileHandle();
        boolean newFile = !file.exists();
        DataOutputStream leakStream = new DataOutputStream(
                new BufferedOutputStream(new FileOutputStream(
                        file.toString(), true)));

        // write header, if necessary
        if (newFile) {
            leakStream.writeInt(Cluster.VERSION);
            leakStream.writeLong(cid.value());
        }

        // update the cached leakSize ...
        leakSize();
        // ... and increase it by the leaked object's size
        leakSize += objSize;

        // write leak entry
        leakStream.writeLong(oid.value());
        leakStream.writeLong(tid.value());
        leakStream.writeLong(leakSize);

        leakStream.close();
    }

    /**
     * Reads all leak entries, skipping those of the given roll-back
     * transaction. If ordered is true, the result is a DxMultiMap
     * mapping each ObjectID to the TransactionIDs of its leak entries;
     * otherwise it is a DxListDeque holding the ObjectID, TransactionID
     * and leak total of each entry.
     */
    public DxCollection readLeaks(TransactionID rollBackTid,
            boolean ordered) throws Exception {
        File file = leakFileHandle();

        DxCollection result;
        if (ordered) {
            result = new DxMultiMap(new DxHashMap(), new DxListDeque());
        } else {
            result = new DxListDeque();
        }

        if (!file.exists()) {
            return result;
        }

        DataInputStream leakStream = new DataInputStream(
                new FileInputStream(file));

        // skip the header
        leakStream.readInt();
        leakStream.readLong();

        while (leakStream.available() != 0) {
            // read object id
            ObjectID oid = new ObjectID(leakStream.readLong());
            // read transaction id
            TransactionID tid = new TransactionID(leakStream.readLong());
            // read leak size
            Long leakSize = new Long(leakStream.readLong());

            if (rollBackTid == null || !rollBackTid.equals(tid)) {
                if (ordered) {
                    ((DxMultiMap) result).addForKey(tid, oid);
                } else {
                    ((DxDeque) result).pushTop(oid);
                    ((DxDeque) result).pushTop(tid);
                    ((DxDeque) result).pushTop(leakSize);
                }
            }
        }

        leakStream.close();

        return result;
    }

    /** Deletes the cluster file and the leak file. */
    public void removeFromDisk() throws IOException {
        //env.logWriter.newEntry ("Cluster " + cid + " removeFromDisk", LogWriter.DEBUG);
        File f = fileHandle();
        if (f.exists()) {
            f.delete();
        }
        f = leakFileHandle();
        if (f.exists()) {
            f.delete();
        }
    }

    /** Writes a chunk; a carried object is serialized into its data first. */
    private void writeChunk(DataOutputStream out, Chunk chunk)
            throws IOException {
        if (chunk.object != null) {
            ByteArrayOutputStream bs = new ByteArrayOutputStream();
            ObjectOutputStream os = new ObjectOutputStream(bs);
            if (chunk.object instanceof ClassicObjectContainer) {
                ((ClassicObjectContainer) chunk.object).storeExternal(os);
            } else {
                os.writeObject(chunk.object);
            }
            // flush before toByteArray(), otherwise buffered object data
            // would be missing from the chunk
            os.flush();
            chunk.data = bs.toByteArray();
            chunk.dataLength = chunk.data.length;
            os.close();
        }

        env.logWriter.newEntry(this, "Cluster " + cid + " writeChunk: "
                + chunk.id + ", " + chunk.dataLength, LogWriter.DEBUG3);
        out.writeByte(chunk.id);
        out.writeInt(chunk.dataLength);
        out.write(chunk.data);
    }

    /** Reads the next chunk; if skip is true, its data is skipped over. */
    private Chunk readChunk(DataInputStream in, boolean skip)
            throws IOException {
        Chunk chunk = new Chunk();
        chunk.id = in.readByte();

        chunk.dataLength = in.readInt();
        //env.logWriter.newEntry ("Cluster " + cid + " readChunk: " + chunk.id + ", " + chunk.dataLength, LogWriter.DEBUG);
        if (skip) {
            in.skip(chunk.dataLength);
        } else {
            chunk.data = new byte[chunk.dataLength];
            // readFully instead of read: read() may return fewer bytes
            // than requested
            in.readFully(chunk.data);
        }

        return chunk;
    }

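    /**
     * Rolls back the given transaction: removes its leak entries and, if
     * the cluster contains objects of that transaction, rewrites the
     * remaining valid objects via a recovery (shadow) cluster file.
     */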
    public void rollBack(TransactionID rollBackTid) throws Exception {
        //env.logWriter.newEntry ("Cluster " + cid + " rollback: " + rollBackTid, LogWriter.DEBUG);
        rollBackLeaks(rollBackTid);

        boolean clusterIsClean = false;

        try {
            clusterIsClean = readObjects(Cluster.STATE | Cluster.TRANS
                    | Cluster.DATA, rollBackTid);
        } catch (Exception e) {
            //env.logWriter.newEntry (this, "rollBack: cluster " + cid + " corrupted", LogWriter.WARN);
        }

        if (!clusterIsClean) {
            if (objects().count() > 0) {
                // switch into recovery mode
                beginRecovery(OBJECTS);
                open();

                // rewrite all valid objects into the shadow cluster;
                // readObjects() stored each object as a (TransactionID,
                // ObjectContainer, DeathObject) triple
                DxIterator it = objects().iterator();
                while (it.next() != null) {
                    TransactionID tid = (TransactionID) it.object();
                    ObjectContainer os = (ObjectContainer) it.next();
                    DeathObject dobj = (DeathObject) it.next();
                    // swap the object state, because it may have changed
                    // while transaction rollBackTid was running
                    classicStore.objectSpace.deleteObject(os);
                    classicStore.objectSpace.addObject(os);
                    appendObject(dobj, tid, false, false);
                }

                close();
                // switch back to normal mode
                endRecovery(OBJECTS);
            } else {
                // if all objects of the cluster are invalid, simply
                // delete it
                removeFromDisk();
            }
        }
    }

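    /**
     * Rewrites the leak file without the entries of the given roll-back
     * transaction, or deletes it if no other entries remain.
     */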
    public void rollBackLeaks(TransactionID rollBackTid)
            throws Exception {
        DxDeque leaks = null;
        try {
            leaks = (DxDeque) readLeaks(rollBackTid, false);
        } catch (Exception e) {
            //env.logWriter.newEntry (this, "rollBackLeaks: leaks " + cid + " corrupted", LogWriter.WARN);
        }
        if (leaks == null) {
            // the leak file is corrupted; leave it alone
            return;
        }

        if (leaks.count() > 0) {
            beginRecovery(LEAKS);

            // the deque holds (oid, tid, leak total) triples in FIFO
            // order; the arguments are evaluated left to right
            while (leaks.count() > 0) {
                writeLeak((ObjectID) leaks.popBottom(),
                        (TransactionID) leaks.popBottom(),
                        ((Long) leaks.popBottom()).longValue());
            }

            endRecovery(LEAKS);
        } else {
            if (leakFileHandle().exists()) {
                leakFileHandle().delete();
            }
        }
    }

    /**
     * Checks whether this cluster needs to be compressed, i.e. whether
     * the leaked bytes outweigh the live ones.
     */
    protected boolean needsCompressing() {
        boolean result = false;

        // derive the cluster size simply from its file size;
        // this is much faster than reading the whole cluster
        long clSize = fileHandle().length();
        if (clSize > 0) {
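            // e.g. with LEAK_WEIGHT = 0.5, a 64 KB cluster file needs
            // compressing as soon as more than 32 KB of it are leaks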
            result = (double) leakSize() / clSize > Cluster.LEAK_WEIGHT;
        }

        return result;
    }

}