001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */
019:
020: package org.apache.synapse.endpoints;
021:
022: import org.apache.axis2.clustering.ClusterManager;
023: import org.apache.axis2.context.ConfigurationContext;
024: import org.apache.commons.logging.Log;
025: import org.apache.commons.logging.LogFactory;
026: import org.apache.synapse.FaultHandler;
027: import org.apache.synapse.MessageContext;
028: import org.apache.synapse.SynapseConstants;
029: import org.apache.synapse.core.axis2.Axis2MessageContext;
030: import org.apache.synapse.endpoints.utils.EndpointDefinition;
031: import org.apache.synapse.statistics.impl.EndPointStatisticsStack;
032:
033: import java.util.Stack;
034:
035: /**
036: * This class represents an actual endpoint to send the message. It is responsible for sending the
037: * message, performing retries if a failure occurred and informing the parent endpoint if a failure
038: * couldn't be recovered.
039: */
040: public class AddressEndpoint extends FaultHandler implements Endpoint {
041:
042: private static final Log log = LogFactory
043: .getLog(AddressEndpoint.class);
044: private static final Log trace = LogFactory
045: .getLog(SynapseConstants.TRACE_LOGGER);
046:
047: /**
048: * Name of the endpoint. Used for named endpoints which can be referred using the key attribute
049: * of indirect endpoints.
050: */
051: private String name = null;
052:
053: /**
054: * Stores the endpoint details for this endpoint. Details include EPR, WS-Addressing information,
055: * WS-Security information, etc.
056: */
057: private EndpointDefinition endpoint = null;
058:
059: /**
060: * Parent endpoint of this endpoint if this used inside another endpoint. Possible parents are
061: * LoadbalanceEndpoint, SALoadbalanceEndpoint and FailoverEndpoint objects.
062: */
063: private Endpoint parentEndpoint = null;
064:
065: /**
066: * Leaf level endpoints will be suspended for the specified time by this variable, after a
067: * failure. If this is not explicitly set, it is set to -1, which causes endpoints to suspended forever.
068: */
069: private long suspendOnFailDuration = -1;
070:
071: /**
072: * The endpoint context , place holder for keep any runtime states related to the endpoint
073: */
074: private final EndpointContext endpointContext = new EndpointContext();
075:
076: public EndpointDefinition getEndpoint() {
077: return endpoint;
078: }
079:
080: public void setEndpoint(EndpointDefinition endpoint) {
081: this .endpoint = endpoint;
082: }
083:
084: public String getName() {
085: return name;
086: }
087:
088: public void setName(String name) {
089: this .name = name.trim();
090: }
091:
092: /**
093: * Checks if the endpoint is active (failed or not). If endpoint is in failed state and
094: * suspendOnFailDuration has elapsed, it will be set to active.
095: *
096: * @param synMessageContext MessageContext of the current message. This is not used here.
097: * @return true if endpoint is active. false otherwise.
098: */
099: public boolean isActive(MessageContext synMessageContext) {
100:
101: boolean active = endpointContext.isActive();
102: if (!active) {
103:
104: long recoverOn = endpointContext.getRecoverOn();
105: if (System.currentTimeMillis() > recoverOn) {
106:
107: endpointContext.setActive(true);
108: endpointContext.setRecoverOn(0);
109:
110: }
111: }
112: return active;
113: }
114:
115: /**
116: * Sets if endpoint active or not. if endpoint is set as failed (active = false), the recover on
117: * time is calculated so that it will be activated after the recover on time.
118: *
119: * @param active true if active. false otherwise.
120: * @param synMessageContext MessageContext of the current message. This is not used here.
121: */
122: public synchronized void setActive(boolean active,
123: MessageContext synMessageContext) {
124:
125: // this is synchronized as recoverOn can be set to unpredictable values if two threads call
126: // this method simultaneously.
127:
128: if (!active) {
129: if (suspendOnFailDuration != -1) {
130: // Calculating a new value by adding suspendOnFailDuration to current time.
131: // as the endpoint is set as failed
132: endpointContext.setRecoverOn(System.currentTimeMillis()
133: + suspendOnFailDuration);
134: } else {
135: endpointContext.setRecoverOn(Long.MAX_VALUE);
136: }
137: }
138:
139: this .endpointContext.setActive(active);
140: }
141:
142: /**
143: * Sends the message through this endpoint. This method just handles statistics related functions
144: * and gives the message to the Synapse environment to send. It does not add any endpoint
145: * specific details to the message context. These details are added only to the cloned message
146: * context by the Axis2FlexibleMepClient. So that we can reuse the original message context for
147: * resending through different endpoints.
148: *
149: * @param synCtx MessageContext sent by client to Synapse
150: */
151: public void send(MessageContext synCtx) {
152:
153: boolean traceOn = isTraceOn(synCtx);
154: boolean traceOrDebugOn = isTraceOrDebugOn(traceOn);
155:
156: if (traceOrDebugOn) {
157: traceOrDebug(traceOn, "Start : Address Endpoint");
158:
159: if (traceOn && trace.isTraceEnabled()) {
160: trace.trace("Message : " + synCtx.getEnvelope());
161: }
162: }
163:
164: boolean isClusteringEnable = false;
165: // get Axis2 MessageContext and ConfigurationContext
166: org.apache.axis2.context.MessageContext axisMC = ((Axis2MessageContext) synCtx)
167: .getAxis2MessageContext();
168: ConfigurationContext cc = axisMC.getConfigurationContext();
169:
170: //The check for clustering environment
171:
172: ClusterManager clusterManager = cc.getAxisConfiguration()
173: .getClusterManager();
174: if (clusterManager != null
175: && clusterManager.getContextManager() != null) {
176: isClusteringEnable = true;
177: }
178:
179: String endPointName = this .getName();
180: if (endPointName == null) {
181:
182: if (traceOrDebugOn && isClusteringEnable) {
183: log
184: .warn("In a clustering environment , the endpoint name should be specified"
185: + "even for anonymous endpoints. Otherwise , the clustering would not be "
186: + "functioned correctly if there are more than one anonymous endpoints. ");
187: }
188: endPointName = SynapseConstants.ANONYMOUS_ENDPOINT;
189: }
190:
191: if (isClusteringEnable) {
192:
193: // if this is a cluster environment , then set configuration context to endpoint context
194: if (endpointContext.getConfigurationContext() == null) {
195: endpointContext.setConfigurationContext(cc);
196: endpointContext.setContextID(endPointName); // The context ID
197: }
198: }
199:
200: // Setting Required property to collect the End Point statistics
201: boolean statisticsEnable = (SynapseConstants.STATISTICS_ON == endpoint
202: .getStatisticsState());
203: if (statisticsEnable) {
204: EndPointStatisticsStack endPointStatisticsStack = null;
205: Object statisticsStackObj = synCtx
206: .getProperty(org.apache.synapse.SynapseConstants.ENDPOINT_STATS);
207: if (statisticsStackObj == null) {
208: endPointStatisticsStack = new EndPointStatisticsStack();
209: synCtx
210: .setProperty(
211: org.apache.synapse.SynapseConstants.ENDPOINT_STATS,
212: endPointStatisticsStack);
213: } else if (statisticsStackObj instanceof EndPointStatisticsStack) {
214: endPointStatisticsStack = (EndPointStatisticsStack) statisticsStackObj;
215: }
216: if (endPointStatisticsStack != null) {
217: boolean isFault = synCtx.getEnvelope().getBody()
218: .hasFault();
219: endPointStatisticsStack.put(endPointName, System
220: .currentTimeMillis(), !synCtx.isResponse(),
221: statisticsEnable, isFault);
222: }
223: }
224:
225: if (endpoint.getAddress() != null) {
226: if (traceOrDebugOn) {
227: traceOrDebug(traceOn, "Sending message to endpoint : "
228: + endPointName + " resolves to address = "
229: + endpoint.getAddress());
230: traceOrDebug(traceOn, "SOAPAction: "
231: + (synCtx.getSoapAction() != null ? synCtx
232: .getSoapAction() : "null"));
233: traceOrDebug(traceOn, "WSA-Action: "
234: + (synCtx.getWSAAction() != null ? synCtx
235: .getWSAAction() : "null"));
236:
237: if (traceOn && trace.isTraceEnabled()) {
238: trace.trace("Envelope : \n" + synCtx.getEnvelope());
239: }
240: }
241: }
242:
243: // register this as the immediate fault handler for this message.
244: synCtx.pushFaultHandler(this );
245:
246: // add this as the last endpoint to process this message. it is used by statistics code.
247: synCtx.setProperty(SynapseConstants.PROCESSED_ENDPOINT, this );
248:
249: synCtx.getEnvironment().send(endpoint, synCtx);
250: }
251:
252: public void onChildEndpointFail(Endpoint endpoint,
253: MessageContext synMessageContext) {
254: // nothing to do as this is a leaf level endpoint
255: }
256:
257: public void setParentEndpoint(Endpoint parentEndpoint) {
258: this .parentEndpoint = parentEndpoint;
259: }
260:
261: public long getSuspendOnFailDuration() {
262: return suspendOnFailDuration;
263: }
264:
265: public void setSuspendOnFailDuration(long suspendOnFailDuration) {
266: this .suspendOnFailDuration = suspendOnFailDuration;
267: }
268:
269: public void onFault(MessageContext synCtx) {
270: // perform retries here
271:
272: // if this endpoint has actually failed, inform the parent.
273: setActive(false, synCtx);
274:
275: if (parentEndpoint != null) {
276: parentEndpoint.onChildEndpointFail(this , synCtx);
277: } else {
278: Stack faultStack = synCtx.getFaultStack();
279: if (!faultStack.isEmpty()) {
280: ((FaultHandler) faultStack.pop()).handleFault(synCtx);
281: }
282: }
283: }
284:
285: /**
286: * Should this mediator perform tracing? True if its explicitly asked to
287: * trace, or its parent has been asked to trace and it does not reject it
288: *
289: * @param msgCtx the current message
290: * @return true if tracing should be performed
291: */
292: protected boolean isTraceOn(MessageContext msgCtx) {
293: return (endpoint.getTraceState() == SynapseConstants.TRACING_ON)
294: || (endpoint.getTraceState() == SynapseConstants.TRACING_UNSET && msgCtx
295: .getTracingState() == SynapseConstants.TRACING_ON);
296: }
297:
298: /**
299: * Is tracing or debug logging on?
300: *
301: * @param isTraceOn is tracing known to be on?
302: * @return true, if either tracing or debug logging is on
303: */
304: protected boolean isTraceOrDebugOn(boolean isTraceOn) {
305: return isTraceOn || log.isDebugEnabled();
306: }
307:
308: /**
309: * Perform Trace and Debug logging of a message @INFO (trace) and DEBUG (log)
310: *
311: * @param traceOn is runtime trace on for this message?
312: * @param msg the message to log/trace
313: */
314: protected void traceOrDebug(boolean traceOn, String msg) {
315: if (traceOn) {
316: trace.info(msg);
317: }
318: if (log.isDebugEnabled()) {
319: log.debug(msg);
320: }
321: }
322: }
|