001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.ha.hasessionstate.server;
023:
024: import java.io.ByteArrayInputStream;
025: import java.io.ByteArrayOutputStream;
026: import java.io.IOException;
027: import java.io.ObjectInputStream;
028: import java.io.ObjectOutputStream;
029: import java.io.Serializable;
030: import java.util.ArrayList;
031: import java.util.Enumeration;
032: import java.util.HashMap;
033: import java.util.Hashtable;
034: import java.util.Iterator;
035: import java.util.Vector;
036: import java.util.zip.Deflater;
037: import java.util.zip.DeflaterOutputStream;
038: import java.util.zip.InflaterInputStream;
039:
040: import javax.naming.Context;
041: import javax.naming.InitialContext;
042: import javax.naming.Name;
043: import javax.naming.NameNotFoundException;
044: import javax.naming.Reference;
045: import javax.naming.StringRefAddr;
046:
047: import org.jboss.ha.framework.interfaces.HAPartition;
048: import org.jboss.ha.hasessionstate.interfaces.PackagedSession;
049: import org.jboss.logging.Logger;
050: import org.jboss.naming.NonSerializableFactory;
051: import org.jboss.system.server.ServerConfigUtil;
052:
053: import EDU.oswego.cs.dl.util.concurrent.Mutex;
054:
055: /**
056: * Default implementation of HASessionState
057: *
058: * @see org.jboss.ha.hasessionstate.interfaces.HASessionState
059: * @author sacha.labourey@cogito-info.ch
060: * @author <a href="bill@burkecentral.com">Bill Burke</a>
061: * @version $Revision: 57188 $
062: *
063: * <p><b>Revisions:</b><br>
064: * <p><b>2002/01/09: billb</b>
065: * <ol>
066: * <li>ripped out sub partitioning stuff. It really belongs as a subclass of HAPartition
067: * </ol>
068: *
069: */
070:
071: public class HASessionStateImpl implements
072: org.jboss.ha.hasessionstate.interfaces.HASessionState,
073: HAPartition.HAPartitionStateTransfer {
074:
/** Name of this session state; assigned by the three-argument constructor and used as the JNDI bind name in start(). */
075: protected String _sessionStateName;
/** Logger; created in init() with the session state name appended to the class name. */
076: protected Logger log;
/** Partition this service registers with; either handed to a constructor or looked up in JNDI by init(). */
077: protected HAPartition hapGeneral;
/** Identifier of the form SessionState-'name', used as the RPC handler name on the partition. */
078: protected String sessionStateIdentifier;
/** Name of the local node; read from the partition in start(). */
079: protected String myNodeName;
080:
/** Threshold that purgeState() compares against (a difference of System.currentTimeMillis() values); defaults to the 30 minute constant below when a non-positive value is supplied. */
081: protected long beanCleaningDelay;
/** Partition name; falls back to DEFAULT_PARTITION_JNDI_NAME when the constructor receives null. */
082: protected String haPartitionName;
/** JNDI name of the partition: JNDI_FOLDER_NAME_FOR_HAPARTITION followed by haPartitionName. */
083: protected String haPartitionJndiName;
084:
/** Default partition name as reported by ServerConfigUtil; evaluated once per instance. */
085: protected final String DEFAULT_PARTITION_JNDI_NAME = ServerConfigUtil
086: .getDefaultPartitionName();
/** JNDI prefix for session state objects, taken from ClusterConfigMetaData; not referenced elsewhere in this class. */
087: protected final String JNDI_FOLDER_NAME_FOR_HASESSIONSTATE = org.jboss.metadata.ClusterConfigMetaData.JNDI_PREFIX_FOR_SESSION_STATE;
/** JNDI folder under which partitions are looked up. */
088: protected final String JNDI_FOLDER_NAME_FOR_HAPARTITION = "/HAPartition/";
/** Fallback value for beanCleaningDelay. */
089: protected final long MAX_DELAY_BEFORE_CLEANING_UNRECLAIMED_STATE = 30L * 60L * 1000L; // 30 minutes... should be set externally or use cache settings
/** Key used to subscribe to / unsubscribe from state transfer events on the partition. */
090: protected static final String HA_SESSION_STATE_STATE_TRANSFER = "HASessionStateTransfer";
091:
/** Application name -> HashMap of (session key -> Mutex); every access in this class synchronizes on this map. */
092: protected HashMap locks = new HashMap();
093:
/**
 * Creates an instance without assigning any of the configuration fields.
 */
public HASessionStateImpl() {
    super();
}
096:
/**
 * Creates a session state bound to an already available partition.
 *
 * @param sessionStateName  name of the session state, or null for the default name
 * @param partition         partition to register with; its partition name is
 *                          forwarded to the name-based constructor
 * @param beanCleaningDelay purge threshold; non-positive selects the default
 */
public HASessionStateImpl(String sessionStateName,
        HAPartition partition, long beanCleaningDelay) {
    this(sessionStateName, partition.getPartitionName(), beanCleaningDelay);
    // Keep the partition so that init() does not need to look it up in JNDI.
    hapGeneral = partition;
}
103:
/**
 * Creates a session state that locates its partition by name.
 *
 * @param sessionStateName    name of the session state, or null for
 *                            ClusterConfigMetaData.DEFAULT_SESSION_STATE_NAME
 * @param mainHAPartitionName partition name, or null for the default partition
 * @param beanCleaningDelay   purge threshold; a value that is not strictly positive
 *                            selects MAX_DELAY_BEFORE_CLEANING_UNRECLAIMED_STATE
 */
public HASessionStateImpl(String sessionStateName,
        String mainHAPartitionName, long beanCleaningDelay) {
    this._sessionStateName = (sessionStateName != null)
            ? sessionStateName
            : org.jboss.metadata.ClusterConfigMetaData.DEFAULT_SESSION_STATE_NAME;
    this.sessionStateIdentifier = "SessionState-'" + this._sessionStateName + "'";

    this.haPartitionName = (mainHAPartitionName != null)
            ? mainHAPartitionName
            : DEFAULT_PARTITION_JNDI_NAME;
    this.haPartitionJndiName = JNDI_FOLDER_NAME_FOR_HAPARTITION + this.haPartitionName;

    this.beanCleaningDelay = (beanCleaningDelay > 0)
            ? beanCleaningDelay
            : MAX_DELAY_BEFORE_CLEANING_UNRECLAIMED_STATE;
}
128:
129: public void init() throws Exception {
130: this .log = Logger.getLogger(HASessionStateImpl.class.getName()
131: + "." + this ._sessionStateName);
132:
133: // BES 20060416 -- if people used an old config, we may not
134: // have been passed the partition, so have to find in JNDI
135: // JNDI work s/b done in start(), but we have no choice, as
136: // we must register for state transfer in init
137: if (this .hapGeneral == null) {
138: Context ctx = new InitialContext();
139: this .hapGeneral = (HAPartition) ctx
140: .lookup(haPartitionJndiName);
141: }
142:
143: if (hapGeneral == null)
144: log.error("Unable to get default HAPartition under name '"
145: + haPartitionJndiName + "'.");
146:
147: this .hapGeneral.registerRPCHandler(this .sessionStateIdentifier,
148: this );
149: this .hapGeneral.subscribeToStateTransferEvents(
150: HA_SESSION_STATE_STATE_TRANSFER, this );
151: }
152:
153: public void start() throws Exception {
154: this .myNodeName = this .hapGeneral.getNodeName();
155: log.debug("HASessionState node name : " + this .myNodeName);
156:
157: // BES 4/7/06 clean up lifecycle; move this to start, as it can't be
158: // called until startService due to JNDI dependency
159: Context ctx = new InitialContext();
160: this .bind(this ._sessionStateName, this ,
161: HASessionStateImpl.class, ctx);
162: }
163:
164: protected void bind(String jndiName, Object who, Class classType,
165: Context ctx) throws Exception {
166: // Ah ! This service isn't serializable, so we use a helper class
167: //
168: NonSerializableFactory.bind(jndiName, who);
169: Name n = ctx.getNameParser("").parse(jndiName);
170: while (n.size() > 1) {
171: String ctxName = n.get(0);
172: try {
173: ctx = (Context) ctx.lookup(ctxName);
174: } catch (NameNotFoundException e) {
175: log.debug("creating Subcontext" + ctxName);
176: ctx = ctx.createSubcontext(ctxName);
177: }
178: n = n.getSuffix(1);
179: }
180:
181: // The helper class NonSerializableFactory uses address type nns, we go on to
182: // use the helper class to bind the service object in JNDI
183: //
184: StringRefAddr addr = new StringRefAddr("nns", jndiName);
185: Reference ref = new Reference(classType.getName(), addr,
186: NonSerializableFactory.class.getName(), null);
187: ctx.bind(n.get(0), ref);
188: }
189:
190: public void stop() throws Exception {
191: purgeState();
192:
193: // Unbind so we can rebind if restarted
194: try {
195: Context ctx = new InitialContext();
196: ctx.unbind(this ._sessionStateName);
197: NonSerializableFactory.unbind(this ._sessionStateName);
198: } catch (Exception ignored) {
199: }
200: }
201:
202: public void destroy() throws Exception {
203: // Remove ref to ourself from HAPartition
204: this .hapGeneral.unregisterRPCHandler(
205: this .sessionStateIdentifier, this );
206: this .hapGeneral.unsubscribeFromStateTransferEvents(
207: HA_SESSION_STATE_STATE_TRANSFER, this );
208: }
209:
210: public String getNodeName() {
211: return this .myNodeName;
212: }
213:
214: // Used for Session state transfer
215: //
216: public Serializable getCurrentState() {
217: log.debug("Building and returning state of HASessionState");
218:
219: if (this .appSessions == null)
220: this .appSessions = new Hashtable();
221:
222: Serializable result = null;
223:
224: synchronized (this .lockAppSession) {
225: this .purgeState();
226:
227: try {
228: result = deflate(this .appSessions);
229: } catch (Exception e) {
230: log.error("operation failed", e);
231: }
232: }
233: return result;
234: }
235:
236: public void setCurrentState(Serializable newState) {
237: log.debug("Receiving state of HASessionState");
238:
239: if (this .appSessions == null)
240: this .appSessions = new Hashtable();
241:
242: synchronized (this .lockAppSession) {
243: try {
244: this .appSessions.clear(); // hope to facilitate the job of the GC
245: this .appSessions = (Hashtable) inflate((byte[]) newState);
246: } catch (Exception e) {
247: log.error("operation failed", e);
248: }
249: }
250: }
251:
252: public void purgeState() {
253: synchronized (this .lockAppSession) {
254: for (Enumeration keyEnum = this .appSessions.keys(); keyEnum
255: .hasMoreElements();) {
256: // trip in apps..
257: //
258: Object key = keyEnum.nextElement();
259: Hashtable value = (Hashtable) this .appSessions.get(key);
260: long currentTime = System.currentTimeMillis();
261:
262: for (Iterator iterSessions = value.values().iterator(); iterSessions
263: .hasNext();) {
264: PackagedSession ps = (PackagedSession) iterSessions
265: .next();
266: if ((currentTime - ps.unmodifiedExistenceInVM()) > beanCleaningDelay)
267: iterSessions.remove();
268: }
269: }
270: }
271:
272: }
273:
274: protected byte[] deflate(Object object) throws IOException {
275: ByteArrayOutputStream baos = new ByteArrayOutputStream();
276: Deflater def = new Deflater(
277: java.util.zip.Deflater.BEST_COMPRESSION);
278: DeflaterOutputStream dos = new DeflaterOutputStream(baos, def);
279:
280: ObjectOutputStream out = new ObjectOutputStream(dos);
281: out.writeObject(object);
282: out.close();
283: dos.finish();
284: dos.close();
285:
286: return baos.toByteArray();
287: }
288:
289: protected Object inflate(byte[] compressedContent)
290: throws IOException {
291: if (compressedContent == null)
292: return null;
293:
294: try {
295: ObjectInputStream in = new ObjectInputStream(
296: new InflaterInputStream(new ByteArrayInputStream(
297: compressedContent)));
298:
299: Object object = in.readObject();
300: in.close();
301: return object;
302: } catch (Exception e) {
303: throw new IOException(e.toString());
304: }
305: }
306:
/** Application name -> Hashtable of (session key -> PackagedSession). */
307: protected Hashtable appSessions = new Hashtable();
/** Monitor held while the session table is read, purged or replaced as a whole. */
308: protected Object lockAppSession = new Object();
309:
310: protected Hashtable getHashtableForApp(String appName) {
311: if (this .appSessions == null)
312: this .appSessions = new Hashtable(); // should never happen though...
313:
314: Hashtable result = null;
315:
316: synchronized (this .lockAppSession) {
317: result = (Hashtable) this .appSessions.get(appName);
318: if (result == null) {
319: result = new Hashtable();
320: this .appSessions.put(appName, result);
321: }
322: }
323: return result;
324: }
325:
326: public void createSession(String appName, Object keyId) {
327: this ._createSession(appName, keyId);
328: }
329:
330: public PackagedSessionImpl _createSession(String appName,
331: Object keyId) {
332: Hashtable app = this .getHashtableForApp(appName);
333: PackagedSessionImpl result = new PackagedSessionImpl(
334: (Serializable) keyId, null, this .myNodeName);
335: app.put(keyId, result);
336: return result;
337: }
338:
339: public void setState(String appName, Object keyId, byte[] state)
340: throws java.rmi.RemoteException {
341: Hashtable app = this .getHashtableForApp(appName);
342: PackagedSession ps = (PackagedSession) app.get(keyId);
343:
344: if (ps == null) {
345: ps = _createSession(appName, keyId);
346: }
347:
348: boolean isStateIdentical = false;
349:
350: Mutex mtx = getLock(appName, keyId);
351: try {
352: if (!mtx.attempt(0))
353: throw new java.rmi.RemoteException(
354: "Concurent calls on session object.");
355: } catch (InterruptedException ie) {
356: log.info(ie);
357: return;
358: }
359:
360: try {
361: isStateIdentical = ps.setState(state);
362: if (!isStateIdentical) {
363: Object[] args = { appName, ps };
364: try {
365: this .hapGeneral.callMethodOnCluster(
366: this .sessionStateIdentifier, "_setState",
367: args, new Class[] { String.class,
368: PackagedSession.class }, true);
369: } catch (Exception e) {
370: log.error("operation failed", e);
371: }
372: }
373: } finally {
374: mtx.release();
375: }
376: }
377:
378: /*
379: public void _setStates (String appName, Hashtable packagedSessions)
380: {
381: synchronized (this.lockAppSession)
382: {
383: Hashtable app = this.getHashtableForApp (appName);
384:
385: if (app == null)
386: {
387: app = new Hashtable (packagedSessions.size ());
388: this.appSessions.put (appName, app);
389: }
390: app.putAll (packagedSessions);
391: }
392: }*/
393:
394: public void _setState(String appName, PackagedSession session) {
395: Hashtable app = this .getHashtableForApp(appName);
396: PackagedSession ps = (PackagedSession) app
397: .get(session.getKey());
398:
399: if (ps == null) {
400: ps = session;
401: synchronized (app) {
402: app.put(ps.getKey(), ps);
403: }
404: } else {
405: Mutex mtx = getLock(appName, session.getKey());
406: try {
407: mtx.acquire();
408: } catch (InterruptedException ie) {
409: log.info(ie);
410: return;
411: }
412:
413: try {
414: if (ps.getOwner().equals(this .myNodeName)) {
415: // a modification has occured externally while we were the owner
416: //
417: ownedObjectExternallyModified(appName, session
418: .getKey(), ps, session);
419: }
420: ps.update(session);
421: } finally {
422: mtx.release();
423: }
424: }
425:
426: }
427:
428: public PackagedSession getState(String appName, Object keyId) {
429: Hashtable app = this .getHashtableForApp(appName);
430: return (PackagedSession) app.get(keyId);
431: }
432:
433: public PackagedSession getStateWithOwnership(String appName,
434: Object keyId) throws java.rmi.RemoteException {
435: return this .localTakeOwnership(appName, keyId);
436: }
437:
438: public PackagedSession localTakeOwnership(String appName,
439: Object keyId) throws java.rmi.RemoteException {
440: Hashtable app = this .getHashtableForApp(appName);
441: PackagedSession ps = (PackagedSession) app.get(keyId);
442:
443: // if the session is not yet available, we simply return null. The persistence manager
444: // will have to take an action accordingly
445: //
446: if (ps == null)
447: return null;
448:
449: Mutex mtx = getLock(appName, keyId);
450:
451: try {
452: if (!mtx.attempt(0))
453: throw new java.rmi.RemoteException(
454: "Concurent calls on session object.");
455: } catch (InterruptedException ie) {
456: log.info(ie);
457: return null;
458: }
459:
460: try {
461: if (!ps.getOwner().equals(this .myNodeName)) {
462: Object[] args = { appName, keyId, this .myNodeName,
463: new Long(ps.getVersion()) };
464: ArrayList answers = null;
465: try {
466: answers = this .hapGeneral.callMethodOnCluster(
467: this .sessionStateIdentifier,
468: "_setOwnership", args, new Class[] {
469: String.class, Object.class,
470: String.class, Long.class }, true);
471: } catch (Exception e) {
472: log.error("operation failed", e);
473: }
474:
475: if (answers != null && answers.contains(Boolean.FALSE))
476: throw new java.rmi.RemoteException(
477: "Concurent calls on session object.");
478: else {
479: ps.setOwner(this .myNodeName);
480: return ps;
481: }
482: } else
483: return ps;
484: } finally {
485: mtx.release();
486: }
487: }
488:
489: public Boolean _setOwnership(String appName, Object keyId,
490: String newOwner, Long remoteVersion) {
491: Hashtable app = this .getHashtableForApp(appName);
492: PackagedSession ps = (PackagedSession) app.get(keyId);
493: Boolean answer = Boolean.TRUE;
494: Mutex mtx = getLock(appName, keyId);
495:
496: try {
497: if (!mtx.attempt(0))
498: return Boolean.FALSE;
499: } catch (InterruptedException ie) {
500: log.info(ie);
501: return Boolean.FALSE;
502: }
503:
504: try {
505: if (!ps.getOwner().equals(this .myNodeName)) {
506: // this is not our business... we don't care
507: // we do not update the owner of ps as another host may refuse the _setOwnership call
508: // anyway, the update will be sent to us later if state is modified
509: //
510: //ps.setOwner (newOwner);
511: answer = Boolean.TRUE;
512: } else if (ps.getVersion() > remoteVersion.longValue()) {
513: // we are concerned and our version is more recent than the one of the remote host!
514: // it means that we have concurrent calls on the same state that has not yet been updated
515: // this means we will need to raise a java.rmi.RemoteException
516: //
517: answer = Boolean.FALSE;
518: } else {
519: // the remote host has the same version as us (or more recent? possible?)
520: // we need to update the ownership. We can do this because we know that no other
521: // node can refuse the _setOwnership call
522: ps.setOwner(newOwner);
523: ownedObjectExternallyModified(appName, keyId, ps, ps);
524: answer = Boolean.TRUE;
525: }
526: } finally {
527: mtx.release();
528: }
529: return answer;
530: }
531:
532: public void takeOwnership(String appName, Object keyId)
533: throws java.rmi.RemoteException {
534: this .localTakeOwnership(appName, keyId);
535: }
536:
537: public void removeSession(String appName, Object keyId) {
538: Hashtable app = this .getHashtableForApp(appName);
539: if (app != null) {
540: PackagedSession ps = (PackagedSession) app.remove(keyId);
541: if (ps != null) {
542: removeLock(appName, keyId);
543: Object[] args = { appName, keyId };
544: try {
545: this .hapGeneral.callMethodOnCluster(
546: this .sessionStateIdentifier,
547: "_removeSession", args, new Class[] {
548: String.class, Object.class }, true);
549: } catch (Exception e) {
550: log.error("operation failed", e);
551: }
552: }
553: }
554: }
555:
556: public void _removeSession(String appName, Object keyId) {
557: Hashtable app = this .getHashtableForApp(appName);
558: PackagedSession ps = null;
559: ps = (PackagedSession) app.remove(keyId);
560: if (ps != null && ps.getOwner().equals(this .myNodeName))
561: ownedObjectExternallyModified(appName, keyId, ps, ps);
562:
563: removeLock(appName, keyId);
564: }
565:
/** Application name -> Vector of HASessionStateListener registered through subscribe(). */
566: protected Hashtable listeners = new Hashtable();
567:
568: public synchronized void subscribe(String appName,
569: HASessionStateListener listener) {
570: Vector members = (Vector) listeners.get(appName);
571: if (members == null) {
572: members = new Vector();
573: listeners.put(appName, members);
574: }
575: if (!members.contains(listener)) {
576: members.add(listener);
577: }
578:
579: }
580:
581: public synchronized void unsubscribe(String appName,
582: HASessionStateListener listener) {
583: Vector members = (Vector) listeners.get(appName);
584: if ((members != null) && members.contains(listener))
585: members.remove(listener);
586: }
587:
588: public void ownedObjectExternallyModified(String appName,
589: Object key, PackagedSession oldSession,
590: PackagedSession newSession) {
591: Vector members = (Vector) listeners.get(appName);
592: if (members != null)
593: for (int i = 0; i < members.size(); i++)
594: try {
595: ((HASessionStateListener) members.elementAt(i))
596: .sessionExternallyModified(newSession);
597: } catch (Throwable t) {
598: log.debug(t);
599: }
600: }
601:
602: public HAPartition getCurrentHAPartition() {
603: return this .hapGeneral;
604: }
605:
606: protected boolean lockExists(String appName, Object key) {
607: synchronized (this .locks) {
608: HashMap ls = (HashMap) this .locks.get(appName);
609: if (ls == null)
610: return false;
611:
612: return (ls.get(key) != null);
613: }
614: }
615:
616: protected Mutex getLock(String appName, Object key) {
617: synchronized (this .locks) {
618: HashMap ls = (HashMap) this .locks.get(appName);
619: if (ls == null) {
620: ls = new HashMap();
621: this .locks.put(appName, ls);
622: }
623:
624: Mutex mutex = (Mutex) ls.get(key);
625: if (mutex == null) {
626: mutex = new Mutex();
627: ls.put(key, mutex);
628: }
629:
630: return mutex;
631: }
632: }
633:
634: protected void removeLock(String appName, Object key) {
635: synchronized (this .locks) {
636: HashMap ls = (HashMap) this.locks.get(appName);
637: if (ls == null)
638: return;
639: ls.remove(key);
640: }
641: }
642:
643: }
|