001: package org.jgroups.persistence;
002:
003: /**
004: * @author Mandar Shinde
005: * This class implements the DB storage pattern for the Persistence
006: * Manager interface. The implementation is open and can be used (and
007: * tested) over more than one databases. It uses a string (VARCHAR)
008: * as the key and either BLOB or VARBINARY db-datatype for the
009: * serialized objects. THe user has the option to choose his/her own
010: * schema over the ones provided.
011: */
012:
013: import org.apache.commons.logging.Log;
014: import org.apache.commons.logging.LogFactory;
015:
016: import java.io.*;
017: import java.sql.*;
018: import java.util.*;
019:
020: /**
021: * Class will be utilized
022: */
023: public class DBPersistenceManager implements PersistenceManager {
024:
025: protected final Log log = LogFactory.getLog(this .getClass());
026:
027: /**
028: * Default construct
029: * @param filename absolute filepath
030: * @exception Exception;
031: */
032: public DBPersistenceManager(String filename) throws Exception {
033: String home_dir = null;
034:
035: // PropertyPermission not granted if running in an untrusted environment with JNLP.
036: try {
037: home_dir = System.getProperty("user.home");
038: } catch (SecurityException ex1) {
039: }
040:
041: // 1. Try ${user.home}/persist.properties
042: try {
043: home_dir = home_dir + '/' + filename;
044: init(new FileInputStream(home_dir));
045: return;
046: } catch (Exception ex) {
047: ;
048: }
049:
050: // 2. Try to find persist.properties from somewhere on the CLASSPATH
051: try {
052: InputStream in = DBPersistenceManager.class
053: .getResourceAsStream('/' + filename);
054: if (in != null) {
055: init(in);
056: return;
057: }
058: } catch (Exception x) {
059: if (log.isErrorEnabled())
060: log.error("failed reading database properties from "
061: + filename + ", exception=" + x);
062: }
063:
064: // 3. Finally maybe the user specified -Dpersist.properties=/home/user/mypersist.properties
065: try {
066: home_dir = System.getProperty("persist.properties");
067: init(new FileInputStream(home_dir));
068: return;
069: } catch (Exception ex) {
070: ;
071: }
072:
073: // 4. If none of the above helped us to find persist.properties, give up and throw an exception
074: throw new Exception(
075: "DBPersistenceManager.DBPersistenceManager(): "
076: + "failed reading database properties from "
077: + filename);
078: }
079:
080: /**
081: * Duplicate constructor allowing inputstream
082: * @param input
083: * @exception Exception
084: */
085: public DBPersistenceManager(InputStream input) throws Exception {
086: init(input);
087: }
088:
089: /**
090: * used to intitiailize complete DB access. THis method will use
091: * existing database to create schema (if it doesnt exist) and
092: * get PersistenceManager in usable condition
093: * @param in
094: * @exception Exception;
095: */
096: protected void init(InputStream in) throws Exception {
097: list = new Vector();
098: readProps(in);
099: loadDriver();
100:
101: //check conn
102: Connection conn = this .getConnection();
103: this .closeConnection(conn);
104: createDBTables();
105: retrieveAll(); // work around to make sure, no duplicates are created.
106: log.error(" Done constructing DB Persist Manager");
107: }
108:
109: // TODO list for this implementation
110: // add constructor for xml file
111: // add constructor for default
112:
113: /**
114: * Saves NV pair as serializable object;
115: * creates if new, stores new state if already exists.
116: * @param key
117: * @param val
118: * @exception CannotPersistException;
119: */
120: public void save(Serializable key, Serializable val)
121: throws CannotPersistException {
122: // checking if this is update or new entry
123: if (!entryExists(key)) {
124: log.error(" entry doesnt exist for " + key.toString());
125: try {
126: addNewEntry(key, val);
127: list.add(key.toString());
128: return;
129: } catch (Throwable t1) {
130: t1.printStackTrace();
131: //trace here
132: throw new CannotPersistException(t1,
133: " error adding a completely new entry in to DB ");
134: }
135: }// checking entries
136:
137: // THis is for regular updates to the key,val pair
138: Connection conn = null;
139: PreparedStatement prepStat = null;
140: try {
141: conn = this .getConnection();
142: String keyStr = null;
143: keyStr = key.toString();
144: byte[] keyBytes = getBytes(key);
145: byte[] valBytes = getBytes(val);
146: log.error(" value is " + val);
147: //use simple execute, do not create prepared statement
148: prepStat = conn.prepareStatement(updateStat);
149: prepStat.setString(3, keyStr);
150: prepStat.setBytes(1, keyBytes);
151: prepStat.setBytes(2, valBytes);
152: prepStat.executeQuery();
153: } catch (Throwable t) {
154: //trace here
155: t.printStackTrace();
156: // throw exception here
157: throw new CannotPersistException(t,
158: "error updating an existing entry in to the database ");
159: }
160: // cleanup
161: finally {
162: try {
163: if (prepStat != null)
164: prepStat.close();
165: this .closeConnection(conn);
166: } catch (Throwable t) {
167: // trace
168: conn = null;
169: prepStat = null;
170: }
171: }
172: }
173:
174: /**
175: * Removes existing entry.
176: * @param key
177: * @exception CannotRemoveException;
178: */
179: public Serializable remove(Serializable key)
180: throws CannotRemoveException {
181: Connection conn = null;
182: Statement stat = null;
183: PreparedStatement prepStat = null;
184: ResultSet set = null;
185: Serializable val = null;
186:
187: try {
188: conn = this .getConnection();
189: stat = conn.createStatement();
190: String exQuery = " select * from replhashmap where key like '"
191: + key.toString() + '\'';
192: set = stat.executeQuery(exQuery);
193: set.next();
194: val = getSerializable(set.getBinaryStream(3));
195: } catch (Throwable t3) {
196: //trace
197: t3.printStackTrace();
198: throw new CannotRemoveException(t3,
199: " Error retrieving value for given key");
200: } finally {
201: try {
202: if (prepStat != null)
203: prepStat.close();
204: this .closeConnection(conn);
205: } catch (Throwable t) {
206: // trace
207: conn = null;
208: prepStat = null;
209: }
210: }
211:
212: try {
213: conn = this .getConnection();
214: prepStat = conn.prepareStatement(removeStat);
215: prepStat.setString(1, key.toString());
216: prepStat.executeQuery();
217: list.remove(key.toString());
218: } catch (Throwable t) {
219: //trace here..
220: t.printStackTrace();
221: // throw Exception
222: throw new CannotRemoveException(t,
223: "Could not remove existing entry due to error in jdbc transaction");
224: }
225:
226: // cleanup
227: finally {
228: try {
229: set.close();
230: stat.close();
231: if (prepStat != null)
232: prepStat.close();
233: this .closeConnection(conn);
234: } catch (Throwable t) {
235: // trace
236: conn = null;
237: stat = null;
238: }//end of try..catch
239: }// end of finally..
240: return val;
241: }// end of remove
242:
243: /**
244: * Saves all row entries for the map to DB.
245: * @param map
246: * @exception CannotPersistException;
247: */
248: public synchronized void saveAll(Map map)
249: throws CannotPersistException {
250: Iterator iter = null;
251: try {
252: Set keySet = map.keySet();
253: iter = keySet.iterator();
254: } catch (Throwable t) {
255: t.printStackTrace();
256: //trace here
257: throw new CannotPersistException(t,
258: "Error with the map entered to saveAll");
259: }
260:
261: //Individually saving all
262: while (iter.hasNext()) {
263: try {
264: Serializable key = (Serializable) iter.next();
265: Serializable val = (Serializable) map.get(key);
266:
267: // dont this in same thread, optimization can be added
268: this .save(key, val);
269: } catch (Throwable t2) {
270: t2.printStackTrace();
271: //trace here
272: continue;
273: }
274: }// end of while..
275: }// end of saveall
276:
277: /**
278: * Used to retrieve the persisted map back to its last known state
279: * @return Map;
280: * @exception CannotRetrieveException;
281: */
282: public synchronized Map retrieveAll()
283: throws CannotRetrieveException {
284: Connection conn = null;
285: Statement stat = null;
286: ResultSet set = null;
287: Map map = null;
288: try {
289: conn = this .getConnection();
290: stat = conn.createStatement();
291: set = stat.executeQuery(" select * from replhashmap");
292: map = retrieveAll(set);
293: } catch (Throwable t) {
294: //trace here
295: throw new CannotRetrieveException(
296: t,
297: "Error happened while querying the database for bulk retrieve, try starting DB manually");
298: }
299:
300: //finally
301: try {
302: stat.close();
303: this .closeConnection(conn);
304: } catch (Throwable t1) {
305: // trace it
306: // ignore
307: }
308:
309: return map;
310: }// end of retrieveall
311:
312: /**
313: * Helper method to get get back the map
314: * @return Map;
315: * @exception Exception;
316: */
317: private Map retrieveAll(ResultSet result) throws Exception {
318: HashMap map = new HashMap();
319: while (result.next()) {
320: InputStream inputStrKey = result.getBinaryStream(2);
321: InputStream inputStrVal = result.getBinaryStream(3);
322: Serializable key = getSerializable(inputStrKey);
323: Serializable val = getSerializable(inputStrVal);
324: map.put(key, val);
325: list.add(key.toString());
326: }// end of while..
327: return map;
328: }
329:
330: /**
331: * Clears the key-cache as well as all entries
332: * @exception CannotRemoveException;
333: */
334: public void clear() throws CannotRemoveException {
335: Connection conn = null;
336: Statement stat = null;
337: try {
338: conn = this .getConnection();
339: stat = conn.createStatement();
340: stat.executeQuery("delete from replhashmap");
341: } catch (Throwable t) {
342: //trace here
343: throw new CannotRemoveException(t,
344: " delete all query failed with existing database");
345: }
346:
347: //finally
348: try {
349: stat.close();
350: this .closeConnection(conn);
351: } catch (Throwable t) {
352: conn = null;
353: stat = null;
354: }
355: }
356:
357: /**
358: * Shutting down the database cleanly
359: */
360: public void shutDown() {
361: // non-trivial problem, more research required
362: // no-op for now..
363: }
364:
365: /**
366: * The private interfaces are used specifically to this manager
367: */
368:
369: /**
370: * Used to enter a completely new row in to the current table
371: * @param Serializable; key
372: * @param Serializable; value
373: * @exception CannotPersistException;
374: */
375: private void addNewEntry(Serializable key, Serializable val)
376: throws CannotPersistException, CannotConnectException {
377: Connection conn = getConnection();
378: try {
379: PreparedStatement prepStat = conn
380: .prepareStatement(insertStat);
381: prepStat.setString(1, key.toString());
382: byte[] keyBytes = getBytes(key);
383: byte[] valBytes = getBytes(val);
384: //InputStream keyStream = getBinaryInputStream(key);
385: //InputStream valStream = getBinaryInputStream(val);
386: prepStat.setBytes(2, keyBytes);
387: prepStat.setBytes(3, valBytes);
388: //prepStat.setBinaryStream(keyStream);
389: //prepStat.setBinaryStream(valStream);
390: prepStat.executeQuery();
391: conn.commit();
392: log.error(" executing insert " + insertStat);
393: } catch (Throwable t) {
394: //conn.rollback();
395: t.printStackTrace();
396: //trace here
397: throw new CannotPersistException(t,
398: "error adding new entry using creating Db connection and schema");
399: }
400: }// end of addentry..
401:
402: /**
403: * Gets a binaryinputstream from a serialized object
404: * @param Serializable;
405: * @return BinaryInputStream;
406: * @exception Exception;
407: */
408: private java.io.InputStream getBinaryInputStream(Serializable ser)
409: throws Exception {
410: ByteArrayOutputStream stream = new ByteArrayOutputStream();
411: ObjectOutputStream keyoos = new ObjectOutputStream(stream);
412: keyoos.writeObject(ser);
413: ByteArrayInputStream pipe = new ByteArrayInputStream(stream
414: .toByteArray());
415: return pipe;
416: }// end of stream conversion
417:
418: /**
419: * Gets a serializable back from a InputStream
420: * @param InputStream;
421: * @return Serializable;
422: * @exception Exception;
423: */
424: private Serializable getSerializable(java.io.InputStream stream)
425: throws Exception {
426: ObjectInputStream ooStr = new ObjectInputStream(stream);
427: Serializable tmp = (Serializable) ooStr.readObject();
428: return tmp;
429: }
430:
431: /**
432: * Used to enter a completely new row in to the current table
433: * @param Serializable; key
434: * @param Serializable; value
435: * @exception CannotPersistException;
436: */
437: private void addNewEntryGen(Serializable key, Serializable val)
438: throws CannotPersistException, CannotConnectException {
439: Connection conn = getConnection();
440: try {
441: PreparedStatement prepStat = conn
442: .prepareStatement(insertStat);
443: prepStat.setString(1, key.toString());
444: prepStat.setBytes(2, getBytes(key));
445: prepStat.setBytes(3, getBytes(val));
446: prepStat.executeUpdate();
447: } catch (Throwable t) {
448: //trace here
449: throw new CannotPersistException(t,
450: "error adding new entry using creating Db connection and schema");
451: }
452: }// end of entering new row gen
453:
454: /**
455: * Used to enter a completely new row in to the current table
456: * @param Serializable; key
457: * @param Serializable; value
458: * @exception CannotPersistException;
459: */
460: private void addNewEntryOra(Serializable key, Serializable val)
461: throws CannotPersistException, CannotConnectException {
462: Connection conn = getConnection();
463: try {
464: PreparedStatement prepStat = conn
465: .prepareStatement(insertStat);
466: prepStat.setString(1, key.toString());
467: InputStream keyBin = getBinaryInputStream(key);
468: InputStream keyVal = getBinaryInputStream(val);
469: byte[] keyBytes = getBytes(key);
470: byte[] valBytes = getBytes(val);
471: prepStat.setBytes(2, keyBytes);
472: prepStat.setBytes(3, valBytes);
473: prepStat.executeBatch();
474: } catch (Throwable t) {
475: //trace here
476: throw new CannotPersistException(t,
477: "error adding new entry using creating Db connection and schema");
478: }
479: }// end of entering new row ora
480:
481: /**
482: * Cache checking
483: * @param java.io.Serializable
484: * @return boolean;
485: */
486: private boolean entryExists(Serializable key) {
487: return list.contains(key.toString());
488: }
489:
490: /**
491: * Conversion helper
492: * @param Serializable;
493: * @return byte[];
494: */
495: private byte[] getBytes(Serializable ser) throws Exception {
496: ByteArrayOutputStream stream = new ByteArrayOutputStream();
497: ObjectOutputStream keyoos = new ObjectOutputStream(stream);
498: keyoos.writeObject(ser);
499: byte[] keyBytes = stream.toByteArray();
500: return keyBytes;
501: }// end of getBytes
502:
503: /**
504: * ALL IMPL below is for INIT purposes
505: */
506:
507: /**
508: * This method will be invoked by defauly by each persistence
509: * manager to read from a default location or one provided by
510: * the caller.
511: * @return void;
512: * @exception Exception;
513: */
514: private void readProps(String filePath) throws Exception {
515: FileInputStream _stream = new FileInputStream(filePath);
516: props = new Properties();
517: props.load(_stream);
518:
519: // using properties to set most used variables
520: driverName = props.getProperty("jdbc.Driver");
521: connStr = props.getProperty("jdbc.Conn").trim();
522: userName = props.getProperty("jdbc.User").trim();
523: userPass = props.getProperty("jdbc.Pass").trim();
524: createTable = props.getProperty("jdbc.table").trim();
525: }
526:
527: /**
528: * Duplicate reader using stream instead of dile
529: * @param InputStream;
530: * @exception Exception;
531: */
532: private void readProps(InputStream input) throws Exception {
533: props = new Properties();
534: props.load(input);
535:
536: // using properties to set most used variables
537: driverName = props.getProperty("jdbc.Driver");
538: connStr = props.getProperty("jdbc.Conn");
539: userName = props.getProperty("jdbc.User");
540: userPass = props.getProperty("jdbc.Pass");
541: createTable = props.getProperty("jdbc.table");
542: }
543:
544: /**
545: * Loads the driver using the driver class name. Drivers can be simply
546: * loaded by loading the class or by registering specifically using the
547: * JDBC DriverManager
548: * @return void;
549: * @exception Exception;
550: */
551: private void loadDriver() throws Exception {
552: // driver classes when loaded load the driver into VM
553: Class.forName(driverName);
554: }
555:
556: /**
557: * Once the driver is loaded, the DB is ready to be connected. This
558: * method provides a handle to connect to the DB.
559: * @return Connection;
560: * @exception CannotConnectException;
561: */
562: private Connection getConnection() throws CannotConnectException {
563: try {
564: connStr = connStr.trim();
565: Connection conn = DriverManager.getConnection(connStr,
566: userName, userPass);
567: if (log.isInfoEnabled())
568: log.info("userName=" + userName + ", userPass="
569: + userPass + ", connStr=" + connStr);
570: return conn;
571: } catch (Throwable t) {
572: t.printStackTrace();
573: //trace here
574: throw new CannotConnectException(t,
575: "Error in creating connection using provided properties ");
576: }
577: }// end of get conn..
578:
579: /**
580: * Method is used for closing created connection.
581: * Pooling is not implemented currently, but will be made available
582: * as soon as this manager uses large number of transactions
583: * @param Connection
584: */
585: private void closeConnection(Connection conn) {
586: try {
587: if (conn != null) {
588: conn.close();
589: conn = null;
590: }
591: } catch (Throwable t) {
592: //trace here
593: conn = null;
594: }
595: }// end of closeConn
596:
597: /**
598: * Used to create table provided the DB instance
599: * @exception CannotCreateSchemaException;
600: * @exception CannotConnectException;
601: */
602: private void createDBTables() throws CannotCreateSchemaException,
603: CannotConnectException {
604: Connection conn = this .getConnection();
605: Statement stat = null;
606: try {
607: stat = conn.createStatement();
608: } catch (Exception e) {
609: //trace here..
610: e.printStackTrace();
611: throw new CannotConnectException(
612: e,
613: "there was an error in creating statements for persisting data using created connection");
614: }
615: try {
616: ResultSet set = stat
617: .executeQuery("select * from replhashmap");
618: } catch (Throwable t) {
619: t.printStackTrace();
620: //use connection to create new statement
621: addSchemaToDB(conn);
622: }// end of out throwable..
623: }// end of method..
624:
625: /**
626: * used to create required table within the DB
627: * @param Connection;
628: * @exception CannotCreateSchema;
629: */
630: private void addSchemaToDB(Connection conn)
631: throws CannotCreateSchemaException {
632: Statement stat = null;
633: Statement stat2 = null;
634: try {
635:
636: stat = conn.createStatement();
637: log.error(" executing query for oracle " + createTable);
638: stat.executeQuery(createTable);
639: } catch (Throwable t) {
640: t.printStackTrace();
641: // trace here
642: throw new CannotCreateSchemaException(t,
643: "error was using schema with blobs");
644: }// end of catch
645:
646: // clean up is required after init
647: finally {
648: try {
649: if (stat != null)
650: stat.close();
651: this .closeConnection(conn);
652: } catch (Throwable t3) {
653: }
654: }// end of finally..
655: }// end of gen schema..
656:
657: private Properties props = null;
658: private String driverName = null;
659: private String userName = null;
660: private String userPass = null;
661: private String connStr = null;
662: private String createTable = null;
663: private final boolean oracleDB = false;
664: private Vector list = null;
665:
666: private static final String tabName = "replhashmap";
667: private static final String insertStat = "insert into replhashmap(key, keyBin, valBin) values (?, ?, ?)";
668: private static final String updateStat = "update replhashmap set keyBin = ?, valBin = ? where key like ?";
669: private static final String removeStat = " delete from replhashmap where key like ?";
670: private static final String createTableGen = " create table replhashmap(key varchar, keyBin varbinary, valBin varbinary)";
671: private static final String createTableOra = " create table replhashmap ( key varchar2(100), keyBin blob, valBin blob)";
672: }
|