001: package org.jacorb.notification.servant;
002:
003: /*
004: * JacORB - a free Java ORB
005: *
006: * Copyright (C) 1999-2004 Gerald Brose
007: *
008: * This library is free software; you can redistribute it and/or
009: * modify it under the terms of the GNU Library General Public
010: * License as published by the Free Software Foundation; either
011: * version 2 of the License, or (at your option) any later version.
012: *
013: * This library is distributed in the hope that it will be useful,
014: * but WITHOUT ANY WARRANTY; without even the implied warranty of
015: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
016: * Library General Public License for more details.
017: *
018: * You should have received a copy of the GNU Library General Public
019: * License along with this library; if not, write to the Free
020: * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
021: *
022: */
023:
024: import org.apache.avalon.framework.configuration.Configuration;
025: import org.apache.avalon.framework.configuration.ConfigurationException;
026: import org.jacorb.notification.OfferManager;
027: import org.jacorb.notification.SubscriptionManager;
028: import org.jacorb.notification.engine.MessagePushOperation;
029: import org.jacorb.notification.engine.PushTaskExecutorFactory;
030: import org.jacorb.notification.engine.TaskProcessor;
031: import org.jacorb.notification.interfaces.Message;
032: import org.omg.CORBA.ORB;
033: import org.omg.CosEventChannelAdmin.AlreadyConnected;
034: import org.omg.CosEventComm.Disconnected;
035: import org.omg.CosNotifyChannelAdmin.ConsumerAdmin;
036: import org.omg.CosNotifyChannelAdmin.ProxyType;
037: import org.omg.CosNotifyChannelAdmin.StructuredProxyPushSupplierOperations;
038: import org.omg.CosNotifyChannelAdmin.StructuredProxyPushSupplierPOATie;
039: import org.omg.CosNotifyComm.StructuredPushConsumer;
040: import org.omg.CosNotifyComm.StructuredPushConsumerOperations;
041: import org.omg.PortableServer.POA;
042: import org.omg.PortableServer.Servant;
043:
044: /**
045: * @jmx.mbean extends = "AbstractProxyPushSupplierMBean"
046: * @jboss.xmbean
047: *
048: * @author Alphonse Bendt
049: * @version $Id: StructuredProxyPushSupplierImpl.java,v 1.24 2006/03/06 19:53:46 alphonse.bendt Exp $
050: */
051:
052: public class StructuredProxyPushSupplierImpl extends
053: AbstractProxyPushSupplier implements
054: StructuredProxyPushSupplierOperations,
055: StructuredProxyPushSupplierImplMBean {
056: private class PushStructuredOperation extends MessagePushOperation {
057: public PushStructuredOperation(Message message) {
058: super (message);
059: }
060:
061: public void invokePush() throws Disconnected {
062: deliverMessageInternal(message_);
063: }
064: }
065:
066: private StructuredPushConsumerOperations pushConsumer_;
067:
068: private long timeSpent_;
069:
070: // //////////////////////////////////////
071:
072: public StructuredProxyPushSupplierImpl(IAdmin admin, ORB orb,
073: POA poa, Configuration conf, TaskProcessor taskProcessor,
074: PushTaskExecutorFactory pushTaskExecutorFactory,
075: OfferManager offerManager,
076: SubscriptionManager subscriptionManager,
077: ConsumerAdmin consumerAdmin) throws ConfigurationException {
078: super (admin, orb, poa, conf, taskProcessor,
079: pushTaskExecutorFactory, offerManager,
080: subscriptionManager, consumerAdmin);
081: }
082:
083: public ProxyType MyType() {
084: return ProxyType.PUSH_STRUCTURED;
085: }
086:
087: public boolean pushEvent() {
088: final Message _message = getMessageNoBlock();
089:
090: if (_message != null) {
091: try {
092: return deliverMessageWithRetry(_message);
093: } finally {
094: _message.dispose();
095: }
096: }
097:
098: return false;
099: }
100:
101: private boolean deliverMessageWithRetry(final Message message) {
102: try {
103: deliverMessageInternal(message);
104:
105: return true;
106: } catch (Exception e) {
107: final PushStructuredOperation _failedOperation = new PushStructuredOperation(
108: message);
109:
110: handleFailedPushOperation(_failedOperation, e);
111:
112: return false;
113: }
114: }
115:
116: private void deliverMessageInternal(final Message message)
117: throws Disconnected {
118: final long now = System.currentTimeMillis();
119: pushConsumer_
120: .push_structured_event(message.toStructuredEvent());
121: final long _duration = (System.currentTimeMillis() - now);
122: timeSpent_ += _duration;
123: resetErrorCounter();
124:
125: if (logger_.isDebugEnabled()) {
126: logger_.debug("Push took " + _duration + " ms");
127: }
128: }
129:
130: public void connect_structured_push_consumer(
131: StructuredPushConsumer consumer) throws AlreadyConnected {
132: checkIsNotConnected();
133:
134: if (logger_.isDebugEnabled()) {
135: logger_.debug("connect structured_push_consumer");
136: }
137:
138: pushConsumer_ = consumer;
139:
140: connectClient(consumer);
141: }
142:
143: public void disconnect_structured_push_supplier() {
144: destroy();
145: }
146:
147: protected void connectionResumed() {
148: scheduleFlush();
149: }
150:
151: protected void disconnectClient() {
152: pushConsumer_.disconnect_structured_push_consumer();
153:
154: pushConsumer_ = null;
155: }
156:
157: public Servant newServant() {
158: return new StructuredProxyPushSupplierPOATie(this );
159: }
160:
161: protected long getCost() {
162: return timeSpent_;
163: }
164: }
|