Source Code Cross Referenced for Subscriber.java in  » Science » Cougaar12_4 » org » cougaar » core » blackboard » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Science » Cougaar12_4 » org.cougaar.core.blackboard 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /*
0002:         * <copyright>
0003:         *  
0004:         *  Copyright 1997-2007 BBNT Solutions, LLC
0005:         *  under sponsorship of the Defense Advanced Research Projects
0006:         *  Agency (DARPA).
0007:         * 
0008:         *  You can redistribute this software and/or modify it under the
0009:         *  terms of the Cougaar Open Source License as published on the
0010:         *  Cougaar Open Source Website (www.cougaar.org).
0011:         * 
0012:         *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
0013:         *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
0014:         *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
0015:         *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
0016:         *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
0017:         *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
0018:         *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
0019:         *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
0020:         *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
0021:         *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
0022:         *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
0023:         *  
0024:         * </copyright>
0025:         */
0026:
0027:        package org.cougaar.core.blackboard;
0028:
0029:        import java.util.ArrayList;
0030:        import java.util.Collection;
0031:        import java.util.Enumeration;
0032:        import java.util.HashSet;
0033:        import java.util.Iterator;
0034:        import java.util.List;
0035:        import java.util.Map;
0036:
0037:        import org.cougaar.bootstrap.SystemProperties;
0038:        import org.cougaar.core.persist.Persistence;
0039:        import org.cougaar.core.persist.PersistenceNotEnabledException;
0040:        import org.cougaar.util.LockFlag;
0041:        import org.cougaar.util.UnaryPredicate;
0042:        import org.cougaar.util.log.Logger;
0043:        import org.cougaar.util.log.Logging;
0044:        import org.cougaar.util.CallerTracker;
0045:
0046:        /**
0047:         * The standard implementation of the {@link
0048:         * org.cougaar.core.service.BlackboardService}.
0049:         *
0050:         * @property org.cougaar.core.blackboard.enforceTransactions
0051:         * Set to <em>false</em> to disable checking for clients
0052:         * of BlackboardService publishing changes to the blackboard outside
0053:         * of a transaction.
0054:         *
0055:         * @property org.cougaar.core.blackboard.debug
0056:         * Set to true to additional checking on blackboard transactions.  
0057:         * For instance, it will attempt to look for changes to blackboard
0058:         * objects which have not been published at transaction close time.
0059:         *
0060:         * @note Although Subscriber directly implements all the methods of
0061:         * BlackboardService, it declines to implement the interface to avoid
0062:         * the Subscriber class itself <em>and all extending classes</em> from
0063:         * being Services.
0064:         *
0065:         * @property org.cougaar.core.blackboard.timestamp
0066:         * Set to true to enable EnvelopeMetrics and TimestampSubscriptions
0067:         * (defaults to false).
0068:         *
0069:         * @property org.cougaar.core.blackboard.trackPublishers
0070:         * Set to true to enable PublisherSubscriptions
0071:         * (defaults to false).
0072:         */
0073:        public class Subscriber {
0074:            private static final Logger logger = Logging
0075:                    .getLogger(Subscriber.class);
0076:
0077:            private static final boolean isEnforcing = SystemProperties
0078:                    .getBoolean(
0079:                            "org.cougaar.core.blackboard.enforceTransactions",
0080:                            true);
0081:
0082:            private static final boolean warnUnpublishChanges = SystemProperties
0083:                    .getBoolean("org.cougaar.core.blackboard.debug");
0084:
0085:            private static final boolean enableTimestamps = SystemProperties
0086:                    .getBoolean("org.cougaar.core.blackboard.timestamp")
0087:                    || SystemProperties
0088:                            .getBoolean("org.cougaar.core.blackboard.trackPublishers");
0089:
0090:            private BlackboardClient theClient = null;
0091:            private Distributor theDistributor = null;
0092:            private String subscriberName = "";
0093:            private boolean shouldBePersisted = true;
0094:            private boolean firstTransactionComplete = false;
0095:
0096:            protected Subscriber() {
0097:            }
0098:
0099:            /**
0100:             * Create a subscriber that provides subscription services 
0101:             * to a client and send outgoing messages to a Distributor.
0102:             * Plugin clients will use this API.
0103:             */
0104:            public Subscriber(BlackboardClient client, Distributor distributor) {
0105:                this (client, distributor, ((client != null) ? client
0106:                        .getBlackboardClientName() : null));
0107:            }
0108:
0109:            public Subscriber(BlackboardClient client, Distributor distributor,
0110:                    String subscriberName) {
0111:                setClientDistributor(client, distributor);
0112:                setName(subscriberName);
0113:            }
0114:
0115:            public void setClientDistributor(BlackboardClient client,
0116:                    Distributor newDistributor) {
0117:                theClient = client;
0118:                if (theDistributor != newDistributor) {
0119:                    if (theDistributor != null) {
0120:                        theDistributor.unregisterSubscriber(this );
0121:                    }
0122:                    theDistributor = newDistributor;
0123:                    if (theDistributor != null) {
0124:                        theDistributor.registerSubscriber(this );
0125:                    }
0126:                }
0127:            }
0128:
0129:            public void setName(String newName) {
0130:                subscriberName = newName;
0131:            }
0132:
0133:            public String getName() {
0134:                return subscriberName;
0135:            }
0136:
0137:            public boolean shouldBePersisted() {
0138:                return shouldBePersisted;
0139:            }
0140:
0141:            public void setShouldBePersisted(boolean shouldBePersisted) {
0142:                this .shouldBePersisted = shouldBePersisted;
0143:            }
0144:
0145:            boolean isReadyToPersist() {
0146:                return firstTransactionComplete;
0147:            }
0148:
0149:            public void setReadyToPersist() {
0150:                theDistributor.discardRehydrationInfo(this );
0151:                firstTransactionComplete = true;
0152:            }
0153:
0154:            public boolean didRehydrate() {
0155:                boolean result = theDistributor.didRehydrate(this );
0156:                return result;
0157:            }
0158:
0159:            public void persistNow() throws PersistenceNotEnabledException {
0160:                boolean inTransaction = (transactionLock.getBusyFlagOwner() == Thread
0161:                        .currentThread());
0162:                if (inTransaction)
0163:                    closeTransaction();
0164:                theDistributor.persistNow();
0165:                if (inTransaction)
0166:                    openTransaction();
0167:            }
0168:
0169:            public Persistence getPersistence() {
0170:                return theDistributor.getPersistence();
0171:            }
0172:
0173:            /**
0174:             * Move inboxes into subscriptions.
0175:             */
0176:            protected boolean privateUpdateSubscriptions() {
0177:                boolean changedp = false;
0178:                synchronized (subscriptions) {
0179:                    transactionAllowsQuiescence = inboxAllowsQuiescence;
0180:                    transactionEnvelopes = flushInbox();
0181:                    try {
0182:                        for (int i = 0, n = subscriptions.size(); i < n; i++) {
0183:                            Subscription subscription = (Subscription) subscriptions
0184:                                    .get(i);
0185:                            for (int j = 0, l = transactionEnvelopes.size(); j < l; j++) {
0186:                                Envelope envelope = (Envelope) transactionEnvelopes
0187:                                        .get(j);
0188:                                try {
0189:                                    changedp |= subscription.apply(envelope);
0190:                                } catch (PublishException pe) {
0191:                                    Logger logger = Logging
0192:                                            .getLogger(Subscriber.class);
0193:                                    String message = pe.getMessage();
0194:                                    logger.error(message);
0195:
0196:                                    BlackboardClient currentClient = null;
0197:                                    //                 if (envelope instanceof OutboxEnvelope) {
0198:                                    //                   OutboxEnvelope e = (OutboxEnvelope) envelope;
0199:                                    //                   currentClient = e.theClient;
0200:                                    //                 }
0201:                                    if (currentClient == null) {
0202:                                        currentClient = BlackboardClient.current
0203:                                                .getClient();
0204:                                    }
0205:                                    String this Publisher = null;
0206:                                    if (currentClient != null) {
0207:                                        this Publisher = currentClient
0208:                                                .getBlackboardClientName();
0209:                                    }
0210:                                    if (envelope instanceof  Blackboard.PlanEnvelope) {
0211:                                        if (this Publisher == null) {
0212:                                            this Publisher = "Blackboard";
0213:                                        } else {
0214:                                            this Publisher = "Blackboard after "
0215:                                                    + this Publisher;
0216:                                        }
0217:                                    } else if (this Publisher == null) {
0218:                                        this Publisher = "Unknown";
0219:                                    }
0220:                                    pe.printStackTrace(" This publisher: "
0221:                                            + this Publisher);
0222:                                    if (!pe.priorStackUnavailable) {
0223:                                        if (pe.priorStack == null) {
0224:                                            System.err
0225:                                                    .println("Prior publisher: Unknown");
0226:                                        }
0227:                                    } else {
0228:                                        if (pe.priorStack == null) {
0229:                                            System.err
0230:                                                    .println("Prior publisher: Not set");
0231:                                        } else {
0232:                                            pe.priorStack.printStackTrace();
0233:                                        }
0234:                                    }
0235:                                } catch (RuntimeException ire) {
0236:                                    BlackboardClient currentClient = null;
0237:                                    if (currentClient == null) {
0238:                                        currentClient = BlackboardClient.current
0239:                                                .getClient();
0240:                                    }
0241:                                    String this Publisher = null;
0242:                                    if (currentClient != null) {
0243:                                        this Publisher = currentClient
0244:                                                .getBlackboardClientName();
0245:                                    }
0246:                                    logger.error(
0247:                                            "Exception while applying envelopes in "
0248:                                                    + currentClient + "/"
0249:                                                    + this Publisher, ire);
0250:                                }
0251:                            }
0252:                        }
0253:                    } catch (RuntimeException re) {
0254:                        re.printStackTrace();
0255:                    }
0256:                }
0257:                return changedp;
0258:            }
0259:
0260:            /**
0261:             * Report changes that the plugin published.
0262:             * These changes are represented by the outbox.
0263:             */
0264:            protected Envelope privateGetPublishedChanges() {
0265:                Envelope box = flushOutbox();
0266:                if (transactionEnvelopes != null) {
0267:                    recycleInbox(transactionEnvelopes);
0268:                    transactionEnvelopes = null;
0269:                    transactionAllowsQuiescence = true;
0270:                } else {
0271:                    recycleInbox(flushInbox());
0272:                }
0273:                if (enableTimestamps && (box instanceof  TimestampedEnvelope)) {
0274:                    TimestampedEnvelope te = (TimestampedEnvelope) box;
0275:                    te.setName(getName());
0276:                    te.setTransactionOpenTime(openTime);
0277:                    te.setTransactionCloseTime(System.currentTimeMillis());
0278:                }
0279:                return box;
0280:            }
0281:
0282:            /**
0283:             * Accessors to persist our inbox state
0284:             */
0285:            public List getTransactionEnvelopes() {
0286:                return transactionEnvelopes;
0287:            }
0288:
0289:            public List getPendingEnvelopes() {
0290:                return pendingEnvelopes;
0291:            }
0292:
0293:            //////////////////////////////////////////////////////
0294:            //              Subscriptions                       //
0295:            //////////////////////////////////////////////////////
0296:            private int publishAddedCount;
0297:            private int publishChangedCount;
0298:            private int publishRemovedCount;
0299:
0300:            // now unused
0301:            public int getSubscriptionCount() {
0302:                // synchronized (subscriptions) {}
0303:                return subscriptions.size();
0304:            }
0305:
0306:            // unused?
0307:            public int getSubscriptionSize() {
0308:                int size = 0;
0309:                // synchronized (subscriptions) {} 
0310:                int l = subscriptions.size();
0311:                for (int i = 0; i < l; i++) {
0312:                    Object s = subscriptions.get(i);
0313:                    if (s instanceof  CollectionSubscription) {
0314:                        size += ((CollectionSubscription) s).size();
0315:                    }
0316:                }
0317:                return size;
0318:            }
0319:
0320:            public int getPublishAddedCount() {
0321:                return publishAddedCount;
0322:            }
0323:
0324:            public int getPublishChangedCount() {
0325:                return publishChangedCount;
0326:            }
0327:
0328:            public int getPublishRemovedCount() {
0329:                return publishRemovedCount;
0330:            }
0331:
0332:            /** our set of active subscriptions. Access must be synchronized on self. */
0333:            protected final List subscriptions = new ArrayList(5);
0334:
0335:            protected void resetSubscriptionChanges() {
0336:                synchronized (subscriptions) {
0337:                    int l = subscriptions.size();
0338:                    for (int i = 0; i < l; i++) {
0339:                        Subscription s = (Subscription) subscriptions.get(i);
0340:                        s.resetChanges();
0341:                    }
0342:                    resetHaveCollectionsChanged();
0343:                }
0344:            }
0345:
0346:            /**
0347:             * Subscribe to a collection service with isMember, default inner
0348:             * collection and supporting incremental change queries.
0349:             * @note Although allowed, use of DynamicUnaryPredicate can be extremely expensive
0350:             * and tends to create as many problems as it solves.  When in pedantic mode,
0351:             * warning are emitted when DynamicUnaryPredicate is used. Disable Blackboard.PEDANTIC to quiet
0352:             * such warnings if you are sure you want to do this.
0353:             * @see Blackboard#PEDANTIC
0354:             */
0355:            public Subscription subscribe(UnaryPredicate isMember) {
0356:                return subscribe(isMember, null, true);
0357:            }
0358:
0359:            /**
0360:             * Subscribe to a collection service with isMember, default inner
0361:             * collection and specifying if you want incremental change query support.
0362:             * @note Although allowed, use of DynamicUnaryPredicate can be extremely expensive
0363:             * and tends to create as many problems as it solves.  When in pedantic mode,
0364:             * warning are emitted when DynamicUnaryPredicate is used. Disable Blackboard.PEDANTIC to quiet
0365:             * such warnings if you are sure you want to do this.
0366:             * @see Blackboard#PEDANTIC
0367:             */
0368:            public Subscription subscribe(UnaryPredicate isMember,
0369:                    boolean isIncremental) {
0370:                return subscribe(isMember, null, isIncremental);
0371:            }
0372:
0373:            /**
0374:             * Subscribe to a collection service with isMember, specifying inner
0375:             * collection and supporting incremental change queries.
0376:             * @note Although allowed, use of DynamicUnaryPredicate can be extremely expensive
0377:             * and tends to create as many problems as it solves.  When in pedantic mode,
0378:             * warning are emitted when DynamicUnaryPredicate is used. Disable Blackboard.PEDANTIC to quiet
0379:             * such warnings if you are sure you want to do this.
0380:             * @see Blackboard#PEDANTIC
0381:             */
0382:            public Subscription subscribe(UnaryPredicate isMember,
0383:                    Collection realCollection) {
0384:                return subscribe(isMember, realCollection, true);
0385:            }
0386:
0387:            /**
0388:             * Subscribe to a collection service.
0389:             * Tells the Distributor about its interest, but should not block,
0390:             * even if there are lots of "back issues" to transmit.
0391:             * This is the full form.
0392:             * @param isMember The defining predicate for the slice of the blackboard.
0393:             * @param realCollection The real container wrapped by the returned value.
0394:             * @param isIncremental IFF true, returns a container that supports delta
0395:             * lists.
0396:             * @return The resulting Subscription
0397:             * @see IncrementalSubscription
0398:             * @note Although allowed, use of DynamicUnaryPredicate can be extremely expensive
0399:             * and tends to create as many problems as it solves.  When in pedantic mode,
0400:             * warning are emitted when DynamicUnaryPredicate is used. Disable Blackboard.PEDANTIC to quiet
0401:             * such warnings if you are sure you want to do this.
0402:             * @see Blackboard#PEDANTIC
0403:             */
0404:            public Subscription subscribe(UnaryPredicate isMember,
0405:                    Collection realCollection, boolean isIncremental) {
0406:                Subscription sn;
0407:
0408:                if (realCollection == null)
0409:                    realCollection = new HashSet();
0410:
0411:                if (isIncremental) {
0412:                    sn = new IncrementalSubscription(isMember, realCollection);
0413:                } else {
0414:                    sn = new CollectionSubscription(isMember, realCollection);
0415:                }
0416:                return subscribe(sn);
0417:            }
0418:
0419:            /** Primary subscribe method.  Register a new subscription. */
0420:            public final Subscription subscribe(Subscription subscription) {
0421:                // Strictly speaking, subscribe can be done outside a transaction, but the 
0422:                // state of filled subscription w/rt the rest of the subscriptions
0423:                // is suspect if it isn't.
0424:                checkTransactionOK("subscribe()");
0425:
0426:                synchronized (subscriptions) {
0427:                    subscription.setSubscriber(this );
0428:                    subscriptions.add(subscription);
0429:                    theDistributor.fillSubscription(subscription);
0430:                }
0431:                setHaveNewSubscriptions(); // make sure we get counted.
0432:                return subscription;
0433:            }
0434:
0435:            /** lightweight query of Blackboard */
0436:            public final Collection query(UnaryPredicate isMember) {
0437:                checkTransactionOK("query(UnaryPredicate)");
0438:                QuerySubscription s = new QuerySubscription(isMember);
0439:                s.setSubscriber(this ); // shouldn't really be needed
0440:                theDistributor.fillQuery(s);
0441:                return s.getCollection();
0442:            }
0443:
0444:            private static final CallerTracker pTracker = CallerTracker
0445:                    .getShallowTracker(2);
0446:
0447:            final void checkTransactionOK(String methodname, Object arg) {
0448:                if (this  instanceof  Blackboard)
0449:                    return; // No check for Blackboard
0450:
0451:                if (Blackboard.PEDANTIC && arg instanceof  Collection
0452:                        && pTracker.isNew()) {
0453:                    if (logger.isWarnEnabled())
0454:                        logger.warn("PEDANTIC: A Collection published by "
0455:                                + theClient, new Throwable());
0456:                }
0457:
0458:                if (!isMyTransaction()) {
0459:                    if (arg != null) {
0460:                        methodname = methodname + "(" + arg + ")";
0461:                    }
0462:                    logger
0463:                            .error(toString() + "." + methodname
0464:                                    + " called outside of transaction",
0465:                                    new Throwable());
0466:                    //throw new RuntimeException(methodname+" called outside of transaction boundaries");
0467:                }
0468:            }
0469:
0470:            final void checkTransactionOK(String methodname) {
0471:                checkTransactionOK(methodname, null);
0472:            }
0473:
0474:            /**
0475:             * Stop subscribing to a previously obtained Subscription. The
0476:             * Subscription must have been returned from a previous call to
0477:             * subscribe.
0478:             * @param subscription the Subscription that is to be cancelled.
0479:             */
0480:            public void unsubscribe(Subscription subscription) {
0481:                // strictly speaking, this doesn't have to be done inside a transaction, but
0482:                // we'll check anyway to be symmetric with subscribe.
0483:                checkTransactionOK("unsubscribe()");
0484:                synchronized (subscriptions) {
0485:                    subscriptions.remove(subscription);
0486:                }
0487:            }
0488:
0489:            /*
0490:             * Inbox invariants:
0491:             * pendingEnvelopes accumulates new envelopes for the next transaction (always).
0492:             * transactionEnvelopes has the previous pendingEnvelopes during a
0493:             * transaction, null otherwise.
0494:             * idleEnvelopes has an empty list when no transaction is active.
0495:             *
0496:             * The list cycle around from idle to pending to transaction back to
0497:             * idle. idle and transaction are never null at the same time; one
0498:             * of them always has the list the pendingEnvelopes does not have
0499:             *
0500:             */
0501:            private List pendingEnvelopes = new ArrayList(); // Envelopes to be added at next transaction
0502:            private List transactionEnvelopes = null; // Envelopes of current transaction
0503:            private List idleEnvelopes = new ArrayList(); // Alternate list
0504:            private final Object inboxLock = new Object(); // For synchronized access to inboxes
0505:            private boolean inboxAllowsQuiescence = true; // True if inbox allows quiescence
0506:            private boolean transactionAllowsQuiescence = true; // True if inbox being processed allowed quiescence.
0507:
0508:            /**
0509:             * Called by non-client methods to add an envelope to our inboxes.
0510:             * This is complicated because we wish to avoid holding envelopes
0511:             * when there is no possibility of their ever being used (no
0512:             * subscriptions). A simple test of the number of subscriptions is
0513:             * insufficient because, if a transaction is open, new subscriptions
0514:             * may be created that, in later transactions, need to receive the
0515:             * envelopes. So the test includes a test of transactions being
0516:             * open. We use transactionLock.tryGetBusyFlag() because we can't
0517:             * block and the fact that the lock is busy, is a sufficient
0518:             * indication that we must put the new envelopes into the inbox. It
0519:             * may turn out that the inbox did not need to be stuffed (because
0520:             * there will not be any subscriptions), but this is handled when
0521:             * the transaction is closed where the inbox is emptied if there are
0522:             * no subscriptions.
0523:             */
0524:            public void receiveEnvelopes(List envelopes,
0525:                    boolean envelopeQuiescenceRequired) {
0526:                boolean signalActivity = false;
0527:                synchronized (inboxLock) {
0528:                    boolean notBusy = transactionLock.tryGetBusyFlag();
0529:                    // if notBusy, then the client isn't running (and wont) until we're done.
0530:                    // if !notBusy, then the client IS running so we need to dump the envelopes
0531:                    //  in regardless (because it might add a watcher or subscription
0532:                    boolean hasWatchers;
0533:                    synchronized (watchers) {
0534:                        hasWatchers = !watchers.isEmpty();
0535:                    }
0536:                    boolean hasSubscriptions = !subscriptions.isEmpty();
0537:                    if (hasSubscriptions || (hasWatchers && !notBusy)) {
0538:                        pendingEnvelopes.addAll(envelopes);
0539:                        if (envelopeQuiescenceRequired) {
0540:                            inboxAllowsQuiescence = false;
0541:                        }
0542:                        signalActivity = true;
0543:                    } else {
0544:                        if (logger.isInfoEnabled() && !hasSubscriptions
0545:                                && !notBusy && !hasWatchers) {
0546:                            logger
0547:                                    .info(this 
0548:                                            + ".receiveEnvs: Fix for bug 3328 means"
0549:                                            + " we're not distributing the outbox here cause no watchers.");
0550:                        }
0551:                    }
0552:                    if (notBusy)
0553:                        transactionLock.freeBusyFlag();
0554:                }
0555:                if (signalActivity)
0556:                    signalExternalActivity();
0557:            }
0558:
0559:            public boolean isBusy() {
0560:                synchronized (inboxLock) {
0561:                    return (pendingEnvelopes.size() > 0);
0562:                }
0563:            }
0564:
0565:            public boolean isQuiescent() {
0566:                synchronized (inboxLock) {
0567:                    return (inboxAllowsQuiescence && transactionAllowsQuiescence);
0568:                }
0569:            }
0570:
0571:            private List flushInbox() {
0572:                synchronized (inboxLock) {
0573:                    List result = pendingEnvelopes;
0574:                    pendingEnvelopes = idleEnvelopes;
0575:                    idleEnvelopes = null;
0576:                    inboxAllowsQuiescence = true;
0577:                    return result;
0578:                }
0579:            }
0580:
0581:            private void recycleInbox(List old) {
0582:                old.clear();
0583:                idleEnvelopes = old;
0584:            }
0585:
0586:            /**
0587:             * outbox data structure - an Envelope used to encapsulate 
0588:             * outgoing changes to collections.
0589:             */
0590:            private Envelope outbox = createEnvelope();
0591:
0592:            protected Envelope flushOutbox() {
0593:                if (outbox.size() == 0)
0594:                    return null;
0595:                Envelope result = outbox;
0596:                outbox = createEnvelope();
0597:                return result;
0598:            }
0599:
0600:            // This won't work with persistence turned on. Don't _ever_ use operationally (ray)
0601:            //   public static class OutboxEnvelope extends Envelope {
0602:            //     public OutboxEnvelope(BlackboardClient client) {
0603:            //       theClient = client;
0604:            //     }
0605:            //     public BlackboardClient theClient;
0606:            //   }
0607:
0608:            /** factory method for creating Envelopes of the correct type */
0609:            protected Envelope createEnvelope() {
0610:                if (enableTimestamps) {
0611:                    return new TimestampedEnvelope();
0612:                } else {
0613:                    return new Envelope();
0614:                }
0615:                // return new OutboxEnvelope(getClient());  // for debugging
0616:            }
0617:
0618:            // might want to make the syncs finer-grained
0619:            /**
0620:             * called whenever the client adds an object to a collection
0621:             * to notify the rest of the world of the change.
0622:             * Actual Changes to the collection only happen via this api.
0623:             */
0624:            protected EnvelopeTuple clientAddedObject(Object o) {
0625:                // attempt to claim the object
0626:                claimObject(o);
0627:
0628:                return outbox.addObject(o);
0629:            }
0630:
0631:            /**
0632:             * called whenever the client removes an object from a collection
0633:             * to notify the rest of the world of the change.
0634:             * Actual Changes to the collection only happen via this api.
0635:             */
0636:            protected EnvelopeTuple clientRemovedObject(Object o) {
0637:                // attempt to unclaim the object
0638:                unclaimObject(o);
0639:
0640:                return outbox.removeObject(o);
0641:            }
0642:
0643:            /**
0644:             * called whenever the client changes an object in a collection
0645:             * to notify the rest of the world of the change.
0646:             * Actual Changes to the collection only happen via this api.
0647:             */
0648:            protected EnvelopeTuple clientChangedObject(Object o, List changes) {
0649:                return outbox.changeObject(o, changes);
0650:            }
0651:
0652:            /**
0653:             * Add an object to the blackboard.
0654:             * <p> 
0655:             * Behavior is not defined if the object was already on the blackboard.
0656:             * @note Although strictly allowed, it takes special care to properly publish a
0657:             * raw Collection object to the Blackboard.  Disable Blackboard.PEDANTIC to quiet
0658:             * such warnings if you are sure you want to do this.
0659:             * @see Blackboard#PEDANTIC
0660:             */
0661:            public final void publishAdd(Object o) {
0662:                checkTransactionOK("add", o);
0663:
0664:                if (theDistributor.history != null)
0665:                    theDistributor.history.publishAdd(o);
0666:                if (o instanceof  ActiveSubscriptionObject) {
0667:                    ((ActiveSubscriptionObject) o).addingToBlackboard(this ,
0668:                            false);
0669:                    if (!ActiveSubscriptionObject.deferCommit) {
0670:                        ((ActiveSubscriptionObject) o).addingToBlackboard(this ,
0671:                                true);
0672:                    }
0673:                }
0674:
0675:                if (o instanceof  Publishable) {
0676:                    //List crs =  // var unused
0677:                    Transaction.getCurrentTransaction().getChangeReports(o); // side effects
0678:                }
0679:
0680:                // if we made it this far publish the object and return true.
0681:                clientAddedObject(o);
0682:                publishAddedCount++;
0683:            }
0684:
0685:            /**
0686:             * Remove an object from the blackboard.
0687:             * <p> 
0688:             * Behavior is not defined if the object was not already on the blackboard.
0689:             *
0690:             * @note Although strictly allowed, it takes special care to properly publish a
0691:             * raw Collection object to the Blackboard.  Disable Blackboard.PEDANTIC to quiet
0692:             * such warnings if you are sure you want to do this.
0693:             * @see Blackboard#PEDANTIC
0694:             */
0695:            public final void publishRemove(Object o) {
0696:                checkTransactionOK("remove", o);
0697:
0698:                if (theDistributor.history != null)
0699:                    theDistributor.history.publishRemove(o);
0700:                if (o instanceof  ActiveSubscriptionObject) {
0701:                    ((ActiveSubscriptionObject) o).removingFromBlackboard(this ,
0702:                            false);
0703:                    if (!ActiveSubscriptionObject.deferCommit) {
0704:                        ((ActiveSubscriptionObject) o).removingFromBlackboard(
0705:                                this , true);
0706:                    }
0707:                }
0708:
0709:                if (o instanceof  Publishable) {
0710:                    List crs = Transaction.getCurrentTransaction()
0711:                            .getChangeReports(o);
0712:                    if (warnUnpublishChanges) {
0713:                        if (crs != null && crs.size() > 0) {
0714:                            if (logger.isWarnEnabled())
0715:                                logger
0716:                                        .warn("Warning: publishRemove("
0717:                                                + o
0718:                                                + ") is dropping outstanding changes:\n\t"
0719:                                                + crs);
0720:                        }
0721:                    }
0722:                }
0723:
0724:                clientRemovedObject(o);
0725:                publishRemovedCount++;
0726:            }
0727:
0728:            /**
0729:             * Convenience function for publishChange(o, null).
0730:             * @note Although strictly allowed, it takes special care to properly publish a
0731:             * raw Collection object to the Blackboard.  Disable Blackboard.PEDANTIC to quiet
0732:             * such warnings if you are sure you want to do this.
0733:             * @see Blackboard#PEDANTIC
0734:             */
0735:            public final void publishChange(Object o) {
0736:                publishChange(o, null);
0737:            }
0738:
0739:            /**
0740:             * Mark an object on the blackboard as changed.
0741:             * <p> 
0742:             * Behavior is not defined if the object is not on the blackboard.
0743:             * <p> 
0744:             * There is no need to call this if the object was added or removed,
0745:             * only if the contents of the object itself has been changed.
0746:             * The changes parameter describes a set of changes made to the
0747:             * object beyond those tracked automatically by the object class
0748:             * (see the object class documentation for a description of which
0749:             * types of changes are tracked).  Any additional changes are
0750:             * merged in <em>after</em> automatically collected reports.
0751:             * @param changes a set of ChangeReport instances or null.
0752:             * @note Although strictly allowed, it takes special care to properly publish a
0753:             * raw Collection object to the Blackboard.  Disable Blackboard.PEDANTIC to quiet
0754:             * such warnings if you are sure you want to do this.
0755:             * @see Blackboard#PEDANTIC
0756:             */
0757:            public final void publishChange(Object o, Collection changes) {
0758:                checkTransactionOK("change", o);
0759:
0760:                if (theDistributor.history != null)
0761:                    theDistributor.history.publishChange(o);
0762:                if (o instanceof  ActiveSubscriptionObject) {
0763:                    ((ActiveSubscriptionObject) o).changingInBlackboard(this ,
0764:                            false);
0765:                    if (!ActiveSubscriptionObject.deferCommit) {
0766:                        ((ActiveSubscriptionObject) o).changingInBlackboard(
0767:                                this , true);
0768:                    }
0769:                }
0770:
0771:                List crs = null;
0772:                if (o instanceof  Publishable) {
0773:                    crs = Transaction.getCurrentTransaction().getChangeReports(
0774:                            o);
0775:                }
0776:
0777:                // convert null or empty changes to the "anonymous" list
0778:                if (isZeroChanges(changes)) {
0779:                    if (isZeroChanges(crs)) {
0780:                        crs = AnonymousChangeReport.LIST;
0781:                    } else {
0782:                        // use crs as-is
0783:                    }
0784:                } else {
0785:                    if (isZeroChanges(crs)) {
0786:                        crs = new ArrayList(changes);
0787:                    } else {
0788:                        crs.addAll(changes);
0789:                    }
0790:                }
0791:
0792:                // if we made it this far publish the change and return true.
0793:                clientChangedObject(o, crs);
0794:                publishChangedCount++;
0795:            }
0796:
0797:            private final boolean isZeroChanges(final Collection c) {
0798:                return ((c == null) || (c == AnonymousChangeReport.LIST)
0799:                        || (c == AnonymousChangeReport.SET) || (c.isEmpty()));
0800:            }
0801:
0802:            /**
0803:             * A extension subscriber may call this method to execute bulkAdd transactions.
0804:             * This is protected because it is of very limited to other than persistance plugins.
0805:             *  Note that Blackboard does something like
0806:             * this by hand constructing an entire special-purpose envelope.  This, however, is
0807:             * for use in-band, in-transaction.  
0808:             *  The Collection passed MUST be immutable, since there may be many consumers,
0809:             * each running at different times.
0810:             */
0811:            protected EnvelopeTuple bulkAddObject(Collection c) {
0812:                checkTransactionOK("bulkAdd", c);
0813:
0814:                EnvelopeTuple t;
0815:                t = outbox.bulkAddObject(c);
0816:
0817:                return t;
0818:            }
0819:
0820:            /**
0821:             * Safer version of bulkAddObject(Collection).
0822:             * Creates a Collection from the Enumeration and passes it into
0823:             * the envelope.
0824:             */
0825:            protected EnvelopeTuple bulkAddObject(Enumeration en) {
0826:                checkTransactionOK("bulkAdd", en);
0827:
0828:                EnvelopeTuple t;
0829:                t = outbox.bulkAddObject(en);
0830:
0831:                return t;
0832:            }
0833:
0834:            protected EnvelopeTuple bulkAddObject(Iterator en) {
0835:                checkTransactionOK("bulkAdd", en);
0836:
0837:                EnvelopeTuple t;
0838:                t = outbox.bulkAddObject(en);
0839:
0840:                return t;
0841:            }
0842:
0843:            //
0844:            // Transaction handling.
0845:            //
0846:            /*
0847:             * It would be nice if we could merge the Transaction object with the
0848:             * older open/close transaction code somehow - there is some redundancy
0849:             * and the current parallel implementations are somewhat confusing.
0850:             */
0851:
0852:            /**
0853:             * The transaction lock.  At most one watcher per subscriber gets to
0854:             * have an open transaction at one time.  We could support multiple
0855:             * simultaneously open transactions with multiple subscribers, but
0856:             * this is a feature for another day.
0857:             */
0858:            private LockFlag transactionLock = new LockFlag();
0859:
0860:            /**
0861:             * The current in-force transaction instance.
0862:             * The is only kept around as a check to pass to Transaction.close()
0863:             * in order to make sure we're closing the right one.
0864:             * In particular, we cannot use this in the publishWhatever methods 
0865:             * because the Blackboard methods are executing in the wrong thread.
0866:             */
0867:            private Transaction theTransaction = null;
0868:
0869:            /**
0870:             * Overridable by extending classes to specify more featureful
0871:             * Transaction semantics.
0872:             */
0873:            protected Transaction newTransaction() {
0874:                return new Transaction(this );
0875:            }
0876:
0877:            /**
0878:             * Open a transaction by grabbing the transaction lock and updating
0879:             * the subscriptions.  This method blocks waiting for the
0880:             * transaction lock.
0881:             */
0882:            public final void openTransaction() {
0883:                transactionLock.getBusyFlag();
0884:                finishOpenTransaction();
0885:            }
0886:
0887:            private long openTime = setTransactionOpenTime();
0888:
0889:            protected final boolean isTimestamped() {
0890:                return enableTimestamps;
0891:            }
0892:
0893:            protected final long setTransactionOpenTime() {
0894:                if (enableTimestamps) {
0895:                    return (openTime = System.currentTimeMillis());
0896:                } else {
0897:                    return -1;
0898:                }
0899:            }
0900:
0901:            /**
0902:             * Common routine for both openTransaction and tryOpenTransaction
0903:             * does everything except getting the transactionLock busy flag.
0904:             */
0905:            private void finishOpenTransaction() {
0906:                int count = transactionLock.getBusyCount();
0907:                if (count > 1) {
0908:                    if (isEnforcing) {
0909:                        logger.error("Opened nested transaction (level="
0910:                                + count + ")", new Throwable());
0911:                    }
0912:                    return;
0913:                }
0914:
0915:                startTransaction();
0916:
0917:                theDistributor.startTransaction();
0918:
0919:                setTransactionOpenTime();
0920:                if (privateUpdateSubscriptions()) {
0921:                    setHaveCollectionsChanged();
0922:                }
0923:                if (haveNewSubscriptions()) {
0924:                    setHaveCollectionsChanged();
0925:                    resetHaveNewSubscriptions();
0926:                }
0927:                noteOpenTransaction(this );
0928:            }
0929:
0930:            protected final void startTransaction() {
0931:                theTransaction = newTransaction();
0932:                Transaction.open(theTransaction);
0933:            }
0934:
0935:            private boolean _haveNewSubscriptions = false;
0936:
0937:            private boolean haveNewSubscriptions() {
0938:                return _haveNewSubscriptions;
0939:            }
0940:
0941:            private void setHaveNewSubscriptions() {
0942:                _haveNewSubscriptions = true;
0943:            }
0944:
0945:            private void resetHaveNewSubscriptions() {
0946:                _haveNewSubscriptions = false;
0947:            }
0948:
0949:            /**
0950:             * Keep track of whether or not the collections have changed 
0951:             * since the previous openTransaction.
0952:             */
0953:            private boolean _haveCollectionsChangedSinceLastTransaction = false;
0954:
0955:            /** set haveCollectionsChanged() */
0956:            private void setHaveCollectionsChanged() {
0957:                _haveCollectionsChangedSinceLastTransaction = true;
0958:            }
0959:
0960:            /** set haveCollectionsChanged() */
0961:            private void resetHaveCollectionsChanged() {
0962:                _haveCollectionsChangedSinceLastTransaction = false;
0963:            }
0964:
0965:            /** can be called by anyone who can open a transaction to decide what to do.
0966:             * returned value is only valid/useful inside an open transaction.
0967:             */
0968:            public boolean haveCollectionsChanged() {
0969:                return _haveCollectionsChangedSinceLastTransaction;
0970:            }
0971:
0972:            /** Attempt to open a transaction by attempting to grab the 
0973:             * transaction lock and updating the collections (iff we got the 
0974:             * lock).
0975:             *
0976:             * This is equivalent to the old (misnamed) tryLockSubscriber method
0977:             * in PluginWrapper.
0978:             *
0979:             * @return true IFF a transaction was opened.
0980:             */
0981:            public final boolean tryOpenTransaction() {
0982:                if (transactionLock.tryGetBusyFlag()) {
0983:                    finishOpenTransaction();
0984:                    return true;
0985:                }
0986:                return false;
0987:            }
0988:
0989:            /**
0990:             * Close a transaction opened by openTransaction() or a successful
0991:             * tryOpenTransaction(), but don't reset subscription changes or
0992:             * clear delta lists.
0993:             * @exception SubscriberException IFF we did not own the transaction
0994:             * lock.
0995:             */
0996:            public final void closeTransactionDontReset() {
0997:                closeTransaction(false);
0998:            }
0999:
1000:            /** check to see if we've already got an open transaction
1001:             */
1002:            public final boolean isTransactionOpen() {
1003:                return (transactionLock.getBusyFlagOwner() == Thread
1004:                        .currentThread());
1005:            }
1006:
1007:            /** Close a transaction opened by openTransaction() or a 
1008:             * successful tryOpenTransaction().
1009:             * @param resetSubscriptions IFF true, all subscriptions will have
1010:             * their resetChanges() method called to clear any delta lists, etc.
1011:             * @exception SubscriberException IFF we did not own the transaction
1012:             * lock.
1013:             * @deprecated Use {@link #closeTransactionDontReset closeTransactionDontReset}
1014:             * This method becomes private after deprecation period expires.
1015:             */
1016:            public final void closeTransaction(boolean resetSubscriptions)
1017:                    throws SubscriberException {
1018:                if (transactionLock.getBusyFlagOwner() == Thread
1019:                        .currentThread()) {
1020:                    // only do our closeTransaction work when exiting the nest.
1021:                    if (transactionLock.getBusyCount() == 1) {
1022:                        checkUnpostedChangeReports();
1023:
1024:                        if (!isReadyToPersist()) {
1025:                            setReadyToPersist();
1026:                        }
1027:                        if (resetSubscriptions)
1028:                            resetSubscriptionChanges();
1029:                        Envelope box = privateGetPublishedChanges();
1030:                        try {
1031:                            theDistributor.finishTransaction(box, getClient());
1032:                        } finally {
1033:                            stopTransaction();
1034:                        }
1035:                    } else {
1036:                        // Nested transaction (more than 1 busy)?
1037:                        //System.err.println("Closed nested transaction.");
1038:                        if (logger.isDebugEnabled())
1039:                            logger.debug("Closed nested transaction.");
1040:                    }
1041:                    // If no subscriptions we will never process the inbox. Empty
1042:                    // it to conserve memory instead of waiting for
1043:                    // openTransaction
1044:                    synchronized (inboxLock) {
1045:                        if (getSubscriptionCount() == 0) {
1046:                            pendingEnvelopes.clear();
1047:                        }
1048:                        if (!transactionLock.freeBusyFlag()) {
1049:                            throw new SubscriberException(
1050:                                    "Failed to close an owned transaction");
1051:                        }
1052:                    }
1053:                } else {
1054:                    throw new SubscriberException(
1055:                            "Attempt to close a non-open transaction");
1056:                }
1057:                noteCloseTransaction(this );
1058:            }
1059:
1060:            protected final void stopTransaction() {
1061:                Transaction.close(theTransaction);
1062:                theTransaction = null;
1063:            }
1064:
1065:            protected final void checkUnpostedChangeReports() {
1066:                //Map map = theTransaction.getChangeMap();
1067:                Map map = Transaction.getCurrentTransaction().getChangeMap();
1068:                if (warnUnpublishChanges) {
1069:                    if (map == null || map.size() == 0)
1070:                        return;
1071:                    if (logger.isWarnEnabled())
1072:                        logger
1073:                                .warn("Ignoring outstanding unpublished changes:");
1074:                    for (Iterator ki = map.keySet().iterator(); ki.hasNext();) {
1075:                        Object o = ki.next();
1076:                        List l = (List) map.get(o);
1077:                        if (logger.isWarnEnabled())
1078:                            logger.warn("\t" + o + " (" + l.size() + ")");
1079:                        // we could just publish them with something like:
1080:                        //handleActiveSubscriptionObjects()
1081:                        //clientChangedObject(o, l);
1082:                    }
1083:                }
1084:            }
1085:
1086:            /** Close a transaction opened by openTransaction() or a 
1087:             * successful tryOpenTransaction().
1088:             * Will reset all subscription change tracking facilities.
1089:             * To avoid this, use closeTransactionDontReset() instead.
1090:             * @exception SubscriberException IFF we did not own the transaction
1091:             * lock.
1092:             */
1093:            public final void closeTransaction() {
1094:                closeTransaction(true);
1095:            }
1096:
1097:            /** Does someone have an open transaction? */
1098:            public final boolean isInTransaction() {
1099:                return (transactionLock.getBusyFlagOwner() != null);
1100:            }
1101:
1102:            /** Do I have an open transaction?
1103:             * This really translates to "Is is safe to make changes to my
1104:             * collections?"
1105:             */
1106:            public final boolean isMyTransaction() {
1107:                return (transactionLock.getBusyFlagOwner() == Thread
1108:                        .currentThread());
1109:            }
1110:
1111:            //
1112:            // Interest Handling - extension of earlier wakeRequest and
1113:            //   interestSemaphore code.
1114:            //
1115:
1116:            /** list of SubscriptionWatchers to be notified when something
1117:             * interesting happens.  Access must be synchronized on watchers.
1118:             */
1119:            private final List watchers = new ArrayList(1);
1120:
1121:            public final SubscriptionWatcher registerInterest(
1122:                    SubscriptionWatcher w) {
1123:                if (w == null) {
1124:                    throw new IllegalArgumentException(
1125:                            "Null SubscriptionWatcher");
1126:                }
1127:
1128:                synchronized (watchers) {
1129:                    watchers.add(w);
1130:                }
1131:
1132:                return w;
1133:            }
1134:
1135:            /** Allow a thread of a subscriber to register an interest in the
1136:             * subscriber's collections.  Mainly used to allow threads to monitor
1137:             * changes in collections - that is, the fact of change, not the details.
1138:             * The level of support here is like the old wake and interestSemaphore
1139:             * code.  The client of a subscriber need not register explicitly, as
1140:             * it is done at initialization time.
1141:             */
1142:            public final SubscriptionWatcher registerInterest() {
1143:                return registerInterest(new SubscriptionWatcher());
1144:            }
1145:
1146:            /** Allow a thread to unregister an interest registered by
1147:             * registerInterest.  Should be done if a subordinate (watching)
1148:             * thread exits, or a plugin unloads.
1149:             */
1150:            public final void unregisterInterest(SubscriptionWatcher w)
1151:                    throws SubscriberException {
1152:                synchronized (watchers) {
1153:                    if (!watchers.remove(w)) {
1154:                        throw new SubscriberException(
1155:                                "Attempt to unregisterInterest of unknown SubscriptionWatcher");
1156:                    }
1157:                }
1158:            }
1159:
1160:            //
1161:            // watcher triggers
1162:            //
1163:
1164:            private boolean _externalActivity = false;
1165:            private boolean _internalActivity = false;
1166:            private boolean _clientActivity = false;
1167:
1168:            public boolean wasExternalActivity() {
1169:                return _externalActivity;
1170:            }
1171:
1172:            public boolean wasInternalActivity() {
1173:                return _internalActivity;
1174:            }
1175:
1176:            public boolean wasClientActivity() {
1177:                return _clientActivity;
1178:            }
1179:
1180:            /** called when external activity changes the subscriber's collections.
1181:             * by default, just calls wakeSubscriptionWatchers, but subclasses
1182:             * may be more circumspect.
1183:             */
1184:            public void signalExternalActivity() {
1185:                _externalActivity = true;
1186:                wakeSubscriptionWatchers(SubscriptionWatcher.EXTERNAL);
1187:            }
1188:
1189:            /** called when internal activity actually changes the subscriber's
1190:             * collections. 
1191:             * by default, just calls wakeSubscriptionWatchers, but subclasses
1192:             * may be more circumspect.
1193:             */
1194:            public void signalInternalActivity() {
1195:                _internalActivity = true;
1196:                wakeSubscriptionWatchers(SubscriptionWatcher.INTERNAL);
1197:            }
1198:
1199:            /** called when the client (Plugin) requests that it be waked again.
1200:             * by default, just calls wakeSubscriptionWatchers, but subclasses
1201:             * may be more circumspect.
1202:             */
1203:            public void signalClientActivity() {
1204:                _clientActivity = true;
1205:                wakeSubscriptionWatchers(SubscriptionWatcher.CLIENT);
1206:            }
1207:
1208:            /** called to notify all SubscriptionWatchers.
1209:             */
1210:            private final void wakeSubscriptionWatchers(int event) {
1211:                synchronized (watchers) {
1212:                    int l = watchers.size();
1213:                    for (int i = 0; i < l; i++) {
1214:                        ((SubscriptionWatcher) (watchers.get(i)))
1215:                                .signalNotify(event);
1216:                    }
1217:                }
1218:            }
1219:
1220:            //
1221:            // usability and debugability methods
1222:            // 
1223:
1224:            public String toString() {
1225:                String cs = "(self)";
1226:                if (theClient != this )
1227:                    cs = theClient.toString();
1228:
1229:                return "<" + getClass().getName() + " " + this .hashCode()
1230:                        + " for " + cs + " and " + theDistributor + ">";
1231:            }
1232:
1233:            /** utility to claim an object as ours */
1234:            protected void claimObject(Object o) {
1235:                if (o instanceof  ClaimableHolder) {
1236:                    Claimable c = ((ClaimableHolder) o).getClaimable();
1237:                    if (c != null) {
1238:                        //System.err.println("\n->"+getClient()+" claimed "+c);
1239:                        c.setClaim(getClient());
1240:                    }
1241:                }
1242:            }
1243:
1244:            /** utility to release a claim on an object */
1245:            protected void unclaimObject(Object o) {
1246:                if (o instanceof  ClaimableHolder) {
1247:                    Claimable c = ((ClaimableHolder) o).getClaimable();
1248:                    if (c != null) {
1249:                        //System.err.println("\n->"+getClient()+" unclaimed "+c);
1250:                        c.resetClaim(getClient());
1251:                    }
1252:                }
1253:            }
1254:
1255:            /** return the client of the the subscriber.
1256:             * May be overridden by subclasses in case they are really
1257:             * delegating to some other object.
1258:             */
1259:            public BlackboardClient getClient() {
1260:                return theClient;
1261:            }
1262:
1263:            //Leftover from ancient code - now in BlackboardService... may want to 
1264:            //deprecate next release?
1265:            public Subscriber getSubscriber() {
1266:                return this ;
1267:            }
1268:
1269:            // try and save the state so that we can abort open transactions if
1270:            // someone is bad.
1271:            private static final ThreadLocal _openTransaction = new ThreadLocal();
1272:
1273:            private static void noteOpenTransaction(Subscriber s) {
1274:                _openTransaction.set(s);
1275:            }
1276:
1277:            private static void noteCloseTransaction(Subscriber s) {
1278:                if (s != _openTransaction.get()) {
1279:                    Logging.getLogger(Subscriber.class).error(
1280:                            "Attempt to close a transaction from a different thread"
1281:                                    + " than the one which opened it:\n\t" + s
1282:                                    + "\t" + _openTransaction.get(),
1283:                            new Throwable());
1284:                }
1285:                _openTransaction.set(null);
1286:            }
1287:
1288:            public static boolean abortTransaction() {
1289:                Subscriber s = (Subscriber) _openTransaction.get();
1290:                if (s != null) {
1291:                    s.closeTransaction();
1292:                    return true;
1293:                } else {
1294:                    return false;
1295:                }
1296:            }
1297:
1298:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.