001: /*
002: * Copyright (c) 1998-2008 Caucho Technology -- all rights reserved
003: *
004: * This file is part of Resin(R) Open Source
005: *
006: * Each copy or derived work must preserve the copyright notice and this
007: * notice unmodified.
008: *
009: * Resin Open Source is free software; you can redistribute it and/or modify
010: * it under the terms of the GNU General Public License as published by
011: * the Free Software Foundation; either version 2 of the License, or
012: * (at your option) any later version.
013: *
014: * Resin Open Source is distributed in the hope that it will be useful,
015: * but WITHOUT ANY WARRANTY; without even the implied warranty of
016: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty
017: * of NON-INFRINGEMENT. See the GNU General Public License for more
018: * details.
019: *
020: * You should have received a copy of the GNU General Public License
021: * along with Resin Open Source; if not, write to the
022: *
023: * Free Software Foundation, Inc.
024: * 59 Temple Place, Suite 330
025: * Boston, MA 02111-1307 USA
026: *
027: * @author Scott Ferguson
028: */
029:
030: package com.caucho.server.cluster;
031:
032: import com.caucho.config.types.Period;
033: import com.caucho.lifecycle.Lifecycle;
034: import com.caucho.loader.ClassLoaderListener;
035: import com.caucho.loader.DynamicClassLoader;
036: import com.caucho.loader.Environment;
037: import com.caucho.loader.EnvironmentClassLoader;
038: import com.caucho.loader.EnvironmentListener;
039: import com.caucho.management.server.PersistentStoreMXBean;
040: import com.caucho.util.Alarm;
041: import com.caucho.util.AlarmListener;
042: import com.caucho.util.L10N;
043: import com.caucho.util.LruCache;
044: import com.caucho.vfs.TempStream;
045:
046: import javax.annotation.PostConstruct;
047: import java.io.IOException;
048: import java.util.HashMap;
049: import java.util.logging.Level;
050: import java.util.logging.Logger;
051:
052: /**
053: * Base class for distributed stores.
054: */
055: abstract public class StoreManager implements AlarmListener,
056: EnvironmentListener, ClassLoaderListener {
057: static protected final Logger log = Logger
058: .getLogger(StoreManager.class.getName());
059: static final L10N L = new L10N(StoreManager.class);
060:
061: private static int DECODE[];
062:
063: private Cluster _cluster;
064: private String _serverId;
065:
066: protected int _selfIndex;
067: private ServerConnector[] _serverList;
068:
069: private Alarm _alarm;
070:
071: // protected long _maxIdleTime = 24 * 3600 * 1000L;
072: protected long _maxIdleTime = 3 * 3600 * 1000L;
073: protected long _idleCheckInterval = 5 * 60 * 1000L;
074:
075: protected boolean _isAlwaysLoad;
076: protected boolean _isAlwaysSave;
077:
078: protected HashMap<String, Store> _storeMap;
079: protected LruCache<String, ClusterObject> _clusterObjects;
080:
081: private final Lifecycle _lifecycle = new Lifecycle(log, toString());
082:
083: //
084: // statistics
085: //
086:
087: protected volatile long _loadCount;
088: protected volatile long _loadFailCount;
089:
090: protected volatile long _saveCount;
091: protected volatile long _saveFailCount;
092:
093: protected StoreManager() {
094: _clusterObjects = new LruCache<String, ClusterObject>(4096);
095: _clusterObjects.setEnableListeners(false);
096:
097: _storeMap = new HashMap<String, Store>();
098:
099: _alarm = new Alarm(this );
100:
101: Environment.addClassLoaderListener(this );
102: }
103:
104: /**
105: * Sets the cluster.
106: */
107: public void setCluster(Cluster cluster) {
108: _cluster = cluster;
109: }
110:
111: /**
112: * Gets the cluster.
113: */
114: public Cluster getCluster() {
115: return _cluster;
116: }
117:
118: /**
119: * Returns the admin.
120: */
121: public PersistentStoreMXBean getAdmin() {
122: return null;
123: }
124:
125: /**
126: * Set true if the store should always try to load the object.
127: */
128: public void setAlwaysLoad(boolean alwaysLoad) {
129: _isAlwaysLoad = alwaysLoad;
130: }
131:
132: /**
133: * Set true if the store should always try to load the object.
134: */
135: public boolean isAlwaysLoad() {
136: return _isAlwaysLoad;
137: }
138:
139: /**
140: * Set true if the store should always try to store the object.
141: */
142: public void setAlwaysSave(boolean alwaysSave) {
143: _isAlwaysSave = alwaysSave;
144: }
145:
146: /**
147: * Set true if the store should always try to store the object.
148: */
149: public boolean isAlwaysSave() {
150: return _isAlwaysSave;
151: }
152:
153: /**
154: * Returns the length of time an idle object can remain in the store before
155: * being cleaned.
156: */
157: public long getMaxIdleTime() {
158: return _maxIdleTime;
159: }
160:
161: /**
162: * Sets the length of time an idle object can remain in the store before
163: * being cleaned.
164: */
165: public void setMaxIdleTime(Period maxIdleTime) {
166: _maxIdleTime = maxIdleTime.getPeriod();
167: }
168:
169: /**
170: * Sets the idle check interval for the alarm.
171: */
172: public void updateIdleCheckInterval(long idleCheckInterval) {
173: if (_idleCheckInterval > 0 && idleCheckInterval > 0
174: && idleCheckInterval < _idleCheckInterval) {
175: _idleCheckInterval = idleCheckInterval;
176: _alarm.queue(idleCheckInterval);
177: }
178:
179: if (_idleCheckInterval >= 0 && _idleCheckInterval < 1000)
180: _idleCheckInterval = 1000;
181: }
182:
183: /**
184: * Sets the idle check interval for the alarm.
185: */
186: public long getIdleCheckTime() {
187: if (_idleCheckInterval > 0)
188: return _idleCheckInterval;
189: else
190: return _maxIdleTime;
191: }
192:
193: /**
194: * Returns the length of time an idle object can remain in the store before
195: * being cleaned.
196: */
197: public long getAccessWindowTime() {
198: long window = _maxIdleTime / 4;
199:
200: if (window < 60000L)
201: return 60000L;
202: else
203: return window;
204: }
205:
206: //
207: // statistics
208: //
209:
210: /**
211: * Returns the objects in the store
212: */
213: public long getObjectCount() {
214: return -1;
215: }
216:
217: /**
218: * Returns the total objects loaded.
219: */
220: public long getLoadCount() {
221: return _loadCount;
222: }
223:
224: /**
225: * Returns the objects which failed to load.
226: */
227: public long getLoadFailCount() {
228: return _loadFailCount;
229: }
230:
231: /**
232: * Returns the total objects saved.
233: */
234: public long getSaveCount() {
235: return _saveCount;
236: }
237:
238: /**
239: * Returns the objects which failed to save.
240: */
241: public long getSaveFailCount() {
242: return _saveFailCount;
243: }
244:
245: /**
246: * Creates a Store. The Store manages
247: * persistent objects for a particular domain, like the sesions
248: * for the /foo URL.
249: *
250: * @param storeId the persistent domain.
251: */
252: public Store createStore(String storeId, ObjectManager objectManager) {
253: Store store = getStore(storeId);
254:
255: store.setObjectManager(objectManager);
256:
257: return store;
258: }
259:
260: /**
261: * Removes a Store. The Store manages
262: * persistent objects for a particular domain, like the sesions
263: * for the /foo URL.
264: *
265: * @param storeId the persistent domain.
266: */
267: public Store removeStore(String storeId) {
268: Store store = getStore(storeId);
269:
270: store.setObjectManager(null);
271:
272: return store;
273: }
274:
275: /**
276: * Creates a ClusterObjectManager. The ClusterObjectManager manages
277: * persistent objects for a particular domain, like the sesions
278: * for the /foo URL.
279: *
280: * @param storeId the persistent domain.
281: */
282: public Store getStore(String storeId) {
283: synchronized (_storeMap) {
284: Store store = _storeMap.get(storeId);
285: if (store == null) {
286: store = new Store(storeId, this );
287: _storeMap.put(storeId, store);
288: }
289:
290: return store;
291: }
292: }
293:
294: /**
295: * Called after any factory settings.
296: */
297: @PostConstruct
298: public boolean init() throws Exception {
299: if (!_lifecycle.toInit())
300: return false;
301:
302: _lifecycle.setName(toString());
303:
304: if (_cluster == null)
305: _cluster = Cluster.getLocal();
306:
307: if (_cluster != null) {
308: _serverId = Cluster.getServerId();
309: ClusterServer selfServer = _cluster.getSelfServer();
310:
311: if (selfServer != null)
312: _selfIndex = selfServer.getIndex();
313: else if (_cluster.getServerList().length > 1) {
314: // XXX: error?
315: log
316: .warning(L
317: .l(
318: "cluster-store for '{0}' needs an <srun> configuration for it.",
319: _serverId));
320: }
321:
322: ClusterServer[] serverList = _cluster.getServerList();
323:
324: _serverList = new ServerConnector[serverList.length];
325:
326: for (int i = 0; i < serverList.length; i++) {
327: _serverList[i] = serverList[i].getServerConnector();
328: }
329: }
330:
331: Environment.addEnvironmentListener(this );
332:
333: return true;
334: }
335:
336: /**
337: * Called to start the store.
338: */
339: public boolean start() throws Exception {
340: if (!_lifecycle.toActive())
341: return false;
342:
343: // notify the siblings that we're awake
344: if (_serverList != null) {
345: ServerConnector[] serverList = _serverList;
346:
347: for (int i = 0; i < serverList.length; i++) {
348: ServerConnector server = serverList[i];
349:
350: if (server == null)
351: continue;
352:
353: try {
354: ClusterStream s = server.open();
355: s.close();
356: } catch (Throwable e) {
357: }
358: }
359:
360: }
361:
362: handleAlarm(_alarm);
363:
364: return true;
365: }
366:
367: /**
368: * Cleans old objects. Living objects corresponding to the old
369: * objects are not cleared, since their timeout should be less than
370: * the store timeout.
371: */
372: public void clearOldObjects() throws Exception {
373: }
374:
375: /**
376: * Returns true if this server is a primary for the given object id.
377: */
378: protected boolean isPrimary(String id) {
379: return getPrimaryIndex(id, 0) == getSelfIndex();
380: }
381:
382: /**
383: * Returns the owning index.
384: */
385: public int getPrimaryIndex(String id, int offset) {
386: return 0;
387: }
388:
389: /**
390: * Returns the backup index.
391: */
392: public int getSecondaryIndex(String id, int offset) {
393: return 0;
394: }
395:
396: /**
397: * Returns the backup index.
398: */
399: public int getTertiaryIndex(String id, int offset) {
400: return 0;
401: }
402:
403: /**
404: * Loads object access time.
405: *
406: * @param obj the object to update.
407: */
408: abstract protected boolean load(ClusterObject clusterObject,
409: Object obj) throws Exception;
410:
411: /**
412: * Updates the object's access time.
413: *
414: * @param storeId the identifier of the storage group
415: * @param obj the object to update.
416: */
417: public void access(String uniqueId) throws Exception {
418: ClusterObject obj = getClusterObject(uniqueId);
419:
420: if (obj != null)
421: obj.access();
422: else
423: accessImpl(uniqueId);
424: }
425:
426: /**
427: * Updates the object's access time.
428: *
429: * @param storeId the identifier of the storage group
430: * @param obj the object to update.
431: */
432: public void access(Store store, String id) throws Exception {
433: getClusterObject(store, id).access();
434: }
435:
436: /**
437: * Updates the object's access time in the persistent store.
438: *
439: * @param uniqueId the identifier of the object.
440: */
441: abstract public void accessImpl(String uniqueId) throws Exception;
442:
443: /**
444: * Sets the timef for the expires interval.
445: *
446: * @param uniqueId the identifier of the object.
447: * @param long the time in ms for the expire
448: */
449: public void setExpireInterval(String uniqueId, long expires)
450: throws Exception {
451: }
452:
453: /**
454: * When the object is no longer valid, remove it from the backing store.
455: *
456: * @param storeId the identifier of the storeage group
457: * @param objectId the identifier of the object to remove
458: */
459: public void update(String storeId, String objectId)
460: throws Exception {
461: ClusterObject obj = getClusterObject(storeId, objectId);
462:
463: if (obj != null)
464: obj.update();
465: }
466:
467: /**
468: * Updates the owner object.
469: *
470: * @param uniqueId the identifier of the storage group
471: */
472: public void updateOwner(String objectId, String uniqueId)
473: throws Exception {
474:
475: }
476:
477: /**
478: * Saves the object to the cluster.
479: *
480: * @param storeId the identifier of the storage group
481: * @param obj the object to store.
482: */
483: public void store(Store store, String id, Object obj)
484: throws IOException {
485: ClusterObject clusterObj = getClusterObject(store, id);
486:
487: if (clusterObj != null) {
488: } else if (store.getObjectManager().isEmpty(obj))
489: return;
490: else
491: clusterObj = createClusterObject(store, id);
492:
493: clusterObj.store(obj);
494: }
495:
496: /**
497: * Returns the cluster object.
498: *
499: * @param storeId the identifier of the storage group
500: * @param obj the object to store.
501: */
502: ClusterObject createClusterObject(Store store, String id) {
503: try {
504: String uniqueId = store.getId() + ';' + id;
505:
506: synchronized (this ) {
507: ClusterObject clusterObj = _clusterObjects
508: .get(uniqueId);
509: if (clusterObj == null) {
510: clusterObj = create(store, id);
511: _clusterObjects.put(clusterObj.getUniqueId(),
512: clusterObj);
513: }
514:
515: return clusterObj;
516: }
517: } catch (Exception e) {
518: log.log(Level.WARNING, e.toString(), e);
519:
520: return null;
521: }
522: }
523:
524: /**
525: * Returns the cluster object.
526: *
527: * @param storeId the identifier of the storage group
528: * @param obj the object to store.
529: */
530: ClusterObject getClusterObject(Store store, String id) {
531: return getClusterObject(makeUniqueId(store, id));
532: }
533:
534: /**
535: * Returns the cluster object.
536: *
537: * @param storeId the identifier of the storage group
538: * @param obj the object to store.
539: */
540: ClusterObject getClusterObject(String storeId, String id) {
541: return getClusterObject(makeUniqueId(storeId, id));
542: }
543:
544: /**
545: * Returns the cluster object.
546: *
547: * @param storeId the identifier of the storage group
548: * @param obj the object to store.
549: */
550: ClusterObject getClusterObject(String uniqueId) {
551: return _clusterObjects.get(uniqueId);
552: }
553:
554: /**
555: * Returns the cluster object.
556: *
557: * @param storeId the identifier of the storage group
558: * @param obj the object to store.
559: */
560: ClusterObject removeClusterObject(String storeId, String id) {
561: synchronized (this ) {
562: return _clusterObjects.remove(makeUniqueId(storeId, id));
563: }
564: }
565:
566: /**
567: * Creates the cluster object.
568: */
569: ClusterObject create(Store store, String id) {
570: return new ClusterObject(this , store, id);
571: }
572:
573: /**
574: * Save the object to the store.
575: *
576: * @param storeId the identifier of the storage group
577: * @param obj the object to store.
578: */
579: abstract protected void store(ClusterObject clusterObject,
580: TempStream tempStream, long crc) throws Exception;
581:
582: /**
583: * Handles a callback from an alarm, scheduling the timeout.
584: */
585: public void handleAlarm(Alarm alarm) {
586: if (!_lifecycle.isActive())
587: return;
588:
589: try {
590: clearOldObjects();
591: } catch (Throwable e) {
592: log.log(Level.WARNING, e.toString(), e);
593: } finally {
594: _alarm.queue(getIdleCheckTime());
595: }
596: }
597:
598: /**
599: * When the object is no longer valid, remove it from the backing store.
600: *
601: * @param obj the object to remove
602: */
603: public void remove(ClusterObject obj) throws Exception {
604:
605: }
606:
607: /**
608: * When the object is no longer valid, remove it from the backing store.
609: *
610: * @param store the identifier of the storeage group
611: * @param objectId the identifier of the object to remove
612: */
613: public void remove(Store store, String objectId) throws Exception {
614: }
615:
616: /**
617: * Returns the unique id.
618: */
619: private String makeUniqueId(Store store, String objectId) {
620: return store.getId() + ';' + objectId;
621: }
622:
623: /**
624: * Returns the unique id.
625: */
626: private String makeUniqueId(String storeId, String objectId) {
627: return storeId + ';' + objectId;
628: }
629:
630: /**
631: * Returns the self servers.
632: */
633: protected int getSelfIndex() {
634: return _selfIndex;
635: }
636:
637: /**
638: * Returns the list of cluster servers.
639: */
640: protected ServerConnector[] getServerList() {
641: return _serverList;
642: }
643:
644: /**
645: * Returns the cluster server which owns the object
646: */
647: protected ServerConnector getOwningServer(String objectId) {
648: if (_cluster == null)
649: return null;
650:
651: char ch = objectId.charAt(0);
652:
653: ServerConnector[] serverList = _serverList;
654:
655: if (serverList.length > 0) {
656: int srunIndex = decode(ch) % serverList.length;
657:
658: return serverList[srunIndex];
659: } else
660: return null;
661: }
662:
663: /**
664: * Handles the case where the environment is activated.
665: */
666: public void environmentConfig(EnvironmentClassLoader loader) {
667: }
668:
669: /**
670: * Handles the case where the environment is activated.
671: */
672: public void environmentStart(EnvironmentClassLoader loader) {
673: try {
674: start();
675: } catch (Exception e) {
676: log.log(Level.WARNING, e.toString(), e);
677: }
678: }
679:
680: /**
681: * Handles the case where the environment loader is stops
682: */
683: public void environmentStop(EnvironmentClassLoader loader) {
684: }
685:
686: /**
687: * Handles the case where the environment is activated.
688: */
689: public void classLoaderInit(DynamicClassLoader loader) {
690: }
691:
692: /**
693: * Handles the case where the environment loader is dropped.
694: */
695: public void classLoaderDestroy(DynamicClassLoader loader) {
696: destroy();
697: }
698:
699: /**
700: * Called at end of life.
701: */
702: public void destroy() {
703: if (!_lifecycle.toDestroy())
704: return;
705:
706: _alarm.dequeue();
707: }
708:
709: public String toString() {
710: return getClass().getSimpleName() + "[" + _serverId + "]";
711: }
712:
713: static class ObjectKey {
714: private String _storeId;
715: private String _objectId;
716:
717: ObjectKey() {
718: }
719:
720: ObjectKey(String storeId, String objectId) {
721: init(storeId, objectId);
722: }
723:
724: void init(String storeId, String objectId) {
725: _storeId = storeId;
726: _objectId = objectId;
727: }
728:
729: @Override
730: public int hashCode() {
731: return _storeId.hashCode() * 65521 + _objectId.hashCode();
732: }
733:
734: @Override
735: public boolean equals(Object o) {
736: if (this == o)
737: return true;
738: else if (!(o instanceof ObjectKey))
739: return false;
740:
741: ObjectKey key = (ObjectKey) o;
742:
743: return _objectId.equals(key._objectId)
744: && _storeId.equals(key._storeId);
745: }
746: }
747:
748: static int decode(int code) {
749: return DECODE[code & 0x7f];
750: }
751:
752: /**
753: * Converts an integer to a printable character
754: */
755: private static char convert(long code) {
756: code = code & 0x3f;
757:
758: if (code < 26)
759: return (char) ('a' + code);
760: else if (code < 52)
761: return (char) ('A' + code - 26);
762: else if (code < 62)
763: return (char) ('0' + code - 52);
764: else if (code == 62)
765: return '_';
766: else
767: return '-';
768: }
769:
770: static {
771: DECODE = new int[128];
772: for (int i = 0; i < 64; i++)
773: DECODE[(int) convert(i)] = i;
774: }
775: }
|