// You can redistribute this software and/or modify it under the terms of
// the Ozone Core License version 1 published by ozone-db.org.
//
// The original code and portions created by SMB are
// Copyright (C) 1997-@year@ by SMB GmbH. All rights reserved.
//
// $Id: ClusterStore.java,v 1.2 2002/06/08 00:49:39 mediumnet Exp $

package org.ozoneDB.core.wizardStore;

import java.io.*;
import java.util.zip.*;
import org.ozoneDB.DxLib.*;
import org.ozoneDB.*;
import org.ozoneDB.core.*;
import org.ozoneDB.util.*;

/**
 * The ClusterStore is the back-end store of the wizardStore. It maintains the
 * cluster cache, activation/passivation and the actual persistent commits.
 *
 * @author <a href="http://www.softwarebuero.de/">SMB</a>
 * @author <a href="http://www.medium.net/">Medium.net</a>
 * @version $Revision: 1.2 $Date: 2002/06/08 00:49:39 $
 */
public final class ClusterStore {

    public final static String POSTFIX_CLUSTER = ".cl";   // serialized cluster data
    public final static String POSTFIX_LOCK = ".lk";      // persisted cluster lock
    public final static String POSTFIX_TEMP = ".tm";      // temp file written by prepareCommitCluster()
    public final static String POSTFIX_SHADOW = ".sh";    // shadow copy of a cluster, used during recovery

    protected final static int compressionFactor = 3;

    protected transient Env env;

    protected transient long touchCount;

    protected DxMap cachedClusters;

    protected int maxClusterSize = 64 * 1024;

    /**
     * Table that maps Permissions to ClusterIDs.
     */
    protected DxMap growingClusterIDs;

    private boolean compressClusters;

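    /**
     * Creates a ClusterStore for the given server environment. A short summary of
     * the configuration read in the constructor body below: the maximum cluster
     * size is taken from Setup.WS_CLUSTER_SIZE and cluster compression is switched
     * on or off via Setup.WS_COMPRESS_CLUSTERS.
     */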
    ClusterStore(Env _env) {
        env = _env;
        maxClusterSize = env.config.intProperty(Setup.WS_CLUSTER_SIZE, -1);
        cachedClusters = new DxHashMap(64);
        compressClusters = env.config.booleanProperty(Setup.WS_COMPRESS_CLUSTERS, true);
    }

    public void startup() throws Exception {
        growingClusterIDs = new DxHashMap(32);
    }

    public void shutdown() {
    }

    /**
     * Check if the ClusterStore was cleanly shut down.
     */
    public boolean isCleanShutdown() {
        File file = new File(env.dir + Env.DATA_DIR);
        String[] fileList = file.list();

        for (int i = 0; i < fileList.length; i++) {
            if (fileList[i].endsWith(POSTFIX_SHADOW) || fileList[i].endsWith(POSTFIX_TEMP)) {
                return false;
            }
        }
        return true;
    }

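    // A filename example (a sketch inferred from basename() and the postfix
    // constants above, not from the original comments): a data file "1234.cl" or
    // a leftover shadow "1234.sh" in the DATA dir is recovered as new ClusterID(1234).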
    /**
     * Search the DATA dir and recover all ClusterIDs.
     */
    public DxSet recoverClusterIDs() {
        File file = new File(env.dir + Env.DATA_DIR);
        String[] fileList = file.list();

        DxSet result = new DxHashSet();
        for (int i = 0; i < fileList.length; i++) {
            if (fileList[i].endsWith(POSTFIX_CLUSTER) || fileList[i].endsWith(POSTFIX_SHADOW)) {
                String cidString = fileList[i].substring(0, fileList[i].indexOf('.'));
                long cid = Long.parseLong(cidString);
                result.add(new ClusterID(cid));
            }
        }
        return result;
    }

    public long currentCacheSize() {
        long result = 0;
        DxIterator it = cachedClusters.iterator();
        Cluster cluster;
        while ((cluster = (Cluster) it.next()) != null) {
            result += cluster.size();
        }
        return result;
    }

    protected int currentBytesPerContainer() {
        int result = env.config.intProperty(Setup.WS_CLUSTER_SIZE_RATIO, 256);
        // env.logWriter.newEntry( this, "currentBytesPerContainer(): setup:" + result, LogWriter.DEBUG );
        return result;

        // if (cachedClusters.count() < 3) {
        //     int result = env.config.intProperty (Setup.WS_CLUSTER_SIZE_RATIO, 256);
        //     env.logWriter.newEntry (this, "currentBytesPerContainer(): config:" + result, LogWriter.DEBUG);
        //     return result;
        // }
        // else {
        //     int bpc = 0;
        //     int count = 0;
        //     DxIterator it = cachedClusters.iterator();
        //     Cluster cluster;
        //     while ((cluster=(Cluster)it.next()) != null) {
        //         count ++;
        //         bpc += cluster.bytesPerContainer;
        //     }
        //     int result = bpc / count;
        //     env.logWriter.newEntry (this, "currentBytesPerContainer(): new:" + result, LogWriter.DEBUG);
        //     return result;
        // }
    }

    // public Cluster lruCluster() {
    //     search the LRU cluster to speed things up; since this is not
    //     synchronized, checking and accessing currentCluster must be done in
    //     one line to keep other threads from changing the variable in between
    //     container = (currentCluster != null && currentCluster.lock != null) ? currentCluster.containerForID (id) : null;
    //     if (container != null) {
    //         // System.out.print ("+");
    //         return container.isDeleted() ? null : container;
    //     }

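    // Lookup order used by growingCluster() below (a summary of the method body,
    // added for orientation): first the remembered growing cluster for the given
    // Permissions, then any cached cluster with matching permissions and free
    // space, and finally a freshly created empty cluster.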
    /**
     * @param perms Permissions of the cluster to search.
     * @return Cluster with the specified permissions that is suitable for
     *         storing a new container in.
     */
    protected synchronized Cluster growingCluster(Permissions perms) throws Exception {
        if (false && env.logWriter.hasTarget(LogWriter.DEBUG3)) {
            env.logWriter.newEntry(this, "growingCluster() ", LogWriter.DEBUG3);
        }

        Cluster cluster = null;
        ClusterID cid = (ClusterID) growingClusterIDs.elementForKey(perms);

        // load the current growing cluster and check space
        if (cid != null) {
            cluster = (Cluster) cachedClusters.elementForKey(cid);
            if (cluster == null) {
                cluster = loadCluster(cid, false);
            }
            // check the cluster size and whether it was deactivated by trimCache();
            // use this cluster only if it isn't used by another ta
            if (cluster.lock == null
                    || cluster.size() >= maxClusterSize
                    || cluster.lock.level(null) > Lock.LEVEL_NONE
                            && !cluster.lock.isAcquiredBy(env.transactionManager.currentTA())) {

                if (false && env.logWriter.hasTarget(LogWriter.DEBUG)) {
                    env.logWriter.newEntry(this,
                            "growingCluster(): growing cluster not usable: cid=" + cluster.clusterID()
                            + " size=" + cluster.size()
                            + " lockLevel="
                            + (cluster.lock != null ? String.valueOf(cluster.lock.level(null)) : "null"),
                            LogWriter.DEBUG);
                }

                growingClusterIDs.removeForKey(perms);
                cluster = null;
            }
        }

        // search all currently loaded clusters
        if (cluster == null) {
            DxIterator it = cachedClusters.iterator();
            Cluster cursor;
            while ((cursor = (Cluster) it.next()) != null) {
                // System.out.println (cursor.size());
                if (cursor.size() < maxClusterSize && cursor.permissions.equals(perms)) {
                    cluster = cursor;

                    // make sure that there is enough space for the clusters to be
                    // able to grow to the max size
                    // ensureCacheSpace (maxClusterSize - cluster.size());
                    trimCache();

                    // check if the cluster was deactivated by trimCache()
                    if (cluster.getLock() == null) {
                        env.logWriter.newEntry(this,
                                "growingCluster(): loaded cluster was deactivated: " + cluster.clusterID(),
                                LogWriter.DEBUG);
                        cluster = null;
                    } else if (cluster.lock.level(null) > Lock.LEVEL_NONE
                            && !cluster.lock.isAcquiredBy(env.transactionManager.currentTA())) {
                        // use this cluster only if it isn't used by another ta
                        if (false) {
                            env.logWriter.newEntry(this,
                                    "growingCluster(): loaded cluster is locked by another transaction: "
                                    + cluster.clusterID(), LogWriter.DEBUG);
                        }
                        cluster = null;
                    } else {
                        growingClusterIDs.addForKey(cluster.clusterID(), perms);
                        if (false) {
                            env.logWriter.newEntry(this,
                                    "growingCluster(): loaded cluster is now growing cluster: "
                                    + cluster.clusterID() + " size:" + cluster.size(),
                                    LogWriter.DEBUG);
                        }
                        break;
                    }
                }
            }
        }

        // write a new, empty cluster and load it right afterwards to ensure
        // that the new cluster is "regularly" loaded
        if (cluster == null) {
            cluster = createANewEmptyAndUsableCluster(perms);
        }

        return cluster;
    }

    /**
     * Creates a cluster which is
     * <UL>
     * <LI>new</LI>
     * <LI>empty</LI>
     * <LI>usable and</LI>
     * <LI>not locked</LI>
     * </UL>
     */
    protected synchronized Cluster createANewEmptyAndUsableCluster(Permissions perms)
            throws IOException, ClassNotFoundException {
        // env.logWriter.newEntry( this, "growingCluster(): creating new cluster...", LogWriter.DEBUG );
        Cluster cluster = new Cluster(new ClusterID(env.keyGenerator.nextID()), perms,
                env.transactionManager.newLock(), 256);

        // the new cluster has to be written to disk in order to make
        // saveShadow() and things work;
        storeData(cluster, basename(cluster.clusterID()) + POSTFIX_CLUSTER);

        if (false) { // Old
            // If we do not pin, the freshly created cluster may be deactivated and thus its lock may be null
            cluster.pin();
            try {
                // since we don't check the cache size after registering a container
                // we have to make sure that there is enough space for this cluster
                // to grow to the max size
                // ensureCacheSpace (maxClusterSize);
                trimCache();
                cluster = loadCluster(cluster.clusterID(), false);
            } finally {
                cluster.unpin();
            }
        } else {
            // since we don't check the cache size after registering a container
            // we have to make sure that there is enough space for this cluster
            // to grow to the max size
            // ensureCacheSpace (maxClusterSize);
            trimCache();

            // we need to load the cluster pinned because loadCluster() only guarantees
            // to return a cluster that has not been unloaded if it is pinned
            cluster = loadCluster(cluster.clusterID(), true);
            cluster.unpin();
        }

        growingClusterIDs.addForKey(cluster.clusterID(), perms);
        // env.logWriter.newEntry( this, "growingCluster(): new cluster created: " + cluster.clusterID(), LogWriter.DEBUG );

        return cluster;
    }

    /**
     * Returns or creates a cluster which is not locked, so that locking it will succeed.
     * The returned cluster is only guaranteed not to be locked by any other thread as long
     * as this method is called while synchronized on this ClusterStore.
     */
    protected Cluster giveMeAnUnlockedCluster(Permissions perms)
            throws IOException, ClassNotFoundException {
        return createANewEmptyAndUsableCluster(perms);
    }

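    // A minimal caller sketch (an assumption for illustration, not taken from the
    // original source): after a successful call the container is pinned and its
    // cluster is write locked for 'locker', so the caller is responsible for
    // releasing both once it is done, for example:
    //
    //     clusterStore.registerContainerAndPinAndLock(container, perms, ta, Lock.LEVEL_WRITE);
    //     try {
    //         // ... work with the registered container ...
    //     } finally {
    //         container.unpin();
    //     }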
    /**
     * Associates the specified container with a cluster.
     *
     * Iff this method returns normally (without exception), the container is pinned
     * and thus has to be unpinned.
     *
     * Iff this method returns normally (without exception), the container (and thus
     * the cluster of the container) is write locked.
     *
     * @param container Container to be registered with one cluster.
     */
    public void registerContainerAndPinAndLock(WizardObjectContainer container, Permissions perms,
            Transaction locker, int lockLevel) throws Exception {
        if (false && env.logWriter.hasTarget(LogWriter.DEBUG3)) {
            env.logWriter.newEntry(this, "registerContainer()", LogWriter.DEBUG3);
        }

        Cluster cluster = null;

        boolean pinned = false;
        boolean locked = false;
        boolean alright = false;

        try {
            synchronized (this) {
                cluster = growingCluster(perms);

                Lock clusterLock = cluster.getLock();
                int prevLevel = clusterLock.tryAcquire(locker, lockLevel);

                if (prevLevel == Lock.NOT_ACQUIRED) {
                    // The cluster we are trying to lock is already locked, so we take another cluster
                    cluster = giveMeAnUnlockedCluster(perms);

                    clusterLock = cluster.getLock();
                    prevLevel = clusterLock.tryAcquire(locker, lockLevel);

                    if (prevLevel == Lock.NOT_ACQUIRED) {
                        throw new Error("BUG! We could not acquire a lock for an unlocked cluster.");
                    }
                }
                locked = true;

                cluster.registerContainer(container);
                container.pin();
                pinned = true;
            }
            cluster.updateLockLevel(locker);

            if (false && env.logWriter.hasTarget(LogWriter.DEBUG3)) {
                env.logWriter.newEntry(this, " cluster: " + cluster.clusterID(), LogWriter.DEBUG3);
            }
            alright = true;
        } finally {
            if (!alright) {
                if (locked) {
                    cluster.getLock().release(locker);
                }
                if (pinned) {
                    container.unpin();
                }
            }
        }
    }

    public void invalidateContainer(WizardObjectContainer container) /*throws Exception*/ {
        synchronized (container) {
            container.getCluster().removeContainer(container);
            container.setCluster(null);
        }
    }

    protected Cluster restoreCluster(ClusterID cid) throws Exception {
        String basename = basename(cid);
        Cluster cluster;

        new File(basename + POSTFIX_LOCK).delete();
        new File(basename + POSTFIX_TEMP).delete();

        File shadowFile = new File(basename + POSTFIX_SHADOW);
        File clusterFile = new File(basename + POSTFIX_CLUSTER);

        if (shadowFile.exists()) {
            if (!shadowFile.renameTo(clusterFile)) {
                throw new IOException("Unable to rename shadow file.");
            }
        }
        cluster = (Cluster) loadData(basename + POSTFIX_CLUSTER);
        activateCluster(cluster, 0);

        return cluster;
    }

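    // A minimal usage sketch of the pin contract described below (illustration only,
    // mirroring how prepareCommitCluster()/commitCluster() use this method): load
    // pinned so the cluster cannot be trimmed away, then unpin when done.
    //
    //     Cluster cluster = loadCluster(cid, true);
    //     try {
    //         // ... work with the cluster ...
    //     } finally {
    //         cluster.unpin();
    //     }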
    /**
     * Make sure the corresponding cluster is in the cache. While loading
     * clusters, we may have to throw away (and maybe store) some currently
     * cached clusters.
     *
     * @param cid ClusterID of the cluster to load.
     * @param pin whether the loaded cluster should be pinned as soon as it is loaded,
     *            so that there is no chance to unload it until it is unpinned.
     *            If this parameter is set to true, the user has to unpin the cluster
     *            after using it.
     *            If this parameter is set to false, the cluster may already be unloaded
     *            when this method returns.
     */
    public Cluster loadCluster(ClusterID cid, boolean pin) throws IOException, ClassNotFoundException {
        Cluster cluster = (Cluster) cachedClusters.elementForKey(cid);
        if (cluster == null) {

            if (false && env.logWriter.hasTarget(LogWriter.INFO)) {
                env.logWriter.newEntry(this,
                        "loadCluster(): load cluster from disk: " + cid.toString(), LogWriter.INFO);
            }

            String basename = basename(cid);
            String clusterName = basename + POSTFIX_CLUSTER;
            String lockName = basename + POSTFIX_LOCK;

            int clusterByteSize = (int) new File(clusterName).length();
            if (compressClusters) {
                clusterByteSize *= compressionFactor;
            }

            // make sure that many different threads don't load
            // too much data before the currently synchronized thread
            // can trim the cache
            trimCache();
            cluster = (Cluster) loadData(clusterName);

            synchronized (this) {

                // now we have to check the cachedClusters table inside the
                // synchronized block to see if someone did register this
                // cluster while we loaded it
                Cluster interimCluster = (Cluster) cachedClusters.elementForKey(cid);
                if (interimCluster != null) {
                    if (false) { // This is not worth a warning. It happens, and with appropriate locking|pinning, it is no problem or danger.
                        env.logWriter.newEntry(this,
                                "loadCluster(): cluster was loaded by another thread too; dropping my copy",
                                LogWriter.WARN);
                    }
                    cluster = interimCluster;

                    if (pin) {
                        cluster.pin();
                    }
                } else {
                    // we are going to mess with the cluster; it seems that the cluster
                    // is not visible to other threads until it is added to cachedClusters,
                    // however, the IBM JDK throws an exception in cluster.updateLockLevel, which
                    // seems to be related to the initialization in the following block
                    synchronized (cluster) {
                        // locks are only there if the lock level is >= READ
                        try {
                            cluster.setLock((Lock) loadData(lockName));
                            new File(lockName).delete();
                            ((MROWLock) cluster.getLock()).setClusterID(cluster.clusterID());
                        } catch (Exception e) {
                            if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
                                env.logWriter.newEntry(this,
                                        " Unable to load lock from disk - creating a new lock.",
                                        LogWriter.DEBUG3);
                            }
                            cluster.setLock(env.transactionManager.newLock());
                            ((MROWLock) cluster.getLock()).setClusterID(cluster.clusterID());
                        }

                        if (pin) {
                            // we pin inside the synchronization on the cluster, because calling pin()
                            // will try another synchronization, and two nested synchronizations on an
                            // object are faster than two serial synchronizations
                            cluster.pin();
                        }

                        activateCluster(cluster, clusterByteSize);
                    }

                    if (clusterByteSize > maxClusterSize * 2) {
                        splitCluster(cluster);
                    }

                    cachedClusters.addForKey(cluster, cluster.clusterID());

                    trimCache();
                }
            }
        }
        return cluster;
    }

    public void splitCluster(Cluster cluster) {
    }

    /**
     * Remove the cluster from the cluster cache.
     *
     * @param cid ClusterID of the cluster to remove.
     * @param deactivate whether the removed cluster should be deactivated
     *                   (written back to disk) as well.
     */
    public void unloadCluster(ClusterID cid, boolean deactivate) throws IOException {
        if (env.logWriter.hasTarget(LogWriter.DEBUG)) {
            env.logWriter.newEntry(this, "unloadCluster(" + cid + "," + deactivate + ").", LogWriter.DEBUG);
        }

        Cluster cluster = (Cluster) cachedClusters.removeForKey(cid);

        if (deactivate) {
            deactivateCluster(cluster);
        }
    }

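    // Eviction policy of trimCache() in one place (a summary of the method body
    // below, added for orientation): when free memory runs out, cached clusters
    // are ordered by cachePriority(), and clusters that are neither pinned nor
    // currently invoked are deactivated, lowest priority first, until enough
    // space has been freed.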
    /**
     * Ensure that there is enough free space in the cluster cache. Under some
     * circumstances clusters (currently invoked) cannot be deactivated.
     * Therefore this method cannot guarantee that the needed space is actually
     * free afterwards.<p>
     *
     * This is the central method for the deactivation of containers that are
     * currently in use. This is different from the commit behaviour.
     */
    protected void trimCache() throws IOException {

        long freeSpace = env.freeMemory();
        if (false && env.logWriter.hasTarget(LogWriter.DEBUG)) {
            env.logWriter.newEntry(this, "trimCache(): free:" + freeSpace, LogWriter.DEBUG2);
        }

        if (freeSpace <= 0) {
            synchronized (this) {
                long cacheSize = 0;

                // build priority queue for all currently loaded clusters
                DxMap priorityQueue = new DxTreeMap();
                DxIterator it = cachedClusters.iterator();
                Cluster cluster;
                while ((cluster = (Cluster) it.next()) != null) {
                    priorityQueue.addForKey(cluster, cluster.cachePriority());
                    cacheSize += cluster.size();
                }

                // free at least one twentieth (5%) of the cache, but no less
                // than twice the maximum cluster size
                long neededSpace = Math.max(maxClusterSize * 2, cacheSize / 20);
                if (env.logWriter.hasTarget(LogWriter.DEBUG)) {
                    env.logWriter.newEntry(this,
                            " cache: " + cacheSize + " to be freed:" + neededSpace, LogWriter.DEBUG2);
                }

                // throw away (deactivate) clusters, lowest priority first
                it = priorityQueue.iterator();
                while (freeSpace < neededSpace && (cluster = (Cluster) it.next()) != null) {

                    // if any of the containers is currently invoked, the cluster
                    // must not be written and must stay in memory.
                    // The same applies for pinned containers.
                    // FIXME: Once pinning is fully established, we may not need to call isInvoked() anymore.
                    if ((!cluster.isPinned()) && (!cluster.isInvoked())) {
                        if (env.logWriter.hasTarget(LogWriter.DEBUG)) {
                            env.logWriter.newEntry(this,
                                    "DEACTIVATE cluster: " + cluster.clusterID(), LogWriter.DEBUG2);
                        }

                        cluster = (Cluster) it.removeObject();
                        unloadCluster(cluster.clusterID(), true);

                        // try to free the memory of the unloaded cluster
                        System.gc();
                        freeSpace = env.freeMemory();

                        if (env.logWriter.hasTarget(LogWriter.DEBUG)) {
                            env.logWriter.newEntry(this, " free:" + freeSpace, LogWriter.DEBUG2);
                        }
                    } else {
                        if (false) {
                            env.logWriter.newEntry(this,
                                    "trying to DEACTIVATE 'invoked' cluster: " + cluster.clusterID(),
                                    LogWriter.WARN);
                        }
                    }
                }
            }
        }
    }

    /**
     * This method is called right after the specified Cluster was loaded from
     * disk.
     */
    protected void activateCluster(Cluster cluster, int size) {
        if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
            env.logWriter.newEntry(this, "activateCluster(): " + cluster.clusterID, LogWriter.DEBUG3);
        }
        cluster.env = env;
        cluster.clusterStore = this;
        cluster.touch();
        cluster.setCurrentSize(size);
    }

    /**
     * Deactivate the specified cluster before it is written to disk. The
     * specified cluster will be removed from the cluster cache. If it currently
     * has shadows, they are written to disk. If any of the containers are
     * currently invoked (should normally never happen), the shadows must stay
     * in memory.
     */
    protected void deactivateCluster(Cluster cluster) throws IOException {
        if (env.logWriter.hasTarget(LogWriter.DEBUG)) {
            env.logWriter.newEntry(this, "deactivateCluster(): " + cluster.clusterID
                    + " priority: " + cluster.cachePriority(), LogWriter.DEBUG);
            env.logWriter.newEntry(this, " lock: " + cluster.lock.level(null), LogWriter.DEBUG);
        }

        String basename = basename(cluster.clusterID());

        // we synchronize on this ClusterStore so that a cluster freshly returned within
        // the ClusterStore lock cannot be deactivated during the lock time
        synchronized (this) {
            synchronized (cluster) {
                // any lock level >= READ has to be persisted
                if (cluster.lock.level(null) >= Lock.LEVEL_READ) {
                    if (env.logWriter.hasTarget(LogWriter.DEBUG)) {
                        env.logWriter.newEntry(this,
                                " write lock to disk: " + cluster.clusterID, LogWriter.DEBUG);
                    }

                    storeData(cluster.getLock(), basename + POSTFIX_LOCK);
                } else {
                    File lockFile = new File(basename + POSTFIX_LOCK);
                    if (lockFile.exists()) {
                        lockFile.delete();
                    }
                }

                // clusters with WRITE lock are supposed to be dirty
                if (cluster.lock.level(null) > Lock.LEVEL_UPGRADE) {
                    if (env.logWriter.hasTarget(LogWriter.DEBUG)) {
                        env.logWriter.newEntry(this,
                                " write cluster: " + cluster.clusterID, LogWriter.DEBUG);
                    }

                    storeData(cluster, basename + POSTFIX_CLUSTER);
                }

                // mark the cluster to be not valid
                cluster.setLock(null);
            }
        }
    }

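    // A sketch of the commit protocol implemented by the three methods below
    // (summarized from their bodies, for orientation):
    //
    //     prepareCommitCluster(ta, cid);   // writes the changed cluster to "<cid>.tm"
    //     commitCluster(ta, cid);          // renames "<cid>.tm" to "<cid>.cl" and updates the lock file
    //     // or, on failure:
    //     abortCluster(ta, cid);           // deletes "<cid>.tm" and unloads the in-memory cluster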
    /**
     * Store the specified cluster on disk. Write temp files first. If this
     * write fails, the originals are still valid. The cluster may have been
     * written to disk already, if it was deactivated during the transaction.
     * But in case the cluster (and its changes) are only in memory, we have to
     * write now to check if this is possible without errors.
     *
     * Note: This only writes all currently committed transaction results to the
     * disk. This is different from the deactivation behaviour.
     *
     * @param cid Cluster to be prepare-committed.
     * @exception IOException If writing fails; none of the clusters are written to disk.
     */
    public synchronized void prepareCommitCluster(Transaction ta, ClusterID cid)
            throws IOException, ClassNotFoundException {
        if (false && env.logWriter.hasTarget(LogWriter.DEBUG3)) {
            env.logWriter.newEntry(this, "prepareCommitCluster(): " + cid, LogWriter.DEBUG3);
        }

        // Cluster cluster = loadCluster(cid,false);

        // If we do not pin, loadCluster may just lose the loaded cluster due to trim() after load
        Cluster cluster = loadCluster(cid, true);
        cluster.unpin();
        cluster.prepareCommit(ta);
        if (cluster.lock.level(null) >= Lock.LEVEL_WRITE) {
            String tempFilename = basename(cid) + POSTFIX_TEMP;

            // write changed cluster in temp file; the lock is written in
            // commit() and abort()
            storeData(cluster, tempFilename);

            long fileSize = new File(tempFilename).length();
            if (fileSize == 0L) {
                throw new IOException("Unable to determine cluster file size.");
            }

            if (compressClusters) {
                fileSize *= compressionFactor;
            }
            cluster.setCurrentSize((int) fileSize);
        }
    }

    /**
     * Actually commit the specified cluster. This simply renames the temp file
     * to be the new "original" one. The rename operation MUST NOT fail.
     *
     * @param cid Cluster to be committed.
     */
    public synchronized void commitCluster(Transaction ta, ClusterID cid)
            throws IOException, ClassNotFoundException {
        if (false && env.logWriter.hasTarget(LogWriter.DEBUG3)) {
            env.logWriter.newEntry(this, "commitCluster(): " + cid, LogWriter.DEBUG3);
        }

        String basename = basename(cid);
        File clusterFile = new File(basename + POSTFIX_CLUSTER);
        File tempFile = new File(basename + POSTFIX_TEMP);

        if (tempFile.exists()) {
            clusterFile.delete();
            if (!tempFile.renameTo(clusterFile)) {
                throw new IOException("Unable to rename temp cluster.");
            }
        }

        // FIXME: if transaction size exceeds cache size, this loads the
        // cluster again although it's not really needed
        // Cluster cluster = loadCluster(cid,false);

        // If we do not pin, loadCluster may just lose the loaded cluster due to trim() after load
        Cluster cluster = loadCluster(cid, true);
        cluster.unpin();
        cluster.commit(ta);

        // after the cluster is committed its lock is released and has to be
        // updated on disk; if no lock file exists, the lock is newly created
        // when loading
        updateLockOnDisk(cluster, ta);
    }

    /**
     * Actually abort the specified cluster. This deletes the temp file that was
     * written by prepareCommitCluster().
     *
     * @param cid Cluster to be aborted.
     */
    public synchronized void abortCluster(Transaction ta, ClusterID cid)
            throws IOException, ClassNotFoundException {
        File tempFile = new File(basename(cid) + POSTFIX_TEMP);
        if (tempFile.exists()) {
            if (!tempFile.delete()) {
                throw new IOException("Unable to delete temp cluster.");
            }
        }

        // FIXME: if transaction size exceeds cache size, this loads the
        // cluster again although it's not really needed
        // Cluster cluster = loadCluster(cid,false);

        // If we do not pin, loadCluster may just lose the loaded cluster due to trim() after load
        Cluster cluster = loadCluster(cid, true);
        cluster.unpin();
        cluster.abort(ta);

        if (cluster.isPinned()) { // If the cluster is pinned, it should be reloaded immediately.
            /*
            To other ozone-developers:

            What do we do if the cluster is pinned and thus may not be removed from memory?
            Is this only the case if another transaction is waiting for this cluster to
            be unlocked?
            If so, should, in this case, the transaction simply reload the cluster?
            */

            if (false) {
                env.logWriter.newEntry(this,
                        "abortCluster(): Unloading pinned cluster " + cid + ". Should we really do that?",
                        LogWriter.INFO);
            }

            // the above abort() call does not change the cluster in memory, so
            // we have to reload the cluster next time
            unloadCluster(cid, false);

        } else {
            // the above abort() call does not change the cluster in memory, so
            // we have to reload the cluster next time
            unloadCluster(cid, false);
        }

        // after the cluster is aborted its lock is released and has to be
        // updated on disk; if no lock file exists, the lock is newly created
        // when loading
        updateLockOnDisk(cluster, ta);
    }

    protected void updateLockOnDisk(Cluster cluster, Transaction ta) throws IOException {
        // System.out.println ("commit " + cid + ": " + ((DefaultLock)cluster.lock).lockers.count());
        ClusterID cid = cluster.clusterID();
        if (cluster.lock.level(ta) == Lock.LEVEL_NONE) {
            File lockFile = new File(basename(cid) + POSTFIX_LOCK);
            if (lockFile.exists() && !lockFile.delete()) {
                throw new IOException("Unable to delete lock file.");
            }
        } else {
            storeData(cluster.lock, basename(cid) + POSTFIX_LOCK);
        }
    }

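    // Note on the two helpers below: storeData() and loadData() have to stay
    // symmetric. Both wrap the file stream in a GZIP stream if and only if
    // compressClusters is set, so changing that flag for an existing database
    // would make previously written files unreadable (an observation drawn from
    // the code below, not from the original documentation).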
    /**
     * Serialize and store the specified object for the specified key. The
     * current implementation uses the file system as back-end store.
     */
    protected void storeData(Object obj, String key) throws IOException {
        if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
            env.logWriter.newEntry(this, "storeData(): " + key, LogWriter.DEBUG3);
        }

        OutputStream out = new FileOutputStream(key);

        if (compressClusters) {
            out = new GZIPOutputStream(out, 3 * 4096);
        } else {
            out = new BufferedOutputStream(out, 3 * 4096);
        }

        ObjectOutputStream oout = new ObjectOutputStream(out);
        try {
            oout.writeObject(obj);
        } finally {
            oout.close();
        }
    }

    /**
     * Load the data that previously has been stored for the given key.
     */
    protected Object loadData(String key) throws IOException, ClassNotFoundException {
        if (false && env.logWriter.hasTarget(LogWriter.DEBUG3)) {
            env.logWriter.newEntry(this, "loadData(): " + key, LogWriter.DEBUG3);
        }

        InputStream in = new FileInputStream(key);

        if (compressClusters) {
            in = new GZIPInputStream(in, 3 * 4096);
        } else {
            in = new BufferedInputStream(in, 3 * 4096);
        }

        ObjectInputStream oin = new ObjectInputStream(in);
        try {
            Object result = oin.readObject();
            return result;
        } finally {
            oin.close();
        }
    }

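    // Filename sketch for basename() below (derived from its body; the concrete
    // value of Env.DATA_DIR is an assumption left unspecified here): for a
    // ClusterID with value 1234 it returns env.dir + Env.DATA_DIR + "1234",
    // to which callers append one of the POSTFIX_* suffixes.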
    protected String basename(ClusterID cid) {
        StringBuffer filename = new StringBuffer(env.dir);
        filename.append(Env.DATA_DIR);
        // filename.append (File.separator);
        filename.append(cid.value());
        return filename.toString();
    }

}
|