001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.mx.notification;
023:
024: import javax.management.Notification;
025: import javax.management.NotificationListener;
026:
027: import org.jboss.logging.Logger;
028: import org.jboss.mx.util.JBossNotificationBroadcasterSupport;
029: import org.jboss.util.threadpool.BasicThreadPool;
030: import org.jboss.util.threadpool.ThreadPool;
031:
032: /**
033: * A notification broadcaster with asynch notifications
034: *
035: * @author <a href="mailto:Adrian.Brock@HappeningTimes.com">Adrian Brock</a>.
036: * @author Scott.Stark@jboss.org
037: * @version $Revision: 57200 $
038: */
039: public class AsynchNotificationBroadcasterSupport extends
040: JBossNotificationBroadcasterSupport {
041: // Attributes ----------------------------------------------------
042: private static Logger log = Logger.getLogger(AsynchNotifier.class);
043: /** The default pool used in the absence of on instance specific one */
044: private static ThreadPool defaultPool = new BasicThreadPool(
045: "AsynchNotificationBroadcasterSupport");
046: /** The default comp*/
047: private static long defaultNotificationTimeout;
048:
049: /** The instance */
050: private long notificationTimeout;
051: private ThreadPool pool;
052:
053: public static synchronized void setDefaultThreadPool(ThreadPool tp) {
054: defaultPool = tp;
055: }
056:
057: public static long getDefaultNotificationTimeout() {
058: return defaultNotificationTimeout;
059: }
060:
061: public static void setDefaultNotificationTimeout(
062: long defaultNotificationTimeout) {
063: AsynchNotificationBroadcasterSupport.defaultNotificationTimeout = defaultNotificationTimeout;
064: }
065:
066: // Constructor ---------------------------------------------------
067:
068: /**
069: * Construct a new Asyncrhonous broadcaster
070: * Calls this(defaultNotificationTimeout, defaultPool)
071: */
072: public AsynchNotificationBroadcasterSupport() {
073: this (defaultNotificationTimeout, defaultPool);
074: }
075:
076: /**
077: * Construct a new Asyncrhonous broadcaster. Calls
078: * this(notificationTimeout, defaultPool)
079: * @param notificationTimeout the notification completion timeout in MS. A
080: * 0 value means no timeout.
081: */
082: public AsynchNotificationBroadcasterSupport(long notificationTimeout) {
083: this (notificationTimeout, defaultPool);
084: }
085:
086: /**
087: * Construct a new Asyncrhonous broadcaster
088: * @param notificationTimeout - the notification completion timeout in MS. A
089: * 0 value means no timeout.
090: * @param pool - the thread pool to use for the asynchronous notifcations
091: */
092: public AsynchNotificationBroadcasterSupport(
093: long notificationTimeout, ThreadPool pool) {
094: this .notificationTimeout = notificationTimeout;
095: this .pool = pool;
096: }
097:
098: // Public --------------------------------------------------------
099:
100: public long getNotificationTimeout() {
101: return notificationTimeout;
102: }
103:
104: public void setNotificationTimeout(long notificationTimeout) {
105: this .notificationTimeout = notificationTimeout;
106: }
107:
108: public ThreadPool getThreadPool() {
109: return pool;
110: }
111:
112: public void setThreadPool(ThreadPool pool) {
113: this .pool = pool;
114: }
115:
116: // NotificationBroadcasterSupport overrides ----------------------
117:
118: /**
119: * Handle the notification, asynchronously invoke the listener.
120: *
121: * @param listener the listener to notify
122: * @param notification the notification
123: * @param handback the handback object
124: */
125: public void handleNotification(NotificationListener listener,
126: Notification notification, Object handback) {
127: AsynchNotifier notifier = new AsynchNotifier(listener,
128: notification, handback);
129: pool.run(notifier, 0, notificationTimeout);
130: }
131:
132: /** Invoke stop on the thread pool if its not the class default pool.
133: *
134: * @param immeadiate the immeadiate flag passed to the TheadPool#stop
135: */
136: protected void stopThreadPool(boolean immeadiate) {
137: if (pool != defaultPool) {
138: pool.stop(immeadiate);
139: }
140: }
141:
142: // Inner classes -------------------------------------------------
143:
144: public class AsynchNotifier implements Runnable {
145: NotificationListener listener;
146: Notification notification;
147: Object handback;
148:
149: public AsynchNotifier(NotificationListener listener,
150: Notification notification, Object handback) {
151: this .listener = listener;
152: this .notification = notification;
153: this .handback = handback;
154: }
155:
156: public void run() {
157: try {
158: listener.handleNotification(notification, handback);
159: } catch (Throwable throwable) {
160: log.error("Error processing notification="
161: + notification + " listener=" + listener
162: + " handback=" + handback, throwable);
163: }
164: }
165: }
166: }
|