Source Code Cross Referenced for AbstractProxyConsumer.java in  » Collaboration » JacORB » org » jacorb » notification » servant » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Collaboration » JacORB » org.jacorb.notification.servant 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        package org.jacorb.notification.servant;
002:
003:        /*
004:         *        JacORB - a free Java ORB
005:         *
006:         *   Copyright (C) 1997-2004 Gerald Brose.
007:         *
008:         *   This library is free software; you can redistribute it and/or
009:         *   modify it under the terms of the GNU Library General Public
010:         *   License as published by the Free Software Foundation; either
011:         *   version 2 of the License, or (at your option) any later version.
012:         *
013:         *   This library is distributed in the hope that it will be useful,
014:         *   but WITHOUT ANY WARRANTY; without even the implied warranty of
015:         *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
016:         *   Library General Public License for more details.
017:         *
018:         *   You should have received a copy of the GNU Library General Public
019:         *   License along with this library; if not, write to the Free
020:         *   Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
021:         */
022:
023:        import java.util.ArrayList;
024:        import java.util.List;
025:
026:        import org.apache.avalon.framework.configuration.Configuration;
027:        import org.jacorb.notification.EventTypeWrapper;
028:        import org.jacorb.notification.MessageFactory;
029:        import org.jacorb.notification.OfferManager;
030:        import org.jacorb.notification.SubscriptionManager;
031:        import org.jacorb.notification.conf.Default;
032:        import org.jacorb.notification.engine.TaskProcessor;
033:        import org.jacorb.notification.interfaces.FilterStage;
034:        import org.jacorb.notification.interfaces.Message;
035:        import org.jacorb.notification.interfaces.MessageConsumer;
036:        import org.jacorb.notification.interfaces.MessageSupplier;
037:        import org.jacorb.notification.util.PropertySet;
038:        import org.jacorb.notification.util.PropertySetAdapter;
039:        import org.omg.CORBA.NO_IMPLEMENT;
040:        import org.omg.CORBA.ORB;
041:        import org.omg.CosNotification.EventType;
042:        import org.omg.CosNotification.Priority;
043:        import org.omg.CosNotification.StartTimeSupported;
044:        import org.omg.CosNotification.StopTimeSupported;
045:        import org.omg.CosNotification.StructuredEvent;
046:        import org.omg.CosNotification.Timeout;
047:        import org.omg.CosNotifyChannelAdmin.ObtainInfoMode;
048:        import org.omg.CosNotifyChannelAdmin.SupplierAdmin;
049:        import org.omg.CosNotifyComm.InvalidEventType;
050:        import org.omg.CosNotifyComm.NotifyPublishOperations;
051:        import org.omg.CosNotifyComm.NotifySubscribe;
052:        import org.omg.CosNotifyComm.NotifySubscribeHelper;
053:        import org.omg.CosNotifyComm.NotifySubscribeOperations;
054:        import org.omg.PortableServer.POA;
055:
056:        import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
057:
058:        /**
059:         * @jmx.mbean extends = "AbstractProxyMBean"
060:         * @jboss.xmbean
061:         *
062:         * @author Alphonse Bendt
063:         * @version $Id: AbstractProxyConsumer.java,v 1.21 2006/07/07 12:38:44 alphonse.bendt Exp $
064:         */
065:
066:        public abstract class AbstractProxyConsumer extends AbstractProxy
067:                implements  IProxyConsumer, NotifyPublishOperations,
068:                AbstractProxyConsumerMBean {
069:            private final static EventType[] EMPTY_EVENT_TYPE_ARRAY = new EventType[0];
070:
071:            // //////////////////////////////////////
072:
073:            private final MessageFactory messageFactory_;
074:
075:            // TODO check StartTime, StopTime, TimeOut: naming and usage is inconsistent.
076:            private final AtomicBoolean isStartTimeSupported_ = new AtomicBoolean(
077:                    true);
078:
079:            private final AtomicBoolean isStopTimeSupported_ = new AtomicBoolean(
080:                    true);
081:
082:            private List subsequentDestinations_;
083:
084:            private NotifySubscribeOperations proxySubscriptionListener_;
085:
086:            private NotifySubscribe subscriptionListener_;
087:
088:            protected final SupplierAdmin supplierAdmin_;
089:
090:            private int messageCounter_ = 0;
091:
092:            // //////////////////////////////////////
093:
094:            protected AbstractProxyConsumer(IAdmin admin, ORB orb, POA poa,
095:                    Configuration conf, TaskProcessor taskProcessor,
096:                    MessageFactory messageFactory, SupplierAdmin supplierAdmin,
097:                    OfferManager offerManager,
098:                    SubscriptionManager subscriptionManager) {
099:                super (admin, orb, poa, conf, taskProcessor, offerManager,
100:                        subscriptionManager);
101:
102:                supplierAdmin_ = supplierAdmin;
103:                messageFactory_ = messageFactory;
104:
105:                configureStartTimeSupported();
106:
107:                configureStopTimeSupported();
108:
109:                qosSettings_.addPropertySetListener(new String[] {
110:                        Priority.value, Timeout.value,
111:                        StartTimeSupported.value, StopTimeSupported.value },
112:                        reconfigureQoS_);
113:            }
114:
115:            protected MessageFactory getMessageFactory() {
116:                return messageFactory_;
117:            }
118:
119:            public final List getSubsequentFilterStages() {
120:                return subsequentDestinations_;
121:            }
122:
123:            public void setSubsequentDestinations(List list) {
124:                subsequentDestinations_ = list;
125:            }
126:
127:            private PropertySetAdapter reconfigureQoS_ = new PropertySetAdapter() {
128:                public void actionPropertySetChanged(PropertySet source) {
129:                    configureStartTimeSupported();
130:
131:                    configureStopTimeSupported();
132:                }
133:            };
134:
135:            private void configureStartTimeSupported() {
136:                try {
137:                    isStartTimeSupported_.set(qosSettings_.get(
138:                            StartTimeSupported.value).extract_boolean());
139:                } catch (Exception e) {
140:                    isStartTimeSupported_
141:                            .set(Default.DEFAULT_START_TIME_SUPPORTED
142:                                    .equals("on"));
143:                }
144:
145:                if (logger_.isInfoEnabled()) {
146:                    logger_.info("set QoS: StartTimeSupported="
147:                            + isStartTimeSupported_);
148:                }
149:            }
150:
151:            private void configureStopTimeSupported() {
152:                logger_.debug("QoSSettings: " + qosSettings_);
153:                try {
154:                    isStopTimeSupported_.set(qosSettings_.get(
155:                            StopTimeSupported.value).extract_boolean());
156:                } catch (Exception e) {
157:                    isStopTimeSupported_
158:                            .set(Default.DEFAULT_STOP_TIME_SUPPORTED
159:                                    .equals("on"));
160:                }
161:
162:                if (logger_.isInfoEnabled()) {
163:                    logger_.info("set QoS: StopTimeSupported="
164:                            + isStopTimeSupported_);
165:                }
166:            }
167:
168:            protected void schedulePullTask(MessageSupplier target) {
169:                getTaskProcessor().scheduleTimedPullTask(target);
170:            }
171:
172:            /**
173:             * check if a Message is acceptable to the QoS Settings of this ProxyConsumer
174:             */
175:            protected void checkMessageProperties(Message m) {
176:                // No Op
177:                // TODO implement
178:            }
179:
180:            public FilterStage getFirstStage() {
181:                return this ;
182:            }
183:
184:            /**
185:             * @jmx.managed-attribute description = "Does this ProxyConsumer support the per Message Option TimeOut"
186:             *                        access = "read-only"
187:             */
188:            public boolean getStopTimeSupported() {
189:                return isStopTimeSupported_.get();
190:            }
191:
192:            /**
193:             * @jmx.managed-attribute description = "Does this ProxyConsumer support the per Message Option StartTime"
194:             *                        access = "read-only"
195:             */
196:            public boolean getStartTimeSupported() {
197:                return isStartTimeSupported_.get();
198:            }
199:
200:            public final SupplierAdmin MyAdmin() {
201:                return supplierAdmin_;
202:            }
203:
204:            public final MessageConsumer getMessageConsumer() {
205:                throw new UnsupportedOperationException();
206:            }
207:
208:            public final boolean hasMessageConsumer() {
209:                return false;
210:            }
211:
212:            public void offer_change(EventType[] added, EventType[] removed)
213:                    throws InvalidEventType {
214:                offerManager_.offer_change(added, removed);
215:            }
216:
217:            public final EventType[] obtain_subscription_types(
218:                    ObtainInfoMode obtainInfoMode) {
219:                final EventType[] _subscriptionTypes;
220:
221:                switch (obtainInfoMode.value()) {
222:                case ObtainInfoMode._ALL_NOW_UPDATES_ON:
223:                    // attach the listener first, then return the current
224:                    // subscription types. order is important so that no
225:                    // updates are lost.
226:
227:                    registerListener();
228:
229:                    _subscriptionTypes = subscriptionManager_
230:                            .obtain_subscription_types();
231:                    break;
232:                case ObtainInfoMode._ALL_NOW_UPDATES_OFF:
233:                    _subscriptionTypes = subscriptionManager_
234:                            .obtain_subscription_types();
235:
236:                    removeListener();
237:                    break;
238:                case ObtainInfoMode._NONE_NOW_UPDATES_ON:
239:                    _subscriptionTypes = EMPTY_EVENT_TYPE_ARRAY;
240:
241:                    registerListener();
242:                    break;
243:                case ObtainInfoMode._NONE_NOW_UPDATES_OFF:
244:                    _subscriptionTypes = EMPTY_EVENT_TYPE_ARRAY;
245:
246:                    removeListener();
247:                    break;
248:                default:
249:                    throw new IllegalArgumentException(
250:                            "Illegal ObtainInfoMode: ObtainInfoMode."
251:                                    + obtainInfoMode.value());
252:                }
253:
254:                return _subscriptionTypes;
255:            }
256:
257:            private void registerListener() {
258:                if (proxySubscriptionListener_ == null) {
259:                    final NotifySubscribeOperations _listener = getSubscriptionListener();
260:
261:                    if (_listener != null) {
262:                        proxySubscriptionListener_ = new NotifySubscribeOperations() {
263:                            public void subscription_change(EventType[] added,
264:                                    EventType[] removed) {
265:                                try {
266:                                    _listener.subscription_change(added,
267:                                            removed);
268:                                } catch (NO_IMPLEMENT e) {
269:                                    logger_
270:                                            .info(
271:                                                    "disable subscription_change for Supplier",
272:                                                    e);
273:
274:                                    removeListener();
275:                                } catch (InvalidEventType e) {
276:                                    if (logger_.isDebugEnabled()) {
277:                                        logger_.debug("subscription_change("
278:                                                + EventTypeWrapper
279:                                                        .toString(added)
280:                                                + ", "
281:                                                + EventTypeWrapper
282:                                                        .toString(removed)
283:                                                + ") failed", e);
284:                                    } else {
285:                                        logger_.error("invalid event type", e);
286:                                    }
287:                                } catch (Exception e) {
288:                                    logger_.error("subscription change failed",
289:                                            e);
290:                                }
291:                            }
292:                        };
293:                        subscriptionManager_
294:                                .addListener(proxySubscriptionListener_);
295:                    }
296:                }
297:            }
298:
299:            /**
300:             * removes the listener. subscription_change will no more be issued to the connected Supplier
301:             */
302:            protected void removeListener() {
303:                if (proxySubscriptionListener_ != null) {
304:                    subscriptionManager_
305:                            .removeListener(proxySubscriptionListener_);
306:
307:                    proxySubscriptionListener_ = null;
308:                }
309:            }
310:
311:            protected final void clientDisconnected() {
312:                subscriptionListener_ = null;
313:            }
314:
315:            protected void connectClient(org.omg.CORBA.Object client) {
316:                super .connectClient(client);
317:
318:                try {
319:                    subscriptionListener_ = NotifySubscribeHelper
320:                            .narrow(client);
321:
322:                    logger_
323:                            .debug("successfully narrowed connecting Supplier to NotifySubscribe");
324:                } catch (Exception e) {
325:                    logger_
326:                            .info("connecting Supplier does not support subscription_change");
327:                }
328:            }
329:
330:            final NotifySubscribeOperations getSubscriptionListener() {
331:                return subscriptionListener_;
332:            }
333:
334:            protected void processMessage(Message mesg) {
335:                getTaskProcessor().processMessage(mesg);
336:
337:                messageCounter_++;
338:            }
339:
340:            /**
341:             * @jmx.managed-attribute description = "Total number of Messages received by this ProxyConsumer"
342:             *                        access = "read-only"
343:             */
344:            public final int getMessageCount() {
345:                return messageCounter_;
346:            }
347:
348:            protected Message[] newMessages(StructuredEvent[] events) {
349:                final List _result = new ArrayList(events.length);
350:                final MessageFactory _messageFactory = getMessageFactory();
351:
352:                for (int i = 0; i < events.length; ++i) {
353:                    final Message _newMessage = _messageFactory.newMessage(
354:                            events[i], this );
355:                    checkMessageProperties(_newMessage);
356:                    _result.add(_newMessage);
357:                }
358:
359:                return (Message[]) _result.toArray(new Message[_result.size()]);
360:            }
361:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.