001: /*
002: * $Id: ServerNotificationManager.java 11362 2008-03-14 11:27:59Z tcarlson $
003: * --------------------------------------------------------------------------------------
004: * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
005: *
006: * The software in this package is published under the terms of the CPAL v1.0
007: * license, a copy of which has been included with this distribution in the
008: * LICENSE.txt file.
009: */
010:
011: package org.mule.context.notification;
012:
013: import org.mule.api.context.WorkManager;
014: import org.mule.api.context.notification.BlockingServerEvent;
015: import org.mule.api.context.notification.ServerNotification;
016: import org.mule.api.context.notification.ServerNotificationHandler;
017: import org.mule.api.context.notification.ServerNotificationListener;
018: import org.mule.api.lifecycle.Disposable;
019: import org.mule.api.lifecycle.LifecycleException;
020: import org.mule.util.ClassUtils;
021:
022: import java.util.Collection;
023: import java.util.Collections;
024: import java.util.Map;
025:
026: import javax.resource.spi.work.Work;
027: import javax.resource.spi.work.WorkException;
028: import javax.resource.spi.work.WorkListener;
029:
030: import edu.emory.mathcs.backport.java.util.concurrent.BlockingDeque;
031: import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingDeque;
032: import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
033:
034: import org.apache.commons.logging.Log;
035: import org.apache.commons.logging.LogFactory;
036:
037: /**
038: * A reworking of the event manager that allows efficient behaviour without global on/off
039: * switches in the config.
040: *
041: * <p>The configuration and resulting policy are separate; the policy
042: * is a summary of the configuration that contains information to decide whether a particular
043: * message can be handled, and which updates that with experience gained handling messages.
044: * When the configuration is changed the policy is rebuilt. In this way we get a fairly
045: * efficient system without needing controls elsewhere.
046: *
047: * <p>However, measurements showed that there was still a small impact on speed in some
048: * cases. To improve behaviour further the
049: * {@link org.mule.context.notification.OptimisedNotificationHandler} was
050: * added. This allows a service that generates notifications to cache locally a handler
051: * optimised for a particular class.
052: *
053: * <p>The dynamic flag stops this caching from occurring. This reduces efficiency slightly
054: * (about 15% cost on simple VM messages, less on other transports)</p>
055: *
056: * <p>Note that, because of subclass relationships, we need to be very careful about exactly
057: * what is enabled and disabled:
058: * <ul>
059: * <li>Disabling an event or interface disables all uses of that class or any subclass.</li>
060: * <li>Enquiring whether an event is enabled returns true if any subclass is enabled.</li>
061: * </ul>
062: */
063: public class ServerNotificationManager implements Work, Disposable,
064: ServerNotificationHandler {
065:
066: public static final String NULL_SUBSCRIPTION = "NULL";
067: protected Log logger = LogFactory.getLog(getClass());
068: private boolean dynamic = false;
069: private Configuration configuration = new Configuration();
070: private AtomicBoolean disposed = new AtomicBoolean(false);
071: private BlockingDeque eventQueue = new LinkedBlockingDeque();
072:
073: public boolean isNotificationDynamic() {
074: return dynamic;
075: }
076:
077: public void setNotificationDynamic(boolean dynamic) {
078: this .dynamic = dynamic;
079: }
080:
081: public void start(WorkManager workManager, WorkListener workListener)
082: throws LifecycleException {
083: try {
084: workManager.scheduleWork(this , WorkManager.INDEFINITE,
085: null, workListener);
086: } catch (WorkException e) {
087: throw new LifecycleException(e, this );
088: }
089: }
090:
091: public void addInterfaceToType(Class iface, Class event) {
092: configuration.addInterfaceToType(iface, event);
093: }
094:
095: public void setInterfaceToTypes(Map interfaceToEvents)
096: throws ClassNotFoundException {
097: configuration.addAllInterfaceToTypes(interfaceToEvents);
098: }
099:
100: public void addListenerSubscriptionPair(
101: ListenerSubscriptionPair pair) {
102: configuration.addListenerSubscriptionPair(pair);
103: }
104:
105: public void addListener(ServerNotificationListener listener) {
106: configuration
107: .addListenerSubscriptionPair(new ListenerSubscriptionPair(
108: listener));
109: }
110:
111: public void addListenerSubscription(
112: ServerNotificationListener listener, String subscription) {
113: configuration
114: .addListenerSubscriptionPair(new ListenerSubscriptionPair(
115: listener, subscription));
116: }
117:
118: public void setAllListenerSubscriptionPairs(Collection pairs) {
119: configuration.addAllListenerSubscriptionPairs(pairs);
120: }
121:
122: /**
123: * This removes *all* registrations that reference this listener
124: */
125: public void removeListener(ServerNotificationListener listener) {
126: configuration.removeListener(listener);
127: }
128:
129: public void removeAllListeners(Collection listeners) {
130: configuration.removeAllListeners(listeners);
131: }
132:
133: public void disableInterface(Class iface)
134: throws ClassNotFoundException {
135: configuration.disableInterface(iface);
136: }
137:
138: public void setDisabledInterfaces(Collection interfaces)
139: throws ClassNotFoundException {
140: configuration.disabledAllInterfaces(interfaces);
141: }
142:
143: public void disableType(Class type) throws ClassNotFoundException {
144: configuration.disableType(type);
145: }
146:
147: public void setDisabledTypes(Collection types)
148: throws ClassNotFoundException {
149: configuration.disableAllTypes(types);
150: }
151:
152: public void fireNotification(ServerNotification notification) {
153: if (!disposed.get()) {
154: if (notification instanceof BlockingServerEvent) {
155: notifyListeners(notification);
156: } else {
157: try {
158: eventQueue.put(notification);
159: } catch (InterruptedException e) {
160: if (!disposed.get()) {
161: logger.error("Failed to queue notification: "
162: + notification, e);
163: }
164: }
165: }
166: }
167: }
168:
169: public boolean isNotificationEnabled(Class type) {
170: return configuration.getPolicy().isNotificationEnabled(type);
171: }
172:
173: public void dispose() {
174: disposed.set(true);
175: configuration = null;
176: }
177:
178: protected void notifyListeners(ServerNotification notification) {
179: if (!disposed.get()) {
180: configuration.getPolicy().dispatch(notification);
181: }
182: }
183:
184: public void release() {
185: dispose();
186: }
187:
188: public void run() {
189: while (!disposed.get()) {
190: try {
191: ServerNotification notification = (ServerNotification) eventQueue
192: .take();
193: notifyListeners(notification);
194: } catch (InterruptedException e) {
195: // ignore - we just loop round
196: }
197: }
198: }
199:
200: /**
201: * Support string or class parameters
202: */
203: public static Class toClass(Object value)
204: throws ClassNotFoundException {
205: Class clazz;
206: if (value instanceof String) {
207: clazz = ClassUtils.loadClass(value.toString(), value
208: .getClass());
209: } else if (value instanceof Class) {
210: clazz = (Class) value;
211: } else {
212: throw new IllegalArgumentException(
213: "Notification types and listeners must be a Class with fully qualified class name. Value is: "
214: + value);
215: }
216: return clazz;
217: }
218:
219: // for tests -------------------------------------------------------
220:
221: Policy getPolicy() {
222: return configuration.getPolicy();
223: }
224:
225: public Map getInterfaceToTypes() {
226: return Collections.unmodifiableMap(configuration
227: .getInterfaceToTypes());
228: }
229:
230: public Collection getListeners() {
231: return Collections.unmodifiableCollection(configuration
232: .getListeners());
233: }
234:
235: }
|