Source Code Cross Referenced for ReplicatedObjectManagerImpl.java in  » Net » Terracotta » com » tc » l2 » objectserver » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » Terracotta » com.tc.l2.objectserver 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         * All content copyright (c) 2003-2007 Terracotta, Inc., except as may otherwise be noted in a separate copyright
003:         * notice. All rights reserved.
004:         */
005:        package com.tc.l2.objectserver;
006:
007:        import com.tc.async.api.Sink;
008:        import com.tc.l2.context.SyncObjectsRequest;
009:        import com.tc.l2.ha.L2HAZapNodeRequestProcessor;
010:        import com.tc.l2.msg.GCResultMessage;
011:        import com.tc.l2.msg.GCResultMessageFactory;
012:        import com.tc.l2.msg.ObjectListSyncMessage;
013:        import com.tc.l2.msg.ObjectListSyncMessageFactory;
014:        import com.tc.l2.msg.ObjectSyncCompleteMessage;
015:        import com.tc.l2.msg.ObjectSyncCompleteMessageFactory;
016:        import com.tc.l2.state.StateManager;
017:        import com.tc.logging.TCLogger;
018:        import com.tc.logging.TCLogging;
019:        import com.tc.net.groups.GroupException;
020:        import com.tc.net.groups.GroupManager;
021:        import com.tc.net.groups.GroupMessage;
022:        import com.tc.net.groups.GroupMessageListener;
023:        import com.tc.net.groups.GroupResponse;
024:        import com.tc.net.groups.NodeID;
025:        import com.tc.objectserver.api.GCStats;
026:        import com.tc.objectserver.api.ObjectManager;
027:        import com.tc.objectserver.api.ObjectManagerEventListener;
028:        import com.tc.objectserver.tx.ServerTransactionManager;
029:        import com.tc.objectserver.tx.TxnsInSystemCompletionLister;
030:        import com.tc.util.Assert;
031:        import com.tc.util.sequence.SequenceGenerator;
032:        import com.tc.util.sequence.SequenceGenerator.SequenceGeneratorException;
033:
034:        import java.util.HashMap;
035:        import java.util.Iterator;
036:        import java.util.LinkedHashMap;
037:        import java.util.Map;
038:        import java.util.Set;
039:        import java.util.Map.Entry;
040:
041:        public class ReplicatedObjectManagerImpl implements 
042:                ReplicatedObjectManager, GroupMessageListener,
043:                L2ObjectStateListener {
044:
045:            private static final TCLogger logger = TCLogging
046:                    .getLogger(ReplicatedObjectManagerImpl.class);
047:
048:            private final ObjectManager objectManager;
049:            private final GroupManager groupManager;
050:            private final StateManager stateManager;
051:            private final L2ObjectStateManager l2ObjectStateManager;
052:            private final ReplicatedTransactionManager rTxnManager;
053:            private final ServerTransactionManager transactionManager;
054:            private final Sink objectsSyncRequestSink;
055:            private final SequenceGenerator sequenceGenerator;
056:            private final GCMonitor gcMonitor;
057:
058:            public ReplicatedObjectManagerImpl(GroupManager groupManager,
059:                    StateManager stateManager,
060:                    L2ObjectStateManager l2ObjectStateManager,
061:                    ReplicatedTransactionManager txnManager,
062:                    ObjectManager objectManager,
063:                    ServerTransactionManager transactionManager,
064:                    Sink objectsSyncRequestSink,
065:                    SequenceGenerator sequenceGenerator) {
066:                this .groupManager = groupManager;
067:                this .stateManager = stateManager;
068:                this .rTxnManager = txnManager;
069:                this .objectManager = objectManager;
070:                this .transactionManager = transactionManager;
071:                this .objectsSyncRequestSink = objectsSyncRequestSink;
072:                this .l2ObjectStateManager = l2ObjectStateManager;
073:                this .sequenceGenerator = sequenceGenerator;
074:                this .gcMonitor = new GCMonitor();
075:                this .objectManager.getGarbageCollector().addListener(gcMonitor);
076:                l2ObjectStateManager.registerForL2ObjectStateChangeEvents(this );
077:                this .groupManager.registerForMessages(
078:                        ObjectListSyncMessage.class, this );
079:            }
080:
081:            /**
082:             * This method is used to sync up all ObjectIDs from the remote ObjectManagers. It is synchronous and after when it
083:             * returns nobody is allowed to join the cluster with exisiting objects.
084:             */
085:            public void sync() {
086:                try {
087:                    GroupResponse gr = groupManager
088:                            .sendAllAndWaitForResponse(ObjectListSyncMessageFactory
089:                                    .createObjectListSyncRequestMessage());
090:                    Map nodeID2ObjectIDs = new LinkedHashMap();
091:                    for (Iterator i = gr.getResponses().iterator(); i.hasNext();) {
092:                        ObjectListSyncMessage msg = (ObjectListSyncMessage) i
093:                                .next();
094:                        if (msg.getType() == ObjectListSyncMessage.RESPONSE) {
095:                            nodeID2ObjectIDs.put(msg.messageFrom(), msg
096:                                    .getObjectIDs());
097:                        } else {
098:                            logger
099:                                    .error("Received wrong response for ObjectListSyncMessage Request  from "
100:                                            + msg.messageFrom()
101:                                            + " : msg : "
102:                                            + msg);
103:                            groupManager
104:                                    .zapNode(
105:                                            msg.messageFrom(),
106:                                            L2HAZapNodeRequestProcessor.PROGRAM_ERROR,
107:                                            "Recd wrong response from : "
108:                                                    + msg.messageFrom()
109:                                                    + " for ObjectListSyncMessage Request"
110:                                                    + L2HAZapNodeRequestProcessor
111:                                                            .getErrorString(new Throwable()));
112:                        }
113:                    }
114:                    if (!nodeID2ObjectIDs.isEmpty()) {
115:                        gcMonitor
116:                                .disableAndAdd2L2StateManager(nodeID2ObjectIDs);
117:                    }
118:                } catch (GroupException e) {
119:                    logger.error(e);
120:                    throw new AssertionError(e);
121:                }
122:            }
123:
124:            // Query current state of the other L2
125:            public void query(NodeID nodeID) throws GroupException {
126:                groupManager.sendTo(nodeID, ObjectListSyncMessageFactory
127:                        .createObjectListSyncRequestMessage());
128:            }
129:
130:            public void clear(NodeID nodeID) {
131:                l2ObjectStateManager.removeL2(nodeID);
132:                gcMonitor.clear(nodeID);
133:            }
134:
135:            public void messageReceived(NodeID fromNode, GroupMessage msg) {
136:                if (msg instanceof  ObjectListSyncMessage) {
137:                    ObjectListSyncMessage clusterMsg = (ObjectListSyncMessage) msg;
138:                    handleClusterObjectMessage(fromNode, clusterMsg);
139:                } else {
140:                    throw new AssertionError(
141:                            "ReplicatedObjectManagerImpl : Received wrong message type :"
142:                                    + msg.getClass().getName() + " : " + msg);
143:
144:                }
145:            }
146:
147:            public void handleGCResult(GCResultMessage gcMsg) {
148:                Set gcedOids = gcMsg.getGCedObjectIDs();
149:                if (stateManager.isActiveCoordinator()) {
150:                    logger.warn("Received GC Result from "
151:                            + gcMsg.messageFrom()
152:                            + " While this node is ACTIVE. Ignoring result : "
153:                            + gcedOids.size());
154:                    return;
155:                }
156:                objectManager.notifyGCComplete(gcedOids);
157:                logger
158:                        .info("Removed "
159:                                + gcedOids.size()
160:                                + "objects from passive ObjectManager from last GC from Active");
161:            }
162:
163:            private void handleClusterObjectMessage(NodeID nodeID,
164:                    ObjectListSyncMessage clusterMsg) {
165:                try {
166:                    switch (clusterMsg.getType()) {
167:                    case ObjectListSyncMessage.REQUEST:
168:                        handleObjectListRequest(nodeID, clusterMsg);
169:                        break;
170:                    case ObjectListSyncMessage.RESPONSE:
171:                        handleObjectListResponse(nodeID, clusterMsg);
172:                        break;
173:
174:                    default:
175:                        throw new AssertionError(
176:                                "This message shouldn't have been routed here : "
177:                                        + clusterMsg);
178:                    }
179:                } catch (GroupException e) {
180:                    logger.error("Error handling message : " + clusterMsg, e);
181:                    throw new AssertionError(e);
182:                }
183:            }
184:
185:            private void handleObjectListResponse(NodeID nodeID,
186:                    ObjectListSyncMessage clusterMsg) {
187:                Assert.assertTrue(stateManager.isActiveCoordinator());
188:                Set oids = clusterMsg.getObjectIDs();
189:                if (!oids.isEmpty()) {
190:                    String error = "Nodes joining the cluster after startup shouldnt have any Objects. "
191:                            + nodeID
192:                            + " contains "
193:                            + oids.size()
194:                            + " Objects !!!";
195:                    logger.error(error + " Forcing node to Quit !!");
196:                    groupManager
197:                            .zapNode(
198:                                    nodeID,
199:                                    L2HAZapNodeRequestProcessor.NODE_JOINED_WITH_DIRTY_DB,
200:                                    error
201:                                            + L2HAZapNodeRequestProcessor
202:                                                    .getErrorString(new Throwable()));
203:                } else {
204:                    gcMonitor.add2L2StateManagerWhenGCDisabled(nodeID, oids);
205:                }
206:            }
207:
208:            private boolean add2L2StateManager(NodeID nodeID, Set oids) {
209:                return l2ObjectStateManager.addL2(nodeID, oids);
210:            }
211:
212:            public void missingObjectsFor(NodeID nodeID, int missingObjects) {
213:                if (missingObjects == 0) {
214:                    stateManager.moveNodeToPassiveStandby(nodeID);
215:                    gcMonitor.syncCompleteFor(nodeID);
216:                } else {
217:                    objectsSyncRequestSink.add(new SyncObjectsRequest(nodeID));
218:                }
219:            }
220:
221:            public void objectSyncCompleteFor(NodeID nodeID) {
222:                try {
223:                    gcMonitor.syncCompleteFor(nodeID);
224:                    ObjectSyncCompleteMessage msg = ObjectSyncCompleteMessageFactory
225:                            .createObjectSyncCompleteMessageFor(nodeID,
226:                                    sequenceGenerator.getNextSequence(nodeID));
227:                    groupManager.sendTo(nodeID, msg);
228:                } catch (GroupException e) {
229:                    logger.error(
230:                            "Error Sending Object Sync complete message  to : "
231:                                    + nodeID, e);
232:                    groupManager.zapNode(nodeID,
233:                            L2HAZapNodeRequestProcessor.COMMUNICATION_ERROR,
234:                            "Error sending Object Sync complete message "
235:                                    + L2HAZapNodeRequestProcessor
236:                                            .getErrorString(e));
237:                } catch (SequenceGeneratorException e) {
238:                    logger.error(
239:                            "Error Sending Object Sync complete message  to : "
240:                                    + nodeID, e);
241:                }
242:            }
243:
244:            /**
245:             * ACTIVE queries PASSIVES for the list of known object ids and this response is the one that opens up the
246:             * transactions from ACTIVE to PASSIVE. So the replicated transaction manager is initialized here.
247:             */
248:            private void handleObjectListRequest(NodeID nodeID,
249:                    ObjectListSyncMessage clusterMsg) throws GroupException {
250:                if (!stateManager.isActiveCoordinator()) {
251:                    Set knownIDs = objectManager.getAllObjectIDs();
252:                    rTxnManager.init(knownIDs);
253:                    logger
254:                            .info("Send response to Active's query : known id lists = "
255:                                    + knownIDs.size());
256:                    groupManager.sendTo(nodeID, ObjectListSyncMessageFactory
257:                            .createObjectListSyncResponseMessage(clusterMsg,
258:                                    knownIDs));
259:                } else {
260:                    logger
261:                            .error("Recd. ObjectListRequest when in ACTIVE state from "
262:                                    + nodeID + ". Zapping node ...");
263:                    groupManager
264:                            .sendTo(
265:                                    nodeID,
266:                                    ObjectListSyncMessageFactory
267:                                            .createObjectListSyncFailedResponseMessage(clusterMsg));
268:                    // Now ZAP the node
269:                    groupManager.zapNode(nodeID,
270:                            L2HAZapNodeRequestProcessor.SPLIT_BRAIN,
271:                            "Recd ObjectListRequest from : "
272:                                    + nodeID
273:                                    + " while in ACTIVE-COORDINATOR state"
274:                                    + L2HAZapNodeRequestProcessor
275:                                            .getErrorString(new Throwable()));
276:                }
277:            }
278:
279:            public boolean relayTransactions() {
280:                return l2ObjectStateManager.getL2Count() > 0;
281:            }
282:
283:            private static final Object ADDED = new Object();
284:
285:            private final class GCMonitor implements  ObjectManagerEventListener {
286:
287:                boolean disabled = false;
288:                Map syncingPassives = new HashMap();
289:
290:                public void garbageCollectionComplete(GCStats stats, Set deleted) {
291:                    Map toAdd = null;
292:                    notifyGCResultToPassives(deleted);
293:                    synchronized (this ) {
294:                        if (syncingPassives.isEmpty())
295:                            return;
296:                        toAdd = new LinkedHashMap();
297:                        for (Iterator i = syncingPassives.entrySet().iterator(); i
298:                                .hasNext();) {
299:                            Entry e = (Entry) i.next();
300:                            if (e.getValue() != ADDED) {
301:                                NodeID nodeID = (NodeID) e.getKey();
302:                                logger
303:                                        .info("GC Completed : Starting scheduled passive sync for "
304:                                                + nodeID);
305:                                disableGCIfNecessary();
306:                                // Shouldn't happen as this is in GC call back after GC completion
307:                                assertGCDisabled();
308:                                toAdd.put(nodeID, e.getValue());
309:                                e.setValue(ADDED);
310:                            }
311:                        }
312:                    }
313:                    add2L2StateManager(toAdd);
314:                }
315:
316:                private void notifyGCResultToPassives(Set deleted) {
317:                    if (deleted.isEmpty())
318:                        return;
319:                    final GCResultMessage msg = GCResultMessageFactory
320:                            .createGCResultMessage(deleted);
321:                    transactionManager
322:                            .callBackOnTxnsInSystemCompletion(new TxnsInSystemCompletionLister() {
323:                                public void onCompletion() {
324:                                    try {
325:                                        groupManager.sendAll(msg);
326:                                    } catch (GroupException e) {
327:                                        logger.error(
328:                                                "Error sending gc results : ",
329:                                                e);
330:                                    }
331:                                }
332:                            });
333:                }
334:
335:                private void add2L2StateManager(Map toAdd) {
336:                    for (Iterator i = toAdd.entrySet().iterator(); i.hasNext();) {
337:                        Entry e = (Entry) i.next();
338:                        NodeID nodeID = (NodeID) e.getKey();
339:                        if (!ReplicatedObjectManagerImpl.this 
340:                                .add2L2StateManager(nodeID, (Set) e.getValue())) {
341:                            logger
342:                                    .warn(nodeID
343:                                            + " is already added to L2StateManager, clearing our internal data structures.");
344:                            syncCompleteFor(nodeID);
345:                        }
346:                    }
347:                }
348:
349:                private void disableGCIfNecessary() {
350:                    if (!disabled) {
351:                        disabled = objectManager.getGarbageCollector()
352:                                .disableGC();
353:                    }
354:                }
355:
356:                private void assertGCDisabled() {
357:                    if (!disabled) {
358:                        throw new AssertionError("Cant disable GC");
359:                    }
360:                }
361:
362:                public void add2L2StateManagerWhenGCDisabled(NodeID nodeID,
363:                        Set oids) {
364:                    boolean toAdd = false;
365:                    synchronized (this ) {
366:                        disableGCIfNecessary();
367:                        if (syncingPassives.containsKey(nodeID)) {
368:                            logger
369:                                    .warn("Not adding "
370:                                            + nodeID
371:                                            + " since it is already present in syncingPassives : "
372:                                            + syncingPassives.keySet());
373:                            return;
374:                        }
375:                        if (disabled) {
376:                            syncingPassives.put(nodeID, ADDED);
377:                            toAdd = true;
378:                        } else {
379:                            logger
380:                                    .info("Couldnt disable GC, probably because GC is currently running. So scheduling passive sync up for later after GC completion");
381:                            syncingPassives.put(nodeID, oids);
382:                        }
383:                    }
384:                    if (toAdd) {
385:                        if (!ReplicatedObjectManagerImpl.this 
386:                                .add2L2StateManager(nodeID, oids)) {
387:                            logger
388:                                    .warn(nodeID
389:                                            + " is already added to L2StateManager, clearing our internal data structures.");
390:                            syncCompleteFor(nodeID);
391:                        }
392:                    }
393:                }
394:
395:                public synchronized void clear(NodeID nodeID) {
396:                    Object val = syncingPassives.remove(nodeID);
397:                    if (val != null) {
398:                        enableGCIfNecessary();
399:                    }
400:                }
401:
402:                private void enableGCIfNecessary() {
403:                    if (syncingPassives.isEmpty() && disabled) {
404:                        logger
405:                                .info("Reenabling GC as all passive are synced up");
406:                        objectManager.getGarbageCollector().enableGC();
407:                        disabled = false;
408:                    }
409:                }
410:
411:                public synchronized void syncCompleteFor(NodeID nodeID) {
412:                    Object val = syncingPassives.remove(nodeID);
413:                    // val could be null if the node disconnects before fully synching up.
414:                    Assert.assertTrue(val == ADDED || val == null);
415:                    if (val != null) {
416:                        Assert.assertTrue(disabled);
417:                        enableGCIfNecessary();
418:                    }
419:                }
420:
421:                public synchronized void disableAndAdd2L2StateManager(
422:                        Map nodeID2ObjectIDs) {
423:                    synchronized (this ) {
424:                        if (nodeID2ObjectIDs.size() > 0 && !disabled) {
425:                            logger.info("Disabling GC since "
426:                                    + nodeID2ObjectIDs.size() + " passives ["
427:                                    + nodeID2ObjectIDs.keySet()
428:                                    + "] needs to sync up");
429:                            disableGCIfNecessary();
430:                            // Shouldnt happen as GC should be running yet. We havent started yet.
431:                            assertGCDisabled();
432:                        }
433:                        for (Iterator i = nodeID2ObjectIDs.entrySet()
434:                                .iterator(); i.hasNext();) {
435:                            Entry e = (Entry) i.next();
436:                            NodeID nodeID = (NodeID) e.getKey();
437:                            if (!syncingPassives.containsKey(nodeID)) {
438:                                syncingPassives.put(nodeID, ADDED);
439:                            } else {
440:                                logger
441:                                        .info("Removing "
442:                                                + e
443:                                                + " from the list to add to L2ObjectStateManager since its present in syncingPassives : "
444:                                                + syncingPassives.keySet());
445:                                i.remove();
446:                            }
447:                        }
448:                    }
449:                    add2L2StateManager(nodeID2ObjectIDs);
450:                }
451:
452:            }
453:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.