Source Code Cross Referenced for ArrayNotificationBuffer.java in  » 6.0-JDK-Modules-com.sun » jmx » com » sun » jmx » remote » internal » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » 6.0 JDK Modules com.sun » jmx » com.sun.jmx.remote.internal 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         * Copyright 2003-2006 Sun Microsystems, Inc.  All Rights Reserved.
003:         * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
004:         *
005:         * This code is free software; you can redistribute it and/or modify it
006:         * under the terms of the GNU General Public License version 2 only, as
007:         * published by the Free Software Foundation.  Sun designates this
008:         * particular file as subject to the "Classpath" exception as provided
009:         * by Sun in the LICENSE file that accompanied this code.
010:         *
011:         * This code is distributed in the hope that it will be useful, but WITHOUT
012:         * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
013:         * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
014:         * version 2 for more details (a copy is included in the LICENSE file that
015:         * accompanied this code).
016:         *
017:         * You should have received a copy of the GNU General Public License version
018:         * 2 along with this work; if not, write to the Free Software Foundation,
019:         * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
020:         *
021:         * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
022:         * CA 95054 USA or visit www.sun.com if you need additional information or
023:         * have any questions.
024:         */
025:
026:        package com.sun.jmx.remote.internal;
027:
028:        import java.security.AccessController;
029:        import java.security.PrivilegedAction;
030:        import java.security.PrivilegedActionException;
031:        import java.security.PrivilegedExceptionAction;
032:        import java.util.ArrayList;
033:        import java.util.Collection;
034:        import java.util.Collections;
035:        import java.util.HashSet;
036:        import java.util.List;
037:        import java.util.Set;
038:        import java.util.HashMap;
039:        import java.util.Map;
040:
041:        import javax.management.InstanceNotFoundException;
042:        import javax.management.MBeanServer;
043:        import javax.management.MBeanServerDelegate;
044:        import javax.management.MBeanServerNotification;
045:        import javax.management.Notification;
046:        import javax.management.NotificationBroadcaster;
047:        import javax.management.NotificationFilter;
048:        import javax.management.NotificationFilterSupport;
049:        import javax.management.NotificationListener;
050:        import javax.management.ObjectName;
051:        import javax.management.QueryEval;
052:        import javax.management.QueryExp;
053:
054:        import javax.management.remote.NotificationResult;
055:        import javax.management.remote.TargetedNotification;
056:
057:        import com.sun.jmx.remote.util.EnvHelp;
058:        import com.sun.jmx.remote.util.ClassLogger;
059:
060:        /** A circular buffer of notifications received from an MBean server. */
061:        /*
062:         There is one instance of ArrayNotificationBuffer for every
063:         MBeanServer object that has an attached ConnectorServer.  Then, for
064:         every ConnectorServer attached to a given MBeanServer, there is an
065:         instance of the inner class ShareBuffer.  So for example with two
066:         ConnectorServers it looks like this:
067:
068:         ConnectorServer1 -> ShareBuffer1 -\
069:         }-> ArrayNotificationBuffer
070:         ConnectorServer2 -> ShareBuffer2 -/              |
071:         |
072:         v
073:         MBeanServer
074:
075:         The ArrayNotificationBuffer has a circular buffer of
076:         NamedNotification objects.  Each ConnectorServer defines a
077:         notification buffer size, and this size is recorded by the
078:         corresponding ShareBuffer.  The buffer size of the
079:         ArrayNotificationBuffer is the maximum of all of its ShareBuffers.
080:         When a ShareBuffer is added or removed, the ArrayNotificationBuffer
081:         size is adjusted accordingly.
082:
083:         An ArrayNotificationBuffer also has a BufferListener (which is a
084:         NotificationListener) registered on every NotificationBroadcaster
085:         MBean in the MBeanServer to which it is attached.  The cost of this
086:         potentially large set of listeners is the principal motivation for
087:         sharing the ArrayNotificationBuffer between ConnectorServers, and
088:         also the reason that we are careful to discard the
089:         ArrayNotificationBuffer (and its BufferListeners) when there are no
090:         longer any ConnectorServers using it.
091:
092:         The synchronization of this class is inherently complex.  In an attempt
093:         to limit the complexity, we use just two locks:
094:
095:         - globalLock controls access to the mapping between an MBeanServer
096:         and its ArrayNotificationBuffer and to the set of ShareBuffers for
097:         each ArrayNotificationBuffer.
098:        
099:         - the instance lock of each ArrayNotificationBuffer controls access
100:         to the array of notifications, including its size, and to the
101:         dispose flag of the ArrayNotificationBuffer.  The wait/notify
102:         mechanism is used to indicate changes to the array.
103:
104:         If both locks are held at the same time, the globalLock must be
105:         taken first.
106:
107:         Since adding or removing a BufferListener to an MBean can involve
108:         calling user code, we are careful not to hold any locks while it is
109:         done.
110:         */
111:        public class ArrayNotificationBuffer implements  NotificationBuffer {
112:            private boolean disposed = false;
113:
114:            // FACTORY STUFF, INCLUDING SHARING
115:
116:            private static final Object globalLock = new Object();
117:            private static final HashMap<MBeanServer, ArrayNotificationBuffer> mbsToBuffer = new HashMap<MBeanServer, ArrayNotificationBuffer>(
118:                    1);
119:            private final Collection<ShareBuffer> sharers = new HashSet<ShareBuffer>(
120:                    1);
121:
122:            public static NotificationBuffer getNotificationBuffer(
123:                    MBeanServer mbs, Map env) {
124:
125:                if (env == null)
126:                    env = Collections.emptyMap();
127:
128:                //Find out queue size
129:                int queueSize = EnvHelp.getNotifBufferSize(env);
130:
131:                ArrayNotificationBuffer buf;
132:                boolean create;
133:                NotificationBuffer sharer;
134:                synchronized (globalLock) {
135:                    buf = mbsToBuffer.get(mbs);
136:                    create = (buf == null);
137:                    if (create) {
138:                        buf = new ArrayNotificationBuffer(mbs, queueSize);
139:                        mbsToBuffer.put(mbs, buf);
140:                    }
141:                    sharer = buf.new ShareBuffer(queueSize);
142:                }
143:                /* We avoid holding any locks while calling createListeners.
144:                 * This prevents possible deadlocks involving user code, but
145:                 * does mean that a second ConnectorServer created and started
146:                 * in this window will return before all the listeners are ready,
147:                 * which could lead to surprising behaviour.  The alternative
148:                 * would be to block the second ConnectorServer until the first
149:                 * one has finished adding all the listeners, but that would then
150:                 * be subject to deadlock.
151:                 */
152:                if (create)
153:                    buf.createListeners();
154:                return sharer;
155:            }
156:
157:            /* Ensure that this buffer is no longer the one that will be returned by
158:             * getNotificationBuffer.  This method is idempotent - calling it more
159:             * than once has no effect beyond that of calling it once.
160:             */
161:            static void removeNotificationBuffer(MBeanServer mbs) {
162:                synchronized (globalLock) {
163:                    mbsToBuffer.remove(mbs);
164:                }
165:            }
166:
167:            void addSharer(ShareBuffer sharer) {
168:                synchronized (globalLock) {
169:                    synchronized (this ) {
170:                        if (sharer.getSize() > queueSize)
171:                            resize(sharer.getSize());
172:                    }
173:                    sharers.add(sharer);
174:                }
175:            }
176:
177:            private void removeSharer(ShareBuffer sharer) {
178:                boolean empty;
179:                synchronized (globalLock) {
180:                    sharers.remove(sharer);
181:                    empty = sharers.isEmpty();
182:                    if (empty)
183:                        removeNotificationBuffer(mBeanServer);
184:                    else {
185:                        int max = 0;
186:                        for (ShareBuffer buf : sharers) {
187:                            int bufsize = buf.getSize();
188:                            if (bufsize > max)
189:                                max = bufsize;
190:                        }
191:                        if (max < queueSize)
192:                            resize(max);
193:                    }
194:                }
195:                if (empty) {
196:                    synchronized (this ) {
197:                        disposed = true;
198:                        // Notify potential waiting fetchNotification call
199:                        notifyAll();
200:                    }
201:                    destroyListeners();
202:                }
203:            }
204:
205:            private synchronized void resize(int newSize) {
206:                if (newSize == queueSize)
207:                    return;
208:                while (queue.size() > newSize)
209:                    dropNotification();
210:                queue.resize(newSize);
211:                queueSize = newSize;
212:            }
213:
214:            private class ShareBuffer implements  NotificationBuffer {
215:                ShareBuffer(int size) {
216:                    this .size = size;
217:                    addSharer(this );
218:                }
219:
220:                public NotificationResult fetchNotifications(
221:                        NotificationBufferFilter filter,
222:                        long startSequenceNumber, long timeout,
223:                        int maxNotifications) throws InterruptedException {
224:                    NotificationBuffer buf = ArrayNotificationBuffer.this ;
225:                    return buf.fetchNotifications(filter, startSequenceNumber,
226:                            timeout, maxNotifications);
227:                }
228:
229:                public void dispose() {
230:                    ArrayNotificationBuffer.this .removeSharer(this );
231:                }
232:
233:                int getSize() {
234:                    return size;
235:                }
236:
237:                private final int size;
238:            }
239:
240:            // ARRAYNOTIFICATIONBUFFER IMPLEMENTATION
241:
242:            private ArrayNotificationBuffer(MBeanServer mbs, int queueSize) {
243:                if (logger.traceOn())
244:                    logger.trace("Constructor", "queueSize=" + queueSize);
245:
246:                if (mbs == null || queueSize < 1)
247:                    throw new IllegalArgumentException("Bad args");
248:
249:                this .mBeanServer = mbs;
250:                this .queueSize = queueSize;
251:                this .queue = new ArrayQueue<NamedNotification>(queueSize);
252:                this .earliestSequenceNumber = System.currentTimeMillis();
253:                this .nextSequenceNumber = this .earliestSequenceNumber;
254:
255:                logger.trace("Constructor", "ends");
256:            }
257:
258:            private synchronized boolean isDisposed() {
259:                return disposed;
260:            }
261:
262:            // We no longer support calling this method from outside.
263:            // The JDK doesn't contain any such calls and users are not
264:            // supposed to be accessing this class.
265:            public void dispose() {
266:                throw new UnsupportedOperationException();
267:            }
268:
269:            /**
270:             * <p>Fetch notifications that match the given listeners.</p>
271:             *
272:             * <p>The operation only considers notifications with a sequence
273:             * number at least <code>startSequenceNumber</code>.  It will take
274:             * no longer than <code>timeout</code>, and will return no more
275:             * than <code>maxNotifications</code> different notifications.</p>
276:             *
277:             * <p>If there are no notifications matching the criteria, the
278:             * operation will block until one arrives, subject to the
279:             * timeout.</p>
280:             *
281:             * @param filter an object that will add notifications to a
282:             * {@code List<TargetedNotification>} if they match the current
283:             * listeners with their filters.
284:             * @param startSequenceNumber the first sequence number to
285:             * consider.
286:             * @param timeout the maximum time to wait.  May be 0 to indicate
287:             * not to wait if there are no notifications.
288:             * @param maxNotifications the maximum number of notifications to
289:             * return.  May be 0 to indicate a wait for eligible notifications
290:             * that will return a usable <code>nextSequenceNumber</code>.  The
291:             * {@link TargetedNotification} array in the returned {@link
292:             * NotificationResult} may contain more than this number of
293:             * elements but will not contain more than this number of
294:             * different notifications.
295:             */
296:            public NotificationResult fetchNotifications(
297:                    NotificationBufferFilter filter, long startSequenceNumber,
298:                    long timeout, int maxNotifications)
299:                    throws InterruptedException {
300:
301:                logger.trace("fetchNotifications", "starts");
302:
303:                if (startSequenceNumber < 0 || isDisposed()) {
304:                    synchronized (this ) {
305:                        return new NotificationResult(earliestSequenceNumber(),
306:                                nextSequenceNumber(),
307:                                new TargetedNotification[0]);
308:                    }
309:                }
310:
311:                // Check arg validity
312:                if (filter == null || startSequenceNumber < 0 || timeout < 0
313:                        || maxNotifications < 0) {
314:                    logger.trace("fetchNotifications", "Bad args");
315:                    throw new IllegalArgumentException("Bad args to fetch");
316:                }
317:
318:                if (logger.debugOn()) {
319:                    logger.trace("fetchNotifications", "filter=" + filter
320:                            + "; startSeq=" + startSequenceNumber
321:                            + "; timeout=" + timeout + "; max="
322:                            + maxNotifications);
323:                }
324:
325:                if (startSequenceNumber > nextSequenceNumber()) {
326:                    final String msg = "Start sequence number too big: "
327:                            + startSequenceNumber + " > "
328:                            + nextSequenceNumber();
329:                    logger.trace("fetchNotifications", msg);
330:                    throw new IllegalArgumentException(msg);
331:                }
332:
333:                /* Determine the end time corresponding to the timeout value.
334:                   Caller may legitimately supply Long.MAX_VALUE to indicate no
335:                   timeout.  In that case the addition will overflow and produce
336:                   a negative end time.  Set end time to Long.MAX_VALUE in that
337:                   case.  We assume System.currentTimeMillis() is positive.  */
338:                long endTime = System.currentTimeMillis() + timeout;
339:                if (endTime < 0) // overflow
340:                    endTime = Long.MAX_VALUE;
341:
342:                if (logger.debugOn())
343:                    logger.debug("fetchNotifications", "endTime=" + endTime);
344:
345:                /* We set earliestSeq the first time through the loop.  If we
346:                   set it here, notifications could be dropped before we
347:                   started examining them, so earliestSeq might not correspond
348:                   to the earliest notification we examined.  */
349:                long earliestSeq = -1;
350:                long nextSeq = startSequenceNumber;
351:                List<TargetedNotification> notifs = new ArrayList<TargetedNotification>();
352:
353:                /* On exit from this loop, notifs, earliestSeq, and nextSeq must
354:                   all be correct values for the returned NotificationResult.  */
355:                while (true) {
356:                    logger.debug("fetchNotifications", "main loop starts");
357:
358:                    NamedNotification candidate;
359:
360:                    /* Get the next available notification regardless of filters,
361:                       or wait for one to arrive if there is none.  */
362:                    synchronized (this ) {
363:
364:                        /* First time through.  The current earliestSequenceNumber
365:                           is the first one we could have examined.  */
366:                        if (earliestSeq < 0) {
367:                            earliestSeq = earliestSequenceNumber();
368:                            if (logger.debugOn()) {
369:                                logger.debug("fetchNotifications",
370:                                        "earliestSeq=" + earliestSeq);
371:                            }
372:                            if (nextSeq < earliestSeq) {
373:                                nextSeq = earliestSeq;
374:                                logger.debug("fetchNotifications",
375:                                        "nextSeq=earliestSeq");
376:                            }
377:                        } else
378:                            earliestSeq = earliestSequenceNumber();
379:
380:                        /* If many notifications have been dropped since the
381:                           last time through, nextSeq could now be earlier
382:                           than the current earliest.  If so, notifications
383:                           may have been lost and we return now so the caller
384:                           can see this next time it calls.  */
385:                        if (nextSeq < earliestSeq) {
386:                            logger.trace("fetchNotifications", "nextSeq="
387:                                    + nextSeq + " < " + "earliestSeq="
388:                                    + earliestSeq + " so may have lost notifs");
389:                            break;
390:                        }
391:
392:                        if (nextSeq < nextSequenceNumber()) {
393:                            candidate = notificationAt(nextSeq);
394:                            if (logger.debugOn()) {
395:                                logger.debug("fetchNotifications",
396:                                        "candidate: " + candidate);
397:                                logger.debug("fetchNotifications",
398:                                        "nextSeq now " + nextSeq);
399:                            }
400:                        } else {
401:                            /* nextSeq is the largest sequence number.  If we
402:                               already got notifications, return them now.
403:                               Otherwise wait for some to arrive, with
404:                               timeout.  */
405:                            if (notifs.size() > 0) {
406:                                logger
407:                                        .debug("fetchNotifications",
408:                                                "no more notifs but have some so don't wait");
409:                                break;
410:                            }
411:                            long toWait = endTime - System.currentTimeMillis();
412:                            if (toWait <= 0) {
413:                                logger.debug("fetchNotifications", "timeout");
414:                                break;
415:                            }
416:
417:                            /* dispose called */
418:                            if (isDisposed()) {
419:                                if (logger.debugOn())
420:                                    logger.debug("fetchNotifications",
421:                                            "dispose callled, no wait");
422:                                return new NotificationResult(
423:                                        earliestSequenceNumber(),
424:                                        nextSequenceNumber(),
425:                                        new TargetedNotification[0]);
426:                            }
427:
428:                            if (logger.debugOn())
429:                                logger.debug("fetchNotifications", "wait("
430:                                        + toWait + ")");
431:                            wait(toWait);
432:
433:                            continue;
434:                        }
435:                    }
436:
437:                    /* We have a candidate notification.  See if it matches
438:                       our filters.  We do this outside the synchronized block
439:                       so we don't hold up everyone accessing the buffer
440:                       (including notification senders) while we evaluate
441:                       potentially slow filters.  */
442:                    ObjectName name = candidate.getObjectName();
443:                    Notification notif = candidate.getNotification();
444:                    List<TargetedNotification> matchedNotifs = new ArrayList<TargetedNotification>();
445:                    logger.debug("fetchNotifications",
446:                            "applying filter to candidate");
447:                    filter.apply(matchedNotifs, name, notif);
448:
449:                    if (matchedNotifs.size() > 0) {
450:                        /* We only check the max size now, so that our
451:                           returned nextSeq is as large as possible.  This
452:                           prevents the caller from thinking it missed
453:                           interesting notifications when in fact we knew they
454:                           weren't.  */
455:                        if (maxNotifications <= 0) {
456:                            logger.debug("fetchNotifications",
457:                                    "reached maxNotifications");
458:                            break;
459:                        }
460:                        --maxNotifications;
461:                        if (logger.debugOn())
462:                            logger.debug("fetchNotifications", "add: "
463:                                    + matchedNotifs);
464:                        notifs.addAll(matchedNotifs);
465:                    }
466:
467:                    ++nextSeq;
468:                } // end while
469:
470:                /* Construct and return the result.  */
471:                int nnotifs = notifs.size();
472:                TargetedNotification[] resultNotifs = new TargetedNotification[nnotifs];
473:                notifs.toArray(resultNotifs);
474:                NotificationResult nr = new NotificationResult(earliestSeq,
475:                        nextSeq, resultNotifs);
476:                if (logger.debugOn())
477:                    logger.debug("fetchNotifications", nr.toString());
478:                logger.trace("fetchNotifications", "ends");
479:
480:                return nr;
481:            }
482:
483:            synchronized long earliestSequenceNumber() {
484:                return earliestSequenceNumber;
485:            }
486:
487:            synchronized long nextSequenceNumber() {
488:                return nextSequenceNumber;
489:            }
490:
491:            synchronized void addNotification(NamedNotification notif) {
492:                if (logger.traceOn())
493:                    logger.trace("addNotification", notif.toString());
494:
495:                while (queue.size() >= queueSize) {
496:                    dropNotification();
497:                    if (logger.debugOn()) {
498:                        logger.debug("addNotification",
499:                                "dropped oldest notif, earliestSeq="
500:                                        + earliestSequenceNumber);
501:                    }
502:                }
503:                queue.add(notif);
504:                nextSequenceNumber++;
505:                if (logger.debugOn())
506:                    logger.debug("addNotification", "nextSeq="
507:                            + nextSequenceNumber);
508:                notifyAll();
509:            }
510:
511:            private void dropNotification() {
512:                queue.remove(0);
513:                earliestSequenceNumber++;
514:            }
515:
516:            synchronized NamedNotification notificationAt(long seqNo) {
517:                long index = seqNo - earliestSequenceNumber;
518:                if (index < 0 || index > Integer.MAX_VALUE) {
519:                    final String msg = "Bad sequence number: " + seqNo
520:                            + " (earliest " + earliestSequenceNumber + ")";
521:                    logger.trace("notificationAt", msg);
522:                    throw new IllegalArgumentException(msg);
523:                }
524:                return queue.get((int) index);
525:            }
526:
527:            private static class NamedNotification {
528:                NamedNotification(ObjectName sender, Notification notif) {
529:                    this .sender = sender;
530:                    this .notification = notif;
531:                }
532:
533:                ObjectName getObjectName() {
534:                    return sender;
535:                }
536:
537:                Notification getNotification() {
538:                    return notification;
539:                }
540:
541:                public String toString() {
542:                    return "NamedNotification(" + sender + ", " + notification
543:                            + ")";
544:                }
545:
546:                private final ObjectName sender;
547:                private final Notification notification;
548:            }
549:
550:            /*
551:             * Add our listener to every NotificationBroadcaster MBean
552:             * currently in the MBean server and to every
553:             * NotificationBroadcaster later created.
554:             *
555:             * It would be really nice if we could just do
556:             * mbs.addNotificationListener(new ObjectName("*:*"), ...);
557:             * Definitely something for the next version of JMX.
558:             *
559:             * There is a nasty race condition that we must handle.  We
560:             * first register for MBean-creation notifications so we can add
561:             * listeners to new MBeans, then we query the existing MBeans to
562:             * add listeners to them.  The problem is that a new MBean could
563:             * arrive after we register for creations but before the query has
564:             * completed.  Then we could see the MBean both in the query and
565:             * in an MBean-creation notification, and we would end up
566:             * registering our listener twice.
567:             *
568:             * To solve this problem, we arrange for new MBeans that arrive
569:             * while the query is being done to be added to the Set createdDuringQuery
570:             * and we do not add a listener immediately.  When the query is done,
571:             * we atomically turn off the addition of new names to createdDuringQuery
572:             * and add all the names that were there to the result of the query.
573:             * Since we are dealing with Sets, the result is the same whether or not
574:             * the newly-created MBean was included in the query result.
575:             *
576:             * It is important not to hold any locks during the operation of adding
577:             * listeners to MBeans.  An MBean's addNotificationListener can be
578:             * arbitrary user code, and this could deadlock with any locks we hold
579:             * (see bug 6239400).  The corollary is that we must not do any operations
580:             * in this method or the methods it calls that require locks.
581:             */
582:            private void createListeners() {
583:                logger.debug("createListeners", "starts");
584:
585:                synchronized (this ) {
586:                    createdDuringQuery = new HashSet<ObjectName>();
587:                }
588:
589:                try {
590:                    addNotificationListener(MBeanServerDelegate.DELEGATE_NAME,
591:                            creationListener, creationFilter, null);
592:                    logger.debug("createListeners", "added creationListener");
593:                } catch (Exception e) {
594:                    final String msg = "Can't add listener to MBean server delegate: ";
595:                    RuntimeException re = new IllegalArgumentException(msg + e);
596:                    EnvHelp.initCause(re, e);
597:                    logger.fine("createListeners", msg + e);
598:                    logger.debug("createListeners", e);
599:                    throw re;
600:                }
601:
602:                /* Spec doesn't say whether Set returned by QueryNames can be modified
603:                   so we clone it. */
604:                Set<ObjectName> names = queryNames(null, broadcasterQuery);
605:                names = new HashSet<ObjectName>(names);
606:
607:                synchronized (this ) {
608:                    names.addAll(createdDuringQuery);
609:                    createdDuringQuery = null;
610:                }
611:
612:                for (ObjectName name : names)
613:                    addBufferListener(name);
614:                logger.debug("createListeners", "ends");
615:            }
616:
617:            private void addBufferListener(ObjectName name) {
618:                checkNoLocks();
619:                if (logger.debugOn())
620:                    logger.debug("addBufferListener", name.toString());
621:                try {
622:                    addNotificationListener(name, bufferListener, null, name);
623:                } catch (Exception e) {
624:                    logger.trace("addBufferListener", e);
625:                    /* This can happen if the MBean was unregistered just
626:                       after the query.  Or user NotificationBroadcaster might
627:                       throw unexpected exception.  */
628:                }
629:            }
630:
631:            private void removeBufferListener(ObjectName name) {
632:                checkNoLocks();
633:                if (logger.debugOn())
634:                    logger.debug("removeBufferListener", name.toString());
635:                try {
636:                    removeNotificationListener(name, bufferListener);
637:                } catch (Exception e) {
638:                    logger.trace("removeBufferListener", e);
639:                }
640:            }
641:
642:            private void addNotificationListener(final ObjectName name,
643:                    final NotificationListener listener,
644:                    final NotificationFilter filter, final Object handback)
645:                    throws Exception {
646:                try {
647:                    AccessController
648:                            .doPrivileged(new PrivilegedExceptionAction<Void>() {
649:                                public Void run()
650:                                        throws InstanceNotFoundException {
651:                                    mBeanServer.addNotificationListener(name,
652:                                            listener, filter, handback);
653:                                    return null;
654:                                }
655:                            });
656:                } catch (Exception e) {
657:                    throw extractException(e);
658:                }
659:            }
660:
661:            private void removeNotificationListener(final ObjectName name,
662:                    final NotificationListener listener) throws Exception {
663:                try {
664:                    AccessController
665:                            .doPrivileged(new PrivilegedExceptionAction<Void>() {
666:                                public Void run() throws Exception {
667:                                    mBeanServer.removeNotificationListener(
668:                                            name, listener);
669:                                    return null;
670:                                }
671:                            });
672:                } catch (Exception e) {
673:                    throw extractException(e);
674:                }
675:            }
676:
677:            private Set<ObjectName> queryNames(final ObjectName name,
678:                    final QueryExp query) {
679:                PrivilegedAction<Set<ObjectName>> act = new PrivilegedAction<Set<ObjectName>>() {
680:                    public Set<ObjectName> run() {
681:                        return mBeanServer.queryNames(name, query);
682:                    }
683:                };
684:                try {
685:                    return AccessController.doPrivileged(act);
686:                } catch (RuntimeException e) {
687:                    logger.fine("queryNames", "Failed to query names: " + e);
688:                    logger.debug("queryNames", e);
689:                    throw e;
690:                }
691:            }
692:
693:            private static boolean isInstanceOf(final MBeanServer mbs,
694:                    final ObjectName name, final String className) {
695:                PrivilegedExceptionAction<Boolean> act = new PrivilegedExceptionAction<Boolean>() {
696:                    public Boolean run() throws InstanceNotFoundException {
697:                        return mbs.isInstanceOf(name, className);
698:                    }
699:                };
700:                try {
701:                    return AccessController.doPrivileged(act);
702:                } catch (Exception e) {
703:                    logger.fine("isInstanceOf", "failed: " + e);
704:                    logger.debug("isInstanceOf", e);
705:                    return false;
706:                }
707:            }
708:
709:            /* This method must not be synchronized.  See the comment on the
710:             * createListeners method.
711:             *
712:             * The notification could arrive after our buffer has been destroyed
713:             * or even during its destruction.  So we always add our listener
714:             * (without synchronization), then we check if the buffer has been
715:             * destroyed and if so remove the listener we just added.
716:             */
717:            private void createdNotification(MBeanServerNotification n) {
718:                final String shouldEqual = MBeanServerNotification.REGISTRATION_NOTIFICATION;
719:                if (!n.getType().equals(shouldEqual)) {
720:                    logger.warning("createNotification", "bad type: "
721:                            + n.getType());
722:                    return;
723:                }
724:
725:                ObjectName name = n.getMBeanName();
726:                if (logger.debugOn())
727:                    logger.debug("createdNotification", "for: " + name);
728:
729:                synchronized (this ) {
730:                    if (createdDuringQuery != null) {
731:                        createdDuringQuery.add(name);
732:                        return;
733:                    }
734:                }
735:
736:                if (isInstanceOf(mBeanServer, name, broadcasterClass)) {
737:                    addBufferListener(name);
738:                    if (isDisposed())
739:                        removeBufferListener(name);
740:                }
741:            }
742:
743:            private class BufferListener implements  NotificationListener {
744:                public void handleNotification(Notification notif,
745:                        Object handback) {
746:                    if (logger.debugOn()) {
747:                        logger.debug("BufferListener.handleNotification",
748:                                "notif=" + notif + "; handback=" + handback);
749:                    }
750:                    ObjectName name = (ObjectName) handback;
751:                    addNotification(new NamedNotification(name, notif));
752:                }
753:            }
754:
755:            private final NotificationListener bufferListener = new BufferListener();
756:
757:            private static class BroadcasterQuery extends QueryEval implements 
758:                    QueryExp {
759:                private static final long serialVersionUID = 7378487660587592048L;
760:
761:                public boolean apply(final ObjectName name) {
762:                    final MBeanServer mbs = QueryEval.getMBeanServer();
763:                    return isInstanceOf(mbs, name, broadcasterClass);
764:                }
765:            }
766:
767:            private static final QueryExp broadcasterQuery = new BroadcasterQuery();
768:
769:            private static final NotificationFilter creationFilter;
770:            static {
771:                NotificationFilterSupport nfs = new NotificationFilterSupport();
772:                nfs
773:                        .enableType(MBeanServerNotification.REGISTRATION_NOTIFICATION);
774:                creationFilter = nfs;
775:            }
776:
777:            private final NotificationListener creationListener = new NotificationListener() {
778:                public void handleNotification(Notification notif,
779:                        Object handback) {
780:                    logger.debug("creationListener",
781:                            "handleNotification called");
782:                    createdNotification((MBeanServerNotification) notif);
783:                }
784:            };
785:
786:            private void destroyListeners() {
787:                checkNoLocks();
788:                logger.debug("destroyListeners", "starts");
789:                try {
790:                    removeNotificationListener(
791:                            MBeanServerDelegate.DELEGATE_NAME, creationListener);
792:                } catch (Exception e) {
793:                    logger.warning("remove listener from MBeanServer delegate",
794:                            e);
795:                }
796:                Set<ObjectName> names = queryNames(null, broadcasterQuery);
797:                for (final ObjectName name : names) {
798:                    if (logger.debugOn())
799:                        logger.debug("destroyListeners",
800:                                "remove listener from " + name);
801:                    removeBufferListener(name);
802:                }
803:                logger.debug("destroyListeners", "ends");
804:            }
805:
806:            private void checkNoLocks() {
807:                if (Thread.holdsLock(this ) || Thread.holdsLock(globalLock))
808:                    logger.warning("checkNoLocks", "lock protocol violation");
809:            }
810:
811:            /**
812:             * Iterate until we extract the real exception
813:             * from a stack of PrivilegedActionExceptions.
814:             */
815:            private static Exception extractException(Exception e) {
816:                while (e instanceof  PrivilegedActionException) {
817:                    e = ((PrivilegedActionException) e).getException();
818:                }
819:                return e;
820:            }
821:
822:            private static final ClassLogger logger = new ClassLogger(
823:                    "javax.management.remote.misc", "ArrayNotificationBuffer");
824:
825:            private final MBeanServer mBeanServer;
826:            private final ArrayQueue<NamedNotification> queue;
827:            private int queueSize;
828:            private long earliestSequenceNumber;
829:            private long nextSequenceNumber;
830:            private Set<ObjectName> createdDuringQuery;
831:
832:            static final String broadcasterClass = NotificationBroadcaster.class
833:                    .getName();
834:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.