001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.catalina.ha.session;
018:
019: import java.io.IOException;
020:
021: import org.apache.catalina.LifecycleException;
022: import org.apache.catalina.Session;
023: import org.apache.catalina.ha.CatalinaCluster;
024: import org.apache.catalina.ha.ClusterManager;
025: import org.apache.catalina.ha.ClusterMessage;
026: import org.apache.catalina.tribes.Member;
027: import org.apache.catalina.realm.GenericPrincipal;
028: import org.apache.catalina.session.StandardManager;
029: import org.apache.catalina.tribes.io.ReplicationStream;
030: import java.io.ByteArrayInputStream;
031: import org.apache.catalina.Loader;
032:
033: /**
034: * Title: Tomcat Session Replication for Tomcat 4.0 <BR>
035: * Description: A very simple straight forward implementation of
036: * session replication of servers in a cluster.<BR>
037: * This session replication is implemented "live". By live
038: * I mean, when a session attribute is added into a session on Node A
039: * a message is broadcasted to other messages and setAttribute is called on the
040: * replicated sessions.<BR>
041: * A full description of this implementation can be found under
042: * <href="http://www.filip.net/tomcat/">Filip's Tomcat Page</a><BR>
043: *
044: * Copyright: See apache license
045: * Company: www.filip.net
046: * @author <a href="mailto:mail@filip.net">Filip Hanik</a>
047: * @author Bela Ban (modifications for synchronous replication)
048: * @version 1.0 for TC 4.0
049: * Description: The InMemoryReplicationManager is a session manager that replicated
050: * session information in memory.
051: * <BR><BR>
052: * The InMemoryReplicationManager extends the StandardManager hence it allows for us
053: * to inherit all the basic session management features like expiration, session listeners etc
054: * <BR><BR>
055: * To communicate with other nodes in the cluster, the InMemoryReplicationManager sends out 7 different type of multicast messages
056: * all defined in the SessionMessage class.<BR>
057: * When a session is replicated (not an attribute added/removed) the session is serialized into
058: * a byte array using the StandardSession.readObjectData, StandardSession.writeObjectData methods.
059: */
060: public class SimpleTcpReplicationManager extends StandardManager
061: implements ClusterManager {
062: public static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory
063: .getLog(SimpleTcpReplicationManager.class);
064:
065: //the channel configuration
066: protected String mChannelConfig = null;
067:
068: //the group name
069: protected String mGroupName = "TomcatReplication";
070:
071: //somehow start() gets called more than once
072: protected boolean mChannelStarted = false;
073:
074: //log to screen
075: protected boolean mPrintToScreen = true;
076:
077: protected boolean defaultMode = false;
078:
079: protected boolean mManagerRunning = false;
080:
081: /** Use synchronous rather than asynchronous replication. Every session modification (creation, change, removal etc)
082: * will be sent to all members. The call will then wait for max milliseconds, or forever (if timeout is 0) for
083: * all responses.
084: */
085: protected boolean synchronousReplication = true;
086:
087: /** Set to true if we don't want the sessions to expire on shutdown */
088: protected boolean mExpireSessionsOnShutdown = true;
089:
090: protected boolean useDirtyFlag = false;
091:
092: protected String name;
093:
094: protected boolean distributable = true;
095:
096: protected CatalinaCluster cluster;
097:
098: protected java.util.HashMap invalidatedSessions = new java.util.HashMap();
099:
100: /**
101: * Flag to keep track if the state has been transferred or not
102: * Assumes false.
103: */
104: protected boolean stateTransferred = false;
105: private boolean notifyListenersOnReplication;
106: private boolean sendClusterDomainOnly = true;
107:
108: /**
109: * Constructor, just calls super()
110: *
111: */
112: public SimpleTcpReplicationManager() {
113: super ();
114: }
115:
116: public boolean doDomainReplication() {
117: return sendClusterDomainOnly;
118: }
119:
120: /**
121: * @param sendClusterDomainOnly The sendClusterDomainOnly to set.
122: */
123: public void setDomainReplication(boolean sendClusterDomainOnly) {
124: this .sendClusterDomainOnly = sendClusterDomainOnly;
125: }
126:
127: /**
128: * @return Returns the defaultMode.
129: */
130: public boolean isDefaultMode() {
131: return defaultMode;
132: }
133:
134: /**
135: * @param defaultMode The defaultMode to set.
136: */
137: public void setDefaultMode(boolean defaultMode) {
138: this .defaultMode = defaultMode;
139: }
140:
141: public boolean isManagerRunning() {
142: return mManagerRunning;
143: }
144:
145: public void setUseDirtyFlag(boolean usedirtyflag) {
146: this .useDirtyFlag = usedirtyflag;
147: }
148:
149: public void setExpireSessionsOnShutdown(
150: boolean expireSessionsOnShutdown) {
151: mExpireSessionsOnShutdown = expireSessionsOnShutdown;
152: }
153:
154: public void setCluster(CatalinaCluster cluster) {
155: if (log.isDebugEnabled())
156: log
157: .debug("Cluster associated with SimpleTcpReplicationManager");
158: this .cluster = cluster;
159: }
160:
161: public boolean getExpireSessionsOnShutdown() {
162: return mExpireSessionsOnShutdown;
163: }
164:
165: public void setPrintToScreen(boolean printtoscreen) {
166: if (log.isDebugEnabled())
167: log.debug("Setting screen debug to:" + printtoscreen);
168: mPrintToScreen = printtoscreen;
169: }
170:
171: public void setSynchronousReplication(boolean flag) {
172: synchronousReplication = flag;
173: }
174:
175: /**
176: * Override persistence since they don't go hand in hand with replication for now.
177: */
178: public void unload() throws IOException {
179: if (!getDistributable()) {
180: super .unload();
181: }
182: }
183:
184: /**
185: * Creates a HTTP session.
186: * Most of the code in here is copied from the StandardManager.
187: * This is not pretty, yeah I know, but it was necessary since the
188: * StandardManager had hard coded the session instantiation to the a
189: * StandardSession, when we actually want to instantiate a ReplicatedSession<BR>
190: * If the call comes from the Tomcat servlet engine, a SessionMessage goes out to the other
191: * nodes in the cluster that this session has been created.
192: * @param notify - if set to true the other nodes in the cluster will be notified.
193: * This flag is needed so that we can create a session before we deserialize
194: * a replicated one
195: *
196: * @see ReplicatedSession
197: */
198: protected Session createSession(String sessionId, boolean notify,
199: boolean setId) {
200:
201: //inherited from the basic manager
202: if ((getMaxActiveSessions() >= 0)
203: && (sessions.size() >= getMaxActiveSessions()))
204: throw new IllegalStateException(sm
205: .getString("standardManager.createSession.ise"));
206:
207: Session session = new ReplicatedSession(this );
208:
209: // Initialize the properties of the new session and return it
210: session.setNew(true);
211: session.setValid(true);
212: session.setCreationTime(System.currentTimeMillis());
213: session.setMaxInactiveInterval(this .maxInactiveInterval);
214: if (sessionId == null)
215: sessionId = generateSessionId();
216: if (setId)
217: session.setId(sessionId);
218: if (notify && (cluster != null)) {
219: ((ReplicatedSession) session).setIsDirty(true);
220: }
221: return (session);
222: }//createSession
223:
224: //=========================================================================
225: // OVERRIDE THESE METHODS TO IMPLEMENT THE REPLICATION
226: //=========================================================================
227:
228: /**
229: * Construct and return a new session object, based on the default
230: * settings specified by this Manager's properties. The session
231: * id will be assigned by this method, and available via the getId()
232: * method of the returned session. If a new session cannot be created
233: * for any reason, return <code>null</code>.
234: *
235: * @exception IllegalStateException if a new session cannot be
236: * instantiated for any reason
237: */
238: public Session createSession(String sessionId) {
239: //create a session and notify the other nodes in the cluster
240: Session session = createSession(sessionId, getDistributable(),
241: true);
242: add(session);
243: return session;
244: }
245:
246: public void sessionInvalidated(String sessionId) {
247: synchronized (invalidatedSessions) {
248: invalidatedSessions.put(sessionId, sessionId);
249: }
250: }
251:
252: public String[] getInvalidatedSessions() {
253: synchronized (invalidatedSessions) {
254: String[] result = new String[invalidatedSessions.size()];
255: invalidatedSessions.values().toArray(result);
256: return result;
257: }
258:
259: }
260:
261: public ClusterMessage requestCompleted(String sessionId) {
262: if (!getDistributable()) {
263: log
264: .warn("Received requestCompleted message, although this context["
265: + getName()
266: + "] is not distributable. Ignoring message");
267: return null;
268: }
269: try {
270: if (invalidatedSessions.get(sessionId) != null) {
271: synchronized (invalidatedSessions) {
272: invalidatedSessions.remove(sessionId);
273: SessionMessage msg = new SessionMessageImpl(name,
274: SessionMessage.EVT_SESSION_EXPIRED, null,
275: sessionId, sessionId);
276: return msg;
277: }
278: } else {
279: ReplicatedSession session = (ReplicatedSession) findSession(sessionId);
280: if (session != null) {
281: //return immediately if the session is not dirty
282: if (useDirtyFlag && (!session.isDirty())) {
283: //but before we return doing nothing,
284: //see if we should send
285: //an updated last access message so that
286: //sessions across cluster dont expire
287: long interval = session
288: .getMaxInactiveInterval();
289: long lastaccdist = System.currentTimeMillis()
290: - session.getLastAccessWasDistributed();
291: if (((interval * 1000) / lastaccdist) < 3) {
292: SessionMessage accmsg = new SessionMessageImpl(
293: name,
294: SessionMessage.EVT_SESSION_ACCESSED,
295: null, sessionId, sessionId);
296: session.setLastAccessWasDistributed(System
297: .currentTimeMillis());
298: return accmsg;
299: }
300: return null;
301: }
302:
303: session.setIsDirty(false);
304: if (log.isDebugEnabled()) {
305: try {
306: log.debug("Sending session to cluster="
307: + session);
308: } catch (Exception ignore) {
309: }
310: }
311: SessionMessage msg = new SessionMessageImpl(name,
312: SessionMessage.EVT_SESSION_CREATED,
313: writeSession(session), session
314: .getIdInternal(), session
315: .getIdInternal());
316: return msg;
317: } //end if
318: }//end if
319: } catch (Exception x) {
320: log.error("Unable to replicate session", x);
321: }
322: return null;
323: }
324:
325: /**
326: * Serialize a session into a byte array<BR>
327: * This method simple calls the writeObjectData method on the session
328: * and returns the byte data from that call
329: * @param session - the session to be serialized
330: * @return a byte array containing the session data, null if the serialization failed
331: */
332: protected byte[] writeSession(Session session) {
333: try {
334: java.io.ByteArrayOutputStream session_data = new java.io.ByteArrayOutputStream();
335: java.io.ObjectOutputStream session_out = new java.io.ObjectOutputStream(
336: session_data);
337: session_out.flush();
338: boolean hasPrincipal = session.getPrincipal() != null;
339: session_out.writeBoolean(hasPrincipal);
340: if (hasPrincipal) {
341: session_out.writeObject(SerializablePrincipal
342: .createPrincipal((GenericPrincipal) session
343: .getPrincipal()));
344: }//end if
345: ((ReplicatedSession) session).writeObjectData(session_out);
346: return session_data.toByteArray();
347:
348: } catch (Exception x) {
349: log.error("Failed to serialize the session!", x);
350: }
351: return null;
352: }
353:
354: /**
355: * Open Stream and use correct ClassLoader (Container) Switch
356: * ThreadClassLoader
357: *
358: * @param data
359: * @return The object input stream
360: * @throws IOException
361: */
362: public ReplicationStream getReplicationStream(byte[] data)
363: throws IOException {
364: return getReplicationStream(data, 0, data.length);
365: }
366:
367: public ReplicationStream getReplicationStream(byte[] data,
368: int offset, int length) throws IOException {
369: ByteArrayInputStream fis = null;
370: ReplicationStream ois = null;
371: Loader loader = null;
372: ClassLoader classLoader = null;
373: //fix to be able to run the DeltaManager
374: //stand alone without a container.
375: //use the Threads context class loader
376: if (container != null)
377: loader = container.getLoader();
378: if (loader != null)
379: classLoader = loader.getClassLoader();
380: else
381: classLoader = Thread.currentThread()
382: .getContextClassLoader();
383: //end fix
384: fis = new ByteArrayInputStream(data, offset, length);
385: if (classLoader == Thread.currentThread()
386: .getContextClassLoader()) {
387: ois = new ReplicationStream(fis,
388: new ClassLoader[] { classLoader });
389: } else {
390: ois = new ReplicationStream(fis, new ClassLoader[] {
391: classLoader,
392: Thread.currentThread().getContextClassLoader() });
393: }
394: return ois;
395: }
396:
397: /**
398: * Reinstantiates a serialized session from the data passed in.
399: * This will first call createSession() so that we get a fresh instance with all
400: * the managers set and all the transient fields validated.
401: * Then it calls Session.readObjectData(byte[]) to deserialize the object
402: * @param data - a byte array containing session data
403: * @return a valid Session object, null if an error occurs
404: *
405: */
406: protected Session readSession(byte[] data, String sessionId) {
407: try {
408: ReplicationStream session_in = getReplicationStream(data);
409:
410: Session session = sessionId != null ? this
411: .findSession(sessionId) : null;
412: boolean isNew = (session == null);
413: //clear the old values from the existing session
414: if (session != null) {
415: ReplicatedSession rs = (ReplicatedSession) session;
416: rs.expire(false); //cleans up the previous values, since we are not doing removes
417: session = null;
418: }//end if
419:
420: if (session == null) {
421: session = createSession(null, false, false);
422: sessions.remove(session.getIdInternal());
423: }
424:
425: boolean hasPrincipal = session_in.readBoolean();
426: SerializablePrincipal p = null;
427: if (hasPrincipal)
428: p = (SerializablePrincipal) session_in.readObject();
429: ((ReplicatedSession) session).readObjectData(session_in);
430: if (hasPrincipal)
431: session.setPrincipal(p.getPrincipal(getContainer()
432: .getRealm()));
433: ((ReplicatedSession) session).setId(sessionId, isNew);
434: ReplicatedSession rsession = (ReplicatedSession) session;
435: rsession.setAccessCount(1);
436: session.setManager(this );
437: session.setValid(true);
438: rsession.setLastAccessedTime(System.currentTimeMillis());
439: rsession.setThisAccessedTime(System.currentTimeMillis());
440: ((ReplicatedSession) session).setAccessCount(0);
441: session.setNew(false);
442: if (log.isTraceEnabled())
443: log.trace("Session loaded id=" + sessionId
444: + " actualId=" + session.getId() + " exists="
445: + this .sessions.containsKey(sessionId)
446: + " valid=" + rsession.isValid());
447: return session;
448:
449: } catch (Exception x) {
450: log.error("Failed to deserialize the session!", x);
451: }
452: return null;
453: }
454:
455: public String getName() {
456: return this .name;
457: }
458:
459: /**
460: * Prepare for the beginning of active use of the public methods of this
461: * component. This method should be called after <code>configure()</code>,
462: * and before any of the public methods of the component are utilized.<BR>
463: * Starts the cluster communication channel, this will connect with the other nodes
464: * in the cluster, and request the current session state to be transferred to this node.
465: * @exception IllegalStateException if this component has already been
466: * started
467: * @exception LifecycleException if this component detects a fatal error
468: * that prevents this component from being used
469: */
470: public void start() throws LifecycleException {
471: mManagerRunning = true;
472: super .start();
473: try {
474: //the channel is already running
475: if (mChannelStarted)
476: return;
477: if (log.isInfoEnabled())
478: log.info("Starting clustering manager...:" + getName());
479: if (cluster == null) {
480: log
481: .error("Starting... no cluster associated with this context:"
482: + getName());
483: return;
484: }
485: cluster.registerManager(this );
486:
487: if (cluster.getMembers().length > 0) {
488: Member mbr = cluster.getMembers()[0];
489: SessionMessage msg = new SessionMessageImpl(this
490: .getName(),
491: SessionMessage.EVT_GET_ALL_SESSIONS, null,
492: "GET-ALL", "GET-ALL-" + this .getName());
493: cluster.send(msg, mbr);
494: if (log.isWarnEnabled())
495: log
496: .warn("Manager["
497: + getName()
498: + "], requesting session state from "
499: + mbr
500: + ". This operation will timeout if no session state has been received within "
501: + "60 seconds");
502: long reqStart = System.currentTimeMillis();
503: long reqNow = 0;
504: boolean isTimeout = false;
505: do {
506: try {
507: Thread.sleep(100);
508: } catch (Exception sleep) {
509: }
510: reqNow = System.currentTimeMillis();
511: isTimeout = ((reqNow - reqStart) > (1000 * 60));
512: } while ((!isStateTransferred()) && (!isTimeout));
513: if (isTimeout || (!isStateTransferred())) {
514: log
515: .error("Manager["
516: + getName()
517: + "], No session state received, timing out.");
518: } else {
519: if (log.isInfoEnabled())
520: log.info("Manager[" + getName()
521: + "], session state received in "
522: + (reqNow - reqStart) + " ms.");
523: }
524: } else {
525: if (log.isInfoEnabled())
526: log
527: .info("Manager["
528: + getName()
529: + "], skipping state transfer. No members active in cluster group.");
530: }//end if
531: mChannelStarted = true;
532: } catch (Exception x) {
533: log.error("Unable to start SimpleTcpReplicationManager", x);
534: }
535: }
536:
537: /**
538: * Gracefully terminate the active use of the public methods of this
539: * component. This method should be the last one called on a given
540: * instance of this component.<BR>
541: * This will disconnect the cluster communication channel and stop the listener thread.
542: * @exception IllegalStateException if this component has not been started
543: * @exception LifecycleException if this component detects a fatal error
544: * that needs to be reported
545: */
546: public void stop() throws LifecycleException {
547: mManagerRunning = false;
548: mChannelStarted = false;
549: super .stop();
550: try {
551: this .sessions.clear();
552: cluster.removeManager(this );
553: } catch (Exception x) {
554: log.error("Unable to stop SimpleTcpReplicationManager", x);
555: }
556: }
557:
558: public void setDistributable(boolean dist) {
559: this .distributable = dist;
560: }
561:
562: public boolean getDistributable() {
563: return distributable;
564: }
565:
566: /**
567: * This method is called by the received thread when a SessionMessage has
568: * been received from one of the other nodes in the cluster.
569: * @param msg - the message received
570: * @param sender - the sender of the message, this is used if we receive a
571: * EVT_GET_ALL_SESSION message, so that we only reply to
572: * the requesting node
573: */
574: protected void messageReceived(SessionMessage msg, Member sender) {
575: try {
576: if (log.isInfoEnabled()) {
577: log.debug("Received SessionMessage of type="
578: + msg.getEventTypeString());
579: log.debug("Received SessionMessage sender=" + sender);
580: }
581: switch (msg.getEventType()) {
582: case SessionMessage.EVT_GET_ALL_SESSIONS: {
583: //get a list of all the session from this manager
584: Object[] sessions = findSessions();
585: java.io.ByteArrayOutputStream bout = new java.io.ByteArrayOutputStream();
586: java.io.ObjectOutputStream oout = new java.io.ObjectOutputStream(
587: bout);
588: oout.writeInt(sessions.length);
589: for (int i = 0; i < sessions.length; i++) {
590: ReplicatedSession ses = (ReplicatedSession) sessions[i];
591: oout.writeUTF(ses.getIdInternal());
592: byte[] data = writeSession(ses);
593: oout.writeObject(data);
594: }//for
595: //don't send a message if we don't have to
596: oout.flush();
597: oout.close();
598: byte[] data = bout.toByteArray();
599: SessionMessage newmsg = new SessionMessageImpl(name,
600: SessionMessage.EVT_ALL_SESSION_DATA, data,
601: "SESSION-STATE", "SESSION-STATE-" + getName());
602: cluster.send(newmsg, sender);
603: break;
604: }
605: case SessionMessage.EVT_ALL_SESSION_DATA: {
606: java.io.ByteArrayInputStream bin = new java.io.ByteArrayInputStream(
607: msg.getSession());
608: java.io.ObjectInputStream oin = new java.io.ObjectInputStream(
609: bin);
610: int size = oin.readInt();
611: for (int i = 0; i < size; i++) {
612: String id = oin.readUTF();
613: byte[] data = (byte[]) oin.readObject();
614: Session session = readSession(data, id);
615: }//for
616: stateTransferred = true;
617: break;
618: }
619: case SessionMessage.EVT_SESSION_CREATED: {
620: Session session = this .readSession(msg.getSession(),
621: msg.getSessionID());
622: if (log.isDebugEnabled()) {
623: log.debug("Received replicated session=" + session
624: + " isValid=" + session.isValid());
625: }
626: break;
627: }
628: case SessionMessage.EVT_SESSION_EXPIRED: {
629: Session session = findSession(msg.getSessionID());
630: if (session != null) {
631: session.expire();
632: this .remove(session);
633: }//end if
634: break;
635: }
636: case SessionMessage.EVT_SESSION_ACCESSED: {
637: Session session = findSession(msg.getSessionID());
638: if (session != null) {
639: session.access();
640: session.endAccess();
641: }
642: break;
643: }
644: default: {
645: //we didn't recognize the message type, do nothing
646: break;
647: }
648: }//switch
649: } catch (Exception x) {
650: log.error("Unable to receive message through TCP channel",
651: x);
652: }
653: }
654:
655: public void messageDataReceived(ClusterMessage cmsg) {
656: try {
657: if (cmsg instanceof SessionMessage) {
658: SessionMessage msg = (SessionMessage) cmsg;
659: messageReceived(msg,
660: msg.getAddress() != null ? (Member) msg
661: .getAddress() : null);
662: }
663: } catch (Throwable ex) {
664: log.error(
665: "InMemoryReplicationManager.messageDataReceived()",
666: ex);
667: }//catch
668: }
669:
670: public boolean isStateTransferred() {
671: return stateTransferred;
672: }
673:
674: public void setName(String name) {
675: this .name = name;
676: }
677:
678: public boolean isNotifyListenersOnReplication() {
679: return notifyListenersOnReplication;
680: }
681:
682: public void setNotifyListenersOnReplication(
683: boolean notifyListenersOnReplication) {
684: this .notifyListenersOnReplication = notifyListenersOnReplication;
685: }
686:
687: /*
688: * @see org.apache.catalina.ha.ClusterManager#getCluster()
689: */
690: public CatalinaCluster getCluster() {
691: return cluster;
692: }
693:
694: public ClusterManager cloneFromTemplate() {
695: throw new UnsupportedOperationException();
696: }
697:
698: }
|