001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017:
018: package org.apache.catalina.ha.tcp;
019:
020: import java.io.IOException;
021: import java.util.StringTokenizer;
022: import java.util.regex.Pattern;
023: import java.util.ArrayList;
024: import java.util.List;
025: import java.util.Iterator;
026: import javax.servlet.ServletException;
027:
028: import org.apache.catalina.Manager;
029: import org.apache.catalina.Session;
030: import org.apache.catalina.Context;
031: import org.apache.catalina.core.StandardContext;
032: import org.apache.catalina.ha.CatalinaCluster;
033: import org.apache.catalina.ha.ClusterManager;
034: import org.apache.catalina.ha.ClusterMessage;
035: import org.apache.catalina.ha.ClusterSession;
036: import org.apache.catalina.ha.ClusterValve;
037: import org.apache.catalina.ha.session.DeltaManager;
038: import org.apache.catalina.ha.session.DeltaSession;
039: import org.apache.catalina.connector.Request;
040: import org.apache.catalina.connector.Response;
041: import org.apache.catalina.util.StringManager;
042: import org.apache.catalina.valves.ValveBase;
043:
044: /**
045: * <p>Implementation of a Valve that logs interesting contents from the
046: * specified Request (before processing) and the corresponding Response
047: * (after processing). It is especially useful in debugging problems
048: * related to headers and cookies.</p>
049: *
050: * <p>This Valve may be attached to any Container, depending on the granularity
051: * of the logging you wish to perform.</p>
052: *
053: * <p>primaryIndicator=true, then the request attribute <i>org.apache.catalina.ha.tcp.isPrimarySession.</i>
054: * is set true, when request processing is at sessions primary node.
055: * </p>
056: *
057: * @author Craig R. McClanahan
058: * @author Filip Hanik
059: * @author Peter Rossbach
060: * @version $Revision: 467222 $ $Date: 2006-10-24 05:17:11 +0200 (mar., 24 oct. 2006) $
061: */
062:
063: public class ReplicationValve extends ValveBase implements ClusterValve {
064:
065: private static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory
066: .getLog(ReplicationValve.class);
067:
068: // ----------------------------------------------------- Instance Variables
069:
070: /**
071: * The descriptive information related to this implementation.
072: */
073: private static final String info = "org.apache.catalina.ha.tcp.ReplicationValve/2.0";
074:
075: /**
076: * The StringManager for this package.
077: */
078: protected static StringManager sm = StringManager
079: .getManager(Constants.Package);
080:
081: private CatalinaCluster cluster = null;
082:
083: /**
084: * holds file endings to not call for like images and others
085: */
086: protected java.util.regex.Pattern[] reqFilters = new java.util.regex.Pattern[0];
087:
088: /**
089: * Orginal filter
090: */
091: protected String filter;
092:
093: /**
094: * crossContext session container
095: */
096: protected ThreadLocal crossContextSessions = new ThreadLocal();
097:
098: /**
099: * doProcessingStats (default = off)
100: */
101: protected boolean doProcessingStats = false;
102:
103: protected long totalRequestTime = 0;
104: protected long totalSendTime = 0;
105: protected long nrOfRequests = 0;
106: protected long lastSendTime = 0;
107: protected long nrOfFilterRequests = 0;
108: protected long nrOfSendRequests = 0;
109: protected long nrOfCrossContextSendRequests = 0;
110:
111: /**
112: * must primary change indicator set
113: */
114: protected boolean primaryIndicator = false;
115:
116: /**
117: * Name of primary change indicator as request attribute
118: */
119: protected String primaryIndicatorName = "org.apache.catalina.ha.tcp.isPrimarySession";
120:
121: // ------------------------------------------------------------- Properties
122:
123: public ReplicationValve() {
124: }
125:
126: /**
127: * Return descriptive information about this Valve implementation.
128: */
129: public String getInfo() {
130:
131: return (info);
132:
133: }
134:
135: /**
136: * @return Returns the cluster.
137: */
138: public CatalinaCluster getCluster() {
139: return cluster;
140: }
141:
142: /**
143: * @param cluster The cluster to set.
144: */
145: public void setCluster(CatalinaCluster cluster) {
146: this .cluster = cluster;
147: }
148:
149: /**
150: * @return Returns the filter
151: */
152: public String getFilter() {
153: return filter;
154: }
155:
156: /**
157: * compile filter string to regular expressions
158: * @see Pattern#compile(java.lang.String)
159: * @param filter
160: * The filter to set.
161: */
162: public void setFilter(String filter) {
163: if (log.isDebugEnabled())
164: log.debug(sm.getString("ReplicationValve.filter.loading",
165: filter));
166: this .filter = filter;
167: StringTokenizer t = new StringTokenizer(filter, ";");
168: this .reqFilters = new Pattern[t.countTokens()];
169: int i = 0;
170: while (t.hasMoreTokens()) {
171: String s = t.nextToken();
172: if (log.isTraceEnabled())
173: log.trace(sm.getString("ReplicationValve.filter.token",
174: s));
175: try {
176: reqFilters[i++] = Pattern.compile(s);
177: } catch (Exception x) {
178: log.error(sm.getString(
179: "ReplicationValve.filter.token.failure", s), x);
180: }
181: }
182: }
183:
184: /**
185: * @return Returns the primaryIndicator.
186: */
187: public boolean isPrimaryIndicator() {
188: return primaryIndicator;
189: }
190:
191: /**
192: * @param primaryIndicator The primaryIndicator to set.
193: */
194: public void setPrimaryIndicator(boolean primaryIndicator) {
195: this .primaryIndicator = primaryIndicator;
196: }
197:
198: /**
199: * @return Returns the primaryIndicatorName.
200: */
201: public String getPrimaryIndicatorName() {
202: return primaryIndicatorName;
203: }
204:
205: /**
206: * @param primaryIndicatorName The primaryIndicatorName to set.
207: */
208: public void setPrimaryIndicatorName(String primaryIndicatorName) {
209: this .primaryIndicatorName = primaryIndicatorName;
210: }
211:
212: /**
213: * Calc processing stats
214: */
215: public boolean doStatistics() {
216: return doProcessingStats;
217: }
218:
219: /**
220: * Set Calc processing stats
221: * @see #resetStatistics()
222: */
223: public void setStatistics(boolean doProcessingStats) {
224: this .doProcessingStats = doProcessingStats;
225: }
226:
227: /**
228: * @return Returns the lastSendTime.
229: */
230: public long getLastSendTime() {
231: return lastSendTime;
232: }
233:
234: /**
235: * @return Returns the nrOfRequests.
236: */
237: public long getNrOfRequests() {
238: return nrOfRequests;
239: }
240:
241: /**
242: * @return Returns the nrOfFilterRequests.
243: */
244: public long getNrOfFilterRequests() {
245: return nrOfFilterRequests;
246: }
247:
248: /**
249: * @return Returns the nrOfCrossContextSendRequests.
250: */
251: public long getNrOfCrossContextSendRequests() {
252: return nrOfCrossContextSendRequests;
253: }
254:
255: /**
256: * @return Returns the nrOfSendRequests.
257: */
258: public long getNrOfSendRequests() {
259: return nrOfSendRequests;
260: }
261:
262: /**
263: * @return Returns the totalRequestTime.
264: */
265: public long getTotalRequestTime() {
266: return totalRequestTime;
267: }
268:
269: /**
270: * @return Returns the totalSendTime.
271: */
272: public long getTotalSendTime() {
273: return totalSendTime;
274: }
275:
276: /**
277: * @return Returns the reqFilters.
278: */
279: protected java.util.regex.Pattern[] getReqFilters() {
280: return reqFilters;
281: }
282:
283: /**
284: * @param reqFilters The reqFilters to set.
285: */
286: protected void setReqFilters(java.util.regex.Pattern[] reqFilters) {
287: this .reqFilters = reqFilters;
288: }
289:
290: // --------------------------------------------------------- Public Methods
291:
292: /**
293: * Register all cross context sessions inside endAccess.
294: * Use a list with contains check, that the Portlet API can include a lot of fragments from same or
295: * different applications with session changes.
296: *
297: * @param session cross context session
298: */
299: public void registerReplicationSession(DeltaSession session) {
300: List sessions = (List) crossContextSessions.get();
301: if (sessions != null) {
302: if (!sessions.contains(session)) {
303: if (log.isDebugEnabled())
304: log
305: .debug(sm
306: .getString(
307: "ReplicationValve.crossContext.registerSession",
308: session.getIdInternal(),
309: session.getManager()
310: .getContainer()
311: .getName()));
312: sessions.add(session);
313: }
314: }
315: }
316:
317: /**
318: * Log the interesting request parameters, invoke the next Valve in the
319: * sequence, and log the interesting response parameters.
320: *
321: * @param request The servlet request to be processed
322: * @param response The servlet response to be created
323: *
324: * @exception IOException if an input/output error occurs
325: * @exception ServletException if a servlet error occurs
326: */
327: public void invoke(Request request, Response response)
328: throws IOException, ServletException {
329: long totalstart = 0;
330:
331: //this happens before the request
332: if (doStatistics()) {
333: totalstart = System.currentTimeMillis();
334: }
335: if (primaryIndicator) {
336: createPrimaryIndicator(request);
337: }
338: Context context = request.getContext();
339: boolean isCrossContext = context != null
340: && context instanceof StandardContext
341: && ((StandardContext) context).getCrossContext();
342: try {
343: if (isCrossContext) {
344: if (log.isDebugEnabled())
345: log
346: .debug(sm
347: .getString("ReplicationValve.crossContext.add"));
348: //FIXME add Pool of Arraylists
349: crossContextSessions.set(new ArrayList());
350: }
351: getNext().invoke(request, response);
352: Manager manager = request.getContext().getManager();
353: if (manager != null && manager instanceof ClusterManager) {
354: ClusterManager clusterManager = (ClusterManager) manager;
355: CatalinaCluster containerCluster = (CatalinaCluster) getContainer()
356: .getCluster();
357: if (containerCluster == null) {
358: if (log.isWarnEnabled())
359: log
360: .warn(sm
361: .getString("ReplicationValve.nocluster"));
362: return;
363: }
364: // valve cluster can access manager - other cluster handle replication
365: // at host level - hopefully!
366: if (containerCluster.getManager(clusterManager
367: .getName()) == null)
368: return;
369: if (containerCluster.hasMembers()) {
370: sendReplicationMessage(request, totalstart,
371: isCrossContext, clusterManager,
372: containerCluster);
373: } else {
374: resetReplicationRequest(request, isCrossContext);
375: }
376: }
377: } finally {
378: // Array must be remove: Current master request send endAccess at recycle.
379: // Don't register this request session again!
380: if (isCrossContext) {
381: if (log.isDebugEnabled())
382: log
383: .debug(sm
384: .getString("ReplicationValve.crossContext.remove"));
385: // crossContextSessions.remove() only exist at Java 5
386: // register ArrayList at a pool
387: crossContextSessions.set(null);
388: }
389: }
390: }
391:
392: /**
393: * reset the active statitics
394: */
395: public void resetStatistics() {
396: totalRequestTime = 0;
397: totalSendTime = 0;
398: lastSendTime = 0;
399: nrOfFilterRequests = 0;
400: nrOfRequests = 0;
401: nrOfSendRequests = 0;
402: nrOfCrossContextSendRequests = 0;
403: }
404:
405: /**
406: * Return a String rendering of this object.
407: */
408: public String toString() {
409:
410: StringBuffer sb = new StringBuffer("ReplicationValve[");
411: if (container != null)
412: sb.append(container.getName());
413: sb.append("]");
414: return (sb.toString());
415:
416: }
417:
418: // --------------------------------------------------------- Protected Methods
419:
420: /**
421: * @param request
422: * @param totalstart
423: * @param isCrossContext
424: * @param clusterManager
425: * @param containerCluster
426: */
427: protected void sendReplicationMessage(Request request,
428: long totalstart, boolean isCrossContext,
429: ClusterManager clusterManager,
430: CatalinaCluster containerCluster) {
431: //this happens after the request
432: long start = 0;
433: if (doStatistics()) {
434: start = System.currentTimeMillis();
435: }
436: try {
437: // send invalid sessions
438: // DeltaManager returns String[0]
439: if (!(clusterManager instanceof DeltaManager))
440: sendInvalidSessions(clusterManager, containerCluster);
441: // send replication
442: sendSessionReplicationMessage(request, clusterManager,
443: containerCluster);
444: if (isCrossContext)
445: sendCrossContextSession(containerCluster);
446: } catch (Exception x) {
447: // FIXME we have a lot of sends, but the trouble with one node stops the correct replication to other nodes!
448: log.error(sm.getString("ReplicationValve.send.failure"), x);
449: } finally {
450: // FIXME this stats update are not cheap!!
451: if (doStatistics()) {
452: updateStats(totalstart, start);
453: }
454: }
455: }
456:
457: /**
458: * Send all changed cross context sessions to backups
459: * @param containerCluster
460: */
461: protected void sendCrossContextSession(
462: CatalinaCluster containerCluster) {
463: Object sessions = crossContextSessions.get();
464: if (sessions != null && sessions instanceof List
465: && ((List) sessions).size() > 0) {
466: for (Iterator iter = ((List) sessions).iterator(); iter
467: .hasNext();) {
468: Session session = (Session) iter.next();
469: if (log.isDebugEnabled())
470: log.debug(sm.getString(
471: "ReplicationValve.crossContext.sendDelta",
472: session.getManager().getContainer()
473: .getName()));
474: sendMessage(session, (ClusterManager) session
475: .getManager(), containerCluster);
476: if (doStatistics()) {
477: nrOfCrossContextSendRequests++;
478: }
479: }
480: }
481: }
482:
483: /**
484: * Fix memory leak for long sessions with many changes, when no backup member exists!
485: * @param request current request after responce is generated
486: * @param isCrossContext check crosscontext threadlocal
487: */
488: protected void resetReplicationRequest(Request request,
489: boolean isCrossContext) {
490: Session contextSession = request.getSessionInternal(false);
491: if (contextSession != null
492: & contextSession instanceof DeltaSession) {
493: resetDeltaRequest(contextSession);
494: ((DeltaSession) contextSession).setPrimarySession(true);
495: }
496: if (isCrossContext) {
497: Object sessions = crossContextSessions.get();
498: if (sessions != null && sessions instanceof List
499: && ((List) sessions).size() > 0) {
500: Iterator iter = ((List) sessions).iterator();
501: for (; iter.hasNext();) {
502: Session session = (Session) iter.next();
503: resetDeltaRequest(session);
504: if (session instanceof DeltaSession)
505: ((DeltaSession) contextSession)
506: .setPrimarySession(true);
507:
508: }
509: }
510: }
511: }
512:
513: /**
514: * Reset DeltaRequest from session
515: * @param session HttpSession from current request or cross context session
516: */
517: protected void resetDeltaRequest(Session session) {
518: if (log.isDebugEnabled()) {
519: log.debug(sm.getString(
520: "ReplicationValve.resetDeltaRequest", session
521: .getManager().getContainer().getName()));
522: }
523: ((DeltaSession) session).resetDeltaRequest();
524: }
525:
526: /**
527: * Send Cluster Replication Request
528: * @param request current request
529: * @param manager session manager
530: * @param cluster replication cluster
531: */
532: protected void sendSessionReplicationMessage(Request request,
533: ClusterManager manager, CatalinaCluster cluster) {
534: Session session = request.getSessionInternal(false);
535: if (session != null) {
536: String uri = request.getDecodedRequestURI();
537: // request without session change
538: if (!isRequestWithoutSessionChange(uri)) {
539: if (log.isDebugEnabled())
540: log.debug(sm.getString(
541: "ReplicationValve.invoke.uri", uri));
542: sendMessage(session, manager, cluster);
543: } else if (doStatistics())
544: nrOfFilterRequests++;
545: }
546:
547: }
548:
549: /**
550: * Send message delta message from request session
551: * @param request current request
552: * @param manager session manager
553: * @param cluster replication cluster
554: */
555: protected void sendMessage(Session session, ClusterManager manager,
556: CatalinaCluster cluster) {
557: String id = session.getIdInternal();
558: if (id != null) {
559: send(manager, cluster, id);
560: }
561: }
562:
563: /**
564: * send manager requestCompleted message to cluster
565: * @param manager SessionManager
566: * @param cluster replication cluster
567: * @param sessionId sessionid from the manager
568: * @see DeltaManager#requestCompleted(String)
569: * @see SimpleTcpCluster#send(ClusterMessage)
570: */
571: protected void send(ClusterManager manager,
572: CatalinaCluster cluster, String sessionId) {
573: ClusterMessage msg = manager.requestCompleted(sessionId);
574: if (msg != null) {
575: if (manager.doDomainReplication()) {
576: cluster.sendClusterDomain(msg);
577: } else {
578: cluster.send(msg);
579: }
580: if (doStatistics())
581: nrOfSendRequests++;
582: }
583: }
584:
585: /**
586: * check for session invalidations
587: * @param manager
588: * @param cluster
589: */
590: protected void sendInvalidSessions(ClusterManager manager,
591: CatalinaCluster cluster) {
592: String[] invalidIds = manager.getInvalidatedSessions();
593: if (invalidIds.length > 0) {
594: for (int i = 0; i < invalidIds.length; i++) {
595: try {
596: send(manager, cluster, invalidIds[i]);
597: } catch (Exception x) {
598: log.error(sm.getString(
599: "ReplicationValve.send.invalid.failure",
600: invalidIds[i]), x);
601: }
602: }
603: }
604: }
605:
606: /**
607: * is request without possible session change
608: * @param uri The request uri
609: * @return True if no session change
610: */
611: protected boolean isRequestWithoutSessionChange(String uri) {
612:
613: boolean filterfound = false;
614:
615: for (int i = 0; (i < reqFilters.length) && (!filterfound); i++) {
616: java.util.regex.Matcher matcher = reqFilters[i]
617: .matcher(uri);
618: filterfound = matcher.matches();
619: }
620: return filterfound;
621: }
622:
623: /**
624: * protocol cluster replications stats
625: * @param requestTime
626: * @param clusterTime
627: */
628: protected void updateStats(long requestTime, long clusterTime) {
629: synchronized (this ) {
630: lastSendTime = System.currentTimeMillis();
631: totalSendTime += lastSendTime - clusterTime;
632: totalRequestTime += lastSendTime - requestTime;
633: nrOfRequests++;
634: }
635: if (log.isInfoEnabled()) {
636: if ((nrOfRequests % 100) == 0) {
637: log.info(sm.getString("ReplicationValve.stats",
638: new Object[] {
639: new Long(totalRequestTime
640: / nrOfRequests),
641: new Long(totalSendTime / nrOfRequests),
642: new Long(nrOfRequests),
643: new Long(nrOfSendRequests),
644: new Long(nrOfCrossContextSendRequests),
645: new Long(nrOfFilterRequests),
646: new Long(totalRequestTime),
647: new Long(totalSendTime) }));
648: }
649: }
650: }
651:
652: /**
653: * Mark Request that processed at primary node with attribute
654: * primaryIndicatorName
655: *
656: * @param request
657: * @throws IOException
658: */
659: protected void createPrimaryIndicator(Request request)
660: throws IOException {
661: String id = request.getRequestedSessionId();
662: if ((id != null) && (id.length() > 0)) {
663: Manager manager = request.getContext().getManager();
664: Session session = manager.findSession(id);
665: if (session instanceof ClusterSession) {
666: ClusterSession cses = (ClusterSession) session;
667: if (cses != null) {
668: if (log.isDebugEnabled())
669: log.debug(sm.getString(
670: "ReplicationValve.session.indicator",
671: request.getContext().getName(), id,
672: primaryIndicatorName, cses
673: .isPrimarySession()));
674: request.setAttribute(primaryIndicatorName, cses
675: .isPrimarySession() ? Boolean.TRUE
676: : Boolean.FALSE);
677: }
678: } else {
679: if (log.isDebugEnabled()) {
680: if (session != null) {
681: log.debug(sm.getString(
682: "ReplicationValve.session.found",
683: request.getContext().getName(), id));
684: } else {
685: log.debug(sm.getString(
686: "ReplicationValve.session.invalid",
687: request.getContext().getName(), id));
688: }
689: }
690: }
691: }
692: }
693:
694: }
|