001: /*
002: * Copyright (c) 1998-2008 Caucho Technology -- all rights reserved
003: *
004: * This file is part of Resin(R) Open Source
005: *
006: * Each copy or derived work must preserve the copyright notice and this
007: * notice unmodified.
008: *
009: * Resin Open Source is free software; you can redistribute it and/or modify
010: * it under the terms of the GNU General Public License as published by
011: * the Free Software Foundation; either version 2 of the License, or
012: * (at your option) any later version.
013: *
014: * Resin Open Source is distributed in the hope that it will be useful,
015: * but WITHOUT ANY WARRANTY; without even the implied warranty of
016: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty
017: * of NON-INFRINGEMENT. See the GNU General Public License for more
018: * details.
019: *
020: * You should have received a copy of the GNU General Public License
021: * along with Resin Open Source; if not, write to the
022: *
023: * Free Software Foundation, Inc.
024: * 59 Temple Place, Suite 330
025: * Boston, MA 02111-1307 USA
026: *
027: * @author Scott Ferguson
028: */
029:
030: package com.caucho.server.cluster;
031:
032: import com.caucho.config.ConfigException;
033: import com.caucho.db.jdbc.DataSourceImpl;
034: import com.caucho.util.Alarm;
035: import com.caucho.util.FreeList;
036: import com.caucho.util.L10N;
037: import com.caucho.util.Log;
038: import com.caucho.vfs.Path;
039: import com.caucho.vfs.ReadStream;
040: import com.caucho.vfs.WriteStream;
041:
042: import javax.sql.DataSource;
043: import java.io.IOException;
044: import java.io.InputStream;
045: import java.sql.Connection;
046: import java.sql.PreparedStatement;
047: import java.sql.ResultSet;
048: import java.sql.SQLException;
049: import java.sql.Statement;
050: import java.util.logging.Level;
051: import java.util.logging.Logger;
052:
053: /**
054: * Manages the backing for the file store.
055: */
056: public class FileBacking {
057: private static final L10N L = new L10N(FileBacking.class);
058: private static final Logger log = Logger
059: .getLogger(FileBacking.class.getName());
060:
061: private FreeList<ClusterConnection> _freeConn = new FreeList<ClusterConnection>(
062: 32);
063:
064: private String _name;
065:
066: private Path _path;
067:
068: private DataSource _dataSource;
069:
070: private String _tableName;
071: private String _loadQuery;
072: private String _updateQuery;
073: private String _accessQuery;
074: private String _setExpiresQuery;
075: private String _insertQuery;
076: private String _invalidateQuery;
077: private String _timeoutQuery;
078: private String _dumpQuery;
079: private String _countQuery;
080:
081: public FileBacking() {
082: }
083:
084: public FileBacking(String name) {
085: _name = name;
086: }
087:
088: /**
089: * Returns the path to the directory.
090: */
091: public Path getPath() {
092: return _path;
093: }
094:
095: /**
096: * Sets the path to the saved file.
097: */
098: public void setPath(Path path) {
099: _path = path;
100: }
101:
102: /**
103: * Sets the table name
104: */
105: public void setTableName(String table) {
106: _tableName = table;
107: }
108:
109: public boolean init(int clusterLength) throws Exception {
110: if (_path == null)
111: throw new ConfigException(L.l("file-backing needs path."));
112:
113: if (_tableName == null)
114: throw new ConfigException(L
115: .l("file-backing needs tableName."));
116:
117: int length = clusterLength;
118:
119: if (length <= 0)
120: length = 1;
121:
122: _loadQuery = "SELECT access_time,data FROM " + _tableName
123: + " WHERE id=? AND is_valid=1";
124: _insertQuery = ("INSERT into "
125: + _tableName
126: + " (id,is_valid,data,mod_time,access_time,expire_interval,server1,server2,server3) " + "VALUES(?,1,?,?,?,?,?,?,?)");
127: _updateQuery = "UPDATE "
128: + _tableName
129: + " SET data=?, mod_time=?, access_time=?,is_valid=1 WHERE id=?";
130: _accessQuery = "UPDATE " + _tableName
131: + " SET access_time=? WHERE id=?";
132: _setExpiresQuery = "UPDATE " + _tableName
133: + " SET expire_interval=? WHERE id=?";
134: _invalidateQuery = "UPDATE " + _tableName
135: + " SET is_valid=0 WHERE id=?";
136:
137: // access window is 1/4 the expire interval
138: _timeoutQuery = ("DELETE FROM " + _tableName + " WHERE access_time + 5 * expire_interval / 4 < ?");
139:
140: _dumpQuery = ("SELECT id, is_valid, expire_interval, data"
141: + " FROM " + _tableName + " WHERE ? <= mod_time AND " + " (?=server1 OR ?=server2 OR ?=server3)");
142:
143: _countQuery = "SELECT count(*) FROM " + _tableName;
144:
145: try {
146: _path.mkdirs();
147: } catch (IOException e) {
148: }
149:
150: DataSourceImpl dataSource = new DataSourceImpl();
151: dataSource.setPath(_path);
152: dataSource.setRemoveOnError(true);
153: dataSource.init();
154:
155: _dataSource = dataSource;
156:
157: initDatabase();
158:
159: return true;
160: }
161:
162: /**
163: * Returns the data source.
164: */
165: public DataSource getDataSource() {
166: return _dataSource;
167: }
168:
169: /**
170: * Create the database, initializing if necessary.
171: */
172: private void initDatabase() throws Exception {
173: Connection conn = _dataSource.getConnection();
174:
175: try {
176: Statement stmt = conn.createStatement();
177:
178: boolean hasDatabase = false;
179:
180: try {
181: String sql = "SELECT expire_interval,is_valid FROM "
182: + _tableName + " WHERE 1=0";
183:
184: ResultSet rs = stmt.executeQuery(sql);
185: rs.next();
186: rs.close();
187:
188: return;
189: } catch (Exception e) {
190: log.log(Level.FINEST, e.toString(), e);
191: log.finer(this + " " + e.toString());
192: }
193:
194: try {
195: stmt.executeQuery("DROP TABLE " + _tableName);
196: } catch (Exception e) {
197: log.log(Level.FINEST, e.toString(), e);
198: }
199:
200: String sql = ("CREATE TABLE " + _tableName + " (\n"
201: + " id VARBINARY(64) PRIMARY KEY,\n"
202: + " is_valid BIT,\n" + " data BLOB,\n"
203: + " expire_interval INTEGER,\n"
204: + " access_time INTEGER,\n"
205: + " mod_time INTEGER,\n" + " mod_count BIGINT,\n"
206: + " server1 INTEGER,\n" + " server2 INTEGER,\n" + " server3 INTEGER)");
207:
208: log.fine(sql);
209:
210: stmt.executeUpdate(sql);
211: } finally {
212: conn.close();
213: }
214: }
215:
216: public long start() throws Exception {
217: long delta = -Alarm.getCurrentTime();
218:
219: Connection conn = null;
220: try {
221: conn = _dataSource.getConnection();
222:
223: Statement stmt = conn.createStatement();
224:
225: String sql = "SELECT MAX(access_time) FROM " + _tableName;
226:
227: ResultSet rs = stmt.executeQuery(sql);
228:
229: if (rs.next())
230: delta = rs.getInt(1) * 60000L - Alarm.getCurrentTime();
231: } finally {
232: if (conn != null)
233: conn.close();
234: }
235:
236: return delta;
237: }
238:
239: /**
240: * Clears the old objects.
241: */
242: public void clearOldObjects(long maxIdleTime) throws SQLException {
243: Connection conn = null;
244:
245: try {
246: if (maxIdleTime > 0) {
247: conn = _dataSource.getConnection();
248:
249: PreparedStatement pstmt = conn
250: .prepareStatement(_timeoutQuery);
251:
252: long now = Alarm.getCurrentTime();
253: int nowMinute = (int) (now / 60000L);
254:
255: pstmt.setInt(1, nowMinute);
256:
257: int count = pstmt.executeUpdate();
258:
259: // System.out.println("OBSOLETE:" + count);
260:
261: if (count > 0)
262: log.fine(this + " purged " + count
263: + " old sessions");
264:
265: pstmt.close();
266: }
267: } finally {
268: if (conn != null)
269: conn.close();
270: }
271: }
272:
273: /**
274: * Load the session from the jdbc store.
275: *
276: * @param session the session to fill.
277: *
278: * @return true if the load was valid.
279: */
280: public boolean loadSelf(ClusterObject clusterObj, Object obj)
281: throws Exception {
282: String uniqueId = clusterObj.getUniqueId();
283:
284: ClusterConnection conn = getConnection();
285: try {
286: PreparedStatement stmt = conn.prepareLoad();
287: stmt.setString(1, uniqueId);
288:
289: ResultSet rs = stmt.executeQuery();
290: boolean validLoad = false;
291:
292: if (rs.next()) {
293: //System.out.println("LOAD: " + uniqueId);
294: long accessTime = rs.getInt(1) * 60000L;
295:
296: InputStream is = rs.getBinaryStream(2);
297:
298: if (log.isLoggable(Level.FINE))
299: log.fine(this + " load " + uniqueId);
300:
301: validLoad = clusterObj.load(is, obj);
302:
303: if (validLoad)
304: clusterObj.setAccessTime(accessTime);
305:
306: is.close();
307: } else if (log.isLoggable(Level.FINE))
308: log.fine(this + " load: no local object loaded for "
309: + uniqueId);
310: else {
311: // System.out.println("NO-LOAD: " + uniqueId);
312: }
313:
314: rs.close();
315:
316: return validLoad;
317: } finally {
318: conn.close();
319: }
320: }
321:
322: /**
323: * Updates the object's access time.
324: *
325: * @param obj the object to store.
326: */
327: public void updateAccess(String uniqueId) throws Exception {
328: ClusterConnection conn = getConnection();
329:
330: try {
331: PreparedStatement stmt = conn.prepareAccess();
332:
333: long now = Alarm.getCurrentTime();
334: int nowMinutes = (int) (now / 60000L);
335: stmt.setInt(1, nowMinutes);
336: stmt.setString(2, uniqueId);
337:
338: int count = stmt.executeUpdate();
339:
340: if (count > 0) {
341: if (log.isLoggable(Level.FINE))
342: log.fine(this + " access " + uniqueId);
343: return;
344: }
345: } finally {
346: conn.close();
347: }
348: }
349:
350: /**
351: * Sets the object's expire_interval.
352: *
353: * @param obj the object to store.
354: */
355: public void setExpireInterval(String uniqueId, long expireInterval)
356: throws Exception {
357: ClusterConnection conn = getConnection();
358:
359: try {
360: PreparedStatement stmt = conn.prepareSetExpireInterval();
361:
362: int expireMinutes = (int) (expireInterval / 60000L);
363: stmt.setInt(1, expireMinutes);
364: stmt.setString(2, uniqueId);
365:
366: int count = stmt.executeUpdate();
367:
368: if (count > 0) {
369: if (log.isLoggable(Level.FINE))
370: log.fine(this + " set expire interval: " + uniqueId
371: + " " + expireInterval);
372: return;
373: }
374: } finally {
375: conn.close();
376: }
377: }
378:
379: /**
380: * Removes the named object from the store.
381: */
382: public void remove(String uniqueId) throws Exception {
383: ClusterConnection conn = getConnection();
384:
385: try {
386: PreparedStatement pstmt = conn.prepareInvalidate();
387: pstmt.setString(1, uniqueId);
388:
389: int count = pstmt.executeUpdate();
390:
391: if (log.isLoggable(Level.FINE))
392: log.fine(this + " remove " + uniqueId);
393: } finally {
394: conn.close();
395: }
396: }
397:
398: /**
399: * Reads from the store.
400: */
401: public long read(String uniqueId, WriteStream os)
402: throws IOException {
403: Connection conn = null;
404: try {
405: conn = _dataSource.getConnection();
406:
407: PreparedStatement pstmt = conn.prepareStatement(_loadQuery);
408: pstmt.setString(1, uniqueId);
409:
410: ResultSet rs = pstmt.executeQuery();
411: if (rs.next()) {
412: long accessTime = rs.getInt(1) * 60000L;
413:
414: InputStream is = rs.getBinaryStream(2);
415:
416: os.writeStream(is);
417:
418: is.close();
419:
420: return accessTime;
421: }
422: } catch (SQLException e) {
423: log.log(Level.FINE, e.toString(), e);
424: } finally {
425: try {
426: if (conn != null)
427: conn.close();
428: } catch (SQLException e) {
429: }
430: }
431:
432: return -1;
433: }
434:
435: /**
436: * Stores the cluster object on the local store.
437: *
438: * @param uniqueId the object's unique id.
439: * @param id the input stream to the serialized object
440: * @param length the length object the serialized object
441: * @param expireInterval how long the object lives w/o access
442: */
443: public void storeSelf(String uniqueId, ReadStream is, int length,
444: long expireInterval, int primary, int secondary,
445: int tertiary) {
446: ClusterConnection conn = null;
447:
448: try {
449: conn = getConnection();
450: // Try to update first, and insert if fail.
451: // The binary stream can be reused because it won't actually be
452: // read on a failure
453:
454: if (storeSelfUpdate(conn, uniqueId, is, length)) {
455: } else if (storeSelfInsert(conn, uniqueId, is, length,
456: expireInterval, primary, secondary, tertiary)) {
457: } else {
458: // XXX: For now, avoid this case since the self-update query doesn't
459: // check for any update count, i.e. it can't tell which update is
460: // the most recent. Also, the input stream would need to change
461: // to a tempStream to allow the re-write
462:
463: /*
464: if (storeSelfUpdate(conn, uniqueId, is, length)) {
465: // The second update is for the rare case where
466: // two threads try to update the database simultaneously
467: }
468: else {
469: log.fine(L.l("Can't store session {0}", uniqueId));
470: }
471: */
472: }
473: } catch (SQLException e) {
474: log.log(Level.FINE, e.toString(), e);
475: } finally {
476: if (conn != null)
477: conn.close();
478: }
479: }
480:
481: /**
482: * Stores the cluster object on the local store using an update query.
483: *
484: * @param conn the database connection
485: * @param uniqueId the object's unique id.
486: * @param id the input stream to the serialized object
487: * @param length the length object the serialized object
488: */
489: private boolean storeSelfUpdate(ClusterConnection conn,
490: String uniqueId, ReadStream is, int length) {
491: try {
492: PreparedStatement stmt = conn.prepareUpdate();
493: stmt.setBinaryStream(1, is, length);
494:
495: long now = Alarm.getCurrentTime();
496: int nowMinutes = (int) (now / 60000L);
497: stmt.setInt(2, nowMinutes);
498: stmt.setInt(3, nowMinutes);
499: stmt.setString(4, uniqueId);
500:
501: int count = stmt.executeUpdate();
502:
503: if (count > 0) {
504: if (log.isLoggable(Level.FINE))
505: log.fine(this + " update " + uniqueId + " length:"
506: + length);
507:
508: return true;
509: }
510: } catch (SQLException e) {
511: log.log(Level.WARNING, e.toString(), e);
512: }
513:
514: return false;
515: }
516:
517: private boolean storeSelfInsert(ClusterConnection conn,
518: String uniqueId, ReadStream is, int length,
519: long expireInterval, int primary, int secondary,
520: int tertiary) {
521: try {
522: PreparedStatement stmt = conn.prepareInsert();
523:
524: stmt.setString(1, uniqueId);
525:
526: stmt.setBinaryStream(2, is, length);
527:
528: int nowMinutes = (int) (Alarm.getCurrentTime() / 60000L);
529:
530: stmt.setInt(3, nowMinutes);
531: stmt.setInt(4, nowMinutes);
532: stmt.setInt(5, (int) (expireInterval / 60000L));
533:
534: stmt.setInt(6, primary);
535: stmt.setInt(7, secondary);
536: stmt.setInt(8, tertiary);
537:
538: stmt.executeUpdate();
539:
540: if (log.isLoggable(Level.FINE))
541: log.fine(this + " insert " + uniqueId + " length:"
542: + length);
543:
544: return true;
545: } catch (SQLException e) {
546: System.out.println(e);
547:
548: log.log(Level.FINE, e.toString(), e);
549: }
550:
551: return false;
552: }
553:
554: //
555: // statistics
556: //
557:
558: public long getObjectCount() throws SQLException {
559: ClusterConnection conn = getConnection();
560:
561: try {
562: PreparedStatement stmt = conn.prepareCount();
563:
564: ResultSet rs = stmt.executeQuery();
565:
566: if (rs != null && rs.next()) {
567: long value = rs.getLong(1);
568: rs.close();
569: return value;
570: }
571:
572: return -1;
573: } catch (SQLException e) {
574: log.log(Level.FINE, e.toString(), e);
575: } finally {
576: conn.close();
577: }
578:
579: return -1;
580: }
581:
582: public void destroy() {
583: _dataSource = null;
584: _freeConn = null;
585: }
586:
587: private ClusterConnection getConnection() throws SQLException {
588: ClusterConnection cConn = _freeConn.allocate();
589:
590: if (cConn == null) {
591: Connection conn = _dataSource.getConnection();
592: cConn = new ClusterConnection(conn);
593: }
594:
595: return cConn;
596: }
597:
598: public String serverNameToTableName(String serverName) {
599: if (serverName == null)
600: return "srun";
601:
602: StringBuilder cb = new StringBuilder();
603: cb.append("srun_");
604:
605: for (int i = 0; i < serverName.length(); i++) {
606: char ch = serverName.charAt(i);
607:
608: if ('a' <= ch && ch <= 'z') {
609: cb.append(ch);
610: } else if ('A' <= ch && ch <= 'Z') {
611: cb.append(ch);
612: } else if ('0' <= ch && ch <= '9') {
613: cb.append(ch);
614: } else if (ch == '_') {
615: cb.append(ch);
616: } else
617: cb.append('_');
618: }
619:
620: return cb.toString();
621: }
622:
623: public String toString() {
624: return getClass().getSimpleName() + "[" + _tableName + "]";
625: }
626:
627: class ClusterConnection {
628: private Connection _conn;
629:
630: private PreparedStatement _loadStatement;
631: private PreparedStatement _updateStatement;
632: private PreparedStatement _insertStatement;
633: private PreparedStatement _accessStatement;
634: private PreparedStatement _setExpiresStatement;
635: private PreparedStatement _invalidateStatement;
636: private PreparedStatement _timeoutStatement;
637: private PreparedStatement _countStatement;
638:
639: ClusterConnection(Connection conn) {
640: _conn = conn;
641: }
642:
643: PreparedStatement prepareLoad() throws SQLException {
644: if (_loadStatement == null)
645: _loadStatement = _conn.prepareStatement(_loadQuery);
646:
647: return _loadStatement;
648: }
649:
650: PreparedStatement prepareUpdate() throws SQLException {
651: if (_updateStatement == null)
652: _updateStatement = _conn.prepareStatement(_updateQuery);
653:
654: return _updateStatement;
655: }
656:
657: PreparedStatement prepareInsert() throws SQLException {
658: if (_insertStatement == null)
659: _insertStatement = _conn.prepareStatement(_insertQuery);
660:
661: return _insertStatement;
662: }
663:
664: PreparedStatement prepareAccess() throws SQLException {
665: if (_accessStatement == null)
666: _accessStatement = _conn.prepareStatement(_accessQuery);
667:
668: return _accessStatement;
669: }
670:
671: PreparedStatement prepareSetExpireInterval()
672: throws SQLException {
673: if (_setExpiresStatement == null)
674: _setExpiresStatement = _conn
675: .prepareStatement(_setExpiresQuery);
676:
677: return _setExpiresStatement;
678: }
679:
680: PreparedStatement prepareInvalidate() throws SQLException {
681: if (_invalidateStatement == null)
682: _invalidateStatement = _conn
683: .prepareStatement(_invalidateQuery);
684:
685: return _invalidateStatement;
686: }
687:
688: PreparedStatement prepareCount() throws SQLException {
689: if (_countStatement == null)
690: _countStatement = _conn.prepareStatement(_countQuery);
691:
692: return _countStatement;
693: }
694:
695: void close() {
696: if (_freeConn != null)
697: _freeConn.free(this);
698: }
699: }
700: }
|