Source Code Cross Referenced for RemoteTransactionManagerImpl.java in  » Net » Terracotta » com » tc » object » tx » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » Terracotta » com.tc.object.tx 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003:         * notice. All rights reserved.
004:         */
005:        package com.tc.object.tx;
006:
007:        import com.tc.logging.TCLogger;
008:        import com.tc.object.lockmanager.api.LockFlushCallback;
009:        import com.tc.object.lockmanager.api.LockID;
010:        import com.tc.object.msg.CompletedTransactionLowWaterMarkMessage;
011:        import com.tc.object.net.DSOClientMessageChannel;
012:        import com.tc.object.session.SessionID;
013:        import com.tc.object.session.SessionManager;
014:        import com.tc.properties.TCPropertiesImpl;
015:        import com.tc.util.Assert;
016:        import com.tc.util.SequenceID;
017:        import com.tc.util.State;
018:        import com.tc.util.TCAssertionError;
019:        import com.tc.util.TCTimerImpl;
020:        import com.tc.util.Util;
021:
022:        import java.util.ArrayList;
023:        import java.util.Arrays;
024:        import java.util.Collection;
025:        import java.util.Collections;
026:        import java.util.HashMap;
027:        import java.util.HashSet;
028:        import java.util.Iterator;
029:        import java.util.LinkedHashSet;
030:        import java.util.List;
031:        import java.util.Map;
032:        import java.util.Set;
033:        import java.util.Timer;
034:        import java.util.TimerTask;
035:        import java.util.Map.Entry;
036:
037:        /**
038:         * Sends off committed transactions
039:         * 
040:         * @author steve
041:         */
042:        public class RemoteTransactionManagerImpl implements 
043:                RemoteTransactionManager {
044:
045:            private static final long TIMEOUT = 30000L;
046:
047:            private static final int MAX_OUTSTANDING_BATCHES = TCPropertiesImpl
048:                    .getProperties().getInt(
049:                            "l1.transactionmanager.maxOutstandingBatchSize");
050:            private static final long COMPLETED_ACK_FLUSH_TIMEOUT = TCPropertiesImpl
051:                    .getProperties().getLong(
052:                            "l1.transactionmanager.completedAckFlushTimeout");
053:
054:            private static final State STARTING = new State("STARTING");
055:            private static final State RUNNING = new State("RUNNING");
056:            private static final State PAUSED = new State("PAUSED");
057:            private static final State STOP_INITIATED = new State(
058:                    "STOP-INITIATED");
059:            private static final State STOPPED = new State("STOPPED");
060:
061:            private final Object lock = new Object();
062:            private final Map incompleteBatches = new HashMap();
063:            private final HashMap lockFlushCallbacks = new HashMap();
064:
065:            private int outStandingBatches = 0;
066:            private final TCLogger logger;
067:            private final TransactionBatchAccounting batchAccounting;
068:            private final LockAccounting lockAccounting;
069:
070:            private State status;
071:            private final SessionManager sessionManager;
072:            private final TransactionSequencer sequencer;
073:            private final DSOClientMessageChannel channel;
074:            private final Timer timer = new TCTimerImpl(
075:                    "RemoteTransactionManager Flusher", true);
076:
077:            public RemoteTransactionManagerImpl(TCLogger logger,
078:                    final TransactionBatchFactory batchFactory,
079:                    TransactionBatchAccounting batchAccounting,
080:                    LockAccounting lockAccounting,
081:                    SessionManager sessionManager,
082:                    DSOClientMessageChannel channel) {
083:                this .logger = logger;
084:                this .batchAccounting = batchAccounting;
085:                this .lockAccounting = lockAccounting;
086:                this .sessionManager = sessionManager;
087:                this .channel = channel;
088:                this .status = RUNNING;
089:                this .sequencer = new TransactionSequencer(batchFactory);
090:                this .timer.schedule(new RemoteTransactionManagerTimerTask(),
091:                        COMPLETED_ACK_FLUSH_TIMEOUT,
092:                        COMPLETED_ACK_FLUSH_TIMEOUT);
093:            }
094:
095:            public void pause() {
096:                synchronized (lock) {
097:                    if (isStoppingOrStopped())
098:                        return;
099:                    if (this .status == PAUSED)
100:                        throw new AssertionError(
101:                                "Attempt to pause while already paused.");
102:                    this .status = PAUSED;
103:                }
104:            }
105:
106:            public void starting() {
107:                synchronized (lock) {
108:                    if (isStoppingOrStopped())
109:                        return;
110:                    if (this .status != PAUSED)
111:                        throw new AssertionError(
112:                                "Attempt to start while not paused.");
113:                    this .status = STARTING;
114:                }
115:            }
116:
117:            public void unpause() {
118:                synchronized (lock) {
119:                    if (isStoppingOrStopped())
120:                        return;
121:                    if (this .status != STARTING)
122:                        throw new AssertionError(
123:                                "Attempt to unpause while not in starting.");
124:                    this .status = RUNNING;
125:                    lock.notifyAll();
126:                }
127:            }
128:
129:            /**
130:             * This is for testing only.
131:             */
132:            public void clear() {
133:                synchronized (lock) {
134:                    sequencer.clear();
135:                    incompleteBatches.clear();
136:                }
137:            }
138:
139:            /**
140:             * This is for testing only.
141:             */
142:            public int getMaxOutStandingBatches() {
143:                return MAX_OUTSTANDING_BATCHES;
144:            }
145:
146:            public void stopProcessing() {
147:                sequencer.shutdown();
148:                channel.close();
149:            }
150:
151:            public void stop() {
152:                final long start = System.currentTimeMillis();
153:                logger.debug("stop() is called on "
154:                        + System.identityHashCode(this ));
155:                synchronized (lock) {
156:                    this .status = STOP_INITIATED;
157:
158:                    sendBatches(true, "stop()");
159:
160:                    int count = 10;
161:                    long t0 = System.currentTimeMillis();
162:                    if (incompleteBatches.size() != 0) {
163:                        try {
164:                            int incompleteBatchesCount = 0;
165:                            while (status != STOPPED
166:                                    && (t0 + TIMEOUT * count) > System
167:                                            .currentTimeMillis()) {
168:                                if (incompleteBatchesCount != incompleteBatches
169:                                        .size()) {
170:                                    logger
171:                                            .debug("stop(): incompleteBatches.size() = "
172:                                                    + (incompleteBatchesCount = incompleteBatches
173:                                                            .size()));
174:                                }
175:                                lock.wait(TIMEOUT);
176:                            }
177:                        } catch (InterruptedException e) {
178:                            logger.warn("stop(): Interrupted " + e);
179:                        }
180:                        if (status != STOPPED) {
181:                            logger
182:                                    .error("stop() : There are still UNACKed Transactions! incompleteBatches.size() = "
183:                                            + incompleteBatches.size());
184:                        }
185:                    }
186:                    this .status = STOPPED;
187:                }
188:                logger.info("stop(): took "
189:                        + (System.currentTimeMillis() - start)
190:                        + " millis to complete");
191:            }
192:
193:            public void flush(LockID lockID) {
194:                boolean isInterrupted = false;
195:                Collection c;
196:                synchronized (lock) {
197:                    while ((!(c = lockAccounting.getTransactionsFor(lockID))
198:                            .isEmpty())) {
199:                        try {
200:                            long waitTime = 15 * 1000;
201:                            long t0 = System.currentTimeMillis();
202:                            lock.wait(waitTime);
203:                            if ((System.currentTimeMillis() - t0) > waitTime) {
204:                                logger.info("Flush for " + lockID
205:                                        + " took longer than: " + waitTime
206:                                        + "ms. # Transactions not yet Acked = "
207:                                        + c.size() + "\n");
208:                            }
209:                        } catch (InterruptedException e) {
210:                            isInterrupted = true;
211:                        }
212:                    }
213:                }
214:                Util.selfInterruptIfNeeded(isInterrupted);
215:            }
216:
217:            /* This does not block unlike flush() */
218:            public boolean isTransactionsForLockFlushed(LockID lockID,
219:                    LockFlushCallback callback) {
220:                synchronized (lock) {
221:
222:                    if ((lockAccounting.getTransactionsFor(lockID)).isEmpty()) {
223:                        // All transactions are flushed !
224:                        return true;
225:                    } else {
226:                        // register for call back
227:                        Object prev = lockFlushCallbacks.put(lockID, callback);
228:                        if (prev != null) {
229:                            // Will this scenario comeup in server restart scenario ? It should as we check for greediness in the Lock
230:                            // Manager before making this call
231:                            throw new TCAssertionError(
232:                                    "There is already a registered call back on Lock Flush for this lock ID - "
233:                                            + lockID);
234:                        }
235:                        return false;
236:                    }
237:                }
238:            }
239:
240:            public void commit(ClientTransaction txn) {
241:
242:                if (!txn.hasChangesOrNotifies())
243:                    throw new AssertionError(
244:                            "Attempt to commit an empty transaction.");
245:
246:                commitInternal(txn);
247:            }
248:
249:            private void commitInternal(ClientTransaction txn) {
250:                TransactionID txID = txn.getTransactionID();
251:                if (!txn.isConcurrent()) {
252:                    lockAccounting
253:                            .add(txID, Arrays.asList(txn.getAllLockIDs()));
254:                }
255:
256:                long start = System.currentTimeMillis();
257:                sequencer.addTransaction(txn);
258:                long diff = System.currentTimeMillis() - start;
259:                if (diff > 1000) {
260:                    logger
261:                            .info("WARNING ! Took more than 1000ms to add to sequencer  : "
262:                                    + diff + " ms");
263:                }
264:
265:                synchronized (lock) {
266:                    if (isStoppingOrStopped()) {
267:                        // Send now if stop is requested
268:                        sendBatches(true, "commit() : Stop initiated.");
269:                    }
270:                    waitUntilRunning();
271:                    sendBatches(false);
272:                }
273:            }
274:
275:            private void sendBatches(boolean ignoreMax) {
276:                sendBatches(ignoreMax, null);
277:            }
278:
279:            private void sendBatches(boolean ignoreMax, String message) {
280:                ClientTransactionBatch batch;
281:                while ((ignoreMax || canSendBatch())
282:                        && (batch = sequencer.getNextBatch()) != null) {
283:                    if (message != null) {
284:                        logger.debug(message + " : Sending batch containing "
285:                                + batch.numberOfTxns() + " Txns.");
286:                    }
287:                    sendBatch(batch, true);
288:                }
289:            }
290:
291:            private boolean canSendBatch() {
292:                return (outStandingBatches < MAX_OUTSTANDING_BATCHES);
293:            }
294:
295:            public void resendOutstanding() {
296:                synchronized (lock) {
297:                    if (status != STARTING && !isStoppingOrStopped()) {
298:                        // formatting
299:                        throw new AssertionError(
300:                                this 
301:                                        + ": Attempt to resend incomplete batches while not starting.  Status="
302:                                        + status);
303:                    }
304:                    logger.debug("resendOutstanding()...");
305:                    outStandingBatches = 0;
306:                    List toSend = batchAccounting
307:                            .addIncompleteBatchIDsTo(new ArrayList());
308:                    if (toSend.size() == 0) {
309:                        sendBatches(false, " resendOutstanding()");
310:                    } else {
311:                        for (Iterator i = toSend.iterator(); i.hasNext();) {
312:                            TxnBatchID id = (TxnBatchID) i.next();
313:                            ClientTransactionBatch batch = (ClientTransactionBatch) incompleteBatches
314:                                    .get(id);
315:                            if (batch == null)
316:                                throw new AssertionError("Unknown batch: " + id);
317:                            logger
318:                                    .debug("Resending outstanding batch: "
319:                                            + id
320:                                            + ", "
321:                                            + batch
322:                                                    .addTransactionIDsTo(new LinkedHashSet()));
323:                            sendBatch(batch, false);
324:                        }
325:                    }
326:                }
327:            }
328:
329:            public Collection getTransactionSequenceIDs() {
330:                synchronized (lock) {
331:                    HashSet sequenceIDs = new HashSet();
332:                    if (!isStoppingOrStopped() && (status != STARTING)) {
333:                        throw new AssertionError(
334:                                "Attempt to get current transaction sequence while not starting: "
335:                                        + status);
336:                    } else {
337:                        // Add list of SequenceIDs that are going to be resent
338:                        List toSend = batchAccounting
339:                                .addIncompleteBatchIDsTo(new ArrayList());
340:                        for (Iterator i = toSend.iterator(); i.hasNext();) {
341:                            TxnBatchID id = (TxnBatchID) i.next();
342:                            ClientTransactionBatch batch = (ClientTransactionBatch) incompleteBatches
343:                                    .get(id);
344:                            if (batch == null)
345:                                throw new AssertionError("Unknown batch: " + id);
346:                            batch.addTransactionSequenceIDsTo(sequenceIDs);
347:                        }
348:                        // Add Last next
349:                        SequenceID currentBatchMinSeq = sequencer
350:                                .getNextSequenceID();
351:                        Assert.assertFalse(SequenceID.NULL_ID
352:                                .equals(currentBatchMinSeq));
353:                        sequenceIDs.add(currentBatchMinSeq);
354:                    }
355:                    return sequenceIDs;
356:                }
357:            }
358:
359:            public Collection getResentTransactionIDs() {
360:                synchronized (lock) {
361:                    HashSet txIDs = new HashSet();
362:                    if (!isStoppingOrStopped() && (status != STARTING)) {
363:                        throw new AssertionError(
364:                                "Attempt to get resent transaction IDs while not starting: "
365:                                        + status);
366:                    } else {
367:                        // Add list of TransactionIDs that are going to be resent
368:                        List toSend = batchAccounting
369:                                .addIncompleteBatchIDsTo(new ArrayList());
370:                        for (Iterator i = toSend.iterator(); i.hasNext();) {
371:                            TxnBatchID id = (TxnBatchID) i.next();
372:                            ClientTransactionBatch batch = (ClientTransactionBatch) incompleteBatches
373:                                    .get(id);
374:                            if (batch == null)
375:                                throw new AssertionError("Unknown batch: " + id);
376:                            batch.addTransactionIDsTo(txIDs);
377:                        }
378:                    }
379:                    return txIDs;
380:                }
381:            }
382:
383:            private boolean isStoppingOrStopped() {
384:                return status == STOP_INITIATED || status == STOPPED;
385:            }
386:
387:            private void sendBatch(ClientTransactionBatch batchToSend,
388:                    boolean account) {
389:                synchronized (lock) {
390:                    if (account) {
391:                        if (incompleteBatches.put(batchToSend
392:                                .getTransactionBatchID(), batchToSend) != null) {
393:                            // formatting
394:                            throw new AssertionError(
395:                                    "Batch has already been sent!");
396:                        }
397:                        Collection txnIds = batchToSend
398:                                .addTransactionIDsTo(new HashSet());
399:                        batchAccounting.addBatch(batchToSend
400:                                .getTransactionBatchID(), txnIds);
401:                    }
402:                    batchToSend.send();
403:                    outStandingBatches++;
404:                }
405:            }
406:
407:            // XXX:: Currently server always sends NULL BatchID
408:            public void receivedBatchAcknowledgement(TxnBatchID txnBatchID) {
409:                synchronized (lock) {
410:                    if (status == STOP_INITIATED) {
411:                        logger.warn(status + " : Received ACK for batch = "
412:                                + txnBatchID);
413:                        lock.notifyAll();
414:                        return;
415:                    }
416:
417:                    waitUntilRunning();
418:                    outStandingBatches--;
419:                    sendBatches(false);
420:                    lock.notifyAll();
421:                }
422:            }
423:
424:            public void receivedAcknowledgement(SessionID sessionID,
425:                    TransactionID txID) {
426:                Map callbacks;
427:                synchronized (lock) {
428:                    // waitUntilRunning();
429:                    if (!sessionManager.isCurrentSession(sessionID)) {
430:                        logger.warn("Ignoring Transaction ACK for " + txID
431:                                + " from previous session = " + sessionID);
432:                        return;
433:                    }
434:
435:                    Set completedLocks = lockAccounting.acknowledge(txID);
436:
437:                    TxnBatchID container = batchAccounting
438:                            .getBatchByTransactionID(txID);
439:                    if (!container.isNull()) {
440:                        ClientTransactionBatch containingBatch = (ClientTransactionBatch) incompleteBatches
441:                                .get(container);
442:                        containingBatch.removeTransaction(txID);
443:                        TxnBatchID completed = batchAccounting
444:                                .acknowledge(txID);
445:                        if (!completed.isNull()) {
446:                            incompleteBatches.remove(completed);
447:                            if (status == STOP_INITIATED
448:                                    && incompleteBatches.size() == 0) {
449:                                logger
450:                                        .debug("Received ACK for the last Transaction. Moving to STOPPED state.");
451:                                status = STOPPED;
452:                            }
453:                        }
454:                    } else {
455:                        logger.fatal("No batch found for acknowledgement: "
456:                                + txID + " The batch accounting is "
457:                                + batchAccounting);
458:                        throw new AssertionError(
459:                                "No batch found for acknowledgement: " + txID);
460:                    }
461:                    lock.notifyAll();
462:                    callbacks = getLockFlushCallbacks(completedLocks);
463:                }
464:                fireLockFlushCallbacks(callbacks);
465:            }
466:
467:            private TransactionID getCompletedTransactionIDLowWaterMark() {
468:                synchronized (lock) {
469:                    waitUntilRunning();
470:                    return batchAccounting.getLowWaterMark();
471:                }
472:            }
473:
474:            /*
475:             * Never fire callbacks while holding lock
476:             */
477:            private void fireLockFlushCallbacks(Map callbacks) {
478:                if (callbacks.isEmpty())
479:                    return;
480:                for (Iterator i = callbacks.entrySet().iterator(); i.hasNext();) {
481:                    Entry e = (Entry) i.next();
482:                    LockID lid = (LockID) e.getKey();
483:                    LockFlushCallback callback = (LockFlushCallback) e
484:                            .getValue();
485:                    callback.transactionsForLockFlushed(lid);
486:                }
487:            }
488:
489:            private Map getLockFlushCallbacks(Set completedLocks) {
490:                Map callbacks = Collections.EMPTY_MAP;
491:                if (!completedLocks.isEmpty() && !lockFlushCallbacks.isEmpty()) {
492:                    for (Iterator i = completedLocks.iterator(); i.hasNext();) {
493:                        Object lid = i.next();
494:                        Object callback = lockFlushCallbacks.remove(lid);
495:                        if (callback != null) {
496:                            if (callbacks == Collections.EMPTY_MAP) {
497:                                callbacks = new HashMap();
498:                            }
499:                            callbacks.put(lid, callback);
500:                        }
501:                    }
502:                }
503:                return callbacks;
504:            }
505:
506:            private void waitUntilRunning() {
507:                boolean isInterrupted = false;
508:                while (status != RUNNING) {
509:                    try {
510:                        lock.wait();
511:                    } catch (InterruptedException e) {
512:                        isInterrupted = true;
513:                    }
514:                }
515:                Util.selfInterruptIfNeeded(isInterrupted);
516:            }
517:
518:            // This method exists so that both these (resending and unpausing) should happen in
519:            // atomically or else there exists a race condition.
520:            public void resendOutstandingAndUnpause() {
521:                synchronized (lock) {
522:                    resendOutstanding();
523:                    unpause();
524:                }
525:            }
526:
527:            private class RemoteTransactionManagerTimerTask extends TimerTask {
528:
529:                public void run() {
530:                    try {
531:                        TransactionID lwm = getCompletedTransactionIDLowWaterMark();
532:                        if (lwm.isNull())
533:                            return;
534:                        CompletedTransactionLowWaterMarkMessage ctm = channel
535:                                .getCompletedTransactionLowWaterMarkMessageFactory()
536:                                .newCompletedTransactionLowWaterMarkMessage();
537:                        ctm.initialize(lwm);
538:                        ctm.send();
539:                    } catch (Exception e) {
540:                        logger.error("Error sending Low water mark : ", e);
541:                        throw new AssertionError(e);
542:                    }
543:                }
544:            }
545:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.