001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.servicemix.jbi.nmr;
018:
019: import java.util.ArrayList;
020: import java.util.List;
021:
022: import javax.jbi.JBIException;
023: import javax.jbi.component.Component;
024: import javax.jbi.messaging.MessageExchange;
025: import javax.jbi.messaging.MessageExchange.Role;
026: import javax.jbi.messaging.MessagingException;
027: import javax.jbi.servicedesc.ServiceEndpoint;
028: import javax.management.JMException;
029: import javax.management.MBeanOperationInfo;
030: import javax.xml.namespace.QName;
031:
032: import org.apache.commons.logging.Log;
033: import org.apache.commons.logging.LogFactory;
034: import org.apache.servicemix.jbi.container.ActivationSpec;
035: import org.apache.servicemix.jbi.container.JBIContainer;
036: import org.apache.servicemix.jbi.framework.ComponentContextImpl;
037: import org.apache.servicemix.jbi.framework.ComponentMBeanImpl;
038: import org.apache.servicemix.jbi.framework.ComponentNameSpace;
039: import org.apache.servicemix.jbi.framework.Registry;
040: import org.apache.servicemix.jbi.management.BaseSystemService;
041: import org.apache.servicemix.jbi.management.ManagementContext;
042: import org.apache.servicemix.jbi.management.OperationInfoHelper;
043: import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
044: import org.apache.servicemix.jbi.nmr.flow.DefaultFlowChooser;
045: import org.apache.servicemix.jbi.nmr.flow.Flow;
046: import org.apache.servicemix.jbi.nmr.flow.FlowChooser;
047: import org.apache.servicemix.jbi.nmr.flow.FlowProvider;
048: import org.apache.servicemix.jbi.resolver.ConsumerComponentEndpointFilter;
049: import org.apache.servicemix.jbi.resolver.EndpointChooser;
050: import org.apache.servicemix.jbi.resolver.EndpointFilter;
051: import org.apache.servicemix.jbi.resolver.EndpointResolver;
052: import org.apache.servicemix.jbi.resolver.FirstChoicePolicy;
053: import org.apache.servicemix.jbi.resolver.ProducerComponentEndpointFilter;
054: import org.apache.servicemix.jbi.servicedesc.AbstractServiceEndpoint;
055: import org.apache.servicemix.jbi.servicedesc.ExternalEndpoint;
056: import org.apache.servicemix.jbi.servicedesc.InternalEndpoint;
057: import org.apache.servicemix.jbi.servicedesc.LinkedEndpoint;
058:
059: /**
060: * The Broker handles Nomalised Message Routing within ServiceMix
061: *
062: * @version $Revision: 384328 $
063: */
064: public class DefaultBroker extends BaseSystemService implements Broker {
065:
066: private static final Log LOG = LogFactory
067: .getLog(DefaultBroker.class);
068:
069: private Registry registry;
070: private String flowNames = "seda";
071: private String subscriptionFlowName;
072: private Flow[] flows;
073: private EndpointChooser defaultServiceChooser = new FirstChoicePolicy();
074: private EndpointChooser defaultInterfaceChooser = new FirstChoicePolicy();
075: private SubscriptionManager subscriptionManager = new SubscriptionManager();
076: private FlowChooser defaultFlowChooser = new DefaultFlowChooser();
077:
078: /**
079: * Constructor
080: */
081: public DefaultBroker() {
082: }
083:
084: /**
085: * Get the description
086: *
087: * @return description
088: */
089: public String getDescription() {
090: return "Normalized Message Router";
091: }
092:
093: public SubscriptionManager getSubscriptionManager() {
094: return subscriptionManager;
095: }
096:
097: /**
098: * Sets the subscription manager
099: */
100: public void setSubscriptionManager(
101: SubscriptionManager subscriptionManager) {
102: this .subscriptionManager = subscriptionManager;
103: }
104:
105: /**
106: * initialize the broker
107: *
108: * @param container
109: * @throws JBIException
110: */
111: public void init(JBIContainer container) throws JBIException {
112: super .init(container);
113: this .registry = container.getRegistry();
114: // Create and initialize flows
115: if (this .flows == null) {
116: String[] names = flowNames.split(",");
117: flows = new Flow[names.length];
118: for (int i = 0; i < names.length; i++) {
119: flows[i] = FlowProvider.getFlow(names[i]);
120: flows[i].init(this );
121: }
122: } else {
123: for (int i = 0; i < flows.length; i++) {
124: flows[i].init(this );
125: }
126: }
127: subscriptionManager.init(this , registry);
128: }
129:
130: protected Class<BrokerMBean> getServiceMBean() {
131: return BrokerMBean.class;
132: }
133:
134: /**
135: * Get the name of the Container
136: *
137: * @return containerName
138: */
139: public String getContainerName() {
140: return container.getName();
141: }
142:
143: /**
144: * Get the ManagementContext
145: *
146: * @return the managementContext
147: */
148: public ManagementContext getManagementContext() {
149: return container.getManagementContext();
150: }
151:
152: /**
153: * Get the Registry
154: *
155: * @return the registry
156: */
157: public Registry getRegistry() {
158: return registry;
159: }
160:
161: /**
162: * start brokering
163: *
164: * @throws JBIException
165: */
166: public void start() throws JBIException {
167: for (int i = 0; i < flows.length; i++) {
168: flows[i].start();
169: }
170: super .start();
171: }
172:
173: /**
174: * stop brokering
175: *
176: * @throws JBIException
177: */
178: public void stop() throws JBIException {
179: for (int i = 0; i < flows.length; i++) {
180: flows[i].stop();
181: }
182: super .stop();
183: }
184:
185: /**
186: * shutdown all Components
187: *
188: * @throws JBIException
189: */
190: public void shutDown() throws JBIException {
191: stop();
192: for (int i = 0; i < flows.length; i++) {
193: flows[i].shutDown();
194: }
195: container
196: .deactivateComponent(SubscriptionManager.COMPONENT_NAME);
197: super .shutDown();
198: container.getManagementContext().unregisterMBean(this );
199: }
200:
201: /**
202: * @return Returns the flow.
203: */
204: public String getFlowNames() {
205: return flowNames;
206: }
207:
208: /**
209: * @param flowName
210: * The flow to set.
211: */
212: public void setFlowNames(String flowNames) {
213: this .flowNames = flowNames;
214: }
215:
216: /**
217: * @return the subscriptionFlowName
218: */
219: public String getSubscriptionFlowName() {
220: return subscriptionFlowName;
221: }
222:
223: /**
224: * Set the subscription flow name
225: *
226: * @param subscriptionFlowName
227: */
228: public void setSubscriptionFlowName(String subscriptionFlowName) {
229: this .subscriptionFlowName = subscriptionFlowName;
230: }
231:
232: /**
233: * Set the flow
234: *
235: * @param flow
236: */
237: public void setFlows(Flow[] flows) {
238: this .flows = flows;
239: }
240:
241: /**
242: * @return the Flow
243: */
244: public Flow[] getFlows() {
245: return this .flows;
246: }
247:
248: /**
249: * suspend the flow to prevent any message exchanges
250: */
251: public void suspend() {
252: for (int i = 0; i < flows.length; i++) {
253: flows[i].suspend();
254: }
255: }
256:
257: /**
258: * resume message exchange processing
259: */
260: public void resume() {
261: for (int i = 0; i < flows.length; i++) {
262: flows[i].resume();
263: }
264: }
265:
266: /**
267: * Route an ExchangePacket to a destination
268: *
269: * @param exchange
270: * @throws JBIException
271: */
272: public void sendExchangePacket(MessageExchange me)
273: throws JBIException {
274: MessageExchangeImpl exchange = (MessageExchangeImpl) me;
275: if (exchange.getRole() == Role.PROVIDER
276: && exchange.getDestinationId() == null) {
277: resolveAddress(exchange);
278: }
279:
280: boolean foundRoute = false;
281: // If we found a destination, or this is a reply
282: if (exchange.getEndpoint() != null
283: || exchange.getRole() == Role.CONSUMER) {
284: foundRoute = true;
285: Flow flow = defaultFlowChooser.chooseFlow(flows, exchange);
286: if (flow == null) {
287: throw new MessagingException(
288: "Unable to choose a flow for exchange: "
289: + exchange);
290: }
291: flow.send(exchange);
292: }
293:
294: if (exchange.getRole() == Role.PROVIDER) {
295: getSubscriptionManager().dispatchToSubscribers(exchange);
296: }
297:
298: if (!foundRoute) {
299: boolean throwException = true;
300: ActivationSpec activationSpec = exchange
301: .getActivationSpec();
302: if (activationSpec != null) {
303: throwException = activationSpec
304: .isFailIfNoDestinationEndpoint();
305: }
306: if (throwException) {
307: throw new MessagingException(
308: "Could not find route for exchange: "
309: + exchange + " for service: "
310: + exchange.getService()
311: + " and interface: "
312: + exchange.getInterfaceName());
313: } else if (exchange.getMirror().getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_SENT) {
314: exchange.handleAccept();
315: ComponentContextImpl ctx = (ComponentContextImpl) getSubscriptionManager()
316: .getContext();
317: exchange.setDestinationId(ctx.getComponentNameSpace());
318: // TODO: this will fail if exchange is InOut
319: getSubscriptionManager().done(exchange);
320: }
321: }
322: }
323:
324: protected void resolveAddress(MessageExchangeImpl exchange)
325: throws JBIException {
326: ServiceEndpoint theEndpoint = exchange.getEndpoint();
327: if (theEndpoint != null) {
328: if (theEndpoint instanceof ExternalEndpoint) {
329: throw new JBIException(
330: "External endpoints can not be used for routing: should be an internal or dynamic endpoint.");
331: }
332: if (!(theEndpoint instanceof AbstractServiceEndpoint)) {
333: throw new JBIException(
334: "Component-specific endpoints can not be used for routing: should be an internal or dynamic endpoint.");
335: }
336: }
337: // Resolve linked endpoints
338: if (theEndpoint instanceof LinkedEndpoint) {
339: QName svcName = ((LinkedEndpoint) theEndpoint)
340: .getToService();
341: String epName = ((LinkedEndpoint) theEndpoint)
342: .getToEndpoint();
343: ServiceEndpoint ep = registry.getInternalEndpoint(svcName,
344: epName);
345: if (ep == null) {
346: throw new JBIException(
347: "Could not resolve linked endpoint: "
348: + theEndpoint);
349: }
350: theEndpoint = ep;
351: }
352:
353: // get the context which created the exchange
354: ComponentContextImpl context = exchange.getSourceContext();
355: if (theEndpoint == null) {
356: QName serviceName = exchange.getService();
357: QName interfaceName = exchange.getInterfaceName();
358:
359: // check in order, ServiceName then InterfaceName
360: // check to see if there is a match on the serviceName
361: if (serviceName != null) {
362: ServiceEndpoint[] endpoints = registry
363: .getEndpointsForService(serviceName);
364: endpoints = getMatchingEndpoints(endpoints, exchange);
365: theEndpoint = getServiceChooser(exchange)
366: .chooseEndpoint(endpoints, context, exchange);
367: if (theEndpoint == null) {
368: LOG
369: .warn("ServiceName ("
370: + serviceName
371: + ") specified for routing, but can't find it registered");
372: }
373: }
374: if (theEndpoint == null && interfaceName != null) {
375: ServiceEndpoint[] endpoints = registry
376: .getEndpointsForInterface(interfaceName);
377: endpoints = getMatchingEndpoints(endpoints, exchange);
378: theEndpoint = (InternalEndpoint) getInterfaceChooser(
379: exchange).chooseEndpoint(endpoints, context,
380: exchange);
381: if (theEndpoint == null) {
382: LOG
383: .warn("InterfaceName ("
384: + interfaceName
385: + ") specified for routing, but can't find any matching components");
386: }
387: }
388: if (theEndpoint == null) {
389: // lets use the resolver on the activation spec if
390: // applicable
391: ActivationSpec activationSpec = exchange
392: .getActivationSpec();
393: if (activationSpec != null) {
394: EndpointResolver destinationResolver = activationSpec
395: .getDestinationResolver();
396: if (destinationResolver != null) {
397: try {
398: EndpointFilter filter = createEndpointFilter(
399: context, exchange);
400: theEndpoint = (InternalEndpoint) destinationResolver
401: .resolveEndpoint(context, exchange,
402: filter);
403: } catch (JBIException e) {
404: throw new MessagingException(
405: "Failed to resolve endpoint: " + e,
406: e);
407: }
408: }
409: }
410: }
411: }
412: if (theEndpoint != null) {
413: exchange.setEndpoint(theEndpoint);
414: }
415: if (LOG.isTraceEnabled()) {
416: LOG.trace("Routing exchange " + exchange + " to: "
417: + theEndpoint);
418: }
419: }
420:
421: /**
422: * Filter the given endpoints by asking to the provider and consumer if they
423: * are both ok to process the exchange.
424: *
425: * @param endpoints
426: * an array of internal endpoints to check
427: * @param exchange
428: * the exchange that will be serviced
429: * @return an array of endpoints on which both consumer and provider agrees
430: */
431: protected ServiceEndpoint[] getMatchingEndpoints(
432: ServiceEndpoint[] endpoints, MessageExchangeImpl exchange) {
433: List<ServiceEndpoint> filtered = new ArrayList<ServiceEndpoint>();
434: ComponentMBeanImpl consumer = getRegistry().getComponent(
435: exchange.getSourceId());
436:
437: for (int i = 0; i < endpoints.length; i++) {
438: ComponentNameSpace id = ((InternalEndpoint) endpoints[i])
439: .getComponentNameSpace();
440: if (id != null) {
441: ComponentMBeanImpl provider = getRegistry()
442: .getComponent(id);
443: if (provider != null
444: && (!consumer.getComponent()
445: .isExchangeWithProviderOkay(
446: endpoints[i], exchange) || !provider
447: .getComponent()
448: .isExchangeWithConsumerOkay(
449: endpoints[i], exchange))) {
450: continue;
451: }
452: }
453: filtered.add(endpoints[i]);
454: }
455: return filtered.toArray(new ServiceEndpoint[filtered.size()]);
456: }
457:
458: /**
459: * @return the default EndpointChooser
460: */
461: public EndpointChooser getDefaultInterfaceChooser() {
462: return defaultInterfaceChooser;
463: }
464:
465: /**
466: * Set the default EndpointChooser
467: *
468: * @param defaultInterfaceChooser
469: */
470: public void setDefaultInterfaceChooser(
471: EndpointChooser defaultInterfaceChooser) {
472: this .defaultInterfaceChooser = defaultInterfaceChooser;
473: }
474:
475: /**
476: * @return the default EndpointChooser
477: */
478: public EndpointChooser getDefaultServiceChooser() {
479: return defaultServiceChooser;
480: }
481:
482: /**
483: * Set default EndpointChooser
484: *
485: * @param defaultServiceChooser
486: */
487: public void setDefaultServiceChooser(
488: EndpointChooser defaultServiceChooser) {
489: this .defaultServiceChooser = defaultServiceChooser;
490: }
491:
492: /**
493: * @return the defaultFlowChooser
494: */
495: public FlowChooser getDefaultFlowChooser() {
496: return defaultFlowChooser;
497: }
498:
499: /**
500: * @param defaultFlowChooser
501: * the defaultFlowChooser to set
502: */
503: public void setDefaultFlowChooser(FlowChooser defaultFlowChooser) {
504: this .defaultFlowChooser = defaultFlowChooser;
505: }
506:
507: /**
508: * Returns the endpoint chooser for endpoints found by service which will
509: * use the chooser on the exchange's activation spec if available otherwise
510: * will use the default
511: *
512: * @param exchange
513: * @return the EndpointChooser
514: */
515: protected EndpointChooser getServiceChooser(
516: MessageExchangeImpl exchange) {
517: EndpointChooser chooser = null;
518: ActivationSpec activationSpec = exchange.getActivationSpec();
519: if (activationSpec != null) {
520: chooser = activationSpec.getServiceChooser();
521: }
522: if (chooser == null) {
523: chooser = defaultServiceChooser;
524: }
525: return chooser;
526: }
527:
528: /**
529: * Returns the endpoint chooser for endpoints found by service which will
530: * use the chooser on the exchange's activation spec if available otherwise
531: * will use the default
532: *
533: * @param exchange
534: * @return the EndpointChooser
535: */
536: protected EndpointChooser getInterfaceChooser(
537: MessageExchangeImpl exchange) {
538: EndpointChooser chooser = null;
539: ActivationSpec activationSpec = exchange.getActivationSpec();
540: if (activationSpec != null) {
541: chooser = activationSpec.getInterfaceChooser();
542: }
543: if (chooser == null) {
544: chooser = defaultInterfaceChooser;
545: }
546: return chooser;
547: }
548:
549: /**
550: * Factory method to create an endpoint filter for the given component
551: * context and message exchange
552: *
553: * @param context
554: * @param exchange
555: * @return the EndpointFilter
556: */
557: protected EndpointFilter createEndpointFilter(
558: ComponentContextImpl context, MessageExchangeImpl exchange) {
559: Component component = context.getComponent();
560: if (exchange.getRole() == Role.PROVIDER) {
561: return new ConsumerComponentEndpointFilter(component);
562: } else {
563: return new ProducerComponentEndpointFilter(component);
564: }
565: }
566:
567: /**
568: * Get an array of MBeanOperationInfo
569: *
570: * @return array of OperationInfos
571: * @throws JMException
572: */
573: public MBeanOperationInfo[] getOperationInfos() throws JMException {
574: OperationInfoHelper helper = new OperationInfoHelper();
575: helper.addOperation(getObjectToManage(), "suspend",
576: "suspend the NMR processing");
577: helper.addOperation(getObjectToManage(), "resume",
578: "resume the NMR processing");
579:
580: return OperationInfoHelper.join(super .getOperationInfos(),
581: helper.getOperationInfos());
582: }
583:
584: public JBIContainer getContainer() {
585: return container;
586: }
587:
588: }
|