001: /*
002: * $Id: MuleEventMulticaster.java 11376 2008-03-16 17:44:10Z dfeist $
003: * --------------------------------------------------------------------------------------
004: * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
005: *
006: * The software in this package is published under the terms of the CPAL v1.0
007: * license, a copy of which has been included with this distribution in the
008: * LICENSE.txt file.
009: */
010:
011: package org.mule.module.spring.events;
012:
013: import org.mule.DefaultMuleEvent;
014: import org.mule.DefaultMuleMessage;
015: import org.mule.DefaultMuleSession;
016: import org.mule.RequestContext;
017: import org.mule.api.MuleContext;
018: import org.mule.api.MuleEventContext;
019: import org.mule.api.MuleException;
020: import org.mule.api.MuleRuntimeException;
021: import org.mule.api.MuleSession;
022: import org.mule.api.config.MuleProperties;
023: import org.mule.api.config.ThreadingProfile;
024: import org.mule.api.context.MuleContextAware;
025: import org.mule.api.endpoint.EndpointBuilder;
026: import org.mule.api.endpoint.EndpointFactory;
027: import org.mule.api.endpoint.EndpointURI;
028: import org.mule.api.endpoint.InboundEndpoint;
029: import org.mule.api.endpoint.MalformedEndpointException;
030: import org.mule.api.endpoint.OutboundEndpoint;
031: import org.mule.api.lifecycle.Callable;
032: import org.mule.api.lifecycle.Initialisable;
033: import org.mule.api.lifecycle.InitialisationException;
034: import org.mule.api.lifecycle.LifecycleTransitionResult;
035: import org.mule.api.model.Model;
036: import org.mule.api.routing.InboundRouterCollection;
037: import org.mule.api.routing.filter.ObjectFilter;
038: import org.mule.api.service.Service;
039: import org.mule.api.transformer.Transformer;
040: import org.mule.api.transformer.TransformerException;
041: import org.mule.api.transport.Connector;
042: import org.mule.component.DefaultJavaComponent;
043: import org.mule.config.QueueProfile;
044: import org.mule.endpoint.MuleEndpointURI;
045: import org.mule.model.seda.SedaModel;
046: import org.mule.model.seda.SedaService;
047: import org.mule.module.spring.i18n.SpringMessages;
048: import org.mule.object.SingletonObjectFactory;
049: import org.mule.routing.filters.WildcardFilter;
050: import org.mule.transport.AbstractConnector;
051: import org.mule.util.ClassUtils;
052:
053: import java.beans.ExceptionListener;
054: import java.util.ArrayList;
055: import java.util.Iterator;
056: import java.util.List;
057: import java.util.Map;
058: import java.util.Set;
059:
060: import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
061: import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService;
062:
063: import org.apache.commons.logging.Log;
064: import org.apache.commons.logging.LogFactory;
065: import org.springframework.beans.BeansException;
066: import org.springframework.context.ApplicationContext;
067: import org.springframework.context.ApplicationContextAware;
068: import org.springframework.context.ApplicationEvent;
069: import org.springframework.context.ApplicationListener;
070: import org.springframework.context.event.ApplicationEventMulticaster;
071: import org.springframework.context.event.ContextClosedEvent;
072: import org.springframework.context.event.ContextRefreshedEvent;
073:
074: /**
075: * <code>MuleEventMulticaster</code> is an implementation of a Spring
076: * ApplicationeventMulticaster. This implementation allows Mule event to be sent and
077: * received through the Spring ApplicationContext. This allows any Spring bean to
078: * receive and send events from any transport that Mule supports such as Jms, Http,
079: * Tcp, Pop3, Smtp, File, etc. All a bean needs to do to receive and send events is
080: * to implement MuleEventListener. Beans can also have subscriptions to certain
081: * events by implementing MuleSubscriptionEventListener, where the bean can provide a
082: * list of endpoints on which to receive events i.e. <code>
083: * <bean id="myListener" class="com.foo.MyListener">
084: * <property name="subscriptions">
085: * <list>
086: * <value>jms://customer.support</value>
087: * <value>pop3://support:123456@mail.mycompany.com</value>
088: * </list>
089: * </property>
090: * </bean>
091: * </code>
092: * <p/> Endpoints are specified as a Mule Url which is used to register a listener
093: * for the subscription In the previous version of the MuleEventMulticaster it was
094: * possible to specify wildcard endpoints. This is still possible but you need to
095: * tell the multicaster which specific endpoints to listen on and then your
096: * subscription listeners can use wildcards. To register the specific endpoints on
097: * the MuleEvent Multicaster you use the <i>subscriptions</i> property. <p/> <code>
098: * <bean id="applicationEventMulticaster" class="org.mule.module.spring.events.MuleEventMulticaster">
099: * <property name="subscriptions">
100: * <list>
101: * <value>jms://orders.queue</value>
102: * <value>jms://another.orders.queue</value>
103: * </list>
104: * </property>
105: * </bean>
106: * <p/>
107: * <bean id="myListener" class="com.foo.MyListener">
108: * <property name="subscriptions">
109: * <list>
110: * <value>jms://*.orders.*.</value>
111: * </list>
112: * </property>
113: * </bean>
114: * <p/>
115: * </code>
116: *
117: * @see MuleEventListener
118: * @see MuleSubscriptionEventListener
119: * @see ApplicationEventMulticaster
120: */
121: public class MuleEventMulticaster implements
122: ApplicationEventMulticaster, ApplicationContextAware,
123: MuleContextAware, Callable, Initialisable {
124: public static final String EVENT_MULTICASTER_DESCRIPTOR_NAME = "muleEventMulticasterDescriptor";
125:
126: /**
127: * logger used by this class
128: */
129: protected static final Log logger = LogFactory
130: .getLog(MuleEventMulticaster.class);
131:
132: /**
133: * The set of listeners for this Multicaster
134: */
135: protected final Set listeners = new CopyOnWriteArraySet();
136:
137: /**
138: * Determines whether events will be processed asynchronously
139: */
140: protected boolean asynchronous = false;
141:
142: /**
143: * An ExecutorService for handling asynchronous events
144: */
145: protected ExecutorService asyncPool = null;
146:
147: /**
148: * A list of endpoints the eventMulticaster will receive events on Note that if
149: * this eventMulticaster has a Mule Descriptor associated with it, these
150: * endpoints are ignored and the ones on the Mule Descriptor are used. These are
151: * here for convenience, the event multicaster will use these to create a default
152: * MuleDescriptor for itself at runtime
153: */
154: protected String[] subscriptions = null;
155:
156: /**
157: * The Spring acpplication context
158: */
159: protected ApplicationContext applicationContext;
160:
161: /**
162: * The mule instance compoennt for the Multicaster
163: */
164: protected Service service;
165:
166: /**
167: * The filter used to match subscriptions
168: */
169: protected Class subscriptionFilter = WildcardFilter.class;
170:
171: /**
172: * Used to store parsed endpoints
173: */
174: protected ExceptionListener exceptionListener = new LoggingExceptionListener();
175:
176: protected MuleContext muleContext;
177:
178: public void setMuleContext(MuleContext context) {
179: this .muleContext = context;
180: }
181:
182: public LifecycleTransitionResult initialise()
183: throws InitialisationException {
184: if (asynchronous) {
185: if (asyncPool == null) {
186: asyncPool = muleContext.getDefaultThreadingProfile()
187: .createPool("spring-events");
188: }
189: } else {
190: if (asyncPool != null) {
191: asyncPool.shutdown();
192: asyncPool = null;
193: }
194: }
195: return LifecycleTransitionResult.OK;
196: }
197:
198: /**
199: * Adds a listener to the the Multicaster. If asynchronous is set to true, an
200: * <code>AsynchronousMessageListener</code> is used to wrap the listener. This
201: * listener will be initialised with a threadpool. The configuration for the
202: * threadpool can be set on this multicaster or inherited from the MuleManager
203: * configuration, which is good for most cases.
204: *
205: * @param listener the ApplicationListener to register with this Multicaster
206: * @see AsynchronousEventListener
207: * @see ThreadingProfile
208: */
209: public void addApplicationListener(ApplicationListener listener) {
210: Object listenerToAdd = listener;
211:
212: if (asynchronous) {
213: listenerToAdd = new AsynchronousEventListener(asyncPool,
214: listener);
215: }
216:
217: listeners.add(listenerToAdd);
218: }
219:
220: /**
221: * Removes a listener from the multicaster
222: *
223: * @param listener the listener to remove
224: */
225: public void removeApplicationListener(ApplicationListener listener) {
226: for (Iterator iterator = listeners.iterator(); iterator
227: .hasNext();) {
228: ApplicationListener applicationListener = (ApplicationListener) iterator
229: .next();
230: if (applicationListener instanceof AsynchronousEventListener) {
231: if (((AsynchronousEventListener) applicationListener)
232: .getListener().equals(listener)) {
233: listeners.remove(applicationListener);
234: return;
235: }
236: } else {
237: if (applicationListener.equals(listener)) {
238: listeners.remove(applicationListener);
239: return;
240: }
241: }
242: }
243: listeners.remove(listener);
244: }
245:
246: /**
247: * Removes all the listeners from the multicaster
248: */
249: public void removeAllListeners() {
250: listeners.clear();
251: }
252:
253: /**
254: * Method is used to dispatch events to listeners registered with the
255: * EventManager or dispatches events to Mule depending on the type and state of
256: * the event received. If the event is not a Mule event it will be dispatched to
257: * any listeners registered that are NOT MuleEventListeners. If the event is a
258: * Mule event and there is no source event attached to it, it is assumed that the
259: * event was dispatched by an object in the context using context.publishEvent()
260: * and will be dispatched by Mule. If the event does have a source event attached
261: * to it, it is assumed that the event was dispatched by Mule and will be
262: * delivered to any listeners subscribed to the event.
263: *
264: * @param e the application event received by the context
265: */
266: public void multicastEvent(ApplicationEvent e) {
267: MuleApplicationEvent muleEvent = null;
268: // if the context gets refreshed we need to reinitialise
269: if (e instanceof ContextRefreshedEvent) {
270: try {
271: registerMulticasterComponent();
272: } catch (MuleException ex) {
273: throw new MuleRuntimeException(SpringMessages
274: .failedToReinitMule(), ex);
275: }
276: } else if (e instanceof ContextClosedEvent) {
277: muleContext.dispose();
278: return;
279: } else if (e instanceof MuleApplicationEvent) {
280: muleEvent = (MuleApplicationEvent) e;
281: // If there is no Mule event the event didn't originate from Mule
282: // so its an outbound event
283: if (muleEvent.getMuleEventContext() == null) {
284: try {
285: dispatchEvent(muleEvent);
286: } catch (ApplicationEventException e1) {
287: exceptionListener.exceptionThrown(e1);
288: }
289: return;
290: }
291: }
292:
293: for (Iterator iterator = listeners.iterator(); iterator
294: .hasNext();) {
295: ApplicationListener listener = (ApplicationListener) iterator
296: .next();
297: if (muleEvent != null) {
298: // As the asynchronous listener wraps the real listener we need
299: // to check the type of the wrapped listener, but invoke the Async
300: // listener
301: if (listener instanceof AsynchronousEventListener) {
302: AsynchronousEventListener asyncListener = (AsynchronousEventListener) listener;
303: if (asyncListener.getListener() instanceof MuleSubscriptionEventListener) {
304: if (isSubscriptionMatch(
305: muleEvent.getEndpoint(),
306: ((MuleSubscriptionEventListener) asyncListener
307: .getListener())
308: .getSubscriptions())) {
309: asyncListener.onApplicationEvent(muleEvent);
310: }
311: } else if (asyncListener.getListener() instanceof MuleEventListener) {
312: asyncListener.onApplicationEvent(muleEvent);
313: } else if (!(asyncListener.getListener() instanceof MuleEventListener)) {
314: asyncListener.onApplicationEvent(e);
315: }
316: // Synchronous MuleEvent listener Checks
317: } else if (listener instanceof MuleSubscriptionEventListener) {
318: if (isSubscriptionMatch(muleEvent.getEndpoint(),
319: ((MuleSubscriptionEventListener) listener)
320: .getSubscriptions())) {
321: listener.onApplicationEvent(muleEvent);
322: }
323: } else if (listener instanceof MuleEventListener) {
324: listener.onApplicationEvent(muleEvent);
325: }
326: } else if (listener instanceof AsynchronousEventListener
327: && !(((AsynchronousEventListener) listener)
328: .getListener() instanceof MuleEventListener)) {
329: listener.onApplicationEvent(e);
330: } else if (!(listener instanceof MuleEventListener)) {
331: listener.onApplicationEvent(e);
332: } else {
333: // Finally only propagate the Application event if the
334: // ApplicationEvent interface is explicitly implemented
335: for (int i = 0; i < listener.getClass().getInterfaces().length; i++) {
336: if (listener.getClass().getInterfaces()[i]
337: .equals(ApplicationListener.class)) {
338: listener.onApplicationEvent(e);
339: break;
340: }
341: }
342:
343: }
344: }
345: }
346:
347: /**
348: * Matches a subscription to the current event endpointUri
349: *
350: * @param endpoint endpoint
351: * @param subscriptions subscriptions
352: * @return true if there's a match
353: */
354: private boolean isSubscriptionMatch(String endpoint,
355: String[] subscriptions) {
356: for (int i = 0; i < subscriptions.length; i++) {
357: String subscription = subscriptions[i];
358:
359: // Subscriptions can be full Mule Urls or resource specific such as
360: // my.queue
361: // if it is a MuleEndpointURI we need to extract the Resource
362: // specific part
363: // if (MuleEndpointURI.isMuleUri(subscription)) {
364: // EndpointURI ep = (EndpointURI) endpointsCache.get(subscription);
365: // if (ep == null) {
366: // try {
367: // ep = new MuleEndpointURI(subscription);
368: // } catch (MalformedEndpointException e) {
369: // throw new IllegalArgumentException(e.getMessage());
370: // }
371: // endpointsCache.put(subscription, ep);
372: // }
373: // subscription = ep.getAddress();
374: // }
375:
376: ObjectFilter filter = createFilter(subscription);
377: if (filter.accept(endpoint)) {
378: return true;
379: }
380: }
381: return false;
382: }
383:
384: /**
385: * Determines whether events will be processed asynchronously
386: *
387: * @return tru if asynchronous. The default is false
388: */
389: public boolean isAsynchronous() {
390: return asynchronous;
391: }
392:
393: /**
394: * Determines whether events will be processed asynchronously
395: *
396: * @param asynchronous true if aysnchronous
397: */
398: public void setAsynchronous(boolean asynchronous) {
399: this .asynchronous = asynchronous;
400: }
401:
402: /**
403: * This is the callback method used by Mule to give Mule events to this
404: * Multicaster
405: *
406: * @param context the context received by Mule
407: */
408: public Object onCall(MuleEventContext context)
409: throws TransformerException, MalformedEndpointException {
410: multicastEvent(new MuleApplicationEvent(context
411: .transformMessage(), context, applicationContext));
412: context.setStopFurtherProcessing(true);
413: return null;
414: }
415:
416: /**
417: * Will dispatch an application event through Mule
418: *
419: * @param applicationEvent the Spring event to be dispatched
420: * @throws ApplicationEventException if the event cannot be dispatched i.e. if
421: * the underlying transport throws an exception
422: */
423: protected void dispatchEvent(MuleApplicationEvent applicationEvent)
424: throws ApplicationEventException {
425: OutboundEndpoint endpoint;
426: try {
427: endpoint = muleContext.getRegistry()
428: .lookupEndpointFactory().getOutboundEndpoint(
429: applicationEvent.getEndpoint());
430: } catch (MuleException e) {
431: throw new ApplicationEventException(
432: "Failed to get endpoint for endpointUri: "
433: + applicationEvent.getEndpoint(), e);
434: }
435: if (endpoint != null) {
436: try {
437: // if (applicationEvent.getEndpoint() != null) {
438: // endpoint.setEndpointURI(applicationEvent.getEndpoint());
439: // }
440:
441: DefaultMuleMessage message = new DefaultMuleMessage(
442: applicationEvent.getSource(), applicationEvent
443: .getProperties());
444: // has dispatch been triggered using beanFactory.publish()
445: // without a current event
446: if (applicationEvent.getMuleEventContext() != null) {
447: // tell mule not to try and route this event itself
448: applicationEvent.getMuleEventContext()
449: .setStopFurtherProcessing(true);
450: applicationEvent.getMuleEventContext()
451: .dispatchEvent(message, endpoint);
452: } else {
453: MuleSession session = new DefaultMuleSession(
454: message, ((AbstractConnector) endpoint
455: .getConnector())
456: .getSessionHandler(), service,
457: muleContext);
458: RequestContext.setEvent(new DefaultMuleEvent(
459: message, endpoint, session, false));
460: // transform if necessary
461: if (endpoint.getTransformers() != null) {
462: message = new DefaultMuleMessage(
463: applicationEvent.getSource(),
464: applicationEvent.getProperties());
465: message.applyTransformers(endpoint
466: .getTransformers());
467: }
468: endpoint.dispatch(new DefaultMuleEvent(message,
469: endpoint, session, false));
470: }
471: } catch (Exception e1) {
472: throw new ApplicationEventException(
473: "Failed to dispatch event: " + e1.getMessage(),
474: e1);
475: }
476: } else {
477: throw new ApplicationEventException(
478: "Failed endpoint using name: "
479: + applicationEvent.getEndpoint());
480: }
481: }
482:
483: /**
484: * Set the current Spring application context
485: *
486: * @param applicationContext application context
487: * @throws BeansException
488: */
489: public void setApplicationContext(
490: ApplicationContext applicationContext)
491: throws BeansException {
492: this .applicationContext = applicationContext;
493: }
494:
495: protected void registerMulticasterComponent() throws MuleException {
496: // A discriptor hasn't been explicitly configured, so create a default
497: if (service == null) {
498: service = getDefaultService();
499: setSubscriptionsOnService(service);
500: muleContext.getRegistry().registerService(service);
501: }
502: }
503:
504: protected void setSubscriptionsOnService(Service service)
505: throws MuleException {
506: String[] subscriptions;
507: List endpoints = new ArrayList();
508: for (Iterator iterator = listeners.iterator(); iterator
509: .hasNext();) {
510: ApplicationListener listener = (ApplicationListener) iterator
511: .next();
512: if (listener instanceof AsynchronousEventListener) {
513: listener = ((AsynchronousEventListener) listener)
514: .getListener();
515: }
516: if (listener instanceof MuleSubscriptionEventListener) {
517: subscriptions = ((MuleSubscriptionEventListener) listener)
518: .getSubscriptions();
519: for (int i = 0; i < subscriptions.length; i++) {
520: if (subscriptions[i].indexOf("*") == -1
521: && MuleEndpointURI
522: .isMuleUri(subscriptions[i])) {
523: boolean isSoap = registerAsSoap(
524: subscriptions[i], listener);
525:
526: if (!isSoap) {
527: endpoints.add(subscriptions[i]);
528: }
529: }
530: }
531: }
532: }
533: if (endpoints.size() > 0) {
534: String endpoint;
535: for (Iterator iterator = endpoints.iterator(); iterator
536: .hasNext();) {
537: endpoint = (String) iterator.next();
538:
539: InboundEndpoint ep = muleContext.getRegistry()
540: .lookupEndpointFactory().getInboundEndpoint(
541: endpoint);
542:
543: // check whether the endpoint has already been set on the
544: // MuleEventMulticastor
545: if (service.getInboundRouter()
546: .getEndpoint(ep.getName()) == null) {
547: service.getInboundRouter().addEndpoint(ep);
548: }
549: }
550: }
551: }
552:
553: private boolean registerAsSoap(String endpoint, Object listener)
554: throws MuleException {
555: if (endpoint.startsWith("glue") || endpoint.startsWith("soap")
556: || endpoint.startsWith("axis")
557: || endpoint.startsWith("xfire")) {
558: EndpointURI ep = new MuleEndpointURI(endpoint);
559:
560: // get the service name from the URI path
561: String serviceName = null;
562: if (ep.getPath() != null) {
563: String path = ep.getPath();
564: if (path.endsWith("/")) {
565: path = path.substring(0, path.length() - 1);
566: }
567: int i = path.lastIndexOf("/");
568: if (i > -1) {
569: serviceName = path.substring(i + 1);
570: }
571: } else {
572: serviceName = service.getName();
573: }
574: // now strip off the service name
575: String newEndpoint = endpoint;
576: int i = newEndpoint.indexOf(serviceName);
577: newEndpoint = newEndpoint.substring(0, i - 1);
578: SedaService s = new SedaService();
579: s.setName(serviceName);
580: s.setModel(muleContext.getRegistry().lookupSystemModel());
581: s.setQueueProfile(new QueueProfile());
582: s.getInboundRouter().addEndpoint(
583: muleContext.getRegistry().lookupEndpointFactory()
584: .getInboundEndpoint(newEndpoint));
585: s.setComponent(new DefaultJavaComponent(
586: new SingletonObjectFactory(listener)));
587: muleContext.getRegistry().registerService(s);
588: return true;
589: } else {
590: return false;
591: }
592: }
593:
594: protected void registerConnectors() throws MuleException {
595: if (!muleContext.isInitialised()) {
596: // Next see if there are any UMOConnectors to register
597: Map connectors = applicationContext.getBeansOfType(
598: Connector.class, true, true);
599: if (connectors.size() > 0) {
600: Map.Entry entry;
601: Connector c;
602: for (Iterator iterator = connectors.entrySet()
603: .iterator(); iterator.hasNext();) {
604: entry = (Map.Entry) iterator.next();
605: c = (Connector) entry.getValue();
606: if (c.getName() == null) {
607: c.setName(entry.getKey().toString());
608: }
609: muleContext.getRegistry().registerConnector(c);
610: }
611: }
612: }
613: }
614:
615: protected void registerTransformers() throws MuleException {
616: if (!muleContext.isInitialised()) {
617: // Next see if there are any UMOConnectors to register
618: Map transformers = applicationContext.getBeansOfType(
619: Transformer.class, true, true);
620: if (transformers.size() > 0) {
621: Map.Entry entry;
622: Transformer t;
623: for (Iterator iterator = transformers.entrySet()
624: .iterator(); iterator.hasNext();) {
625: entry = (Map.Entry) iterator.next();
626: t = (Transformer) entry.getValue();
627: if (t.getName() == null) {
628: t.setName(entry.getKey().toString());
629: }
630: muleContext.getRegistry().registerTransformer(t);
631: }
632: }
633: }
634: }
635:
636: protected Service getDefaultService() throws MuleException {
637: // When the the beanFactory is refreshed all the beans get
638: // reloaded so we need to unregister the service from Mule
639: Model model = muleContext.getRegistry().lookupModel(
640: MuleProperties.OBJECT_SYSTEM_MODEL);
641: if (model == null) {
642: model = new SedaModel();
643: model.setName(MuleProperties.OBJECT_SYSTEM_MODEL);
644: muleContext.getRegistry().registerModel(model);
645: }
646: Service service = muleContext.getRegistry().lookupService(
647: EVENT_MULTICASTER_DESCRIPTOR_NAME);
648: if (service != null) {
649: muleContext.getRegistry().unregisterService(
650: service.getName());
651: }
652: service = new SedaService();
653: service.setName(EVENT_MULTICASTER_DESCRIPTOR_NAME);
654: service.setModel(model);
655: if (subscriptions == null) {
656: logger
657: .info("No receive endpoints have been set, using default '*'");
658: service.getInboundRouter().addEndpoint(
659: muleContext.getRegistry().lookupEndpointFactory()
660: .getInboundEndpoint("vm://*"));
661: } else {
662: // Set multiple inbound subscriptions on the descriptor
663: InboundRouterCollection messageRouter = service
664: .getInboundRouter();
665:
666: for (int i = 0; i < subscriptions.length; i++) {
667: String subscription = subscriptions[i];
668:
669: EndpointFactory endpointFactory = muleContext
670: .getRegistry().lookupEndpointFactory();
671: EndpointBuilder endpointBuilder = endpointFactory
672: .getEndpointBuilder(subscription);
673: endpointBuilder.setSynchronous(!asynchronous);
674: InboundEndpoint endpoint = endpointFactory
675: .getInboundEndpoint(endpointBuilder);
676:
677: messageRouter.addEndpoint(endpoint);
678: }
679: }
680: service.setComponent(new DefaultJavaComponent(
681: new SingletonObjectFactory(this )));
682: return service;
683: }
684:
685: protected ObjectFilter createFilter(String pattern) {
686: try {
687: if (getSubscriptionFilter() == null) {
688: setSubscriptionFilter(WildcardFilter.class);
689: }
690: ObjectFilter filter = (ObjectFilter) ClassUtils
691: .instanciateClass(getSubscriptionFilter(),
692: new Object[] { pattern });
693: return filter;
694: } catch (Exception e) {
695: exceptionListener.exceptionThrown(e);
696: return new WildcardFilter(pattern);
697: }
698: }
699:
700: /**
701: * the type of filter used to filter subscriptions
702: *
703: * @return the class of the filter to use. The default is WildcardFilter
704: * @see WildcardFilter
705: */
706: public Class getSubscriptionFilter() {
707: return subscriptionFilter;
708: }
709:
710: /**
711: * sets the type of filter used to filter subscriptions
712: *
713: * @param subscriptionFilter the class of the filter to use.
714: */
715: public void setSubscriptionFilter(Class subscriptionFilter) {
716: this .subscriptionFilter = subscriptionFilter;
717: }
718:
719: /**
720: * A list of endpoints the eventMulticaster will receive events on Note that if
721: * this eventMulticaster has a Mule Descriptor associated with it, these
722: * endpoints are ignored and the ones on the Mule Descriptor are used. These are
723: * here for convenience, the event multicaster will use these to create a default
724: * MuleDescriptor for itself at runtime
725: *
726: * @return endpoints List being listened on
727: */
728: public String[] getSubscriptions() {
729: return subscriptions;
730: }
731:
732: /**
733: * A list of endpoints the eventMulticaster will receive events on Note that if
734: * this eventMulticaster has a Mule Descriptor associated with it, these
735: * endpoints are ignored and the ones on the Mule Descriptor are used. These are
736: * here for convenience, the event multicaster will use these to create a default
737: * MuleDescriptor for itself at runtime
738: *
739: * @param subscriptions a list of enpoints to listen on
740: */
741: public void setSubscriptions(String[] subscriptions) {
742: this .subscriptions = subscriptions;
743: }
744:
745: protected void setExceptionListener(ExceptionListener listener) {
746: if (listener != null) {
747: this .exceptionListener = listener;
748: } else {
749: throw new IllegalArgumentException(
750: "exceptionListener may not be null");
751: }
752: }
753:
754: private class LoggingExceptionListener implements ExceptionListener {
755: public void exceptionThrown(Exception e) {
756: logger.error(e.getMessage(), e);
757: }
758: }
759: }
|