001: package org.jacorb.notification.servant;
002:
003: /*
004: * JacORB - a free Java ORB
005: *
006: * Copyright (C) 1997-2004 Gerald Brose.
007: *
008: * This library is free software; you can redistribute it and/or
009: * modify it under the terms of the GNU Library General Public
010: * License as published by the Free Software Foundation; either
011: * version 2 of the License, or (at your option) any later version.
012: *
013: * This library is distributed in the hope that it will be useful,
014: * but WITHOUT ANY WARRANTY; without even the implied warranty of
015: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
016: * Library General Public License for more details.
017: *
018: * You should have received a copy of the GNU Library General Public
019: * License along with this library; if not, write to the Free
020: * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
021: */
022:
023: import java.util.ArrayList;
024: import java.util.List;
025:
026: import org.apache.avalon.framework.configuration.Configuration;
027: import org.jacorb.notification.EventTypeWrapper;
028: import org.jacorb.notification.MessageFactory;
029: import org.jacorb.notification.OfferManager;
030: import org.jacorb.notification.SubscriptionManager;
031: import org.jacorb.notification.conf.Default;
032: import org.jacorb.notification.engine.TaskProcessor;
033: import org.jacorb.notification.interfaces.FilterStage;
034: import org.jacorb.notification.interfaces.Message;
035: import org.jacorb.notification.interfaces.MessageConsumer;
036: import org.jacorb.notification.interfaces.MessageSupplier;
037: import org.jacorb.notification.util.PropertySet;
038: import org.jacorb.notification.util.PropertySetAdapter;
039: import org.omg.CORBA.NO_IMPLEMENT;
040: import org.omg.CORBA.ORB;
041: import org.omg.CosNotification.EventType;
042: import org.omg.CosNotification.Priority;
043: import org.omg.CosNotification.StartTimeSupported;
044: import org.omg.CosNotification.StopTimeSupported;
045: import org.omg.CosNotification.StructuredEvent;
046: import org.omg.CosNotification.Timeout;
047: import org.omg.CosNotifyChannelAdmin.ObtainInfoMode;
048: import org.omg.CosNotifyChannelAdmin.SupplierAdmin;
049: import org.omg.CosNotifyComm.InvalidEventType;
050: import org.omg.CosNotifyComm.NotifyPublishOperations;
051: import org.omg.CosNotifyComm.NotifySubscribe;
052: import org.omg.CosNotifyComm.NotifySubscribeHelper;
053: import org.omg.CosNotifyComm.NotifySubscribeOperations;
054: import org.omg.PortableServer.POA;
055:
056: import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
057:
058: /**
059: * @jmx.mbean extends = "AbstractProxyMBean"
060: * @jboss.xmbean
061: *
062: * @author Alphonse Bendt
063: * @version $Id: AbstractProxyConsumer.java,v 1.21 2006/07/07 12:38:44 alphonse.bendt Exp $
064: */
065:
066: public abstract class AbstractProxyConsumer extends AbstractProxy
067: implements IProxyConsumer, NotifyPublishOperations,
068: AbstractProxyConsumerMBean {
069: private final static EventType[] EMPTY_EVENT_TYPE_ARRAY = new EventType[0];
070:
071: // //////////////////////////////////////
072:
073: private final MessageFactory messageFactory_;
074:
075: // TODO check StartTime, StopTime, TimeOut: naming and usage is inconsistent.
076: private final AtomicBoolean isStartTimeSupported_ = new AtomicBoolean(
077: true);
078:
079: private final AtomicBoolean isStopTimeSupported_ = new AtomicBoolean(
080: true);
081:
082: private List subsequentDestinations_;
083:
084: private NotifySubscribeOperations proxySubscriptionListener_;
085:
086: private NotifySubscribe subscriptionListener_;
087:
088: protected final SupplierAdmin supplierAdmin_;
089:
090: private int messageCounter_ = 0;
091:
092: // //////////////////////////////////////
093:
094: protected AbstractProxyConsumer(IAdmin admin, ORB orb, POA poa,
095: Configuration conf, TaskProcessor taskProcessor,
096: MessageFactory messageFactory, SupplierAdmin supplierAdmin,
097: OfferManager offerManager,
098: SubscriptionManager subscriptionManager) {
099: super (admin, orb, poa, conf, taskProcessor, offerManager,
100: subscriptionManager);
101:
102: supplierAdmin_ = supplierAdmin;
103: messageFactory_ = messageFactory;
104:
105: configureStartTimeSupported();
106:
107: configureStopTimeSupported();
108:
109: qosSettings_.addPropertySetListener(new String[] {
110: Priority.value, Timeout.value,
111: StartTimeSupported.value, StopTimeSupported.value },
112: reconfigureQoS_);
113: }
114:
115: protected MessageFactory getMessageFactory() {
116: return messageFactory_;
117: }
118:
119: public final List getSubsequentFilterStages() {
120: return subsequentDestinations_;
121: }
122:
123: public void setSubsequentDestinations(List list) {
124: subsequentDestinations_ = list;
125: }
126:
127: private PropertySetAdapter reconfigureQoS_ = new PropertySetAdapter() {
128: public void actionPropertySetChanged(PropertySet source) {
129: configureStartTimeSupported();
130:
131: configureStopTimeSupported();
132: }
133: };
134:
135: private void configureStartTimeSupported() {
136: try {
137: isStartTimeSupported_.set(qosSettings_.get(
138: StartTimeSupported.value).extract_boolean());
139: } catch (Exception e) {
140: isStartTimeSupported_
141: .set(Default.DEFAULT_START_TIME_SUPPORTED
142: .equals("on"));
143: }
144:
145: if (logger_.isInfoEnabled()) {
146: logger_.info("set QoS: StartTimeSupported="
147: + isStartTimeSupported_);
148: }
149: }
150:
151: private void configureStopTimeSupported() {
152: logger_.debug("QoSSettings: " + qosSettings_);
153: try {
154: isStopTimeSupported_.set(qosSettings_.get(
155: StopTimeSupported.value).extract_boolean());
156: } catch (Exception e) {
157: isStopTimeSupported_
158: .set(Default.DEFAULT_STOP_TIME_SUPPORTED
159: .equals("on"));
160: }
161:
162: if (logger_.isInfoEnabled()) {
163: logger_.info("set QoS: StopTimeSupported="
164: + isStopTimeSupported_);
165: }
166: }
167:
168: protected void schedulePullTask(MessageSupplier target) {
169: getTaskProcessor().scheduleTimedPullTask(target);
170: }
171:
172: /**
173: * check if a Message is acceptable to the QoS Settings of this ProxyConsumer
174: */
175: protected void checkMessageProperties(Message m) {
176: // No Op
177: // TODO implement
178: }
179:
180: public FilterStage getFirstStage() {
181: return this ;
182: }
183:
184: /**
185: * @jmx.managed-attribute description = "Does this ProxyConsumer support the per Message Option TimeOut"
186: * access = "read-only"
187: */
188: public boolean getStopTimeSupported() {
189: return isStopTimeSupported_.get();
190: }
191:
192: /**
193: * @jmx.managed-attribute description = "Does this ProxyConsumer support the per Message Option StartTime"
194: * access = "read-only"
195: */
196: public boolean getStartTimeSupported() {
197: return isStartTimeSupported_.get();
198: }
199:
200: public final SupplierAdmin MyAdmin() {
201: return supplierAdmin_;
202: }
203:
204: public final MessageConsumer getMessageConsumer() {
205: throw new UnsupportedOperationException();
206: }
207:
208: public final boolean hasMessageConsumer() {
209: return false;
210: }
211:
212: public void offer_change(EventType[] added, EventType[] removed)
213: throws InvalidEventType {
214: offerManager_.offer_change(added, removed);
215: }
216:
217: public final EventType[] obtain_subscription_types(
218: ObtainInfoMode obtainInfoMode) {
219: final EventType[] _subscriptionTypes;
220:
221: switch (obtainInfoMode.value()) {
222: case ObtainInfoMode._ALL_NOW_UPDATES_ON:
223: // attach the listener first, then return the current
224: // subscription types. order is important so that no
225: // updates are lost.
226:
227: registerListener();
228:
229: _subscriptionTypes = subscriptionManager_
230: .obtain_subscription_types();
231: break;
232: case ObtainInfoMode._ALL_NOW_UPDATES_OFF:
233: _subscriptionTypes = subscriptionManager_
234: .obtain_subscription_types();
235:
236: removeListener();
237: break;
238: case ObtainInfoMode._NONE_NOW_UPDATES_ON:
239: _subscriptionTypes = EMPTY_EVENT_TYPE_ARRAY;
240:
241: registerListener();
242: break;
243: case ObtainInfoMode._NONE_NOW_UPDATES_OFF:
244: _subscriptionTypes = EMPTY_EVENT_TYPE_ARRAY;
245:
246: removeListener();
247: break;
248: default:
249: throw new IllegalArgumentException(
250: "Illegal ObtainInfoMode: ObtainInfoMode."
251: + obtainInfoMode.value());
252: }
253:
254: return _subscriptionTypes;
255: }
256:
257: private void registerListener() {
258: if (proxySubscriptionListener_ == null) {
259: final NotifySubscribeOperations _listener = getSubscriptionListener();
260:
261: if (_listener != null) {
262: proxySubscriptionListener_ = new NotifySubscribeOperations() {
263: public void subscription_change(EventType[] added,
264: EventType[] removed) {
265: try {
266: _listener.subscription_change(added,
267: removed);
268: } catch (NO_IMPLEMENT e) {
269: logger_
270: .info(
271: "disable subscription_change for Supplier",
272: e);
273:
274: removeListener();
275: } catch (InvalidEventType e) {
276: if (logger_.isDebugEnabled()) {
277: logger_.debug("subscription_change("
278: + EventTypeWrapper
279: .toString(added)
280: + ", "
281: + EventTypeWrapper
282: .toString(removed)
283: + ") failed", e);
284: } else {
285: logger_.error("invalid event type", e);
286: }
287: } catch (Exception e) {
288: logger_.error("subscription change failed",
289: e);
290: }
291: }
292: };
293: subscriptionManager_
294: .addListener(proxySubscriptionListener_);
295: }
296: }
297: }
298:
299: /**
300: * removes the listener. subscription_change will no more be issued to the connected Supplier
301: */
302: protected void removeListener() {
303: if (proxySubscriptionListener_ != null) {
304: subscriptionManager_
305: .removeListener(proxySubscriptionListener_);
306:
307: proxySubscriptionListener_ = null;
308: }
309: }
310:
311: protected final void clientDisconnected() {
312: subscriptionListener_ = null;
313: }
314:
315: protected void connectClient(org.omg.CORBA.Object client) {
316: super .connectClient(client);
317:
318: try {
319: subscriptionListener_ = NotifySubscribeHelper
320: .narrow(client);
321:
322: logger_
323: .debug("successfully narrowed connecting Supplier to NotifySubscribe");
324: } catch (Exception e) {
325: logger_
326: .info("connecting Supplier does not support subscription_change");
327: }
328: }
329:
330: final NotifySubscribeOperations getSubscriptionListener() {
331: return subscriptionListener_;
332: }
333:
334: protected void processMessage(Message mesg) {
335: getTaskProcessor().processMessage(mesg);
336:
337: messageCounter_++;
338: }
339:
340: /**
341: * @jmx.managed-attribute description = "Total number of Messages received by this ProxyConsumer"
342: * access = "read-only"
343: */
344: public final int getMessageCount() {
345: return messageCounter_;
346: }
347:
348: protected Message[] newMessages(StructuredEvent[] events) {
349: final List _result = new ArrayList(events.length);
350: final MessageFactory _messageFactory = getMessageFactory();
351:
352: for (int i = 0; i < events.length; ++i) {
353: final Message _newMessage = _messageFactory.newMessage(
354: events[i], this );
355: checkMessageProperties(_newMessage);
356: _result.add(_newMessage);
357: }
358:
359: return (Message[]) _result.toArray(new Message[_result.size()]);
360: }
361: }
|