0001: /*
0002: * Licensed to the Apache Software Foundation (ASF) under one or more
0003: * contributor license agreements. See the NOTICE file distributed with
0004: * this work for additional information regarding copyright ownership.
0005: * The ASF licenses this file to You under the Apache License, Version 2.0
0006: * (the "License"); you may not use this file except in compliance with
0007: * the License. You may obtain a copy of the License at
0008: *
0009: * http://www.apache.org/licenses/LICENSE-2.0
0010: *
0011: * Unless required by applicable law or agreed to in writing, software
0012: * distributed under the License is distributed on an "AS IS" BASIS,
0013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014: * See the License for the specific language governing permissions and
0015: * limitations under the License.
0016: */
0017:
0018: package org.apache.catalina.ha.session;
0019:
0020: import java.beans.PropertyChangeEvent;
0021: import java.io.BufferedOutputStream;
0022: import java.io.ByteArrayOutputStream;
0023: import java.io.IOException;
0024: import java.io.ObjectInputStream;
0025: import java.io.ObjectOutputStream;
0026: import java.util.ArrayList;
0027: import java.util.Date;
0028: import java.util.Iterator;
0029:
0030: import org.apache.catalina.Cluster;
0031: import org.apache.catalina.Container;
0032: import org.apache.catalina.Context;
0033: import org.apache.catalina.Engine;
0034: import org.apache.catalina.Host;
0035: import org.apache.catalina.LifecycleException;
0036: import org.apache.catalina.LifecycleListener;
0037: import org.apache.catalina.Session;
0038: import org.apache.catalina.Valve;
0039: import org.apache.catalina.core.StandardContext;
0040: import org.apache.catalina.ha.CatalinaCluster;
0041: import org.apache.catalina.ha.ClusterMessage;
0042: import org.apache.catalina.ha.tcp.ReplicationValve;
0043: import org.apache.catalina.tribes.Member;
0044: import org.apache.catalina.tribes.io.ReplicationStream;
0045: import org.apache.catalina.util.LifecycleSupport;
0046: import org.apache.catalina.util.StringManager;
0047: import org.apache.catalina.ha.ClusterManager;
0048:
0049: /**
0050: * The DeltaManager manages replicated sessions by only replicating the deltas
0051: * in data. For applications written to handle this, the DeltaManager is the
0052: * optimal way of replicating data.
0053: *
0054: * This code is almost identical to StandardManager with a difference in how it
0055: * persists sessions and some modifications to it.
0056: *
0057: * <b>IMPLEMENTATION NOTE </b>: Correct behavior of session storing and
0058: * reloading depends upon external calls to the <code>start()</code> and
0059: * <code>stop()</code> methods of this class at the correct times.
0060: *
0061: * @author Filip Hanik
0062: * @author Craig R. McClanahan
0063: * @author Jean-Francois Arcand
0064: * @author Peter Rossbach
0065: * @version $Revision: 467222 $ $Date: 2006-10-24 05:17:11 +0200 (mar., 24 oct. 2006) $
0066: */
0067:
0068: public class DeltaManager extends ClusterManagerBase {
0069:
0070: // ---------------------------------------------------- Security Classes
0071: public static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory
0072: .getLog(DeltaManager.class);
0073:
0074: /**
0075: * The string manager for this package.
0076: */
0077: protected static StringManager sm = StringManager
0078: .getManager(Constants.Package);
0079:
0080: // ----------------------------------------------------- Instance Variables
0081:
0082: /**
0083: * The descriptive information about this implementation.
0084: */
0085: private static final String info = "DeltaManager/2.1";
0086:
0087: /**
0088: * Has this component been started yet?
0089: */
0090: private boolean started = false;
0091:
0092: /**
0093: * The descriptive name of this Manager implementation (for logging).
0094: */
0095: protected static String managerName = "DeltaManager";
0096: protected String name = null;
0097: protected boolean defaultMode = false;
0098: private CatalinaCluster cluster = null;
0099:
0100: /**
0101: * cached replication valve cluster container!
0102: */
0103: private ReplicationValve replicationValve = null;
0104:
0105: /**
0106: * The lifecycle event support for this component.
0107: */
0108: protected LifecycleSupport lifecycle = new LifecycleSupport(this );
0109:
0110: /**
0111: * The maximum number of active Sessions allowed, or -1 for no limit.
0112: */
0113: private int maxActiveSessions = -1;
0114: private boolean expireSessionsOnShutdown = false;
0115: private boolean notifyListenersOnReplication = true;
0116: private boolean notifySessionListenersOnReplication = true;
0117: private boolean stateTransfered = false;
0118: private int stateTransferTimeout = 60;
0119: private boolean sendAllSessions = true;
0120: private boolean sendClusterDomainOnly = true;
0121: private int sendAllSessionsSize = 1000;
0122:
0123: /**
0124: * wait time between send session block (default 2 sec)
0125: */
0126: private int sendAllSessionsWaitTime = 2 * 1000;
0127: private ArrayList receivedMessageQueue = new ArrayList();
0128: private boolean receiverQueue = false;
0129: private boolean stateTimestampDrop = true;
0130: private long stateTransferCreateSendTime;
0131:
0132: // ------------------------------------------------------------------ stats attributes
0133:
0134: int rejectedSessions = 0;
0135: private long sessionReplaceCounter = 0;
0136: long processingTime = 0;
0137: private long counterReceive_EVT_GET_ALL_SESSIONS = 0;
0138: private long counterSend_EVT_ALL_SESSION_DATA = 0;
0139: private long counterReceive_EVT_ALL_SESSION_DATA = 0;
0140: private long counterReceive_EVT_SESSION_CREATED = 0;
0141: private long counterReceive_EVT_SESSION_EXPIRED = 0;
0142: private long counterReceive_EVT_SESSION_ACCESSED = 0;
0143: private long counterReceive_EVT_SESSION_DELTA = 0;
0144: private long counterSend_EVT_GET_ALL_SESSIONS = 0;
0145: private long counterSend_EVT_SESSION_CREATED = 0;
0146: private long counterSend_EVT_SESSION_DELTA = 0;
0147: private long counterSend_EVT_SESSION_ACCESSED = 0;
0148: private long counterSend_EVT_SESSION_EXPIRED = 0;
0149: private int counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0;
0150: private int counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0;
0151: private int counterNoStateTransfered = 0;
0152:
0153: // ------------------------------------------------------------- Constructor
0154: public DeltaManager() {
0155: super ();
0156: }
0157:
0158: // ------------------------------------------------------------- Properties
0159:
0160: /**
0161: * Return descriptive information about this Manager implementation and the
0162: * corresponding version number, in the format
0163: * <code><description>/<version></code>.
0164: */
0165: public String getInfo() {
0166: return info;
0167: }
0168:
0169: public void setName(String name) {
0170: this .name = name;
0171: }
0172:
0173: /**
0174: * Return the descriptive short name of this Manager implementation.
0175: */
0176: public String getName() {
0177: return name;
0178: }
0179:
0180: /**
0181: * @return Returns the counterSend_EVT_GET_ALL_SESSIONS.
0182: */
0183: public long getCounterSend_EVT_GET_ALL_SESSIONS() {
0184: return counterSend_EVT_GET_ALL_SESSIONS;
0185: }
0186:
0187: /**
0188: * @return Returns the counterSend_EVT_SESSION_ACCESSED.
0189: */
0190: public long getCounterSend_EVT_SESSION_ACCESSED() {
0191: return counterSend_EVT_SESSION_ACCESSED;
0192: }
0193:
0194: /**
0195: * @return Returns the counterSend_EVT_SESSION_CREATED.
0196: */
0197: public long getCounterSend_EVT_SESSION_CREATED() {
0198: return counterSend_EVT_SESSION_CREATED;
0199: }
0200:
0201: /**
0202: * @return Returns the counterSend_EVT_SESSION_DELTA.
0203: */
0204: public long getCounterSend_EVT_SESSION_DELTA() {
0205: return counterSend_EVT_SESSION_DELTA;
0206: }
0207:
0208: /**
0209: * @return Returns the counterSend_EVT_SESSION_EXPIRED.
0210: */
0211: public long getCounterSend_EVT_SESSION_EXPIRED() {
0212: return counterSend_EVT_SESSION_EXPIRED;
0213: }
0214:
0215: /**
0216: * @return Returns the counterSend_EVT_ALL_SESSION_DATA.
0217: */
0218: public long getCounterSend_EVT_ALL_SESSION_DATA() {
0219: return counterSend_EVT_ALL_SESSION_DATA;
0220: }
0221:
0222: /**
0223: * @return Returns the counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE.
0224: */
0225: public int getCounterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE() {
0226: return counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE;
0227: }
0228:
0229: /**
0230: * @return Returns the counterReceive_EVT_ALL_SESSION_DATA.
0231: */
0232: public long getCounterReceive_EVT_ALL_SESSION_DATA() {
0233: return counterReceive_EVT_ALL_SESSION_DATA;
0234: }
0235:
0236: /**
0237: * @return Returns the counterReceive_EVT_GET_ALL_SESSIONS.
0238: */
0239: public long getCounterReceive_EVT_GET_ALL_SESSIONS() {
0240: return counterReceive_EVT_GET_ALL_SESSIONS;
0241: }
0242:
0243: /**
0244: * @return Returns the counterReceive_EVT_SESSION_ACCESSED.
0245: */
0246: public long getCounterReceive_EVT_SESSION_ACCESSED() {
0247: return counterReceive_EVT_SESSION_ACCESSED;
0248: }
0249:
0250: /**
0251: * @return Returns the counterReceive_EVT_SESSION_CREATED.
0252: */
0253: public long getCounterReceive_EVT_SESSION_CREATED() {
0254: return counterReceive_EVT_SESSION_CREATED;
0255: }
0256:
0257: /**
0258: * @return Returns the counterReceive_EVT_SESSION_DELTA.
0259: */
0260: public long getCounterReceive_EVT_SESSION_DELTA() {
0261: return counterReceive_EVT_SESSION_DELTA;
0262: }
0263:
0264: /**
0265: * @return Returns the counterReceive_EVT_SESSION_EXPIRED.
0266: */
0267: public long getCounterReceive_EVT_SESSION_EXPIRED() {
0268: return counterReceive_EVT_SESSION_EXPIRED;
0269: }
0270:
0271: /**
0272: * @return Returns the counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE.
0273: */
0274: public int getCounterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE() {
0275: return counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE;
0276: }
0277:
0278: /**
0279: * @return Returns the processingTime.
0280: */
0281: public long getProcessingTime() {
0282: return processingTime;
0283: }
0284:
0285: /**
0286: * @return Returns the sessionReplaceCounter.
0287: */
0288: public long getSessionReplaceCounter() {
0289: return sessionReplaceCounter;
0290: }
0291:
0292: /**
0293: * Number of session creations that failed due to maxActiveSessions
0294: *
0295: * @return The count
0296: */
0297: public int getRejectedSessions() {
0298: return rejectedSessions;
0299: }
0300:
0301: public void setRejectedSessions(int rejectedSessions) {
0302: this .rejectedSessions = rejectedSessions;
0303: }
0304:
0305: /**
0306: * @return Returns the counterNoStateTransfered.
0307: */
0308: public int getCounterNoStateTransfered() {
0309: return counterNoStateTransfered;
0310: }
0311:
0312: public int getReceivedQueueSize() {
0313: return receivedMessageQueue.size();
0314: }
0315:
0316: /**
0317: * @return Returns the stateTransferTimeout.
0318: */
0319: public int getStateTransferTimeout() {
0320: return stateTransferTimeout;
0321: }
0322:
0323: /**
0324: * @param timeoutAllSession The timeout
0325: */
0326: public void setStateTransferTimeout(int timeoutAllSession) {
0327: this .stateTransferTimeout = timeoutAllSession;
0328: }
0329:
0330: /**
0331: * is session state transfered complete?
0332: *
0333: */
0334: public boolean getStateTransfered() {
0335: return stateTransfered;
0336: }
0337:
0338: /**
0339: * set that state ist complete transfered
0340: * @param stateTransfered
0341: */
0342: public void setStateTransfered(boolean stateTransfered) {
0343: this .stateTransfered = stateTransfered;
0344: }
0345:
0346: /**
0347: * @return Returns the sendAllSessionsWaitTime in msec
0348: */
0349: public int getSendAllSessionsWaitTime() {
0350: return sendAllSessionsWaitTime;
0351: }
0352:
0353: /**
0354: * @param sendAllSessionsWaitTime The sendAllSessionsWaitTime to set at msec.
0355: */
0356: public void setSendAllSessionsWaitTime(int sendAllSessionsWaitTime) {
0357: this .sendAllSessionsWaitTime = sendAllSessionsWaitTime;
0358: }
0359:
0360: /**
0361: * @return Returns the sendClusterDomainOnly.
0362: */
0363: public boolean doDomainReplication() {
0364: return sendClusterDomainOnly;
0365: }
0366:
0367: /**
0368: * @param sendClusterDomainOnly The sendClusterDomainOnly to set.
0369: */
0370: public void setDomainReplication(boolean sendClusterDomainOnly) {
0371: this .sendClusterDomainOnly = sendClusterDomainOnly;
0372: }
0373:
0374: /**
0375: * @return Returns the stateTimestampDrop.
0376: */
0377: public boolean isStateTimestampDrop() {
0378: return stateTimestampDrop;
0379: }
0380:
0381: /**
0382: * @param isTimestampDrop The new flag value
0383: */
0384: public void setStateTimestampDrop(boolean isTimestampDrop) {
0385: this .stateTimestampDrop = isTimestampDrop;
0386: }
0387:
0388: /**
0389: * Return the maximum number of active Sessions allowed, or -1 for no limit.
0390: */
0391: public int getMaxActiveSessions() {
0392: return (this .maxActiveSessions);
0393: }
0394:
0395: /**
0396: * Set the maximum number of actives Sessions allowed, or -1 for no limit.
0397: *
0398: * @param max
0399: * The new maximum number of sessions
0400: */
0401: public void setMaxActiveSessions(int max) {
0402: int oldMaxActiveSessions = this .maxActiveSessions;
0403: this .maxActiveSessions = max;
0404: support.firePropertyChange("maxActiveSessions", new Integer(
0405: oldMaxActiveSessions), new Integer(
0406: this .maxActiveSessions));
0407: }
0408:
0409: /**
0410: *
0411: * @return Returns the sendAllSessions.
0412: */
0413: public boolean isSendAllSessions() {
0414: return sendAllSessions;
0415: }
0416:
0417: /**
0418: * @param sendAllSessions The sendAllSessions to set.
0419: */
0420: public void setSendAllSessions(boolean sendAllSessions) {
0421: this .sendAllSessions = sendAllSessions;
0422: }
0423:
0424: /**
0425: * @return Returns the sendAllSessionsSize.
0426: */
0427: public int getSendAllSessionsSize() {
0428: return sendAllSessionsSize;
0429: }
0430:
0431: /**
0432: * @param sendAllSessionsSize The sendAllSessionsSize to set.
0433: */
0434: public void setSendAllSessionsSize(int sendAllSessionsSize) {
0435: this .sendAllSessionsSize = sendAllSessionsSize;
0436: }
0437:
0438: /**
0439: * @return Returns the notifySessionListenersOnReplication.
0440: */
0441: public boolean isNotifySessionListenersOnReplication() {
0442: return notifySessionListenersOnReplication;
0443: }
0444:
0445: /**
0446: * @param notifyListenersCreateSessionOnReplication The notifySessionListenersOnReplication to set.
0447: */
0448: public void setNotifySessionListenersOnReplication(
0449: boolean notifyListenersCreateSessionOnReplication) {
0450: this .notifySessionListenersOnReplication = notifyListenersCreateSessionOnReplication;
0451: }
0452:
0453: public boolean isExpireSessionsOnShutdown() {
0454: return expireSessionsOnShutdown;
0455: }
0456:
0457: public void setExpireSessionsOnShutdown(
0458: boolean expireSessionsOnShutdown) {
0459: this .expireSessionsOnShutdown = expireSessionsOnShutdown;
0460: }
0461:
0462: public boolean isNotifyListenersOnReplication() {
0463: return notifyListenersOnReplication;
0464: }
0465:
0466: public void setNotifyListenersOnReplication(
0467: boolean notifyListenersOnReplication) {
0468: this .notifyListenersOnReplication = notifyListenersOnReplication;
0469: }
0470:
0471: /**
0472: * @return Returns the defaultMode.
0473: */
0474: public boolean isDefaultMode() {
0475: return defaultMode;
0476: }
0477:
0478: /**
0479: * @param defaultMode The defaultMode to set.
0480: */
0481: public void setDefaultMode(boolean defaultMode) {
0482: this .defaultMode = defaultMode;
0483: }
0484:
0485: public CatalinaCluster getCluster() {
0486: return cluster;
0487: }
0488:
0489: public void setCluster(CatalinaCluster cluster) {
0490: this .cluster = cluster;
0491: }
0492:
0493: /**
0494: * Set the Container with which this Manager has been associated. If it is a
0495: * Context (the usual case), listen for changes to the session timeout
0496: * property.
0497: *
0498: * @param container
0499: * The associated Container
0500: */
0501: public void setContainer(Container container) {
0502: // De-register from the old Container (if any)
0503: if ((this .container != null)
0504: && (this .container instanceof Context))
0505: ((Context) this .container)
0506: .removePropertyChangeListener(this );
0507:
0508: // Default processing provided by our superclass
0509: super .setContainer(container);
0510:
0511: // Register with the new Container (if any)
0512: if ((this .container != null)
0513: && (this .container instanceof Context)) {
0514: setMaxInactiveInterval(((Context) this .container)
0515: .getSessionTimeout() * 60);
0516: ((Context) this .container).addPropertyChangeListener(this );
0517: }
0518:
0519: }
0520:
0521: // --------------------------------------------------------- Public Methods
0522:
0523: /**
0524: * Construct and return a new session object, based on the default settings
0525: * specified by this Manager's properties. The session id will be assigned
0526: * by this method, and available via the getId() method of the returned
0527: * session. If a new session cannot be created for any reason, return
0528: * <code>null</code>.
0529: *
0530: * @exception IllegalStateException
0531: * if a new session cannot be instantiated for any reason
0532: *
0533: * Construct and return a new session object, based on the default settings
0534: * specified by this Manager's properties. The session id will be assigned
0535: * by this method, and available via the getId() method of the returned
0536: * session. If a new session cannot be created for any reason, return
0537: * <code>null</code>.
0538: *
0539: * @exception IllegalStateException
0540: * if a new session cannot be instantiated for any reason
0541: */
0542: public Session createSession(String sessionId) {
0543: return createSession(sessionId, true);
0544: }
0545:
0546: /**
0547: * create new session with check maxActiveSessions and send session creation
0548: * to other cluster nodes.
0549: *
0550: * @param distribute
0551: * @return The session
0552: */
0553: public Session createSession(String sessionId, boolean distribute) {
0554: if ((maxActiveSessions >= 0)
0555: && (sessions.size() >= maxActiveSessions)) {
0556: rejectedSessions++;
0557: throw new IllegalStateException(sm
0558: .getString("deltaManager.createSession.ise"));
0559: }
0560: DeltaSession session = (DeltaSession) super
0561: .createSession(sessionId);
0562: if (distribute) {
0563: sendCreateSession(session.getId(), session);
0564: }
0565: if (log.isDebugEnabled())
0566: log.debug(sm.getString(
0567: "deltaManager.createSession.newSession", session
0568: .getId(), new Integer(sessions.size())));
0569: return (session);
0570:
0571: }
0572:
0573: /**
0574: * Send create session evt to all backup node
0575: * @param sessionId
0576: * @param session
0577: */
0578: protected void sendCreateSession(String sessionId,
0579: DeltaSession session) {
0580: if (cluster.getMembers().length > 0) {
0581: SessionMessage msg = new SessionMessageImpl(getName(),
0582: SessionMessage.EVT_SESSION_CREATED, null,
0583: sessionId, sessionId + "-"
0584: + System.currentTimeMillis());
0585: if (log.isDebugEnabled())
0586: log.debug(sm.getString(
0587: "deltaManager.sendMessage.newSession", name,
0588: sessionId));
0589: msg.setTimestamp(session.getCreationTime());
0590: counterSend_EVT_SESSION_CREATED++;
0591: send(msg);
0592: }
0593: }
0594:
0595: /**
0596: * Send messages to other backup member (domain or all)
0597: * @param msg Session message
0598: */
0599: protected void send(SessionMessage msg) {
0600: if (cluster != null) {
0601: if (doDomainReplication())
0602: cluster.sendClusterDomain(msg);
0603: else
0604: cluster.send(msg);
0605: }
0606: }
0607:
0608: /**
0609: * Create DeltaSession
0610: * @see org.apache.catalina.Manager#createEmptySession()
0611: */
0612: public Session createEmptySession() {
0613: return getNewDeltaSession();
0614: }
0615:
0616: /**
0617: * Get new session class to be used in the doLoad() method.
0618: */
0619: protected DeltaSession getNewDeltaSession() {
0620: return new DeltaSession(this );
0621: }
0622:
0623: /**
0624: * Load Deltarequest from external node
0625: * Load the Class at container classloader
0626: * @see DeltaRequest#readExternal(java.io.ObjectInput)
0627: * @param session
0628: * @param data message data
0629: * @return The request
0630: * @throws ClassNotFoundException
0631: * @throws IOException
0632: */
0633: protected DeltaRequest deserializeDeltaRequest(
0634: DeltaSession session, byte[] data)
0635: throws ClassNotFoundException, IOException {
0636: ReplicationStream ois = getReplicationStream(data);
0637: session.getDeltaRequest().readExternal(ois);
0638: ois.close();
0639: return session.getDeltaRequest();
0640: }
0641:
0642: /**
0643: * serialize DeltaRequest
0644: * @see DeltaRequest#writeExternal(java.io.ObjectOutput)
0645: *
0646: * @param deltaRequest
0647: * @return serialized delta request
0648: * @throws IOException
0649: */
0650: protected byte[] serializeDeltaRequest(DeltaRequest deltaRequest)
0651: throws IOException {
0652: return deltaRequest.serialize();
0653: }
0654:
0655: /**
0656: * Load sessions from other cluster node.
0657: * FIXME replace currently sessions with same id without notifcation.
0658: * FIXME SSO handling is not really correct with the session replacement!
0659: * @exception ClassNotFoundException
0660: * if a serialized class cannot be found during the reload
0661: * @exception IOException
0662: * if an input/output error occurs
0663: */
0664: protected void deserializeSessions(byte[] data)
0665: throws ClassNotFoundException, IOException {
0666:
0667: // Initialize our internal data structures
0668: //sessions.clear(); //should not do this
0669: // Open an input stream to the specified pathname, if any
0670: ClassLoader originalLoader = Thread.currentThread()
0671: .getContextClassLoader();
0672: ObjectInputStream ois = null;
0673: // Load the previously unloaded active sessions
0674: try {
0675: ois = getReplicationStream(data);
0676: Integer count = (Integer) ois.readObject();
0677: int n = count.intValue();
0678: for (int i = 0; i < n; i++) {
0679: DeltaSession session = (DeltaSession) createEmptySession();
0680: session.readObjectData(ois);
0681: session.setManager(this );
0682: session.setValid(true);
0683: session.setPrimarySession(false);
0684: //in case the nodes in the cluster are out of
0685: //time synch, this will make sure that we have the
0686: //correct timestamp, isValid returns true, cause
0687: // accessCount=1
0688: session.access();
0689: //make sure that the session gets ready to expire if
0690: // needed
0691: session.setAccessCount(0);
0692: session.resetDeltaRequest();
0693: // FIXME How inform other session id cache like SingleSignOn
0694: // increment sessionCounter to correct stats report
0695: if (findSession(session.getIdInternal()) == null) {
0696: sessionCounter++;
0697: } else {
0698: sessionReplaceCounter++;
0699: // FIXME better is to grap this sessions again !
0700: if (log.isWarnEnabled())
0701: log
0702: .warn(sm
0703: .getString(
0704: "deltaManager.loading.existing.session",
0705: session.getIdInternal()));
0706: }
0707: add(session);
0708: }
0709: } catch (ClassNotFoundException e) {
0710: log.error(sm.getString("deltaManager.loading.cnfe", e), e);
0711: throw e;
0712: } catch (IOException e) {
0713: log.error(sm.getString("deltaManager.loading.ioe", e), e);
0714: throw e;
0715: } finally {
0716: // Close the input stream
0717: try {
0718: if (ois != null)
0719: ois.close();
0720: } catch (IOException f) {
0721: // ignored
0722: }
0723: ois = null;
0724: if (originalLoader != null)
0725: Thread.currentThread().setContextClassLoader(
0726: originalLoader);
0727: }
0728:
0729: }
0730:
0731: /**
0732: * Save any currently active sessions in the appropriate persistence
0733: * mechanism, if any. If persistence is not supported, this method returns
0734: * without doing anything.
0735: *
0736: * @exception IOException
0737: * if an input/output error occurs
0738: */
0739: protected byte[] serializeSessions(Session[] currentSessions)
0740: throws IOException {
0741:
0742: // Open an output stream to the specified pathname, if any
0743: ByteArrayOutputStream fos = null;
0744: ObjectOutputStream oos = null;
0745:
0746: try {
0747: fos = new ByteArrayOutputStream();
0748: oos = new ObjectOutputStream(new BufferedOutputStream(fos));
0749: oos.writeObject(new Integer(currentSessions.length));
0750: for (int i = 0; i < currentSessions.length; i++) {
0751: ((DeltaSession) currentSessions[i])
0752: .writeObjectData(oos);
0753: }
0754: // Flush and close the output stream
0755: oos.flush();
0756: } catch (IOException e) {
0757: log.error(sm.getString("deltaManager.unloading.ioe", e), e);
0758: throw e;
0759: } finally {
0760: if (oos != null) {
0761: try {
0762: oos.close();
0763: } catch (IOException f) {
0764: ;
0765: }
0766: oos = null;
0767: }
0768: }
0769: // send object data as byte[]
0770: return fos.toByteArray();
0771: }
0772:
0773: // ------------------------------------------------------ Lifecycle Methods
0774:
0775: /**
0776: * Add a lifecycle event listener to this component.
0777: *
0778: * @param listener
0779: * The listener to add
0780: */
0781: public void addLifecycleListener(LifecycleListener listener) {
0782: lifecycle.addLifecycleListener(listener);
0783: }
0784:
0785: /**
0786: * Get the lifecycle listeners associated with this lifecycle. If this
0787: * Lifecycle has no listeners registered, a zero-length array is returned.
0788: */
0789: public LifecycleListener[] findLifecycleListeners() {
0790: return lifecycle.findLifecycleListeners();
0791: }
0792:
0793: /**
0794: * Remove a lifecycle event listener from this component.
0795: *
0796: * @param listener
0797: * The listener to remove
0798: */
0799: public void removeLifecycleListener(LifecycleListener listener) {
0800: lifecycle.removeLifecycleListener(listener);
0801: }
0802:
0803: /**
0804: * Prepare for the beginning of active use of the public methods of this
0805: * component. This method should be called after <code>configure()</code>,
0806: * and before any of the public methods of the component are utilized.
0807: *
0808: * @exception LifecycleException
0809: * if this component detects a fatal error that prevents this
0810: * component from being used
0811: */
0812: public void start() throws LifecycleException {
0813: if (!initialized)
0814: init();
0815:
0816: // Validate and update our current component state
0817: if (started) {
0818: return;
0819: }
0820: started = true;
0821: lifecycle.fireLifecycleEvent(START_EVENT, null);
0822:
0823: // Force initialization of the random number generator
0824: generateSessionId();
0825:
0826: // Load unloaded sessions, if any
0827: try {
0828: //the channel is already running
0829: Cluster cluster = getCluster();
0830: // stop remove cluster binding
0831: //wow, how many nested levels of if statements can we have ;)
0832: if (cluster == null) {
0833: Container context = getContainer();
0834: if (context != null && context instanceof Context) {
0835: Container host = context.getParent();
0836: if (host != null && host instanceof Host) {
0837: cluster = host.getCluster();
0838: if (cluster != null
0839: && cluster instanceof CatalinaCluster) {
0840: setCluster((CatalinaCluster) cluster);
0841: } else {
0842: Container engine = host.getParent();
0843: if (engine != null
0844: && engine instanceof Engine) {
0845: cluster = engine.getCluster();
0846: if (cluster != null
0847: && cluster instanceof CatalinaCluster) {
0848: setCluster((CatalinaCluster) cluster);
0849: }
0850: } else {
0851: cluster = null;
0852: }
0853: }
0854: }
0855: }
0856: }
0857: if (cluster == null) {
0858: log.error(sm.getString("deltaManager.noCluster",
0859: getName()));
0860: return;
0861: } else {
0862: if (log.isInfoEnabled()) {
0863: String type = "unknown";
0864: if (cluster.getContainer() instanceof Host) {
0865: type = "Host";
0866: } else if (cluster.getContainer() instanceof Engine) {
0867: type = "Engine";
0868: }
0869: log.info(sm.getString(
0870: "deltaManager.registerCluster", getName(),
0871: type, cluster.getClusterName()));
0872: }
0873: }
0874: if (log.isInfoEnabled())
0875: log.info(sm.getString("deltaManager.startClustering",
0876: getName()));
0877: //to survice context reloads, as only a stop/start is called, not
0878: // createManager
0879: cluster.registerManager(this );
0880:
0881: getAllClusterSessions();
0882:
0883: } catch (Throwable t) {
0884: log.error(sm.getString("deltaManager.managerLoad"), t);
0885: }
0886: }
0887:
0888: /**
0889: * get from first session master the backup from all clustered sessions
0890: * @see #findSessionMasterMember()
0891: */
0892: public synchronized void getAllClusterSessions() {
0893: if (cluster != null && cluster.getMembers().length > 0) {
0894: long beforeSendTime = System.currentTimeMillis();
0895: Member mbr = findSessionMasterMember();
0896: if (mbr == null) { // No domain member found
0897: return;
0898: }
0899: SessionMessage msg = new SessionMessageImpl(this .getName(),
0900: SessionMessage.EVT_GET_ALL_SESSIONS, null,
0901: "GET-ALL", "GET-ALL-" + getName());
0902: // set reference time
0903: stateTransferCreateSendTime = beforeSendTime;
0904: // request session state
0905: counterSend_EVT_GET_ALL_SESSIONS++;
0906: stateTransfered = false;
0907: // FIXME This send call block the deploy thread, when sender waitForAck is enabled
0908: try {
0909: synchronized (receivedMessageQueue) {
0910: receiverQueue = true;
0911: }
0912: cluster.send(msg, mbr);
0913: if (log.isWarnEnabled())
0914: log.warn(sm.getString(
0915: "deltaManager.waitForSessionState",
0916: getName(), mbr));
0917: // FIXME At sender ack mode this method check only the state transfer and resend is a problem!
0918: waitForSendAllSessions(beforeSendTime);
0919: } finally {
0920: synchronized (receivedMessageQueue) {
0921: for (Iterator iter = receivedMessageQueue
0922: .iterator(); iter.hasNext();) {
0923: SessionMessage smsg = (SessionMessage) iter
0924: .next();
0925: if (!stateTimestampDrop) {
0926: messageReceived(
0927: smsg,
0928: smsg.getAddress() != null ? (Member) smsg
0929: .getAddress()
0930: : null);
0931: } else {
0932: if (smsg.getEventType() != SessionMessage.EVT_GET_ALL_SESSIONS
0933: && smsg.getTimestamp() >= stateTransferCreateSendTime) {
0934: // FIXME handle EVT_GET_ALL_SESSIONS later
0935: messageReceived(
0936: smsg,
0937: smsg.getAddress() != null ? (Member) smsg
0938: .getAddress()
0939: : null);
0940: } else {
0941: if (log.isWarnEnabled()) {
0942: log
0943: .warn(sm
0944: .getString(
0945: "deltaManager.dropMessage",
0946: getName(),
0947: smsg
0948: .getEventTypeString(),
0949: new Date(
0950: stateTransferCreateSendTime),
0951: new Date(
0952: smsg
0953: .getTimestamp())));
0954: }
0955: }
0956: }
0957: }
0958: receivedMessageQueue.clear();
0959: receiverQueue = false;
0960: }
0961: }
0962: } else {
0963: if (log.isInfoEnabled())
0964: log.info(sm.getString("deltaManager.noMembers",
0965: getName()));
0966: }
0967: }
0968:
0969: /**
0970: * Register cross context session at replication valve thread local
0971: * @param session cross context session
0972: */
0973: protected void registerSessionAtReplicationValve(
0974: DeltaSession session) {
0975: if (replicationValve == null) {
0976: if (container instanceof StandardContext
0977: && ((StandardContext) container).getCrossContext()) {
0978: Cluster cluster = getCluster();
0979: if (cluster != null
0980: && cluster instanceof CatalinaCluster) {
0981: Valve[] valves = ((CatalinaCluster) cluster)
0982: .getValves();
0983: if (valves != null && valves.length > 0) {
0984: for (int i = 0; replicationValve == null
0985: && i < valves.length; i++) {
0986: if (valves[i] instanceof ReplicationValve)
0987: replicationValve = (ReplicationValve) valves[i];
0988: }//for
0989:
0990: if (replicationValve == null
0991: && log.isDebugEnabled()) {
0992: log
0993: .debug("no ReplicationValve found for CrossContext Support");
0994: }//endif
0995: }//end if
0996: }//endif
0997: }//end if
0998: }//end if
0999: if (replicationValve != null) {
1000: replicationValve.registerReplicationSession(session);
1001: }
1002: }
1003:
1004: /**
1005: * Find the master of the session state
1006: * @return master member of sessions
1007: */
1008: protected Member findSessionMasterMember() {
1009: Member mbr = null;
1010: Member mbrs[] = cluster.getMembers();
1011: if (mbrs.length != 0)
1012: mbr = mbrs[0];
1013: if (mbr == null && log.isWarnEnabled())
1014: log.warn(sm.getString("deltaManager.noMasterMember",
1015: getName(), ""));
1016: if (mbr != null && log.isDebugEnabled())
1017: log.warn(sm.getString("deltaManager.foundMasterMember",
1018: getName(), mbr));
1019: return mbr;
1020: }
1021:
1022: /**
1023: * Wait that cluster session state is transfer or timeout after 60 Sec
1024: * With stateTransferTimeout == -1 wait that backup is transfered (forever mode)
1025: */
1026: protected void waitForSendAllSessions(long beforeSendTime) {
1027: long reqStart = System.currentTimeMillis();
1028: long reqNow = reqStart;
1029: boolean isTimeout = false;
1030: if (getStateTransferTimeout() > 0) {
1031: // wait that state is transfered with timeout check
1032: do {
1033: try {
1034: Thread.sleep(100);
1035: } catch (Exception sleep) {
1036: //
1037: }
1038: reqNow = System.currentTimeMillis();
1039: isTimeout = ((reqNow - reqStart) > (1000 * getStateTransferTimeout()));
1040: } while ((!getStateTransfered()) && (!isTimeout));
1041: } else {
1042: if (getStateTransferTimeout() == -1) {
1043: // wait that state is transfered
1044: do {
1045: try {
1046: Thread.sleep(100);
1047: } catch (Exception sleep) {
1048: }
1049: } while ((!getStateTransfered()));
1050: reqNow = System.currentTimeMillis();
1051: }
1052: }
1053: if (isTimeout || (!getStateTransfered())) {
1054: counterNoStateTransfered++;
1055: log.error(sm.getString("deltaManager.noSessionState",
1056: getName(), new Date(beforeSendTime), new Long(
1057: reqNow - beforeSendTime)));
1058: } else {
1059: if (log.isInfoEnabled())
1060: log.info(sm.getString("deltaManager.sessionReceived",
1061: getName(), new Date(beforeSendTime), new Long(
1062: reqNow - beforeSendTime)));
1063: }
1064: }
1065:
1066: /**
1067: * Gracefully terminate the active use of the public methods of this
1068: * component. This method should be the last one called on a given instance
1069: * of this component.
1070: *
1071: * @exception LifecycleException
1072: * if this component detects a fatal error that needs to be
1073: * reported
1074: */
1075: public void stop() throws LifecycleException {
1076:
1077: if (log.isDebugEnabled())
1078: log.debug(sm.getString("deltaManager.stopped", getName()));
1079:
1080: // Validate and update our current component state
1081: if (!started)
1082: throw new LifecycleException(sm
1083: .getString("deltaManager.notStarted"));
1084: lifecycle.fireLifecycleEvent(STOP_EVENT, null);
1085: started = false;
1086:
1087: // Expire all active sessions
1088: if (log.isInfoEnabled())
1089: log.info(sm.getString("deltaManager.expireSessions",
1090: getName()));
1091: Session sessions[] = findSessions();
1092: for (int i = 0; i < sessions.length; i++) {
1093: DeltaSession session = (DeltaSession) sessions[i];
1094: if (!session.isValid())
1095: continue;
1096: try {
1097: session.expire(true, isExpireSessionsOnShutdown());
1098: } catch (Throwable ignore) {
1099: ;
1100: }
1101: }
1102:
1103: // Require a new random number generator if we are restarted
1104: this .random = null;
1105: getCluster().removeManager(this );
1106: replicationValve = null;
1107: if (initialized) {
1108: destroy();
1109: }
1110: }
1111:
1112: // ----------------------------------------- PropertyChangeListener Methods
1113:
1114: /**
1115: * Process property change events from our associated Context.
1116: *
1117: * @param event
1118: * The property change event that has occurred
1119: */
1120: public void propertyChange(PropertyChangeEvent event) {
1121:
1122: // Validate the source of this event
1123: if (!(event.getSource() instanceof Context))
1124: return;
1125: // Process a relevant property change
1126: if (event.getPropertyName().equals("sessionTimeout")) {
1127: try {
1128: setMaxInactiveInterval(((Integer) event.getNewValue())
1129: .intValue() * 60);
1130: } catch (NumberFormatException e) {
1131: log.error(sm.getString("deltaManager.sessionTimeout",
1132: event.getNewValue()));
1133: }
1134: }
1135:
1136: }
1137:
1138: // -------------------------------------------------------- Replication
1139: // Methods
1140:
1141: /**
1142: * A message was received from another node, this is the callback method to
1143: * implement if you are interested in receiving replication messages.
1144: *
1145: * @param cmsg -
1146: * the message received.
1147: */
1148: public void messageDataReceived(ClusterMessage cmsg) {
1149: if (cmsg != null && cmsg instanceof SessionMessage) {
1150: SessionMessage msg = (SessionMessage) cmsg;
1151: switch (msg.getEventType()) {
1152: case SessionMessage.EVT_GET_ALL_SESSIONS:
1153: case SessionMessage.EVT_SESSION_CREATED:
1154: case SessionMessage.EVT_SESSION_EXPIRED:
1155: case SessionMessage.EVT_SESSION_ACCESSED:
1156: case SessionMessage.EVT_SESSION_DELTA: {
1157: synchronized (receivedMessageQueue) {
1158: if (receiverQueue) {
1159: receivedMessageQueue.add(msg);
1160: return;
1161: }
1162: }
1163: break;
1164: }
1165: default: {
1166: //we didn't queue, do nothing
1167: break;
1168: }
1169: } //switch
1170:
1171: messageReceived(msg,
1172: msg.getAddress() != null ? (Member) msg
1173: .getAddress() : null);
1174: }
1175: }
1176:
1177: /**
1178: * When the request has been completed, the replication valve will notify
1179: * the manager, and the manager will decide whether any replication is
1180: * needed or not. If there is a need for replication, the manager will
1181: * create a session message and that will be replicated. The cluster
1182: * determines where it gets sent.
1183: *
1184: * @param sessionId -
1185: * the sessionId that just completed.
1186: * @return a SessionMessage to be sent,
1187: */
1188: public ClusterMessage requestCompleted(String sessionId) {
1189: try {
1190: DeltaSession session = (DeltaSession) findSession(sessionId);
1191: DeltaRequest deltaRequest = session.getDeltaRequest();
1192: SessionMessage msg = null;
1193: boolean isDeltaRequest = false;
1194: synchronized (deltaRequest) {
1195: isDeltaRequest = deltaRequest.getSize() > 0;
1196: if (isDeltaRequest) {
1197: counterSend_EVT_SESSION_DELTA++;
1198: byte[] data = serializeDeltaRequest(deltaRequest);
1199: msg = new SessionMessageImpl(getName(),
1200: SessionMessage.EVT_SESSION_DELTA, data,
1201: sessionId, sessionId + "-"
1202: + System.currentTimeMillis());
1203: session.resetDeltaRequest();
1204: }
1205: }
1206: if (!isDeltaRequest) {
1207: if (!session.isPrimarySession()) {
1208: counterSend_EVT_SESSION_ACCESSED++;
1209: msg = new SessionMessageImpl(getName(),
1210: SessionMessage.EVT_SESSION_ACCESSED, null,
1211: sessionId, sessionId + "-"
1212: + System.currentTimeMillis());
1213: if (log.isDebugEnabled()) {
1214: log
1215: .debug(sm
1216: .getString(
1217: "deltaManager.createMessage.accessChangePrimary",
1218: getName(), sessionId));
1219: }
1220: }
1221: } else { // log only outside synch block!
1222: if (log.isDebugEnabled()) {
1223: log.debug(sm.getString(
1224: "deltaManager.createMessage.delta",
1225: getName(), sessionId));
1226: }
1227: }
1228: session.setPrimarySession(true);
1229: //check to see if we need to send out an access message
1230: if ((msg == null)) {
1231: long replDelta = System.currentTimeMillis()
1232: - session.getLastTimeReplicated();
1233: if (replDelta > (getMaxInactiveInterval() * 1000)) {
1234: counterSend_EVT_SESSION_ACCESSED++;
1235: msg = new SessionMessageImpl(getName(),
1236: SessionMessage.EVT_SESSION_ACCESSED, null,
1237: sessionId, sessionId + "-"
1238: + System.currentTimeMillis());
1239: if (log.isDebugEnabled()) {
1240: log.debug(sm.getString(
1241: "deltaManager.createMessage.access",
1242: getName(), sessionId));
1243: }
1244: }
1245:
1246: }
1247:
1248: //update last replicated time
1249: if (msg != null)
1250: session.setLastTimeReplicated(System
1251: .currentTimeMillis());
1252: return msg;
1253: } catch (IOException x) {
1254: log
1255: .error(
1256: sm
1257: .getString(
1258: "deltaManager.createMessage.unableCreateDeltaRequest",
1259: sessionId), x);
1260: return null;
1261: }
1262:
1263: }
1264:
1265: /**
1266: * Reset manager statistics
1267: */
1268: public synchronized void resetStatistics() {
1269: processingTime = 0;
1270: expiredSessions = 0;
1271: rejectedSessions = 0;
1272: sessionReplaceCounter = 0;
1273: counterNoStateTransfered = 0;
1274: maxActive = getActiveSessions();
1275: sessionCounter = getActiveSessions();
1276: counterReceive_EVT_ALL_SESSION_DATA = 0;
1277: counterReceive_EVT_GET_ALL_SESSIONS = 0;
1278: counterReceive_EVT_SESSION_ACCESSED = 0;
1279: counterReceive_EVT_SESSION_CREATED = 0;
1280: counterReceive_EVT_SESSION_DELTA = 0;
1281: counterReceive_EVT_SESSION_EXPIRED = 0;
1282: counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0;
1283: counterSend_EVT_ALL_SESSION_DATA = 0;
1284: counterSend_EVT_GET_ALL_SESSIONS = 0;
1285: counterSend_EVT_SESSION_ACCESSED = 0;
1286: counterSend_EVT_SESSION_CREATED = 0;
1287: counterSend_EVT_SESSION_DELTA = 0;
1288: counterSend_EVT_SESSION_EXPIRED = 0;
1289: counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0;
1290:
1291: }
1292:
1293: // -------------------------------------------------------- persistence handler
1294:
1295: public void load() {
1296:
1297: }
1298:
1299: public void unload() {
1300:
1301: }
1302:
1303: // -------------------------------------------------------- expire
1304:
1305: /**
1306: * send session expired to other cluster nodes
1307: *
1308: * @param id
1309: * session id
1310: */
1311: protected void sessionExpired(String id) {
1312: counterSend_EVT_SESSION_EXPIRED++;
1313: SessionMessage msg = new SessionMessageImpl(getName(),
1314: SessionMessage.EVT_SESSION_EXPIRED, null, id, id
1315: + "-EXPIRED-MSG");
1316: if (log.isDebugEnabled())
1317: log.debug(sm.getString("deltaManager.createMessage.expire",
1318: getName(), id));
1319: send(msg);
1320: }
1321:
1322: /**
1323: * Exipre all find sessions.
1324: */
1325: public void expireAllLocalSessions() {
1326: long timeNow = System.currentTimeMillis();
1327: Session sessions[] = findSessions();
1328: int expireDirect = 0;
1329: int expireIndirect = 0;
1330:
1331: if (log.isDebugEnabled())
1332: log.debug("Start expire all sessions " + getName() + " at "
1333: + timeNow + " sessioncount " + sessions.length);
1334: for (int i = 0; i < sessions.length; i++) {
1335: if (sessions[i] instanceof DeltaSession) {
1336: DeltaSession session = (DeltaSession) sessions[i];
1337: if (session.isPrimarySession()) {
1338: if (session.isValid()) {
1339: session.expire();
1340: expireDirect++;
1341: } else {
1342: expireIndirect++;
1343: }//end if
1344: }//end if
1345: }//end if
1346: }//for
1347: long timeEnd = System.currentTimeMillis();
1348: if (log.isDebugEnabled())
1349: log.debug("End expire sessions " + getName()
1350: + " exipre processingTime " + (timeEnd - timeNow)
1351: + " expired direct sessions: " + expireDirect
1352: + " expired direct sessions: " + expireIndirect);
1353:
1354: }
1355:
1356: /**
1357: * When the manager expires session not tied to a request. The cluster will
1358: * periodically ask for a list of sessions that should expire and that
1359: * should be sent across the wire.
1360: *
1361: * @return The invalidated sessions array
1362: */
1363: public String[] getInvalidatedSessions() {
1364: return new String[0];
1365: }
1366:
1367: // -------------------------------------------------------- message receive
1368:
1369: /**
1370: * Test that sender and local domain is the same
1371: */
1372: protected boolean checkSenderDomain(SessionMessage msg,
1373: Member sender) {
1374: boolean sameDomain = true;
1375: if (!sameDomain && log.isWarnEnabled()) {
1376: log.warn(sm.getString(
1377: "deltaManager.receiveMessage.fromWrongDomain",
1378: new Object[] { getName(), msg.getEventTypeString(),
1379: sender, "", "" }));
1380: }
1381: return sameDomain;
1382: }
1383:
1384: /**
1385: * This method is called by the received thread when a SessionMessage has
1386: * been received from one of the other nodes in the cluster.
1387: *
1388: * @param msg -
1389: * the message received
1390: * @param sender -
1391: * the sender of the message, this is used if we receive a
1392: * EVT_GET_ALL_SESSION message, so that we only reply to the
1393: * requesting node
1394: */
1395: protected void messageReceived(SessionMessage msg, Member sender) {
1396: if (doDomainReplication() && !checkSenderDomain(msg, sender)) {
1397: return;
1398: }
1399: ClassLoader contextLoader = Thread.currentThread()
1400: .getContextClassLoader();
1401: try {
1402:
1403: ClassLoader[] loaders = getClassLoaders();
1404: if (loaders != null && loaders.length > 0)
1405: Thread.currentThread()
1406: .setContextClassLoader(loaders[0]);
1407: if (log.isDebugEnabled())
1408: log.debug(sm.getString(
1409: "deltaManager.receiveMessage.eventType",
1410: getName(), msg.getEventTypeString(), sender));
1411:
1412: switch (msg.getEventType()) {
1413: case SessionMessage.EVT_GET_ALL_SESSIONS: {
1414: handleGET_ALL_SESSIONS(msg, sender);
1415: break;
1416: }
1417: case SessionMessage.EVT_ALL_SESSION_DATA: {
1418: handleALL_SESSION_DATA(msg, sender);
1419: break;
1420: }
1421: case SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE: {
1422: handleALL_SESSION_TRANSFERCOMPLETE(msg, sender);
1423: break;
1424: }
1425: case SessionMessage.EVT_SESSION_CREATED: {
1426: handleSESSION_CREATED(msg, sender);
1427: break;
1428: }
1429: case SessionMessage.EVT_SESSION_EXPIRED: {
1430: handleSESSION_EXPIRED(msg, sender);
1431: break;
1432: }
1433: case SessionMessage.EVT_SESSION_ACCESSED: {
1434: handleSESSION_ACCESSED(msg, sender);
1435: break;
1436: }
1437: case SessionMessage.EVT_SESSION_DELTA: {
1438: handleSESSION_DELTA(msg, sender);
1439: break;
1440: }
1441: default: {
1442: //we didn't recognize the message type, do nothing
1443: break;
1444: }
1445: } //switch
1446: } catch (Exception x) {
1447: log.error(sm.getString("deltaManager.receiveMessage.error",
1448: getName()), x);
1449: } finally {
1450: Thread.currentThread().setContextClassLoader(contextLoader);
1451: }
1452: }
1453:
1454: // -------------------------------------------------------- message receiver handler
1455:
1456: /**
1457: * handle receive session state is complete transfered
1458: * @param msg
1459: * @param sender
1460: */
1461: protected void handleALL_SESSION_TRANSFERCOMPLETE(
1462: SessionMessage msg, Member sender) {
1463: counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE++;
1464: if (log.isDebugEnabled())
1465: log.debug(sm.getString(
1466: "deltaManager.receiveMessage.transfercomplete",
1467: getName(), sender.getHost(), new Integer(sender
1468: .getPort())));
1469: stateTransferCreateSendTime = msg.getTimestamp();
1470: stateTransfered = true;
1471: }
1472:
1473: /**
1474: * handle receive session delta
1475: * @param msg
1476: * @param sender
1477: * @throws IOException
1478: * @throws ClassNotFoundException
1479: */
1480: protected void handleSESSION_DELTA(SessionMessage msg, Member sender)
1481: throws IOException, ClassNotFoundException {
1482: counterReceive_EVT_SESSION_DELTA++;
1483: byte[] delta = msg.getSession();
1484: DeltaSession session = (DeltaSession) findSession(msg
1485: .getSessionID());
1486: if (session != null) {
1487: if (log.isDebugEnabled())
1488: log.debug(sm.getString(
1489: "deltaManager.receiveMessage.delta", getName(),
1490: msg.getSessionID()));
1491: DeltaRequest dreq = deserializeDeltaRequest(session, delta);
1492: dreq.execute(session, notifyListenersOnReplication);
1493: session.setPrimarySession(false);
1494: }
1495: }
1496:
1497: /**
1498: * handle receive session is access at other node ( primary session is now false)
1499: * @param msg
1500: * @param sender
1501: * @throws IOException
1502: */
1503: protected void handleSESSION_ACCESSED(SessionMessage msg,
1504: Member sender) throws IOException {
1505: counterReceive_EVT_SESSION_ACCESSED++;
1506: DeltaSession session = (DeltaSession) findSession(msg
1507: .getSessionID());
1508: if (session != null) {
1509: if (log.isDebugEnabled())
1510: log.debug(sm.getString(
1511: "deltaManager.receiveMessage.accessed",
1512: getName(), msg.getSessionID()));
1513: session.access();
1514: session.setPrimarySession(false);
1515: session.endAccess();
1516: }
1517: }
1518:
1519: /**
1520: * handle receive session is expire at other node ( expire session also here)
1521: * @param msg
1522: * @param sender
1523: * @throws IOException
1524: */
1525: protected void handleSESSION_EXPIRED(SessionMessage msg,
1526: Member sender) throws IOException {
1527: counterReceive_EVT_SESSION_EXPIRED++;
1528: DeltaSession session = (DeltaSession) findSession(msg
1529: .getSessionID());
1530: if (session != null) {
1531: if (log.isDebugEnabled())
1532: log.debug(sm.getString(
1533: "deltaManager.receiveMessage.expired",
1534: getName(), msg.getSessionID()));
1535: session.expire(notifySessionListenersOnReplication, false);
1536: }
1537: }
1538:
1539: /**
1540: * handle receive new session is created at other node (create backup - primary false)
1541: * @param msg
1542: * @param sender
1543: */
1544: protected void handleSESSION_CREATED(SessionMessage msg,
1545: Member sender) {
1546: counterReceive_EVT_SESSION_CREATED++;
1547: if (log.isDebugEnabled())
1548: log.debug(sm.getString(
1549: "deltaManager.receiveMessage.createNewSession",
1550: getName(), msg.getSessionID()));
1551: DeltaSession session = (DeltaSession) createEmptySession();
1552: session.setManager(this );
1553: session.setValid(true);
1554: session.setPrimarySession(false);
1555: session.setCreationTime(msg.getTimestamp());
1556: session.access();
1557: if (notifySessionListenersOnReplication)
1558: session.setId(msg.getSessionID());
1559: else
1560: session.setIdInternal(msg.getSessionID());
1561: session.resetDeltaRequest();
1562: session.endAccess();
1563:
1564: }
1565:
1566: /**
1567: * handle receive sessions from other not ( restart )
1568: * @param msg
1569: * @param sender
1570: * @throws ClassNotFoundException
1571: * @throws IOException
1572: */
1573: protected void handleALL_SESSION_DATA(SessionMessage msg,
1574: Member sender) throws ClassNotFoundException, IOException {
1575: counterReceive_EVT_ALL_SESSION_DATA++;
1576: if (log.isDebugEnabled())
1577: log.debug(sm.getString(
1578: "deltaManager.receiveMessage.allSessionDataBegin",
1579: getName()));
1580: byte[] data = msg.getSession();
1581: deserializeSessions(data);
1582: if (log.isDebugEnabled())
1583: log.debug(sm.getString(
1584: "deltaManager.receiveMessage.allSessionDataAfter",
1585: getName()));
1586: //stateTransferred = true;
1587: }
1588:
1589: /**
1590: * handle receive that other node want all sessions ( restart )
1591: * a) send all sessions with one message
1592: * b) send session at blocks
1593: * After sending send state is complete transfered
1594: * @param msg
1595: * @param sender
1596: * @throws IOException
1597: */
1598: protected void handleGET_ALL_SESSIONS(SessionMessage msg,
1599: Member sender) throws IOException {
1600: counterReceive_EVT_GET_ALL_SESSIONS++;
1601: //get a list of all the session from this manager
1602: if (log.isDebugEnabled())
1603: log.debug(sm.getString(
1604: "deltaManager.receiveMessage.unloadingBegin",
1605: getName()));
1606: // Write the number of active sessions, followed by the details
1607: // get all sessions and serialize without sync
1608: Session[] currentSessions = findSessions();
1609: long findSessionTimestamp = System.currentTimeMillis();
1610: if (isSendAllSessions()) {
1611: sendSessions(sender, currentSessions, findSessionTimestamp);
1612: } else {
1613: // send session at blocks
1614: int len = currentSessions.length < getSendAllSessionsSize() ? currentSessions.length
1615: : getSendAllSessionsSize();
1616: Session[] sendSessions = new Session[len];
1617: for (int i = 0; i < currentSessions.length; i += getSendAllSessionsSize()) {
1618: len = i + getSendAllSessionsSize() > currentSessions.length ? currentSessions.length
1619: - i
1620: : getSendAllSessionsSize();
1621: System.arraycopy(currentSessions, i, sendSessions, 0,
1622: len);
1623: sendSessions(sender, sendSessions, findSessionTimestamp);
1624: if (getSendAllSessionsWaitTime() > 0) {
1625: try {
1626: Thread.sleep(getSendAllSessionsWaitTime());
1627: } catch (Exception sleep) {
1628: }
1629: }//end if
1630: }//for
1631: }//end if
1632:
1633: SessionMessage newmsg = new SessionMessageImpl(name,
1634: SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE, null,
1635: "SESSION-STATE-TRANSFERED", "SESSION-STATE-TRANSFERED"
1636: + getName());
1637: newmsg.setTimestamp(findSessionTimestamp);
1638: if (log.isDebugEnabled())
1639: log.debug(sm.getString(
1640: "deltaManager.createMessage.allSessionTransfered",
1641: getName()));
1642: counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE++;
1643: cluster.send(newmsg, sender);
1644: }
1645:
1646: /**
1647: * send a block of session to sender
1648: * @param sender
1649: * @param currentSessions
1650: * @param sendTimestamp
1651: * @throws IOException
1652: */
1653: protected void sendSessions(Member sender,
1654: Session[] currentSessions, long sendTimestamp)
1655: throws IOException {
1656: byte[] data = serializeSessions(currentSessions);
1657: if (log.isDebugEnabled())
1658: log.debug(sm.getString(
1659: "deltaManager.receiveMessage.unloadingAfter",
1660: getName()));
1661: SessionMessage newmsg = new SessionMessageImpl(name,
1662: SessionMessage.EVT_ALL_SESSION_DATA, data,
1663: "SESSION-STATE", "SESSION-STATE-" + getName());
1664: newmsg.setTimestamp(sendTimestamp);
1665: if (log.isDebugEnabled())
1666: log.debug(sm.getString(
1667: "deltaManager.createMessage.allSessionData",
1668: getName()));
1669: counterSend_EVT_ALL_SESSION_DATA++;
1670: cluster.send(newmsg, sender);
1671: }
1672:
1673: public ClusterManager cloneFromTemplate() {
1674: DeltaManager result = new DeltaManager();
1675: result.name = "Clone-from-" + name;
1676: result.cluster = cluster;
1677: result.replicationValve = replicationValve;
1678: result.maxActiveSessions = maxActiveSessions;
1679: result.expireSessionsOnShutdown = expireSessionsOnShutdown;
1680: result.notifyListenersOnReplication = notifyListenersOnReplication;
1681: result.notifySessionListenersOnReplication = notifySessionListenersOnReplication;
1682: result.stateTransferTimeout = stateTransferTimeout;
1683: result.sendAllSessions = sendAllSessions;
1684: result.sendClusterDomainOnly = sendClusterDomainOnly;
1685: result.sendAllSessionsSize = sendAllSessionsSize;
1686: result.sendAllSessionsWaitTime = sendAllSessionsWaitTime;
1687: result.receiverQueue = receiverQueue;
1688: result.stateTimestampDrop = stateTimestampDrop;
1689: result.stateTransferCreateSendTime = stateTransferCreateSendTime;
1690: return result;
1691: }
1692: }
|