0001: /*
0002: * Copyright 1999,2004 The Apache Software Foundation.
0003: *
0004: * Licensed under the Apache License, Version 2.0 (the "License");
0005: * you may not use this file except in compliance with the License.
0006: * You may obtain a copy of the License at
0007: *
0008: * http://www.apache.org/licenses/LICENSE-2.0
0009: *
0010: * Unless required by applicable law or agreed to in writing, software
0011: * distributed under the License is distributed on an "AS IS" BASIS,
0012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0013: * See the License for the specific language governing permissions and
0014: * limitations under the License.
0015: */
0016:
0017: package org.apache.catalina.cluster.session;
0018:
0019: import java.beans.PropertyChangeEvent;
0020: import java.beans.PropertyChangeListener;
0021: import java.io.BufferedInputStream;
0022: import java.io.BufferedOutputStream;
0023: import java.io.ByteArrayInputStream;
0024: import java.io.ByteArrayOutputStream;
0025: import java.io.IOException;
0026: import java.io.ObjectInputStream;
0027: import java.io.ObjectOutputStream;
0028: import java.security.AccessController;
0029: import java.security.PrivilegedActionException;
0030: import java.security.PrivilegedExceptionAction;
0031: import java.util.ArrayList;
0032: import java.util.Iterator;
0033: import javax.servlet.ServletContext;
0034: import org.apache.catalina.Container;
0035: import org.apache.catalina.Context;
0036: import org.apache.catalina.Globals;
0037: import org.apache.catalina.Lifecycle;
0038: import org.apache.catalina.LifecycleException;
0039: import org.apache.catalina.LifecycleListener;
0040: import org.apache.catalina.Loader;
0041: import org.apache.catalina.Session;
0042: import org.apache.catalina.util.CustomObjectInputStream;
0043: import org.apache.catalina.util.LifecycleSupport;
0044:
0045: import org.apache.catalina.session.ManagerBase;
0046: import org.apache.catalina.cluster.ClusterManager;
0047: import org.apache.catalina.cluster.SessionMessage;
0048: import org.apache.catalina.cluster.Member;
0049: import org.apache.catalina.cluster.CatalinaCluster;
0050:
0051: /**
0052: * The DeltaManager manages replicated sessions by only
0053: * replicating the deltas in data. For applications written
0054: * to handle this, the DeltaManager is the optimal way of replicating data.
0055: *
0056: * This code is almost identical to StandardManager with a difference in
0057: * how it persists sessions and some modifications to it.
0058: *
0059: * <b>IMPLEMENTATION NOTE</b>: Correct behavior of session storing and
0060: * reloading depends upon external calls to the <code>start()</code> and
0061: * <code>stop()</code> methods of this class at the correct times.
0062: *
0063: * @author Filip Hanik
0064: * @author Craig R. McClanahan
0065: * @author Jean-Francois Arcand
0066: * @version $Revision: 1.27 $ $Date: 2004/06/04 20:22:27 $
0067: */
0068:
0069: public class DeltaManager extends ManagerBase implements Lifecycle,
0070: PropertyChangeListener, ClusterManager {
0071:
0072: // ---------------------------------------------------- Security Classes
0073:
0074: public static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
0075: .getLog(DeltaManager.class);
0076:
0077: // ----------------------------------------------------- Instance Variables
0078:
0079: /**
0080: * The descriptive information about this implementation.
0081: */
0082: private static final String info = "DeltaManager/1.0";
0083:
0084: /**
0085: * The lifecycle event support for this component.
0086: */
0087: protected LifecycleSupport lifecycle = new LifecycleSupport(this );
0088:
0089: /**
0090: * The maximum number of active Sessions allowed, or -1 for no limit.
0091: */
0092: private int maxActiveSessions = -1;
0093:
0094: /**
0095: * The descriptive name of this Manager implementation (for logging).
0096: */
0097: protected static String managerName = "DeltaManager";
0098:
0099: protected String name = null;
0100:
0101: /**
0102: * Path name of the disk file in which active sessions are saved
0103: * when we stop, and from which these sessions are loaded when we start.
0104: * A <code>null</code> value indicates that no persistence is desired.
0105: * If this pathname is relative, it will be resolved against the
0106: * temporary working directory provided by our context, available via
0107: * the <code>javax.servlet.context.tempdir</code> context attribute.
0108: */
0109: private String pathname = "SESSIONS.ser";
0110:
0111: /**
0112: * Has this component been started yet?
0113: */
0114: private boolean started = false;
0115:
0116: int rejectedSessions = 0;
0117: int expiredSessions = 0;
0118: long processingTime = 0;
0119:
0120: private CatalinaCluster cluster = null;
0121: private boolean stateTransferred;
0122: private boolean useDirtyFlag;
0123: private boolean expireSessionsOnShutdown;
0124: private boolean printToScreen;
0125:
0126: // ------------------------------------------------------------- Constructor
0127: public DeltaManager() {
0128: super ();
0129: }
0130:
0131: // ------------------------------------------------------------- Properties
0132:
0133: /**
0134: * Set the Container with which this Manager has been associated. If
0135: * it is a Context (the usual case), listen for changes to the session
0136: * timeout property.
0137: *
0138: * @param container The associated Container
0139: */
0140: public void setContainer(Container container) {
0141:
0142: // De-register from the old Container (if any)
0143: if ((this .container != null)
0144: && (this .container instanceof Context))
0145: ((Context) this .container)
0146: .removePropertyChangeListener(this );
0147:
0148: // Default processing provided by our superclass
0149: super .setContainer(container);
0150:
0151: // Register with the new Container (if any)
0152: if ((this .container != null)
0153: && (this .container instanceof Context)) {
0154: setMaxInactiveInterval(((Context) this .container)
0155: .getSessionTimeout() * 60);
0156: ((Context) this .container).addPropertyChangeListener(this );
0157: }
0158:
0159: }
0160:
0161: /**
0162: * Return descriptive information about this Manager implementation and
0163: * the corresponding version number, in the format
0164: * <code><description>/<version></code>.
0165: */
0166: public String getInfo() {
0167:
0168: return (this .info);
0169:
0170: }
0171:
0172: /**
0173: * Return the maximum number of active Sessions allowed, or -1 for
0174: * no limit.
0175: */
0176: public int getMaxActiveSessions() {
0177:
0178: return (this .maxActiveSessions);
0179:
0180: }
0181:
0182: /** Number of session creations that failed due to maxActiveSessions
0183: *
0184: * @return The count
0185: */
0186: public int getRejectedSessions() {
0187: return rejectedSessions;
0188: }
0189:
0190: public void setRejectedSessions(int rejectedSessions) {
0191: this .rejectedSessions = rejectedSessions;
0192: }
0193:
0194: /** Number of sessions that expired.
0195: *
0196: * @return The count
0197: */
0198: public int getExpiredSessions() {
0199: return expiredSessions;
0200: }
0201:
0202: public void setExpiredSessions(int expiredSessions) {
0203: this .expiredSessions = expiredSessions;
0204: }
0205:
0206: public long getProcessingTime() {
0207: return processingTime;
0208: }
0209:
0210: public void setProcessingTime(long processingTime) {
0211: this .processingTime = processingTime;
0212: }
0213:
0214: /**
0215: * Set the maximum number of actives Sessions allowed, or -1 for
0216: * no limit.
0217: *
0218: * @param max The new maximum number of sessions
0219: */
0220: public void setMaxActiveSessions(int max) {
0221:
0222: int oldMaxActiveSessions = this .maxActiveSessions;
0223: this .maxActiveSessions = max;
0224: support.firePropertyChange("maxActiveSessions", new Integer(
0225: oldMaxActiveSessions), new Integer(
0226: this .maxActiveSessions));
0227:
0228: }
0229:
0230: /**
0231: * Return the descriptive short name of this Manager implementation.
0232: */
0233: public String getName() {
0234:
0235: return (name);
0236:
0237: }
0238:
0239: /**
0240: * Return the session persistence pathname, if any.
0241: */
0242: public String getPathname() {
0243:
0244: return (this .pathname);
0245:
0246: }
0247:
0248: /**
0249: * Set the session persistence pathname to the specified value. If no
0250: * persistence support is desired, set the pathname to <code>null</code>.
0251: *
0252: * @param pathname New session persistence pathname
0253: */
0254: public void setPathname(String pathname) {
0255:
0256: String oldPathname = this .pathname;
0257: this .pathname = pathname;
0258: support.firePropertyChange("pathname", oldPathname,
0259: this .pathname);
0260:
0261: }
0262:
0263: // --------------------------------------------------------- Public Methods
0264:
0265: /**
0266: * Construct and return a new session object, based on the default
0267: * settings specified by this Manager's properties. The session
0268: * id will be assigned by this method, and available via the getId()
0269: * method of the returned session. If a new session cannot be created
0270: * for any reason, return <code>null</code>.
0271: *
0272: * @exception IllegalStateException if a new session cannot be
0273: * instantiated for any reason
0274: *
0275: * Construct and return a new session object, based on the default
0276: * settings specified by this Manager's properties. The session
0277: * id will be assigned by this method, and available via the getId()
0278: * method of the returned session. If a new session cannot be created
0279: * for any reason, return <code>null</code>.
0280: *
0281: * @exception IllegalStateException if a new session cannot be
0282: * instantiated for any reason
0283: */
0284: public Session createSession() {
0285: return createSession(true);
0286: }
0287:
0288: public Session createSession(boolean distribute) {
0289:
0290: if ((maxActiveSessions >= 0)
0291: && (sessions.size() >= maxActiveSessions)) {
0292: rejectedSessions++;
0293: throw new IllegalStateException(sm
0294: .getString("standardManager.createSession.ise"));
0295: }
0296:
0297: // Recycle or create a Session instance
0298: DeltaSession session = getNewDeltaSession();
0299: String sessionId = generateSessionId();
0300:
0301: String jvmRoute = getJvmRoute();
0302: // @todo Move appending of jvmRoute generateSessionId()???
0303: if (jvmRoute != null) {
0304: sessionId += '.' + jvmRoute;
0305: }
0306: synchronized (sessions) {
0307: while (sessions.get(sessionId) != null) { // Guarantee uniqueness
0308: duplicates++;
0309: sessionId = generateSessionId();
0310: // @todo Move appending of jvmRoute generateSessionId()???
0311: if (jvmRoute != null) {
0312: sessionId += '.' + jvmRoute;
0313: }
0314: }
0315: }
0316:
0317: session.setNew(true);
0318: session.setValid(true);
0319: session.setCreationTime(System.currentTimeMillis());
0320: session.setMaxInactiveInterval(this .maxInactiveInterval);
0321: session.setId(sessionId);
0322: session.resetDeltaRequest();
0323: // Initialize the properties of the new session and return it
0324:
0325: sessionCounter++;
0326:
0327: if (distribute) {
0328: SessionMessage msg = new SessionMessageImpl(getName(),
0329: SessionMessage.EVT_SESSION_CREATED, null,
0330: sessionId, sessionId + System.currentTimeMillis());
0331: cluster.send(msg);
0332: session.resetDeltaRequest();
0333: }
0334: log.debug("Created a DeltaSession with Id[" + session.getId()
0335: + "] Total count=" + sessions.size());
0336:
0337: return (session);
0338:
0339: }
0340:
0341: /**
0342: * Get new session class to be used in the doLoad() method.
0343: */
0344: protected DeltaSession getNewDeltaSession() {
0345: return new DeltaSession(this );
0346: }
0347:
0348: private DeltaRequest loadDeltaRequest(DeltaSession session,
0349: byte[] data) throws ClassNotFoundException, IOException {
0350: ByteArrayInputStream fis = null;
0351: ReplicationStream ois = null;
0352: Loader loader = null;
0353: ClassLoader classLoader = null;
0354: //fix to be able to run the DeltaManager
0355: //stand alone without a container.
0356: //use the Threads context class loader
0357: if (container != null)
0358: loader = container.getLoader();
0359: if (loader != null)
0360: classLoader = loader.getClassLoader();
0361: else
0362: classLoader = Thread.currentThread()
0363: .getContextClassLoader();
0364: //end fix
0365: fis = new ByteArrayInputStream(data);
0366: ois = new ReplicationStream(fis, classLoader);
0367: session.getDeltaRequest().readExternal(ois);
0368: ois.close();
0369: return session.getDeltaRequest();
0370: }
0371:
0372: private byte[] unloadDeltaRequest(DeltaRequest deltaRequest)
0373: throws IOException {
0374: ByteArrayOutputStream bos = new ByteArrayOutputStream();
0375: ObjectOutputStream oos = new ObjectOutputStream(bos);
0376: deltaRequest.writeExternal(oos);
0377: oos.flush();
0378: oos.close();
0379: return bos.toByteArray();
0380: }
0381:
0382: /**
0383: * Load any currently active sessions that were previously unloaded
0384: * to the appropriate persistence mechanism, if any. If persistence is not
0385: * supported, this method returns without doing anything.
0386: *
0387: * @exception ClassNotFoundException if a serialized class cannot be
0388: * found during the reload
0389: * @exception IOException if an input/output error occurs
0390: */
0391: private void doLoad(byte[] data) throws ClassNotFoundException,
0392: IOException {
0393:
0394: // Initialize our internal data structures
0395: //sessions.clear(); //should not do this
0396: // Open an input stream to the specified pathname, if any
0397: ByteArrayInputStream fis = null;
0398: ObjectInputStream ois = null;
0399: Loader loader = null;
0400: ClassLoader classLoader = null;
0401: try {
0402: fis = new ByteArrayInputStream(data);
0403: BufferedInputStream bis = new BufferedInputStream(fis);
0404: if (container != null)
0405: loader = container.getLoader();
0406: if (loader != null)
0407: classLoader = loader.getClassLoader();
0408: if (classLoader != null) {
0409: ois = new CustomObjectInputStream(bis, classLoader);
0410: } else {
0411: ois = new ObjectInputStream(bis);
0412: }
0413: } catch (IOException e) {
0414: log
0415: .error(sm.getString("standardManager.loading.ioe",
0416: e), e);
0417: if (ois != null) {
0418: try {
0419: ois.close();
0420: } catch (IOException f) {
0421: ;
0422: }
0423: ois = null;
0424: }
0425: throw e;
0426: }
0427:
0428: // Load the previously unloaded active sessions
0429: synchronized (sessions) {
0430: try {
0431: Integer count = (Integer) ois.readObject();
0432: int n = count.intValue();
0433: for (int i = 0; i < n; i++) {
0434: DeltaSession session = getNewDeltaSession();
0435: session.readObjectData(ois);
0436: session.setManager(this );
0437: session.setValid(true);
0438: session.setPrimarySession(false);
0439: //in case the nodes in the cluster are out of
0440: //time synch, this will make sure that we have the
0441: //correct timestamp, isValid returns true, cause accessCount=1
0442: session.access();
0443: //make sure that the session gets ready to expire if needed
0444: session.setAccessCount(0);
0445: sessions.put(session.getId(), session);
0446: }
0447: } catch (ClassNotFoundException e) {
0448: log.error(sm.getString("standardManager.loading.cnfe",
0449: e), e);
0450: if (ois != null) {
0451: try {
0452: ois.close();
0453: } catch (IOException f) {
0454: ;
0455: }
0456: ois = null;
0457: }
0458: throw e;
0459: } catch (IOException e) {
0460: log
0461: .error(sm.getString(
0462: "standardManager.loading.ioe", e), e);
0463: if (ois != null) {
0464: try {
0465: ois.close();
0466: } catch (IOException f) {
0467: ;
0468: }
0469: ois = null;
0470: }
0471: throw e;
0472: } finally {
0473: // Close the input stream
0474: try {
0475: if (ois != null)
0476: ois.close();
0477: } catch (IOException f) {
0478: // ignored
0479: }
0480:
0481: }
0482: }
0483:
0484: }
0485:
0486: /**
0487: * Save any currently active sessions in the appropriate persistence
0488: * mechanism, if any. If persistence is not supported, this method
0489: * returns without doing anything.
0490: *
0491: * @exception IOException if an input/output error occurs
0492: */
0493: private byte[] doUnload() throws IOException {
0494:
0495: // Open an output stream to the specified pathname, if any
0496: ByteArrayOutputStream fos = null;
0497: ObjectOutputStream oos = null;
0498: try {
0499: fos = new ByteArrayOutputStream();
0500: oos = new ObjectOutputStream(new BufferedOutputStream(fos));
0501: } catch (IOException e) {
0502: log.error(sm.getString("standardManager.unloading.ioe", e),
0503: e);
0504: if (oos != null) {
0505: try {
0506: oos.close();
0507: } catch (IOException f) {
0508: ;
0509: }
0510: oos = null;
0511: }
0512: throw e;
0513: }
0514:
0515: // Write the number of active sessions, followed by the details
0516: ArrayList list = new ArrayList();
0517: synchronized (sessions) {
0518: try {
0519: oos.writeObject(new Integer(sessions.size()));
0520: Iterator elements = sessions.values().iterator();
0521: while (elements.hasNext()) {
0522: DeltaSession session = (DeltaSession) elements
0523: .next();
0524: list.add(session);
0525: session.writeObjectData(oos);
0526: }
0527: oos.flush();
0528: oos.close();
0529: oos = null;
0530: } catch (IOException e) {
0531: log.error(sm.getString("standardManager.unloading.ioe",
0532: e), e);
0533: if (oos != null) {
0534: try {
0535: oos.close();
0536: } catch (IOException f) {
0537: ;
0538: }
0539: oos = null;
0540: }
0541: throw e;
0542: }
0543: }
0544:
0545: // Flush and close the output stream
0546: return fos.toByteArray();
0547: }
0548:
0549: // ------------------------------------------------------ Lifecycle Methods
0550:
0551: /**
0552: * Add a lifecycle event listener to this component.
0553: *
0554: * @param listener The listener to add
0555: */
0556: public void addLifecycleListener(LifecycleListener listener) {
0557:
0558: lifecycle.addLifecycleListener(listener);
0559:
0560: }
0561:
0562: /**
0563: * Get the lifecycle listeners associated with this lifecycle. If this
0564: * Lifecycle has no listeners registered, a zero-length array is returned.
0565: */
0566: public LifecycleListener[] findLifecycleListeners() {
0567:
0568: return lifecycle.findLifecycleListeners();
0569:
0570: }
0571:
0572: /**
0573: * Remove a lifecycle event listener from this component.
0574: *
0575: * @param listener The listener to remove
0576: */
0577: public void removeLifecycleListener(LifecycleListener listener) {
0578:
0579: lifecycle.removeLifecycleListener(listener);
0580:
0581: }
0582:
0583: /**
0584: * Prepare for the beginning of active use of the public methods of this
0585: * component. This method should be called after <code>configure()</code>,
0586: * and before any of the public methods of the component are utilized.
0587: *
0588: * @exception LifecycleException if this component detects a fatal error
0589: * that prevents this component from being used
0590: */
0591: public void start() throws LifecycleException {
0592: if (!initialized)
0593: init();
0594:
0595: // Validate and update our current component state
0596: if (started) {
0597: return;
0598: }
0599: started = true;
0600: lifecycle.fireLifecycleEvent(START_EVENT, null);
0601:
0602: // Force initialization of the random number generator
0603: String dummy = generateSessionId();
0604:
0605: // Load unloaded sessions, if any
0606: try {
0607: //the channel is already running
0608: log.info("Starting clustering manager...:" + getName());
0609: if (cluster == null) {
0610: log
0611: .error("Starting... no cluster associated with this context:"
0612: + getName());
0613: return;
0614: }
0615:
0616: if (cluster.getMembers().length > 0) {
0617: Member mbr = cluster.getMembers()[0];
0618: SessionMessage msg = new SessionMessageImpl(this
0619: .getName(),
0620: SessionMessage.EVT_GET_ALL_SESSIONS, null,
0621: "GET-ALL", "GET-ALL-" + getName());
0622: //just to make sure the other server has the context started
0623: // long timetowait = 20000-mbr.getMemberAliveTime();
0624: // if ( timetowait > 0 ) {
0625: // log.info("The other server has not been around more than 20 seconds, will sleep for "+timetowait+" ms. in order to let it startup");
0626: // try { Thread.currentThread().sleep(timetowait); } catch ( Exception x ) {}
0627: // }//end if
0628:
0629: //request session state
0630: cluster.send(msg, mbr);
0631: log
0632: .warn("Manager["
0633: + getName()
0634: + "], requesting session state from "
0635: + mbr
0636: + ". This operation will timeout if no session state has been received within "
0637: + "60 seconds");
0638: long reqStart = System.currentTimeMillis();
0639: long reqNow = 0;
0640: boolean isTimeout = false;
0641: do {
0642: try {
0643: Thread.currentThread().sleep(100);
0644: } catch (Exception sleep) {
0645: }
0646: reqNow = System.currentTimeMillis();
0647: isTimeout = ((reqNow - reqStart) > (1000 * 60));
0648: } while ((!getStateTransferred()) && (!isTimeout));
0649: if (isTimeout || (!getStateTransferred())) {
0650: log
0651: .error("Manager["
0652: + getName()
0653: + "], No session state received, timing out.");
0654: } else {
0655: log.info("Manager[" + getName()
0656: + "], session state received in "
0657: + (reqNow - reqStart) + " ms.");
0658: }
0659: } else {
0660: log
0661: .info("Manager["
0662: + getName()
0663: + "], skipping state transfer. No members active in cluster group.");
0664: }//end if
0665:
0666: } catch (Throwable t) {
0667: log.error(sm.getString("standardManager.managerLoad"), t);
0668: }
0669:
0670: }
0671:
0672: /**
0673: * Gracefully terminate the active use of the public methods of this
0674: * component. This method should be the last one called on a given
0675: * instance of this component.
0676: *
0677: * @exception LifecycleException if this component detects a fatal error
0678: * that needs to be reported
0679: */
0680: public void stop() throws LifecycleException {
0681:
0682: if (log.isDebugEnabled())
0683: log.debug("Stopping");
0684:
0685: // Validate and update our current component state
0686: if (!started)
0687: throw new LifecycleException(sm
0688: .getString("standardManager.notStarted"));
0689: lifecycle.fireLifecycleEvent(STOP_EVENT, null);
0690: started = false;
0691:
0692: // Expire all active sessions
0693: if (this .getExpireSessionsOnShutdown()) {
0694: log.info("Expiring sessions upon shutdown");
0695: Session sessions[] = findSessions();
0696: for (int i = 0; i < sessions.length; i++) {
0697: DeltaSession session = (DeltaSession) sessions[i];
0698: if (!session.isValid())
0699: continue;
0700: try {
0701: session.expire();
0702: } catch (Throwable t) {
0703: ;
0704: } //catch
0705: } //for
0706: }//end if
0707:
0708: // Require a new random number generator if we are restarted
0709: this .random = null;
0710:
0711: if (initialized) {
0712: destroy();
0713: }
0714: getCluster().removeManager(getName());
0715: }
0716:
0717: // ----------------------------------------- PropertyChangeListener Methods
0718:
0719: /**
0720: * Process property change events from our associated Context.
0721: *
0722: * @param event The property change event that has occurred
0723: */
0724: public void propertyChange(PropertyChangeEvent event) {
0725:
0726: // Validate the source of this event
0727: if (!(event.getSource() instanceof Context))
0728: return;
0729: Context context = (Context) event.getSource();
0730:
0731: // Process a relevant property change
0732: if (event.getPropertyName().equals("sessionTimeout")) {
0733: try {
0734: setMaxInactiveInterval(((Integer) event.getNewValue())
0735: .intValue() * 60);
0736: } catch (NumberFormatException e) {
0737: log.error(sm.getString(
0738: "standardManager.sessionTimeout", event
0739: .getNewValue().toString()));
0740: }
0741: }
0742:
0743: }
0744:
0745: // -------------------------------------------------------- Replication Methods
0746:
0747: /**
0748: * A message was received from another node, this
0749: * is the callback method to implement if you are interested in
0750: * receiving replication messages.
0751: * @param msg - the message received.
0752: */
0753: public void messageDataReceived(SessionMessage msg) {
0754: messageReceived(msg, msg.getAddress() != null ? (Member) msg
0755: .getAddress() : null);
0756: }
0757:
0758: /**
0759: * When the request has been completed, the replication valve
0760: * will notify the manager, and the manager will decide whether
0761: * any replication is needed or not.
0762: * If there is a need for replication, the manager will
0763: * create a session message and that will be replicated.
0764: * The cluster determines where it gets sent.
0765: * @param sessionId - the sessionId that just completed.
0766: * @return a SessionMessage to be sent,
0767: */
0768: public SessionMessage requestCompleted(String sessionId) {
0769: try {
0770: DeltaSession session = (DeltaSession) findSession(sessionId);
0771: DeltaRequest deltaRequest = session.getDeltaRequest();
0772: SessionMessage msg = null;
0773: if (deltaRequest.getSize() > 0) {
0774:
0775: byte[] data = unloadDeltaRequest(deltaRequest);
0776: msg = new SessionMessageImpl(name,
0777: SessionMessage.EVT_SESSION_DELTA, data,
0778: sessionId, sessionId
0779: + System.currentTimeMillis());
0780: session.resetDeltaRequest();
0781: } else if (!session.isPrimarySession()) {
0782: msg = new SessionMessageImpl(getName(),
0783: SessionMessage.EVT_SESSION_ACCESSED, null,
0784: sessionId, sessionId
0785: + System.currentTimeMillis());
0786: }
0787: session.setPrimarySession(true);
0788: //check to see if we need to send out an access message
0789: if ((msg == null)) {
0790: long replDelta = System.currentTimeMillis()
0791: - session.getLastTimeReplicated();
0792: if (replDelta > (getMaxInactiveInterval() * 1000)) {
0793: msg = new SessionMessageImpl(getName(),
0794: SessionMessage.EVT_SESSION_ACCESSED, null,
0795: sessionId, sessionId
0796: + System.currentTimeMillis());
0797: }
0798:
0799: }
0800:
0801: //update last replicated time
0802: if (msg != null)
0803: session.setLastTimeReplicated(System
0804: .currentTimeMillis());
0805: return msg;
0806: } catch (IOException x) {
0807: log.error("Unable to serialize delta request", x);
0808: return null;
0809: }
0810:
0811: }
0812:
0813: protected void sessionExpired(String id) {
0814: SessionMessage msg = new SessionMessageImpl(getName(),
0815: SessionMessage.EVT_SESSION_EXPIRED, null, id, id
0816: + "-EXPIRED-MSG");
0817: cluster.send(msg);
0818: }
0819:
0820: /**
0821: * When the manager expires session not tied to a request.
0822: * The cluster will periodically ask for a list of sessions
0823: * that should expire and that should be sent across the wire.
0824: * @return
0825: */
0826: public String[] getInvalidatedSessions() {
0827: return new String[0];
0828: }
0829:
0830: /**
0831: * This method is called by the received thread when a SessionMessage has
0832: * been received from one of the other nodes in the cluster.
0833: * @param msg - the message received
0834: * @param sender - the sender of the message, this is used if we receive a
0835: * EVT_GET_ALL_SESSION message, so that we only reply to
0836: * the requesting node
0837: */
0838: protected void messageReceived(SessionMessage msg, Member sender) {
0839: try {
0840: log.debug("Manager (" + name
0841: + ") Received SessionMessage of type="
0842: + msg.getEventTypeString() + " from " + sender);
0843: switch (msg.getEventType()) {
0844: case SessionMessage.EVT_GET_ALL_SESSIONS: {
0845: //get a list of all the session from this manager
0846: log.debug("Manager (" + name + ") unloading sessions");
0847: byte[] data = doUnload();
0848: log.debug("Manager (" + name
0849: + ") unloading sessions complete");
0850: SessionMessage newmsg = new SessionMessageImpl(name,
0851: SessionMessage.EVT_ALL_SESSION_DATA, data,
0852: "SESSION-STATE", "SESSION-STATE-" + getName());
0853: cluster.send(newmsg, sender);
0854: break;
0855: }
0856: case SessionMessage.EVT_ALL_SESSION_DATA: {
0857: log.debug("Manager (" + name
0858: + ") received session state data.");
0859: byte[] data = msg.getSession();
0860: doLoad(data);
0861: log.debug("Manager (" + name + ") state deserialized.");
0862: stateTransferred = true;
0863: break;
0864: }
0865: case SessionMessage.EVT_SESSION_CREATED: {
0866: DeltaSession session = (DeltaSession) createSession(false);
0867: session.setId(msg.getSessionID());
0868: session.setNew(false);
0869: session.setPrimarySession(false);
0870: session.resetDeltaRequest();
0871: break;
0872: }
0873: case SessionMessage.EVT_SESSION_EXPIRED: {
0874: DeltaSession session = (DeltaSession) findSession(msg
0875: .getSessionID());
0876: if (session != null) {
0877: session.expire(true, false);
0878: } //end if
0879: break;
0880: }
0881: case SessionMessage.EVT_SESSION_ACCESSED: {
0882: DeltaSession session = (DeltaSession) findSession(msg
0883: .getSessionID());
0884: if (session != null) {
0885: session.access();
0886: session.setPrimarySession(false);
0887: session.endAccess();
0888: }
0889: break;
0890: }
0891: case SessionMessage.EVT_SESSION_DELTA: {
0892: byte[] delta = msg.getSession();
0893: DeltaSession session = (DeltaSession) findSession(msg
0894: .getSessionID());
0895: if (session != null) {
0896: DeltaRequest dreq = loadDeltaRequest(session, delta);
0897: dreq.execute(session);
0898: session.setPrimarySession(false);
0899: }
0900:
0901: break;
0902: }
0903: default: {
0904: //we didn't recognize the message type, do nothing
0905: break;
0906: }
0907: } //switch
0908: } catch (Exception x) {
0909: log.error("Unable to receive message through TCP channel",
0910: x);
0911: }
0912: }
0913:
0914: // -------------------------------------------------------- Private Methods
0915:
0916: public void backgroundProcess() {
0917: log.debug("DeltaManager.backgroundProcess invoked at "
0918: + System.currentTimeMillis());
0919: processExpires();
0920: }
0921:
0922: /**
0923: * Invalidate all sessions that have expired.
0924: */
0925: public void processExpires() {
0926: long timeNow = System.currentTimeMillis();
0927: Session sessions[] = findSessions();
0928:
0929: for (int i = 0; i < sessions.length; i++) {
0930: DeltaSession session = (DeltaSession) sessions[i];
0931: if (!session.isValid()) {
0932: try {
0933: expiredSessions++;
0934: } catch (Throwable t) {
0935: log
0936: .error(
0937: sm
0938: .getString("standardManager.expireException"),
0939: t);
0940: }
0941: }
0942: }
0943: long timeEnd = System.currentTimeMillis();
0944: processingTime += (timeEnd - timeNow);
0945: }
0946:
0947: public boolean getStateTransferred() {
0948: return stateTransferred;
0949: }
0950:
0951: public void setStateTransferred(boolean stateTransferred) {
0952: this .stateTransferred = stateTransferred;
0953: }
0954:
0955: public CatalinaCluster getCluster() {
0956: return cluster;
0957: }
0958:
0959: public void setCluster(CatalinaCluster cluster) {
0960: this .cluster = cluster;
0961: }
0962:
0963: public void load() {
0964:
0965: }
0966:
0967: public void unload() {
0968:
0969: }
0970:
0971: public boolean getUseDirtyFlag() {
0972: return useDirtyFlag;
0973: }
0974:
0975: public void setUseDirtyFlag(boolean useDirtyFlag) {
0976: this .useDirtyFlag = useDirtyFlag;
0977: }
0978:
0979: public boolean getExpireSessionsOnShutdown() {
0980: return expireSessionsOnShutdown;
0981: }
0982:
0983: public void setExpireSessionsOnShutdown(
0984: boolean expireSessionsOnShutdown) {
0985: this .expireSessionsOnShutdown = expireSessionsOnShutdown;
0986: }
0987:
0988: public boolean getPrintToScreen() {
0989: return printToScreen;
0990: }
0991:
0992: public void setPrintToScreen(boolean printToScreen) {
0993: this .printToScreen = printToScreen;
0994: }
0995:
0996: public void setName(String name) {
0997: this.name = name;
0998: }
0999:
1000: }
|