0001: /*
0002: * BEGIN_HEADER - DO NOT EDIT
0003: *
0004: * The contents of this file are subject to the terms
0005: * of the Common Development and Distribution License
0006: * (the "License"). You may not use this file except
0007: * in compliance with the License.
0008: *
0009: * You can obtain a copy of the license at
0010: * https://open-esb.dev.java.net/public/CDDLv1.0.html.
0011: * See the License for the specific language governing
0012: * permissions and limitations under the License.
0013: *
0014: * When distributing Covered Code, include this CDDL
0015: * HEADER in each file and include the License file at
0016: * https://open-esb.dev.java.net/public/CDDLv1.0.html.
0017: * If applicable add the following below this CDDL HEADER,
0018: * with the fields enclosed by brackets "[]" replaced with
0019: * your own identifying information: Portions Copyright
0020: * [year] [name of copyright owner]
0021: */
0022:
0023: /*
0024: * @(#)MessageService.java
0025: * Copyright 2004-2007 Sun Microsystems, Inc. All Rights Reserved.
0026: *
0027: * END_HEADER - DO NOT EDIT
0028: */
0029: package com.sun.jbi.messaging;
0030:
0031: import com.sun.jbi.ComponentManager;
0032: import com.sun.jbi.EnvironmentContext;
0033: import com.sun.jbi.messaging.ExchangeIdGenerator;
0034: import com.sun.jbi.management.support.JbiNameInfo;
0035: import com.sun.jbi.management.support.MBeanHelper;
0036: import com.sun.jbi.messaging.stats.METimestamps;
0037: import com.sun.jbi.messaging.stats.Value;
0038: import com.sun.jbi.messaging.util.Translator;
0039: import com.sun.jbi.messaging.util.XMLUtil;
0040:
0041: import java.lang.reflect.Constructor;
0042: import java.net.URI;
0043: import java.util.Date;
0044: import java.util.Enumeration;
0045: import java.util.Hashtable;
0046: import java.util.HashMap;
0047: import java.util.Iterator;
0048: import java.util.LinkedList;
0049: import java.util.Set;
0050: import java.util.logging.Logger;
0051: import java.util.logging.Level;
0052:
0053: import java.util.concurrent.atomic.AtomicLong;
0054:
0055: import javax.jbi.component.Component;
0056: import javax.jbi.servicedesc.ServiceEndpoint;
0057:
0058: import javax.management.openmbean.CompositeData;
0059: import javax.management.openmbean.CompositeDataSupport;
0060: import javax.management.openmbean.CompositeType;
0061: import javax.management.openmbean.SimpleType;
0062: import javax.management.openmbean.OpenType;
0063: import javax.management.Notification;
0064: import javax.management.ObjectName;
0065: import javax.management.StandardMBean;
0066: import javax.xml.namespace.QName;
0067: import javax.transaction.TransactionManager;
0068: import javax.transaction.xa.XAResource;
0069:
0070: import org.w3c.dom.Document;
0071:
0072: /**
0073: * Entry point to NMS for framework and management code.
0074: * @author Sun Microsystems, Inc.
0075: */
0076: public class MessageService extends
0077: com.sun.jbi.management.system.ModelSystemService implements
0078: com.sun.jbi.ServiceLifecycle,
0079: com.sun.jbi.management.system.MessageServiceMBean,
0080: NMRStatistics {
0081: /** Copy of EnvironmentContext given to us at initialization.
0082: */
0083: private EnvironmentContext mEnvironmentContext;
0084:
0085: private EndpointRegistry mRegistry;
0086:
0087: private Logger mLog = Logger.getLogger(this .getClass().getPackage()
0088: .getName());
0089:
0090: /** Hook to framework to retrieve component instances.*/
0091: private ComponentManager mComponentManager;
0092:
0093: /** Reference to TransactionManager if one exists. */
0094: private TransactionManager mTransactionManager;
0095:
0096: /** Table used to store channel references. */
0097: private Hashtable mChannels;
0098:
0099: /** our immutable name (TODO: this should come from framework): */
0100: private final JbiNameInfo mJbiNameInfo = new JbiNameInfo(
0101: "MessageService");
0102:
0103: /** Listener for endpoint notifications. */
0104: private EndpointListener mListener;
0105:
0106: /** Listener for exchange timeout notifications. */
0107: private TimeoutListener mTimeout;
0108: private DeliveryChannel mTimeoutDC;
0109:
0110: /** ExchangeIdGenerator for unique exchange Ids. */
0111: private ExchangeIdGenerator mGenerator;
0112:
0113: /** Basic statistics. */
0114: private long mTotalChannels;
0115: private long mSendRequest;
0116: private long mReceiveRequest;
0117: private long mSendReply;
0118: private long mReceiveReply;
0119: private long mSendFault;
0120: private long mReceiveFault;
0121: private long mSendStatus;
0122: private long mReceiveStatus;
0123: private long mSendDONE;
0124: private long mReceiveDONE;
0125: private long mSendERROR;
0126: private long mReceiveERROR;
0127: private long mInOutMEPs;
0128: private long mInOnlyMEPs;
0129: private long mRobustInOnlyMEPs;
0130: private long mInOptionalOutMEPs;
0131: private Value mResponseTime;
0132: private Value mStatusTime;
0133: private Value mNMRTime;
0134: private Value mConsumerTime;
0135: private Value mProviderTime;
0136: private Value mChannelTime;
0137:
0138: private AtomicLong mActiveExchanges;
0139: private AtomicLong mTotalExchanges;
0140:
0141: /** Message Service statistics MBean implementation. */
0142: private MessageServiceStatistics mStatistics;
0143:
0144: /** XAResources private to JBI Components. */
0145: LinkedList mResources;
0146:
0147: /**
0148: * ArrayList to store all registered observers
0149: */
0150: private HashMap<DeliveryChannel, DeliveryChannel> mObserver;
0151:
0152: private static final String OBSERVER_SENDER = "com.sun.jbi.observer.sender";
0153: private static final String OBSERVER_RECEIVER = "com.sun.jbi.observer.receiver";
0154:
0155: /** Monitoring support. */
0156: private boolean mMonitoringEnabled;
0157:
/**
 * Creates a new NMS instance and allocates its channel table, XA resource
 * list, statistics MBean implementation, observer map, exchange counters
 * and timing accumulators.
 */
public MessageService() {
    this.mChannels = new Hashtable();
    this.mResources = new LinkedList();
    this.mStatistics = new MessageServiceStatistics(this, this.mJbiNameInfo.name());
    this.mObserver = new HashMap();

    // exchange counters
    this.mActiveExchanges = new AtomicLong();
    this.mTotalExchanges = new AtomicLong();

    // timing accumulators
    this.mResponseTime = new Value();
    this.mStatusTime = new Value();
    this.mNMRTime = new Value();
    this.mConsumerTime = new Value();
    this.mProviderTime = new Value();
    this.mChannelTime = new Value();
}
0174:
0175: /******************************************
0176: * ServiceLifecycle methods *
0177: ******************************************/
0178:
0179: /** Init our environment. */
0180: public void initService(EnvironmentContext ctx)
0181: throws javax.jbi.JBIException {
0182: // Save our EnvironmentContext.
0183:
0184: mEnvironmentContext = ctx;
0185: mRegistry = EndpointRegistry.getInstance();
0186: mComponentManager = ctx.getComponentManager();
0187:
0188: // init string translator
0189: Translator.setStringTranslator(mEnvironmentContext
0190: .getStringTranslator("com.sun.jbi.messaging"));
0191:
0192: // initialize the model system service:
0193: super .initModelSystemService(ctx, mLog, mJbiNameInfo);
0194:
0195: /*
0196: * NOTE: this is the place to override the generic configuration MBean.
0197: * EXAMPLE:
0198: * //replace configuration mbean in INITIAL mbean set with mine:
0199: * super.mInitialMBeans.replace(
0200: * super.mConfigMBeanName,
0201: * <YOUR_CONFIG_MBEAN>.class,
0202: * <YOUR_CONFIG_IMPLEMENTATION_INSTANCE>
0203: * );
0204: * See AdminServiceConfiguration*.java for example. RT 3/6/05
0205: */
0206: super .mInitialMBeans
0207: .replace(
0208: super .mStatisticsMBeanName,
0209: com.sun.jbi.messaging.MessageServiceStatisticsMBean.class,
0210: mStatistics);
0211:
0212: //add MessageService MBean to START/STOP mbean set.
0213: super .mStartMBeans
0214: .add(
0215: super .mMessageServiceMBeanName,
0216: com.sun.jbi.management.system.MessageServiceMBean.class,
0217: this , true //create a standard mbean that can emit notifications.
0218: );
0219:
0220: super .bootstrap();
0221: }
0222:
0223: /** Start the NMS. */
0224: public void startService() throws javax.jbi.JBIException {
0225: mGenerator = (com.sun.jbi.messaging.ExchangeIdGenerator) new ExchangeIdGeneratorImpl();
0226: // Register the Statistics MBean as a listener on the config MBean
0227: startListeningToConfigurationMBean();
0228: mStatistics.setLastRestartTime(new Date());
0229: // Get the default timing stats enabled value from the config MBean
0230: if (isMsgTimingStatsEnabledByDefault()) {
0231: mStatistics.enableTimingStatistics();
0232: } else {
0233: mStatistics.disableTimingStatistics();
0234: }
0235: }
0236:
0237: /** Stop the NMS. */
0238: public void stopService() throws javax.jbi.JBIException {
0239: DeliveryChannelImpl channel;
0240: Enumeration e;
0241:
0242: // make sure all of the channels are closed
0243: e = mChannels.elements();
0244: while (e.hasMoreElements()) {
0245: channel = (DeliveryChannelImpl) e.nextElement();
0246: channel.close();
0247: }
0248:
0249: // clear the channel table
0250: mChannels.clear();
0251: // clear the registry
0252: mRegistry.clear();
0253:
0254: // Unregister the Statistics MBean as a listener on the configuration MBean
0255: stopListeningToConfigurationMBean();
0256: }
0257:
0258: /******************************************
0259: * Framework methods *
0260: ******************************************/
0261:
0262: /** Create a new delivery channel. If the channel has already been created,
0263: * this method throws an exception.
0264: * @param componentId component id
0265: * @param classLoader class loader for component
0266: * @throws javax.jbi.messaging.MessagingException attempt to activate
0267: * duplicate channel.
0268: */
0269: public DeliveryChannelImpl activateChannel(String componentId,
0270: ClassLoader classLoader)
0271: throws javax.jbi.messaging.MessagingException {
0272: DeliveryChannelImpl dc;
0273:
0274: if (mChannels.containsKey(componentId)) {
0275: dc = (DeliveryChannelImpl) mChannels.get(componentId);
0276: } else {
0277: mLog.fine(Translator.translate(
0278: LocalStringKeys.CHANNEL_CREATED,
0279: new Object[] { componentId }));
0280:
0281: dc = new DeliveryChannelImpl(componentId, classLoader,
0282: this , mListener);
0283: addChannel(dc);
0284: }
0285:
0286: return dc;
0287: }
0288:
0289: /** Get a reference to the NMR ConnectionManager.
0290: * @return instance of ConnectionManager.
0291: */
0292: public ConnectionManager getConnectionManager() {
0293: return mRegistry;
0294: }
0295:
0296: /******************************************
0297: * Internal NMS methods *
0298: ******************************************/
0299:
0300: /** Adds a channel to the NMS routing table.
0301: */
0302: void addChannel(DeliveryChannelImpl channel) {
0303: mChannels.put(channel.getChannelId(), channel);
0304: mTotalChannels++;
0305: }
0306:
0307: /** Removes a channel from the NMS routing table.
0308: */
0309: void removeChannel(String channelId) {
0310: mChannels.remove(channelId);
0311: }
0312:
0313: /** Retrieves a channel based on it's ID from the NMS routing table.
0314: */
0315: DeliveryChannelImpl getChannel(String channelId) {
0316: return (DeliveryChannelImpl) mChannels.get(channelId);
0317: }
0318:
0319: /** Get the TransactionManager
0320: */
0321: javax.transaction.TransactionManager getTransactionManager() {
0322: // See if we can find a TransactionManager
0323: if (mTransactionManager == null) {
0324: if (mEnvironmentContext != null) {
0325: mTransactionManager = (javax.transaction.TransactionManager) mEnvironmentContext
0326: .getTransactionManager();
0327: }
0328: }
0329: return (mTransactionManager);
0330: }
0331:
0332: public void addXAResource(XAResource resource) {
0333: mResources.add(resource);
0334: }
0335:
0336: public javax.transaction.xa.XAResource[] getXAResources() {
0337: XAResource[] resources = new XAResource[mResources.size()];
0338:
0339: mResources.toArray(resources);
0340: return (resources);
0341: }
0342:
0343: public void purgeXAResources() {
0344: mResources.clear();
0345: }
0346:
0347: /**
0348: * Set the TransactionManager. Used by the junit test harness to bypass using
0349: * EnvironmentContext (which is not available.)
0350: */
0351: void setTransactionManager(javax.transaction.TransactionManager tm) {
0352: mTransactionManager = tm;
0353: }
0354:
0355: /** Perform the exchange operation.
0356: */
0357: void doExchange(DeliveryChannelImpl channel,
0358: MessageExchangeProxy exchange)
0359: throws javax.jbi.messaging.MessagingException {
0360: DeliveryChannelImpl targetChannel;
0361: MessageExchangeProxy targetExchange;
0362:
0363: addressExchange(exchange, channel);
0364: exchange.validate(channel, false);
0365: handleObserver(exchange, channel);
0366: targetChannel = exchange.getSendChannel();
0367: targetExchange = exchange.getTwin();
0368: if (exchange.handleSend(channel)) {
0369: channel.removeSendReference(exchange);
0370: mActiveExchanges.decrementAndGet();
0371: } else {
0372: channel.addSendReference(exchange);
0373: mActiveExchanges.incrementAndGet();
0374: }
0375: targetChannel.queueExchange(targetExchange);
0376: }
0377:
0378: /** Perform the exchange operation.
0379: */
0380: boolean doSynchExchange(DeliveryChannelImpl channel,
0381: MessageExchangeProxy exchange, long timeout)
0382: throws javax.jbi.messaging.MessagingException {
0383: DeliveryChannelImpl targetChannel;
0384: MessageExchangeProxy targetExchange;
0385: boolean valid = true;
0386: boolean timedout = false;
0387:
0388: addressExchange(exchange, channel);
0389: exchange.validate(channel, true);
0390: handleObserver(exchange, channel);
0391: targetChannel = exchange.getSendChannel();
0392: targetExchange = exchange.getTwin();
0393: exchange.handleSendSync(channel);
0394: channel.addSendSyncReference(exchange);
0395: mStatistics.getMessagingStatisticsInstance()
0396: .incrementSendSyncs();
0397: synchronized (exchange) {
0398: exchange
0399: .setSynchState((timeout != 0 && mTimeout != null && mTimeoutDC != targetChannel) ? MessageExchangeProxy.WAIT_TIMEOUT
0400: : MessageExchangeProxy.WAIT);
0401: targetChannel.queueExchange(targetExchange);
0402: try {
0403: for (;;) {
0404: int state;
0405:
0406: exchange.wait(timeout);
0407: state = exchange.getSynchState();
0408: if (state == MessageExchangeProxy.HALF_DONE) {
0409: exchange
0410: .setSynchState(MessageExchangeProxy.DONE);
0411: break;
0412: }
0413: if (timeout != 0) {
0414: if (state == MessageExchangeProxy.WAIT) {
0415: exchange.terminate();
0416: valid = false;
0417: break;
0418: } else if (state == MessageExchangeProxy.WAIT_TIMEOUT) {
0419: timedout = true;
0420: break;
0421: }
0422: }
0423: }
0424: } catch (java.lang.InterruptedException iEx) {
0425: exchange.terminate();
0426: valid = false;
0427: }
0428: }
0429: if (timedout) {
0430: if (mTimeout.checkTimeout(channel, exchange)) {
0431: exchange.terminate();
0432: } else {
0433: exchange.setSynchState(MessageExchangeProxy.NONE);
0434: valid = false;
0435: }
0436: }
0437: return (valid);
0438: }
0439:
0440: void addressExchange(MessageExchangeProxy exchange,
0441: DeliveryChannelImpl channel)
0442: throws javax.jbi.messaging.MessagingException {
0443: RegisteredEndpoint endpoint;
0444: QName serviceName;
0445: QName interfaceName;
0446:
0447: //
0448: // Check addressing of exchange in this order:
0449: // (1) endpoint
0450: // (2) service name
0451: // (3) interface name
0452: //
0453: endpoint = (RegisteredEndpoint) exchange.getEndpoint();
0454:
0455: if (endpoint == null) {
0456: if ((serviceName = exchange.getService()) != null) {
0457: RegisteredEndpoint[] endpoints;
0458: RegisteredEndpoint match;
0459:
0460: endpoints = mRegistry.getInternalEndpointsForService(
0461: serviceName, true);
0462: if (endpoints.length != 0) {
0463: endpoint = matchConsumerAndProvider(channel
0464: .getChannelId(), endpoints, exchange);
0465: } else {
0466: throw new javax.jbi.messaging.MessagingException(
0467: Translator
0468: .translate(
0469: LocalStringKeys.CANT_FIND_ENDPOINT_FOR_SERVICE,
0470: new Object[] { serviceName
0471: .toString() }));
0472: }
0473: } else if ((interfaceName = exchange.getInterfaceName()) != null) {
0474: RegisteredEndpoint[] endpoints;
0475: RegisteredEndpoint match;
0476:
0477: endpoints = mRegistry.getInternalEndpointsForInterface(
0478: interfaceName, this );
0479: if (endpoints.length != 0) {
0480: endpoint = matchConsumerAndProvider(channel
0481: .getChannelId(), endpoints, exchange);
0482: } else {
0483: throw new javax.jbi.messaging.MessagingException(
0484: Translator
0485: .translate(
0486: LocalStringKeys.CANT_FIND_ENDPOINT_FOR_INTERFACE,
0487: new Object[] { interfaceName
0488: .toString() }));
0489: }
0490: } else {
0491: throw new javax.jbi.messaging.MessagingException(
0492: Translator
0493: .translate(LocalStringKeys.ADDR_NO_ENDPOINT));
0494: }
0495: exchange.setEndpoint(endpoint);
0496: }
0497:
0498: /** Check to see if we are dealing with a real endpoint or a
0499: * linked endpoint from a service connection.
0500: */
0501: if (endpoint.isLinked()) {
0502: exchange.setEndpointLink(endpoint);
0503: endpoint = mRegistry
0504: .resolveLinkedEndpoint((LinkedEndpoint) endpoint);
0505:
0506: // check for resolution failure
0507: if (endpoint == null) {
0508: throw new javax.jbi.messaging.MessagingException(
0509: Translator
0510: .translate(
0511: LocalStringKeys.SERVICE_CONNECTION_NO_ENDPOINT,
0512: new Object[] {
0513: exchange
0514: .getEndpointLink()
0515: .getServiceName(),
0516: exchange
0517: .getEndpointLink()
0518: .getEndpointName() }));
0519: }
0520: exchange.setEndpoint(endpoint);
0521: }
0522: }
0523:
0524: /**
0525: * Processes an Observed Message Exchange for all components that are designated
0526: * as Observers.
0527: *
0528: * @param exchange - The exchange to Observe
0529: * @param channel - The DeliveryChannel performing the send.
0530: * @throws MessagingException
0531: */
0532: private void handleObserver(MessageExchangeProxy exchange,
0533: DeliveryChannelImpl channel)
0534: throws javax.jbi.messaging.MessagingException {
0535: if (mObserver != null) {
0536: for (DeliveryChannel odc : mObserver.keySet()) {
0537: MessageExchangeProxy observedExchange = new Observer(
0538: exchange);
0539:
0540: MessageExchangeImpl mei = new MessageExchangeImpl(
0541: observedExchange,
0542: exchange.getMessageExchange(), this );
0543: observedExchange.setMessageExchange(mei, false);
0544: observedExchange.setEndpointLink(exchange
0545: .getEndpointLink());
0546: observedExchange.setProperty(OBSERVER_SENDER, channel
0547: .getChannelId());
0548: observedExchange.setProperty(OBSERVER_RECEIVER,
0549: exchange.getSendChannel().getChannelId());
0550: ((DeliveryChannelImpl) odc)
0551: .queueObserved(observedExchange);
0552: }
0553: }
0554: }
0555:
0556: /** Facilitates intercomponent meta data queries.
0557: */
0558: Document queryDescriptor(ServiceEndpoint ref) {
0559: Component component;
0560: Document descriptor = null;
0561:
0562: if (ref instanceof LinkedEndpoint) {
0563: ref = mRegistry.resolveLinkedEndpoint((LinkedEndpoint) ref);
0564: // return immediately if this is a dead link
0565: if (ref == null) {
0566: return null;
0567: }
0568: }
0569:
0570: component = mComponentManager
0571: .getComponentInstance(((RegisteredEndpoint) ref)
0572: .getOwnerId());
0573:
0574: if (component != null) {
0575: descriptor = component.getServiceDescription(ref);
0576: }
0577:
0578: return descriptor;
0579: }
0580:
0581: DynamicEndpoint resolveEndpointReference(
0582: org.w3c.dom.DocumentFragment reference) {
0583: Set channels;
0584: DynamicEndpoint dep = null;
0585:
0586: channels = ((Hashtable) mChannels.clone()).keySet();
0587:
0588: for (Iterator i = channels.iterator(); i.hasNext();) {
0589: String id;
0590: Component comp;
0591: ServiceEndpoint ep;
0592:
0593: id = (String) i.next();
0594: comp = mComponentManager.getComponentInstance(id);
0595: ep = comp.resolveEndpointReference(reference);
0596:
0597: if (ep != null) {
0598: dep = new DynamicEndpoint(ep, id, reference);
0599: break;
0600: }
0601: }
0602:
0603: return dep;
0604: }
0605:
0606: /** Convenience method used in unit tests. */
0607: void setComponentManager(ComponentManager compMgr) {
0608: mComponentManager = compMgr;
0609: }
0610:
0611: /** Make sure that consumer and provider are cool with exchange. */
0612: RegisteredEndpoint matchConsumerAndProvider(String consumerId,
0613: RegisteredEndpoint[] endpoints,
0614: MessageExchangeProxy exchange)
0615: throws javax.jbi.messaging.MessagingException {
0616: RegisteredEndpoint match = null;
0617: Component consumer;
0618: Component provider;
0619:
0620: consumer = mComponentManager.getComponentInstance(consumerId);
0621:
0622: if (endpoints.length == 1) {
0623: // No room to be picky
0624: match = endpoints[0];
0625: } else {
0626: try {
0627: for (int i = 0; i < endpoints.length; i++) {
0628: provider = mComponentManager
0629: .getComponentInstance(endpoints[i]
0630: .getOwnerId());
0631: exchange.beforeCapabilityCheck(endpoints[i]);
0632:
0633: // make sure provider and consumer are cool with values
0634: if (consumer.isExchangeWithProviderOkay(
0635: endpoints[i], exchange)
0636: && provider.isExchangeWithConsumerOkay(
0637: endpoints[i], exchange)) {
0638: match = endpoints[i];
0639: break;
0640: }
0641: }
0642: } finally {
0643: exchange.afterCapabilityCheck();
0644: }
0645: }
0646:
0647: if (match == null) {
0648: throw new javax.jbi.messaging.MessagingException(Translator
0649: .translate(LocalStringKeys.CAPABILITY_NO_MATCH));
0650: }
0651:
0652: return match;
0653: }
0654:
0655: //-----------------------------MessageServiceMBean--------------------------------
0656:
0657: /** Returns the total number of DeliveryChannels that have been activated
0658: * in the NMR.
0659: * @return number of active DeliveryChannels
0660: */
0661: public int getActiveChannelCount() {
0662: return mChannels.size();
0663: }
0664:
0665: /** Returns the identifiers of all the active channels.
0666: * @return names of all the active channels.
0667: */
0668: public String[] getActiveChannels() {
0669: return (String[]) mChannels.keySet().toArray(new String[0]);
0670: }
0671:
0672: /** Returns a list of component IDs corresponding to active channels in the NMR.
0673: * @return list of component IDs
0674: */
0675: public int getActiveEndpointCount() {
0676: return mRegistry.countEndpoints(RegisteredEndpoint.INTERNAL);
0677: }
0678:
0679: /** Returns a list of active endpoints in the NMR.
0680: * @return list of activated endpoints
0681: */
0682: public String[] getActiveEndpoints() {
0683: return (getEndpointNames());
0684: }
0685:
0686: /** Returns a list of active consuming endpoints in the NMR.
0687: * @return list of activated consuming endpoints
0688: */
0689: public String[] getActiveConsumingEndpoints() {
0690: return (getConsumingEndpointNames());
0691: }
0692:
0693: /** Identical to getActiveEndpoints(), but list is limited to endpoints
0694: * registered by the specified component.
0695: * @param ownerId component identifier
0696: * @return list of activated endpoints
0697: */
0698: public String[] getActiveEndpoints(String ownerId) {
0699: DeliveryChannelImpl dc = (DeliveryChannelImpl) this
0700: .getChannel(ownerId);
0701:
0702: return (dc.getEndpointNames());
0703: }
0704:
0705: /** Identical to getActiveEndpoints(), but list is limited to endpoints
0706: * registered by the specified component.
0707: * @param ownerId component identifier
0708: * @return list of activated endpoints
0709: */
0710: public String[] getActiveConsumingEndpoints(String ownerId) {
0711: DeliveryChannelImpl dc = (DeliveryChannelImpl) this
0712: .getChannel(ownerId);
0713:
0714: return (dc.getConsumingEndpointNames());
0715: }
0716:
0717: /** Provides metadata query facility for endpoints registered with the NMR.
0718: * This method returns the contents of an XML descriptor as a string.
0719: * @param service string representation of service QName
0720: * @param endpoint endpoint name
0721: * @return XML descriptor as string
0722: */
0723: public String getDescriptor(String service, String endpoint) {
0724: RegisteredEndpoint re;
0725: Document desc;
0726: String descStr = null;
0727:
0728: re = mRegistry
0729: .getInternalEndpoint(new QName(service), endpoint);
0730:
0731: try {
0732: desc = queryDescriptor(re);
0733:
0734: if (desc != null) {
0735: descStr = XMLUtil.getInstance().asString(desc);
0736: }
0737: } catch (javax.jbi.messaging.MessagingException msgEx) {
0738: mLog.warning(msgEx.getMessage());
0739: }
0740:
0741: return descStr;
0742: }
0743:
0744: /**
0745: * Dump the state of the MessageService to the log.
0746: */
0747: public void dumpState() {
0748: mLog.info(toString());
0749: }
0750:
0751: void setEndpointListener(EndpointListener listener) {
0752: mListener = listener;
0753: }
0754:
0755: void setTimeoutListener(DeliveryChannel dc, TimeoutListener timeout) {
0756: mTimeout = timeout;
0757: mTimeoutDC = dc;
0758: }
0759:
0760: void setExchangeIdGenerator(ExchangeIdGenerator generator) {
0761: mGenerator = generator;
0762: }
0763:
0764: String generateNextId() {
0765: return (mGenerator.nextId());
0766: }
0767:
0768: boolean isExchangeOkay(MessageExchange me) {
0769: Component provider;
0770:
0771: provider = mComponentManager
0772: .getComponentInstance(((RegisteredEndpoint) me
0773: .getEndpoint()).getOwnerId());
0774: return (provider.isExchangeWithConsumerOkay(me.getEndpoint(),
0775: me));
0776:
0777: }
0778:
0779: /**
0780: * Add a component as an observer of the NMR
0781: *
0782: * @param channel - The Delivery Channel of the observer component.
0783: */
0784: synchronized public void addObserver(DeliveryChannel channel) {
0785: HashMap newObserver = (HashMap) mObserver.clone();
0786: newObserver.put(channel, channel);
0787: mObserver = newObserver;
0788: }
0789:
0790: /**
0791: * Remove a component as an observer of the NMR
0792: *
0793: * @param channel
0794: */
0795: synchronized public void removeObserver(DeliveryChannel channel) {
0796: HashMap newObserver = (HashMap) mObserver.clone();
0797: newObserver.remove(channel);
0798: mObserver = newObserver;
0799: }
0800:
0801: void updateStatistics(MessageExchangeProxy mep) {
0802: int phaseMask = mep.getPhaseMask();
0803: METimestamps ts = mep.getTimestamps();
0804:
0805: synchronized (this ) {
0806: if ((phaseMask & MessageExchangeProxy.PM_SEND_REQUEST) != 0) {
0807: mSendRequest++;
0808: }
0809: if ((phaseMask & MessageExchangeProxy.PM_SEND_REPLY) != 0) {
0810: mSendReply++;
0811: }
0812: if ((phaseMask & MessageExchangeProxy.PM_SEND_FAULT) != 0) {
0813: mSendFault++;
0814: }
0815: if ((phaseMask & MessageExchangeProxy.PM_SEND_DONE) != 0) {
0816: mSendDONE++;
0817: }
0818: if ((phaseMask & MessageExchangeProxy.PM_SEND_ERROR) != 0) {
0819: mSendERROR++;
0820: }
0821: if ((phaseMask & MessageExchangeProxy.PM_RECEIVE_REQUEST) != 0) {
0822: mReceiveRequest++;
0823: }
0824: if ((phaseMask & MessageExchangeProxy.PM_RECEIVE_REPLY) != 0) {
0825: mReceiveReply++;
0826: }
0827: if ((phaseMask & MessageExchangeProxy.PM_RECEIVE_FAULT) != 0) {
0828: mReceiveFault++;
0829: }
0830: if ((phaseMask & MessageExchangeProxy.PM_RECEIVE_DONE) != 0) {
0831: mReceiveDONE++;
0832: }
0833: if ((phaseMask & MessageExchangeProxy.PM_RECEIVE_ERROR) != 0) {
0834: mReceiveERROR++;
0835: }
0836: if (ts != null) {
0837: mResponseTime.addSample(ts.mResponseTime);
0838: mNMRTime.addSample(ts.mNMRTime);
0839: mProviderTime.addSample(ts.mProviderTime);
0840: mChannelTime.addSample(ts.mProviderChannelTime);
0841: mChannelTime.addSample(ts.mConsumerChannelTime);
0842: if (ts.mConsumerTime != 0) {
0843: mConsumerTime.addSample(ts.mConsumerTime);
0844: }
0845: if (ts.mStatusTime != 0) {
0846: mStatusTime.addSample(ts.mStatusTime);
0847: }
0848: }
0849: if (mep instanceof InOnlyImpl) {
0850: mInOnlyMEPs++;
0851: } else if (mep instanceof InOutImpl) {
0852: mInOutMEPs++;
0853:
0854: } else if (mep instanceof RobustInOnlyImpl) {
0855: mRobustInOnlyMEPs++;
0856:
0857: } else if (mep instanceof InOptionalOutImpl) {
0858: mInOptionalOutMEPs++;
0859: }
0860: }
0861:
0862: RegisteredEndpoint re = ((RegisteredEndpoint) mep.getEndpoint());
0863: if (re != null) {
0864: re.updateStatistics(mep);
0865: }
0866: re = ((RegisteredEndpoint) mep.getEndpointLink());
0867: if (re != null) {
0868: re.updateStatistics(mep);
0869: }
0870:
0871: }
0872:
0873: //-----------------------------NMRStatistics--------------------------------
0874:
0875: synchronized public void enableStatistics() {
0876: mMonitoringEnabled = true;
0877: }
0878:
0879: synchronized public void disableStatistics() {
0880: mMonitoringEnabled = false;
0881:
0882: }
0883:
0884: synchronized public boolean areStatisticsEnabled() {
0885: return (mMonitoringEnabled);
0886: }
0887:
0888: synchronized public void zeroStatistics() {
0889: mSendRequest = 0;
0890: mReceiveRequest = 0;
0891: mSendReply = 0;
0892: mReceiveReply = 0;
0893: mSendFault = 0;
0894: mReceiveFault = 0;
0895: mSendStatus = 0;
0896: mReceiveStatus = 0;
0897: mSendDONE = 0;
0898: mReceiveDONE = 0;
0899: mSendERROR = 0;
0900: mReceiveERROR = 0;
0901: mInOutMEPs = 0;
0902: mInOnlyMEPs = 0;
0903: mRobustInOnlyMEPs = 0;
0904: mInOptionalOutMEPs = 0;
0905: mResponseTime.zero();
0906: mStatusTime.zero();
0907: mNMRTime.zero();
0908: mConsumerTime.zero();
0909: mProviderTime.zero();
0910: mChannelTime.zero();
0911:
0912: }
0913:
0914: boolean statisticsEnabled() {
0915: return (mMonitoringEnabled);
0916: }
0917:
0918: public String[] getChannelNames() {
0919: return (getActiveChannels());
0920: }
0921:
0922: public ChannelStatistics getChannelStatistics(String name) {
0923: return (getChannel(name));
0924: }
0925:
0926: public String[] getEndpointNames() {
0927: ServiceEndpoint[] endpoints;
0928: ServiceEndpoint[] lendpoints;
0929: String[] list;
0930:
0931: endpoints = mRegistry
0932: .listEndpoints(RegisteredEndpoint.INTERNAL);
0933: lendpoints = mRegistry.listEndpoints(RegisteredEndpoint.LINKED);
0934: list = new String[endpoints.length + lendpoints.length];
0935:
0936: for (int i = 0; i < endpoints.length; i++) {
0937: list[i] = ((RegisteredEndpoint) endpoints[i])
0938: .toExternalName();
0939: }
0940: for (int i = 0; i < lendpoints.length; i++) {
0941: list[endpoints.length + i] = ((RegisteredEndpoint) lendpoints[i])
0942: .toExternalName();
0943: }
0944:
0945: return list;
0946: }
0947:
0948: public String[] getConsumingEndpointNames() {
0949: ServiceEndpoint[] endpoints;
0950: String[] list;
0951:
0952: endpoints = mRegistry.listEndpoints(RegisteredEndpoint.LINKED);
0953: list = new String[endpoints.length];
0954:
0955: for (int i = 0; i < list.length; i++) {
0956: list[i] = ((RegisteredEndpoint) endpoints[i])
0957: .toExternalName();
0958: }
0959:
0960: return list;
0961: }
0962:
0963: public EndpointStatistics getEndpointStatistics(String name) {
0964: EndpointStatistics es;
0965:
0966: es = mRegistry.getLinkedEndpointByName(name);
0967: if (es == null) {
0968: es = mRegistry.getInternalEndpointByName(name);
0969: }
0970: return (es);
0971: }
0972:
0973: /**
0974: * List of item names for CompositeData construction.
0975: */
0976: private static final String[] ITEM_NAMES = { "SendRequest",
0977: "ReceiveRequest", "SendReply", "ReceiveReply", "SendFault",
0978: "ReceiveFault", "SendDONE", "ReceiveDONE", "SendERROR",
0979: "ReceiveERROR", "InOnlyMEPs", "RobustInOnlyMEPs",
0980: "InOutMEPs", "InOptionalOutMEPs", "DeliveryChannels",
0981: "Endpoints", "ResponseTimeMin (ns)",
0982: "ResponseTimeAvg (ns)", "ResponseTimeMax (ns)",
0983: "ResponseTimeStd (ns)", "StatusTimeMin (ns)",
0984: "StatusTimeAvg (ns)", "StatusTimeMax (ns)",
0985: "StatusTimeStd (ns)", "NMRTimeMin (ns)", "NMRTimeAvg (ns)",
0986: "NMRTimeMax (ns)", "NMRTimeStd (ns)",
0987: "ConsumerTimeMin (ns)", "ConsumerTimeAvg (ns)",
0988: "ConsumerTimeMax (ns)", "ConsumerTimeStd (ns)",
0989: "ProviderTimeMin (ns)", "ProviderTimeAvg (ns)",
0990: "ProviderTimeMax (ns)", "ProviderTimeStd (ns)",
0991: "ChannelTimeMin (ns)", "ChannelTimeAvg (ns)",
0992: "ChannelTimeMax (ns)", "ChannelTimeStd (ns)" };
0993:
/**
 * List of descriptions of items for CompositeData construction.
 */
0997: private static final String ITEM_DESCRIPTIONS[] = {
0998: "Number of requests sent", "Number of requests received",
0999: "Number of replies sent", "Number of replies received",
1000: "Number of faults sent", "Number of faults received",
1001: "Number of DONE requests sent",
1002: "Number of DONE requests received",
1003: "Number of ERROR requests sent",
1004: "Number of ERROR requests received",
1005: "Number of InOnly MEP's", "Number of RobustInOnly MEP's",
1006: "Number of InOut MEP's", "Number of InOptionalOut MEP's",
1007: "Number of DeliveryChannels", "Number of Endpoints",
1008: "Response Time Min", "Response Time Avg",
1009: "Response Time Max", "Response Time Std",
1010: "Status Time Min", "Status Time Avg", "Status Time Max",
1011: "Status Time Std", "NMR Time Min", "NMR Time Avg",
1012: "NMR Time Max", "NMR Time Std", "Consumer Time Min",
1013: "Consumer Time Avg", "Consumer Time Max",
1014: "Consumer Time Std", "Provider Time Min",
1015: "Provider Time Avg", "Provider Time Max",
1016: "Provider Time Std", "Channel Time Min",
1017: "Channel Time Avg", "Channel Time Max", "Channel Time Std" };
1018:
1019: /**
1020: * List of types of items for CompositeData construction.
1021: */
1022: private static final OpenType ITEM_TYPES[] = { SimpleType.LONG,
1023: SimpleType.LONG, SimpleType.LONG, SimpleType.LONG,
1024: SimpleType.LONG, SimpleType.LONG, SimpleType.LONG,
1025: SimpleType.LONG, SimpleType.LONG, SimpleType.LONG,
1026: SimpleType.LONG, SimpleType.LONG, SimpleType.LONG,
1027: SimpleType.LONG, SimpleType.LONG, SimpleType.LONG,
1028: SimpleType.LONG, SimpleType.LONG, SimpleType.LONG,
1029: SimpleType.LONG, SimpleType.LONG, SimpleType.LONG,
1030: SimpleType.LONG, SimpleType.LONG, SimpleType.LONG,
1031: SimpleType.LONG, SimpleType.LONG, SimpleType.LONG,
1032: SimpleType.LONG, SimpleType.LONG, SimpleType.LONG,
1033: SimpleType.LONG, SimpleType.LONG, SimpleType.LONG,
1034: SimpleType.LONG, SimpleType.LONG, SimpleType.LONG,
1035: SimpleType.LONG, SimpleType.LONG, SimpleType.LONG };
1036:
1037: public CompositeData getStatistics() {
1038: try {
1039: Object values[] = {
1040: mSendRequest,
1041: mReceiveRequest,
1042: mSendReply,
1043: mReceiveReply,
1044: mSendFault,
1045: mReceiveFault,
1046: mSendDONE,
1047: mReceiveDONE,
1048: mSendERROR,
1049: mReceiveERROR,
1050: mInOnlyMEPs,
1051: mRobustInOnlyMEPs,
1052: mInOutMEPs,
1053: mInOptionalOutMEPs,
1054: (long) mChannels.size(),
1055: (long) mRegistry
1056: .countEndpoints(RegisteredEndpoint.INTERNAL),
1057: mResponseTime.getMin(),
1058: (long) mResponseTime.getAverage(),
1059: mResponseTime.getMax(),
1060: (long) mResponseTime.getSd(), mStatusTime.getMin(),
1061: (long) mStatusTime.getAverage(),
1062: mStatusTime.getMax(), (long) mStatusTime.getSd(),
1063: mNMRTime.getMin(), (long) mNMRTime.getAverage(),
1064: mNMRTime.getMax(), (long) mNMRTime.getSd(),
1065: mConsumerTime.getMin(),
1066: (long) mConsumerTime.getAverage(),
1067: mConsumerTime.getMax(),
1068: (long) mConsumerTime.getSd(),
1069: mProviderTime.getMin(),
1070: (long) mProviderTime.getAverage(),
1071: mProviderTime.getMax(),
1072: (long) mProviderTime.getSd(),
1073: mChannelTime.getMin(),
1074: (long) mChannelTime.getAverage(),
1075: mChannelTime.getMax(), (long) mChannelTime.getSd() };
1076:
1077: return new CompositeDataSupport(new CompositeType(
1078: "MessagingStatistics",
1079: "Message exchange statistics", ITEM_NAMES,
1080: ITEM_DESCRIPTIONS, ITEM_TYPES), ITEM_NAMES, values);
1081: } catch (javax.management.openmbean.OpenDataException odEx) {
1082: System.out.println(odEx.toString()); // ignore this for now
1083: }
1084: return (null);
1085: }
1086:
1087: //-------------------------------Object-------------------------------------
1088:
1089: public String toString() {
1090: StringBuilder sb = new StringBuilder();
1091:
1092: sb.append("\nMessage Service Status\n");
1093: sb.append(" InOnlyMEPs: " + mInOnlyMEPs);
1094: sb.append(" InOutMEPs: " + mInOutMEPs);
1095: sb.append("\n RobustInOnlyMEPs: " + mRobustInOnlyMEPs);
1096: sb.append(" InOptionalOutMEPs: " + mInOptionalOutMEPs);
1097: sb.append("\n SendReply: " + mSendReply);
1098: sb.append(" RecvReply: " + mReceiveReply);
1099: sb.append("\n SendDONE: " + mSendDONE);
1100: sb.append(" RecvDONE: " + mReceiveDONE);
1101: sb.append(" SendERROR: " + mSendERROR);
1102: sb.append(" RecvERROR: " + mReceiveERROR);
1103: sb.append("\n SendStatus: " + mSendStatus);
1104: sb.append(" RecvStatus: " + mReceiveStatus);
1105: sb.append(" SendFault: " + mSendFault);
1106: sb.append(" RecvFault: " + mReceiveFault);
1107: if (mResponseTime != null) {
1108: sb
1109: .append("\n ResponseTime: "
1110: + mResponseTime.toString());
1111: sb.append("\n StatusTime: " + mStatusTime.toString());
1112: sb.append("\n ConsumerTime: " + mConsumerTime.toString());
1113: sb.append("\n ProviderTime: " + mProviderTime.toString());
1114: sb.append("\n ChannelTime: " + mChannelTime.toString());
1115: sb.append("\n NMRTime: " + mNMRTime.toString());
1116: }
1117: sb.append(" Delivery Channel Count: ");
1118: sb.append(mChannels.size());
1119: sb.append("\n");
1120: for (Iterator i = mChannels.values().iterator(); i.hasNext();) {
1121: sb.append(i.next().toString());
1122: }
1123: sb.append(mRegistry.toString());
1124: return (sb.toString());
1125: }
1126:
1127: /**
1128: * Register as a listener for attribute change events
1129: */
1130: private void startListeningToConfigurationMBean() {
1131: javax.management.MBeanServer mbeanServer = mEnvironmentContext
1132: .getMBeanServer();
1133:
1134: /**
1135: * Create a AttributeChangeNotificationFilter
1136: */
1137: javax.management.AttributeChangeNotificationFilter filter = new javax.management.AttributeChangeNotificationFilter();
1138: filter.disableAllAttributes();
1139: filter
1140: .enableAttribute(com.sun.jbi.management.config.SystemConfigurationFactory.MSG_SVC_STATS_ENABLED);
1141:
1142: try {
1143: mbeanServer.addNotificationListener(MBeanHelper
1144: .getSystemConfigMBeanName(), mStatistics, filter,
1145: null);
1146: } catch (Exception ex) {
1147: String exMsg = Translator
1148: .translate(
1149: LocalStringKeys.EXCEPTION_LOG,
1150: new Object[] { "startListeningToConfigurationMBean" });
1151: mLog.log(Level.FINE, exMsg, ex);
1152: }
1153: }
1154:
1155: /**
1156: * Stop listening to attribute change events
1157: */
1158: private void stopListeningToConfigurationMBean() {
1159: javax.management.MBeanServer mbeanServer = mEnvironmentContext
1160: .getMBeanServer();
1161:
1162: try {
1163: mbeanServer.removeNotificationListener(MBeanHelper
1164: .getSystemConfigMBeanName(), mStatistics);
1165: } catch (Exception ex) {
1166: String exMsg = Translator
1167: .translate(
1168: LocalStringKeys.EXCEPTION_LOG,
1169: new Object[] { "stopListeningToConfigurationMBean" });
1170: mLog.log(Level.FINE, exMsg, ex);
1171: }
1172: }
1173:
/**
 * Read the message-service statistics-enabled setting from the system
 * configuration MBean.
 *
 * @return the value of the MSG_SVC_STATS_ENABLED configuration attribute,
 *         or false when the configuration MBean is not registered or the
 *         attribute cannot be read
 */
1181: private boolean isMsgTimingStatsEnabledByDefault() {
1182: ObjectName configMBeanName = MBeanHelper
1183: .getSystemConfigMBeanName();
1184: javax.management.MBeanServer mbeanServer = mEnvironmentContext
1185: .getMBeanServer();
1186:
1187: Boolean isEnabled = new Boolean(false);
1188: String attrName = com.sun.jbi.management.config.SystemConfigurationFactory.MSG_SVC_STATS_ENABLED;
1189: try {
1190: if (mbeanServer.isRegistered(configMBeanName)) {
1191: isEnabled = (Boolean) mbeanServer.getAttribute(
1192: configMBeanName, attrName);
1193: }
1194: } catch (javax.management.JMException jmex) {
1195: String exMsg = Translator
1196: .translate(
1197: LocalStringKeys.EXCEPTION_LOG,
1198: new Object[] { "isMsgTimingStatsEnabledByDefault" });
1199: mLog.log(Level.FINE, exMsg, jmex);
1200: }
1201: return isEnabled.booleanValue();
1202: }
1203: }
|