Source Code Cross Referenced for TransactionalObjectManagerImpl.java in  » Net » Terracotta » com » tc » objectserver » tx » Java Source Code / Java Documentation | Java Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » Terracotta » com.tc.objectserver.tx 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003:         * notice. All rights reserved.
004:         */
005:        package com.tc.objectserver.tx;
006:
007:        import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
008:
009:        import com.tc.logging.TCLogger;
010:        import com.tc.logging.TCLogging;
011:        import com.tc.object.ObjectID;
012:        import com.tc.object.tx.ServerTransactionID;
013:        import com.tc.objectserver.api.ObjectManager;
014:        import com.tc.objectserver.api.ObjectManagerLookupResults;
015:        import com.tc.objectserver.context.ApplyTransactionContext;
016:        import com.tc.objectserver.context.CommitTransactionContext;
017:        import com.tc.objectserver.context.ObjectManagerResultsContext;
018:        import com.tc.objectserver.context.RecallObjectsContext;
019:        import com.tc.objectserver.gtx.ServerGlobalTransactionManager;
020:        import com.tc.properties.TCPropertiesImpl;
021:        import com.tc.text.PrettyPrintable;
022:        import com.tc.text.PrettyPrinter;
023:        import com.tc.util.Assert;
024:
025:        import java.io.PrintWriter;
026:        import java.util.ArrayList;
027:        import java.util.Collection;
028:        import java.util.Collections;
029:        import java.util.HashMap;
030:        import java.util.HashSet;
031:        import java.util.IdentityHashMap;
032:        import java.util.Iterator;
033:        import java.util.LinkedHashMap;
034:        import java.util.List;
035:        import java.util.Map;
036:        import java.util.Set;
037:        import java.util.Map.Entry;
038:
039:        /**
040:         * This class keeps track of locally checked-out objects for applies and maintains the object-to-txnid mapping in the
041:         * server. It wraps the calls going to the object manager from the lookup, apply, and commit stages.
042:         */
043:        public class TransactionalObjectManagerImpl implements 
044:                TransactionalObjectManager, PrettyPrintable {
045:
046:            private static final TCLogger logger = TCLogging
047:                    .getLogger(TransactionalObjectManagerImpl.class);
048:            private static final int MAX_COMMIT_SIZE = TCPropertiesImpl
049:                    .getProperties().getInt(
050:                            "l2.objectmanager.maxObjectsToCommit");
051:            private final ObjectManager objectManager;
052:            private final TransactionSequencer sequencer;
053:            private final ServerGlobalTransactionManager gtxm;
054:
055:            /*
056:             * This map contains ObjectIDs to TxnObjectGrouping that contains these objects
057:             */
058:            private final Map checkedOutObjects = new HashMap();
059:            private final Map applyPendingTxns = new HashMap();
060:            private final LinkedHashMap commitPendingTxns = new LinkedHashMap();
061:
062:            private final Set pendingObjectRequest = new HashSet();
063:            private final PendingList pendingTxnList = new PendingList();
064:            private final LinkedQueue processedPendingLookups = new LinkedQueue();
065:            private final LinkedQueue processedApplys = new LinkedQueue();
066:
067:            private final TransactionalStageCoordinator txnStageCoordinator;
068:
069:            public TransactionalObjectManagerImpl(ObjectManager objectManager,
070:                    TransactionSequencer sequencer,
071:                    ServerGlobalTransactionManager gtxm,
072:                    TransactionalStageCoordinator txnStageCoordinator) {
073:                this .objectManager = objectManager;
074:                this .sequencer = sequencer;
075:                this .gtxm = gtxm;
076:                this .txnStageCoordinator = txnStageCoordinator;
077:            }
078:
079:            // ProcessTransactionHandler Method
080:            public void addTransactions(Collection txns) {
081:                sequencer.addTransactions(txns);
082:                txnStageCoordinator.initiateLookup();
083:            }
084:
085:            // LookupHandler Method
086:            public void lookupObjectsForTransactions() {
087:                processPendingIfNecessary();
088:                while (true) {
089:                    ServerTransaction txn = sequencer.getNextTxnToProcess();
090:                    if (txn == null)
091:                        break;
092:                    ServerTransactionID stxID = txn.getServerTransactionID();
093:                    if (gtxm.initiateApply(stxID)) {
094:                        lookupObjectsForApplyAndAddToSink(txn, true);
095:                    } else {
096:                        // These txns are already applied, hence just sending it to the next stage.
097:                        txnStageCoordinator
098:                                .addToApplyStage(new ApplyTransactionContext(
099:                                        txn));
100:                    }
101:                }
102:            }
103:
104:            private synchronized void processPendingIfNecessary() {
105:                if (addProcessedPendingLookups()) {
106:                    processPendingTransactions();
107:                }
108:            }
109:
110:            public synchronized void lookupObjectsForApplyAndAddToSink(
111:                    ServerTransaction txn, boolean newTxn) {
112:                Collection oids = txn.getObjectIDs();
113:                // log("lookupObjectsForApplyAndAddToSink(): START : " + txn.getServerTransactionID() + " : " + oids);
114:                Set newRequests = new HashSet();
115:                boolean makePending = false;
116:                for (Iterator i = oids.iterator(); i.hasNext();) {
117:                    ObjectID oid = (ObjectID) i.next();
118:                    TxnObjectGrouping tog;
119:                    if (pendingObjectRequest.contains(oid)) {
120:                        makePending = true;
121:                    } else if ((tog = (TxnObjectGrouping) checkedOutObjects
122:                            .get(oid)) == null) {
123:                        // 1) Object is not already checked out or
124:                        newRequests.add(oid);
125:                    } else if (tog.limitReached()) {
126:                        // 2) the object is available, but we dont use it to prevent huge commits, large txn acks etc
127:                        newRequests.add(oid);
128:                        // log(shortDescription());
129:                        // log("Limit Reached. " + oid + " - " + tog.shortDescription());
130:                    }
131:                }
132:                // TODO:: make cache and stats right
133:                LookupContext lookupContext = null;
134:                if (!newRequests.isEmpty()) {
135:                    lookupContext = new LookupContext(newRequests,
136:                            (newTxn ? txn.getNewObjectIDs()
137:                                    : Collections.EMPTY_SET), txn
138:                                    .getServerTransactionID());
139:                    if (objectManager.lookupObjectsFor(txn.getSourceID(),
140:                            lookupContext)) {
141:                        addLookedupObjects(lookupContext);
142:                    } else {
143:                        // New request went pending in object manager
144:                        // log("lookupObjectsForApplyAndAddToSink(): New Request went pending : " + newRequests);
145:                        makePending = true;
146:                        pendingObjectRequest.addAll(newRequests);
147:                    }
148:                }
149:                if (makePending) {
150:                    // log("lookupObjectsForApplyAndAddToSink(): Make Pending : " + txn.getServerTransactionID());
151:                    makePending(txn);
152:                    if (lookupContext != null)
153:                        lookupContext.makePending();
154:                } else {
155:                    ServerTransactionID txnID = txn.getServerTransactionID();
156:                    TxnObjectGrouping newGrouping = new TxnObjectGrouping(
157:                            txnID, txn.getNewRoots());
158:                    mergeTransactionGroupings(oids, newGrouping);
159:                    applyPendingTxns.put(txnID, newGrouping);
160:                    txnStageCoordinator
161:                            .addToApplyStage(new ApplyTransactionContext(txn,
162:                                    getRequiredObjectsMap(oids, newGrouping
163:                                            .getObjects())));
164:                    makeUnpending(txn);
165:                    // log("lookupObjectsForApplyAndAddToSink(): Success: " + txn.getServerTransactionID());
166:                }
167:            }
168:
169:            public String shortDescription() {
170:                return "TxnObjectManager : checked Out count = "
171:                        + checkedOutObjects.size() + " apply pending txn = "
172:                        + applyPendingTxns.size() + " commit pending = "
173:                        + commitPendingTxns.size() + " pending txns = "
174:                        + pendingTxnList.size() + " pending object requests = "
175:                        + pendingObjectRequest.size();
176:            }
177:
178:            private Map getRequiredObjectsMap(Collection oids, Map objects) {
179:                HashMap map = new HashMap(oids.size());
180:                for (Iterator i = oids.iterator(); i.hasNext();) {
181:                    Object oid = i.next();
182:                    Object mo = objects.get(oid);
183:                    if (mo == null) {
184:                        dump();
185:                        log("NULL !! " + oid + " not found ! " + oids);
186:                        log("Map contains " + objects);
187:                        throw new AssertionError("Object is NULL !! : " + oid);
188:                    }
189:                    map.put(oid, mo);
190:                }
191:                return map;
192:            }
193:
194:            private void log(String message) {
195:                logger.info(message);
196:            }
197:
198:            // This method written to be optimized to perform large merges fast. Hence the code flow might not
199:            // look natural.
200:            private void mergeTransactionGroupings(Collection oids,
201:                    TxnObjectGrouping newGrouping) {
202:                long start = System.currentTimeMillis();
203:                for (Iterator i = oids.iterator(); i.hasNext();) {
204:                    ObjectID oid = (ObjectID) i.next();
205:                    TxnObjectGrouping oldGrouping = (TxnObjectGrouping) checkedOutObjects
206:                            .get(oid);
207:                    if (oldGrouping == null) {
208:                        throw new AssertionError(
209:                                "Transaction Grouping for lookedup objects is Null !! "
210:                                        + oid);
211:                    } else if (oldGrouping != newGrouping
212:                            && oldGrouping.isActive()) {
213:                        ServerTransactionID oldTxnId = oldGrouping
214:                                .getServerTransactionID();
215:                        // This merge has a sideeffect of setting all reference contained in oldGrouping to null.
216:                        newGrouping.merge(oldGrouping);
217:                        commitPendingTxns.remove(oldTxnId);
218:                    }
219:                }
220:                for (Iterator j = newGrouping.getObjects().keySet().iterator(); j
221:                        .hasNext();) {
222:                    checkedOutObjects.put(j.next(), newGrouping);
223:                }
224:                for (Iterator j = newGrouping.getApplyPendingTxnsIterator(); j
225:                        .hasNext();) {
226:                    ServerTransactionID oldTxnId = (ServerTransactionID) j
227:                            .next();
228:                    if (applyPendingTxns.containsKey(oldTxnId)) {
229:                        applyPendingTxns.put(oldTxnId, newGrouping);
230:                    }
231:                }
232:                long timeTaken = System.currentTimeMillis() - start;
233:                if (timeTaken > 500) {
234:                    log("Merged " + oids.size() + " object into "
235:                            + newGrouping.shortDescription() + " in "
236:                            + timeTaken + " ms");
237:                }
238:            }
239:
240:            private synchronized void addLookedupObjects(LookupContext context) {
241:                Map lookedUpObjects = context.getLookedUpObjects();
242:                if (lookedUpObjects == null || lookedUpObjects.isEmpty()) {
243:                    throw new AssertionError("Lookedup object is null : "
244:                            + lookedUpObjects + " context = " + context);
245:                }
246:                TxnObjectGrouping tg = new TxnObjectGrouping(lookedUpObjects);
247:                for (Iterator i = lookedUpObjects.keySet().iterator(); i
248:                        .hasNext();) {
249:                    Object oid = i.next();
250:                    pendingObjectRequest.remove(oid);
251:                    checkedOutObjects.put(oid, tg);
252:                }
253:            }
254:
255:            private void makePending(ServerTransaction txn) {
256:                if (pendingTxnList.add(txn)) {
257:                    sequencer.makePending(txn);
258:                }
259:            }
260:
261:            private void makeUnpending(ServerTransaction txn) {
262:                if (pendingTxnList.remove(txn)) {
263:                    sequencer.makeUnpending(txn);
264:                }
265:            }
266:
267:            private boolean addProcessedPendingLookups() {
268:                LookupContext c;
269:                boolean processedPending = false;
270:                try {
271:                    while ((c = (LookupContext) processedPendingLookups.poll(0)) != null) {
272:                        addLookedupObjects(c);
273:                        processedPending = true;
274:                    }
275:                } catch (InterruptedException e) {
276:                    throw new AssertionError(e);
277:                }
278:                return processedPending;
279:            }
280:
281:            private void addProcessedPending(LookupContext context) {
282:                try {
283:                    processedPendingLookups.put(context);
284:                } catch (InterruptedException e) {
285:                    throw new AssertionError(e);
286:                }
287:                txnStageCoordinator.initiateLookup();
288:            }
289:
290:            private void processPendingTransactions() {
291:                List copy = pendingTxnList.copy();
292:                for (Iterator i = copy.iterator(); i.hasNext();) {
293:                    ServerTransaction txn = (ServerTransaction) i.next();
294:                    lookupObjectsForApplyAndAddToSink(txn, false);
295:                }
296:            }
297:
298:            // ApplyTransaction stage method
299:            public boolean applyTransactionComplete(ServerTransactionID stxnID) {
300:                try {
301:                    processedApplys.put(stxnID);
302:                } catch (InterruptedException e) {
303:                    throw new AssertionError(e);
304:                }
305:                txnStageCoordinator.initiateApplyComplete();
306:                return true;
307:            }
308:
309:            // Apply Complete stage method
310:            public void processApplyComplete() {
311:                try {
312:                    ServerTransactionID txnID;
313:                    ArrayList txnIDs = new ArrayList();
314:                    while ((txnID = (ServerTransactionID) processedApplys
315:                            .poll(0)) != null) {
316:                        txnIDs.add(txnID);
317:                    }
318:                    if (txnIDs.size() > 0) {
319:                        processApplyTxnComplete(txnIDs);
320:                    }
321:                } catch (InterruptedException e) {
322:                    throw new AssertionError(e);
323:                }
324:            }
325:
326:            private synchronized void processApplyTxnComplete(ArrayList txnIDs) {
327:                for (Iterator i = txnIDs.iterator(); i.hasNext();) {
328:                    ServerTransactionID stxnID = (ServerTransactionID) i.next();
329:                    processApplyTxnComplete(stxnID);
330:                }
331:            }
332:
333:            private void processApplyTxnComplete(ServerTransactionID stxnID) {
334:                TxnObjectGrouping grouping = (TxnObjectGrouping) applyPendingTxns
335:                        .remove(stxnID);
336:                Assert.assertNotNull(grouping);
337:                if (grouping.applyComplete(stxnID)) {
338:                    // Since verifying against all txns is costly, only the prime one (the one that created this grouping) is verfied
339:                    // against
340:                    ServerTransactionID pTxnID = grouping
341:                            .getServerTransactionID();
342:                    Assert.assertNull(applyPendingTxns.get(pTxnID));
343:                    Object old = commitPendingTxns.put(pTxnID, grouping);
344:                    Assert.assertNull(old);
345:                    txnStageCoordinator.initiateCommit();
346:                }
347:            }
348:
349:            // Commit Transaction stage method
350:            public synchronized void commitTransactionsComplete(
351:                    CommitTransactionContext ctc) {
352:
353:                if (commitPendingTxns.isEmpty())
354:                    return;
355:
356:                Map newRoots = new HashMap();
357:                Map objects = new HashMap();
358:                Collection txnIDs = new ArrayList();
359:                for (Iterator i = commitPendingTxns.values().iterator(); i
360:                        .hasNext();) {
361:                    TxnObjectGrouping tog = (TxnObjectGrouping) i.next();
362:                    newRoots.putAll(tog.getNewRoots());
363:                    txnIDs.addAll(tog.getTxnIDs());
364:                    objects.putAll(tog.getObjects());
365:                    i.remove();
366:                    if (objects.size() > MAX_COMMIT_SIZE) {
367:                        break;
368:                    }
369:                }
370:
371:                ctc.initialize(txnIDs, objects.values(), newRoots);
372:
373:                for (Iterator j = objects.keySet().iterator(); j.hasNext();) {
374:                    Object old = checkedOutObjects.remove(j.next());
375:                    Assert.assertNotNull(old);
376:                }
377:
378:                if (!commitPendingTxns.isEmpty()) {
379:                    // More commits needed
380:                    txnStageCoordinator.initiateCommit();
381:                }
382:            }
383:
384:            // recall from ObjectManager on GC start
385:            public void recallAllCheckedoutObject() {
386:                txnStageCoordinator.initiateRecallAll();
387:            }
388:
389:            // Recall Stage method
390:            public synchronized void recallCheckedoutObject(
391:                    RecallObjectsContext roc) {
392:                processPendingIfNecessary();
393:                if (roc.recallAll()) {
394:                    IdentityHashMap recalled = new IdentityHashMap();
395:                    HashMap recalledObjects = new HashMap();
396:                    for (Iterator i = checkedOutObjects.entrySet().iterator(); i
397:                            .hasNext();) {
398:                        Entry e = (Entry) i.next();
399:                        TxnObjectGrouping tog = (TxnObjectGrouping) e
400:                                .getValue();
401:                        if (tog.getServerTransactionID().isNull()) {
402:                            i.remove();
403:                            if (!recalled.containsKey(tog)) {
404:                                recalled.put(tog, null);
405:                                recalledObjects.putAll(tog.getObjects());
406:                            }
407:                        }
408:                    }
409:                    if (!recalledObjects.isEmpty()) {
410:                        logger.info("Recalling " + recalledObjects.size()
411:                                + " Objects to ObjectManager");
412:                        objectManager.releaseAll(recalledObjects.values());
413:                    }
414:                }
415:            }
416:
417:            public void dump() {
418:                PrintWriter pw = new PrintWriter(System.err);
419:                new PrettyPrinter(pw).visit(this );
420:                pw.flush();
421:            }
422:
423:            public synchronized PrettyPrinter prettyPrint(PrettyPrinter out) {
424:                out.println(getClass().getName());
425:                out.indent().print("checkedOutObjects: ").visit(
426:                        checkedOutObjects).println();
427:                out.indent().print("applyPendingTxns: ")
428:                        .visit(applyPendingTxns).println();
429:                out.indent().print("commitPendingTxns: ").visit(
430:                        commitPendingTxns).println();
431:                out.indent().print("pendingTxnList: ").visit(pendingTxnList)
432:                        .println();
433:                out.indent().print("pendingObjectRequest: ").visit(
434:                        pendingObjectRequest).println();
435:                return out;
436:            }
437:
438:            private class LookupContext implements  ObjectManagerResultsContext {
439:
440:                private boolean pending = false;
441:                private boolean resultsSet = false;
442:                private Map lookedUpObjects;
443:                private final Set oids;
444:                private final Set newOids;
445:                private final ServerTransactionID transactionID;
446:
447:                public LookupContext(Set oids, Set newOids,
448:                        ServerTransactionID transactionID) {
449:                    this .oids = oids;
450:                    this .newOids = newOids;
451:                    this .transactionID = transactionID;
452:                }
453:
454:                public synchronized void makePending() {
455:                    pending = true;
456:                    if (resultsSet) {
457:                        TransactionalObjectManagerImpl.this 
458:                                .addProcessedPending(this );
459:                    }
460:                }
461:
462:                public synchronized void setResults(
463:                        ObjectManagerLookupResults results) {
464:                    lookedUpObjects = results.getObjects();
465:                    resultsSet = true;
466:                    if (pending) {
467:                        TransactionalObjectManagerImpl.this 
468:                                .addProcessedPending(this );
469:                    }
470:                }
471:
472:                public synchronized Map getLookedUpObjects() {
473:                    return lookedUpObjects;
474:                }
475:
476:                public String toString() {
477:                    return "LookupContext [ txnID = "
478:                            + transactionID
479:                            + ", oids = "
480:                            + oids
481:                            + "] = { pending = "
482:                            + pending
483:                            + ", lookedupObjects = "
484:                            + (lookedUpObjects == null ? "null"
485:                                    : lookedUpObjects.keySet().toString())
486:                            + "}";
487:                }
488:
489:                public Set getLookupIDs() {
490:                    return oids;
491:                }
492:
493:                public Set getNewObjectIDs() {
494:                    return newOids;
495:                }
496:
497:                public void missingObject(ObjectID oid) {
498:                    throw new AssertionError(
499:                            "Lookup for non-exisistent Object : " + oid
500:                                    + " lookup context is : " + this );
501:                }
502:
503:            }
504:
505:            private static final class PendingList {
506:                LinkedHashMap pending = new LinkedHashMap();
507:
508:                public boolean add(ServerTransaction txn) {
509:                    ServerTransactionID sTxID = txn.getServerTransactionID();
510:                    // Doing two lookups to avoid reordering
511:                    if (pending.containsKey(sTxID)) {
512:                        return false;
513:                    } else {
514:                        pending.put(sTxID, txn);
515:                        return true;
516:                    }
517:                }
518:
519:                public List copy() {
520:                    return new ArrayList(pending.values());
521:                }
522:
523:                public boolean remove(ServerTransaction txn) {
524:                    return (pending.remove(txn.getServerTransactionID()) != null);
525:                }
526:
527:                public String toString() {
528:                    return "PendingList : pending Txns = " + pending;
529:                }
530:
531:                public int size() {
532:                    return pending.size();
533:                }
534:            }
535:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.