001: /**
002: * Copyright (C) 2006 NetMind Consulting Bt.
003: *
004: * This library is free software; you can redistribute it and/or
005: * modify it under the terms of the GNU Lesser General Public
006: * License as published by the Free Software Foundation; either
007: * version 3 of the License, or (at your option) any later version.
008: *
009: * This library is distributed in the hope that it will be useful,
010: * but WITHOUT ANY WARRANTY; without even the implied warranty of
011: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012: * Lesser General Public License for more details.
013: *
014: * You should have received a copy of the GNU Lesser General Public
015: * License along with this library; if not, write to the Free Software
016: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
017: */package hu.netmind.persistence.node;
018:
019: import hu.netmind.persistence.*;
020: import hu.netmind.persistence.event.*;
021: import java.net.*;
022: import java.io.*;
023: import java.util.*;
024: import org.apache.log4j.Logger;
025:
026: /**
027: * A node service provider implmenetation, which uses a connection
028: * to a server node to delegate functionality.
029: * @author Brautigam Robert
030: * @version Revision: $Revision$
031: */
032: public class NodeClient implements ServiceProvider {
033: private static Logger logger = Logger.getLogger(NodeClient.class);
034:
035: private static int SOCKET_CONNECT_TIMEOUT = 3000;
036: private StoreContext context;
037: private Socket socket;
038: private ObjectOutputStream oOutput;
039: private ObjectInputStream oInput;
040: private String hostips;
041: private int hostport;
042: private boolean connected = false;
043: private GenericResponse responseObject;
044: private Object responseSemaphore = new Object();
045:
046: public NodeClient(StoreContext context) {
047: this .context = context;
048: }
049:
050: /**
051: * Connect to server.
052: */
053: public synchronized void connect(String ips, int port) {
054: // Memorize passed arguments for re-connect
055: this .hostips = ips;
056: try {
057: if ("".equals(ips))
058: hostips = InetAddress.getLocalHost().getHostAddress();
059: } catch (Exception e) {
060: throw new StoreException(
061: "can not determine local adapter, but there is another node, which would need to be contacted.",
062: e);
063: }
064: this .hostport = port;
065: // Connect
066: connect();
067: }
068:
069: /**
070: * Send a message to the server and wait for a response.
071: * If the connection is lost, a re-connect is tried once.
072: */
073: public synchronized GenericResponse sendObject(CommObject obj) {
074: if (logger.isDebugEnabled())
075: logger.debug("outgoing message to server, object: " + obj);
076: // Do communication
077: try {
078: // Send message
079: oOutput.writeObject(obj);
080: // Wait for answer
081: GenericResponse response = null;
082: synchronized (responseSemaphore) {
083: if (responseObject == null) {
084: logger.debug("no response, entering wait...");
085: responseSemaphore.wait();
086: }
087: if (logger.isDebugEnabled()) {
088: if (responseObject == null)
089: logger
090: .debug("client received null object, probably shutting down, or error occured.");
091: else
092: logger.debug("client received object: "
093: + responseObject);
094: }
095: response = responseObject;
096: responseObject = null;
097: }
098: // If shutting down, then send exception
099: if (response == null)
100: throw new StoreException(
101: "client thread is shutting down, so no response will be available.");
102: // Return with result
103: return response;
104: } catch (StoreException e) {
105: throw e;
106: } catch (Exception e) {
107: context.getNodeManager().ensureState(
108: NodeManager.STATE_INITIALIZED);
109: throw new StoreException("error while communication", e);
110: }
111: }
112:
113: /**
114: * Server notification messages.
115: */
116: public void handleMessage(CommObject obj) {
117: if (logger.isDebugEnabled())
118: logger
119: .debug("incoming message from server, object: "
120: + obj);
121: // Handle broadcast message cache update
122: if (obj instanceof CacheUpdateRequest) {
123: CacheUpdateRequest req = (CacheUpdateRequest) obj;
124: context.getCache().updateEntries(req.getTableName(),
125: req.getSerial());
126: }
127: }
128:
129: /**
130: * Disconnect the client.
131: */
132: public synchronized void disconnect() {
133: logger.debug("explicit disconnecting called.");
134: connected = false;
135: try {
136: socket.close();
137: } catch (Exception e) {
138: logger.error("error while disconnecting client", e);
139: }
140: }
141:
142: /**
143: * Determine if an address is available.
144: */
145: public static boolean isAlive(String ips, int port) {
146: try {
147: if ("".equals(ips))
148: ips = InetAddress.getLocalHost().getHostAddress();
149: } catch (Exception e) {
150: throw new StoreException(
151: "can not determine local adapter, but there is another node, which would need to be contacted.",
152: e);
153: }
154: StringTokenizer tokens = new StringTokenizer(ips, ",");
155: while (tokens.hasMoreTokens()) {
156: String ip = tokens.nextToken();
157: try {
158: Socket socket = new Socket();
159: socket.connect(new InetSocketAddress(ip, port),
160: SOCKET_CONNECT_TIMEOUT);
161: socket.close();
162: return true; // Success, so return true
163: } catch (Exception e) {
164: if (!tokens.hasMoreTokens())
165: return false; // No more possiblilities so return false
166: }
167: }
168: return false;
169: }
170:
171: /**
172: * Connect to the host specified by the class variables.
173: * The connect is only successful, if the first "init"
174: * message receives a correct answer.
175: */
176: private void connect() {
177: logger.debug("(re)connecting to server: " + hostips + ":"
178: + hostport);
179: try {
180: // Connect physically
181: StringTokenizer tokens = new StringTokenizer(hostips, ",");
182: boolean established = false;
183: while ((!established) && (tokens.hasMoreTokens())) {
184: try {
185: String ip = tokens.nextToken();
186: socket = new Socket();
187: socket.connect(new InetSocketAddress(ip, hostport),
188: SOCKET_CONNECT_TIMEOUT);
189: established = true;
190: logger.debug("established connection with: " + ip
191: + ", out of " + hostips);
192: } catch (Exception e) {
193: if (!tokens.hasMoreTokens())
194: throw e; // If no more ips, throw exception
195: }
196: }
197: // Establish streams
198: oOutput = new ObjectOutputStream(socket.getOutputStream());
199: oInput = new ObjectInputStream(socket.getInputStream());
200: // Start listener thread
201: Thread listenerThread = new Thread(new MessageListener(
202: this , oInput));
203: listenerThread.setName("Persistence-client");
204: listenerThread.setDaemon(true);
205: listenerThread.start();
206: // If connection is established, send and wait for the first
207: // "init" message
208: GenericResponse response = sendObject(new InitMessage(
209: context.getNodeManager().getNodeIndex()));
210: if (response.getResponseCode() != 0) {
211: // Response was not successful, so reset to INITIALIZED state
212: // and connect again
213: context.getNodeManager().ensureState(
214: NodeManager.STATE_INITIALIZED);
215: // Throw exception
216: throw new StoreException(
217: "server was not ready to accept commands.");
218: } else {
219: // Response was successful, extract the serial id, and
220: // adjust our serial tracker
221: context.getSerialTracker().adjustOffset(
222: ((SerialResponse) response).getSerial());
223: }
224: // All ok, then set connected flag
225: connected = true;
226: } catch (StoreException e) {
227: throw e;
228: } catch (Exception e) {
229: throw new StoreException(
230: "exception while trying to connect " + hostips
231: + ":" + hostport, e);
232: }
233: }
234:
235: /**
236: * Class listens for messages from the server.
237: */
238: public class MessageListener implements Runnable {
239: private ObjectInputStream oInput;
240: private NodeClient client;
241:
242: public MessageListener(NodeClient client,
243: ObjectInputStream oInput) {
244: this .oInput = oInput;
245: this .client = client;
246: }
247:
248: public void run() {
249: try {
250: CommObject obj = null;
251: while ((obj = (CommObject) oInput.readObject()) != null) {
252: if (obj instanceof GenericResponse) {
253: // Response arrived
254: synchronized (responseSemaphore) {
255: responseObject = (GenericResponse) obj;
256: responseSemaphore.notifyAll();
257: }
258: } else {
259: // Out of response message
260: handleMessage(obj);
261: }
262: }
263: } catch (Exception e) {
264: // Wake up waiting threads, if there are
265: synchronized (responseSemaphore) {
266: responseObject = null;
267: responseSemaphore.notifyAll();
268: }
269: // Disconnect
270: if (connected) {
271: logger.error(
272: "error while listening for server events.",
273: e);
274: // Synchronized disconnect to other state methods in container class
275: // (prevents deadlocks!)
276: synchronized (client) {
277: context.getNodeManager().ensureState(
278: NodeManager.STATE_INITIALIZED);
279: }
280: } else {
281: logger.debug("disconnected from server", e);
282: }
283: }
284: }
285: }
286:
287: /*
288: * Implementing the service provider interface.
289: */
290:
291: /**
292: * Get the next serial for database functions. Send request to
293: * server, and wait for a serial response.
294: */
295: public Long getNextSerial() {
296: GenericResponse response = sendObject(new SerialRequest());
297: if (response.getResponseCode() != 0)
298: throw new StoreException(
299: "serial request failed, returned code: "
300: + response.getResponseCode());
301: return ((SerialResponse) response).getSerial();
302: }
303:
304: /**
305: * Send cache update request.
306: */
307: public void updateEntries(String tableName, Long serial) {
308: GenericResponse response = sendObject(new CacheUpdateRequest(
309: tableName, serial));
310: if (response.getResponseCode() != 0)
311: throw new StoreException(
312: "cache update request was not successful, error code: "
313: + response.getResponseCode());
314: }
315:
316: /**
317: * Lock an object.
318: */
319: public SessionInfo lock(int index, long threadId, long txSerial,
320: List metas, SessionInfo info, int wait,
321: boolean ensureCurrent) {
322: logger.debug("sending lock request with: " + metas.size()
323: + " objects, wait: " + wait + ", ensure current: "
324: + ensureCurrent);
325: GenericResponse response = sendObject(new LockRequest(threadId,
326: txSerial, metas, info, wait, ensureCurrent));
327: if (response.getResponseCode() == GenericResponse.ALREADY_LOCKED)
328: return ((LockResponse) response).getSessionInfo();
329: if (response.getResponseCode() != GenericResponse.ACTION_SUCCESS)
330: throw new StoreException(
331: "unknown exception while remote locking object, response code: "
332: + response.getResponseCode());
333: return null;
334: }
335:
336: /**
337: * Unlock object.
338: */
339: public void unlock(int index, long threadId, long txSerial,
340: List metas) {
341: GenericResponse response = sendObject(new UnlockRequest(
342: threadId, txSerial, metas));
343: if (response.getResponseCode() != GenericResponse.ACTION_SUCCESS)
344: throw new StoreException(
345: "unknown exception while remote unlocking object, response code: "
346: + response.getResponseCode());
347: }
348:
349: /**
350: * Wait for a query to execute with the given serial.
351: * This method returns, if all commits before the given serial
352: * are finished.
353: */
354: public void waitForQuery(Long serial) {
355: GenericResponse response = sendObject(new QueryRequest(serial));
356: if (response.getResponseCode() != GenericResponse.ACTION_SUCCESS)
357: throw new StoreException(
358: "exception while waiting for query to execute, response code: "
359: + response.getResponseCode());
360: }
361:
362: /**
363: * Wait for starting a commit. The commit can start if
364: * there are no queries executed with greater serial.
365: * @return The serial the commit can run with.
366: */
367: public Long startCommit(int index) {
368: GenericResponse response = (GenericResponse) sendObject(new CommitStartRequest());
369: if (response.getResponseCode() != GenericResponse.ACTION_SUCCESS)
370: throw new StoreException(
371: "exception while waiting commit start, response code: "
372: + response.getResponseCode());
373: return ((SerialResponse) response).getSerial();
374: }
375:
376: /**
377: * Mark the end of a commit phase.
378: */
379: public void endCommit(int index, Long serial) {
380: GenericResponse response = sendObject(new CommitEndRequest(
381: serial));
382: if (response.getResponseCode() != GenericResponse.ACTION_SUCCESS)
383: throw new StoreException(
384: "exception while waiting for commit to end, response code: "
385: + response.getResponseCode());
386: }
387:
388: /**
389: * Notify the server of object changes.
390: */
391: public void notifyChange(List metas, Long endSerial, Long txSerial) {
392: GenericResponse response = sendObject(new NotifyChangeRequest(
393: metas, endSerial, txSerial));
394: if (response.getResponseCode() != GenericResponse.ACTION_SUCCESS)
395: throw new StoreException(
396: "exception while notifying server of changes, response code: "
397: + response.getResponseCode());
398: }
399:
400: static {
401: try {
402: ResourceBundle config = ResourceBundle
403: .getBundle("beankeeper");
404: SOCKET_CONNECT_TIMEOUT = Integer.valueOf(
405: config.getString("net.connect_timeout")).intValue();
406: } catch (Exception e) {
407: logger
408: .error(
409: "could not load configuration, using hardcoded defaults",
410: e);
411: }
412: }
413: }
|