001: package org.jacorb.notification.servant;
002:
003: /*
004: * JacORB - a free Java ORB
005: *
006: * Copyright (C) 1999-2004 Gerald Brose
007: *
008: * This library is free software; you can redistribute it and/or
009: * modify it under the terms of the GNU Library General Public
010: * License as published by the Free Software Foundation; either
011: * version 2 of the License, or (at your option) any later version.
012: *
013: * This library is distributed in the hope that it will be useful,
014: * but WITHOUT ANY WARRANTY; without even the implied warranty of
015: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
016: * Library General Public License for more details.
017: *
018: * You should have received a copy of the GNU Library General Public
019: * License along with this library; if not, write to the Free
020: * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
021: *
022: */
023:
024: import java.util.HashSet;
025: import java.util.Iterator;
026: import java.util.List;
027: import java.util.Set;
028:
029: import org.apache.avalon.framework.configuration.Configurable;
030: import org.apache.avalon.framework.configuration.Configuration;
031: import org.apache.avalon.framework.logger.Logger;
032: import org.jacorb.notification.FilterManager;
033: import org.jacorb.notification.IContainer;
034: import org.jacorb.notification.OfferManager;
035: import org.jacorb.notification.SubscriptionManager;
036: import org.jacorb.notification.conf.Attributes;
037: import org.jacorb.notification.conf.Default;
038: import org.jacorb.notification.engine.TaskProcessor;
039: import org.jacorb.notification.interfaces.Disposable;
040: import org.jacorb.notification.interfaces.FilterStage;
041: import org.jacorb.notification.interfaces.JMXManageable;
042: import org.jacorb.notification.lifecycle.IServantLifecyle;
043: import org.jacorb.notification.lifecycle.ServantLifecyleControl;
044: import org.jacorb.notification.util.DisposableManager;
045: import org.jacorb.notification.util.QoSPropertySet;
046: import org.omg.CORBA.NO_IMPLEMENT;
047: import org.omg.CORBA.OBJECT_NOT_EXIST;
048: import org.omg.CORBA.ORB;
049: import org.omg.CosEventChannelAdmin.AlreadyConnected;
050: import org.omg.CosEventComm.Disconnected;
051: import org.omg.CosNotification.NamedPropertyRangeSeqHolder;
052: import org.omg.CosNotification.Property;
053: import org.omg.CosNotification.QoSAdminOperations;
054: import org.omg.CosNotification.UnsupportedQoS;
055: import org.omg.CosNotifyChannelAdmin.ConnectionAlreadyActive;
056: import org.omg.CosNotifyChannelAdmin.ConnectionAlreadyInactive;
057: import org.omg.CosNotifyChannelAdmin.NotConnected;
058: import org.omg.CosNotifyChannelAdmin.ProxyType;
059: import org.omg.CosNotifyFilter.Filter;
060: import org.omg.CosNotifyFilter.FilterAdminOperations;
061: import org.omg.CosNotifyFilter.FilterNotFound;
062: import org.omg.CosNotifyFilter.MappingFilter;
063: import org.omg.CosNotifyFilter.MappingFilterHelper;
064: import org.omg.PortableServer.POA;
065: import org.picocontainer.PicoContainer;
066:
067: import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
068: import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
069:
070: /**
071: * @jmx.mbean
072: * @jboss.xmbean
073: *
074: * @author Alphonse Bendt
075: * @version $Id: AbstractProxy.java,v 1.31 2006/03/06 19:53:46 alphonse.bendt Exp $
076: */
077:
078: public abstract class AbstractProxy implements FilterAdminOperations,
079: QoSAdminOperations, FilterStage, IServantLifecyle,
080: Configurable, JMXManageable, AbstractProxyMBean {
081: private final MappingFilter nullMappingFilterRef_;
082:
083: private final boolean isIDPublic_;
084:
085: protected final Logger logger_;
086:
087: private final AtomicBoolean connected_ = new AtomicBoolean(false);
088:
089: protected final QoSPropertySet qosSettings_;
090:
091: private final Integer id_;
092:
093: protected final OfferManager offerManager_;
094:
095: protected final SubscriptionManager subscriptionManager_;
096:
097: private MappingFilter lifetimeFilter_;
098:
099: private MappingFilter priorityFilter_;
100:
101: /**
102: * delegate for FilterAdminOperations
103: */
104: private final FilterManager filterManager_;
105:
106: private final AtomicBoolean destroyed_ = new AtomicBoolean(false);
107:
108: private final AtomicBoolean disposeInProgress_ = new AtomicBoolean(
109: false);
110:
111: private final AtomicInteger errorCounter_ = new AtomicInteger(0);
112:
113: private final POA poa_;
114:
115: private final ORB orb_;
116:
117: private final TaskProcessor taskProcessor_;
118:
119: private boolean isInterFilterGroupOperatorOR_;
120:
121: private final boolean disposedProxyDisconnectsClient_;
122:
123: private final AtomicBoolean active_ = new AtomicBoolean(true);
124:
125: private final DisposableManager disposables_ = new DisposableManager();
126:
127: private final PicoContainer container_;
128:
129: private org.omg.CORBA.Object client_;
130:
131: private final String parentMBean_;
132:
133: protected final Set eventTypes_ = new HashSet();
134:
135: private JMXManageable.JMXCallback jmxCallback_;
136:
137: protected Configuration config_;
138:
139: private final ServantLifecyleControl servantLifecycle_;
140:
141: // //////////////////////////////////////
142:
143: protected AbstractProxy(IAdmin admin, ORB orb, POA poa,
144: Configuration conf, TaskProcessor taskProcessor,
145: OfferManager offerManager,
146: SubscriptionManager subscriptionManager) {
147: parentMBean_ = admin.getAdminMBean();
148: id_ = new Integer(admin.getProxyID());
149: isIDPublic_ = admin.isIDPublic();
150: container_ = admin.getContainer();
151:
152: orb_ = orb;
153: poa_ = poa;
154: taskProcessor_ = taskProcessor;
155:
156: offerManager_ = offerManager;
157: subscriptionManager_ = subscriptionManager;
158:
159: filterManager_ = new FilterManager();
160:
161: nullMappingFilterRef_ = MappingFilterHelper.narrow(orb
162: .string_to_object(orb.object_to_string(null)));
163:
164: logger_ = ((org.jacorb.config.Configuration) conf)
165: .getNamedLogger(getClass().getName());
166:
167: disposedProxyDisconnectsClient_ = conf.getAttribute(
168: Attributes.DISPOSE_PROXY_CALLS_DISCONNECT,
169: Default.DEFAULT_DISPOSE_PROXY_CALLS_DISCONNECT).equals(
170: "on");
171:
172: qosSettings_ = new QoSPropertySet(conf,
173: QoSPropertySet.PROXY_QOS);
174:
175: servantLifecycle_ = new ServantLifecyleControl(this , conf);
176:
177: configure(conf);
178: }
179:
180: public void configure(Configuration conf) {
181: config_ = conf;
182: }
183:
184: // //////////////////////////////////////
185:
186: public void registerDisposable(Disposable d) {
187: disposables_.addDisposable(d);
188: }
189:
190: public boolean isIDPublic() {
191: return isIDPublic_;
192: }
193:
194: public final POA getPOA() {
195: return poa_;
196: }
197:
198: protected ORB getORB() {
199: return orb_;
200: }
201:
202: public final org.omg.CORBA.Object activate() {
203: return servantLifecycle_.activate();
204: }
205:
206: protected TaskProcessor getTaskProcessor() {
207: return taskProcessor_;
208: }
209:
210: // ////////////////////////////////////////////////////
211: // delegate FilterAdmin Operations to FilterManager //
212: // ////////////////////////////////////////////////////
213:
214: public final int add_filter(Filter filter) {
215: return filterManager_.add_filter(filter);
216: }
217:
218: public final void remove_filter(int n) throws FilterNotFound {
219: filterManager_.remove_filter(n);
220: }
221:
222: public final Filter get_filter(int n) throws FilterNotFound {
223: return filterManager_.get_filter(n);
224: }
225:
226: public final int[] get_all_filters() {
227: return filterManager_.get_all_filters();
228: }
229:
230: public final void remove_all_filters() {
231: filterManager_.remove_all_filters();
232: }
233:
234: // //////////////////////////////////////
235:
236: // TODO implement
237: public void validate_event_qos(Property[] qosProps,
238: NamedPropertyRangeSeqHolder propSeqHolder) {
239: throw new NO_IMPLEMENT();
240: }
241:
242: public final void validate_qos(Property[] props,
243: NamedPropertyRangeSeqHolder propertyRange)
244: throws UnsupportedQoS {
245: qosSettings_.validate_qos(props, propertyRange);
246: }
247:
248: public final void set_qos(Property[] qosProps)
249: throws UnsupportedQoS {
250: qosSettings_.set_qos(qosProps);
251: }
252:
253: public final Property[] get_qos() {
254: return qosSettings_.get_qos();
255: }
256:
257: public final void priority_filter(MappingFilter filter) {
258: priorityFilter_ = filter;
259: }
260:
261: public final MappingFilter priority_filter() {
262: if (priorityFilter_ == null) {
263: return nullMappingFilterRef_;
264: }
265:
266: return priorityFilter_;
267: }
268:
269: public final MappingFilter lifetime_filter() {
270: if (lifetimeFilter_ == null) {
271: return nullMappingFilterRef_;
272: }
273:
274: return lifetimeFilter_;
275: }
276:
277: public final void lifetime_filter(MappingFilter filter) {
278: lifetimeFilter_ = filter;
279: }
280:
281: public final Integer getID() {
282: return id_;
283: }
284:
285: public final List getFilters() {
286: return filterManager_.getFilters();
287: }
288:
289: public final void deactivate() {
290: servantLifecycle_.deactivate();
291: }
292:
293: private void tryDisconnectClient() {
294: try {
295: if (disposedProxyDisconnectsClient_ && connected_.get()) {
296: logger_.info("disconnect_client");
297:
298: disconnectClient();
299:
300: client_._release();
301: }
302: } catch (Exception e) {
303: logger_
304: .info(
305: "disconnect_client raised an unexpected error: will be ignored",
306: e);
307: } finally {
308: connected_.set(false);
309: client_ = null;
310: }
311: }
312:
313: public final boolean isDestroyed() {
314: return destroyed_.get();
315: }
316:
317: protected void checkDestroyStatus() throws OBJECT_NOT_EXIST {
318: if (!destroyed_.compareAndSet(false, true)) {
319: logger_.error("Already destroyed");
320:
321: throw new OBJECT_NOT_EXIST();
322: }
323: }
324:
325: /**
326: * @jmx.managed-operation description = "Destroy this Proxy" impact = "ACTION"
327: */
328: public final void destroy() {
329: checkDestroyStatus();
330:
331: container_.dispose();
332:
333: final List list = container_
334: .getComponentInstancesOfType(IContainer.class);
335: for (Iterator i = list.iterator(); i.hasNext();) {
336: IContainer element = (IContainer) i.next();
337: element.destroy();
338: }
339: }
340:
341: public void dispose() {
342: logger_.info("Destroy Proxy " + id_);
343:
344: disposeInProgress_.set(true);
345:
346: // ////////////////////////////
347:
348: tryDisconnectClient();
349:
350: clientDisconnected();
351:
352: // ////////////////////////////
353:
354: removeListener();
355:
356: // ////////////////////////////
357:
358: remove_all_filters();
359:
360: // ////////////////////////////
361:
362: disposables_.dispose();
363:
364: deactivate();
365: }
366:
367: protected abstract void clientDisconnected();
368:
369: public abstract ProxyType MyType();
370:
371: void setInterFilterGroupOperatorOR(boolean b) {
372: isInterFilterGroupOperatorOR_ = b;
373: }
374:
375: public final boolean hasInterFilterGroupOperatorOR() {
376: return isInterFilterGroupOperatorOR_;
377: }
378:
379: /**
380: * @jmx.managed-attribute description = "Connection Status."
381: * access = "read-only"
382: */
383: public final boolean getConnected() {
384: return !disposeInProgress_.get() && connected_.get();
385: }
386:
387: public final boolean hasLifetimeFilter() {
388: return lifetimeFilter_ != null;
389: }
390:
391: public final boolean hasPriorityFilter() {
392: return priorityFilter_ != null;
393: }
394:
395: public final MappingFilter getLifetimeFilter() {
396: return lifetimeFilter_;
397: }
398:
399: public final MappingFilter getPriorityFilter() {
400: return priorityFilter_;
401: }
402:
403: /**
404: * @jmx.managed-operation impact = "ACTION"
405: * description = "reset the error counter to its initial value"
406: */
407: public void resetErrorCounter() {
408: errorCounter_.set(0);
409: }
410:
411: /**
412: * @jmx.managed-attribute description = "error counter"
413: * access = "read-only"
414: */
415: public final int getErrorCounter() {
416: return errorCounter_.get();
417: }
418:
419: public final int incErrorCounter() {
420: return errorCounter_.getAndIncrement();
421: }
422:
423: public boolean isSuspended() {
424: return !active_.get();
425: }
426:
427: public final void suspend_connection() throws NotConnected,
428: ConnectionAlreadyInactive {
429: checkIsConnected();
430:
431: if (!active_.compareAndSet(true, false)) {
432: throw new ConnectionAlreadyInactive();
433: }
434:
435: connectionSuspended();
436: }
437:
438: /**
439: * this is an extension point.
440: */
441: protected void connectionSuspended() {
442: // No Op
443: }
444:
445: public final void resume_connection() throws NotConnected,
446: ConnectionAlreadyActive {
447: checkIsConnected();
448:
449: if (!active_.compareAndSet(false, true)) {
450: throw new ConnectionAlreadyActive();
451: }
452:
453: connectionResumed();
454: }
455:
456: /**
457: * this is an extension point. invoked when resume_connection was called successfully.
458: */
459: protected void connectionResumed() {
460: // NO OP
461: }
462:
463: protected void checkIsConnected() throws NotConnected {
464: if (!connected_.get()) {
465: throw new NotConnected();
466: }
467: }
468:
469: protected void checkIsNotConnected() throws AlreadyConnected {
470: if (connected_.get()) {
471: throw new AlreadyConnected();
472: }
473: }
474:
475: protected void checkStillConnected() throws Disconnected {
476: if (!connected_.get()) {
477: logger_.fatalError("access on a not connected proxy");
478:
479: destroy();
480:
481: throw new Disconnected();
482: }
483: }
484:
485: protected void connectClient(org.omg.CORBA.Object client) {
486: connected_.set(true);
487:
488: client_ = client;
489: }
490:
491: /**
492: * invoke the proxy specific disconnect method.
493: */
494: protected abstract void disconnectClient();
495:
496: protected void handleDisconnected(Disconnected e) {
497: logger_
498: .fatalError(
499: "Illegal state: Client think it's disconnected. "
500: + "Proxy thinks Client is still connected. The Proxy will be destroyed.",
501: e);
502:
503: destroy();
504: }
505:
506: protected abstract void removeListener();
507:
508: public final String getJMXObjectName() {
509: return parentMBean_ + ", proxy=" + getMBeanName();
510: }
511:
512: public final String getMBeanName() {
513: return getMBeanType() + "-" + getID();
514: }
515:
516: protected String getMBeanType() {
517: String clazzName = getClass().getName();
518:
519: String rawClazz = clazzName.substring(clazzName
520: .lastIndexOf('.') + 1);
521:
522: return rawClazz.substring(0, rawClazz.length()
523: - "Impl".length());
524: }
525:
526: public String[] getJMXNotificationTypes() {
527: return (String[]) eventTypes_.toArray(new String[eventTypes_
528: .size()]);
529: }
530:
531: public void setJMXCallback(JMXManageable.JMXCallback callback) {
532: jmxCallback_ = callback;
533: }
534:
535: protected void sendNotification(String type, String message) {
536: if (jmxCallback_ != null) {
537: jmxCallback_.sendJMXNotification(type, message);
538: }
539: }
540:
541: protected void sendNotification(String type, String message,
542: Object payload) {
543: if (jmxCallback_ != null) {
544: jmxCallback_.sendJMXNotification(type, message, payload);
545: }
546: }
547:
548: /**
549: * @jmx.managed-attribute description = "current Status for this Proxy (NOT CONNECTED|ACTIVE|SUSPENDED|DESTROYED)"
550: * access = "read-only"
551: */
552: public String getStatus() {
553: final String _status;
554:
555: if (destroyed_.get()) {
556: _status = "DESTROYED";
557: } else if (!connected_.get()) {
558: _status = "NOT CONNECTED";
559: } else {
560: _status = active_.get() ? "ACTIVE" : "SUSPENDED";
561: }
562:
563: return _status;
564: }
565:
566: /**
567: * @jmx.managed-attribute description = "IOR of the connected client"
568: * access = "read-only"
569: */
570: public String getClientIOR() {
571: return (client_ != null) ? orb_.object_to_string(client_) : "";
572: }
573:
574: /**
575: * @jmx.managed-attribute description = "InterFilterGroupOperator used for this proxy"
576: * access = "read-only"
577: * currencyTimeLimit = "2147483647"
578: */
579: public String getInterFilterGroupOperator() {
580: return isInterFilterGroupOperatorOR_ ? "OR_OP" : "AND_OP";
581: }
582: }
|