Source Code Cross Referenced for DeltaManager.java in  » Sevlet-Container » apache-tomcat-6.0.14 » org » apache » catalina » ha » session » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Sevlet Container » apache tomcat 6.0.14 » org.apache.catalina.ha.session 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /*
0002:         * Licensed to the Apache Software Foundation (ASF) under one or more
0003:         * contributor license agreements.  See the NOTICE file distributed with
0004:         * this work for additional information regarding copyright ownership.
0005:         * The ASF licenses this file to You under the Apache License, Version 2.0
0006:         * (the "License"); you may not use this file except in compliance with
0007:         * the License.  You may obtain a copy of the License at
0008:         * 
0009:         *      http://www.apache.org/licenses/LICENSE-2.0
0010:         * 
0011:         * Unless required by applicable law or agreed to in writing, software
0012:         * distributed under the License is distributed on an "AS IS" BASIS,
0013:         * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014:         * See the License for the specific language governing permissions and
0015:         * limitations under the License.
0016:         */
0017:
0018:        package org.apache.catalina.ha.session;
0019:
0020:        import java.beans.PropertyChangeEvent;
0021:        import java.io.BufferedOutputStream;
0022:        import java.io.ByteArrayOutputStream;
0023:        import java.io.IOException;
0024:        import java.io.ObjectInputStream;
0025:        import java.io.ObjectOutputStream;
0026:        import java.util.ArrayList;
0027:        import java.util.Date;
0028:        import java.util.Iterator;
0029:
0030:        import org.apache.catalina.Cluster;
0031:        import org.apache.catalina.Container;
0032:        import org.apache.catalina.Context;
0033:        import org.apache.catalina.Engine;
0034:        import org.apache.catalina.Host;
0035:        import org.apache.catalina.LifecycleException;
0036:        import org.apache.catalina.LifecycleListener;
0037:        import org.apache.catalina.Session;
0038:        import org.apache.catalina.Valve;
0039:        import org.apache.catalina.core.StandardContext;
0040:        import org.apache.catalina.ha.CatalinaCluster;
0041:        import org.apache.catalina.ha.ClusterMessage;
0042:        import org.apache.catalina.ha.tcp.ReplicationValve;
0043:        import org.apache.catalina.tribes.Member;
0044:        import org.apache.catalina.tribes.io.ReplicationStream;
0045:        import org.apache.catalina.util.LifecycleSupport;
0046:        import org.apache.catalina.util.StringManager;
0047:        import org.apache.catalina.ha.ClusterManager;
0048:
0049:        /**
0050:         * The DeltaManager manages replicated sessions by only replicating the deltas
0051:         * in data. For applications written to handle this, the DeltaManager is the
0052:         * optimal way of replicating data.
0053:         * 
0054:         * This code is almost identical to StandardManager with a difference in how it
0055:         * persists sessions and some modifications to it.
0056:         * 
0057:         * <b>IMPLEMENTATION NOTE </b>: Correct behavior of session storing and
0058:         * reloading depends upon external calls to the <code>start()</code> and
0059:         * <code>stop()</code> methods of this class at the correct times.
0060:         * 
0061:         * @author Filip Hanik
0062:         * @author Craig R. McClanahan
0063:         * @author Jean-Francois Arcand
0064:         * @author Peter Rossbach
0065:         * @version $Revision: 467222 $ $Date: 2006-10-24 05:17:11 +0200 (mar., 24 oct. 2006) $
0066:         */
0067:
0068:        public class DeltaManager extends ClusterManagerBase {
0069:
0070:            // ---------------------------------------------------- Security Classes
0071:            public static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory
0072:                    .getLog(DeltaManager.class);
0073:
0074:            /**
0075:             * The string manager for this package.
0076:             */
0077:            protected static StringManager sm = StringManager
0078:                    .getManager(Constants.Package);
0079:
0080:            // ----------------------------------------------------- Instance Variables
0081:
0082:            /**
0083:             * The descriptive information about this implementation.
0084:             */
0085:            private static final String info = "DeltaManager/2.1";
0086:
0087:            /**
0088:             * Has this component been started yet?
0089:             */
0090:            private boolean started = false;
0091:
0092:            /**
0093:             * The descriptive name of this Manager implementation (for logging).
0094:             */
0095:            protected static String managerName = "DeltaManager";
0096:            protected String name = null;
0097:            protected boolean defaultMode = false;
0098:            private CatalinaCluster cluster = null;
0099:
0100:            /**
0101:             * cached replication valve cluster container!
0102:             */
0103:            private ReplicationValve replicationValve = null;
0104:
0105:            /**
0106:             * The lifecycle event support for this component.
0107:             */
0108:            protected LifecycleSupport lifecycle = new LifecycleSupport(this );
0109:
0110:            /**
0111:             * The maximum number of active Sessions allowed, or -1 for no limit.
0112:             */
0113:            private int maxActiveSessions = -1;
0114:            private boolean expireSessionsOnShutdown = false;
0115:            private boolean notifyListenersOnReplication = true;
0116:            private boolean notifySessionListenersOnReplication = true;
0117:            private boolean stateTransfered = false;
0118:            private int stateTransferTimeout = 60;
0119:            private boolean sendAllSessions = true;
0120:            private boolean sendClusterDomainOnly = true;
0121:            private int sendAllSessionsSize = 1000;
0122:
0123:            /**
0124:             * wait time between send session block (default 2 sec) 
0125:             */
0126:            private int sendAllSessionsWaitTime = 2 * 1000;
0127:            private ArrayList receivedMessageQueue = new ArrayList();
0128:            private boolean receiverQueue = false;
0129:            private boolean stateTimestampDrop = true;
0130:            private long stateTransferCreateSendTime;
0131:
0132:            // ------------------------------------------------------------------ stats attributes
0133:
0134:            int rejectedSessions = 0;
0135:            private long sessionReplaceCounter = 0;
0136:            long processingTime = 0;
0137:            private long counterReceive_EVT_GET_ALL_SESSIONS = 0;
0138:            private long counterSend_EVT_ALL_SESSION_DATA = 0;
0139:            private long counterReceive_EVT_ALL_SESSION_DATA = 0;
0140:            private long counterReceive_EVT_SESSION_CREATED = 0;
0141:            private long counterReceive_EVT_SESSION_EXPIRED = 0;
0142:            private long counterReceive_EVT_SESSION_ACCESSED = 0;
0143:            private long counterReceive_EVT_SESSION_DELTA = 0;
0144:            private long counterSend_EVT_GET_ALL_SESSIONS = 0;
0145:            private long counterSend_EVT_SESSION_CREATED = 0;
0146:            private long counterSend_EVT_SESSION_DELTA = 0;
0147:            private long counterSend_EVT_SESSION_ACCESSED = 0;
0148:            private long counterSend_EVT_SESSION_EXPIRED = 0;
0149:            private int counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0;
0150:            private int counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0;
0151:            private int counterNoStateTransfered = 0;
0152:
0153:            // ------------------------------------------------------------- Constructor
0154:            public DeltaManager() {
0155:                super ();
0156:            }
0157:
0158:            // ------------------------------------------------------------- Properties
0159:
0160:            /**
0161:             * Return descriptive information about this Manager implementation and the
0162:             * corresponding version number, in the format
0163:             * <code>&lt;description&gt;/&lt;version&gt;</code>.
0164:             */
0165:            public String getInfo() {
0166:                return info;
0167:            }
0168:
0169:            public void setName(String name) {
0170:                this .name = name;
0171:            }
0172:
0173:            /**
0174:             * Return the descriptive short name of this Manager implementation.
0175:             */
0176:            public String getName() {
0177:                return name;
0178:            }
0179:
0180:            /**
0181:             * @return Returns the counterSend_EVT_GET_ALL_SESSIONS.
0182:             */
0183:            public long getCounterSend_EVT_GET_ALL_SESSIONS() {
0184:                return counterSend_EVT_GET_ALL_SESSIONS;
0185:            }
0186:
0187:            /**
0188:             * @return Returns the counterSend_EVT_SESSION_ACCESSED.
0189:             */
0190:            public long getCounterSend_EVT_SESSION_ACCESSED() {
0191:                return counterSend_EVT_SESSION_ACCESSED;
0192:            }
0193:
0194:            /**
0195:             * @return Returns the counterSend_EVT_SESSION_CREATED.
0196:             */
0197:            public long getCounterSend_EVT_SESSION_CREATED() {
0198:                return counterSend_EVT_SESSION_CREATED;
0199:            }
0200:
0201:            /**
0202:             * @return Returns the counterSend_EVT_SESSION_DELTA.
0203:             */
0204:            public long getCounterSend_EVT_SESSION_DELTA() {
0205:                return counterSend_EVT_SESSION_DELTA;
0206:            }
0207:
0208:            /**
0209:             * @return Returns the counterSend_EVT_SESSION_EXPIRED.
0210:             */
0211:            public long getCounterSend_EVT_SESSION_EXPIRED() {
0212:                return counterSend_EVT_SESSION_EXPIRED;
0213:            }
0214:
0215:            /**
0216:             * @return Returns the counterSend_EVT_ALL_SESSION_DATA.
0217:             */
0218:            public long getCounterSend_EVT_ALL_SESSION_DATA() {
0219:                return counterSend_EVT_ALL_SESSION_DATA;
0220:            }
0221:
0222:            /**
0223:             * @return Returns the counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE.
0224:             */
0225:            public int getCounterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE() {
0226:                return counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE;
0227:            }
0228:
0229:            /**
0230:             * @return Returns the counterReceive_EVT_ALL_SESSION_DATA.
0231:             */
0232:            public long getCounterReceive_EVT_ALL_SESSION_DATA() {
0233:                return counterReceive_EVT_ALL_SESSION_DATA;
0234:            }
0235:
0236:            /**
0237:             * @return Returns the counterReceive_EVT_GET_ALL_SESSIONS.
0238:             */
0239:            public long getCounterReceive_EVT_GET_ALL_SESSIONS() {
0240:                return counterReceive_EVT_GET_ALL_SESSIONS;
0241:            }
0242:
0243:            /**
0244:             * @return Returns the counterReceive_EVT_SESSION_ACCESSED.
0245:             */
0246:            public long getCounterReceive_EVT_SESSION_ACCESSED() {
0247:                return counterReceive_EVT_SESSION_ACCESSED;
0248:            }
0249:
0250:            /**
0251:             * @return Returns the counterReceive_EVT_SESSION_CREATED.
0252:             */
0253:            public long getCounterReceive_EVT_SESSION_CREATED() {
0254:                return counterReceive_EVT_SESSION_CREATED;
0255:            }
0256:
0257:            /**
0258:             * @return Returns the counterReceive_EVT_SESSION_DELTA.
0259:             */
0260:            public long getCounterReceive_EVT_SESSION_DELTA() {
0261:                return counterReceive_EVT_SESSION_DELTA;
0262:            }
0263:
0264:            /**
0265:             * @return Returns the counterReceive_EVT_SESSION_EXPIRED.
0266:             */
0267:            public long getCounterReceive_EVT_SESSION_EXPIRED() {
0268:                return counterReceive_EVT_SESSION_EXPIRED;
0269:            }
0270:
0271:            /**
0272:             * @return Returns the counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE.
0273:             */
0274:            public int getCounterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE() {
0275:                return counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE;
0276:            }
0277:
0278:            /**
0279:             * @return Returns the processingTime.
0280:             */
0281:            public long getProcessingTime() {
0282:                return processingTime;
0283:            }
0284:
0285:            /**
0286:             * @return Returns the sessionReplaceCounter.
0287:             */
0288:            public long getSessionReplaceCounter() {
0289:                return sessionReplaceCounter;
0290:            }
0291:
0292:            /**
0293:             * Number of session creations that failed due to maxActiveSessions
0294:             * 
0295:             * @return The count
0296:             */
0297:            public int getRejectedSessions() {
0298:                return rejectedSessions;
0299:            }
0300:
0301:            public void setRejectedSessions(int rejectedSessions) {
0302:                this .rejectedSessions = rejectedSessions;
0303:            }
0304:
0305:            /**
0306:             * @return Returns the counterNoStateTransfered.
0307:             */
0308:            public int getCounterNoStateTransfered() {
0309:                return counterNoStateTransfered;
0310:            }
0311:
0312:            public int getReceivedQueueSize() {
0313:                return receivedMessageQueue.size();
0314:            }
0315:
0316:            /**
0317:             * @return Returns the stateTransferTimeout.
0318:             */
0319:            public int getStateTransferTimeout() {
0320:                return stateTransferTimeout;
0321:            }
0322:
0323:            /**
0324:             * @param timeoutAllSession The timeout
0325:             */
0326:            public void setStateTransferTimeout(int timeoutAllSession) {
0327:                this .stateTransferTimeout = timeoutAllSession;
0328:            }
0329:
0330:            /**
0331:             * is session state transfered complete?
0332:             * 
0333:             */
0334:            public boolean getStateTransfered() {
0335:                return stateTransfered;
0336:            }
0337:
0338:            /**
0339:             * set that state ist complete transfered  
0340:             * @param stateTransfered
0341:             */
0342:            public void setStateTransfered(boolean stateTransfered) {
0343:                this .stateTransfered = stateTransfered;
0344:            }
0345:
0346:            /**
0347:             * @return Returns the sendAllSessionsWaitTime in msec
0348:             */
0349:            public int getSendAllSessionsWaitTime() {
0350:                return sendAllSessionsWaitTime;
0351:            }
0352:
0353:            /**
0354:             * @param sendAllSessionsWaitTime The sendAllSessionsWaitTime to set at msec.
0355:             */
0356:            public void setSendAllSessionsWaitTime(int sendAllSessionsWaitTime) {
0357:                this .sendAllSessionsWaitTime = sendAllSessionsWaitTime;
0358:            }
0359:
0360:            /**
0361:             * @return Returns the sendClusterDomainOnly.
0362:             */
0363:            public boolean doDomainReplication() {
0364:                return sendClusterDomainOnly;
0365:            }
0366:
0367:            /**
0368:             * @param sendClusterDomainOnly The sendClusterDomainOnly to set.
0369:             */
0370:            public void setDomainReplication(boolean sendClusterDomainOnly) {
0371:                this .sendClusterDomainOnly = sendClusterDomainOnly;
0372:            }
0373:
0374:            /**
0375:             * @return Returns the stateTimestampDrop.
0376:             */
0377:            public boolean isStateTimestampDrop() {
0378:                return stateTimestampDrop;
0379:            }
0380:
0381:            /**
0382:             * @param isTimestampDrop The new flag value
0383:             */
0384:            public void setStateTimestampDrop(boolean isTimestampDrop) {
0385:                this .stateTimestampDrop = isTimestampDrop;
0386:            }
0387:
0388:            /**
0389:             * Return the maximum number of active Sessions allowed, or -1 for no limit.
0390:             */
0391:            public int getMaxActiveSessions() {
0392:                return (this .maxActiveSessions);
0393:            }
0394:
0395:            /**
0396:             * Set the maximum number of actives Sessions allowed, or -1 for no limit.
0397:             * 
0398:             * @param max
0399:             *            The new maximum number of sessions
0400:             */
0401:            public void setMaxActiveSessions(int max) {
0402:                int oldMaxActiveSessions = this .maxActiveSessions;
0403:                this .maxActiveSessions = max;
0404:                support.firePropertyChange("maxActiveSessions", new Integer(
0405:                        oldMaxActiveSessions), new Integer(
0406:                        this .maxActiveSessions));
0407:            }
0408:
0409:            /**
0410:             * 
0411:             * @return Returns the sendAllSessions.
0412:             */
0413:            public boolean isSendAllSessions() {
0414:                return sendAllSessions;
0415:            }
0416:
0417:            /**
0418:             * @param sendAllSessions The sendAllSessions to set.
0419:             */
0420:            public void setSendAllSessions(boolean sendAllSessions) {
0421:                this .sendAllSessions = sendAllSessions;
0422:            }
0423:
0424:            /**
0425:             * @return Returns the sendAllSessionsSize.
0426:             */
0427:            public int getSendAllSessionsSize() {
0428:                return sendAllSessionsSize;
0429:            }
0430:
0431:            /**
0432:             * @param sendAllSessionsSize The sendAllSessionsSize to set.
0433:             */
0434:            public void setSendAllSessionsSize(int sendAllSessionsSize) {
0435:                this .sendAllSessionsSize = sendAllSessionsSize;
0436:            }
0437:
0438:            /**
0439:             * @return Returns the notifySessionListenersOnReplication.
0440:             */
0441:            public boolean isNotifySessionListenersOnReplication() {
0442:                return notifySessionListenersOnReplication;
0443:            }
0444:
0445:            /**
0446:             * @param notifyListenersCreateSessionOnReplication The notifySessionListenersOnReplication to set.
0447:             */
0448:            public void setNotifySessionListenersOnReplication(
0449:                    boolean notifyListenersCreateSessionOnReplication) {
0450:                this .notifySessionListenersOnReplication = notifyListenersCreateSessionOnReplication;
0451:            }
0452:
0453:            public boolean isExpireSessionsOnShutdown() {
0454:                return expireSessionsOnShutdown;
0455:            }
0456:
0457:            public void setExpireSessionsOnShutdown(
0458:                    boolean expireSessionsOnShutdown) {
0459:                this .expireSessionsOnShutdown = expireSessionsOnShutdown;
0460:            }
0461:
0462:            public boolean isNotifyListenersOnReplication() {
0463:                return notifyListenersOnReplication;
0464:            }
0465:
0466:            public void setNotifyListenersOnReplication(
0467:                    boolean notifyListenersOnReplication) {
0468:                this .notifyListenersOnReplication = notifyListenersOnReplication;
0469:            }
0470:
0471:            /**
0472:             * @return Returns the defaultMode.
0473:             */
0474:            public boolean isDefaultMode() {
0475:                return defaultMode;
0476:            }
0477:
0478:            /**
0479:             * @param defaultMode The defaultMode to set.
0480:             */
0481:            public void setDefaultMode(boolean defaultMode) {
0482:                this .defaultMode = defaultMode;
0483:            }
0484:
0485:            public CatalinaCluster getCluster() {
0486:                return cluster;
0487:            }
0488:
0489:            public void setCluster(CatalinaCluster cluster) {
0490:                this .cluster = cluster;
0491:            }
0492:
0493:            /**
0494:             * Set the Container with which this Manager has been associated. If it is a
0495:             * Context (the usual case), listen for changes to the session timeout
0496:             * property.
0497:             * 
0498:             * @param container
0499:             *            The associated Container
0500:             */
0501:            public void setContainer(Container container) {
0502:                // De-register from the old Container (if any)
0503:                if ((this .container != null)
0504:                        && (this .container instanceof  Context))
0505:                    ((Context) this .container)
0506:                            .removePropertyChangeListener(this );
0507:
0508:                // Default processing provided by our superclass
0509:                super .setContainer(container);
0510:
0511:                // Register with the new Container (if any)
0512:                if ((this .container != null)
0513:                        && (this .container instanceof  Context)) {
0514:                    setMaxInactiveInterval(((Context) this .container)
0515:                            .getSessionTimeout() * 60);
0516:                    ((Context) this .container).addPropertyChangeListener(this );
0517:                }
0518:
0519:            }
0520:
0521:            // --------------------------------------------------------- Public Methods
0522:
0523:            /**
0524:             * Construct and return a new session object, based on the default settings
0525:             * specified by this Manager's properties. The session id will be assigned
0526:             * by this method, and available via the getId() method of the returned
0527:             * session. If a new session cannot be created for any reason, return
0528:             * <code>null</code>.
0529:             * 
0530:             * @exception IllegalStateException
0531:             *                if a new session cannot be instantiated for any reason
0532:             * 
0533:             * Construct and return a new session object, based on the default settings
0534:             * specified by this Manager's properties. The session id will be assigned
0535:             * by this method, and available via the getId() method of the returned
0536:             * session. If a new session cannot be created for any reason, return
0537:             * <code>null</code>.
0538:             * 
0539:             * @exception IllegalStateException
0540:             *                if a new session cannot be instantiated for any reason
0541:             */
0542:            public Session createSession(String sessionId) {
0543:                return createSession(sessionId, true);
0544:            }
0545:
0546:            /**
0547:             * create new session with check maxActiveSessions and send session creation
0548:             * to other cluster nodes.
0549:             * 
0550:             * @param distribute
0551:             * @return The session
0552:             */
0553:            public Session createSession(String sessionId, boolean distribute) {
0554:                if ((maxActiveSessions >= 0)
0555:                        && (sessions.size() >= maxActiveSessions)) {
0556:                    rejectedSessions++;
0557:                    throw new IllegalStateException(sm
0558:                            .getString("deltaManager.createSession.ise"));
0559:                }
0560:                DeltaSession session = (DeltaSession) super 
0561:                        .createSession(sessionId);
0562:                if (distribute) {
0563:                    sendCreateSession(session.getId(), session);
0564:                }
0565:                if (log.isDebugEnabled())
0566:                    log.debug(sm.getString(
0567:                            "deltaManager.createSession.newSession", session
0568:                                    .getId(), new Integer(sessions.size())));
0569:                return (session);
0570:
0571:            }
0572:
0573:            /**
0574:             * Send create session evt to all backup node
0575:             * @param sessionId
0576:             * @param session
0577:             */
0578:            protected void sendCreateSession(String sessionId,
0579:                    DeltaSession session) {
0580:                if (cluster.getMembers().length > 0) {
0581:                    SessionMessage msg = new SessionMessageImpl(getName(),
0582:                            SessionMessage.EVT_SESSION_CREATED, null,
0583:                            sessionId, sessionId + "-"
0584:                                    + System.currentTimeMillis());
0585:                    if (log.isDebugEnabled())
0586:                        log.debug(sm.getString(
0587:                                "deltaManager.sendMessage.newSession", name,
0588:                                sessionId));
0589:                    msg.setTimestamp(session.getCreationTime());
0590:                    counterSend_EVT_SESSION_CREATED++;
0591:                    send(msg);
0592:                }
0593:            }
0594:
0595:            /**
0596:             * Send messages to other backup member (domain or all)
0597:             * @param msg Session message
0598:             */
0599:            protected void send(SessionMessage msg) {
0600:                if (cluster != null) {
0601:                    if (doDomainReplication())
0602:                        cluster.sendClusterDomain(msg);
0603:                    else
0604:                        cluster.send(msg);
0605:                }
0606:            }
0607:
0608:            /**
0609:             * Create DeltaSession
0610:             * @see org.apache.catalina.Manager#createEmptySession()
0611:             */
0612:            public Session createEmptySession() {
0613:                return getNewDeltaSession();
0614:            }
0615:
0616:            /**
0617:             * Get new session class to be used in the doLoad() method.
0618:             */
0619:            protected DeltaSession getNewDeltaSession() {
0620:                return new DeltaSession(this );
0621:            }
0622:
0623:            /**
0624:             * Load Deltarequest from external node
0625:             * Load the Class at container classloader
0626:             * @see DeltaRequest#readExternal(java.io.ObjectInput)
0627:             * @param session
0628:             * @param data message data
0629:             * @return The request
0630:             * @throws ClassNotFoundException
0631:             * @throws IOException
0632:             */
0633:            protected DeltaRequest deserializeDeltaRequest(
0634:                    DeltaSession session, byte[] data)
0635:                    throws ClassNotFoundException, IOException {
0636:                ReplicationStream ois = getReplicationStream(data);
0637:                session.getDeltaRequest().readExternal(ois);
0638:                ois.close();
0639:                return session.getDeltaRequest();
0640:            }
0641:
0642:            /**
0643:             * serialize DeltaRequest
0644:             * @see DeltaRequest#writeExternal(java.io.ObjectOutput)
0645:             * 
0646:             * @param deltaRequest
0647:             * @return serialized delta request
0648:             * @throws IOException
0649:             */
0650:            protected byte[] serializeDeltaRequest(DeltaRequest deltaRequest)
0651:                    throws IOException {
0652:                return deltaRequest.serialize();
0653:            }
0654:
0655:            /**
0656:             * Load sessions from other cluster node.
0657:             * FIXME replace currently sessions with same id without notifcation.
0658:             * FIXME SSO handling is not really correct with the session replacement!
0659:             * @exception ClassNotFoundException
0660:             *                if a serialized class cannot be found during the reload
0661:             * @exception IOException
0662:             *                if an input/output error occurs
0663:             */
0664:            protected void deserializeSessions(byte[] data)
0665:                    throws ClassNotFoundException, IOException {
0666:
0667:                // Initialize our internal data structures
0668:                //sessions.clear(); //should not do this
0669:                // Open an input stream to the specified pathname, if any
0670:                ClassLoader originalLoader = Thread.currentThread()
0671:                        .getContextClassLoader();
0672:                ObjectInputStream ois = null;
0673:                // Load the previously unloaded active sessions
0674:                try {
0675:                    ois = getReplicationStream(data);
0676:                    Integer count = (Integer) ois.readObject();
0677:                    int n = count.intValue();
0678:                    for (int i = 0; i < n; i++) {
0679:                        DeltaSession session = (DeltaSession) createEmptySession();
0680:                        session.readObjectData(ois);
0681:                        session.setManager(this );
0682:                        session.setValid(true);
0683:                        session.setPrimarySession(false);
0684:                        //in case the nodes in the cluster are out of
0685:                        //time synch, this will make sure that we have the
0686:                        //correct timestamp, isValid returns true, cause
0687:                        // accessCount=1
0688:                        session.access();
0689:                        //make sure that the session gets ready to expire if
0690:                        // needed
0691:                        session.setAccessCount(0);
0692:                        session.resetDeltaRequest();
0693:                        // FIXME How inform other session id cache like SingleSignOn
0694:                        // increment sessionCounter to correct stats report
0695:                        if (findSession(session.getIdInternal()) == null) {
0696:                            sessionCounter++;
0697:                        } else {
0698:                            sessionReplaceCounter++;
0699:                            // FIXME better is to grap this sessions again !
0700:                            if (log.isWarnEnabled())
0701:                                log
0702:                                        .warn(sm
0703:                                                .getString(
0704:                                                        "deltaManager.loading.existing.session",
0705:                                                        session.getIdInternal()));
0706:                        }
0707:                        add(session);
0708:                    }
0709:                } catch (ClassNotFoundException e) {
0710:                    log.error(sm.getString("deltaManager.loading.cnfe", e), e);
0711:                    throw e;
0712:                } catch (IOException e) {
0713:                    log.error(sm.getString("deltaManager.loading.ioe", e), e);
0714:                    throw e;
0715:                } finally {
0716:                    // Close the input stream
0717:                    try {
0718:                        if (ois != null)
0719:                            ois.close();
0720:                    } catch (IOException f) {
0721:                        // ignored
0722:                    }
0723:                    ois = null;
0724:                    if (originalLoader != null)
0725:                        Thread.currentThread().setContextClassLoader(
0726:                                originalLoader);
0727:                }
0728:
0729:            }
0730:
0731:            /**
0732:             * Save any currently active sessions in the appropriate persistence
0733:             * mechanism, if any. If persistence is not supported, this method returns
0734:             * without doing anything.
0735:             * 
0736:             * @exception IOException
0737:             *                if an input/output error occurs
0738:             */
0739:            protected byte[] serializeSessions(Session[] currentSessions)
0740:                    throws IOException {
0741:
0742:                // Open an output stream to the specified pathname, if any
0743:                ByteArrayOutputStream fos = null;
0744:                ObjectOutputStream oos = null;
0745:
0746:                try {
0747:                    fos = new ByteArrayOutputStream();
0748:                    oos = new ObjectOutputStream(new BufferedOutputStream(fos));
0749:                    oos.writeObject(new Integer(currentSessions.length));
0750:                    for (int i = 0; i < currentSessions.length; i++) {
0751:                        ((DeltaSession) currentSessions[i])
0752:                                .writeObjectData(oos);
0753:                    }
0754:                    // Flush and close the output stream
0755:                    oos.flush();
0756:                } catch (IOException e) {
0757:                    log.error(sm.getString("deltaManager.unloading.ioe", e), e);
0758:                    throw e;
0759:                } finally {
0760:                    if (oos != null) {
0761:                        try {
0762:                            oos.close();
0763:                        } catch (IOException f) {
0764:                            ;
0765:                        }
0766:                        oos = null;
0767:                    }
0768:                }
0769:                // send object data as byte[]
0770:                return fos.toByteArray();
0771:            }
0772:
0773:            // ------------------------------------------------------ Lifecycle Methods
0774:
0775:            /**
0776:             * Add a lifecycle event listener to this component.
0777:             * 
0778:             * @param listener
0779:             *            The listener to add
0780:             */
0781:            public void addLifecycleListener(LifecycleListener listener) {
0782:                lifecycle.addLifecycleListener(listener);
0783:            }
0784:
0785:            /**
0786:             * Get the lifecycle listeners associated with this lifecycle. If this
0787:             * Lifecycle has no listeners registered, a zero-length array is returned.
0788:             */
0789:            public LifecycleListener[] findLifecycleListeners() {
0790:                return lifecycle.findLifecycleListeners();
0791:            }
0792:
0793:            /**
0794:             * Remove a lifecycle event listener from this component.
0795:             * 
0796:             * @param listener
0797:             *            The listener to remove
0798:             */
0799:            public void removeLifecycleListener(LifecycleListener listener) {
0800:                lifecycle.removeLifecycleListener(listener);
0801:            }
0802:
0803:            /**
0804:             * Prepare for the beginning of active use of the public methods of this
0805:             * component. This method should be called after <code>configure()</code>,
0806:             * and before any of the public methods of the component are utilized.
0807:             * 
0808:             * @exception LifecycleException
0809:             *                if this component detects a fatal error that prevents this
0810:             *                component from being used
0811:             */
0812:            public void start() throws LifecycleException {
0813:                if (!initialized)
0814:                    init();
0815:
0816:                // Validate and update our current component state
0817:                if (started) {
0818:                    return;
0819:                }
0820:                started = true;
0821:                lifecycle.fireLifecycleEvent(START_EVENT, null);
0822:
0823:                // Force initialization of the random number generator
0824:                generateSessionId();
0825:
0826:                // Load unloaded sessions, if any
0827:                try {
0828:                    //the channel is already running
0829:                    Cluster cluster = getCluster();
0830:                    // stop remove cluster binding
0831:                    //wow, how many nested levels of if statements can we have ;)
0832:                    if (cluster == null) {
0833:                        Container context = getContainer();
0834:                        if (context != null && context instanceof  Context) {
0835:                            Container host = context.getParent();
0836:                            if (host != null && host instanceof  Host) {
0837:                                cluster = host.getCluster();
0838:                                if (cluster != null
0839:                                        && cluster instanceof  CatalinaCluster) {
0840:                                    setCluster((CatalinaCluster) cluster);
0841:                                } else {
0842:                                    Container engine = host.getParent();
0843:                                    if (engine != null
0844:                                            && engine instanceof  Engine) {
0845:                                        cluster = engine.getCluster();
0846:                                        if (cluster != null
0847:                                                && cluster instanceof  CatalinaCluster) {
0848:                                            setCluster((CatalinaCluster) cluster);
0849:                                        }
0850:                                    } else {
0851:                                        cluster = null;
0852:                                    }
0853:                                }
0854:                            }
0855:                        }
0856:                    }
0857:                    if (cluster == null) {
0858:                        log.error(sm.getString("deltaManager.noCluster",
0859:                                getName()));
0860:                        return;
0861:                    } else {
0862:                        if (log.isInfoEnabled()) {
0863:                            String type = "unknown";
0864:                            if (cluster.getContainer() instanceof  Host) {
0865:                                type = "Host";
0866:                            } else if (cluster.getContainer() instanceof  Engine) {
0867:                                type = "Engine";
0868:                            }
0869:                            log.info(sm.getString(
0870:                                    "deltaManager.registerCluster", getName(),
0871:                                    type, cluster.getClusterName()));
0872:                        }
0873:                    }
0874:                    if (log.isInfoEnabled())
0875:                        log.info(sm.getString("deltaManager.startClustering",
0876:                                getName()));
0877:                    //to survice context reloads, as only a stop/start is called, not
0878:                    // createManager
0879:                    cluster.registerManager(this );
0880:
0881:                    getAllClusterSessions();
0882:
0883:                } catch (Throwable t) {
0884:                    log.error(sm.getString("deltaManager.managerLoad"), t);
0885:                }
0886:            }
0887:
0888:            /**
0889:             * get from first session master the backup from all clustered sessions
0890:             * @see #findSessionMasterMember()
0891:             */
0892:            public synchronized void getAllClusterSessions() {
0893:                if (cluster != null && cluster.getMembers().length > 0) {
0894:                    long beforeSendTime = System.currentTimeMillis();
0895:                    Member mbr = findSessionMasterMember();
0896:                    if (mbr == null) { // No domain member found
0897:                        return;
0898:                    }
0899:                    SessionMessage msg = new SessionMessageImpl(this .getName(),
0900:                            SessionMessage.EVT_GET_ALL_SESSIONS, null,
0901:                            "GET-ALL", "GET-ALL-" + getName());
0902:                    // set reference time
0903:                    stateTransferCreateSendTime = beforeSendTime;
0904:                    // request session state
0905:                    counterSend_EVT_GET_ALL_SESSIONS++;
0906:                    stateTransfered = false;
0907:                    // FIXME This send call block the deploy thread, when sender waitForAck is enabled
0908:                    try {
0909:                        synchronized (receivedMessageQueue) {
0910:                            receiverQueue = true;
0911:                        }
0912:                        cluster.send(msg, mbr);
0913:                        if (log.isWarnEnabled())
0914:                            log.warn(sm.getString(
0915:                                    "deltaManager.waitForSessionState",
0916:                                    getName(), mbr));
0917:                        // FIXME At sender ack mode this method check only the state transfer and resend is a problem!
0918:                        waitForSendAllSessions(beforeSendTime);
0919:                    } finally {
0920:                        synchronized (receivedMessageQueue) {
0921:                            for (Iterator iter = receivedMessageQueue
0922:                                    .iterator(); iter.hasNext();) {
0923:                                SessionMessage smsg = (SessionMessage) iter
0924:                                        .next();
0925:                                if (!stateTimestampDrop) {
0926:                                    messageReceived(
0927:                                            smsg,
0928:                                            smsg.getAddress() != null ? (Member) smsg
0929:                                                    .getAddress()
0930:                                                    : null);
0931:                                } else {
0932:                                    if (smsg.getEventType() != SessionMessage.EVT_GET_ALL_SESSIONS
0933:                                            && smsg.getTimestamp() >= stateTransferCreateSendTime) {
0934:                                        // FIXME handle EVT_GET_ALL_SESSIONS later
0935:                                        messageReceived(
0936:                                                smsg,
0937:                                                smsg.getAddress() != null ? (Member) smsg
0938:                                                        .getAddress()
0939:                                                        : null);
0940:                                    } else {
0941:                                        if (log.isWarnEnabled()) {
0942:                                            log
0943:                                                    .warn(sm
0944:                                                            .getString(
0945:                                                                    "deltaManager.dropMessage",
0946:                                                                    getName(),
0947:                                                                    smsg
0948:                                                                            .getEventTypeString(),
0949:                                                                    new Date(
0950:                                                                            stateTransferCreateSendTime),
0951:                                                                    new Date(
0952:                                                                            smsg
0953:                                                                                    .getTimestamp())));
0954:                                        }
0955:                                    }
0956:                                }
0957:                            }
0958:                            receivedMessageQueue.clear();
0959:                            receiverQueue = false;
0960:                        }
0961:                    }
0962:                } else {
0963:                    if (log.isInfoEnabled())
0964:                        log.info(sm.getString("deltaManager.noMembers",
0965:                                getName()));
0966:                }
0967:            }
0968:
0969:            /**
0970:             * Register cross context session at replication valve thread local
0971:             * @param session cross context session
0972:             */
0973:            protected void registerSessionAtReplicationValve(
0974:                    DeltaSession session) {
0975:                if (replicationValve == null) {
0976:                    if (container instanceof  StandardContext
0977:                            && ((StandardContext) container).getCrossContext()) {
0978:                        Cluster cluster = getCluster();
0979:                        if (cluster != null
0980:                                && cluster instanceof  CatalinaCluster) {
0981:                            Valve[] valves = ((CatalinaCluster) cluster)
0982:                                    .getValves();
0983:                            if (valves != null && valves.length > 0) {
0984:                                for (int i = 0; replicationValve == null
0985:                                        && i < valves.length; i++) {
0986:                                    if (valves[i] instanceof  ReplicationValve)
0987:                                        replicationValve = (ReplicationValve) valves[i];
0988:                                }//for
0989:
0990:                                if (replicationValve == null
0991:                                        && log.isDebugEnabled()) {
0992:                                    log
0993:                                            .debug("no ReplicationValve found for CrossContext Support");
0994:                                }//endif 
0995:                            }//end if
0996:                        }//endif
0997:                    }//end if
0998:                }//end if
0999:                if (replicationValve != null) {
1000:                    replicationValve.registerReplicationSession(session);
1001:                }
1002:            }
1003:
1004:            /**
1005:             * Find the master of the session state
1006:             * @return master member of sessions 
1007:             */
1008:            protected Member findSessionMasterMember() {
1009:                Member mbr = null;
1010:                Member mbrs[] = cluster.getMembers();
1011:                if (mbrs.length != 0)
1012:                    mbr = mbrs[0];
1013:                if (mbr == null && log.isWarnEnabled())
1014:                    log.warn(sm.getString("deltaManager.noMasterMember",
1015:                            getName(), ""));
1016:                if (mbr != null && log.isDebugEnabled())
1017:                    log.warn(sm.getString("deltaManager.foundMasterMember",
1018:                            getName(), mbr));
1019:                return mbr;
1020:            }
1021:
1022:            /**
1023:             * Wait that cluster session state is transfer or timeout after 60 Sec
1024:             * With stateTransferTimeout == -1 wait that backup is transfered (forever mode)
1025:             */
1026:            protected void waitForSendAllSessions(long beforeSendTime) {
1027:                long reqStart = System.currentTimeMillis();
1028:                long reqNow = reqStart;
1029:                boolean isTimeout = false;
1030:                if (getStateTransferTimeout() > 0) {
1031:                    // wait that state is transfered with timeout check
1032:                    do {
1033:                        try {
1034:                            Thread.sleep(100);
1035:                        } catch (Exception sleep) {
1036:                            //
1037:                        }
1038:                        reqNow = System.currentTimeMillis();
1039:                        isTimeout = ((reqNow - reqStart) > (1000 * getStateTransferTimeout()));
1040:                    } while ((!getStateTransfered()) && (!isTimeout));
1041:                } else {
1042:                    if (getStateTransferTimeout() == -1) {
1043:                        // wait that state is transfered
1044:                        do {
1045:                            try {
1046:                                Thread.sleep(100);
1047:                            } catch (Exception sleep) {
1048:                            }
1049:                        } while ((!getStateTransfered()));
1050:                        reqNow = System.currentTimeMillis();
1051:                    }
1052:                }
1053:                if (isTimeout || (!getStateTransfered())) {
1054:                    counterNoStateTransfered++;
1055:                    log.error(sm.getString("deltaManager.noSessionState",
1056:                            getName(), new Date(beforeSendTime), new Long(
1057:                                    reqNow - beforeSendTime)));
1058:                } else {
1059:                    if (log.isInfoEnabled())
1060:                        log.info(sm.getString("deltaManager.sessionReceived",
1061:                                getName(), new Date(beforeSendTime), new Long(
1062:                                        reqNow - beforeSendTime)));
1063:                }
1064:            }
1065:
1066:            /**
1067:             * Gracefully terminate the active use of the public methods of this
1068:             * component. This method should be the last one called on a given instance
1069:             * of this component.
1070:             * 
1071:             * @exception LifecycleException
1072:             *                if this component detects a fatal error that needs to be
1073:             *                reported
1074:             */
1075:            public void stop() throws LifecycleException {
1076:
1077:                if (log.isDebugEnabled())
1078:                    log.debug(sm.getString("deltaManager.stopped", getName()));
1079:
1080:                // Validate and update our current component state
1081:                if (!started)
1082:                    throw new LifecycleException(sm
1083:                            .getString("deltaManager.notStarted"));
1084:                lifecycle.fireLifecycleEvent(STOP_EVENT, null);
1085:                started = false;
1086:
1087:                // Expire all active sessions
1088:                if (log.isInfoEnabled())
1089:                    log.info(sm.getString("deltaManager.expireSessions",
1090:                            getName()));
1091:                Session sessions[] = findSessions();
1092:                for (int i = 0; i < sessions.length; i++) {
1093:                    DeltaSession session = (DeltaSession) sessions[i];
1094:                    if (!session.isValid())
1095:                        continue;
1096:                    try {
1097:                        session.expire(true, isExpireSessionsOnShutdown());
1098:                    } catch (Throwable ignore) {
1099:                        ;
1100:                    }
1101:                }
1102:
1103:                // Require a new random number generator if we are restarted
1104:                this .random = null;
1105:                getCluster().removeManager(this );
1106:                replicationValve = null;
1107:                if (initialized) {
1108:                    destroy();
1109:                }
1110:            }
1111:
1112:            // ----------------------------------------- PropertyChangeListener Methods
1113:
1114:            /**
1115:             * Process property change events from our associated Context.
1116:             * 
1117:             * @param event
1118:             *            The property change event that has occurred
1119:             */
1120:            public void propertyChange(PropertyChangeEvent event) {
1121:
1122:                // Validate the source of this event
1123:                if (!(event.getSource() instanceof  Context))
1124:                    return;
1125:                // Process a relevant property change
1126:                if (event.getPropertyName().equals("sessionTimeout")) {
1127:                    try {
1128:                        setMaxInactiveInterval(((Integer) event.getNewValue())
1129:                                .intValue() * 60);
1130:                    } catch (NumberFormatException e) {
1131:                        log.error(sm.getString("deltaManager.sessionTimeout",
1132:                                event.getNewValue()));
1133:                    }
1134:                }
1135:
1136:            }
1137:
1138:            // -------------------------------------------------------- Replication
1139:            // Methods
1140:
1141:            /**
1142:             * A message was received from another node, this is the callback method to
1143:             * implement if you are interested in receiving replication messages.
1144:             * 
1145:             * @param cmsg -
1146:             *            the message received.
1147:             */
1148:            public void messageDataReceived(ClusterMessage cmsg) {
1149:                if (cmsg != null && cmsg instanceof  SessionMessage) {
1150:                    SessionMessage msg = (SessionMessage) cmsg;
1151:                    switch (msg.getEventType()) {
1152:                    case SessionMessage.EVT_GET_ALL_SESSIONS:
1153:                    case SessionMessage.EVT_SESSION_CREATED:
1154:                    case SessionMessage.EVT_SESSION_EXPIRED:
1155:                    case SessionMessage.EVT_SESSION_ACCESSED:
1156:                    case SessionMessage.EVT_SESSION_DELTA: {
1157:                        synchronized (receivedMessageQueue) {
1158:                            if (receiverQueue) {
1159:                                receivedMessageQueue.add(msg);
1160:                                return;
1161:                            }
1162:                        }
1163:                        break;
1164:                    }
1165:                    default: {
1166:                        //we didn't queue, do nothing
1167:                        break;
1168:                    }
1169:                    } //switch
1170:
1171:                    messageReceived(msg,
1172:                            msg.getAddress() != null ? (Member) msg
1173:                                    .getAddress() : null);
1174:                }
1175:            }
1176:
1177:            /**
1178:             * When the request has been completed, the replication valve will notify
1179:             * the manager, and the manager will decide whether any replication is
1180:             * needed or not. If there is a need for replication, the manager will
1181:             * create a session message and that will be replicated. The cluster
1182:             * determines where it gets sent.
1183:             * 
1184:             * @param sessionId -
1185:             *            the sessionId that just completed.
1186:             * @return a SessionMessage to be sent,
1187:             */
1188:            public ClusterMessage requestCompleted(String sessionId) {
1189:                try {
1190:                    DeltaSession session = (DeltaSession) findSession(sessionId);
1191:                    DeltaRequest deltaRequest = session.getDeltaRequest();
1192:                    SessionMessage msg = null;
1193:                    boolean isDeltaRequest = false;
1194:                    synchronized (deltaRequest) {
1195:                        isDeltaRequest = deltaRequest.getSize() > 0;
1196:                        if (isDeltaRequest) {
1197:                            counterSend_EVT_SESSION_DELTA++;
1198:                            byte[] data = serializeDeltaRequest(deltaRequest);
1199:                            msg = new SessionMessageImpl(getName(),
1200:                                    SessionMessage.EVT_SESSION_DELTA, data,
1201:                                    sessionId, sessionId + "-"
1202:                                            + System.currentTimeMillis());
1203:                            session.resetDeltaRequest();
1204:                        }
1205:                    }
1206:                    if (!isDeltaRequest) {
1207:                        if (!session.isPrimarySession()) {
1208:                            counterSend_EVT_SESSION_ACCESSED++;
1209:                            msg = new SessionMessageImpl(getName(),
1210:                                    SessionMessage.EVT_SESSION_ACCESSED, null,
1211:                                    sessionId, sessionId + "-"
1212:                                            + System.currentTimeMillis());
1213:                            if (log.isDebugEnabled()) {
1214:                                log
1215:                                        .debug(sm
1216:                                                .getString(
1217:                                                        "deltaManager.createMessage.accessChangePrimary",
1218:                                                        getName(), sessionId));
1219:                            }
1220:                        }
1221:                    } else { // log only outside synch block!
1222:                        if (log.isDebugEnabled()) {
1223:                            log.debug(sm.getString(
1224:                                    "deltaManager.createMessage.delta",
1225:                                    getName(), sessionId));
1226:                        }
1227:                    }
1228:                    session.setPrimarySession(true);
1229:                    //check to see if we need to send out an access message
1230:                    if ((msg == null)) {
1231:                        long replDelta = System.currentTimeMillis()
1232:                                - session.getLastTimeReplicated();
1233:                        if (replDelta > (getMaxInactiveInterval() * 1000)) {
1234:                            counterSend_EVT_SESSION_ACCESSED++;
1235:                            msg = new SessionMessageImpl(getName(),
1236:                                    SessionMessage.EVT_SESSION_ACCESSED, null,
1237:                                    sessionId, sessionId + "-"
1238:                                            + System.currentTimeMillis());
1239:                            if (log.isDebugEnabled()) {
1240:                                log.debug(sm.getString(
1241:                                        "deltaManager.createMessage.access",
1242:                                        getName(), sessionId));
1243:                            }
1244:                        }
1245:
1246:                    }
1247:
1248:                    //update last replicated time
1249:                    if (msg != null)
1250:                        session.setLastTimeReplicated(System
1251:                                .currentTimeMillis());
1252:                    return msg;
1253:                } catch (IOException x) {
1254:                    log
1255:                            .error(
1256:                                    sm
1257:                                            .getString(
1258:                                                    "deltaManager.createMessage.unableCreateDeltaRequest",
1259:                                                    sessionId), x);
1260:                    return null;
1261:                }
1262:
1263:            }
1264:
1265:            /**
1266:             * Reset manager statistics
1267:             */
1268:            public synchronized void resetStatistics() {
1269:                processingTime = 0;
1270:                expiredSessions = 0;
1271:                rejectedSessions = 0;
1272:                sessionReplaceCounter = 0;
1273:                counterNoStateTransfered = 0;
1274:                maxActive = getActiveSessions();
1275:                sessionCounter = getActiveSessions();
1276:                counterReceive_EVT_ALL_SESSION_DATA = 0;
1277:                counterReceive_EVT_GET_ALL_SESSIONS = 0;
1278:                counterReceive_EVT_SESSION_ACCESSED = 0;
1279:                counterReceive_EVT_SESSION_CREATED = 0;
1280:                counterReceive_EVT_SESSION_DELTA = 0;
1281:                counterReceive_EVT_SESSION_EXPIRED = 0;
1282:                counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0;
1283:                counterSend_EVT_ALL_SESSION_DATA = 0;
1284:                counterSend_EVT_GET_ALL_SESSIONS = 0;
1285:                counterSend_EVT_SESSION_ACCESSED = 0;
1286:                counterSend_EVT_SESSION_CREATED = 0;
1287:                counterSend_EVT_SESSION_DELTA = 0;
1288:                counterSend_EVT_SESSION_EXPIRED = 0;
1289:                counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0;
1290:
1291:            }
1292:
1293:            //  -------------------------------------------------------- persistence handler
1294:
1295:            public void load() {
1296:
1297:            }
1298:
1299:            public void unload() {
1300:
1301:            }
1302:
1303:            //  -------------------------------------------------------- expire
1304:
1305:            /**
1306:             * send session expired to other cluster nodes
1307:             * 
1308:             * @param id
1309:             *            session id
1310:             */
1311:            protected void sessionExpired(String id) {
1312:                counterSend_EVT_SESSION_EXPIRED++;
1313:                SessionMessage msg = new SessionMessageImpl(getName(),
1314:                        SessionMessage.EVT_SESSION_EXPIRED, null, id, id
1315:                                + "-EXPIRED-MSG");
1316:                if (log.isDebugEnabled())
1317:                    log.debug(sm.getString("deltaManager.createMessage.expire",
1318:                            getName(), id));
1319:                send(msg);
1320:            }
1321:
1322:            /**
1323:             * Exipre all find sessions.
1324:             */
1325:            public void expireAllLocalSessions() {
1326:                long timeNow = System.currentTimeMillis();
1327:                Session sessions[] = findSessions();
1328:                int expireDirect = 0;
1329:                int expireIndirect = 0;
1330:
1331:                if (log.isDebugEnabled())
1332:                    log.debug("Start expire all sessions " + getName() + " at "
1333:                            + timeNow + " sessioncount " + sessions.length);
1334:                for (int i = 0; i < sessions.length; i++) {
1335:                    if (sessions[i] instanceof  DeltaSession) {
1336:                        DeltaSession session = (DeltaSession) sessions[i];
1337:                        if (session.isPrimarySession()) {
1338:                            if (session.isValid()) {
1339:                                session.expire();
1340:                                expireDirect++;
1341:                            } else {
1342:                                expireIndirect++;
1343:                            }//end if
1344:                        }//end if
1345:                    }//end if
1346:                }//for
1347:                long timeEnd = System.currentTimeMillis();
1348:                if (log.isDebugEnabled())
1349:                    log.debug("End expire sessions " + getName()
1350:                            + " exipre processingTime " + (timeEnd - timeNow)
1351:                            + " expired direct sessions: " + expireDirect
1352:                            + " expired direct sessions: " + expireIndirect);
1353:
1354:            }
1355:
1356:            /**
1357:             * When the manager expires session not tied to a request. The cluster will
1358:             * periodically ask for a list of sessions that should expire and that
1359:             * should be sent across the wire.
1360:             * 
1361:             * @return The invalidated sessions array
1362:             */
1363:            public String[] getInvalidatedSessions() {
1364:                return new String[0];
1365:            }
1366:
1367:            //  -------------------------------------------------------- message receive
1368:
1369:            /**
1370:             * Test that sender and local domain is the same
1371:             */
1372:            protected boolean checkSenderDomain(SessionMessage msg,
1373:                    Member sender) {
1374:                boolean sameDomain = true;
1375:                if (!sameDomain && log.isWarnEnabled()) {
1376:                    log.warn(sm.getString(
1377:                            "deltaManager.receiveMessage.fromWrongDomain",
1378:                            new Object[] { getName(), msg.getEventTypeString(),
1379:                                    sender, "", "" }));
1380:                }
1381:                return sameDomain;
1382:            }
1383:
1384:            /**
1385:             * This method is called by the received thread when a SessionMessage has
1386:             * been received from one of the other nodes in the cluster.
1387:             * 
1388:             * @param msg -
1389:             *            the message received
1390:             * @param sender -
1391:             *            the sender of the message, this is used if we receive a
1392:             *            EVT_GET_ALL_SESSION message, so that we only reply to the
1393:             *            requesting node
1394:             */
1395:            protected void messageReceived(SessionMessage msg, Member sender) {
1396:                if (doDomainReplication() && !checkSenderDomain(msg, sender)) {
1397:                    return;
1398:                }
1399:                ClassLoader contextLoader = Thread.currentThread()
1400:                        .getContextClassLoader();
1401:                try {
1402:
1403:                    ClassLoader[] loaders = getClassLoaders();
1404:                    if (loaders != null && loaders.length > 0)
1405:                        Thread.currentThread()
1406:                                .setContextClassLoader(loaders[0]);
1407:                    if (log.isDebugEnabled())
1408:                        log.debug(sm.getString(
1409:                                "deltaManager.receiveMessage.eventType",
1410:                                getName(), msg.getEventTypeString(), sender));
1411:
1412:                    switch (msg.getEventType()) {
1413:                    case SessionMessage.EVT_GET_ALL_SESSIONS: {
1414:                        handleGET_ALL_SESSIONS(msg, sender);
1415:                        break;
1416:                    }
1417:                    case SessionMessage.EVT_ALL_SESSION_DATA: {
1418:                        handleALL_SESSION_DATA(msg, sender);
1419:                        break;
1420:                    }
1421:                    case SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE: {
1422:                        handleALL_SESSION_TRANSFERCOMPLETE(msg, sender);
1423:                        break;
1424:                    }
1425:                    case SessionMessage.EVT_SESSION_CREATED: {
1426:                        handleSESSION_CREATED(msg, sender);
1427:                        break;
1428:                    }
1429:                    case SessionMessage.EVT_SESSION_EXPIRED: {
1430:                        handleSESSION_EXPIRED(msg, sender);
1431:                        break;
1432:                    }
1433:                    case SessionMessage.EVT_SESSION_ACCESSED: {
1434:                        handleSESSION_ACCESSED(msg, sender);
1435:                        break;
1436:                    }
1437:                    case SessionMessage.EVT_SESSION_DELTA: {
1438:                        handleSESSION_DELTA(msg, sender);
1439:                        break;
1440:                    }
1441:                    default: {
1442:                        //we didn't recognize the message type, do nothing
1443:                        break;
1444:                    }
1445:                    } //switch
1446:                } catch (Exception x) {
1447:                    log.error(sm.getString("deltaManager.receiveMessage.error",
1448:                            getName()), x);
1449:                } finally {
1450:                    Thread.currentThread().setContextClassLoader(contextLoader);
1451:                }
1452:            }
1453:
1454:            // -------------------------------------------------------- message receiver handler
1455:
1456:            /**
1457:             * handle receive session state is complete transfered
1458:             * @param msg
1459:             * @param sender
1460:             */
1461:            protected void handleALL_SESSION_TRANSFERCOMPLETE(
1462:                    SessionMessage msg, Member sender) {
1463:                counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE++;
1464:                if (log.isDebugEnabled())
1465:                    log.debug(sm.getString(
1466:                            "deltaManager.receiveMessage.transfercomplete",
1467:                            getName(), sender.getHost(), new Integer(sender
1468:                                    .getPort())));
1469:                stateTransferCreateSendTime = msg.getTimestamp();
1470:                stateTransfered = true;
1471:            }
1472:
1473:            /**
1474:             * handle receive session delta
1475:             * @param msg
1476:             * @param sender
1477:             * @throws IOException
1478:             * @throws ClassNotFoundException
1479:             */
1480:            protected void handleSESSION_DELTA(SessionMessage msg, Member sender)
1481:                    throws IOException, ClassNotFoundException {
1482:                counterReceive_EVT_SESSION_DELTA++;
1483:                byte[] delta = msg.getSession();
1484:                DeltaSession session = (DeltaSession) findSession(msg
1485:                        .getSessionID());
1486:                if (session != null) {
1487:                    if (log.isDebugEnabled())
1488:                        log.debug(sm.getString(
1489:                                "deltaManager.receiveMessage.delta", getName(),
1490:                                msg.getSessionID()));
1491:                    DeltaRequest dreq = deserializeDeltaRequest(session, delta);
1492:                    dreq.execute(session, notifyListenersOnReplication);
1493:                    session.setPrimarySession(false);
1494:                }
1495:            }
1496:
1497:            /**
1498:             * handle receive session is access at other node ( primary session is now false)
1499:             * @param msg
1500:             * @param sender
1501:             * @throws IOException
1502:             */
1503:            protected void handleSESSION_ACCESSED(SessionMessage msg,
1504:                    Member sender) throws IOException {
1505:                counterReceive_EVT_SESSION_ACCESSED++;
1506:                DeltaSession session = (DeltaSession) findSession(msg
1507:                        .getSessionID());
1508:                if (session != null) {
1509:                    if (log.isDebugEnabled())
1510:                        log.debug(sm.getString(
1511:                                "deltaManager.receiveMessage.accessed",
1512:                                getName(), msg.getSessionID()));
1513:                    session.access();
1514:                    session.setPrimarySession(false);
1515:                    session.endAccess();
1516:                }
1517:            }
1518:
1519:            /**
1520:             * handle receive session is expire at other node ( expire session also here)
1521:             * @param msg
1522:             * @param sender
1523:             * @throws IOException
1524:             */
1525:            protected void handleSESSION_EXPIRED(SessionMessage msg,
1526:                    Member sender) throws IOException {
1527:                counterReceive_EVT_SESSION_EXPIRED++;
1528:                DeltaSession session = (DeltaSession) findSession(msg
1529:                        .getSessionID());
1530:                if (session != null) {
1531:                    if (log.isDebugEnabled())
1532:                        log.debug(sm.getString(
1533:                                "deltaManager.receiveMessage.expired",
1534:                                getName(), msg.getSessionID()));
1535:                    session.expire(notifySessionListenersOnReplication, false);
1536:                }
1537:            }
1538:
1539:            /**
1540:             * handle receive new session is created at other node (create backup - primary false)
1541:             * @param msg
1542:             * @param sender
1543:             */
1544:            protected void handleSESSION_CREATED(SessionMessage msg,
1545:                    Member sender) {
1546:                counterReceive_EVT_SESSION_CREATED++;
1547:                if (log.isDebugEnabled())
1548:                    log.debug(sm.getString(
1549:                            "deltaManager.receiveMessage.createNewSession",
1550:                            getName(), msg.getSessionID()));
1551:                DeltaSession session = (DeltaSession) createEmptySession();
1552:                session.setManager(this );
1553:                session.setValid(true);
1554:                session.setPrimarySession(false);
1555:                session.setCreationTime(msg.getTimestamp());
1556:                session.access();
1557:                if (notifySessionListenersOnReplication)
1558:                    session.setId(msg.getSessionID());
1559:                else
1560:                    session.setIdInternal(msg.getSessionID());
1561:                session.resetDeltaRequest();
1562:                session.endAccess();
1563:
1564:            }
1565:
1566:            /**
1567:             * handle receive sessions from other not ( restart )
1568:             * @param msg
1569:             * @param sender
1570:             * @throws ClassNotFoundException
1571:             * @throws IOException
1572:             */
1573:            protected void handleALL_SESSION_DATA(SessionMessage msg,
1574:                    Member sender) throws ClassNotFoundException, IOException {
1575:                counterReceive_EVT_ALL_SESSION_DATA++;
1576:                if (log.isDebugEnabled())
1577:                    log.debug(sm.getString(
1578:                            "deltaManager.receiveMessage.allSessionDataBegin",
1579:                            getName()));
1580:                byte[] data = msg.getSession();
1581:                deserializeSessions(data);
1582:                if (log.isDebugEnabled())
1583:                    log.debug(sm.getString(
1584:                            "deltaManager.receiveMessage.allSessionDataAfter",
1585:                            getName()));
1586:                //stateTransferred = true;
1587:            }
1588:
1589:            /**
1590:             * handle receive that other node want all sessions ( restart )
1591:             * a) send all sessions with one message
1592:             * b) send session at blocks
1593:             * After sending send state is complete transfered
1594:             * @param msg
1595:             * @param sender
1596:             * @throws IOException
1597:             */
1598:            protected void handleGET_ALL_SESSIONS(SessionMessage msg,
1599:                    Member sender) throws IOException {
1600:                counterReceive_EVT_GET_ALL_SESSIONS++;
1601:                //get a list of all the session from this manager
1602:                if (log.isDebugEnabled())
1603:                    log.debug(sm.getString(
1604:                            "deltaManager.receiveMessage.unloadingBegin",
1605:                            getName()));
1606:                // Write the number of active sessions, followed by the details
1607:                // get all sessions and serialize without sync
1608:                Session[] currentSessions = findSessions();
1609:                long findSessionTimestamp = System.currentTimeMillis();
1610:                if (isSendAllSessions()) {
1611:                    sendSessions(sender, currentSessions, findSessionTimestamp);
1612:                } else {
1613:                    // send session at blocks
1614:                    int len = currentSessions.length < getSendAllSessionsSize() ? currentSessions.length
1615:                            : getSendAllSessionsSize();
1616:                    Session[] sendSessions = new Session[len];
1617:                    for (int i = 0; i < currentSessions.length; i += getSendAllSessionsSize()) {
1618:                        len = i + getSendAllSessionsSize() > currentSessions.length ? currentSessions.length
1619:                                - i
1620:                                : getSendAllSessionsSize();
1621:                        System.arraycopy(currentSessions, i, sendSessions, 0,
1622:                                len);
1623:                        sendSessions(sender, sendSessions, findSessionTimestamp);
1624:                        if (getSendAllSessionsWaitTime() > 0) {
1625:                            try {
1626:                                Thread.sleep(getSendAllSessionsWaitTime());
1627:                            } catch (Exception sleep) {
1628:                            }
1629:                        }//end if
1630:                    }//for
1631:                }//end if
1632:
1633:                SessionMessage newmsg = new SessionMessageImpl(name,
1634:                        SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE, null,
1635:                        "SESSION-STATE-TRANSFERED", "SESSION-STATE-TRANSFERED"
1636:                                + getName());
1637:                newmsg.setTimestamp(findSessionTimestamp);
1638:                if (log.isDebugEnabled())
1639:                    log.debug(sm.getString(
1640:                            "deltaManager.createMessage.allSessionTransfered",
1641:                            getName()));
1642:                counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE++;
1643:                cluster.send(newmsg, sender);
1644:            }
1645:
1646:            /**
1647:             * send a block of session to sender
1648:             * @param sender
1649:             * @param currentSessions
1650:             * @param sendTimestamp
1651:             * @throws IOException
1652:             */
1653:            protected void sendSessions(Member sender,
1654:                    Session[] currentSessions, long sendTimestamp)
1655:                    throws IOException {
1656:                byte[] data = serializeSessions(currentSessions);
1657:                if (log.isDebugEnabled())
1658:                    log.debug(sm.getString(
1659:                            "deltaManager.receiveMessage.unloadingAfter",
1660:                            getName()));
1661:                SessionMessage newmsg = new SessionMessageImpl(name,
1662:                        SessionMessage.EVT_ALL_SESSION_DATA, data,
1663:                        "SESSION-STATE", "SESSION-STATE-" + getName());
1664:                newmsg.setTimestamp(sendTimestamp);
1665:                if (log.isDebugEnabled())
1666:                    log.debug(sm.getString(
1667:                            "deltaManager.createMessage.allSessionData",
1668:                            getName()));
1669:                counterSend_EVT_ALL_SESSION_DATA++;
1670:                cluster.send(newmsg, sender);
1671:            }
1672:
1673:            public ClusterManager cloneFromTemplate() {
1674:                DeltaManager result = new DeltaManager();
1675:                result.name = "Clone-from-" + name;
1676:                result.cluster = cluster;
1677:                result.replicationValve = replicationValve;
1678:                result.maxActiveSessions = maxActiveSessions;
1679:                result.expireSessionsOnShutdown = expireSessionsOnShutdown;
1680:                result.notifyListenersOnReplication = notifyListenersOnReplication;
1681:                result.notifySessionListenersOnReplication = notifySessionListenersOnReplication;
1682:                result.stateTransferTimeout = stateTransferTimeout;
1683:                result.sendAllSessions = sendAllSessions;
1684:                result.sendClusterDomainOnly = sendClusterDomainOnly;
1685:                result.sendAllSessionsSize = sendAllSessionsSize;
1686:                result.sendAllSessionsWaitTime = sendAllSessionsWaitTime;
1687:                result.receiverQueue = receiverQueue;
1688:                result.stateTimestampDrop = stateTimestampDrop;
1689:                result.stateTransferCreateSendTime = stateTransferCreateSendTime;
1690:                return result;
1691:            }
1692:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.