Source Code Cross Referenced for ClusterStore.java in  » Database-DBMS » Ozone-1.1 » org » ozoneDB » core » wizardStore » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Database DBMS » Ozone 1.1 » org.ozoneDB.core.wizardStore 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        // You can redistribute this software and/or modify it under the terms of
002:        // the Ozone Core License version 1 published by ozone-db.org.
003:        //
004:        // The original code and portions created by SMB are
005:        // Copyright (C) 1997-@year@ by SMB GmbH. All rights reserved.
006:        //
007:        // $Id: ClusterStore.java,v 1.2 2002/06/08 00:49:39 mediumnet Exp $
008:
009:        package org.ozoneDB.core.wizardStore;
010:
011:        import java.io.*;
012:        import java.util.zip.*;
013:        import org.ozoneDB.DxLib.*;
014:        import org.ozoneDB.*;
015:        import org.ozoneDB.core.*;
016:        import org.ozoneDB.util.*;
017:
018:        /**
019:         * The ClusterStore is the back-end store of the wizardStore. It maintains the
020:         * cluster cache, activation/passivation and the actual persistent commits.
021:         * 
022:         * 
023:         * @author <a href="http://www.softwarebuero.de/">SMB</a>
024:         * @author <a href="http://www.medium.net/">Medium.net</a>
025:         * @version $Revision: 1.2 $Date: 2002/06/08 00:49:39 $
026:         */
027:        public final class ClusterStore {
028:
029:            public final static String POSTFIX_CLUSTER = ".cl";
030:            public final static String POSTFIX_LOCK = ".lk";
031:            public final static String POSTFIX_TEMP = ".tm";
032:            public final static String POSTFIX_SHADOW = ".sh";
033:
034:            protected final static int compressionFactor = 3;
035:
036:            protected transient Env env;
037:
038:            protected transient long touchCount;
039:
040:            protected DxMap cachedClusters;
041:
042:            protected int maxClusterSize = 64 * 1024;
043:
044:            /**
045:             * Table that maps Permissions to ClusterIDs.
046:             */
047:            protected DxMap growingClusterIDs;
048:
049:            private boolean compressClusters;
050:
051:            ClusterStore(Env _env) {
052:                env = _env;
053:                maxClusterSize = env.config.intProperty(Setup.WS_CLUSTER_SIZE,
054:                        -1);
055:                cachedClusters = new DxHashMap(64);
056:                compressClusters = env.config.booleanProperty(
057:                        Setup.WS_COMPRESS_CLUSTERS, true);
058:            }
059:
060:            public void startup() throws Exception {
061:                growingClusterIDs = new DxHashMap(32);
062:            }
063:
064:            public void shutdown() {
065:            }
066:
067:            /**
068:             * Check if the ClusterStore was cleanly shutted down.
069:             */
070:            public boolean isCleanShutdown() {
071:                File file = new File(env.dir + Env.DATA_DIR);
072:                String[] fileList = file.list();
073:
074:                for (int i = 0; i < fileList.length; i++) {
075:                    if (fileList[i].endsWith(POSTFIX_SHADOW)
076:                            || fileList[i].endsWith(POSTFIX_TEMP)) {
077:                        return false;
078:                    }
079:                }
080:                return true;
081:            }
082:
083:            /**
084:             * Search the DATA dir and recover all ClusterIDs.
085:             */
086:            public DxSet recoverClusterIDs() {
087:                File file = new File(env.dir + Env.DATA_DIR);
088:                String[] fileList = file.list();
089:
090:                DxSet result = new DxHashSet();
091:                for (int i = 0; i < fileList.length; i++) {
092:                    if (fileList[i].endsWith(POSTFIX_CLUSTER)
093:                            || fileList[i].endsWith(POSTFIX_SHADOW)) {
094:                        String cidString = fileList[i].substring(0, fileList[i]
095:                                .indexOf('.'));
096:                        long cid = Long.parseLong(cidString);
097:                        result.add(new ClusterID(cid));
098:                    }
099:                }
100:                return result;
101:            }
102:
103:            public long currentCacheSize() {
104:                long result = 0;
105:                DxIterator it = cachedClusters.iterator();
106:                Cluster cluster;
107:                while ((cluster = (Cluster) it.next()) != null) {
108:                    result += cluster.size();
109:                }
110:                return result;
111:            }
112:
113:            protected int currentBytesPerContainer() {
114:                int result = env.config.intProperty(
115:                        Setup.WS_CLUSTER_SIZE_RATIO, 256);
116:                //      env.logWriter.newEntry( this, "currentBytesPerContainer(): setup:" + result, LogWriter.DEBUG );
117:                return result;
118:
119:                //        if (cachedClusters.count() < 3) {
120:                //            int result = env.config.intProperty (Setup.WS_CLUSTER_SIZE_RATIO, 256);
121:                //            env.logWriter.newEntry (this, "currentBytesPerContainer(): config:" + result, LogWriter.DEBUG);
122:                //            return result;
123:                //            }
124:                //        else {
125:                //            int bpc = 0;
126:                //            int count = 0;
127:                //            DxIterator it = cachedClusters.iterator();
128:                //            Cluster cluster;
129:                //            while ((cluster=(Cluster)it.next()) != null) {
130:                //                count ++;
131:                //                bpc += cluster.bytesPerContainer;
132:                //                }
133:                //            int result = bpc / count;
134:                //            env.logWriter.newEntry (this, "currentBytesPerContainer(): new:" + result, LogWriter.DEBUG);
135:                //            return result;
136:                //            }
137:            }
138:
139:            //    public Cluster lruCluster() {
140:            // search the LRU cluster to speed things up; since this is not
141:            // synchronized, checking and accessing currentCluster must be done in
142:            // one line to avoid other thread to change the variable in between
143:            //        container = (currentCluster != null && currentCluster.lock != null) ? currentCluster.containerForID (id) : null;
144:            //        if (container != null) {
145:            //           // System.out.print ("+");
146:            //            return container.isDeleted() ? null : container;
147:            //            }
148:
149:            /**
150:             * @param perms Permissions of the cluster to search.
151:             * @return Cluster with the specified permissions that is good to store a
152:             * new container in it.
153:             */
154:            protected synchronized Cluster growingCluster(Permissions perms)
155:                    throws Exception {
156:                if (false && env.logWriter.hasTarget(LogWriter.DEBUG3)) {
157:                    env.logWriter.newEntry(this , "growingCluster() ",
158:                            LogWriter.DEBUG3);
159:                }
160:
161:                Cluster cluster = null;
162:                ClusterID cid = (ClusterID) growingClusterIDs
163:                        .elementForKey(perms);
164:
165:                // load the current growing cluster and check space
166:                if (cid != null) {
167:                    cluster = (Cluster) cachedClusters.elementForKey(cid);
168:                    if (cluster == null) {
169:                        cluster = loadCluster(cid, false);
170:                    }
171:                    // check cluster size and if it was deactivated by the trimCache();
172:                    // use this cluster only if it isn't used by another ta
173:                    if (cluster.lock == null
174:                            || cluster.size() >= maxClusterSize
175:                            || cluster.lock.level(null) > Lock.LEVEL_NONE
176:                            && !cluster.lock
177:                                    .isAcquiredBy(env.transactionManager
178:                                            .currentTA())) {
179:
180:                        if (false && env.logWriter.hasTarget(LogWriter.DEBUG)) {
181:                            env.logWriter.newEntry(this ,
182:                                    "growingCluster(): growing cluster not usable: cid="
183:                                            + cluster.clusterID()
184:                                            + " size="
185:                                            + cluster.size()
186:                                            + " lockLevel="
187:                                            + (cluster.lock != null ? String
188:                                                    .valueOf(cluster.lock
189:                                                            .level(null))
190:                                                    : "null"), LogWriter.DEBUG);
191:                        }
192:
193:                        growingClusterIDs.removeForKey(perms);
194:                        cluster = null;
195:                    }
196:                }
197:
198:                // search all currently loaded clusters
199:                if (cluster == null) {
200:                    DxIterator it = cachedClusters.iterator();
201:                    Cluster cursor;
202:                    while ((cursor = (Cluster) it.next()) != null) {
203:                        // System.out.println (cursor.size());
204:                        if (cursor.size() < maxClusterSize
205:                                && cursor.permissions.equals(perms)) {
206:                            cluster = cursor;
207:
208:                            // make sure that there is enough space for the clusters to be
209:                            // able to grow to the max size
210:                            // ensureCacheSpace (maxClusterSize - cluster.size());
211:                            trimCache();
212:
213:                            // check if the cluster deactivated be the ensureCacheSpace
214:                            if (cluster.getLock() == null) {
215:                                env.logWriter.newEntry(this ,
216:                                        "growingCluster(): loaded cluster was deactivated: "
217:                                                + cluster.clusterID(),
218:                                        LogWriter.DEBUG);
219:                                cluster = null;
220:                            } else if (cluster.lock.level(null) > Lock.LEVEL_NONE
221:                                    && !cluster.lock
222:                                            .isAcquiredBy(env.transactionManager
223:                                                    .currentTA())) {
224:                                // use this cluster only if it isn't used by another ta
225:                                if (false) {
226:                                    env.logWriter.newEntry(this ,
227:                                            "growingCluster(): loaded cluster is locked by another transaction: "
228:                                                    + cluster.clusterID(),
229:                                            LogWriter.DEBUG);
230:                                }
231:                                cluster = null;
232:                            } else {
233:                                growingClusterIDs.addForKey(
234:                                        cluster.clusterID(), perms);
235:                                if (false) {
236:                                    env.logWriter
237:                                            .newEntry(
238:                                                    this ,
239:                                                    "growingCluster(): loaded cluster is now growing cluster: "
240:                                                            + cluster
241:                                                                    .clusterID()
242:                                                            + " size:"
243:                                                            + cluster.size(),
244:                                                    LogWriter.DEBUG);
245:                                }
246:                                break;
247:                            }
248:                        }
249:                    }
250:                }
251:
252:                // write a new, empty cluster and load it just after to ensures
253:                // that new cluster is "regularly" loaded
254:                if (cluster == null) {
255:                    cluster = createANewEmptyAndUsableCluster(perms);
256:                }
257:
258:                return cluster;
259:            }
260:
261:            /**
262:                Creates a cluster which is
263:                <UL>
264:                    <LI>new</LI>
265:                    <LI>empty</LI>
266:                    <LI>usable and</LI>
267:                    <LI>not locked</LI>
268:                </UL>
269:             */
270:            protected synchronized Cluster createANewEmptyAndUsableCluster(
271:                    Permissions perms) throws IOException,
272:                    ClassNotFoundException {
273:                //      env.logWriter.newEntry( this, "growingCluster(): creating new cluster...", LogWriter.DEBUG );
274:                Cluster cluster = new Cluster(new ClusterID(env.keyGenerator
275:                        .nextID()), perms, env.transactionManager.newLock(),
276:                        256);
277:
278:                // the new cluster has to be written to disk in order to make
279:                // saveShadow() and things work;
280:                storeData(cluster, basename(cluster.clusterID())
281:                        + POSTFIX_CLUSTER);
282:
283:                if (false) { // Old
284:                    // If we do not pin, the freshly created cluster may be deactivated and thus its lock may be null
285:                    cluster.pin();
286:                    try {
287:                        // since we don't check the cache size after registering a cont
288:                        // we have to make sure that there is enough space for this cluster
289:                        // to grow to the max size
290:                        // ensureCacheSpace (maxClusterSize);
291:                        trimCache();
292:                        cluster = loadCluster(cluster.clusterID(), false);
293:                    } finally {
294:                        cluster.unpin();
295:                    }
296:                } else {
297:                    // since we don't check the cache size after registering a cont
298:                    // we have to make sure that there is enough space for this cluster
299:                    // to grow to the max size
300:                    // ensureCacheSpace (maxClusterSize);
301:                    trimCache();
302:
303:                    // We need to load the cluster pinned because loadCluster guarantees only to return a not-unloaded cluster if it is pinned.
304:                    cluster = loadCluster(cluster.clusterID(), true);
305:                    cluster.unpin();
306:                }
307:
308:                growingClusterIDs.addForKey(cluster.clusterID(), perms);
309:                //      env.logWriter.newEntry( this, "growingCluster(): new cluster created: " + cluster.clusterID(), LogWriter.DEBUG );
310:
311:                return cluster;
312:            }
313:
314:            /**
315:                Returns or creates a cluster which is not locked so that locking it will succeed.
316:                The returned cluster is only guaranteed to be not locked by any other thread as long as this
317:                method is called during synchronization to this ClusterStore.
318:             */
319:            protected Cluster giveMeAnUnlockedCluster(Permissions perms)
320:                    throws IOException, ClassNotFoundException {
321:                return createANewEmptyAndUsableCluster(perms);
322:            }
323:
324:            /**
325:                Associates the specified container with a cluster.
326:
327:                Iff this method returns normally (without exception), the container is pinned and thus
328:                has to be unpinned.
329:
330:                Iff this method returns normally (without exception), the container (and thus the cluster of the container)
331:                is write locked
332:                
333:                @param container Container to be registered with one cluster.
334:             */
335:            public void registerContainerAndPinAndLock(
336:                    WizardObjectContainer container, Permissions perms,
337:                    Transaction locker, int lockLevel) throws Exception {
338:                if (false && env.logWriter.hasTarget(LogWriter.DEBUG3)) {
339:                    env.logWriter.newEntry(this , "registerContainer()",
340:                            LogWriter.DEBUG3);
341:                }
342:
343:                Cluster cluster = null;
344:
345:                boolean pinned = false;
346:                boolean locked = false;
347:                boolean alright = false;
348:
349:                try {
350:                    synchronized (this ) {
351:                        cluster = growingCluster(perms);
352:
353:                        Lock clusterLock = cluster.getLock();
354:                        int prevLevel = clusterLock.tryAcquire(locker,
355:                                lockLevel);
356:
357:                        if (prevLevel == Lock.NOT_ACQUIRED) { // The cluster we are trying to lock is already locked, so we take another cluster
358:                            cluster = giveMeAnUnlockedCluster(perms);
359:
360:                            clusterLock = cluster.getLock();
361:                            prevLevel = clusterLock.tryAcquire(locker,
362:                                    lockLevel);
363:
364:                            if (prevLevel == Lock.NOT_ACQUIRED) {
365:                                throw new Error(
366:                                        "BUG! We could not acquire a lock for an unlocked cluster.");
367:                            }
368:                        }
369:                        locked = true;
370:
371:                        cluster.registerContainer(container);
372:                        container.pin();
373:                        pinned = true;
374:                    }
375:                    cluster.updateLockLevel(locker);
376:
377:                    if (false && env.logWriter.hasTarget(LogWriter.DEBUG3)) {
378:                        env.logWriter.newEntry(this , "    cluster: "
379:                                + cluster.clusterID(), LogWriter.DEBUG3);
380:                    }
381:                    alright = true;
382:                } finally {
383:                    if (!alright) {
384:                        if (locked) {
385:                            cluster.getLock().release(locker);
386:                        }
387:                        if (pinned) {
388:                            container.unpin();
389:                        }
390:                    }
391:                }
392:            }
393:
394:            public void invalidateContainer(WizardObjectContainer container) /*throws Exception*/{
395:                synchronized (container) {
396:                    container.getCluster().removeContainer(container);
397:                    container.setCluster(null);
398:                }
399:            }
400:
401:            protected Cluster restoreCluster(ClusterID cid) throws Exception {
402:                String basename = basename(cid);
403:                Cluster cluster;
404:
405:                new File(basename + POSTFIX_LOCK).delete();
406:                new File(basename + POSTFIX_TEMP).delete();
407:
408:                File shadowFile = new File(basename + POSTFIX_SHADOW);
409:                File clusterFile = new File(basename + POSTFIX_CLUSTER);
410:
411:                if (shadowFile.exists()) {
412:                    if (!shadowFile.renameTo(clusterFile)) {
413:                        throw new IOException("Unable to rename shadow file.");
414:                    }
415:                }
416:                cluster = (Cluster) loadData(basename + POSTFIX_CLUSTER);
417:                activateCluster(cluster, 0);
418:
419:                return cluster;
420:            }
421:
422:            /**
423:             * Make sure the corresponding cluster is in the cache. While loading
424:             * clusters, we may have to throw away (and maybe store) some currently
425:             * cached clusters.
426:             * 
427:             * 
428:             * @param cid ClusterID of the cluster to load.
429:             * @param pin
430:             	wether the loaded cluster should be pinned as soon as it is loaded
431:             	so that there may be no chance to unload unless it is unpinned.
432:             	If this parameter is set to true, the user has to unpin the cluster.
433:             	If this parameter is set to false, the cluster may already be unloaded when this method returns.
434:             	after using it.
435:             */
436:            public Cluster loadCluster(ClusterID cid, boolean pin)
437:                    throws IOException, ClassNotFoundException {
438:                Cluster cluster = (Cluster) cachedClusters.elementForKey(cid);
439:                if (cluster == null) {
440:
441:                    if (false && env.logWriter.hasTarget(LogWriter.INFO)) {
442:                        env.logWriter.newEntry(this ,
443:                                "loadCluster(): load cluster from disk: "
444:                                        + cid.toString(), LogWriter.INFO);
445:                    }
446:
447:                    String basename = basename(cid);
448:                    String clusterName = basename + POSTFIX_CLUSTER;
449:                    String lockName = basename + POSTFIX_LOCK;
450:
451:                    int clusterByteSize = (int) new File(clusterName).length();
452:                    if (compressClusters) {
453:                        clusterByteSize *= compressionFactor;
454:                    }
455:
456:                    // make sure that many different threads don't load
457:                    // to much data before the currently synchronized thread
458:                    // can trim the cache
459:                    trimCache();
460:                    cluster = (Cluster) loadData(clusterName);
461:
462:                    synchronized (this ) {
463:
464:                        // now we have to check the cachedClusters table inside the
465:                        // synchronized block to see if someone did register this
466:                        // cluster while we loaded it
467:                        Cluster interimCluster = (Cluster) cachedClusters
468:                                .elementForKey(cid);
469:                        if (interimCluster != null) {
470:                            if (false) { // This is not worth a warning. It happens, and with appropriate locking|pinning, it is no problem or danger.
471:                                env.logWriter
472:                                        .newEntry(
473:                                                this ,
474:                                                "loadCluster(): cluster was loaded by another thread too; droping my copy",
475:                                                LogWriter.WARN);
476:                            }
477:                            cluster = interimCluster;
478:
479:                            if (pin) {
480:                                cluster.pin();
481:                            }
482:                        } else {
483:                            // we are going to mess with the cluster; it seems that the cluster
484:                            // is not visible to other thread until it is added to cachedClusters,
485:                            // however, IBM jdk throws an exception in cluster.updateLockLevel, which
486:                            // seems to be related to the initialization in the following block
487:                            synchronized (cluster) {
488:                                // locks are only there if the lock level is >= READ
489:                                try {
490:                                    cluster.setLock((Lock) loadData(lockName));
491:                                    new File(lockName).delete();
492:                                    ((MROWLock) cluster.getLock())
493:                                            .setClusterID(cluster.clusterID());
494:                                } catch (Exception e) {
495:                                    if (env.logWriter
496:                                            .hasTarget(LogWriter.DEBUG3)) {
497:                                        env.logWriter
498:                                                .newEntry(
499:                                                        this ,
500:                                                        "    Unable to load lock from disk - creating a new lock.",
501:                                                        LogWriter.DEBUG3);
502:                                    }
503:                                    cluster.setLock(env.transactionManager
504:                                            .newLock());
505:                                    ((MROWLock) cluster.getLock())
506:                                            .setClusterID(cluster.clusterID());
507:                                }
508:
509:                                if (pin) { // We pin inside the synchronization to the cluster, because calling pin() will try another synchronization and two nested synchronizations to an object are faster than two serial synchronizations.
510:                                    cluster.pin();
511:                                }
512:
513:                                activateCluster(cluster, clusterByteSize);
514:                            }
515:
516:                            if (clusterByteSize > maxClusterSize * 2) {
517:                                splitCluster(cluster);
518:                            }
519:
520:                            cachedClusters.addForKey(cluster, cluster
521:                                    .clusterID());
522:
523:                            trimCache();
524:                        }
525:                    }
526:                }
527:                return cluster;
528:            }
529:
530:            public void splitCluster(Cluster cluster) {
531:            }
532:
533:            /**
534:             * Remove cluster from the cluster cache.
535:             * @param cid
536:             */
537:            public void unloadCluster(ClusterID cid, boolean deactivate)
538:                    throws IOException {
539:                if (env.logWriter.hasTarget(LogWriter.DEBUG)) {
540:                    env.logWriter.newEntry(this , "unloadCluster(" + cid + ","
541:                            + deactivate + ").", LogWriter.DEBUG);
542:                }
543:
544:                Cluster cluster = (Cluster) cachedClusters.removeForKey(cid);
545:
546:                if (deactivate) {
547:                    deactivateCluster(cluster);
548:                }
549:            }
550:
551:            /**
552:             * Ensure that there is at least the specified size of free space in the
553:             * cluster cache. Under some circumstances clusters (currently invoked)
554:             * cannot be deactivated. Therefore this method cannot guarantee that the
555:             * needed space is free afterwards.<p>
556:             *
557:             * This is the central method of the deactivation of containers that are
558:             * currently in use. This is different from the commit behaviour.
559:             */
560:            protected void trimCache() throws IOException {
561:
562:                long freeSpace = env.freeMemory();
563:                if (false && env.logWriter.hasTarget(LogWriter.DEBUG)) {
564:                    env.logWriter.newEntry(this , "trimCache():  free:"
565:                            + freeSpace, LogWriter.DEBUG2);
566:                }
567:
568:                if (freeSpace <= 0) {
569:                    synchronized (this ) {
570:                        long cacheSize = 0;
571:
572:                        // build priority queue for all currently loaded clusters
573:                        DxMap priorityQueue = new DxTreeMap();
574:                        DxIterator it = cachedClusters.iterator();
575:                        Cluster cluster;
576:                        while ((cluster = (Cluster) it.next()) != null) {
577:                            priorityQueue.addForKey(cluster, cluster
578:                                    .cachePriority());
579:                            cacheSize += cluster.size();
580:                        }
581:
582:                        // free at least 20% of the cache
583:                        long neededSpace = Math.max(maxClusterSize * 2,
584:                                cacheSize / 20);
585:                        if (env.logWriter.hasTarget(LogWriter.DEBUG)) {
586:                            env.logWriter.newEntry(this ,
587:                                    "   cache: " + cacheSize + " to be freed:"
588:                                            + neededSpace, LogWriter.DEBUG2);
589:                        }
590:
591:                        // throw away (deactivate) clusters, lowest priority first
592:                        it = priorityQueue.iterator();
593:                        while (freeSpace < neededSpace
594:                                && (cluster = (Cluster) it.next()) != null) {
595:
596:                            // if any of the containers is currently invoked, the cluster
597:                            // must not be written and must stay in memory
598:                            // The same applies for pinned containers.
599:                            // FIXME: Once pinning is fully established, we may not need to call isInvoked() anymore.
600:                            if ((!cluster.isPinned()) && (!cluster.isInvoked())) {
601:                                if (env.logWriter.hasTarget(LogWriter.DEBUG)) {
602:                                    env.logWriter.newEntry(this ,
603:                                            "DEACTIVATE cluster: "
604:                                                    + cluster.clusterID(),
605:                                            LogWriter.DEBUG2);
606:                                }
607:
608:                                cluster = (Cluster) it.removeObject();
609:                                unloadCluster(cluster.clusterID(), true);
610:
611:                                // try to free the memory of the unloaded cluster
612:                                System.gc();
613:                                freeSpace = env.freeMemory();
614:
615:                                if (env.logWriter.hasTarget(LogWriter.DEBUG)) {
616:                                    env.logWriter.newEntry(this , "   free:"
617:                                            + freeSpace, LogWriter.DEBUG2);
618:                                }
619:                            } else {
620:                                if (false) {
621:                                    env.logWriter.newEntry(this ,
622:                                            "trying to DEACTIVATE 'invoked' cluster: "
623:                                                    + cluster.clusterID(),
624:                                            LogWriter.WARN);
625:                                }
626:                            }
627:                        }
628:                    }
629:                }
630:            }
631:
632:            /**
633:             * This method us called right after the specified Cluster was loaded from
634:             * disk.
635:             */
636:            protected void activateCluster(Cluster cluster, int size) {
637:                if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
638:                    env.logWriter.newEntry(this , "activateCluster(): "
639:                            + cluster.clusterID, LogWriter.DEBUG3);
640:                }
641:                cluster.env = env;
642:                cluster.clusterStore = this ;
643:                cluster.touch();
644:                cluster.setCurrentSize(size);
645:            }
646:
647:            /**
648:             * Deactivate the specified cluster before it is written to disk. The
649:             * specified cluster will be removed from the cluster cache. If it currently
650:             * has shadows, they are written to disk. If any of the containers are
651:             * currently invoked (should normally never happen), the shadows must stay
652:             * in memory.
653:             */
654:            protected void deactivateCluster(Cluster cluster)
655:                    throws IOException {
656:                if (env.logWriter.hasTarget(LogWriter.DEBUG)) {
657:                    env.logWriter.newEntry(this , "deactivateCluster(): "
658:                            + cluster.clusterID + " priority: "
659:                            + cluster.cachePriority(), LogWriter.DEBUG);
660:                    env.logWriter.newEntry(this , "    lock: "
661:                            + cluster.lock.level(null), LogWriter.DEBUG);
662:                }
663:
664:                String basename = basename(cluster.clusterID());
665:
666:                synchronized (this ) { // We synchronize on this ClusterStore so that a freshly returned cluster within the ClusterStore lock may not be deactivated during the lock time.
667:                    synchronized (cluster) {
668:                        // any lock levels >= READ has to be persistent
669:                        if (cluster.lock.level(null) >= Lock.LEVEL_READ) {
670:                            if (env.logWriter.hasTarget(LogWriter.DEBUG)) {
671:                                env.logWriter.newEntry(this ,
672:                                        "    write lock to disk: "
673:                                                + cluster.clusterID,
674:                                        LogWriter.DEBUG);
675:                            }
676:
677:                            storeData(cluster.getLock(), basename
678:                                    + POSTFIX_LOCK);
679:                        } else {
680:                            File lockFile = new File(basename + POSTFIX_LOCK);
681:                            if (lockFile.exists()) {
682:                                lockFile.delete();
683:                            }
684:                        }
685:
686:                        // clusters with WRITE lock are supposed to be dirty
687:                        if (cluster.lock.level(null) > Lock.LEVEL_UPGRADE) {
688:                            if (env.logWriter.hasTarget(LogWriter.DEBUG)) {
689:                                env.logWriter.newEntry(this ,
690:                                        "    write cluster: "
691:                                                + cluster.clusterID,
692:                                        LogWriter.DEBUG);
693:                            }
694:
695:                            storeData(cluster, basename + POSTFIX_CLUSTER);
696:                        }
697:
698:                        // mark the cluster to be not valid
699:                        cluster.setLock(null);
700:                    }
701:                }
702:            }
703:
704:            /**
705:             * Store the specified cluster on disk. Write temp files first. If this
706:             * write fails, the original are still valid. The cluster may has been
707:             * written to the disk already, if is was deactivated while transaction.
708:             * But in case the cluster (and its changes) are only in memory, we have to
709:             * write now to check if this is possible without errors.
710:             * 
711:             * Note: This only writes all currently commited transaction results to the
712:             * disk. This is different from the deactivation behaviour.
713:             * 
714:             * 
715:             * @param cid Cluster to be prepare-commited.
716:             * @exception Exception None of the clusters are written to disk.
717:             */
718:            public synchronized void prepareCommitCluster(Transaction ta,
719:                    ClusterID cid) throws IOException, ClassNotFoundException {
720:                if (false && env.logWriter.hasTarget(LogWriter.DEBUG3)) {
721:                    env.logWriter.newEntry(this , "prepareCommitCluster(): "
722:                            + cid, LogWriter.DEBUG3);
723:                }
724:
725:                //      Cluster cluster = loadCluster(cid,false);
726:
727:                // If we do not pin, loadCluster may just loose the loaded cluster due to trim() after load
728:
729:                Cluster cluster = loadCluster(cid, true);
730:                cluster.unpin();
731:                cluster.prepareCommit(ta);
732:                if (cluster.lock.level(null) >= Lock.LEVEL_WRITE) {
733:                    String tempFilename = basename(cid) + POSTFIX_TEMP;
734:
735:                    // write changed cluster in temp file; the lock is written in
736:                    // commit() and abort()
737:                    storeData(cluster, tempFilename);
738:
739:                    long fileSize = new File(tempFilename).length();
740:                    if (fileSize == 0L) {
741:                        throw new IOException(
742:                                "Unable to determine cluster file size.");
743:                    }
744:
745:                    if (compressClusters) {
746:                        fileSize *= compressionFactor;
747:                    }
748:                    cluster.setCurrentSize((int) fileSize);
749:                }
750:            }
751:
752:            /**
753:             * Actually commit the specified cluster. This simply renames the temp file
754:             * to be the new "original" ones. The rename operation MUST NOT fail.
755:             * 
756:             * 
757:             * @param cid Cluster to be commited.
758:             */
759:            public synchronized void commitCluster(Transaction ta, ClusterID cid)
760:                    throws IOException, ClassNotFoundException {
761:                if (false && env.logWriter.hasTarget(LogWriter.DEBUG3)) {
762:                    env.logWriter.newEntry(this , "commitCluster(): " + cid,
763:                            LogWriter.DEBUG3);
764:                }
765:
766:                String basename = basename(cid);
767:                File clusterFile = new File(basename + POSTFIX_CLUSTER);
768:                File tempFile = new File(basename + POSTFIX_TEMP);
769:
770:                if (tempFile.exists()) {
771:                    clusterFile.delete();
772:                    if (!tempFile.renameTo(clusterFile)) {
773:                        throw new IOException("Unable to rename temp cluster.");
774:                    }
775:                }
776:
777:                // FIXME: if transaction size exceeds cache size, this loads the
778:                // cluster again altough it's not really needed
779:                //      Cluster cluster = loadCluster(cid,false);
780:
781:                // If we do not pin, loadCluster may just loose the loaded cluster due to trim() after load
782:                Cluster cluster = loadCluster(cid, true);
783:                cluster.unpin();
784:                cluster.commit(ta);
785:
786:                // after the cluster is commited its lock is released and has to be
787:                // updated on disk; if no lock file exists, the lock is newly created
788:                // when loading
789:                updateLockOnDisk(cluster, ta);
790:            }
791:
792:            /**
793:             * Actually abort the specified cluster. This deletes t
794:             * @param cid Cluster to be aborted.
795:             */
796:            public synchronized void abortCluster(Transaction ta, ClusterID cid)
797:                    throws IOException, ClassNotFoundException {
798:                File tempFile = new File(basename(cid) + POSTFIX_TEMP);
799:                if (tempFile.exists()) {
800:                    if (!tempFile.delete()) {
801:                        throw new IOException("Unable to delete temp cluster.");
802:                    }
803:                }
804:
805:                // FIXME: if transaction size exceeds cache size, this loads the
806:                // cluster again altough it's not really needed
807:                //      Cluster cluster = loadCluster(cid,false);
808:
809:                // If we do not pin, loadCluster may just loose the loaded cluster due to trim() after load
810:                Cluster cluster = loadCluster(cid, true);
811:                cluster.unpin();
812:                cluster.abort(ta);
813:
814:                if (cluster.isPinned()) { // If the cluster is pinned, it should be reloaded immediately.
815:                    /*
816:                        To other ozone-developers:
817:                        
818:                        What do we do if the cluster is pinned and thus may not be removed from memory?
819:                        Is this only the case if another transaction is waiting for this cluster to
820:                        be unlocked?
821:                        If so, should, in this case, the transaction simply reload the cluster?
822:                     */
823:
824:                    if (false) {
825:                        env.logWriter.newEntry(this ,
826:                                "abortCluster(): Unloading pinned cluster "
827:                                        + cid + ". Should we really do that?",
828:                                LogWriter.INFO);
829:                    }
830:
831:                    // the above abort() call does not change the cluster in memory, so
832:                    // we have to reload the cluster next time
833:                    unloadCluster(cid, false);
834:
835:                } else {
836:                    // the above abort() call does not change the cluster in memory, so
837:                    // we have to reload the cluster next time
838:                    unloadCluster(cid, false);
839:                }
840:
841:                // after the cluster is aborted its lock is released and has to be
842:                // updated on disk; if no lock file exists, the lock is newly created
843:                // when loading
844:                updateLockOnDisk(cluster, ta);
845:
846:            }
847:
848:            protected void updateLockOnDisk(Cluster cluster, Transaction ta)
849:                    throws IOException {
850:                //        System.out.println ("commit " + cid + ": " + ((DefaultLock)cluster.lock).lockers.count());
851:                ClusterID cid = cluster.clusterID();
852:                if (cluster.lock.level(ta) == Lock.LEVEL_NONE) {
853:                    File lockFile = new File(basename(cid) + POSTFIX_LOCK);
854:                    if (lockFile.exists() && !lockFile.delete()) {
855:                        throw new IOException("Unable to delete lock file.");
856:                    }
857:                } else {
858:                    storeData(cluster.lock, basename(cid) + POSTFIX_LOCK);
859:                }
860:            }
861:
862:            /**
863:             * Serialize and store the specified object for the specified key. This
864:             * current implementation uses the file system as back end store.
865:             */
866:            protected void storeData(Object obj, String key) throws IOException {
867:                if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
868:                    env.logWriter.newEntry(this , "storeData(): " + key,
869:                            LogWriter.DEBUG3);
870:                }
871:
872:                OutputStream out = new FileOutputStream(key);
873:
874:                if (compressClusters) {
875:                    out = new GZIPOutputStream(out, 3 * 4096);
876:                } else {
877:                    out = new BufferedOutputStream(out, 3 * 4096);
878:                }
879:
880:                ObjectOutputStream oout = new ObjectOutputStream(out);
881:                try {
882:                    oout.writeObject(obj);
883:                } finally {
884:                    oout.close();
885:                }
886:            }
887:
888:            /**
889:             * Load the data that previously has been stored for the given key.
890:             */
891:            protected Object loadData(String key) throws IOException,
892:                    ClassNotFoundException {
893:                if (false && env.logWriter.hasTarget(LogWriter.DEBUG3)) {
894:                    env.logWriter.newEntry(this , "loadData(): " + key,
895:                            LogWriter.DEBUG3);
896:                }
897:
898:                InputStream in = new FileInputStream(key);
899:
900:                if (compressClusters) {
901:                    in = new GZIPInputStream(in, 3 * 4096);
902:                } else {
903:                    in = new BufferedInputStream(in, 3 * 4096);
904:                }
905:
906:                ObjectInputStream oin = new ObjectInputStream(in);
907:                try {
908:                    Object result = oin.readObject();
909:                    return result;
910:                } finally {
911:                    oin.close();
912:                }
913:            }
914:
915:            protected String basename(ClusterID cid) {
916:                StringBuffer filename = new StringBuffer(env.dir);
917:                filename.append(Env.DATA_DIR);
918:                // filename.append (File.separator);
919:                filename.append(cid.value());
920:                return filename.toString();
921:            }
922:
923:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.