001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */
019:
020: package org.apache.synapse.core.axis2;
021:
022: import org.apache.axiom.om.OMAbstractFactory;
023: import org.apache.axis2.context.ConfigurationContext;
024: import org.apache.axis2.context.OperationContext;
025: import org.apache.axis2.context.ServiceContext;
026: import org.apache.axis2.description.InOutAxisOperation;
027: import org.apache.commons.logging.Log;
028: import org.apache.commons.logging.LogFactory;
029: import org.apache.synapse.Mediator;
030: import org.apache.synapse.MessageContext;
031: import org.apache.synapse.SynapseConstants;
032: import org.apache.synapse.SynapseException;
033: import org.apache.synapse.config.SynapseConfiguration;
034: import org.apache.synapse.core.SynapseEnvironment;
035: import org.apache.synapse.endpoints.utils.EndpointDefinition;
036: import org.apache.synapse.mediators.MediatorWorker;
037: import org.apache.synapse.mediators.base.SequenceMediator;
038: import org.apache.synapse.statistics.StatisticsCollector;
039: import org.apache.synapse.statistics.StatisticsUtils;
040: import org.apache.synapse.util.UUIDGenerator;
041: import org.apache.synapse.util.concurrent.SynapseThreadPool;
042:
043: import java.util.concurrent.ExecutorService;
044:
045: /**
046: * This is the Axis2 implementation of the SynapseEnvironment
047: */
048: public class Axis2SynapseEnvironment implements SynapseEnvironment {
049:
050: private static final Log log = LogFactory
051: .getLog(Axis2SynapseEnvironment.class);
052:
053: private SynapseConfiguration synapseConfig;
054: private ConfigurationContext configContext;
055: private ExecutorService executorService;
056: private boolean initialized = false;
057:
058: /** The StatisticsCollector object */
059: private StatisticsCollector statisticsCollector;
060:
061: public Axis2SynapseEnvironment(SynapseConfiguration synCfg) {
062:
063: int coreThreads = SynapseThreadPool.SYNAPSE_CORE_THREADS;
064: int maxThreads = SynapseThreadPool.SYNAPSE_MAX_THREADS;
065: long keepAlive = SynapseThreadPool.SYNAPSE_KEEP_ALIVE;
066: int qlength = SynapseThreadPool.SYNAPSE_THREAD_QLEN;
067:
068: try {
069: qlength = Integer.parseInt(synCfg
070: .getProperty(SynapseThreadPool.SYN_THREAD_QLEN));
071: } catch (Exception ignore) {
072: }
073:
074: try {
075: coreThreads = Integer.parseInt(synCfg
076: .getProperty(SynapseThreadPool.SYN_THREAD_CORE));
077: } catch (Exception ignore) {
078: }
079:
080: try {
081: maxThreads = Integer.parseInt(synCfg
082: .getProperty(SynapseThreadPool.SYN_THREAD_MAX));
083: } catch (Exception ignore) {
084: }
085:
086: try {
087: keepAlive = Long.parseLong(synCfg
088: .getProperty(SynapseThreadPool.SYN_THREAD_ALIVE));
089: } catch (Exception ignore) {
090: }
091:
092: this .executorService = new SynapseThreadPool(coreThreads,
093: maxThreads, keepAlive, qlength, synCfg.getProperty(
094: SynapseThreadPool.SYN_THREAD_GROUP,
095: SynapseThreadPool.SYNAPSE_THREAD_GROUP),
096: synCfg.getProperty(
097: SynapseThreadPool.SYN_THREAD_IDPREFIX,
098: SynapseThreadPool.SYNAPSE_THREAD_ID_PREFIX));
099: }
100:
101: public Axis2SynapseEnvironment(ConfigurationContext cfgCtx,
102: SynapseConfiguration synapseConfig) {
103: this (synapseConfig);
104: this .configContext = cfgCtx;
105: this .synapseConfig = synapseConfig;
106: }
107:
108: public boolean injectMessage(final MessageContext synCtx) {
109: if (log.isDebugEnabled()) {
110: log.debug("Injecting MessageContext");
111: }
112: synCtx.setEnvironment(this );
113: if (synCtx.isResponse()) {
114: //Process statistics related to a sequence which has send mediator as a child,end point
115: StatisticsUtils.processEndPointStatistics(synCtx);
116: StatisticsUtils.processProxyServiceStatistics(synCtx);
117: StatisticsUtils.processSequenceStatistics(synCtx);
118: }
119:
120: // if this is a response to a proxy service
121: if (synCtx.getProperty(SynapseConstants.PROXY_SERVICE) != null) {
122:
123: if (synCtx
124: .getConfiguration()
125: .getProxyService(
126: (String) synCtx
127: .getProperty(SynapseConstants.PROXY_SERVICE))
128: .getTargetOutSequence() != null) {
129:
130: String sequenceName = synCtx
131: .getConfiguration()
132: .getProxyService(
133: (String) synCtx
134: .getProperty(SynapseConstants.PROXY_SERVICE))
135: .getTargetOutSequence();
136: Mediator outSequence = synCtx.getSequence(sequenceName);
137:
138: if (outSequence != null) {
139: if (log.isDebugEnabled()) {
140: log
141: .debug("Using the sequence named "
142: + sequenceName
143: + " for the outgoing message mediation of the proxy service "
144: + synCtx
145: .getProperty(SynapseConstants.PROXY_SERVICE));
146: }
147: outSequence.mediate(synCtx);
148: } else {
149: log.error("Unable to find the out-sequence "
150: + "specified by the name " + sequenceName);
151: throw new SynapseException("Unable to find the "
152: + "out-sequence specified by the name "
153: + sequenceName);
154: }
155:
156: } else if (synCtx
157: .getConfiguration()
158: .getProxyService(
159: (String) synCtx
160: .getProperty(SynapseConstants.PROXY_SERVICE))
161: .getTargetInLineOutSequence() != null) {
162: if (log.isDebugEnabled()) {
163: log
164: .debug("Using the anonymous out-sequence specified in the proxy service "
165: + synCtx
166: .getProperty(SynapseConstants.PROXY_SERVICE)
167: + " for outgoing message mediation");
168: }
169: synCtx
170: .getConfiguration()
171: .getProxyService(
172: (String) synCtx
173: .getProperty(SynapseConstants.PROXY_SERVICE))
174: .getTargetInLineOutSequence().mediate(synCtx);
175: } else {
176: if (log.isDebugEnabled()) {
177: log
178: .debug("Proxy service "
179: + synCtx
180: .getProperty(SynapseConstants.PROXY_SERVICE)
181: + " does not specifies an out-sequence - sending the response back");
182: }
183: Axis2Sender.sendBack(synCtx);
184: }
185:
186: } else {
187: if (log.isDebugEnabled()) {
188: log.debug("Using Main Sequence for injected message");
189: }
190: return synCtx.getMainSequence().mediate(synCtx);
191: }
192: return true;
193: }
194:
195: public void injectAsync(final MessageContext synCtx,
196: SequenceMediator seq) {
197: if (log.isDebugEnabled()) {
198: log
199: .debug("Injecting MessageContext for asynchronous mediation using the : "
200: + (seq.getName() == null ? "Anonymous"
201: : seq.getName()) + " Sequence");
202: }
203: synCtx.setEnvironment(this );
204: // todo: do we need to have this in here ? ruwan
205: if (synCtx.isResponse()) {
206: //Process statistics related to a sequence which has send mediator as a child,end point
207: StatisticsUtils.processEndPointStatistics(synCtx);
208: StatisticsUtils.processProxyServiceStatistics(synCtx);
209: StatisticsUtils.processSequenceStatistics(synCtx);
210: }
211:
212: executorService.execute(new MediatorWorker(seq, synCtx));
213:
214: }
215:
216: /**
217: * This will be used for sending the message provided, to the endpoint specified by the
218: * EndpointDefinition using the axis2 environment.
219: *
220: * @param endpoint - EndpointDefinition to be used to find the endpoint information
221: * and the properties of the sending process
222: * @param synCtx - Synapse MessageContext to be sent
223: */
224: public void send(EndpointDefinition endpoint, MessageContext synCtx) {
225: if (synCtx.isResponse()) {
226:
227: if (endpoint != null) {
228: // not sure whether we need to collect statistics here
229: StatisticsUtils.processEndPointStatistics(synCtx);
230: StatisticsUtils.processProxyServiceStatistics(synCtx);
231: StatisticsUtils.processAllSequenceStatistics(synCtx);
232:
233: Axis2Sender.sendOn(endpoint, synCtx);
234:
235: } else {
236: Axis2Sender.sendBack(synCtx);
237: }
238: } else {
239: Axis2Sender.sendOn(endpoint, synCtx);
240: }
241: }
242:
243: /**
244: * This method will be used to create a new MessageContext in the Axis2 environment for
245: * Synapse. This will set all the relevant parts to the messagecontext, but for this message
246: * context to be useful creator has to fill in the data like envelope and operation context
247: * and so on. This will set a default envelope of type soap12 and a new messageID for the
248: * created message along with the ConfigurationContext is being set in to the message
249: * correctly.
250: *
251: * @return Synapse MessageContext with the underlying axis2 message context set
252: */
253: public MessageContext createMessageContext() {
254:
255: if (log.isDebugEnabled()) {
256: log.debug("Creating Message Context");
257: }
258:
259: org.apache.axis2.context.MessageContext axis2MC = new org.apache.axis2.context.MessageContext();
260: axis2MC.setConfigurationContext(this .configContext);
261:
262: ServiceContext svcCtx = new ServiceContext();
263: OperationContext opCtx = new OperationContext(
264: new InOutAxisOperation(), svcCtx);
265: axis2MC.setServiceContext(svcCtx);
266: axis2MC.setOperationContext(opCtx);
267: MessageContext mc = new Axis2MessageContext(axis2MC,
268: synapseConfig, this );
269: mc.setMessageID(UUIDGenerator.getUUID());
270: try {
271: mc.setEnvelope(OMAbstractFactory.getSOAP12Factory()
272: .createSOAPEnvelope());
273: mc.getEnvelope().addChild(
274: OMAbstractFactory.getSOAP12Factory()
275: .createSOAPBody());
276: } catch (Exception e) {
277: e.printStackTrace();
278: }
279:
280: return mc;
281: }
282:
283: /**
284: * This method returns the StatisticsCollector
285: *
286: * @return Retruns the StatisticsCollector
287: */
288: public StatisticsCollector getStatisticsCollector() {
289: return statisticsCollector;
290: }
291:
292: /**
293: * To set the StatisticsCollector
294: *
295: * @param collector - Statistics collector to be set
296: */
297: public void setStatisticsCollector(StatisticsCollector collector) {
298: this .statisticsCollector = collector;
299: }
300:
301: /**
302: * This will give the access to the synapse thread pool for the
303: * advanced mediation tasks.
304: *
305: * @return an ExecutorService to execute the tasks in a new thread from the pool
306: */
307: public ExecutorService getExecutorService() {
308: return executorService;
309: }
310:
311: /**
312: * Has this environment properly initialized?
313: * @return true if ready for processing
314: */
315: public boolean isInitialized() {
316: return initialized;
317: }
318:
319: /**
320: * Mark this environment as ready for processing
321: * @param state true means ready for processing
322: */
323: public void setInitialized(boolean state) {
324: this.initialized = state;
325: }
326:
327: }
|