001: /**
002: * Copyright (C) 2006 NetMind Consulting Bt.
003: *
004: * This library is free software; you can redistribute it and/or
005: * modify it under the terms of the GNU Lesser General Public
006: * License as published by the Free Software Foundation; either
007: * version 3 of the License, or (at your option) any later version.
008: *
009: * This library is distributed in the hope that it will be useful,
010: * but WITHOUT ANY WARRANTY; without even the implied warranty of
011: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012: * Lesser General Public License for more details.
013: *
014: * You should have received a copy of the GNU Lesser General Public
015: * License along with this library; if not, write to the Free Software
016: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
017: */package hu.netmind.persistence.node;
018:
019: import hu.netmind.persistence.*;
020: import hu.netmind.persistence.parser.*;
021: import java.util.*;
022: import java.sql.Connection;
023: import java.sql.PreparedStatement;
024: import org.apache.log4j.Logger;
025:
026: /**
* This manager enables the Store to function in a peer-to-peer
* fashion with other Store instances which are pointed to the same
* database. The class takes care of all IP communication related
* work, such as reconnecting, the communication protocol, etc. All
* synchronization points must occur through this manager, which guarantees
* synchronization across all other Store instances.
033: * @author Brautigam Robert
034: * @version Revision: $Revision$
035: */
036: public class NodeManager implements ServiceProvider, Runnable {
037: private static Logger logger = Logger.getLogger(NodeManager.class);
038:
039: public static int NODE_TIMEOUT = 10000;
040: public static int HEARTBEAT_INTERVAL = 6000;
041:
042: public static final int STATE_OFFLINE = 0;
043: public static final int STATE_UNINITIALIZED = 1;
044: public static final int STATE_INITIALIZED = 2;
045: public static final int STATE_WAITING = 3;
046: public static final int STATE_CONNECTED = 4;
047:
048: private StoreContext context;
049: private NodeServer server;
050: private NodeClient client;
051: private int index;
052: private int heartbeat = 1;
053: private String ips;
054: private List nodeList;
055: private ServiceProvider provider;
056:
057: private boolean running = true;
058: private int state = STATE_OFFLINE;
059: private Object stateMutex = new Object();
060: private Thread heartbeatThread = null;
061:
062: /**
063: * Construct node manager, establish identity, and make
064: * initial connection.
065: */
066: public NodeManager(StoreContext context) {
067: this .context = context;
068: // Start new thread
069: heartbeatThread = new Thread(this );
070: heartbeatThread.setName("Persistence-heartbeat");
071: heartbeatThread.setDaemon(true);
072: heartbeatThread.start();
073: }
074:
075: public int getNodeIndex() {
076: return index;
077: }
078:
079: public void close() {
080: logger.debug("closing node manager.");
081: // Release all resources of state
082: try {
083: ensureState(STATE_UNINITIALIZED);
084: } catch (Exception e) {
085: logger.error("error while shutting down node manager", e);
086: }
087: // Shutdown heartbeat thread
088: running = false;
089: heartbeatThread.interrupt();
090: }
091:
092: /**
093: * Ensure that, if possible the given state is reached. This method
094: * makes all the necessary calls of state changes up and down.
095: */
096: public void ensureState(int newState) {
097: synchronized (stateMutex) {
098: logger.debug("ensuring state: " + newState
099: + ", current state: " + state);
100: if (state == newState)
101: return;
102: // Separate two events: When state is increased, and when
103: // state is decreased.
104: if (newState > state) {
105: // Node is offline, all functions work here
106: if ((state == STATE_OFFLINE)
107: && (newState > STATE_OFFLINE))
108: state = STATE_UNINITIALIZED;
109: // State is increased
110: if ((state == STATE_UNINITIALIZED)
111: && (newState > STATE_UNINITIALIZED)) {
112: // Initialize
113: initialize();
114: }
115: if (((state == STATE_INITIALIZED) || (state == STATE_WAITING))
116: && (newState > STATE_WAITING)) {
117: // Initialized, now determine server and connect to it.
118: // Note, that connect may stay in WAITING state.
119: connect();
120: // Initialize cache on new connection
121: if (state == STATE_CONNECTED)
122: context.getCache().init();
123: }
124: } else {
125: // State is decreased
126: if (((state == STATE_CONNECTED) || (state == STATE_WAITING))
127: && (newState < STATE_WAITING)) {
128: // Set state
129: state = STATE_INITIALIZED;
130: // Lost connection to server, now we can't be sure
131: // of anything, whether the old server is still intact,
132: // or a new server is chosen, whether other clients did
133: // something we are not aware of. To compensate for this,
134: // all stateful functions will be aborted:
135: // - cache: cleared
136: // - transactions: all roll back sometime in the future
137: // - transaction locks: taken care when all transaction roll back
138: context.getCache().clear();
139: context.getTransactionTracker().markRollbackAll(
140: new StoreException(
141: "lost connection to server"));
142: // Disconnect client entirely, and allocate new
143: if (client != null)
144: client.disconnect();
145: }
146: if ((state == STATE_INITIALIZED)
147: && (newState < STATE_INITIALIZED)) {
148: // Set state
149: state = STATE_UNINITIALIZED;
150: // Node could execute heartbeat. This means connection
151: // to database is lost, and new identity will have to
152: // be established.
153: server.disconnect();
154: // Remove the node entry from database
155: clearNode();
156: }
157: }
158: }
159: logger.debug("state: " + state
160: + ", successfully established, requested was: "
161: + newState);
162: }
163:
164: /**
165: * Load the node list into a list.
166: */
167: private List loadNodeList(Transaction transaction, int searchIndex) {
168: Vector resultList = new Vector();
169: Vector orderByList = new Vector();
170: orderByList.add(new OrderBy(new ReferenceTerm(
171: "persistence_nodes", null, "nodeindex"),
172: OrderBy.ASCENDING));
173: Expression expr = null;
174: if (searchIndex > 0) {
175: expr = new Expression();
176: expr.add(new ReferenceTerm("persistence_nodes", null,
177: "nodeindex"));
178: expr.add("<");
179: expr.add(new ConstantTerm(new Integer(searchIndex)));
180: }
181: QueryStatement stmt = new QueryStatement("persistence_nodes",
182: expr, orderByList);
183: SearchResult result = context.getDatabase().search(transaction,
184: stmt, null);
185: for (int i = 0; i < result.getResult().size(); i++) {
186: Map attributes = (Map) result.getResult().get(i);
187: NodeEntry entry = new NodeEntry();
188: entry.ips = (String) attributes.get("ips");
189: entry.port = ((Number) attributes.get("command_port"))
190: .intValue();
191: entry.index = ((Number) attributes.get("nodeindex"))
192: .intValue();
193: entry.heartbeat = ((Number) attributes.get("heartbeat"))
194: .intValue();
195: entry.timestamp = System.currentTimeMillis();
196: entry.lastHeartbeatChange = entry.timestamp;
197: resultList.add(entry);
198: }
199: return resultList;
200: }
201:
202: /**
203: * Connect to the server.
204: */
205: private void connect() {
206: logger.debug("node connecting to server...");
207: // New client
208: client = null;
209: // Reload server node list, check if some nodes disapeared,
210: // so we don't have to check those
211: Transaction transaction = context.getTransactionTracker()
212: .getTransaction(TransactionTracker.TX_REQUIRED);
213: transaction.begin();
214: try {
215: List resultNodeList = loadNodeList(transaction, index);
216: nodeList.retainAll(resultNodeList); // Deletes all which are not in result
217: } catch (Exception e) {
218: logger.error("could not reload node list", e);
219: transaction.markRollbackOnly();
220: ensureState(STATE_UNINITIALIZED);
221: } finally {
222: transaction.commit();
223: }
224: // Try to select a server node from list
225: NodeEntry serverNode = null;
226: for (int i = 0; (i < nodeList.size()) && (serverNode == null); i++) {
227: long currentTimestamp = System.currentTimeMillis();
228: NodeEntry entry = (NodeEntry) nodeList.get(i);
229: // If data is too old, set to unknown
230: if (currentTimestamp - entry.timestamp > NODE_TIMEOUT)
231: entry.state = NodeEntry.STATE_UNKNOWN;
232: // Try to determine whether node is accessible. If it is,
233: // it is considered alive. Note, that if it isn't, that does
234: // not mean it is dead! It merely means it may be dead, but
235: // it may only be not reachable from this client. In this case,
236: // it stays "UNKNOWN", and the periodic heartbeat will decide,
237: // whether that node has updated it's heartbeat in the database.
238: if (entry.state == NodeEntry.STATE_UNKNOWN) {
239: logger
240: .debug("entry state unknown, trying to connect...");
241: if (NodeClient.isAlive(entry.ips, entry.port)) {
242: // Node is alive, because we can connect to it
243: entry.state = NodeEntry.STATE_ALIVE;
244: } else if (entry.ips.equals(ips)) {
245: // Node is not alive, and is on the same host
246: // (bacause it has the same ips). This means the node
247: // is dead, because there can't be any network errors.
248: entry.state = NodeEntry.STATE_DEAD;
249: }
250: }
251: // Check
252: if (entry.state == NodeEntry.STATE_UNKNOWN) {
253: logger
254: .debug("entry state unknown, waiting for answers...");
255: // We arrived at a node, which we don't know if it's alive,
256: // so we wait, until it becomes clear.
257: state = STATE_WAITING;
258: try {
259: while (entry.state == NodeEntry.STATE_UNKNOWN)
260: stateMutex.wait();
261: } catch (Exception e) {
262: throw new StoreException(
263: "exception while waiting for a node state",
264: e);
265: }
266: }
267: if (entry.state == NodeEntry.STATE_ALIVE) {
268: // Node is alive, this will be our server
269: serverNode = entry;
270: }
271: }
272: if (serverNode == null) {
273: logger.debug("node will be the appointed server.");
274: // If server node is null, we will be the server
275: provider = server;
276: server.activate();
277: // Change to ready state, skip CONNECTED state, because
278: // we don't have to handshake with ourselves
279: state = STATE_CONNECTED;
280: // Clear the node list
281: clearNodeList();
282: } else {
283: logger.debug("determined to be client node, server is: "
284: + serverNode.ips + ":" + serverNode.port);
285: // If server node is not null, we should be connected to it
286: client = new NodeClient(context);
287: provider = client;
288: client.connect(serverNode.ips, serverNode.port);
289: // If all goes well, change to connected state
290: state = STATE_CONNECTED;
291: }
292: }
293:
294: /**
295: * Clear this node from the database.
296: */
297: private void clearNode() {
298: logger.debug("clearing node from database: " + index);
299: Connection conn = context.getDatabase().getConnectionSource()
300: .getConnection();
301: try {
302: PreparedStatement pstmt = conn
303: .prepareStatement("delete from nodes where nodeindex = "
304: + index);
305: pstmt.executeUpdate();
306: pstmt.close();
307: conn.commit();
308: } catch (Exception e) {
309: logger.error("error while clearing node: " + index);
310: } finally {
311: context.getDatabase().getConnectionSource()
312: .releaseConnection(conn);
313: }
314: logger.debug("node cleared from database.");
315: }
316:
317: /**
318: * Clear node list. This is called as part of the initialization
319: * process, if all previous node entries in the database
320: * are dead.
321: */
322: private void clearNodeList() {
323: Transaction transaction = context.getTransactionTracker()
324: .getTransaction(TransactionTracker.TX_REQUIRED);
325: transaction.begin();
326: try {
327: // Go through all nodes on our list, and remove all dead ones
328: for (int i = 0; i < nodeList.size(); i++) {
329: NodeEntry entry = (NodeEntry) nodeList.get(i);
330: if (entry.state == NodeEntry.STATE_DEAD) {
331: Map attrs = new HashMap();
332: attrs.put("nodeindex", new Integer(entry.index));
333: context.getDatabase().remove(transaction,
334: "persistence_nodes", attrs);
335: }
336: }
337: } catch (StoreException e) {
338: transaction.markRollbackOnly();
339: throw e;
340: } catch (Exception e) {
341: transaction.markRollbackOnly();
342: throw new StoreException(
343: "exception while deleting dead nodes from database",
344: e);
345: } finally {
346: transaction.commit();
347: }
348: }
349:
350: /**
351: * Initialize node identity.
352: */
353: private void initialize() {
354: logger.debug("node initializing...");
355: Transaction transaction = context.getTransactionTracker()
356: .getTransaction(TransactionTracker.TX_NEW);
357: transaction
358: .setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
359: transaction.begin();
360: try {
361: // First ensure that table exists
362: HashMap tableAttrs = new HashMap();
363: tableAttrs.put("nodeindex", Integer.class);
364: tableAttrs.put("ips", String.class);
365: tableAttrs.put("command_port", Integer.class);
366: tableAttrs.put("heartbeat", Integer.class);
367: Vector tableKeys = new Vector();
368: tableKeys.add("nodeindex");
369: context.getDatabase().ensureTable(transaction,
370: "persistence_nodes", tableAttrs, tableKeys, true);
371: // Start the node server
372: server = new NodeServer(context);
373: ips = server.getHostAddresses();
374: server.bind();
375: // Load nodes table
376: nodeList = loadNodeList(transaction, 0);
377: // Determine identity
378: int port = server.getServerSocket().getLocalPort();
379: index = 1;
380: if (nodeList.size() > 0)
381: index = 1 + ((NodeEntry) nodeList
382: .get(nodeList.size() - 1)).index;
383: logger.debug("node identity determined, index: " + index
384: + ", address: " + ips + ":" + port);
385: // Insert my index, port and ip to the nodes table.
386: // This may not fail, because we have isolation level "serializable",
387: // so previously computed index should not be taken.
388: Map attrs = new HashMap();
389: attrs.put("nodeindex", new Integer(index));
390: attrs.put("ips", ips);
391: attrs.put("command_port", new Integer(port));
392: attrs.put("heartbeat", new Integer(heartbeat));
393: context.getDatabase().insert(transaction,
394: "persistence_nodes", attrs);
395: // If all successful, set state to initialized
396: state = STATE_INITIALIZED;
397: } catch (StoreException e) {
398: logger.fatal("could not initialize node subsystem.", e);
399: transaction.markRollbackOnly();
400: throw e;
401: } catch (Throwable e) {
402: logger.fatal("could not initialize node subsystem.", e);
403: transaction.markRollbackOnly();
404: throw new StoreException("unexcepted error", e);
405: } finally {
406: transaction.commit();
407: }
408: }
409:
410: /**
411: * This is the heartbeat of the node. If the heartbeat fails,
412: * meaning it can't update the database, the node is considered dead.
413: * Secondary function of the heartbeat is, to check on nodes which
414: * have unknown state.
415: */
416: public void run() {
417: while (running) {
418: try {
419: // Wait heartbeat
420: Thread.currentThread().sleep(HEARTBEAT_INTERVAL);
421: // Determine whether to do something
422: int currentIndex = 0;
423: int currentState = 0;
424: Vector currentNodeList = null;
425: synchronized (stateMutex) {
426: if (state < STATE_INITIALIZED)
427: continue; // Do not run
428: currentState = state;
429: currentIndex = index;
430: currentNodeList = new Vector(nodeList);
431: }
432: // Do database functions
433: logger.debug("heartbeat running: " + index);
434: Transaction transaction = context
435: .getTransactionTracker().getTransaction(
436: TransactionTracker.TX_REQUIRED);
437: transaction.begin();
438: try {
439: // Update database heartbeat number
440: heartbeat++;
441: Map attrs = new HashMap();
442: attrs.put("heartbeat", new Integer(heartbeat));
443: Map keys = new HashMap();
444: keys.put("nodeindex", new Integer(currentIndex));
445: context.getDatabase().save(transaction,
446: "persistence_nodes", keys, attrs);
447: // Check unknown hosts for heartbeat sign
448: if (state == STATE_WAITING) {
449: // Get list from database, and diff it to
450: // out current list. If the heartbeat increased
451: // since last time, mark node as alive.
452: List resultNodeList = loadNodeList(transaction,
453: index);
454: int resultIndex = 0;
455: int listIndex = 0;
456: while ((resultIndex < resultNodeList.size())
457: && (listIndex < currentNodeList.size())) {
458: NodeEntry resultEntry = (NodeEntry) resultNodeList
459: .get(resultIndex);
460: NodeEntry currentEntry = (NodeEntry) currentNodeList
461: .get(listIndex);
462: if (currentEntry.index < resultEntry.index)
463: listIndex++;
464: else if (resultEntry.index < currentEntry.index)
465: resultIndex++;
466: else {
467: // Found a match, so look at heartbeat
468: long currentTimestamp = System
469: .currentTimeMillis();
470: // This check is inside timeout, and the heartbeat
471: // increased. Node is alive then.
472: if ((currentTimestamp
473: - currentEntry.timestamp < NODE_TIMEOUT)
474: && (resultEntry.heartbeat > currentEntry.heartbeat)) {
475: currentEntry.state = NodeEntry.STATE_ALIVE;
476: synchronized (stateMutex) {
477: stateMutex.notifyAll(); // Notify wait state
478: }
479: }
480: // If the timeout is reached, and the heartbeat
481: // still did not change, then node is dead.
482: if ((currentTimestamp
483: - currentEntry.lastHeartbeatChange >= NODE_TIMEOUT)
484: && (resultEntry.heartbeat == currentEntry.heartbeat)) {
485: currentEntry.state = NodeEntry.STATE_DEAD;
486: synchronized (stateMutex) {
487: stateMutex.notifyAll(); // Notify wait state
488: }
489: }
490: // Set stuff
491: if (currentEntry.heartbeat < resultEntry.heartbeat)
492: currentEntry.lastHeartbeatChange = currentTimestamp;
493: currentEntry.heartbeat = resultEntry.heartbeat;
494: currentEntry.timestamp = currentTimestamp;
495: // Increase both
496: listIndex++;
497: resultIndex++;
498: }
499: }
500: }
501: } catch (Exception e) {
502: // Mark transaction rollback
503: logger
504: .error(
505: "exception while heartbeat database functions",
506: e);
507: transaction.markRollbackOnly();
508: // Scale back to uninitialized state
509: ensureState(STATE_UNINITIALIZED);
510: } finally {
511: transaction.commit();
512: }
513: } catch (InterruptedException e) {
514: // Nothing to do, thread probably will be shut down
515: logger
516: .debug("heartbeat received interrupt, running flag: "
517: + running);
518: } catch (Exception e) {
519: logger.error("exception in heartbeat", e);
520: }
521: }
522: }
523:
524: public class NodeEntry {
525: public static final int STATE_UNKNOWN = 0;
526: public static final int STATE_ALIVE = 1;
527: public static final int STATE_DEAD = 2;
528:
529: public String ips;
530: public int port;
531: public int index;
532: public int heartbeat = 0;
533: public int state = STATE_UNKNOWN;
534: public long timestamp;
535: public long lastHeartbeatChange = 0;
536:
537: public int hashCode() {
538: return index;
539: }
540:
541: public boolean equals(Object obj) {
542: if (!(obj instanceof NodeEntry))
543: return false;
544: return index == ((NodeEntry) obj).index;
545: }
546:
547: public String toString() {
548: return "[Node: " + ips + ":" + port + ", index: " + index
549: + "]";
550: }
551: }
552:
553: /*
554: * Service interface
555: */
556:
557: /**
558: * Get a new serial number.
559: */
560: public Long getNextSerial() {
561: ensureState(STATE_CONNECTED);
562: if (server.isActive())
563: return server.getNextSerial();
564: else if (client != null)
565: return client.getNextSerial();
566: throw new StoreException("node is not ready for service");
567: }
568:
569: /**
570: * Send cache update request.
571: */
572: public void updateEntries(String tableName, Long serial) {
573: ensureState(STATE_CONNECTED);
574: if (server.isActive())
575: server.updateEntries(tableName, serial);
576: else if (client != null)
577: client.updateEntries(tableName, serial);
578: else
579: throw new StoreException("node is not ready for service");
580: }
581:
582: /**
583: * Lock an object.
584: * @return A transaction if lock failed (the transaction which
585: * has a lock on the object), null else.
586: */
587: public SessionInfo lock(int index, long threadId, long txSerial,
588: List metas, SessionInfo info, int wait,
589: boolean ensureCurrent) {
590: ensureState(STATE_CONNECTED);
591: if (server.isActive())
592: return server.lock(index, threadId, txSerial, metas, info,
593: wait, ensureCurrent);
594: else if (client != null)
595: return client.lock(index, threadId, txSerial, metas, info,
596: wait, ensureCurrent);
597: throw new StoreException("node is not ready for service");
598: }
599:
600: /**
601: * Unlock object.
602: */
603: public void unlock(int index, long threadId, long txSerial,
604: List metas) {
605: ensureState(STATE_CONNECTED);
606: if (server.isActive())
607: server.unlock(index, threadId, txSerial, metas);
608: else if (client != null)
609: client.unlock(index, threadId, txSerial, metas);
610: else
611: throw new StoreException("node is not ready for service");
612: }
613:
614: /**
615: * Wait for a query to execute with the given serial.
616: * This method returns, if all commits before the given serial
617: * are finished.
618: */
619: public void waitForQuery(Long serial) {
620: ensureState(STATE_CONNECTED);
621: if (server.isActive())
622: server.waitForQuery(serial);
623: else if (client != null)
624: client.waitForQuery(serial);
625: else
626: throw new StoreException("node is not ready for service");
627: }
628:
629: /**
630: * Wait for starting a commit. The commit can start if
631: * there are no queries executed with greater serial.
632: * @return The serial the commit can run with.
633: */
634: public Long startCommit(int index) {
635: ensureState(STATE_CONNECTED);
636: if (server.isActive())
637: return server.startCommit(index);
638: else if (client != null)
639: return client.startCommit(index);
640: throw new StoreException("node is not ready for service");
641: }
642:
643: /**
644: * Mark the end of a commit phase.
645: */
646: public void endCommit(int index, Long serial) {
647: ensureState(STATE_CONNECTED);
648: if (server.isActive())
649: server.endCommit(index, serial);
650: else if (client != null)
651: client.endCommit(index, serial);
652: else
653: throw new StoreException("node is not ready for service");
654: }
655:
656: /**
657: * Notify server of object changes.
658: */
659: public void notifyChange(List metas, Long endSerial, Long txSerial) {
660: ensureState(STATE_CONNECTED);
661: if (server.isActive())
662: server.notifyChange(metas, endSerial, txSerial);
663: else if (client != null)
664: client.notifyChange(metas, endSerial, txSerial);
665: else
666: throw new StoreException("node is not ready for service");
667: }
668:
669: static {
670: try {
671: ResourceBundle config = ResourceBundle
672: .getBundle("beankeeper");
673: NODE_TIMEOUT = Integer.valueOf(
674: config.getString("node.timeout")).intValue();
675: HEARTBEAT_INTERVAL = Integer.valueOf(
676: config.getString("node.heartbeat")).intValue();
677: } catch (Exception e) {
678: logger
679: .error(
680: "could not load configuration, using hardcoded defaults",
681: e);
682: }
683: }
684: }
|