Source Code Cross Referenced for NodeManager.java in  » Database-ORM » beankeeper » hu » netmind » persistence » node » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Database ORM » beankeeper » hu.netmind.persistence.node 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /**
002:         * Copyright (C) 2006 NetMind Consulting Bt.
003:         *
004:         * This library is free software; you can redistribute it and/or
005:         * modify it under the terms of the GNU Lesser General Public
006:         * License as published by the Free Software Foundation; either
007:         * version 3 of the License, or (at your option) any later version.
008:         *
009:         * This library is distributed in the hope that it will be useful,
010:         * but WITHOUT ANY WARRANTY; without even the implied warranty of
011:         * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
012:         * Lesser General Public License for more details.
013:         *
014:         * You should have received a copy of the GNU Lesser General Public
015:         * License along with this library; if not, write to the Free Software
016:         * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
017:         */package hu.netmind.persistence.node;
018:
019:        import hu.netmind.persistence.*;
020:        import hu.netmind.persistence.parser.*;
021:        import java.util.*;
022:        import java.sql.Connection;
023:        import java.sql.PreparedStatement;
024:        import org.apache.log4j.Logger;
025:
026:        /**
027:         * This manager enables the Store to function on a peer-to-peer
028:         * fashion with other Store instances which are pointed to the same
029:         * database. The class takes care of all IP communication related
030:         * work, such as reconnecting, communication protocoll, etc. All
031:         * synchronization points must occur through this manager, which guarantees
032:         * synchronization across all other Store instances.
033:         * @author Brautigam Robert
034:         * @version Revision: $Revision$
035:         */
036:        public class NodeManager implements  ServiceProvider, Runnable {
037:            private static Logger logger = Logger.getLogger(NodeManager.class);
038:
039:            public static int NODE_TIMEOUT = 10000;
040:            public static int HEARTBEAT_INTERVAL = 6000;
041:
042:            public static final int STATE_OFFLINE = 0;
043:            public static final int STATE_UNINITIALIZED = 1;
044:            public static final int STATE_INITIALIZED = 2;
045:            public static final int STATE_WAITING = 3;
046:            public static final int STATE_CONNECTED = 4;
047:
048:            private StoreContext context;
049:            private NodeServer server;
050:            private NodeClient client;
051:            private int index;
052:            private int heartbeat = 1;
053:            private String ips;
054:            private List nodeList;
055:            private ServiceProvider provider;
056:
057:            private boolean running = true;
058:            private int state = STATE_OFFLINE;
059:            private Object stateMutex = new Object();
060:            private Thread heartbeatThread = null;
061:
062:            /**
063:             * Construct node manager, establish identity, and make
064:             * initial connection.
065:             */
066:            public NodeManager(StoreContext context) {
067:                this .context = context;
068:                // Start new thread
069:                heartbeatThread = new Thread(this );
070:                heartbeatThread.setName("Persistence-heartbeat");
071:                heartbeatThread.setDaemon(true);
072:                heartbeatThread.start();
073:            }
074:
075:            public int getNodeIndex() {
076:                return index;
077:            }
078:
079:            public void close() {
080:                logger.debug("closing node manager.");
081:                // Release all resources of state
082:                try {
083:                    ensureState(STATE_UNINITIALIZED);
084:                } catch (Exception e) {
085:                    logger.error("error while shutting down node manager", e);
086:                }
087:                // Shutdown heartbeat thread
088:                running = false;
089:                heartbeatThread.interrupt();
090:            }
091:
092:            /**
093:             * Ensure that, if possible the given state is reached. This method
094:             * makes all the necessary calls of state changes up and down.
095:             */
096:            public void ensureState(int newState) {
097:                synchronized (stateMutex) {
098:                    logger.debug("ensuring state: " + newState
099:                            + ", current state: " + state);
100:                    if (state == newState)
101:                        return;
102:                    // Separate two events: When state is increased, and when
103:                    // state is decreased.
104:                    if (newState > state) {
105:                        // Node is offline, all functions work here
106:                        if ((state == STATE_OFFLINE)
107:                                && (newState > STATE_OFFLINE))
108:                            state = STATE_UNINITIALIZED;
109:                        // State is increased
110:                        if ((state == STATE_UNINITIALIZED)
111:                                && (newState > STATE_UNINITIALIZED)) {
112:                            // Initialize
113:                            initialize();
114:                        }
115:                        if (((state == STATE_INITIALIZED) || (state == STATE_WAITING))
116:                                && (newState > STATE_WAITING)) {
117:                            // Initialized, now determine server and connect to it.
118:                            // Note, that connect may stay in WAITING state.
119:                            connect();
120:                            // Initialize cache on new connection
121:                            if (state == STATE_CONNECTED)
122:                                context.getCache().init();
123:                        }
124:                    } else {
125:                        // State is decreased
126:                        if (((state == STATE_CONNECTED) || (state == STATE_WAITING))
127:                                && (newState < STATE_WAITING)) {
128:                            // Set state
129:                            state = STATE_INITIALIZED;
130:                            // Lost connection to server, now we can't be sure
131:                            // of anything, whether the old server is still intact,
132:                            // or a new server is chosen, whether other clients did
133:                            // something we are not aware of. To compensate for this,
134:                            // all stateful functions will be aborted:
135:                            // - cache: cleared
136:                            // - transactions: all roll back sometime in the future
137:                            // - transaction locks: taken care when all transaction roll back
138:                            context.getCache().clear();
139:                            context.getTransactionTracker().markRollbackAll(
140:                                    new StoreException(
141:                                            "lost connection to server"));
142:                            // Disconnect client entirely, and allocate new
143:                            if (client != null)
144:                                client.disconnect();
145:                        }
146:                        if ((state == STATE_INITIALIZED)
147:                                && (newState < STATE_INITIALIZED)) {
148:                            // Set state
149:                            state = STATE_UNINITIALIZED;
150:                            // Node could execute heartbeat. This means connection
151:                            // to database is lost, and new identity will have to
152:                            // be established.
153:                            server.disconnect();
154:                            // Remove the node entry from database
155:                            clearNode();
156:                        }
157:                    }
158:                }
159:                logger.debug("state: " + state
160:                        + ", successfully established, requested was: "
161:                        + newState);
162:            }
163:
164:            /**
165:             * Load the node list into a list.
166:             */
167:            private List loadNodeList(Transaction transaction, int searchIndex) {
168:                Vector resultList = new Vector();
169:                Vector orderByList = new Vector();
170:                orderByList.add(new OrderBy(new ReferenceTerm(
171:                        "persistence_nodes", null, "nodeindex"),
172:                        OrderBy.ASCENDING));
173:                Expression expr = null;
174:                if (searchIndex > 0) {
175:                    expr = new Expression();
176:                    expr.add(new ReferenceTerm("persistence_nodes", null,
177:                            "nodeindex"));
178:                    expr.add("<");
179:                    expr.add(new ConstantTerm(new Integer(searchIndex)));
180:                }
181:                QueryStatement stmt = new QueryStatement("persistence_nodes",
182:                        expr, orderByList);
183:                SearchResult result = context.getDatabase().search(transaction,
184:                        stmt, null);
185:                for (int i = 0; i < result.getResult().size(); i++) {
186:                    Map attributes = (Map) result.getResult().get(i);
187:                    NodeEntry entry = new NodeEntry();
188:                    entry.ips = (String) attributes.get("ips");
189:                    entry.port = ((Number) attributes.get("command_port"))
190:                            .intValue();
191:                    entry.index = ((Number) attributes.get("nodeindex"))
192:                            .intValue();
193:                    entry.heartbeat = ((Number) attributes.get("heartbeat"))
194:                            .intValue();
195:                    entry.timestamp = System.currentTimeMillis();
196:                    entry.lastHeartbeatChange = entry.timestamp;
197:                    resultList.add(entry);
198:                }
199:                return resultList;
200:            }
201:
202:            /**
203:             * Connect to the server.
204:             */
205:            private void connect() {
206:                logger.debug("node connecting to server...");
207:                // New client
208:                client = null;
209:                // Reload server node list, check if some nodes disapeared,
210:                // so we don't have to check those
211:                Transaction transaction = context.getTransactionTracker()
212:                        .getTransaction(TransactionTracker.TX_REQUIRED);
213:                transaction.begin();
214:                try {
215:                    List resultNodeList = loadNodeList(transaction, index);
216:                    nodeList.retainAll(resultNodeList); // Deletes all which are not in result
217:                } catch (Exception e) {
218:                    logger.error("could not reload node list", e);
219:                    transaction.markRollbackOnly();
220:                    ensureState(STATE_UNINITIALIZED);
221:                } finally {
222:                    transaction.commit();
223:                }
224:                // Try to select a server node from list
225:                NodeEntry serverNode = null;
226:                for (int i = 0; (i < nodeList.size()) && (serverNode == null); i++) {
227:                    long currentTimestamp = System.currentTimeMillis();
228:                    NodeEntry entry = (NodeEntry) nodeList.get(i);
229:                    // If data is too old, set to unknown
230:                    if (currentTimestamp - entry.timestamp > NODE_TIMEOUT)
231:                        entry.state = NodeEntry.STATE_UNKNOWN;
232:                    // Try to determine whether node is accessible. If it is,
233:                    // it is considered alive. Note, that if it isn't, that does
234:                    // not mean it is dead! It merely means it may be dead, but
235:                    // it may only be not reachable from this client. In this case,
236:                    // it stays "UNKNOWN", and the periodic heartbeat will decide,
237:                    // whether that node has updated it's heartbeat in the database.
238:                    if (entry.state == NodeEntry.STATE_UNKNOWN) {
239:                        logger
240:                                .debug("entry state unknown, trying to connect...");
241:                        if (NodeClient.isAlive(entry.ips, entry.port)) {
242:                            // Node is alive, because we can connect to it
243:                            entry.state = NodeEntry.STATE_ALIVE;
244:                        } else if (entry.ips.equals(ips)) {
245:                            // Node is not alive, and is on the same host
246:                            // (bacause it has the same ips). This means the node
247:                            // is dead, because there can't be any network errors.
248:                            entry.state = NodeEntry.STATE_DEAD;
249:                        }
250:                    }
251:                    // Check
252:                    if (entry.state == NodeEntry.STATE_UNKNOWN) {
253:                        logger
254:                                .debug("entry state unknown, waiting for answers...");
255:                        // We arrived at a node, which we don't know if it's alive,
256:                        // so we wait, until it becomes clear.
257:                        state = STATE_WAITING;
258:                        try {
259:                            while (entry.state == NodeEntry.STATE_UNKNOWN)
260:                                stateMutex.wait();
261:                        } catch (Exception e) {
262:                            throw new StoreException(
263:                                    "exception while waiting for a node state",
264:                                    e);
265:                        }
266:                    }
267:                    if (entry.state == NodeEntry.STATE_ALIVE) {
268:                        // Node is alive, this will be our server
269:                        serverNode = entry;
270:                    }
271:                }
272:                if (serverNode == null) {
273:                    logger.debug("node will be the appointed server.");
274:                    // If server node is null, we will be the server
275:                    provider = server;
276:                    server.activate();
277:                    // Change to ready state, skip CONNECTED state, because
278:                    // we don't have to handshake with ourselves
279:                    state = STATE_CONNECTED;
280:                    // Clear the node list
281:                    clearNodeList();
282:                } else {
283:                    logger.debug("determined to be client node, server is: "
284:                            + serverNode.ips + ":" + serverNode.port);
285:                    // If server node is not null, we should be connected to it
286:                    client = new NodeClient(context);
287:                    provider = client;
288:                    client.connect(serverNode.ips, serverNode.port);
289:                    // If all goes well, change to connected state
290:                    state = STATE_CONNECTED;
291:                }
292:            }
293:
294:            /**
295:             * Clear this node from the database.
296:             */
297:            private void clearNode() {
298:                logger.debug("clearing node from database: " + index);
299:                Connection conn = context.getDatabase().getConnectionSource()
300:                        .getConnection();
301:                try {
302:                    PreparedStatement pstmt = conn
303:                            .prepareStatement("delete from nodes where nodeindex = "
304:                                    + index);
305:                    pstmt.executeUpdate();
306:                    pstmt.close();
307:                    conn.commit();
308:                } catch (Exception e) {
309:                    logger.error("error while clearing node: " + index);
310:                } finally {
311:                    context.getDatabase().getConnectionSource()
312:                            .releaseConnection(conn);
313:                }
314:                logger.debug("node cleared from database.");
315:            }
316:
317:            /**
318:             * Clear node list. This is called as part of the initialization
319:             * process, if all previous node entries in the database
320:             * are dead.
321:             */
322:            private void clearNodeList() {
323:                Transaction transaction = context.getTransactionTracker()
324:                        .getTransaction(TransactionTracker.TX_REQUIRED);
325:                transaction.begin();
326:                try {
327:                    // Go through all nodes on our list, and remove all dead ones
328:                    for (int i = 0; i < nodeList.size(); i++) {
329:                        NodeEntry entry = (NodeEntry) nodeList.get(i);
330:                        if (entry.state == NodeEntry.STATE_DEAD) {
331:                            Map attrs = new HashMap();
332:                            attrs.put("nodeindex", new Integer(entry.index));
333:                            context.getDatabase().remove(transaction,
334:                                    "persistence_nodes", attrs);
335:                        }
336:                    }
337:                } catch (StoreException e) {
338:                    transaction.markRollbackOnly();
339:                    throw e;
340:                } catch (Exception e) {
341:                    transaction.markRollbackOnly();
342:                    throw new StoreException(
343:                            "exception while deleting dead nodes from database",
344:                            e);
345:                } finally {
346:                    transaction.commit();
347:                }
348:            }
349:
350:            /**
351:             * Initialize node identity.
352:             */
353:            private void initialize() {
354:                logger.debug("node initializing...");
355:                Transaction transaction = context.getTransactionTracker()
356:                        .getTransaction(TransactionTracker.TX_NEW);
357:                transaction
358:                        .setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
359:                transaction.begin();
360:                try {
361:                    // First ensure that table exists
362:                    HashMap tableAttrs = new HashMap();
363:                    tableAttrs.put("nodeindex", Integer.class);
364:                    tableAttrs.put("ips", String.class);
365:                    tableAttrs.put("command_port", Integer.class);
366:                    tableAttrs.put("heartbeat", Integer.class);
367:                    Vector tableKeys = new Vector();
368:                    tableKeys.add("nodeindex");
369:                    context.getDatabase().ensureTable(transaction,
370:                            "persistence_nodes", tableAttrs, tableKeys, true);
371:                    // Start the node server
372:                    server = new NodeServer(context);
373:                    ips = server.getHostAddresses();
374:                    server.bind();
375:                    // Load nodes table
376:                    nodeList = loadNodeList(transaction, 0);
377:                    // Determine identity
378:                    int port = server.getServerSocket().getLocalPort();
379:                    index = 1;
380:                    if (nodeList.size() > 0)
381:                        index = 1 + ((NodeEntry) nodeList
382:                                .get(nodeList.size() - 1)).index;
383:                    logger.debug("node identity determined, index: " + index
384:                            + ", address: " + ips + ":" + port);
385:                    // Insert my index, port and ip to the nodes table.
386:                    // This may not fail, because we have isolation level "serializable",
387:                    // so previously computed index should not be taken.
388:                    Map attrs = new HashMap();
389:                    attrs.put("nodeindex", new Integer(index));
390:                    attrs.put("ips", ips);
391:                    attrs.put("command_port", new Integer(port));
392:                    attrs.put("heartbeat", new Integer(heartbeat));
393:                    context.getDatabase().insert(transaction,
394:                            "persistence_nodes", attrs);
395:                    // If all successful, set state to initialized
396:                    state = STATE_INITIALIZED;
397:                } catch (StoreException e) {
398:                    logger.fatal("could not initialize node subsystem.", e);
399:                    transaction.markRollbackOnly();
400:                    throw e;
401:                } catch (Throwable e) {
402:                    logger.fatal("could not initialize node subsystem.", e);
403:                    transaction.markRollbackOnly();
404:                    throw new StoreException("unexcepted error", e);
405:                } finally {
406:                    transaction.commit();
407:                }
408:            }
409:
410:            /**
411:             * This is the heartbeat of the node. If the heartbeat fails,
412:             * meaning it can't update the database, the node is considered dead.
413:             * Secondary function of the heartbeat is, to check on nodes which
414:             * have unknown state.
415:             */
416:            public void run() {
417:                while (running) {
418:                    try {
419:                        // Wait heartbeat
420:                        Thread.currentThread().sleep(HEARTBEAT_INTERVAL);
421:                        // Determine whether to do something
422:                        int currentIndex = 0;
423:                        int currentState = 0;
424:                        Vector currentNodeList = null;
425:                        synchronized (stateMutex) {
426:                            if (state < STATE_INITIALIZED)
427:                                continue; // Do not run
428:                            currentState = state;
429:                            currentIndex = index;
430:                            currentNodeList = new Vector(nodeList);
431:                        }
432:                        // Do database functions
433:                        logger.debug("heartbeat running: " + index);
434:                        Transaction transaction = context
435:                                .getTransactionTracker().getTransaction(
436:                                        TransactionTracker.TX_REQUIRED);
437:                        transaction.begin();
438:                        try {
439:                            // Update database heartbeat number
440:                            heartbeat++;
441:                            Map attrs = new HashMap();
442:                            attrs.put("heartbeat", new Integer(heartbeat));
443:                            Map keys = new HashMap();
444:                            keys.put("nodeindex", new Integer(currentIndex));
445:                            context.getDatabase().save(transaction,
446:                                    "persistence_nodes", keys, attrs);
447:                            // Check unknown hosts for heartbeat sign
448:                            if (state == STATE_WAITING) {
449:                                // Get list from database, and diff it to
450:                                // out current list. If the heartbeat increased
451:                                // since last time, mark node as alive.
452:                                List resultNodeList = loadNodeList(transaction,
453:                                        index);
454:                                int resultIndex = 0;
455:                                int listIndex = 0;
456:                                while ((resultIndex < resultNodeList.size())
457:                                        && (listIndex < currentNodeList.size())) {
458:                                    NodeEntry resultEntry = (NodeEntry) resultNodeList
459:                                            .get(resultIndex);
460:                                    NodeEntry currentEntry = (NodeEntry) currentNodeList
461:                                            .get(listIndex);
462:                                    if (currentEntry.index < resultEntry.index)
463:                                        listIndex++;
464:                                    else if (resultEntry.index < currentEntry.index)
465:                                        resultIndex++;
466:                                    else {
467:                                        // Found a match, so look at heartbeat
468:                                        long currentTimestamp = System
469:                                                .currentTimeMillis();
470:                                        // This check is inside timeout, and the heartbeat
471:                                        // increased. Node is alive then.
472:                                        if ((currentTimestamp
473:                                                - currentEntry.timestamp < NODE_TIMEOUT)
474:                                                && (resultEntry.heartbeat > currentEntry.heartbeat)) {
475:                                            currentEntry.state = NodeEntry.STATE_ALIVE;
476:                                            synchronized (stateMutex) {
477:                                                stateMutex.notifyAll(); // Notify wait state
478:                                            }
479:                                        }
480:                                        // If the timeout is reached, and the heartbeat
481:                                        // still did not change, then node is dead.
482:                                        if ((currentTimestamp
483:                                                - currentEntry.lastHeartbeatChange >= NODE_TIMEOUT)
484:                                                && (resultEntry.heartbeat == currentEntry.heartbeat)) {
485:                                            currentEntry.state = NodeEntry.STATE_DEAD;
486:                                            synchronized (stateMutex) {
487:                                                stateMutex.notifyAll(); // Notify wait state
488:                                            }
489:                                        }
490:                                        // Set stuff
491:                                        if (currentEntry.heartbeat < resultEntry.heartbeat)
492:                                            currentEntry.lastHeartbeatChange = currentTimestamp;
493:                                        currentEntry.heartbeat = resultEntry.heartbeat;
494:                                        currentEntry.timestamp = currentTimestamp;
495:                                        // Increase both
496:                                        listIndex++;
497:                                        resultIndex++;
498:                                    }
499:                                }
500:                            }
501:                        } catch (Exception e) {
502:                            // Mark transaction rollback
503:                            logger
504:                                    .error(
505:                                            "exception while heartbeat database functions",
506:                                            e);
507:                            transaction.markRollbackOnly();
508:                            // Scale back to uninitialized state
509:                            ensureState(STATE_UNINITIALIZED);
510:                        } finally {
511:                            transaction.commit();
512:                        }
513:                    } catch (InterruptedException e) {
514:                        // Nothing to do, thread probably will be shut down
515:                        logger
516:                                .debug("heartbeat received interrupt, running flag: "
517:                                        + running);
518:                    } catch (Exception e) {
519:                        logger.error("exception in heartbeat", e);
520:                    }
521:                }
522:            }
523:
524:            public class NodeEntry {
525:                public static final int STATE_UNKNOWN = 0;
526:                public static final int STATE_ALIVE = 1;
527:                public static final int STATE_DEAD = 2;
528:
529:                public String ips;
530:                public int port;
531:                public int index;
532:                public int heartbeat = 0;
533:                public int state = STATE_UNKNOWN;
534:                public long timestamp;
535:                public long lastHeartbeatChange = 0;
536:
537:                public int hashCode() {
538:                    return index;
539:                }
540:
541:                public boolean equals(Object obj) {
542:                    if (!(obj instanceof  NodeEntry))
543:                        return false;
544:                    return index == ((NodeEntry) obj).index;
545:                }
546:
547:                public String toString() {
548:                    return "[Node: " + ips + ":" + port + ", index: " + index
549:                            + "]";
550:                }
551:            }
552:
553:            /*
554:             * Service interface
555:             */
556:
557:            /**
558:             * Get a new serial number.
559:             */
560:            public Long getNextSerial() {
561:                ensureState(STATE_CONNECTED);
562:                if (server.isActive())
563:                    return server.getNextSerial();
564:                else if (client != null)
565:                    return client.getNextSerial();
566:                throw new StoreException("node is not ready for service");
567:            }
568:
569:            /**
570:             * Send cache update request.
571:             */
572:            public void updateEntries(String tableName, Long serial) {
573:                ensureState(STATE_CONNECTED);
574:                if (server.isActive())
575:                    server.updateEntries(tableName, serial);
576:                else if (client != null)
577:                    client.updateEntries(tableName, serial);
578:                else
579:                    throw new StoreException("node is not ready for service");
580:            }
581:
582:            /**
583:             * Lock an object.
584:             * @return A transaction if lock failed (the transaction which
585:             * has a lock on the object), null else.
586:             */
587:            public SessionInfo lock(int index, long threadId, long txSerial,
588:                    List metas, SessionInfo info, int wait,
589:                    boolean ensureCurrent) {
590:                ensureState(STATE_CONNECTED);
591:                if (server.isActive())
592:                    return server.lock(index, threadId, txSerial, metas, info,
593:                            wait, ensureCurrent);
594:                else if (client != null)
595:                    return client.lock(index, threadId, txSerial, metas, info,
596:                            wait, ensureCurrent);
597:                throw new StoreException("node is not ready for service");
598:            }
599:
600:            /**
601:             * Unlock object.
602:             */
603:            public void unlock(int index, long threadId, long txSerial,
604:                    List metas) {
605:                ensureState(STATE_CONNECTED);
606:                if (server.isActive())
607:                    server.unlock(index, threadId, txSerial, metas);
608:                else if (client != null)
609:                    client.unlock(index, threadId, txSerial, metas);
610:                else
611:                    throw new StoreException("node is not ready for service");
612:            }
613:
614:            /**
615:             * Wait for a query to execute with the given serial.
616:             * This method returns, if all commits before the given serial
617:             * are finished.
618:             */
619:            public void waitForQuery(Long serial) {
620:                ensureState(STATE_CONNECTED);
621:                if (server.isActive())
622:                    server.waitForQuery(serial);
623:                else if (client != null)
624:                    client.waitForQuery(serial);
625:                else
626:                    throw new StoreException("node is not ready for service");
627:            }
628:
629:            /**
630:             * Wait for starting a commit. The commit can start if
631:             * there are no queries executed with greater serial.
632:             * @return The serial the commit can run with.
633:             */
634:            public Long startCommit(int index) {
635:                ensureState(STATE_CONNECTED);
636:                if (server.isActive())
637:                    return server.startCommit(index);
638:                else if (client != null)
639:                    return client.startCommit(index);
640:                throw new StoreException("node is not ready for service");
641:            }
642:
643:            /**
644:             * Mark the end of a commit phase.
645:             */
646:            public void endCommit(int index, Long serial) {
647:                ensureState(STATE_CONNECTED);
648:                if (server.isActive())
649:                    server.endCommit(index, serial);
650:                else if (client != null)
651:                    client.endCommit(index, serial);
652:                else
653:                    throw new StoreException("node is not ready for service");
654:            }
655:
656:            /**
657:             * Notify server of object changes.
658:             */
659:            public void notifyChange(List metas, Long endSerial, Long txSerial) {
660:                ensureState(STATE_CONNECTED);
661:                if (server.isActive())
662:                    server.notifyChange(metas, endSerial, txSerial);
663:                else if (client != null)
664:                    client.notifyChange(metas, endSerial, txSerial);
665:                else
666:                    throw new StoreException("node is not ready for service");
667:            }
668:
669:            static {
670:                try {
671:                    ResourceBundle config = ResourceBundle
672:                            .getBundle("beankeeper");
673:                    NODE_TIMEOUT = Integer.valueOf(
674:                            config.getString("node.timeout")).intValue();
675:                    HEARTBEAT_INTERVAL = Integer.valueOf(
676:                            config.getString("node.heartbeat")).intValue();
677:                } catch (Exception e) {
678:                    logger
679:                            .error(
680:                                    "could not load configuration, using hardcoded defaults",
681:                                    e);
682:                }
683:            }
684:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.