001: /*
002: * Copyright 1999,2004 The Apache Software Foundation.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016: package org.apache.catalina.cluster.session;
017:
018: import java.io.*;
019: import org.apache.catalina.Session;
020: import org.apache.catalina.realm.GenericPrincipal;
021: import org.apache.catalina.LifecycleException;
022: import org.apache.catalina.util.CustomObjectInputStream;
023: import org.apache.catalina.cluster.io.ListenCallback;
024: import org.apache.catalina.cluster.tcp.ReplicationListener;
025: import org.apache.catalina.cluster.tcp.ReplicationTransmitter;
026: import org.apache.catalina.cluster.tcp.IDataSender;
027: import org.apache.catalina.cluster.tcp.SocketSender;
028: import org.apache.catalina.cluster.Member;
029: import org.apache.catalina.cluster.CatalinaCluster;
030: import org.apache.catalina.cluster.SessionMessage;
031:
032: import java.net.InetAddress;
033:
034: /**
035: * Title: Tomcat Session Replication for Tomcat 4.0 <BR>
036: * Description: A very simple straight forward implementation of
037: * session replication of servers in a cluster.<BR>
038: * This session replication is implemented "live". By live
039: * I mean, when a session attribute is added into a session on Node A
040: * a message is broadcasted to other messages and setAttribute is called on the
041: * replicated sessions.<BR>
042: * A full description of this implementation can be found under
043: * <href="http://www.filip.net/tomcat/">Filip's Tomcat Page</a><BR>
044: *
045: * Copyright: See apache license
046: * Company: www.filip.net
047: * @author <a href="mailto:mail@filip.net">Filip Hanik</a>
048: * @author Bela Ban (modifications for synchronous replication)
049: * @version 1.0 for TC 4.0
050: * Description: The InMemoryReplicationManager is a session manager that replicated
051: * session information in memory. It uses <a href="www.javagroups.com">JavaGroups</a> as
052: * a communication protocol to ensure guaranteed and ordered message delivery.
053: * JavaGroups also provides a very flexible protocol stack to ensure that the replication
054: * can be used in any environment.
055: * <BR><BR>
056: * The InMemoryReplicationManager extends the StandardManager hence it allows for us
057: * to inherit all the basic session management features like expiration, session listeners etc
058: * <BR><BR>
059: * To communicate with other nodes in the cluster, the InMemoryReplicationManager sends out 7 different type of multicast messages
060: * all defined in the SessionMessage class.<BR>
061: * When a session is replicated (not an attribute added/removed) the session is serialized into
062: * a byte array using the StandardSession.readObjectData, StandardSession.writeObjectData methods.
063: */
064: public class SimpleTcpReplicationManager extends
065: org.apache.catalina.session.StandardManager implements
066: org.apache.catalina.cluster.ClusterManager {
067: public static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
068: .getLog(SimpleTcpReplicationManager.class);
069:
070: //the channel configuration
071: protected String mChannelConfig = null;
072:
073: //the group name
074: protected String mGroupName = "TomcatReplication";
075:
076: //somehow start() gets called more than once
077: protected boolean mChannelStarted = false;
078:
079: //log to screen
080: protected boolean mPrintToScreen = true;
081:
082: protected boolean mManagerRunning = false;
083:
084: /** Use synchronous rather than asynchronous replication. Every session modification (creation, change, removal etc)
085: * will be sent to all members. The call will then wait for max milliseconds, or forever (if timeout is 0) for
086: * all responses.
087: */
088: protected boolean synchronousReplication = true;
089:
090: /** Set to true if we don't want the sessions to expire on shutdown */
091: protected boolean mExpireSessionsOnShutdown = true;
092:
093: protected boolean useDirtyFlag = false;
094:
095: protected String name;
096:
097: protected boolean distributable = true;
098:
099: protected CatalinaCluster cluster;
100:
101: protected java.util.HashMap invalidatedSessions = new java.util.HashMap();
102:
103: /**
104: * Flag to keep track if the state has been transferred or not
105: * Assumes false.
106: */
107: protected boolean stateTransferred = false;
108:
109: /**
110: * Constructor, just calls super()
111: *
112: */
113: public SimpleTcpReplicationManager() {
114: super ();
115: }
116:
117: public boolean isManagerRunning() {
118: return mManagerRunning;
119: }
120:
121: public void setUseDirtyFlag(boolean usedirtyflag) {
122: this .useDirtyFlag = usedirtyflag;
123: }
124:
125: public void setExpireSessionsOnShutdown(
126: boolean expireSessionsOnShutdown) {
127: mExpireSessionsOnShutdown = expireSessionsOnShutdown;
128: }
129:
130: public void setCluster(CatalinaCluster cluster) {
131: this .log
132: .debug("Cluster associated with SimpleTcpReplicationManager");
133: this .cluster = cluster;
134: }
135:
136: public boolean getExpireSessionsOnShutdown() {
137: return mExpireSessionsOnShutdown;
138: }
139:
140: public void setPrintToScreen(boolean printtoscreen) {
141: log.debug("Setting screen debug to:" + printtoscreen);
142: mPrintToScreen = printtoscreen;
143: }
144:
145: public void setSynchronousReplication(boolean flag) {
146: synchronousReplication = flag;
147: }
148:
149: /**
150: * Override persistence since they don't go hand in hand with replication for now.
151: */
152: public void unload() throws IOException {
153: if (!getDistributable()) {
154: super .unload();
155: }
156: }
157:
158: /**
159: * Creates a HTTP session.
160: * Most of the code in here is copied from the StandardManager.
161: * This is not pretty, yeah I know, but it was necessary since the
162: * StandardManager had hard coded the session instantiation to the a
163: * StandardSession, when we actually want to instantiate a ReplicatedSession<BR>
164: * If the call comes from the Tomcat servlet engine, a SessionMessage goes out to the other
165: * nodes in the cluster that this session has been created.
166: * @param notify - if set to true the other nodes in the cluster will be notified.
167: * This flag is needed so that we can create a session before we deserialize
168: * a replicated one
169: *
170: * @see ReplicatedSession
171: */
172: protected Session createSession(boolean notify, boolean setId) {
173:
174: //inherited from the basic manager
175: if ((getMaxActiveSessions() >= 0)
176: && (sessions.size() >= getMaxActiveSessions()))
177: throw new IllegalStateException(sm
178: .getString("standardManager.createSession.ise"));
179:
180: Session session = new ReplicatedSession(this );
181:
182: // Initialize the properties of the new session and return it
183: session.setNew(true);
184: session.setValid(true);
185: session.setCreationTime(System.currentTimeMillis());
186: session.setMaxInactiveInterval(this .maxInactiveInterval);
187: String sessionId = generateSessionId();
188: String jvmRoute = getJvmRoute();
189: // @todo Move appending of jvmRoute generateSessionId()???
190: if (jvmRoute != null) {
191: sessionId += '.' + jvmRoute;
192: }
193: if (setId)
194: session.setId(sessionId);
195: if (notify && (cluster != null)) {
196: ((ReplicatedSession) session).setIsDirty(true);
197: }
198: return (session);
199: }//createSession
200:
201: //=========================================================================
202: // OVERRIDE THESE METHODS TO IMPLEMENT THE REPLICATION
203: //=========================================================================
204:
205: /**
206: * Construct and return a new session object, based on the default
207: * settings specified by this Manager's properties. The session
208: * id will be assigned by this method, and available via the getId()
209: * method of the returned session. If a new session cannot be created
210: * for any reason, return <code>null</code>.
211: *
212: * @exception IllegalStateException if a new session cannot be
213: * instantiated for any reason
214: */
215: public Session createSession() {
216: //create a session and notify the other nodes in the cluster
217: Session session = createSession(getDistributable(), true);
218: add(session);
219: return session;
220: }
221:
222: public void sessionInvalidated(String sessionId) {
223: synchronized (invalidatedSessions) {
224: invalidatedSessions.put(sessionId, sessionId);
225: }
226: }
227:
228: public String[] getInvalidatedSessions() {
229: synchronized (invalidatedSessions) {
230: String[] result = new String[invalidatedSessions.size()];
231: invalidatedSessions.values().toArray(result);
232: return result;
233: }
234:
235: }
236:
237: public SessionMessage requestCompleted(String sessionId) {
238: if (!getDistributable()) {
239: log
240: .warn("Received requestCompleted message, although this context["
241: + getName()
242: + "] is not distributable. Ignoring message");
243: return null;
244: }
245: //notify javagroups
246: try {
247: if (invalidatedSessions.get(sessionId) != null) {
248: synchronized (invalidatedSessions) {
249: invalidatedSessions.remove(sessionId);
250: SessionMessage msg = new SessionMessageImpl(name,
251: SessionMessage.EVT_SESSION_EXPIRED, null,
252: sessionId, sessionId);
253: return msg;
254: }
255: } else {
256: ReplicatedSession session = (ReplicatedSession) findSession(sessionId);
257: if (session != null) {
258: //return immediately if the session is not dirty
259: if (useDirtyFlag && (!session.isDirty())) {
260: //but before we return doing nothing,
261: //see if we should send
262: //an updated last access message so that
263: //sessions across cluster dont expire
264: long interval = session
265: .getMaxInactiveInterval();
266: long lastaccdist = System.currentTimeMillis()
267: - session.getLastAccessWasDistributed();
268: if (((interval * 1000) / lastaccdist) < 3) {
269: SessionMessage accmsg = new SessionMessageImpl(
270: name,
271: SessionMessage.EVT_SESSION_ACCESSED,
272: null, sessionId, sessionId);
273: session.setLastAccessWasDistributed(System
274: .currentTimeMillis());
275: return accmsg;
276: }
277: return null;
278: }
279:
280: session.setIsDirty(false);
281: if (log.isDebugEnabled()) {
282: try {
283: log.debug("Sending session to cluster="
284: + session);
285: } catch (Exception ignore) {
286: }
287: }
288: SessionMessage msg = new SessionMessageImpl(name,
289: SessionMessage.EVT_SESSION_CREATED,
290: writeSession(session), session.getId(),
291: session.getId());
292: return msg;
293: } //end if
294: }//end if
295: } catch (Exception x) {
296: log.error("Unable to replicate session", x);
297: }
298: return null;
299: }
300:
301: /**
302: * Serialize a session into a byte array<BR>
303: * This method simple calls the writeObjectData method on the session
304: * and returns the byte data from that call
305: * @param session - the session to be serialized
306: * @return a byte array containing the session data, null if the serialization failed
307: */
308: protected byte[] writeSession(Session session) {
309: try {
310: java.io.ByteArrayOutputStream session_data = new java.io.ByteArrayOutputStream();
311: java.io.ObjectOutputStream session_out = new java.io.ObjectOutputStream(
312: session_data);
313: session_out.flush();
314: boolean hasPrincipal = session.getPrincipal() != null;
315: session_out.writeBoolean(hasPrincipal);
316: if (hasPrincipal) {
317: session_out.writeObject(SerializablePrincipal
318: .createPrincipal((GenericPrincipal) session
319: .getPrincipal()));
320: }//end if
321: ((ReplicatedSession) session).writeObjectData(session_out);
322: return session_data.toByteArray();
323:
324: } catch (Exception x) {
325: log.error("Failed to serialize the session!", x);
326: }
327: return null;
328: }
329:
330: /**
331: * Reinstantiates a serialized session from the data passed in.
332: * This will first call createSession() so that we get a fresh instance with all
333: * the managers set and all the transient fields validated.
334: * Then it calls Session.readObjectData(byte[]) to deserialize the object
335: * @param data - a byte array containing session data
336: * @return a valid Session object, null if an error occurs
337: *
338: */
339: protected Session readSession(byte[] data, String sessionId) {
340: try {
341: java.io.ByteArrayInputStream session_data = new java.io.ByteArrayInputStream(
342: data);
343: ReplicationStream session_in = new ReplicationStream(
344: session_data, container.getLoader()
345: .getClassLoader());
346:
347: Session session = sessionId != null ? this
348: .findSession(sessionId) : null;
349: boolean isNew = (session == null);
350: //clear the old values from the existing session
351: if (session != null) {
352: ReplicatedSession rs = (ReplicatedSession) session;
353: rs.expire(false); //cleans up the previous values, since we are not doing removes
354: session = null;
355: }//end if
356:
357: if (session == null) {
358: session = createSession(false, false);
359: sessions.remove(session.getId());
360: }
361:
362: boolean hasPrincipal = session_in.readBoolean();
363: SerializablePrincipal p = null;
364: if (hasPrincipal)
365: p = (SerializablePrincipal) session_in.readObject();
366: ((ReplicatedSession) session).readObjectData(session_in);
367: if (hasPrincipal)
368: session.setPrincipal(p.getPrincipal(getContainer()
369: .getRealm()));
370: ((ReplicatedSession) session).setId(sessionId, isNew);
371: ReplicatedSession rsession = (ReplicatedSession) session;
372: rsession.setAccessCount(1);
373: session.setManager(this );
374: session.setValid(true);
375: rsession.setLastAccessedTime(System.currentTimeMillis());
376: rsession.setThisAccessedTime(System.currentTimeMillis());
377: ((ReplicatedSession) session).setAccessCount(0);
378: session.setNew(false);
379: // System.out.println("Session loaded id="+sessionId +
380: // " actualId="+session.getId()+
381: // " exists="+this.sessions.containsKey(sessionId)+
382: // " valid="+rsession.isValid());
383: return session;
384:
385: } catch (Exception x) {
386: log.error("Failed to deserialize the session!", x);
387: }
388: return null;
389: }
390:
391: public String getName() {
392: return this .name;
393: }
394:
395: /**
396: * Prepare for the beginning of active use of the public methods of this
397: * component. This method should be called after <code>configure()</code>,
398: * and before any of the public methods of the component are utilized.<BR>
399: * Starts the cluster communication channel, this will connect with the other nodes
400: * in the cluster, and request the current session state to be transferred to this node.
401: * @exception IllegalStateException if this component has already been
402: * started
403: * @exception LifecycleException if this component detects a fatal error
404: * that prevents this component from being used
405: */
406: public void start() throws LifecycleException {
407: mManagerRunning = true;
408: super .start();
409: //start the javagroups channel
410: try {
411: //the channel is already running
412: if (mChannelStarted)
413: return;
414: log.error("Starting clustering manager...:" + getName());
415: if (cluster == null) {
416: log
417: .error("Starting... no cluster associated with this context:"
418: + getName());
419: return;
420: }
421:
422: if (cluster.getMembers().length > 0) {
423: Member mbr = cluster.getMembers()[0];
424: SessionMessage msg = new SessionMessageImpl(this
425: .getName(),
426: SessionMessage.EVT_GET_ALL_SESSIONS, null,
427: "GET-ALL", "GET-ALL-" + this .getName());
428: cluster.send(msg, mbr);
429: log
430: .warn("Manager["
431: + getName()
432: + "], requesting session state from "
433: + mbr
434: + ". This operation will timeout if no session state has been received within "
435: + "60 seconds");
436: long reqStart = System.currentTimeMillis();
437: long reqNow = 0;
438: boolean isTimeout = false;
439: do {
440: try {
441: Thread.currentThread().sleep(100);
442: } catch (Exception sleep) {
443: }
444: reqNow = System.currentTimeMillis();
445: isTimeout = ((reqNow - reqStart) > (1000 * 60));
446: } while ((!isStateTransferred()) && (!isTimeout));
447: if (isTimeout || (!isStateTransferred())) {
448: log
449: .error("Manager["
450: + getName()
451: + "], No session state received, timing out.");
452: } else {
453: log.info("Manager[" + getName()
454: + "], session state received in "
455: + (reqNow - reqStart) + " ms.");
456: }
457: } else {
458: log
459: .info("Manager["
460: + getName()
461: + "], skipping state transfer. No members active in cluster group.");
462: }//end if
463: mChannelStarted = true;
464: } catch (Exception x) {
465: log.error("Unable to start SimpleTcpReplicationManager", x);
466: }
467: }
468:
469: /**
470: * Gracefully terminate the active use of the public methods of this
471: * component. This method should be the last one called on a given
472: * instance of this component.<BR>
473: * This will disconnect the cluster communication channel and stop the listener thread.
474: * @exception IllegalStateException if this component has not been started
475: * @exception LifecycleException if this component detects a fatal error
476: * that needs to be reported
477: */
478: public void stop() throws LifecycleException {
479: mManagerRunning = false;
480: mChannelStarted = false;
481: super .stop();
482: //stop the javagroup channel
483: try {
484: cluster.removeManager(getName());
485: // mReplicationListener.stopListening();
486: // mReplicationTransmitter.stop();
487: // service.stop();
488: // service = null;
489: } catch (Exception x) {
490: log.error("Unable to stop SimpleTcpReplicationManager", x);
491: }
492: }
493:
494: public void setDistributable(boolean dist) {
495: this .distributable = dist;
496: }
497:
498: public boolean getDistributable() {
499: return distributable;
500: }
501:
502: /**
503: * This method is called by the received thread when a SessionMessage has
504: * been received from one of the other nodes in the cluster.
505: * @param msg - the message received
506: * @param sender - the sender of the message, this is used if we receive a
507: * EVT_GET_ALL_SESSION message, so that we only reply to
508: * the requesting node
509: */
510: protected void messageReceived(SessionMessage msg, Member sender) {
511: try {
512: log.debug("Received SessionMessage of type="
513: + msg.getEventTypeString());
514: log.debug("Received SessionMessage sender=" + sender);
515: switch (msg.getEventType()) {
516: case SessionMessage.EVT_GET_ALL_SESSIONS: {
517: //get a list of all the session from this manager
518: Object[] sessions = findSessions();
519: java.io.ByteArrayOutputStream bout = new java.io.ByteArrayOutputStream();
520: java.io.ObjectOutputStream oout = new java.io.ObjectOutputStream(
521: bout);
522: oout.writeInt(sessions.length);
523: for (int i = 0; i < sessions.length; i++) {
524: ReplicatedSession ses = (ReplicatedSession) sessions[i];
525: oout.writeUTF(ses.getId());
526: byte[] data = writeSession(ses);
527: oout.writeObject(data);
528: }//for
529: //don't send a message if we don't have to
530: oout.flush();
531: oout.close();
532: byte[] data = bout.toByteArray();
533: SessionMessage newmsg = new SessionMessageImpl(name,
534: SessionMessage.EVT_ALL_SESSION_DATA, data,
535: "SESSION-STATE", "SESSION-STATE-" + getName());
536: cluster.send(newmsg, sender);
537: break;
538: }
539: case SessionMessage.EVT_ALL_SESSION_DATA: {
540: java.io.ByteArrayInputStream bin = new java.io.ByteArrayInputStream(
541: msg.getSession());
542: java.io.ObjectInputStream oin = new java.io.ObjectInputStream(
543: bin);
544: int size = oin.readInt();
545: for (int i = 0; i < size; i++) {
546: String id = oin.readUTF();
547: byte[] data = (byte[]) oin.readObject();
548: Session session = readSession(data, id);
549: }//for
550: stateTransferred = true;
551: break;
552: }
553: case SessionMessage.EVT_SESSION_CREATED: {
554: Session session = this .readSession(msg.getSession(),
555: msg.getSessionID());
556: if (log.isDebugEnabled()) {
557: log.debug("Received replicated session=" + session
558: + " isValid=" + session.isValid());
559: }
560: break;
561: }
562: case SessionMessage.EVT_SESSION_EXPIRED: {
563: Session session = findSession(msg.getSessionID());
564: if (session != null) {
565: session.expire();
566: this .remove(session);
567: }//end if
568: break;
569: }
570: case SessionMessage.EVT_SESSION_ACCESSED: {
571: Session session = findSession(msg.getSessionID());
572: if (session != null) {
573: session.access();
574: session.endAccess();
575: }
576: break;
577: }
578: default: {
579: //we didn't recognize the message type, do nothing
580: break;
581: }
582: }//switch
583: } catch (Exception x) {
584: log.error("Unable to receive message through TCP channel",
585: x);
586: }
587: }
588:
589: public void messageDataReceived(SessionMessage msg) {
590: try {
591: messageReceived(msg,
592: msg.getAddress() != null ? (Member) msg
593: .getAddress() : null);
594: } catch (Throwable ex) {
595: log.error(
596: "InMemoryReplicationManager.messageDataReceived()",
597: ex);
598: }//catch
599: }
600:
601: public boolean isStateTransferred() {
602: return stateTransferred;
603: }
604:
605: public void setName(String name) {
606: this.name = name;
607: }
608: }
|