001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017:
018: /*
019: * ChainBuilder ESB
020: * Visual Enterprise Integration
021: *
022: * Copyright (C) 2006 Bostech Corporation
023: *
024: * This program is free software; you can redistribute it and/or modify it
025: * under the terms of the GNU General Public License as published by the
026: * Free Software Foundation; either version 2 of the License, or (at your option)
027: * any later version.
028: *
029: * This program is distributed in the hope that it will be useful,
030: * but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
031: * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
032: * for more details.
033: *
034: * You should have received a copy of the GNU General Public License along with
035: * this program; if not, write to the Free Software Foundation, Inc.,
036: * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
037: *
038: * $Id: CbLifeCycle.java 11812 2008-02-01 09:00:11Z lzheng $
039: */
040: package com.bostechcorp.cbesb.runtime.ccsl.jbi.messaging;
041:
042: import java.lang.reflect.Method;
043:
044: import javax.jbi.JBIException;
045: import javax.jbi.component.ComponentContext;
046: import javax.jbi.component.ComponentLifeCycle;
047: import javax.jbi.management.MBeanNames;
048: import javax.jbi.messaging.DeliveryChannel;
049: import javax.jbi.messaging.MessageExchange;
050: import javax.jbi.messaging.MessageExchange.Role;
051: import javax.jbi.servicedesc.ServiceEndpoint;
052: import javax.management.MBeanServer;
053: import javax.management.NotificationFilter;
054: import javax.management.ObjectName;
055: import javax.resource.spi.work.Work;
056: import javax.resource.spi.work.WorkManager;
057: import javax.transaction.Status;
058: import javax.transaction.Transaction;
059: import javax.transaction.TransactionManager;
060: import javax.xml.namespace.QName;
061:
062: import org.apache.commons.logging.Log;
063:
064: import com.bostechcorp.cbesb.common.constant.MetadataConstants;
065: import com.bostechcorp.cbesb.runtime.ccsl.jbi.messaging.notification.MessageNotificationListener;
066:
067: import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
068:
069: /**
070: * Base class for life cycle management of components.
071: * This class may be used as is.
072: *
073: * @author Guillaume Nodet
074: * @version $Revision: 399873 $
075: * @since 3.0
076: */
077: public class CbLifeCycle implements ComponentLifeCycle {
078:
079: protected final transient Log logger;
080:
081: protected CbComponent component;
082: protected ComponentContext context;
083: protected ObjectName mbeanName;
084: protected WorkManager workManager;
085: protected AtomicBoolean running;
086: protected DeliveryChannel channel;
087: protected Thread poller;
088: protected AtomicBoolean polling;
089: protected TransactionManager transactionManager;
090: protected boolean workManagerCreated;
091:
092: protected MessageNotificationListener clientListener = null;
093:
094: public CbLifeCycle(CbComponent component) {
095: this .component = component;
096: this .logger = component.logger;
097: this .running = new AtomicBoolean(false);
098: this .polling = new AtomicBoolean(false);
099: }
100:
101: /* (non-Javadoc)
102: * @see javax.jbi.component.ComponentLifeCycle#getExtensionMBeanName()
103: */
104: public ObjectName getExtensionMBeanName() {
105: return mbeanName;
106: }
107:
108: protected Object getExtensionMBean() throws Exception {
109: return null;
110: }
111:
112: protected ObjectName createExtensionMBeanName() throws Exception {
113: mbeanName = this .context.getMBeanNames()
114: .createCustomComponentMBeanName(
115: MBeanNames.COMPONENT_LIFE_CYCLE_EXTENSION);
116: return mbeanName;
117: }
118:
119: protected QName getEPRServiceName() {
120: return null;
121: }
122:
123: /* (non-Javadoc)
124: * @see javax.jbi.component.ComponentLifeCycle#init(javax.jbi.component.ComponentContext)
125: */
126: public void init(ComponentContext context) throws JBIException {
127: try {
128: if (logger.isDebugEnabled()) {
129: logger.debug("Initializing component");
130: }
131: this .context = context;
132: this .channel = context.getDeliveryChannel();
133: try {
134: this .transactionManager = (TransactionManager) context
135: .getTransactionManager();
136: } catch (Throwable e) {
137: // Ignore, this is just a safeguard against non compliant
138: // JBI implementation which throws an exception instead of
139: // return null
140: }
141: doInit();
142: if (logger.isDebugEnabled()) {
143: logger.debug("Component initialized");
144: }
145: } catch (JBIException e) {
146: throw e;
147: } catch (Exception e) {
148: throw new JBIException("Error calling init", e);
149: }
150: }
151:
152: protected void doInit() throws Exception {
153: // Register extension mbean
154: Object mbean = getExtensionMBean();
155: if (mbean != null) {
156: MBeanServer server = this .context.getMBeanServer();
157: if (server == null) {
158: // TODO: log a warning ?
159: //throw new JBIException("null mBeanServer");
160: } else {
161: this .mbeanName = createExtensionMBeanName();
162: if (server.isRegistered(this .mbeanName)) {
163: // server.unregisterMBean(this.mbeanName);
164:
165: } else {
166: server.registerMBean(mbean, this .mbeanName);
167: this .clientListener = new MessageNotificationListener();
168: server.addNotificationListener(this .mbeanName,
169: this .clientListener, null, null);
170: this .logger
171: .debug("Add MessageNotificationListener listener to mbean:"
172: + mbeanName);
173: }
174: }
175: }
176: // Obtain or create the work manager
177: // When using the WorkManager from ServiceMix,
178: // some class loader problems can appear when
179: // trying to uninstall the components.
180: // Some threads owned by the work manager have a
181: // security context referencing the component class loader
182: // so that every loaded classes are locked
183: //this.workManager = findWorkManager();
184: if (this .workManager == null) {
185: this .workManagerCreated = true;
186: this .workManager = createWorkManager();
187: }
188: }
189:
190: /* (non-Javadoc)
191: * @see javax.jbi.component.ComponentLifeCycle#shutDown()
192: */
193: public void shutDown() throws JBIException {
194: try {
195: if (logger.isDebugEnabled()) {
196: logger.debug("Shutting down component");
197: }
198: doShutDown();
199: this .context = null;
200: if (logger.isDebugEnabled()) {
201: logger.debug("Component shut down");
202: }
203: } catch (JBIException e) {
204: throw e;
205: } catch (Exception e) {
206: throw new JBIException("Error calling shutdown", e);
207: }
208: }
209:
210: protected void doShutDown() throws Exception {
211: // Unregister mbean
212: if (this .mbeanName != null) {
213: MBeanServer server = this .context.getMBeanServer();
214: if (server == null) {
215: throw new JBIException("null mBeanServer");
216: }
217: if (this .clientListener != null)
218: server.removeNotificationListener(this .mbeanName,
219: this .clientListener, null, null);
220:
221: if (server.isRegistered(this .mbeanName)) {
222: server.unregisterMBean(this .mbeanName);
223: }
224:
225: }
226: // Destroy work manager, if created
227: if (this .workManagerCreated) {
228: if (this .workManager instanceof WorkManagerImpl) {
229: ((WorkManagerImpl) this .workManager).shutDown();
230: }
231: this .workManager = null;
232: }
233: }
234:
235: /* (non-Javadoc)
236: * @see javax.jbi.component.ComponentLifeCycle#start()
237: */
238: public void start() throws JBIException {
239: try {
240: if (logger.isDebugEnabled()) {
241: logger.debug("Starting component");
242: }
243: if (this .running.compareAndSet(false, true)) {
244: doStart();
245: }
246: if (logger.isDebugEnabled()) {
247: logger.debug("Component started");
248: }
249: } catch (JBIException e) {
250: throw e;
251: } catch (Exception e) {
252: throw new JBIException("Error calling start", e);
253: }
254: }
255:
256: protected void doStart() throws Exception {
257: synchronized (this .polling) {
258: workManager.startWork(new Work() {
259: public void release() {
260: }
261:
262: public void run() {
263: poller = Thread.currentThread();
264: pollDeliveryChannel();
265: }
266: });
267: polling.wait();
268: }
269: }
270:
271: protected void pollDeliveryChannel() {
272: synchronized (polling) {
273: polling.set(true);
274: polling.notify();
275: }
276: while (running.get()) {
277: try {
278: final MessageExchange exchange = channel.accept(1000L);
279: if (exchange != null) {
280: final Transaction tx = (Transaction) exchange
281: .getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME);
282: if (tx != null) {
283: if (transactionManager == null) {
284: throw new IllegalStateException(
285: "Exchange is enlisted in a transaction, but no transaction manager is available");
286: }
287: transactionManager.suspend();
288: }
289: workManager.scheduleWork(new Work() {
290: public void release() {
291: }
292:
293: public void run() {
294: processExchangeInTx(exchange, tx);
295: }
296: });
297: }
298: } catch (Throwable t) {
299: if (running.get() == false) {
300: // Should have been interrupted, discard the throwable
301: if (logger.isDebugEnabled()) {
302: logger.debug("Polling thread will stop");
303: }
304: } else {
305: logger.error("Error polling delivery channel", t);
306: }
307: }
308: }
309: synchronized (polling) {
310: polling.set(false);
311: polling.notify();
312: }
313: }
314:
315: /* (non-Javadoc)
316: * @see javax.jbi.component.ComponentLifeCycle#stop()
317: */
318: public void stop() throws JBIException {
319: try {
320: if (logger.isDebugEnabled()) {
321: logger.debug("Stopping component");
322: }
323: if (this .running.compareAndSet(true, false)) {
324: doStop();
325: }
326: if (logger.isDebugEnabled()) {
327: logger.debug("Component stopped");
328: }
329: } catch (JBIException e) {
330: throw e;
331: } catch (Exception e) {
332: throw new JBIException("Error calling stop", e);
333: }
334: }
335:
336: protected void doStop() throws Exception {
337: // Interrupt the polling thread and await termination
338: try {
339: synchronized (polling) {
340: if (polling.get()) {
341: poller.interrupt();
342: polling.wait();
343: }
344: }
345: } finally {
346: poller = null;
347: }
348: }
349:
350: /**
351: * @return Returns the context.
352: */
353: public ComponentContext getContext() {
354: return context;
355: }
356:
357: public WorkManager getWorkManager() {
358: return workManager;
359: }
360:
361: protected WorkManager createWorkManager() {
362: // Create a very simple one
363: return new WorkManagerImpl();
364: }
365:
366: protected WorkManager findWorkManager() {
367: // If inside ServiceMix, retrieve its work manager
368: try {
369: Method getContainerMth = context.getClass().getMethod(
370: "getContainer", new Class[0]);
371: Object container = getContainerMth.invoke(context,
372: new Object[0]);
373: Method getWorkManagerMth = container.getClass().getMethod(
374: "getWorkManager", new Class[0]);
375: return (WorkManager) getWorkManagerMth.invoke(container,
376: new Object[0]);
377: } catch (Throwable t) {
378: if (logger.isDebugEnabled()) {
379: logger
380: .debug(
381: "JBI container is not ServiceMix. Will create our own WorkManager",
382: t);
383: }
384: }
385: // TODO: should look in jndi for an existing work manager
386: return null;
387: }
388:
389: protected void processExchangeInTx(MessageExchange exchange,
390: Transaction tx) {
391: try {
392: if (tx != null) {
393: transactionManager.resume(tx);
394: }
395: processExchange(exchange);
396: } catch (Exception e) {
397: logger.error("Error processing exchange " + exchange, e);
398: try {
399: // If we are transacted, check if this exception should
400: // rollback the transaction
401: if (transactionManager != null
402: && transactionManager.getStatus() == Status.STATUS_ACTIVE
403: && exceptionShouldRollbackTx(e)) {
404: transactionManager.setRollbackOnly();
405: }
406: exchange.setError(e);
407: channel.send(exchange);
408: } catch (Exception inner) {
409: logger.error("Error setting exchange status to ERROR",
410: inner);
411: }
412: } finally {
413: try {
414: // Check transaction status
415: if (tx != null) {
416: int status = transactionManager.getStatus();
417: // We use pull delivery, so the transaction should already
418: // have been transfered to another thread because the component
419: // must have answered.
420: if (status != Status.STATUS_NO_TRANSACTION) {
421: logger
422: .error("Transaction is still active after exchange processing. Trying to rollback transaction.");
423: try {
424: transactionManager.rollback();
425: } catch (Throwable t) {
426: logger
427: .error(
428: "Error trying to rollback transaction.",
429: t);
430: }
431: }
432: }
433: } catch (Throwable t) {
434: logger.error("Error checking transaction status.", t);
435: }
436: }
437: }
438:
439: protected boolean exceptionShouldRollbackTx(Exception e) {
440: return false;
441: }
442:
443: public void processExchange(MessageExchange exchange)
444: throws Exception {
445: if (logger.isDebugEnabled()) {
446: logger.debug("Received exchange: status: "
447: + exchange.getStatus()
448: + ", role: "
449: + (exchange.getRole() == Role.CONSUMER ? "consumer"
450: : "provider"));
451: }
452: if (exchange.getRole() == Role.PROVIDER) {
453: boolean dynamic = false;
454: ServiceEndpoint endpoint = exchange.getEndpoint();
455: String key = EndpointKeyUtil.getKey(exchange.getEndpoint());
456: CbEndpoint ep = (CbEndpoint) this .component.getRegistry()
457: .getEndpoint(key);
458: if (ep == null) {
459: if (endpoint.getServiceName().equals(
460: getEPRServiceName())) {
461: ep = getResolvedEPR(exchange.getEndpoint());
462: dynamic = true;
463: }
464: if (ep == null) {
465: throw new IllegalStateException(
466: "Endpoint not found: " + key);
467: }
468: }
469: IExchangeProcessor processor = ep.getProcessor();
470: if (processor == null) {
471: throw new IllegalStateException(
472: "No processor found for endpoint: " + key);
473: }
474: try {
475: processor.process(exchange);
476: } finally {
477: // If the endpoint is dynamic, deactivate it
478: if (dynamic) {
479: ep.deactivate();
480: }
481: }
482: } else {
483: IExchangeProcessor processor = null;
484: if (exchange
485: .getProperty(MetadataConstants.SENDER_ENDPOINT_PROPERTY) != null) {
486: String key = exchange.getProperty(
487: MetadataConstants.SENDER_ENDPOINT_PROPERTY)
488: .toString();
489: CbEndpoint ep = (CbEndpoint) this .component
490: .getRegistry().getEndpoint(key);
491: if (ep != null) {
492: processor = ep.getProcessor();
493: }
494: } else {
495: throw new IllegalStateException(
496: "No sender endpoint property on: "
497: + exchange.getExchangeId());
498: }
499: if (processor == null) {
500: throw new IllegalStateException(
501: "No processor found for: "
502: + exchange.getExchangeId());
503: }
504: processor.process(exchange);
505: }
506: }
507:
508: /**
509: * Handle an exchange sent to an EPR resolved by this component
510: * @param exchange
511: * @return an endpoint to use for handling the exchange
512: * @throws Exception
513: */
514: protected CbEndpoint getResolvedEPR(ServiceEndpoint ep)
515: throws Exception {
516: throw new UnsupportedOperationException(
517: "Component does not handle EPR exchanges");
518: }
519:
520: }
|