001: /*
002: * JacORB - a free Java ORB
003: *
004: * Copyright (C) 1999-2004 Gerald Brose
005: *
006: * This library is free software; you can redistribute it and/or
007: * modify it under the terms of the GNU Library General Public
008: * License as published by the Free Software Foundation; either
009: * version 2 of the License, or (at your option) any later version.
010: *
011: * This library is distributed in the hope that it will be useful,
012: * but WITHOUT ANY WARRANTY; without even the implied warranty of
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
014: * Library General Public License for more details.
015: *
016: * You should have received a copy of the GNU Library General Public
017: * License along with this library; if not, write to the Free
018: * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
019: *
020: */
021:
022: package org.jacorb.notification.servant;
023:
024: import java.io.PrintWriter;
025: import java.io.StringWriter;
026:
027: import org.apache.avalon.framework.configuration.Configuration;
028: import org.apache.avalon.framework.configuration.ConfigurationException;
029: import org.jacorb.notification.OfferManager;
030: import org.jacorb.notification.SubscriptionManager;
031: import org.jacorb.notification.conf.Attributes;
032: import org.jacorb.notification.conf.Default;
033: import org.jacorb.notification.engine.AbstractRetryStrategy;
034: import org.jacorb.notification.engine.PushOperation;
035: import org.jacorb.notification.engine.PushTaskExecutor;
036: import org.jacorb.notification.engine.PushTaskExecutorFactory;
037: import org.jacorb.notification.engine.RetryException;
038: import org.jacorb.notification.engine.RetryStrategy;
039: import org.jacorb.notification.engine.RetryStrategyFactory;
040: import org.jacorb.notification.engine.TaskProcessor;
041: import org.jacorb.notification.interfaces.IProxyPushSupplier;
042: import org.jacorb.util.ObjectUtil;
043: import org.omg.CORBA.ORB;
044: import org.omg.CosNotifyChannelAdmin.ConsumerAdmin;
045: import org.omg.PortableServer.POA;
046: import org.picocontainer.MutablePicoContainer;
047: import org.picocontainer.defaults.DefaultPicoContainer;
048:
049: import edu.emory.mathcs.backport.java.util.concurrent.Semaphore;
050: import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
051: import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
052: import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
053: import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicReference;
054:
055: /**
056: * @jmx.mbean extends = "AbstractProxySupplierMBean"
057: * @jboss.xmbean
058: *
059: * @--jmx.notification name = "notification.proxy.push_failed" description = "push to
060: * ProxyPushConsumer failed" notificationType = "java.lang.String"
061: *
062: * @author Alphonse Bendt
063: * @version $Id: AbstractProxyPushSupplier.java,v 1.9 2006/06/16 14:33:39 alphonse.bendt Exp $
064: */
065: public abstract class AbstractProxyPushSupplier extends
066: AbstractProxySupplier implements IProxyPushSupplier {
067: private static final String NOTIFY_PUSH_FAILED = "notification.proxy.push_failed";
068:
069: private final AtomicReference retryStrategyFactory_;
070:
071: /**
072: * flag to indicate that this ProxySupplier should invoke remote calls (push) during
073: * deliverMessage.
074: */
075: private final AtomicBoolean enabled_ = new AtomicBoolean(true);
076:
077: private final PushTaskExecutor pushTaskExecutor_;
078:
079: private final AtomicInteger pushCounter_ = new AtomicInteger(0);
080:
081: private final AtomicInteger pushErrors_ = new AtomicInteger(0);
082:
083: /**
084: * number of concurrent push operations allowed.
085: */
086: protected final Semaphore pushSync_ = new Semaphore(1);
087:
088: private final PushTaskExecutor.PushTask pushTask_ = new PushTaskExecutor.PushTask() {
089: public void doPush() {
090: if (isEnabled()) {
091: tryPushEvent();
092: }
093: }
094:
095: public void cancel() {
096: // ignore, only depends on settings of ProxyPushSupplier
097: }
098: };
099:
100: private final PushTaskExecutor.PushTask flushTask_ = new PushTaskExecutor.PushTask() {
101: public void doPush() {
102: if (isEnabled()) {
103: flushPendingEvents();
104: }
105: }
106:
107: public void cancel() {
108: // ignore, only depends on settings of ProxyPushSupplier
109: }
110: };
111:
112: public AbstractProxyPushSupplier(IAdmin admin, ORB orb, POA poa,
113: Configuration conf, TaskProcessor taskProcessor,
114: PushTaskExecutorFactory pushTaskExecutorFactory,
115: OfferManager offerManager,
116: SubscriptionManager subscriptionManager,
117: ConsumerAdmin consumerAdmin) throws ConfigurationException {
118: super (admin, orb, poa, conf, taskProcessor, offerManager,
119: subscriptionManager, consumerAdmin);
120:
121: pushTaskExecutor_ = pushTaskExecutorFactory.newExecutor(this );
122:
123: retryStrategyFactory_ = new AtomicReference(
124: newRetryStrategyFactory(conf, taskProcessor));
125:
126: eventTypes_.add(NOTIFY_PUSH_FAILED);
127: }
128:
129: private boolean tryPushEvent() {
130: try {
131: boolean _acquired = pushSync_.tryAcquire(1000,
132: TimeUnit.MILLISECONDS);
133:
134: if (_acquired) {
135: try {
136: return pushEvent();
137: } finally {
138: pushSync_.release();
139: }
140: }
141:
142: // the scheduled push was not processed.
143: // therfor we need to schedule a push again.
144: schedulePush();
145: } catch (InterruptedException e) {
146: // ignored
147: }
148:
149: return true;
150: }
151:
152: protected abstract boolean pushEvent();
153:
154: protected void handleFailedPushOperation(PushOperation operation,
155: Exception error) {
156: logger_.warn("handle failed pushoperation", error);
157:
158: if (isDestroyed()) {
159: operation.dispose();
160:
161: return;
162: }
163:
164: StringWriter out = new StringWriter();
165: error.printStackTrace(new PrintWriter(out));
166: sendNotification(NOTIFY_PUSH_FAILED, "Push Operation failed",
167: out.toString());
168:
169: pushErrors_.getAndIncrement();
170:
171: incErrorCounter();
172:
173: if (AbstractRetryStrategy.isFatalException(error)) {
174: // push operation caused a fatal exception
175: // destroy the ProxySupplier
176: if (logger_.isWarnEnabled()) {
177: logger_.warn("push raised " + error
178: + ": will destroy ProxySupplier, "
179: + "disconnect Consumer", error);
180: }
181:
182: operation.dispose();
183: destroy();
184: } else if (!isRetryAllowed()) {
185: logger_
186: .warn("no more retries allowed. disconnect consumer");
187:
188: operation.dispose();
189:
190: destroy();
191: } else if (!isDestroyed()) {
192: final RetryStrategy _retry = newRetryStrategy(this ,
193: operation);
194:
195: try {
196: _retry.retry();
197: } catch (RetryException e) {
198: logger_.error("retry failed", e);
199:
200: _retry.dispose();
201: destroy();
202: }
203: } else {
204: // retry allowed && isDestroyed
205: throw new IllegalStateException("should not happen");
206: }
207: }
208:
209: private RetryStrategy newRetryStrategy(
210: IProxyPushSupplier pushSupplier, PushOperation pushOperation) {
211: return ((RetryStrategyFactory) retryStrategyFactory_.get())
212: .newRetryStrategy(pushSupplier, pushOperation);
213: }
214:
215: private RetryStrategyFactory newRetryStrategyFactory(
216: Configuration config, TaskProcessor taskProcessor)
217: throws ConfigurationException {
218: final String factoryName = config.getAttribute(
219: Attributes.RETRY_STRATEGY_FACTORY,
220: Default.DEFAULT_RETRY_STRATEGY_FACTORY);
221:
222: try {
223: return newRetryStrategyFactory(config, taskProcessor,
224: factoryName);
225:
226: } catch (ClassNotFoundException e) {
227: throw new ConfigurationException(
228: Attributes.RETRY_STRATEGY_FACTORY, e);
229: }
230: }
231:
232: /**
233: * @jmx.managed-attribute description = "Factory used to control RetryPolicy" access =
234: * "read-write"
235: */
236: public void setRetryStrategy(String factoryName)
237: throws ClassNotFoundException {
238: RetryStrategyFactory factory = newRetryStrategyFactory(config_,
239: getTaskProcessor(), factoryName);
240:
241: retryStrategyFactory_.set(factory);
242:
243: logger_.info("set RetryStrategyFactory: " + factoryName);
244: }
245:
246: /**
247: * @jmx.managed-attribute description = "Factory used to control RetryPolicy" access =
248: * "read-write"
249: */
250: public String getRetryStrategy() {
251: return retryStrategyFactory_.get().getClass().getName();
252: }
253:
254: private RetryStrategyFactory newRetryStrategyFactory(
255: Configuration config, TaskProcessor taskProcessor,
256: String factoryName) throws ClassNotFoundException {
257: final Class factoryClazz = ObjectUtil.classForName(factoryName);
258:
259: final MutablePicoContainer pico = new DefaultPicoContainer();
260:
261: pico.registerComponentInstance(TaskProcessor.class,
262: taskProcessor);
263:
264: pico.registerComponentImplementation(
265: RetryStrategyFactory.class, factoryClazz);
266:
267: pico.registerComponentInstance(config);
268:
269: return (RetryStrategyFactory) pico
270: .getComponentInstance(RetryStrategyFactory.class);
271: }
272:
273: public final void schedulePush() {
274: if (isEnabled()) {
275: scheduleTask(pushTask_);
276: }
277: }
278:
279: public void scheduleFlush() {
280: if (isEnabled()) {
281: scheduleTask(flushTask_);
282: }
283: }
284:
285: public final void scheduleTask(PushTaskExecutor.PushTask pushTask) {
286: if (!isDestroyed() && !isSuspended()) {
287: pushTaskExecutor_.executePush(pushTask);
288: }
289: }
290:
291: public void flushPendingEvents() {
292: while (tryPushEvent()) {
293: // nothing
294: }
295: }
296:
297: public final void messageQueued() {
298: if (isEnabled()) {
299: schedulePush();
300: }
301: }
302:
303: public void resetErrorCounter() {
304: super .resetErrorCounter();
305:
306: pushCounter_.getAndIncrement();
307:
308: enableDelivery();
309: }
310:
311: public void disableDelivery() {
312: boolean _wasEnabled = enabled_.getAndSet(false);
313:
314: if (_wasEnabled) {
315: logger_
316: .warn("disabled delivery to ProxySupplier temporarily");
317: }
318: }
319:
320: protected boolean isEnabled() {
321: return enabled_.get();
322: }
323:
324: private void enableDelivery() {
325: boolean _wasEnabled = enabled_.getAndSet(true);
326:
327: if (!_wasEnabled) {
328: logger_.debug("enabled delivery to ProxySupplier");
329: }
330: }
331:
332: /**
333: * @jmx.managed-attribute description = "Total Number of Push Operations" access = "read-only"
334: */
335: public int getPushOperationCount() {
336: return pushCounter_.get();
337: }
338:
339: /**
340: * @jmx.managed-attribute description = "Number of failed Push-Operations" access = "read-only"
341: */
342: public int getPushErrorCount() {
343: return pushErrors_.get();
344: }
345:
346: /**
347: * @jmx.managed-attribute description = "Average time (in ms) per Push-Operation" access =
348: * "read-only"
349: */
350: public int getAveragePushDuration() {
351: return (int) getCost() / getPushOperationCount();
352: }
353: }
|