001: /*
002: * Copyright 1999,2004 The Apache Software Foundation.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016:
017: package org.apache.catalina.cluster.tcp;
018:
019: import java.beans.PropertyChangeSupport;
020: import java.net.InetAddress;
021: import java.net.MulticastSocket;
022: import java.net.UnknownHostException;
023: import java.io.IOException;
024: import java.util.HashMap;
025: import java.util.Vector;
026:
027: import org.apache.catalina.ServerFactory;
028: import org.apache.catalina.core.StandardServer;
029:
030: import org.apache.catalina.Container;
031: import org.apache.catalina.Lifecycle;
032: import org.apache.catalina.LifecycleEvent;
033: import org.apache.catalina.LifecycleException;
034: import org.apache.catalina.LifecycleListener;
035: import org.apache.catalina.Logger;
036: import org.apache.catalina.Manager;
037: import org.apache.catalina.util.LifecycleSupport;
038: import org.apache.catalina.util.StringManager;
039: import org.apache.catalina.Valve;
040: import org.apache.catalina.Host;
041:
042: import org.apache.catalina.cluster.Member;
043: import org.apache.catalina.cluster.CatalinaCluster;
044: import org.apache.catalina.cluster.MessageListener;
045: import org.apache.catalina.cluster.MembershipListener;
046: import org.apache.catalina.cluster.MembershipService;
047: import org.apache.commons.beanutils.MethodUtils;
048: import org.apache.catalina.cluster.tcp.ReplicationListener;
049: import org.apache.catalina.cluster.tcp.ReplicationTransmitter;
050: import org.apache.catalina.cluster.tcp.SocketSender;
051: import org.apache.catalina.cluster.io.ListenCallback;
052:
053: import org.apache.catalina.cluster.SessionMessage;
054: import org.apache.catalina.cluster.ClusterMessage;
055: import org.apache.catalina.cluster.session.ReplicationStream;
056: import org.apache.catalina.cluster.ClusterManager;
057: import org.apache.catalina.cluster.Constants;
058: import org.apache.catalina.cluster.ClusterReceiver;
059: import org.apache.catalina.cluster.ClusterSender;
060: import org.apache.catalina.cluster.ClusterDeployer;
061:
062: import org.apache.commons.logging.Log;
063:
064: import java.io.IOException;
065: import java.net.URL;
066:
067: /**
068: * A <b>Cluster</b> implementation using simple multicast.
069: * Responsible for setting
070: * up a cluster and provides callers with a valid multicast receiver/sender.
071: *
072: * @author Filip Hanik
073: * @author Remy Maucherat
074: * @version $Revision: 1.41 $, $Date: 2004/06/04 20:22:27 $
075: */
076:
077: public class SimpleTcpCluster implements CatalinaCluster, Lifecycle,
078: MembershipListener, ListenCallback, LifecycleListener {
079:
080: public static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
081: .getLog(SimpleTcpCluster.class);
082:
083: // ----------------------------------------------------- Instance Variables
084:
085: /**
086: * Descriptive information about this component implementation.
087: */
088: protected static final String info = "SimpleTcpCluster/1.0";
089:
090: /**
091: * the service that provides the membership
092: */
093: protected MembershipService membershipService = null;
094:
095: /**
096: * Whether to expire sessions when shutting down
097: */
098: protected boolean expireSessionsOnShutdown = true;
099: /**
100: * Print debug to std.out?
101: */
102: protected boolean printToScreen = false;
103: /**
104: * Replicate only sessions that have been marked dirty
105: * false=replicate sessions after each request
106: */
107: protected boolean useDirtyFlag = false;
108:
109: /**
110: * Name for logging purpose
111: */
112: protected String clusterImpName = "SimpleTcpCluster";
113:
114: /**
115: * The string manager for this package.
116: */
117: protected StringManager sm = StringManager
118: .getManager(Constants.Package);
119:
120: /**
121: * The cluster name to join
122: */
123: protected String clusterName = null;
124:
125: /**
126: * The Container associated with this Cluster.
127: */
128: protected Container container = null;
129:
130: /**
131: * The lifecycle event support for this component.
132: */
133: protected LifecycleSupport lifecycle = new LifecycleSupport(this );
134:
135: /**
136: * Has this component been started?
137: */
138: protected boolean started = false;
139:
140: /**
141: * The property change support for this component.
142: */
143: protected PropertyChangeSupport support = new PropertyChangeSupport(
144: this );
145:
146: /**
147: * The debug level for this Container
148: */
149: protected int debug = 0;
150:
151: /**
152: * The context name <-> manager association for distributed contexts.
153: */
154: protected HashMap managers = new HashMap();
155:
156: /**
157: * Sending Stats
158: */
159: private long nrOfMsgsReceived = 0;
160: private long msgSendTime = 0;
161: private long lastChecked = System.currentTimeMillis();
162:
163: //sort members by alive time
164: protected MemberComparator memberComparator = new MemberComparator();
165:
166: private String managerClassName = "org.apache.catalina.cluster.session.DeltaManager";
167:
168: /**
169: * Sender to send data with
170: */
171: private org.apache.catalina.cluster.ClusterSender clusterSender;
172:
173: /**
174: * Receiver to register call back with
175: */
176: private org.apache.catalina.cluster.ClusterReceiver clusterReceiver;
177: private org.apache.catalina.Valve valve;
178: private org.apache.catalina.cluster.ClusterDeployer clusterDeployer;
179:
180: /**
181: * Listeners of messages
182: */
183: protected Vector clusterListeners = new Vector();
184:
185: // ------------------------------------------------------------- Properties
186:
187: public SimpleTcpCluster() {
188: }
189:
190: /**
191: * Return descriptive information about this Cluster implementation and
192: * the corresponding version number, in the format
193: * <code><description>/<version></code>.
194: */
195: public String getInfo() {
196: return (this .info);
197: }
198:
199: /**
200: * Set the debug level for this component
201: *
202: * @param debug The debug level
203: */
204: public void setDebug(int debug) {
205: this .debug = debug;
206: }
207:
208: /**
209: * Get the debug level for this component
210: *
211: * @return The debug level
212: */
213: public int getDebug() {
214: return (this .debug);
215: }
216:
217: /**
218: * Set the name of the cluster to join, if no cluster with
219: * this name is present create one.
220: *
221: * @param clusterName The clustername to join
222: */
223: public void setClusterName(String clusterName) {
224: this .clusterName = clusterName;
225: }
226:
227: /**
228: * Return the name of the cluster that this Server is currently
229: * configured to operate within.
230: *
231: * @return The name of the cluster associated with this server
232: */
233: public String getClusterName() {
234: return clusterName;
235: }
236:
237: /**
238: * Set the Container associated with our Cluster
239: *
240: * @param container The Container to use
241: */
242: public void setContainer(Container container) {
243: Container oldContainer = this .container;
244: this .container = container;
245: support.firePropertyChange("container", oldContainer,
246: this .container);
247: //this.container.
248: }
249:
250: /**
251: * Get the Container associated with our Cluster
252: *
253: * @return The Container associated with our Cluster
254: */
255: public Container getContainer() {
256: return (this .container);
257: }
258:
259: /**
260: * Sets the configurable protocol stack. This is a setting in server.xml
261: * where you can configure your protocol.
262: *
263: * @param protocol the protocol stack - this method is called by
264: * the server configuration at startup
265: * @see <a href="www.javagroups.com">JavaGroups</a> for details
266: */
267: public void setProtocol(String protocol) {
268: }
269:
270: /**
271: * Returns the protocol.
272: */
273: public String getProtocol() {
274: return null;
275: }
276:
277: public Member[] getMembers() {
278: Member[] members = membershipService.getMembers();
279: //sort by alive time
280: java.util.Arrays.sort(members, memberComparator);
281: return members;
282: }
283:
284: /**
285: * Return the member that represents this node.
286: * @return Member
287: */
288: public Member getLocalMember() {
289: return membershipService.getLocalMember();
290: }
291:
292: // --------------------------------------------------------- Public Methods
293:
294: public synchronized Manager createManager(String name) {
295: log.debug("Creating ClusterManager for context " + name
296: + " using class " + getManagerClassName());
297: System.out
298: .println("\n\n\n\nCreating ClusterManager for context "
299: + name + " using class "
300: + getManagerClassName() + "\n\n\n\n");
301: ClusterManager manager = null;
302: try {
303: manager = (ClusterManager) getClass().getClassLoader()
304: .loadClass(getManagerClassName()).newInstance();
305: } catch (Exception x) {
306: log
307: .error(
308: "Unable to load class for replication manager",
309: x);
310: manager = new org.apache.catalina.cluster.session.SimpleTcpReplicationManager();
311: }
312: manager.setName(name);
313: manager.setCluster(this );
314: manager.setDistributable(true);
315: manager.setExpireSessionsOnShutdown(expireSessionsOnShutdown);
316: manager.setUseDirtyFlag(useDirtyFlag);
317: managers.put(name, manager);
318:
319: return manager;
320: }
321:
322: public void removeManager(String name) {
323: managers.remove(name);
324: }
325:
326: public Manager getManager(String name) {
327: return (Manager) managers.get(name);
328: }
329:
330: // ------------------------------------------------------ Lifecycle Methods
331:
332: /**
333: * Add a lifecycle event listener to this component.
334: *
335: * @param listener The listener to add
336: */
337: public void addLifecycleListener(LifecycleListener listener) {
338: lifecycle.addLifecycleListener(listener);
339: }
340:
341: /**
342: * Get the lifecycle listeners associated with this lifecycle. If this
343: * Lifecycle has no listeners registered, a zero-length array is returned.
344: */
345: public LifecycleListener[] findLifecycleListeners() {
346:
347: return lifecycle.findLifecycleListeners();
348:
349: }
350:
351: /**
352: * Remove a lifecycle event listener from this component.
353: *
354: * @param listener The listener to remove
355: */
356: public void removeLifecycleListener(LifecycleListener listener) {
357: lifecycle.removeLifecycleListener(listener);
358: }
359:
360: /**
361: * Prepare for the beginning of active use of the public methods of this
362: * component. This method should be called after <code>configure()</code>,
363: * and before any of the public methods of the component are utilized.<BR>
364: * Starts the cluster communication channel, this will connect with the
365: * other nodes in the cluster, and request the current session state to
366: * be transferred to this node.
367: *
368: * @exception IllegalStateException if this component has already been
369: * started
370: * @exception LifecycleException if this component detects a fatal error
371: * that prevents this component from being used
372: */
373: public void start() throws LifecycleException {
374: if (started)
375: throw new LifecycleException(sm
376: .getString("cluster.alreadyStarted"));
377: log.info("Cluster is about to start");
378: try {
379: MethodUtils.invokeMethod(getContainer(), "addValve", valve);
380: clusterReceiver.setIsSenderSynchronized(clusterSender
381: .getIsSenderSynchronized());
382: clusterReceiver.setCatalinaCluster(this );
383: clusterReceiver.start();
384: clusterSender.start();
385: membershipService.setLocalMemberProperties(clusterReceiver
386: .getHost(), clusterReceiver.getPort());
387: membershipService.addMembershipListener(this );
388: membershipService.start();
389: //set the deployer.
390: try {
391: if (clusterDeployer != null) {
392: clusterDeployer.setCluster(this );
393: Object deployer = MethodUtils.invokeMethod(
394: getContainer(), "getDeployer",
395: new Object[0], new Class[0]);
396: clusterDeployer
397: .setDeployer((org.apache.catalina.Deployer) deployer);
398: clusterDeployer.start();
399: }
400: } catch (Throwable x) {
401: log
402: .fatal(
403: "Unable to retrieve the container deployer. Cluster deployment disabled.",
404: x);
405: } //catch
406: this .started = true;
407: } catch (Exception x) {
408: log.error("Unable to start cluster.", x);
409: throw new LifecycleException(x);
410: }
411: }
412:
413: public void send(ClusterMessage msg, Member dest) {
414: try {
415: msg.setAddress(membershipService.getLocalMember());
416: Member destination = dest;
417:
418: if (msg instanceof SessionMessage) {
419: SessionMessage smsg = (SessionMessage) msg;
420: //if we request session state, send to the oldest of members
421: if ((destination == null)
422: && (smsg.getEventType() == SessionMessage.EVT_GET_ALL_SESSIONS)
423: && (membershipService.getMembers().length > 0)) {
424: destination = membershipService.getMembers()[0];
425: }//end if
426: }//end if
427: msg.setTimestamp(System.currentTimeMillis());
428: java.io.ByteArrayOutputStream outs = new java.io.ByteArrayOutputStream();
429: java.io.ObjectOutputStream out = new java.io.ObjectOutputStream(
430: outs);
431: out.writeObject(msg);
432: byte[] data = outs.toByteArray();
433: if (destination != null) {
434: Member tcpdest = dest;
435: if ((tcpdest != null)
436: && (!membershipService.getLocalMember().equals(
437: tcpdest))) {
438: clusterSender.sendMessage(msg.getUniqueId(), data,
439: tcpdest);
440: }//end if
441: } else {
442: clusterSender.sendMessage(msg.getUniqueId(), data);
443: }
444: } catch (Exception x) {
445: log.error("Unable to send message through cluster sender.",
446: x);
447: }
448: }
449:
450: public void send(ClusterMessage msg) {
451: send(msg, null);
452: }
453:
454: /**
455: * Gracefully terminate the active use of the public methods of this
456: * component. This method should be the last one called on a given
457: * instance of this component.<BR>
458: * This will disconnect the cluster communication channel and stop
459: * the listener thread.
460: *
461: * @exception IllegalStateException if this component has not been started
462: * @exception LifecycleException if this component detects a fatal error
463: * that needs to be reported
464: */
465: public void stop() throws LifecycleException {
466:
467: if (!started)
468: throw new IllegalStateException(sm
469: .getString("cluster.notStarted"));
470:
471: membershipService.stop();
472: membershipService.removeMembershipListener();
473: try {
474: clusterSender.stop();
475: } catch (Exception x) {
476: log.error("Unable to stop cluster sender.", x);
477: }
478: try {
479: clusterReceiver.stop();
480: clusterReceiver.setCatalinaCluster(null);
481: } catch (Exception x) {
482: log.error("Unable to stop cluster receiver.", x);
483: }
484: if (clusterDeployer != null) {
485: clusterDeployer.stop();
486: }
487: started = false;
488: }
489:
490: public void memberAdded(Member member) {
491: try {
492: log.info("Replication member added:" + member);
493: clusterSender.add(member);
494: } catch (Exception x) {
495: log.error("Unable to connect to replication system.", x);
496: }
497:
498: }
499:
500: public void memberDisappeared(Member member) {
501: log.info("Received member disappeared:" + member);
502: try {
503: clusterSender.remove(member);
504: } catch (Exception x) {
505: log
506: .error(
507: "Unable remove cluster node from replication system.",
508: x);
509: }
510:
511: }
512:
513: public void setExpireSessionsOnShutdown(
514: boolean expireSessionsOnShutdown) {
515: this .expireSessionsOnShutdown = expireSessionsOnShutdown;
516: }
517:
518: public void setPrintToScreen(boolean printToScreen) {
519: this .printToScreen = printToScreen;
520: }
521:
522: public void setUseDirtyFlag(boolean useDirtyFlag) {
523: this .useDirtyFlag = useDirtyFlag;
524: }
525:
526: public void messageDataReceived(byte[] data) {
527: try {
528: ReplicationStream stream = new ReplicationStream(
529: new java.io.ByteArrayInputStream(data), getClass()
530: .getClassLoader());
531: Object myobj = stream.readObject();
532: if (myobj != null && myobj instanceof SessionMessage) {
533:
534: SessionMessage msg = (SessionMessage) myobj;
535: log
536: .debug("Assuming clocks are synched: Replication took="
537: + (System.currentTimeMillis() - msg
538: .getTimestamp()) + " ms.");
539: String ctxname = msg.getContextName();
540: //check if the message is a EVT_GET_ALL_SESSIONS,
541: //if so, wait until we are fully started up
542: if (ctxname == null) {
543: java.util.Iterator i = managers.keySet().iterator();
544: while (i.hasNext()) {
545: String key = (String) i.next();
546: ClusterManager mgr = (ClusterManager) managers
547: .get(key);
548: if (mgr != null)
549: mgr.messageDataReceived(msg);
550: else {
551: //this happens a lot before the system has started up
552: log.debug("Context manager doesn't exist:"
553: + key);
554: }
555: }//while
556: } else {
557: ClusterManager mgr = (ClusterManager) managers
558: .get(ctxname);
559: if (mgr != null)
560: mgr.messageDataReceived(msg);
561: else
562: log.warn("Context manager doesn't exist:"
563: + ctxname);
564: }//end if
565: } else {
566: //invoke all the listeners
567: for (int i = 0; i < clusterListeners.size(); i++) {
568: MessageListener listener = (MessageListener) clusterListeners
569: .elementAt(i);
570: if (myobj != null
571: && myobj instanceof ClusterMessage
572: && listener.accept((ClusterMessage) myobj)) {
573: listener
574: .messageReceived((ClusterMessage) myobj);
575: }//end if
576:
577: }//for
578: }//end if
579:
580: } catch (Exception x) {
581: log.error("Unable to deserialize session message.", x);
582: }
583: }
584:
585: public void lifecycleEvent(LifecycleEvent lifecycleEvent) {
586: log.debug("\nlifecycleEvent\n\nType" + lifecycleEvent.getType()
587: + "\nData" + lifecycleEvent.getData() + "\n\n\n");
588: }
589:
590: // --------------------------------------------------------- Cluster Wide Deployments
591: /**
592: * Start an existing web application, attached to the specified context
593: * path in all the other nodes in the cluster.
594: * Only starts a web application if it is not running.
595: *
596: * @param contextPath The context path of the application to be started
597: *
598: * @exception IllegalArgumentException if the specified context path
599: * is malformed (it must be "" or start with a slash)
600: * @exception IllegalArgumentException if the specified context path does
601: * not identify a currently installed web application
602: * @exception IOException if an input/output error occurs during
603: * startup
604: */
605: public void startContext(String contextPath) throws IOException {
606: return;
607: }
608:
609: /**
610: * Install a new web application, whose web application archive is at the
611: * specified URL, into this container with the specified context path.
612: * A context path of "" (the empty string) should be used for the root
613: * application for this container. Otherwise, the context path must
614: * start with a slash.
615: * <p>
616: * If this application is successfully installed, a ContainerEvent of type
617: * <code>PRE_INSTALL_EVENT</code> will be sent to registered listeners
618: * before the associated Context is started, and a ContainerEvent of type
619: * <code>INSTALL_EVENT</code> will be sent to all registered listeners
620: * after the associated Context is started, with the newly created
621: * <code>Context</code> as an argument.
622: *
623: * @param contextPath The context path to which this application should
624: * be installed (must be unique)
625: * @param war A URL of type "jar:" that points to a WAR file, or type
626: * "file:" that points to an unpacked directory structure containing
627: * the web application to be installed
628: *
629: * @exception IllegalArgumentException if the specified context path
630: * is malformed (it must be "" or start with a slash)
631: * @exception IllegalStateException if the specified context path
632: * is already attached to an existing web application
633: */
634: public void installContext(String contextPath, URL war) {
635: log.debug("\n\n\n\nCluster Install called for context:"
636: + contextPath + "\n\n\n\n");
637: }
638:
639: /**
640: * Stop an existing web application, attached to the specified context
641: * path. Only stops a web application if it is running.
642: *
643: * @param contextPath The context path of the application to be stopped
644: *
645: * @exception IllegalArgumentException if the specified context path
646: * is malformed (it must be "" or start with a slash)
647: * @exception IllegalArgumentException if the specified context path does
648: * not identify a currently installed web application
649: * @exception IOException if an input/output error occurs while stopping
650: * the web application
651: */
652: public void stop(String contextPath) throws IOException {
653: return;
654: }
655:
656: public Log getLogger() {
657: return log;
658: }
659:
660: // --------------------------------------------- Inner Class
661:
662: // --------------------------------------------- Performance
663:
664: private void perfMessageRecvd(long timeSent) {
665: nrOfMsgsReceived++;
666: msgSendTime += (System.currentTimeMillis() - timeSent);
667: if ((System.currentTimeMillis() - lastChecked) > 5000) {
668: log.debug("Calc msg send time total=" + msgSendTime
669: + "ms num request=" + nrOfMsgsReceived
670: + " average per msg="
671: + (msgSendTime / nrOfMsgsReceived) + "ms.");
672: }
673: }
674:
675: public String getManagerClassName() {
676: return managerClassName;
677: }
678:
679: public void setManagerClassName(String managerClassName) {
680: this .managerClassName = managerClassName;
681: }
682:
683: public org.apache.catalina.cluster.ClusterSender getClusterSender() {
684: return clusterSender;
685: }
686:
687: public void setClusterSender(
688: org.apache.catalina.cluster.ClusterSender clusterSender) {
689: this .clusterSender = clusterSender;
690: }
691:
692: public org.apache.catalina.cluster.ClusterReceiver getClusterReceiver() {
693: return clusterReceiver;
694: }
695:
696: public void setClusterReceiver(
697: org.apache.catalina.cluster.ClusterReceiver clusterReceiver) {
698: this .clusterReceiver = clusterReceiver;
699: }
700:
701: public MembershipService getMembershipService() {
702: return membershipService;
703: }
704:
705: public void setMembershipService(MembershipService membershipService) {
706: this .membershipService = membershipService;
707: }
708:
709: public void addValve(Valve valve) {
710: this .valve = valve;
711: }
712:
713: public void addClusterListener(MessageListener listener) {
714: if (!clusterListeners.contains(listener)) {
715: clusterListeners.addElement(listener);
716: }
717: }
718:
719: public void removeClusterListener(MessageListener listener) {
720: clusterListeners.removeElement(listener);
721: }
722:
723: public org.apache.catalina.cluster.ClusterDeployer getClusterDeployer() {
724: return clusterDeployer;
725: }
726:
727: public void setClusterDeployer(
728: org.apache.catalina.cluster.ClusterDeployer clusterDeployer) {
729: this .clusterDeployer = clusterDeployer;
730: }
731:
732: private class MemberComparator implements java.util.Comparator {
733:
734: public int compare(Object o1, Object o2) {
735: try {
736: return compare((Member) o1, (Member) o2);
737: } catch (ClassCastException x) {
738: return 0;
739: }
740: }
741:
742: public int compare(Member m1, Member m2) {
743: //longer alive time, means sort first
744: long result = m2.getMemberAliveTime()
745: - m1.getMemberAliveTime();
746: if (result < 0)
747: return -1;
748: else if (result == 0)
749: return 0;
750: else
751: return 1;
752: }
753: }
754:
755: }
|