001: /*
002: * Copyright 2003-2006 Sun Microsystems, Inc. All Rights Reserved.
003: * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
004: *
005: * This code is free software; you can redistribute it and/or modify it
006: * under the terms of the GNU General Public License version 2 only, as
007: * published by the Free Software Foundation. Sun designates this
008: * particular file as subject to the "Classpath" exception as provided
009: * by Sun in the LICENSE file that accompanied this code.
010: *
011: * This code is distributed in the hope that it will be useful, but WITHOUT
012: * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
013: * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
014: * version 2 for more details (a copy is included in the LICENSE file that
015: * accompanied this code).
016: *
017: * You should have received a copy of the GNU General Public License version
018: * 2 along with this work; if not, write to the Free Software Foundation,
019: * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
020: *
021: * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
022: * CA 95054 USA or visit www.sun.com if you need additional information or
023: * have any questions.
024: */
025:
026: package com.sun.jmx.remote.internal;
027:
028: import java.security.AccessController;
029: import java.security.PrivilegedAction;
030: import java.security.PrivilegedActionException;
031: import java.security.PrivilegedExceptionAction;
032: import java.util.ArrayList;
033: import java.util.Collection;
034: import java.util.Collections;
035: import java.util.HashSet;
036: import java.util.List;
037: import java.util.Set;
038: import java.util.HashMap;
039: import java.util.Map;
040:
041: import javax.management.InstanceNotFoundException;
042: import javax.management.MBeanServer;
043: import javax.management.MBeanServerDelegate;
044: import javax.management.MBeanServerNotification;
045: import javax.management.Notification;
046: import javax.management.NotificationBroadcaster;
047: import javax.management.NotificationFilter;
048: import javax.management.NotificationFilterSupport;
049: import javax.management.NotificationListener;
050: import javax.management.ObjectName;
051: import javax.management.QueryEval;
052: import javax.management.QueryExp;
053:
054: import javax.management.remote.NotificationResult;
055: import javax.management.remote.TargetedNotification;
056:
057: import com.sun.jmx.remote.util.EnvHelp;
058: import com.sun.jmx.remote.util.ClassLogger;
059:
060: /** A circular buffer of notifications received from an MBean server. */
061: /*
062: There is one instance of ArrayNotificationBuffer for every
063: MBeanServer object that has an attached ConnectorServer. Then, for
064: every ConnectorServer attached to a given MBeanServer, there is an
065: instance of the inner class ShareBuffer. So for example with two
066: ConnectorServers it looks like this:
067:
068:     ConnectorServer1 -> ShareBuffer1 -\
069:                                        }-> ArrayNotificationBuffer
070:     ConnectorServer2 -> ShareBuffer2 -/              |
071:                                                      |
072:                                                      v
073:                                                 MBeanServer
074:
075: The ArrayNotificationBuffer has a circular buffer of
076: NamedNotification objects. Each ConnectorServer defines a
077: notification buffer size, and this size is recorded by the
078: corresponding ShareBuffer. The buffer size of the
079: ArrayNotificationBuffer is the maximum of all of its ShareBuffers.
080: When a ShareBuffer is added or removed, the ArrayNotificationBuffer
081: size is adjusted accordingly.
082:
083: An ArrayNotificationBuffer also has a BufferListener (which is a
084: NotificationListener) registered on every NotificationBroadcaster
085: MBean in the MBeanServer to which it is attached. The cost of this
086: potentially large set of listeners is the principal motivation for
087: sharing the ArrayNotificationBuffer between ConnectorServers, and
088: also the reason that we are careful to discard the
089: ArrayNotificationBuffer (and its BufferListeners) when there are no
090: longer any ConnectorServers using it.
091:
092: The synchronization of this class is inherently complex. In an attempt
093: to limit the complexity, we use just two locks:
094:
095: - globalLock controls access to the mapping between an MBeanServer
096: and its ArrayNotificationBuffer and to the set of ShareBuffers for
097: each ArrayNotificationBuffer.
098:
099: - the instance lock of each ArrayNotificationBuffer controls access
100: to the array of notifications, including its size, and to the
101: dispose flag of the ArrayNotificationBuffer. The wait/notify
102: mechanism is used to indicate changes to the array.
103:
104: If both locks are held at the same time, the globalLock must be
105: taken first.
106:
107: Since adding or removing a BufferListener to an MBean can involve
108: calling user code, we are careful not to hold any locks while it is
109: done.
110: */
111: public class ArrayNotificationBuffer implements NotificationBuffer {
112: private boolean disposed = false;
113:
114: // FACTORY STUFF, INCLUDING SHARING
115:
116: private static final Object globalLock = new Object();
117: private static final HashMap<MBeanServer, ArrayNotificationBuffer> mbsToBuffer = new HashMap<MBeanServer, ArrayNotificationBuffer>(
118: 1);
119: private final Collection<ShareBuffer> sharers = new HashSet<ShareBuffer>(
120: 1);
121:
122: public static NotificationBuffer getNotificationBuffer(
123: MBeanServer mbs, Map env) {
124:
125: if (env == null)
126: env = Collections.emptyMap();
127:
128: //Find out queue size
129: int queueSize = EnvHelp.getNotifBufferSize(env);
130:
131: ArrayNotificationBuffer buf;
132: boolean create;
133: NotificationBuffer sharer;
134: synchronized (globalLock) {
135: buf = mbsToBuffer.get(mbs);
136: create = (buf == null);
137: if (create) {
138: buf = new ArrayNotificationBuffer(mbs, queueSize);
139: mbsToBuffer.put(mbs, buf);
140: }
141: sharer = buf.new ShareBuffer(queueSize);
142: }
143: /* We avoid holding any locks while calling createListeners.
144: * This prevents possible deadlocks involving user code, but
145: * does mean that a second ConnectorServer created and started
146: * in this window will return before all the listeners are ready,
147: * which could lead to surprising behaviour. The alternative
148: * would be to block the second ConnectorServer until the first
149: * one has finished adding all the listeners, but that would then
150: * be subject to deadlock.
151: */
152: if (create)
153: buf.createListeners();
154: return sharer;
155: }
156:
157: /* Ensure that this buffer is no longer the one that will be returned by
158: * getNotificationBuffer. This method is idempotent - calling it more
159: * than once has no effect beyond that of calling it once.
160: */
161: static void removeNotificationBuffer(MBeanServer mbs) {
162: synchronized (globalLock) {
163: mbsToBuffer.remove(mbs);
164: }
165: }
166:
167: void addSharer(ShareBuffer sharer) {
168: synchronized (globalLock) {
169: synchronized (this ) {
170: if (sharer.getSize() > queueSize)
171: resize(sharer.getSize());
172: }
173: sharers.add(sharer);
174: }
175: }
176:
177: private void removeSharer(ShareBuffer sharer) {
178: boolean empty;
179: synchronized (globalLock) {
180: sharers.remove(sharer);
181: empty = sharers.isEmpty();
182: if (empty)
183: removeNotificationBuffer(mBeanServer);
184: else {
185: int max = 0;
186: for (ShareBuffer buf : sharers) {
187: int bufsize = buf.getSize();
188: if (bufsize > max)
189: max = bufsize;
190: }
191: if (max < queueSize)
192: resize(max);
193: }
194: }
195: if (empty) {
196: synchronized (this ) {
197: disposed = true;
198: // Notify potential waiting fetchNotification call
199: notifyAll();
200: }
201: destroyListeners();
202: }
203: }
204:
205: private synchronized void resize(int newSize) {
206: if (newSize == queueSize)
207: return;
208: while (queue.size() > newSize)
209: dropNotification();
210: queue.resize(newSize);
211: queueSize = newSize;
212: }
213:
214: private class ShareBuffer implements NotificationBuffer {
215: ShareBuffer(int size) {
216: this .size = size;
217: addSharer(this );
218: }
219:
220: public NotificationResult fetchNotifications(
221: NotificationBufferFilter filter,
222: long startSequenceNumber, long timeout,
223: int maxNotifications) throws InterruptedException {
224: NotificationBuffer buf = ArrayNotificationBuffer.this ;
225: return buf.fetchNotifications(filter, startSequenceNumber,
226: timeout, maxNotifications);
227: }
228:
229: public void dispose() {
230: ArrayNotificationBuffer.this .removeSharer(this );
231: }
232:
233: int getSize() {
234: return size;
235: }
236:
237: private final int size;
238: }
239:
240: // ARRAYNOTIFICATIONBUFFER IMPLEMENTATION
241:
242: private ArrayNotificationBuffer(MBeanServer mbs, int queueSize) {
243: if (logger.traceOn())
244: logger.trace("Constructor", "queueSize=" + queueSize);
245:
246: if (mbs == null || queueSize < 1)
247: throw new IllegalArgumentException("Bad args");
248:
249: this .mBeanServer = mbs;
250: this .queueSize = queueSize;
251: this .queue = new ArrayQueue<NamedNotification>(queueSize);
252: this .earliestSequenceNumber = System.currentTimeMillis();
253: this .nextSequenceNumber = this .earliestSequenceNumber;
254:
255: logger.trace("Constructor", "ends");
256: }
257:
258: private synchronized boolean isDisposed() {
259: return disposed;
260: }
261:
262: // We no longer support calling this method from outside.
263: // The JDK doesn't contain any such calls and users are not
264: // supposed to be accessing this class.
265: public void dispose() {
266: throw new UnsupportedOperationException();
267: }
268:
269: /**
270: * <p>Fetch notifications that match the given listeners.</p>
271: *
272: * <p>The operation only considers notifications with a sequence
273: * number at least <code>startSequenceNumber</code>. It will take
274: * no longer than <code>timeout</code>, and will return no more
275: * than <code>maxNotifications</code> different notifications.</p>
276: *
277: * <p>If there are no notifications matching the criteria, the
278: * operation will block until one arrives, subject to the
279: * timeout.</p>
280: *
281: * @param filter an object that will add notifications to a
282: * {@code List<TargetedNotification>} if they match the current
283: * listeners with their filters.
284: * @param startSequenceNumber the first sequence number to
285: * consider.
286: * @param timeout the maximum time to wait. May be 0 to indicate
287: * not to wait if there are no notifications.
288: * @param maxNotifications the maximum number of notifications to
289: * return. May be 0 to indicate a wait for eligible notifications
290: * that will return a usable <code>nextSequenceNumber</code>. The
291: * {@link TargetedNotification} array in the returned {@link
292: * NotificationResult} may contain more than this number of
293: * elements but will not contain more than this number of
294: * different notifications.
295: */
296: public NotificationResult fetchNotifications(
297: NotificationBufferFilter filter, long startSequenceNumber,
298: long timeout, int maxNotifications)
299: throws InterruptedException {
300:
301: logger.trace("fetchNotifications", "starts");
302:
303: if (startSequenceNumber < 0 || isDisposed()) {
304: synchronized (this ) {
305: return new NotificationResult(earliestSequenceNumber(),
306: nextSequenceNumber(),
307: new TargetedNotification[0]);
308: }
309: }
310:
311: // Check arg validity
312: if (filter == null || startSequenceNumber < 0 || timeout < 0
313: || maxNotifications < 0) {
314: logger.trace("fetchNotifications", "Bad args");
315: throw new IllegalArgumentException("Bad args to fetch");
316: }
317:
318: if (logger.debugOn()) {
319: logger.trace("fetchNotifications", "filter=" + filter
320: + "; startSeq=" + startSequenceNumber
321: + "; timeout=" + timeout + "; max="
322: + maxNotifications);
323: }
324:
325: if (startSequenceNumber > nextSequenceNumber()) {
326: final String msg = "Start sequence number too big: "
327: + startSequenceNumber + " > "
328: + nextSequenceNumber();
329: logger.trace("fetchNotifications", msg);
330: throw new IllegalArgumentException(msg);
331: }
332:
333: /* Determine the end time corresponding to the timeout value.
334: Caller may legitimately supply Long.MAX_VALUE to indicate no
335: timeout. In that case the addition will overflow and produce
336: a negative end time. Set end time to Long.MAX_VALUE in that
337: case. We assume System.currentTimeMillis() is positive. */
338: long endTime = System.currentTimeMillis() + timeout;
339: if (endTime < 0) // overflow
340: endTime = Long.MAX_VALUE;
341:
342: if (logger.debugOn())
343: logger.debug("fetchNotifications", "endTime=" + endTime);
344:
345: /* We set earliestSeq the first time through the loop. If we
346: set it here, notifications could be dropped before we
347: started examining them, so earliestSeq might not correspond
348: to the earliest notification we examined. */
349: long earliestSeq = -1;
350: long nextSeq = startSequenceNumber;
351: List<TargetedNotification> notifs = new ArrayList<TargetedNotification>();
352:
353: /* On exit from this loop, notifs, earliestSeq, and nextSeq must
354: all be correct values for the returned NotificationResult. */
355: while (true) {
356: logger.debug("fetchNotifications", "main loop starts");
357:
358: NamedNotification candidate;
359:
360: /* Get the next available notification regardless of filters,
361: or wait for one to arrive if there is none. */
362: synchronized (this ) {
363:
364: /* First time through. The current earliestSequenceNumber
365: is the first one we could have examined. */
366: if (earliestSeq < 0) {
367: earliestSeq = earliestSequenceNumber();
368: if (logger.debugOn()) {
369: logger.debug("fetchNotifications",
370: "earliestSeq=" + earliestSeq);
371: }
372: if (nextSeq < earliestSeq) {
373: nextSeq = earliestSeq;
374: logger.debug("fetchNotifications",
375: "nextSeq=earliestSeq");
376: }
377: } else
378: earliestSeq = earliestSequenceNumber();
379:
380: /* If many notifications have been dropped since the
381: last time through, nextSeq could now be earlier
382: than the current earliest. If so, notifications
383: may have been lost and we return now so the caller
384: can see this next time it calls. */
385: if (nextSeq < earliestSeq) {
386: logger.trace("fetchNotifications", "nextSeq="
387: + nextSeq + " < " + "earliestSeq="
388: + earliestSeq + " so may have lost notifs");
389: break;
390: }
391:
392: if (nextSeq < nextSequenceNumber()) {
393: candidate = notificationAt(nextSeq);
394: if (logger.debugOn()) {
395: logger.debug("fetchNotifications",
396: "candidate: " + candidate);
397: logger.debug("fetchNotifications",
398: "nextSeq now " + nextSeq);
399: }
400: } else {
401: /* nextSeq is the largest sequence number. If we
402: already got notifications, return them now.
403: Otherwise wait for some to arrive, with
404: timeout. */
405: if (notifs.size() > 0) {
406: logger
407: .debug("fetchNotifications",
408: "no more notifs but have some so don't wait");
409: break;
410: }
411: long toWait = endTime - System.currentTimeMillis();
412: if (toWait <= 0) {
413: logger.debug("fetchNotifications", "timeout");
414: break;
415: }
416:
417: /* dispose called */
418: if (isDisposed()) {
419: if (logger.debugOn())
420: logger.debug("fetchNotifications",
421: "dispose callled, no wait");
422: return new NotificationResult(
423: earliestSequenceNumber(),
424: nextSequenceNumber(),
425: new TargetedNotification[0]);
426: }
427:
428: if (logger.debugOn())
429: logger.debug("fetchNotifications", "wait("
430: + toWait + ")");
431: wait(toWait);
432:
433: continue;
434: }
435: }
436:
437: /* We have a candidate notification. See if it matches
438: our filters. We do this outside the synchronized block
439: so we don't hold up everyone accessing the buffer
440: (including notification senders) while we evaluate
441: potentially slow filters. */
442: ObjectName name = candidate.getObjectName();
443: Notification notif = candidate.getNotification();
444: List<TargetedNotification> matchedNotifs = new ArrayList<TargetedNotification>();
445: logger.debug("fetchNotifications",
446: "applying filter to candidate");
447: filter.apply(matchedNotifs, name, notif);
448:
449: if (matchedNotifs.size() > 0) {
450: /* We only check the max size now, so that our
451: returned nextSeq is as large as possible. This
452: prevents the caller from thinking it missed
453: interesting notifications when in fact we knew they
454: weren't. */
455: if (maxNotifications <= 0) {
456: logger.debug("fetchNotifications",
457: "reached maxNotifications");
458: break;
459: }
460: --maxNotifications;
461: if (logger.debugOn())
462: logger.debug("fetchNotifications", "add: "
463: + matchedNotifs);
464: notifs.addAll(matchedNotifs);
465: }
466:
467: ++nextSeq;
468: } // end while
469:
470: /* Construct and return the result. */
471: int nnotifs = notifs.size();
472: TargetedNotification[] resultNotifs = new TargetedNotification[nnotifs];
473: notifs.toArray(resultNotifs);
474: NotificationResult nr = new NotificationResult(earliestSeq,
475: nextSeq, resultNotifs);
476: if (logger.debugOn())
477: logger.debug("fetchNotifications", nr.toString());
478: logger.trace("fetchNotifications", "ends");
479:
480: return nr;
481: }
482:
483: synchronized long earliestSequenceNumber() {
484: return earliestSequenceNumber;
485: }
486:
487: synchronized long nextSequenceNumber() {
488: return nextSequenceNumber;
489: }
490:
491: synchronized void addNotification(NamedNotification notif) {
492: if (logger.traceOn())
493: logger.trace("addNotification", notif.toString());
494:
495: while (queue.size() >= queueSize) {
496: dropNotification();
497: if (logger.debugOn()) {
498: logger.debug("addNotification",
499: "dropped oldest notif, earliestSeq="
500: + earliestSequenceNumber);
501: }
502: }
503: queue.add(notif);
504: nextSequenceNumber++;
505: if (logger.debugOn())
506: logger.debug("addNotification", "nextSeq="
507: + nextSequenceNumber);
508: notifyAll();
509: }
510:
511: private void dropNotification() {
512: queue.remove(0);
513: earliestSequenceNumber++;
514: }
515:
516: synchronized NamedNotification notificationAt(long seqNo) {
517: long index = seqNo - earliestSequenceNumber;
518: if (index < 0 || index > Integer.MAX_VALUE) {
519: final String msg = "Bad sequence number: " + seqNo
520: + " (earliest " + earliestSequenceNumber + ")";
521: logger.trace("notificationAt", msg);
522: throw new IllegalArgumentException(msg);
523: }
524: return queue.get((int) index);
525: }
526:
527: private static class NamedNotification {
528: NamedNotification(ObjectName sender, Notification notif) {
529: this .sender = sender;
530: this .notification = notif;
531: }
532:
533: ObjectName getObjectName() {
534: return sender;
535: }
536:
537: Notification getNotification() {
538: return notification;
539: }
540:
541: public String toString() {
542: return "NamedNotification(" + sender + ", " + notification
543: + ")";
544: }
545:
546: private final ObjectName sender;
547: private final Notification notification;
548: }
549:
550: /*
551: * Add our listener to every NotificationBroadcaster MBean
552: * currently in the MBean server and to every
553: * NotificationBroadcaster later created.
554: *
555: * It would be really nice if we could just do
556: * mbs.addNotificationListener(new ObjectName("*:*"), ...);
557: * Definitely something for the next version of JMX.
558: *
559: * There is a nasty race condition that we must handle. We
560: * first register for MBean-creation notifications so we can add
561: * listeners to new MBeans, then we query the existing MBeans to
562: * add listeners to them. The problem is that a new MBean could
563: * arrive after we register for creations but before the query has
564: * completed. Then we could see the MBean both in the query and
565: * in an MBean-creation notification, and we would end up
566: * registering our listener twice.
567: *
568: * To solve this problem, we arrange for new MBeans that arrive
569: * while the query is being done to be added to the Set createdDuringQuery
570: * and we do not add a listener immediately. When the query is done,
571: * we atomically turn off the addition of new names to createdDuringQuery
572: * and add all the names that were there to the result of the query.
573: * Since we are dealing with Sets, the result is the same whether or not
574: * the newly-created MBean was included in the query result.
575: *
576: * It is important not to hold any locks during the operation of adding
577: * listeners to MBeans. An MBean's addNotificationListener can be
578: * arbitrary user code, and this could deadlock with any locks we hold
579: * (see bug 6239400). The corollary is that we must not do any operations
580: * in this method or the methods it calls that require locks.
581: */
582: private void createListeners() {
583: logger.debug("createListeners", "starts");
584:
585: synchronized (this ) {
586: createdDuringQuery = new HashSet<ObjectName>();
587: }
588:
589: try {
590: addNotificationListener(MBeanServerDelegate.DELEGATE_NAME,
591: creationListener, creationFilter, null);
592: logger.debug("createListeners", "added creationListener");
593: } catch (Exception e) {
594: final String msg = "Can't add listener to MBean server delegate: ";
595: RuntimeException re = new IllegalArgumentException(msg + e);
596: EnvHelp.initCause(re, e);
597: logger.fine("createListeners", msg + e);
598: logger.debug("createListeners", e);
599: throw re;
600: }
601:
602: /* Spec doesn't say whether Set returned by QueryNames can be modified
603: so we clone it. */
604: Set<ObjectName> names = queryNames(null, broadcasterQuery);
605: names = new HashSet<ObjectName>(names);
606:
607: synchronized (this ) {
608: names.addAll(createdDuringQuery);
609: createdDuringQuery = null;
610: }
611:
612: for (ObjectName name : names)
613: addBufferListener(name);
614: logger.debug("createListeners", "ends");
615: }
616:
617: private void addBufferListener(ObjectName name) {
618: checkNoLocks();
619: if (logger.debugOn())
620: logger.debug("addBufferListener", name.toString());
621: try {
622: addNotificationListener(name, bufferListener, null, name);
623: } catch (Exception e) {
624: logger.trace("addBufferListener", e);
625: /* This can happen if the MBean was unregistered just
626: after the query. Or user NotificationBroadcaster might
627: throw unexpected exception. */
628: }
629: }
630:
631: private void removeBufferListener(ObjectName name) {
632: checkNoLocks();
633: if (logger.debugOn())
634: logger.debug("removeBufferListener", name.toString());
635: try {
636: removeNotificationListener(name, bufferListener);
637: } catch (Exception e) {
638: logger.trace("removeBufferListener", e);
639: }
640: }
641:
642: private void addNotificationListener(final ObjectName name,
643: final NotificationListener listener,
644: final NotificationFilter filter, final Object handback)
645: throws Exception {
646: try {
647: AccessController
648: .doPrivileged(new PrivilegedExceptionAction<Void>() {
649: public Void run()
650: throws InstanceNotFoundException {
651: mBeanServer.addNotificationListener(name,
652: listener, filter, handback);
653: return null;
654: }
655: });
656: } catch (Exception e) {
657: throw extractException(e);
658: }
659: }
660:
661: private void removeNotificationListener(final ObjectName name,
662: final NotificationListener listener) throws Exception {
663: try {
664: AccessController
665: .doPrivileged(new PrivilegedExceptionAction<Void>() {
666: public Void run() throws Exception {
667: mBeanServer.removeNotificationListener(
668: name, listener);
669: return null;
670: }
671: });
672: } catch (Exception e) {
673: throw extractException(e);
674: }
675: }
676:
677: private Set<ObjectName> queryNames(final ObjectName name,
678: final QueryExp query) {
679: PrivilegedAction<Set<ObjectName>> act = new PrivilegedAction<Set<ObjectName>>() {
680: public Set<ObjectName> run() {
681: return mBeanServer.queryNames(name, query);
682: }
683: };
684: try {
685: return AccessController.doPrivileged(act);
686: } catch (RuntimeException e) {
687: logger.fine("queryNames", "Failed to query names: " + e);
688: logger.debug("queryNames", e);
689: throw e;
690: }
691: }
692:
693: private static boolean isInstanceOf(final MBeanServer mbs,
694: final ObjectName name, final String className) {
695: PrivilegedExceptionAction<Boolean> act = new PrivilegedExceptionAction<Boolean>() {
696: public Boolean run() throws InstanceNotFoundException {
697: return mbs.isInstanceOf(name, className);
698: }
699: };
700: try {
701: return AccessController.doPrivileged(act);
702: } catch (Exception e) {
703: logger.fine("isInstanceOf", "failed: " + e);
704: logger.debug("isInstanceOf", e);
705: return false;
706: }
707: }
708:
709: /* This method must not be synchronized. See the comment on the
710: * createListeners method.
711: *
712: * The notification could arrive after our buffer has been destroyed
713: * or even during its destruction. So we always add our listener
714: * (without synchronization), then we check if the buffer has been
715: * destroyed and if so remove the listener we just added.
716: */
717: private void createdNotification(MBeanServerNotification n) {
718: final String shouldEqual = MBeanServerNotification.REGISTRATION_NOTIFICATION;
719: if (!n.getType().equals(shouldEqual)) {
720: logger.warning("createNotification", "bad type: "
721: + n.getType());
722: return;
723: }
724:
725: ObjectName name = n.getMBeanName();
726: if (logger.debugOn())
727: logger.debug("createdNotification", "for: " + name);
728:
729: synchronized (this ) {
730: if (createdDuringQuery != null) {
731: createdDuringQuery.add(name);
732: return;
733: }
734: }
735:
736: if (isInstanceOf(mBeanServer, name, broadcasterClass)) {
737: addBufferListener(name);
738: if (isDisposed())
739: removeBufferListener(name);
740: }
741: }
742:
743: private class BufferListener implements NotificationListener {
744: public void handleNotification(Notification notif,
745: Object handback) {
746: if (logger.debugOn()) {
747: logger.debug("BufferListener.handleNotification",
748: "notif=" + notif + "; handback=" + handback);
749: }
750: ObjectName name = (ObjectName) handback;
751: addNotification(new NamedNotification(name, notif));
752: }
753: }
754:
755: private final NotificationListener bufferListener = new BufferListener();
756:
757: private static class BroadcasterQuery extends QueryEval implements
758: QueryExp {
759: private static final long serialVersionUID = 7378487660587592048L;
760:
761: public boolean apply(final ObjectName name) {
762: final MBeanServer mbs = QueryEval.getMBeanServer();
763: return isInstanceOf(mbs, name, broadcasterClass);
764: }
765: }
766:
767: private static final QueryExp broadcasterQuery = new BroadcasterQuery();
768:
/* Filter that enables only the MBean-registration notification type.
 * Used when listening to the MBean server delegate.
 */
private static final NotificationFilter creationFilter;
static {
    final NotificationFilterSupport registrationsOnly =
            new NotificationFilterSupport();
    registrationsOnly.enableType(
            MBeanServerNotification.REGISTRATION_NOTIFICATION);
    creationFilter = registrationsOnly;
}
776:
777: private final NotificationListener creationListener = new NotificationListener() {
778: public void handleNotification(Notification notif,
779: Object handback) {
780: logger.debug("creationListener",
781: "handleNotification called");
782: createdNotification((MBeanServerNotification) notif);
783: }
784: };
785:
786: private void destroyListeners() {
787: checkNoLocks();
788: logger.debug("destroyListeners", "starts");
789: try {
790: removeNotificationListener(
791: MBeanServerDelegate.DELEGATE_NAME, creationListener);
792: } catch (Exception e) {
793: logger.warning("remove listener from MBeanServer delegate",
794: e);
795: }
796: Set<ObjectName> names = queryNames(null, broadcasterQuery);
797: for (final ObjectName name : names) {
798: if (logger.debugOn())
799: logger.debug("destroyListeners",
800: "remove listener from " + name);
801: removeBufferListener(name);
802: }
803: logger.debug("destroyListeners", "ends");
804: }
805:
806: private void checkNoLocks() {
807: if (Thread.holdsLock(this ) || Thread.holdsLock(globalLock))
808: logger.warning("checkNoLocks", "lock protocol violation");
809: }
810:
811: /**
812: * Iterate until we extract the real exception
813: * from a stack of PrivilegedActionExceptions.
814: */
815: private static Exception extractException(Exception e) {
816: while (e instanceof PrivilegedActionException) {
817: e = ((PrivilegedActionException) e).getException();
818: }
819: return e;
820: }
821:
822: private static final ClassLogger logger = new ClassLogger(
823: "javax.management.remote.misc", "ArrayNotificationBuffer");
824:
825: private final MBeanServer mBeanServer;
826: private final ArrayQueue<NamedNotification> queue;
827: private int queueSize;
828: private long earliestSequenceNumber;
829: private long nextSequenceNumber;
830: private Set<ObjectName> createdDuringQuery;
831:
832: static final String broadcasterClass = NotificationBroadcaster.class
833: .getName();
834: }
|