001: /**
002: * Copyright (C) 2006 NetMind Consulting Bt.
003: *
004: * This library is free software; you can redistribute it and/or
005: * modify it under the terms of the GNU Lesser General Public
006: * License as published by the Free Software Foundation; either
007: * version 3 of the License, or (at your option) any later version.
008: *
009: * This library is distributed in the hope that it will be useful,
010: * but WITHOUT ANY WARRANTY; without even the implied warranty of
011: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012: * Lesser General Public License for more details.
013: *
014: * You should have received a copy of the GNU Lesser General Public
015: * License along with this library; if not, write to the Free Software
016: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
017: */package hu.netmind.persistence.node;
018:
019: import hu.netmind.persistence.*;
020: import hu.netmind.persistence.event.*;
021: import java.io.*;
022: import java.net.*;
023: import java.util.*;
024: import org.apache.log4j.Logger;
025:
026: /**
027: * Implementation of a server node. This is a synchronization
028: * point for all nodes connected to the same database.
029: * @author Brautigam Robert
030: * @version Revision: $Revision$
031: */
032: public class NodeServer implements ServiceProvider, Runnable {
033: private static Logger logger = Logger.getLogger(NodeServer.class);
034: private StoreContext context;
035: private ServerSocket serverSocket;
036: private Vector clientHandlers = new Vector();
037: private boolean running = true;
038: private boolean active = false;
039:
040: private RemoteLockTracker remoteLockTracker;
041: private CommitAndQueryTracker commitAndQueryTracker;
042: private ModificationTracker modificationTracker;
043:
044: public NodeServer(StoreContext context) {
045: this .context = context;
046: commitAndQueryTracker = new CommitAndQueryTracker(context);
047: modificationTracker = new ModificationTracker(context);
048: remoteLockTracker = new RemoteLockTracker(modificationTracker);
049: }
050:
051: /**
052: * Activate server, this means read the maximal serial from
053: * database and set the active flag.
054: */
055: public void activate() {
056: active = true;
057: }
058:
059: public boolean isActive() {
060: return active;
061: }
062:
063: public ServerSocket getServerSocket() {
064: return serverSocket;
065: }
066:
067: public void setServerSocket(ServerSocket serverSocket) {
068: this .serverSocket = serverSocket;
069: }
070:
071: /**
072: * Broadcast a message to all connected clients.
073: */
074: public void broadcastObject(CommObject obj) {
075: // Safe-copy all clients
076: Vector handlers = null;
077: synchronized (clientHandlers) {
078: handlers = new Vector(clientHandlers);
079: }
080: // Broadcast message
081: for (int i = 0; i < handlers.size(); i++) {
082: ClientHandler handler = (ClientHandler) handlers.get(i);
083: try {
084: handler.sendObject(obj);
085: } catch (Exception e) {
086: logger.warn("could not deliver message '" + obj
087: + "' to client.", e);
088: }
089: }
090: }
091:
092: /**
093: * Setup server and bind to a random port.
094: */
095: public void bind() {
096: try {
097: // Setup channel
098: serverSocket = new ServerSocket(0); // Choose an ip:port
099: // Start listening
100: Thread listenerThread = new Thread(this );
101: listenerThread.setName("Persistence-node");
102: listenerThread.setDaemon(true);
103: listenerThread.start();
104: } catch (Exception e) {
105: throw new StoreException(
106: "exception while binding to server port", e);
107: }
108: }
109:
110: /**
111: * Get the server addresses from interfaces.
112: */
113: public static String getHostAddresses() {
114: try {
115: Enumeration interfaceEnumeration = NetworkInterface
116: .getNetworkInterfaces();
117: // Copy from enumeration to addresses vector, but filter loopback addresses
118: Vector addresses = new Vector();
119: while (interfaceEnumeration.hasMoreElements()) {
120: NetworkInterface intf = (NetworkInterface) interfaceEnumeration
121: .nextElement();
122: // Remove loopback addresses
123: Enumeration addressEnumeration = intf
124: .getInetAddresses();
125: while (addressEnumeration.hasMoreElements()) {
126: InetAddress address = (InetAddress) addressEnumeration
127: .nextElement();
128: // Insert to addresses only if not loopback
129: if (!address.isLoopbackAddress())
130: addresses.add(address);
131: }
132: }
133: // Pick one address from the remaining address space
134: logger.debug("server available local addresses: "
135: + addresses);
136: // Now, multiple addresses are in the list, so copy all of them
137: // into the result string.
138: StringBuffer ips = new StringBuffer();
139: for (int i = 0; i < addresses.size(); i++) {
140: InetAddress address = (InetAddress) addresses.get(i);
141: if (ips.length() > 0)
142: ips.append(",");
143: ips.append(address.getHostAddress());
144: }
145: return ips.toString();
146: } catch (StoreException e) {
147: throw e;
148: } catch (Exception e) {
149: throw new StoreException(
150: "exception while determining server address", e);
151: }
152: }
153:
154: /**
155: * Disconnect the server.
156: */
157: public void disconnect() {
158: // Set to not running
159: active = false;
160: running = false;
161: // Close server socket
162: try {
163: serverSocket.close();
164: } catch (Exception e) {
165: logger.error("error while disconnecting server socket", e);
166: }
167: // Close all client sockets
168: synchronized (clientHandlers) {
169: for (int i = 0; i < clientHandlers.size(); i++) {
170: ClientHandler handler = (ClientHandler) clientHandlers
171: .get(i);
172: handler.disconnect();
173: }
174: }
175: }
176:
177: /**
178: * Run listener thread. This thread accepts incoming connections,
179: * and incoming socket data.
180: */
181: public void run() {
182: try {
183: while (running) {
184: // Listen for socket
185: Socket socket = serverSocket.accept();
186: logger.debug("server received connect from client...");
187: // Enter into client sockets
188: ClientHandler handler = new ClientHandler(socket);
189: synchronized (clientHandlers) {
190: clientHandlers.add(handler);
191: }
192: // Run thread to handle socket
193: Thread handlerThread = new Thread(handler);
194: handlerThread.setName("Persistence-handler");
195: handlerThread.setDaemon(true);
196: handlerThread.start();
197: }
198: } catch (Exception e) {
199: if (running) // If it should be running, set node manager to new state
200: {
201: logger.warn("server socket threw error", e);
202: context.getNodeManager().ensureState(
203: NodeManager.STATE_UNINITIALIZED);
204: } else {
205: logger.debug("server socket was shutdown: "
206: + e.getMessage());
207: }
208: } finally {
209: try {
210: serverSocket.close();
211: } catch (Exception e) {
212: logger
213: .warn(
214: "error while closing socket, may already been closed",
215: e);
216: }
217: }
218: }
219:
220: /**
221: * Client handler class runs on a separate thread, and handles
222: * communication to a specific client. Note that the whole
223: * communication could be implemented using nio, but the client
224: * <strong>should</strong> be handled on another thread, because
225: * the communication is a synchron request-response protocol.
226: */
227: public class ClientHandler implements Runnable {
228: private Socket socket;
229: private ObjectInputStream oInput;
230: private ObjectOutputStream oOutput;
231: private int index;
232:
233: public ClientHandler(Socket socket) {
234: this .socket = socket;
235: }
236:
237: /**
238: * Send an object to the client.
239: */
240: public synchronized void sendObject(CommObject obj)
241: throws IOException {
242: if (logger.isDebugEnabled())
243: logger.debug("outgoing message to: " + index
244: + ", object: " + obj);
245: // Write to output
246: oOutput.writeObject(obj);
247: }
248:
249: /**
250: * Handle a single request object.
251: */
252: public CommObject handleObject(CommObject obj) {
253: if (logger.isDebugEnabled())
254: logger.debug("incoming message from: " + index
255: + ", object: " + obj);
256: try {
257: // Initialization message
258: if (obj instanceof InitMessage) {
259: index = ((InitMessage) obj).getNodeIndex();
260: // If not active, maybe the client node detected earlier
261: // that the current server node is out, so check with
262: // a re-connect. If the server node did not die, we loose
263: // our cache and current transactions! But if the server
264: // node really died, we reconnect and accept the client's
265: // init request immediately.
266: if (!active) {
267: logger
268: .info("server got init request from client: "
269: + index
270: + ", try to reconnect to server to check, whether it's available");
271: context.getNodeManager().ensureState(
272: NodeManager.STATE_INITIALIZED);
273: context.getNodeManager().ensureState(
274: NodeManager.STATE_CONNECTED);
275: }
276: // If active, accept init request
277: if (!active)
278: return new GenericResponse(
279: GenericResponse.SERVER_INACTIVE); // Server not active
280: else
281: return new SerialResponse(
282: GenericResponse.ACTION_SUCCESS,
283: getNextSerial()); // Ok
284: }
285: // Serial request
286: if (obj instanceof SerialRequest) {
287: return new SerialResponse(
288: GenericResponse.ACTION_SUCCESS,
289: getNextSerial());
290: }
291: // Update cache message
292: if (obj instanceof CacheUpdateRequest) {
293: CacheUpdateRequest req = (CacheUpdateRequest) obj;
294: context.getCache().updateEntries(
295: req.getTableName(), req.getSerial());
296: broadcastObject(new CacheUpdateRequest(req
297: .getTableName(), req.getSerial()));
298: }
299: // Lock message
300: if (obj instanceof LockRequest) {
301: // Only "remote" lock tracker needs to be asked, because
302: // all tracked objects (local also) will be present
303: LockRequest req = (LockRequest) obj;
304: SessionInfo info = remoteLockTracker.lock(index,
305: req.getThreadId(), req.getTxSerial(), req
306: .getMetas(), req.getSessionInfo(),
307: req.getWait(), req.getEnsureCurrent());
308: if (info != null)
309: return new LockResponse(
310: GenericResponse.ALREADY_LOCKED, info);
311: else
312: return new LockResponse(
313: GenericResponse.ACTION_SUCCESS, null);
314: }
315: // Unlock
316: if (obj instanceof UnlockRequest) {
317: UnlockRequest req = (UnlockRequest) obj;
318: remoteLockTracker.unlock(index, req.getThreadId(),
319: req.getTxSerial(), req.getMetas());
320: }
321: // Wait for query
322: if (obj instanceof QueryRequest) {
323: QueryRequest req = (QueryRequest) obj;
324: commitAndQueryTracker.waitForQuery(req.getSerial());
325: }
326: // Start query
327: if (obj instanceof CommitStartRequest) {
328: CommitStartRequest req = (CommitStartRequest) obj;
329: return new SerialResponse(
330: GenericResponse.ACTION_SUCCESS,
331: commitAndQueryTracker.startCommit(index));
332: }
333: // End query
334: if (obj instanceof CommitEndRequest) {
335: CommitEndRequest req = (CommitEndRequest) obj;
336: endCommit(index, req.getSerial());
337: }
338: // Change management
339: if (obj instanceof NotifyChangeRequest) {
340: NotifyChangeRequest req = (NotifyChangeRequest) obj;
341: notifyChange(req.getMetas(), req.getEndSerial(),
342: req.getTxSerial());
343: }
344: } catch (Exception e) {
345: logger.warn("message request was not successful.", e);
346: return new GenericResponse(
347: GenericResponse.UNEXPECTED_ERROR);
348: }
349: return null;
350: }
351:
352: /**
353: * Disconnect socket.
354: */
355: public void disconnect() {
356: try {
357: oInput.close();
358: oOutput.close();
359: socket.close();
360: } catch (Exception e) {
361: logger.warn("error while disconnecting client", e);
362: }
363: }
364:
365: /**
366: * Handle communication in this thread.
367: */
368: public void run() {
369: try {
370: // Allocate streams
371: oInput = new ObjectInputStream(socket.getInputStream());
372: oOutput = new ObjectOutputStream(socket
373: .getOutputStream());
374: // Wait for objects and send responses
375: CommObject obj = null;
376: while ((obj = (CommObject) oInput.readObject()) != null) {
377: CommObject response = handleObject(obj);
378: if (response != null)
379: sendObject(response);
380: else
381: sendObject(new GenericResponse(0));
382: }
383: } catch (Exception e) {
384: if (index == 0)
385: logger
386: .debug(
387: "there was a connection with no communication, probably an empty connect for testing",
388: e);
389: else
390: logger.error(
391: "error while communication with client: "
392: + index, e);
393: // Close socket
394: try {
395: socket.close();
396: } catch (Exception f) {
397: logger.error(
398: "error while closing client socket, client: "
399: + index, f);
400: }
401: } finally {
402: // Remove from socket handlers
403: synchronized (clientHandlers) {
404: clientHandlers.remove(this );
405: }
406: // Relinquish all locks associated with this client
407: remoteLockTracker.unlockAll(index);
408: commitAndQueryTracker.endAllCommits(index);
409: }
410: }
411: }
412:
413: /*
414: * Implementing the service provider interface.
415: */
416:
417: /**
418: * Get a new serial number.
419: */
420: public Long getNextSerial() {
421: return context.getSerialTracker().getNextSerial();
422: }
423:
424: /**
425: * Update entries of client caches according to received parameters.
426: */
427: public void updateEntries(String tableName, Long serial) {
428: // Remote caches
429: broadcastObject(new CacheUpdateRequest(tableName, serial));
430: }
431:
432: /**
433: * Lock an object.
434: */
435: public SessionInfo lock(int index, long threadId, long txSerial,
436: List metas, SessionInfo info, int wait,
437: boolean ensureCurrent) {
438: logger.debug("server locking with: " + metas.size()
439: + " objects, wait: " + wait);
440: SessionInfo result = remoteLockTracker.lock(index, threadId,
441: txSerial, metas, info, wait, ensureCurrent);
442: return result;
443: }
444:
445: /**
446: * Unlock object.
447: */
448: public void unlock(int index, long threadId, long txSerial,
449: List metas) {
450: remoteLockTracker.unlock(index, threadId, txSerial, metas);
451: }
452:
453: /**
454: * Wait for a query to execute with the given serial.
455: * This method returns, if all commits before the given serial
456: * are finished.
457: */
458: public void waitForQuery(Long serial) {
459: commitAndQueryTracker.waitForQuery(serial);
460: }
461:
462: /**
463: * Wait for starting a commit. The commit can start if
464: * there are no queries executed with greater serial.
465: * @return The serial the commit can run with.
466: */
467: public Long startCommit(int index) {
468: return commitAndQueryTracker.startCommit(index);
469: }
470:
471: /**
472: * Mark the end of a commit phase.
473: */
474: public void endCommit(int index, Long serial) {
475: commitAndQueryTracker.endCommit(index, serial);
476: modificationTracker.endTransaction(serial);
477: }
478:
479: /**
480: * Add a change notification to change tracker.
481: */
482: public void notifyChange(List metas, Long endSerial, Long txSerial) {
483: modificationTracker
484: .changeCandidates(metas, endSerial, txSerial);
485: }
486:
487: }
|