0001: /*
0002: * BEGIN_HEADER - DO NOT EDIT
0003: *
0004: * The contents of this file are subject to the terms
0005: * of the Common Development and Distribution License
0006: * (the "License"). You may not use this file except
0007: * in compliance with the License.
0008: *
0009: * You can obtain a copy of the license at
0010: * https://open-esb.dev.java.net/public/CDDLv1.0.html.
0011: * See the License for the specific language governing
0012: * permissions and limitations under the License.
0013: *
0014: * When distributing Covered Code, include this CDDL
0015: * HEADER in each file and include the License file at
0016: * https://open-esb.dev.java.net/public/CDDLv1.0.html.
0017: * If applicable add the following below this CDDL HEADER,
0018: * with the fields enclosed by brackets "[]" replaced with
0019: * your own identifying information: Portions Copyright
0020: * [year] [name of copyright owner]
0021: */
0022:
0023: /*
0024: * @(#)DeliveryChannelImpl.java
0025: * Copyright 2004-2007 Sun Microsystems, Inc. All Rights Reserved.
0026: *
0027: * END_HEADER - DO NOT EDIT
0028: */
0029: package com.sun.jbi.messaging;
0030:
0031: import com.sun.jbi.messaging.stats.METimestamps;
0032: import com.sun.jbi.messaging.stats.Value;
0033:
0034: import com.sun.jbi.messaging.util.Translator;
0035: import com.sun.jbi.monitoring.ComponentStatisticsBase;
0036:
0037: import java.net.URI;
0038: import java.util.HashSet;
0039: import java.util.Iterator;
0040: import java.util.IdentityHashMap;
0041: import java.util.LinkedList;
0042: import java.util.Map;
0043: import java.util.HashMap;
0044: import java.util.Set;
0045: import java.util.logging.Logger;
0046: import javax.management.openmbean.CompositeData;
0047: import javax.management.openmbean.CompositeDataSupport;
0048: import javax.management.openmbean.CompositeType;
0049: import javax.management.openmbean.SimpleType;
0050: import javax.management.openmbean.OpenType;
0051: import javax.jbi.messaging.ExchangeStatus;
0052: import javax.jbi.messaging.MessageExchange;
0053: import javax.jbi.messaging.MessageExchangeFactory;
0054: import javax.jbi.servicedesc.ServiceEndpoint;
0055:
0056: import javax.transaction.xa.XAResource;
0057:
0058: import javax.xml.namespace.QName;
0059:
0060: import org.w3c.dom.Document;
0061:
0062: /** Implementation of DeliveryChannel interface.
0063: * @author Sun Microsystems, Inc.
0064: */
0065: public class DeliveryChannelImpl implements DeliveryChannel,
0066: ChannelStatistics {
0067: public static final String REG_NOTIFICATION_TYPE = "Registration";
0068:
0069: /** Component id of channel owner. */
0070: private String mChannelId;
0071: /** ClassLoader used by the owner of the channel. */
0072: private ClassLoader mClassLoader;
0073: /** Reference to message service. */
0074: private MessageService mMsgSvc;
0075: /** Flag indicating state of channel */
0076: private boolean mIsClosed;
0077: /** Flag indicating transactional state. */
0078: private boolean mTransactional;
0079: /** Map of in process exchanges for this Channel. */
0080: private IdentityHashMap mActive;
0081: /** Queue of MessageExchanges waiting for delivery. */
0082: private LinkedList mQueue;
0083: private int mAcceptors;
0084:
0085: /** Basic statistics used for DeliveryChannel development. */
0086: private long mDeadWait;
0087: private long mTimeOut;
0088: private long mSendCount;
0089: private long mSendSyncCount;
0090: private long mAcceptCount;
0091:
0092: /** Primary statistics for external use. */
0093: private long mSendRequest;
0094: private long mReceiveRequest;
0095: private long mSendReply;
0096: private long mReceiveReply;
0097: private long mSendFault;
0098: private long mReceiveFault;
0099: private long mSendDONE;
0100: private long mReceiveDONE;
0101: private long mSendERROR;
0102: private long mReceiveERROR;
0103: private Value mResponseTime;
0104: private Value mStatusTime;
0105: private Value mNMRTime;
0106: private Value mComponentTime;
0107: private Value mChannelTime;
0108:
0109: /** Set of Internal endpoints activated. */
0110: private HashSet<RegisteredEndpoint> mInternalEndpoints;
0111: private HashMap<String, RegisteredEndpoint> mEndpoints;
0112:
0113: /** Set of External endpoints registered. */
0114: private HashSet<RegisteredEndpoint> mExternalEndpoints;
0115:
0116: private EndpointRegistry mRegistry;
0117: private EndpointListener mListener;
0118: private TimeoutListener mTimeout;
0119:
0120: private int mNotifyIdx = 0;
0121:
0122: private Logger mLog = Logger.getLogger(this .getClass().getPackage()
0123: .getName());
0124:
0125: /**
0126: * Statistics holder provided by the component owning this DeliveryChannel.
0127: */
0128: private ComponentStatisticsBase mStats;
0129:
0130: /**
0131: * Messaging Statistics holder provided by the component owning this
0132: * DeliveryChannel.
0133: */
0134: private MessagingStatistics mMsgStats;
0135:
/**
 * Create a new DeliveryChannel.
 *
 * @param channelId component id of the channel owner
 * @param classLoader class loader used by the owner of the channel
 * @param msgSvc message service this channel delegates to
 * @param listener listener notified when endpoints are activated or
 *        deactivated; may be null, in which case no notification is sent
 */
DeliveryChannelImpl(String channelId, ClassLoader classLoader,
        MessageService msgSvc, EndpointListener listener) {
    mChannelId = channelId;
    mClassLoader = classLoader;
    mMsgSvc = msgSvc;
    mListener = listener;
    mRegistry = EndpointRegistry.getInstance();

    // A new channel starts open and transactional.
    mIsClosed = false;
    mTransactional = true;

    // Exchange tracking structures.
    mQueue = new LinkedList();
    mActive = new IdentityHashMap();

    // Endpoint bookkeeping; explicit type arguments avoid unchecked
    // assignments to the parameterized fields.
    mInternalEndpoints = new HashSet<RegisteredEndpoint>();
    mExternalEndpoints = new HashSet<RegisteredEndpoint>();
    mEndpoints = new HashMap<String, RegisteredEndpoint>();

    // Statistics holders.
    mStats = null;
    mResponseTime = new Value();
    mStatusTime = new Value();
    mNMRTime = new Value();
    mComponentTime = new Value();
    mChannelTime = new Value();
}
0161:
0162: /*#####################################################################
0163: *# #
0164: *# Public API Methods #
0165: *# #
0166: *#####################################################################*/
0167:
0168: /*######################### Communication #############################*/
0169:
0170: /** Blocking call which samples the channel at a pre-determined interval
0171: * for available exchanges.
0172: * @return message exchange, or null if channel was interrupted.
0173: * @throws javax.jbi.messaging.MessagingException error while reading
0174: * exchange.
0175: */
0176: public MessageExchange accept()
0177: throws javax.jbi.messaging.MessagingException {
0178: MessageExchangeProxy me = null;
0179:
0180: me = acceptInternal(0);
0181: if (me != null) {
0182: if (me.capture(METimestamps.TAG_CACCEPT,
0183: METimestamps.TAG_PACCEPT)) {
0184: propagateStatistics(me);
0185: }
0186: }
0187: return (me);
0188: }
0189:
0190: public MessageExchange accept(long timeout)
0191: throws javax.jbi.messaging.MessagingException {
0192: MessageExchangeProxy me = null;
0193:
0194: me = acceptInternal(timeout);
0195: if (me != null) {
0196: if (me.capture(METimestamps.TAG_CACCEPT,
0197: METimestamps.TAG_PACCEPT)) {
0198: propagateStatistics(me);
0199: }
0200: }
0201: return (me);
0202: }
0203:
0204: private MessageExchangeProxy acceptInternal(long timeout)
0205: throws javax.jbi.messaging.MessagingException {
0206: MessageExchangeProxy me = null;
0207:
0208: if (mIsClosed) {
0209: throw new javax.jbi.messaging.MessagingException(Translator
0210: .translate(LocalStringKeys.CHANNEL_CLOSED));
0211: }
0212: if (timeout == 0) {
0213: me = dequeueExchange();
0214: } else {
0215: me = dequeueExchange(timeout);
0216: }
0217:
0218: if (me != null) {
0219: if (me.handleAccept(this )) {
0220: removeAcceptReference(me);
0221: mLog.finer(Translator
0222: .translate(LocalStringKeys.EXCHANGE_ACCEPTED,
0223: new Object[] { me.getExchangeId(),
0224: mChannelId }));
0225: } else {
0226: updateStatistics(me);
0227: }
0228: }
0229: return me;
0230: }
0231:
0232: /** Defers routing to the message service. */
0233: public void send(MessageExchange exchange)
0234: throws javax.jbi.messaging.MessagingException {
0235: //
0236: // Capture timestamp.
0237: //
0238: ((MessageExchangeProxy) exchange).capture(
0239: METimestamps.TAG_CSEND, METimestamps.TAG_PSEND);
0240:
0241: if (isClosed()) {
0242: throw new javax.jbi.messaging.MessagingException(Translator
0243: .translate(LocalStringKeys.CHANNEL_CLOSED));
0244: }
0245: mMsgSvc.doExchange(this , (MessageExchangeProxy) exchange);
0246: }
0247:
0248: public boolean sendSync(MessageExchange exchange)
0249: throws javax.jbi.messaging.MessagingException {
0250: boolean available;
0251:
0252: //
0253: // Capture timestamp.
0254: //
0255: ((MessageExchangeProxy) exchange).capture(
0256: METimestamps.TAG_CSEND, METimestamps.TAG_PSEND);
0257:
0258: return (sendSyncInternal((MessageExchangeProxy) exchange, 0));
0259: }
0260:
0261: public boolean sendSync(MessageExchange exchange, long timeout)
0262: throws javax.jbi.messaging.MessagingException {
0263: boolean available;
0264:
0265: //
0266: // Capture timestamp.
0267: //
0268: ((MessageExchangeProxy) exchange).capture(
0269: METimestamps.TAG_CSEND, METimestamps.TAG_PSEND);
0270:
0271: return (sendSyncInternal((MessageExchangeProxy) exchange,
0272: timeout));
0273: }
0274:
0275: private boolean sendSyncInternal(MessageExchangeProxy mep,
0276: long timeout) throws javax.jbi.messaging.MessagingException {
0277: boolean available;
0278:
0279: if (isClosed()) {
0280: throw new javax.jbi.messaging.MessagingException(Translator
0281: .translate(LocalStringKeys.CHANNEL_CLOSED));
0282: }
0283:
0284: mLog.finer(Translator.translate(LocalStringKeys.EXCHANGE_SEND,
0285: new Object[] { mep.getExchangeId(), mChannelId }));
0286:
0287: if (available = mMsgSvc.doSynchExchange(this , mep, timeout)) {
0288: if (mep.handleAccept(this )) {
0289: this .removeAcceptReference(mep);
0290: } else {
0291: updateStatistics(mep);
0292: }
0293: if (mep.capture(METimestamps.TAG_CACCEPT,
0294: METimestamps.TAG_PACCEPT)) {
0295: propagateStatistics(mep);
0296: }
0297: }
0298:
0299: return available;
0300: }
0301:
0302: /*###################### Exchange Factories #############################*/
0303:
0304: /** Create a message exchange factory. This factory will create exchange
0305: * instances with all appropriate properties set to null.
0306: * @return a message exchange factory
0307: */
0308: public MessageExchangeFactory createExchangeFactory() {
0309: return new ExchangeFactory(mMsgSvc);
0310: }
0311:
0312: /** Create a message exchange factory for the given interface name.
0313: * @param interfaceName name of the interface for which all exchanges
0314: * created by the returned factory will be set
0315: * @return name of the interface for which all exchanges created by the
0316: * returned factory will be set
0317: */
0318: public MessageExchangeFactory createExchangeFactory(
0319: QName interfaceName) {
0320: return ExchangeFactory.newInterfaceFactory(mMsgSvc,
0321: interfaceName);
0322: }
0323:
0324: /** Create a message exchange factory for the given service name.
0325: * @param serviceName name of the service for which all exchanges
0326: * created by the returned factory will be set
0327: * @return an exchange factory that will create exchanges for the given
0328: * service; must be non-null
0329: */
0330: public MessageExchangeFactory createExchangeFactoryForService(
0331: QName serviceName) {
0332: return ExchangeFactory.newServiceFactory(mMsgSvc, serviceName);
0333: }
0334:
0335: /** Create a message exchange factory for the given endpoint.
0336: * @param endpoint endpoint for which all exchanges created by the
0337: * returned factory will be set for
0338: * @return an exchange factory that will create exchanges for the
0339: * given endpoint
0340: */
0341: public MessageExchangeFactory createExchangeFactory(
0342: ServiceEndpoint endpoint) {
0343: return ExchangeFactory.newEndpointFactory(mMsgSvc, endpoint);
0344: }
0345:
0346: /*################# Framework Passthrough Methods #######################*/
0347:
0348: /**
0349: * Register an XAResource
0350: * @param resource to be added.
0351: */
0352: public void registerXAResource(XAResource resource) {
0353: mMsgSvc.addXAResource(resource);
0354: }
0355:
0356: /** Activate an endpoint.
0357: * @param service QName of service
0358: * @param endpoint NCName of endpoint
0359: * @return activated endpoint
0360: * @throws javax.jbi.messaging.MessagingException failed to register endpoint
0361: */
0362: public RegisteredEndpoint activateEndpoint(QName service,
0363: String endpoint)
0364: throws javax.jbi.messaging.MessagingException {
0365: RegisteredEndpoint re;
0366:
0367: if (service == null || endpoint == null || endpoint.equals("")) {
0368: throw new javax.jbi.messaging.MessagingException(Translator
0369: .translate(LocalStringKeys.ACTIVATE_NOT_NULL));
0370: }
0371:
0372: re = mRegistry.registerInternalEndpoint(service, endpoint,
0373: getChannelId());
0374: if (null != mStats && mStats.getStatisticsBase().isEnabled()) {
0375: mStats.incrementRegisteredServicesOrEndpoints();
0376: }
0377:
0378: synchronized (mInternalEndpoints) {
0379: mInternalEndpoints.add(re);
0380: mEndpoints.put(re.toExternalName(), re);
0381: }
0382:
0383: //re.parseDescriptor(getEndpointDescriptor(re));
0384:
0385: if (mListener != null) {
0386: mListener.activate(
0387: (com.sun.jbi.messaging.DeliveryChannel) this , re);
0388: }
0389: return re;
0390: }
0391:
0392: /** Used by a BC to deactivate an endpoint which it has registered.
0393: * @param ref endpoint reference
0394: * @throws javax.jbi.messaging.MessagingException failed to deactivate
0395: * endpoint.
0396: */
0397: public void deactivateEndpoint(ServiceEndpoint ref)
0398: throws javax.jbi.messaging.MessagingException {
0399: RegisteredEndpoint re;
0400:
0401: re = (RegisteredEndpoint) ref;
0402:
0403: if (!re.getOwnerId().equals(mChannelId)) {
0404: throw new javax.jbi.messaging.MessagingException(Translator
0405: .translate(LocalStringKeys.DEACTIVATE_NOT_OWNER));
0406: }
0407:
0408: synchronized (mInternalEndpoints) {
0409: mInternalEndpoints.remove(ref);
0410: mEndpoints.remove(((RegisteredEndpoint) ref)
0411: .toExternalName());
0412: }
0413:
0414: mRegistry.removeEndpoint(re);
0415: if (null != mStats && mStats.getStatisticsBase().isEnabled()) {
0416: mStats.decrementRegisteredServicesOrEndpoints();
0417: }
0418:
0419: if (mListener != null) {
0420: mListener.deactivate(
0421: (com.sun.jbi.messaging.DeliveryChannel) this , re);
0422: }
0423: }
0424:
0425: /**
0426: * Registers the specified external endpoint with the NMR. This indicates
0427: * to the NMR that the specified endpoint is used as a proxy for external
0428: * service consumers to access an internal service by the same name.
0429: * @throw javax.jbi.messaging.MessagingException if the specified endpoint
0430: * has already been registered.
0431: */
0432: public void registerExternalEndpoint(
0433: ServiceEndpoint externalEndpoint)
0434: throws javax.jbi.messaging.MessagingException {
0435: RegisteredEndpoint re;
0436:
0437: try {
0438: re = mRegistry.registerExternalEndpoint(externalEndpoint,
0439: getChannelId());
0440: if (null != mStats
0441: && mStats.getStatisticsBase().isEnabled()) {
0442: mStats.incrementRegisteredServicesOrEndpoints();
0443: }
0444: synchronized (mExternalEndpoints) {
0445: mExternalEndpoints.add(re);
0446: }
0447: } catch (javax.jbi.messaging.MessagingException msgEx) {
0448: mLog.warning(msgEx.getMessage());
0449: throw msgEx;
0450: }
0451: }
0452:
0453: /**
0454: * Deregisters the specified external endpoint with the NMR. This indicates
0455: * to the NMR that external service consumers can no longer access the
0456: * internal service by this name.
0457: */
0458: public void deregisterExternalEndpoint(
0459: ServiceEndpoint externalEndpoint)
0460: throws javax.jbi.messaging.MessagingException {
0461: RegisteredEndpoint re;
0462:
0463: re = mRegistry.getExternalEndpoint(externalEndpoint
0464: .getServiceName(), externalEndpoint.getEndpointName());
0465:
0466: if (!re.getOwnerId().equals(mChannelId)) {
0467: throw new javax.jbi.messaging.MessagingException(Translator
0468: .translate(LocalStringKeys.DEACTIVATE_NOT_OWNER));
0469: }
0470:
0471: synchronized (mExternalEndpoints) {
0472: mExternalEndpoints.remove(re);
0473: }
0474:
0475: mRegistry.removeEndpoint(re);
0476: if (null != mStats && mStats.getStatisticsBase().isEnabled()) {
0477: mStats.decrementRegisteredServicesOrEndpoints();
0478: }
0479: }
0480:
0481: /**
0482: * Get the service description for the named endpoint, if any exists.
0483: * @param service the qualified name of the endpoint's service.
0484: */
0485: public ServiceEndpoint getEndpoint(QName service, String name) {
0486: ServiceEndpoint[] list;
0487: ServiceEndpoint endpoint = null;
0488:
0489: list = getEndpointsForService(service);
0490:
0491: for (int i = 0; i < list.length; i++) {
0492: if (list[i].getEndpointName().equals(name)) {
0493: endpoint = list[i];
0494: break;
0495: }
0496: }
0497:
0498: return endpoint;
0499: }
0500:
0501: synchronized RegisteredEndpoint getEndpointByExternalName(
0502: String name) {
0503: return (mEndpoints.get(name));
0504: }
0505:
0506: synchronized RegisteredEndpoint getConsumingEndpointByExternalName(
0507: String name) {
0508: return (mRegistry.getLinkedEndpointByName(name));
0509: }
0510:
0511: /** Uses the owner id of the reference to query the appopriate channel. */
0512: public Document getEndpointDescriptor(ServiceEndpoint endpoint)
0513: throws javax.jbi.messaging.MessagingException {
0514: return mMsgSvc.queryDescriptor(endpoint);
0515: }
0516:
0517: /**
0518: * Queries the NMR for external endpoints that implement the specified
0519: * interface name.
0520: */
0521: public ServiceEndpoint[] getExternalEndpoints(QName interfaceName) {
0522: return mRegistry.getExternalEndpointsForInterface(
0523: interfaceName, mMsgSvc);
0524: }
0525:
0526: /**
0527: * Queries the NMR for active endpoints that implement the given interface.
0528: * This will return the endpoints for all services and endpoints that
0529: * implement the named interface (portType in WSDL 1.1). This method does
0530: * NOT include external endpoints (those registered using
0531: * registerExternalEndpoint(ServiceEndpoint)).
0532: * @param interfaceName qualified name of interface/portType that is
0533: * implemented by the endpoint
0534: * @return ServiceEndpoint[] list of available endpoints for the specified
0535: * interface name; potentially zero-length.
0536: */
0537: public ServiceEndpoint[] getEndpoints(QName interfaceName) {
0538: return mRegistry.getInternalEndpointsForInterface(
0539: interfaceName, mMsgSvc);
0540: }
0541:
0542: public ServiceEndpoint[] getEndpoints() {
0543: return (mInternalEndpoints
0544: .toArray(new ServiceEndpoint[mInternalEndpoints.size()]));
0545: }
0546:
0547: /**
0548: * Queries the NMR for external endpoints that are part of the specified
0549: * service.
0550: */
0551: public ServiceEndpoint[] getExternalEndpointsForService(
0552: QName serviceName) {
0553: return mRegistry.getExternalEndpointsForService(serviceName);
0554: }
0555:
0556: /**
0557: * Resolves the specified endpoint reference into a service endpoint. This
0558: * is called by the component when it has an endpoint reference that it
0559: * needs to resolve into a service endpoint.
0560: */
0561: public ServiceEndpoint resolveEndpointReference(
0562: org.w3c.dom.DocumentFragment endpointReference) {
0563: return mMsgSvc.resolveEndpointReference(endpointReference);
0564: }
0565:
0566: /** Used by ComponentContext.getEndpointsForService(). */
0567: public ServiceEndpoint[] getEndpointsForService(QName service) {
0568: // return endpoints from registry without converting connection links
0569: return mRegistry.getInternalEndpointsForService(service, false);
0570: }
0571:
0572: /*###################### RI Private Methods ###############################*/
0573:
0574: public void setTransactional(boolean transactional) {
0575: mTransactional = transactional;
0576: }
0577:
0578: public final boolean isTransactional() {
0579: return (mTransactional);
0580: }
0581:
0582: public ClassLoader getClassLoader() {
0583: return (mClassLoader);
0584: }
0585:
0586: /*########################## Lifecycle #################################*/
0587:
0588: /** Close the channel.
0589: * @throws javax.jbi.messaging.MessagingException failure during close.
0590: */
0591: public void close() throws javax.jbi.messaging.MessagingException {
0592: if (mIsClosed) {
0593: return;
0594: }
0595: mIsClosed = true;
0596:
0597: for (RegisteredEndpoint ref : mInternalEndpoints) {
0598: mRegistry.removeEndpoint(ref);
0599: }
0600: for (RegisteredEndpoint ref : mExternalEndpoints) {
0601: mRegistry.removeEndpoint(ref);
0602: }
0603:
0604: // Terminate active, flush accept queue and notify waiters.
0605: cleanActive();
0606:
0607: // Remove channel from message service registry
0608: mMsgSvc.removeChannel(mChannelId);
0609: }
0610:
0611: public boolean isClosed() {
0612: return (mIsClosed);
0613: }
0614:
0615: /*#####################################################################
0616: *# #
0617: *# Internal Methods #
0618: *# #
0619: *#####################################################################*/
0620:
0621: /** Retrieve the id for this channel. */
0622: String getChannelId() {
0623: return mChannelId;
0624: }
0625:
0626: /** Track the given MessageExchange. */
0627: synchronized void addSendReference(MessageExchangeProxy me) {
0628: if (mActive.put(me, me) == null) {
0629: me.setInUse(mChannelId);
0630: }
0631: mSendCount++;
0632: switch (me.getPhase()) {
0633: case MessageExchangeProxy.PHASE_REPLY:
0634: mSendReply++;
0635: break;
0636:
0637: case MessageExchangeProxy.PHASE_REQUEST:
0638: mSendRequest++;
0639: break;
0640:
0641: case MessageExchangeProxy.PHASE_DONE:
0642: mSendDONE++;
0643: break;
0644:
0645: case MessageExchangeProxy.PHASE_ERROR:
0646: mSendERROR++;
0647: break;
0648:
0649: case MessageExchangeProxy.PHASE_FAULT:
0650: mSendFault++;
0651: break;
0652: }
0653: }
0654:
0655: /** Track the given MessageExchange. */
0656: synchronized void addSendSyncReference(MessageExchangeProxy me) {
0657: if (mActive.put(me, me) == null) {
0658: me.setInUse(mChannelId);
0659: }
0660: mSendSyncCount++;
0661: switch (me.getPhase()) {
0662: case MessageExchangeProxy.PHASE_REPLY:
0663: mSendReply++;
0664: break;
0665:
0666: case MessageExchangeProxy.PHASE_REQUEST:
0667: mSendRequest++;
0668: break;
0669:
0670: case MessageExchangeProxy.PHASE_DONE:
0671: mSendDONE++;
0672: break;
0673:
0674: case MessageExchangeProxy.PHASE_ERROR:
0675: mSendERROR++;
0676: break;
0677:
0678: case MessageExchangeProxy.PHASE_FAULT:
0679: mSendFault++;
0680: break;
0681: }
0682: }
0683:
0684: /** Stop tracking the given MessageExchange. */
0685: synchronized void removeSendReference(MessageExchangeProxy me) {
0686: mActive.remove(me);
0687: me.resetInUse();
0688: mSendCount++;
0689: switch (me.getPhase()) {
0690: case MessageExchangeProxy.PHASE_REPLY:
0691: mSendReply++;
0692: break;
0693:
0694: case MessageExchangeProxy.PHASE_REQUEST:
0695: mSendRequest++;
0696: break;
0697:
0698: case MessageExchangeProxy.PHASE_DONE:
0699: mSendDONE++;
0700: break;
0701:
0702: case MessageExchangeProxy.PHASE_ERROR:
0703: mSendERROR++;
0704: break;
0705:
0706: case MessageExchangeProxy.PHASE_FAULT:
0707: mSendFault++;
0708: break;
0709: }
0710: }
0711:
0712: /** Stop tracking the given MessageExchange. */
0713: synchronized void removeAcceptReference(MessageExchangeProxy me) {
0714: if (mActive.remove(me) != null) {
0715: me.resetInUse();
0716: }
0717: switch (me.getPhase()) {
0718: case MessageExchangeProxy.PHASE_REPLY:
0719: mReceiveReply++;
0720: break;
0721:
0722: case MessageExchangeProxy.PHASE_REQUEST:
0723: mReceiveRequest++;
0724: break;
0725:
0726: case MessageExchangeProxy.PHASE_DONE:
0727: mReceiveDONE++;
0728: break;
0729:
0730: case MessageExchangeProxy.PHASE_ERROR:
0731: mReceiveERROR++;
0732: break;
0733:
0734: case MessageExchangeProxy.PHASE_FAULT:
0735: mReceiveFault++;
0736: break;
0737: }
0738: }
0739:
0740: synchronized void addAcceptReference(MessageExchangeProxy me) {
0741: if (mActive.put(me, me) == null) {
0742: me.setInUse(null);
0743: }
0744: }
0745:
0746: void propagateStatistics(MessageExchangeProxy me) {
0747: updateStatistics(me);
0748: me.updateStatistics();
0749: mMsgSvc.updateStatistics(me);
0750: }
0751:
0752: synchronized void updateStatistics(MessageExchangeProxy me) {
0753: switch (me.getPhase()) {
0754: case MessageExchangeProxy.PHASE_REPLY:
0755: mReceiveReply++;
0756: break;
0757:
0758: case MessageExchangeProxy.PHASE_REQUEST:
0759: mReceiveRequest++;
0760: break;
0761:
0762: case MessageExchangeProxy.PHASE_DONE:
0763: mReceiveDONE++;
0764: break;
0765:
0766: case MessageExchangeProxy.PHASE_ERROR:
0767: mReceiveERROR++;
0768: break;
0769:
0770: case MessageExchangeProxy.PHASE_FAULT:
0771: mReceiveFault++;
0772: break;
0773: }
0774: }
0775:
0776: synchronized void updateProviderStatistics(METimestamps ts) {
0777: if (ts != null) {
0778: mResponseTime.addSample(ts.mResponseTime);
0779: mNMRTime.addSample(ts.mNMRTime);
0780: mComponentTime.addSample(ts.mProviderTime);
0781: mChannelTime.addSample(ts.mProviderChannelTime);
0782: }
0783: }
0784:
0785: synchronized void updateConsumerStatistics(METimestamps ts) {
0786: if (ts != null) {
0787: if (ts.mStatusTime != 0) {
0788: mStatusTime.addSample(ts.mStatusTime);
0789: }
0790: if (ts.mConsumerTime != 0) {
0791: mComponentTime.addSample(ts.mConsumerTime);
0792: }
0793: mNMRTime.addSample(ts.mNMRTime);
0794: mChannelTime.addSample(ts.mConsumerChannelTime);
0795: }
0796: }
0797:
0798: /** Check if a MessageExchange is being tracked. */
0799: synchronized public boolean activeReference(MessageExchange me) {
0800: return (mActive.get(me) != null);
0801: }
0802:
0803: /**
0804: * Queue a message exchange for an observer
0805: */
0806: synchronized void queueObserved(MessageExchangeProxy mep)
0807: throws javax.jbi.messaging.MessagingException {
0808: if (mIsClosed) {
0809: mLog.warning(Translator
0810: .translate(LocalStringKeys.CHANNEL_CLOSED));
0811: return;
0812: }
0813:
0814: mQueue.addLast(mep);
0815: if (mQueue.size() == 1) {
0816: this .notifyAll();
0817: }
0818: }
0819:
0820: /** Queue a message exchange. Notify any waiters on 0->1 transition. */
0821: synchronized void queueExchange(MessageExchangeProxy mep)
0822: throws javax.jbi.messaging.MessagingException {
0823: if (mIsClosed) {
0824: throw new javax.jbi.messaging.MessagingException(Translator
0825: .translate(LocalStringKeys.CHANNEL_CLOSED));
0826: }
0827:
0828: addAcceptReference(mep);
0829: if (mep.getSynchState() == MessageExchangeProxy.NONE) {
0830: //
0831: // Give priority to completed messages.
0832: //
0833: if (mep.getStatus().equals(ExchangeStatus.ACTIVE)) {
0834: mQueue.addLast(mep);
0835: } else {
0836: mQueue.addFirst(mep);
0837: }
0838: mep.setSynchState(MessageExchangeProxy.DONE);
0839: if (mQueue.size() == 1) {
0840: this .notifyAll();
0841: }
0842: mep.capture(METimestamps.TAG_CQUEUE,
0843: METimestamps.TAG_PQUEUE);
0844: } else {
0845: synchronized (mep) {
0846: if (mep.getSynchState() == MessageExchangeProxy.WAIT
0847: || mep.getSynchState() == MessageExchangeProxy.WAIT_TIMEOUT) {
0848: mAcceptCount++;
0849: mep.setSynchState(MessageExchangeProxy.HALF_DONE);
0850: mep.capture(METimestamps.TAG_CQUEUE,
0851: METimestamps.TAG_PQUEUE);
0852: mep.notify();
0853: } else if (mep.getSynchState() == MessageExchangeProxy.ERROR) {
0854: mActive.remove(mep);
0855: mep.terminate();
0856: throw new javax.jbi.messaging.MessagingException(
0857: Translator
0858: .translate(LocalStringKeys.MESSAGE_TIMEOUT));
0859: }
0860: }
0861: }
0862: }
0863:
0864: /** Dequeue a message exchange. Wait if the queue is empty. */
0865: synchronized MessageExchangeProxy dequeueExchange()
0866: throws javax.jbi.messaging.MessagingException {
0867: MessageExchangeProxy me = null;
0868: boolean dead = false;
0869: try {
0870: mAcceptCount++;
0871: mAcceptors++;
0872: while (!mIsClosed) {
0873: if (mQueue.size() > 0) {
0874: me = (MessageExchangeProxy) mQueue.removeFirst();
0875: break;
0876: }
0877: this .wait();
0878: if (dead) {
0879: mDeadWait++;
0880: }
0881: dead = true;
0882: }
0883: } catch (InterruptedException intEx) {
0884: throw new javax.jbi.messaging.MessagingException(intEx);
0885: } finally {
0886: mAcceptors--;
0887: }
0888:
0889: return (me);
0890: }
0891:
0892: /** Dequeue a message exchange. Wait for the given number of milliseconds if the queue is empty. */
0893: synchronized MessageExchangeProxy dequeueExchange(long timeout)
0894: throws javax.jbi.messaging.MessagingException {
0895: MessageExchangeProxy me = null;
0896: boolean timedOut = false;
0897: boolean dead = false;
0898: long endTime = System.currentTimeMillis() + timeout;
0899:
0900: try {
0901: mAcceptCount++;
0902: mAcceptors++;
0903: while (!mIsClosed) {
0904: if (mQueue.size() > 0) {
0905: me = (MessageExchangeProxy) mQueue.removeFirst();
0906: break;
0907: }
0908: if (timedOut) {
0909: mTimeOut++;
0910: break;
0911: }
0912:
0913: /* We can wake up for two reasons:
0914: * (1) notify() was called
0915: * (2) the wait timer expired
0916: * The wait() call itself doesn't provide this information,
0917: * so we need to sample the time to see what happened.
0918: */
0919: this .wait(timeout);
0920: timeout = endTime - System.currentTimeMillis();
0921: timedOut = timeout <= 0;
0922: if (dead) {
0923: mDeadWait++;
0924: }
0925: dead = true;
0926: }
0927: } catch (InterruptedException intEx) {
0928: throw new javax.jbi.messaging.MessagingException(intEx);
0929: } finally {
0930: mAcceptors--;
0931: }
0932:
0933: return (me);
0934: }
0935:
0936: /** Clean up in preparation for closing the channel. */
0937: void cleanActive() {
0938: Set s;
0939: Iterator i;
0940:
0941: //
0942: // Try and cleanup any active MEP's.
0943: //
0944: synchronized (this ) {
0945: s = mActive.entrySet();
0946:
0947: for (i = s.iterator(); i.hasNext();) {
0948: MessageExchangeProxy mep;
0949: mep = (MessageExchangeProxy) ((Map.Entry) i.next())
0950: .getKey();
0951: if (mep.terminate()) {
0952: i.remove();
0953: mLog
0954: .warning("DeliveryChannel:Close Cleanup Exchange\n"
0955: + mep);
0956: }
0957: }
0958:
0959: mQueue.clear();
0960: this .notifyAll();
0961: }
0962:
0963: //
0964: // Any MEP's left over need to perform a send() to trigger processing
0965: // on the other side.
0966: //
0967: for (i = s.iterator(); i.hasNext();) {
0968: MessageExchangeProxy mep;
0969: mep = (MessageExchangeProxy) ((Map.Entry) i.next())
0970: .getKey();
0971: try {
0972: mep.getSendChannel().queueExchange(mep.getTwin());
0973: } catch (javax.jbi.messaging.MessagingException mEx) {
0974: //
0975: // Just swallow any errors as it means that the target is also
0976: // closed.
0977: //
0978: mLog.warning("MessagingException Id("
0979: + mep.getExchangeId() + ") : " + mEx);
0980: }
0981: }
0982: }
0983:
0984: /** Helper the forward channel lookup to MessageService. */
0985: DeliveryChannelImpl getChannel(String id) {
0986: return ((DeliveryChannelImpl) mMsgSvc.getChannel(id));
0987: }
0988:
0989: public void setEndpointListener(EndpointListener listener) {
0990: mMsgSvc.setEndpointListener(listener);
0991: }
0992:
0993: public void setTimeoutListener(TimeoutListener timeout) {
0994: mMsgSvc.setTimeoutListener(timeout == null ? null : this ,
0995: timeout);
0996: }
0997:
0998: public MessageExchange createExchange(URI pattern, String id)
0999: throws javax.jbi.messaging.MessagingException {
1000: MessageExchange me;
1001:
1002: me = createExchangeFactory().createExchange(pattern);
1003: ((MessageExchangeProxy) me).getMessageExchange().setExchangeId(
1004: id);
1005: return (me);
1006: }
1007:
1008: public ServiceEndpoint createEndpoint(QName service, String endpoint)
1009: throws javax.jbi.messaging.MessagingException {
1010: return (mRegistry.registerInternalEndpoint(service, endpoint,
1011: mChannelId));
1012: }
1013:
1014: public void setExchangeIdGenerator(ExchangeIdGenerator generator) {
1015: mMsgSvc.setExchangeIdGenerator(generator);
1016: }
1017:
1018: public boolean isExchangeOkay(MessageExchange me) {
1019: return (mMsgSvc
1020: .isExchangeOkay((com.sun.jbi.messaging.MessageExchange) me));
1021: }
1022:
1023: //----------------------------ChannelStatistics-----------------------------
1024:
1025: /**
1026: * List of item names for CompositeData construction.
1027: */
1028: private static final String[] ITEM_NAMES = { "ActiveExchanges",
1029: "ActiveEndpoints", "SendRequest", "ReceiveRequest",
1030: "SendReply", "ReceiveReply", "SendFault", "ReceiveFault",
1031: "SendDONE", "ReceiveDONE", "SendERROR", "ReceiveERROR",
1032: "DeadWait", "TimeOut", "SendCount", "SendSyncCount",
1033: "AcceptCount", "ResponseTimeMin (ns)",
1034: "ResponseTimeAvg (ns)", "ResponseTimeMax (ns)",
1035: "ResponseTimeStd (ns)", "StatusTimeMin (ns)",
1036: "StatusTimeAvg (ns)", "StatusTimeMax (ns)",
1037: "StatusTimeStd (ns)", "NMRTimeMin (ns)", "NMRTimeAvg (ns)",
1038: "NMRTimeMax (ns)", "NMRTimeStd (ns)",
1039: "ComponentTimeMin (ns)", "ComponentTimeAvg (ns)",
1040: "ComponentTimeMax (ns)", "ComponentTimeStd (ns)",
1041: "ChannelTimeMin (ns)", "ChannelTimeAvg (ns)",
1042: "ChannelTimeMax (ns)", "ChannelTimeStd (ns)" };
1043:
1044: /**
1045: * List of descriptions of items for ComponsiteData construction.
1046: */
1047: private static final String ITEM_DESCRIPTIONS[] = {
1048: "Number of active MessageExchanges",
1049: "Number of active Endpoints", "Number of requests sent",
1050: "Number of requests received", "Number of replies sent",
1051: "Number of replies received", "Number of faults sent",
1052: "Number of faults received",
1053: "Number of DONE requests sent",
1054: "Number of DONE requests received",
1055: "Number of ERROR requests sent",
1056: "Number of ERROR requests received",
1057: "Number of waits that didn't find any work",
1058: "Number of requests that timed out",
1059: "Number of Send operations",
1060: "Number of SendSync operations",
1061: "Number of Accept operations", "Response Time Min",
1062: "Response Time Avg", "Response Time Max",
1063: "Response Time Std", "Status Time Min", "Status Time Avg",
1064: "Status Time Max", "Status Time Std", "NMR Time Min",
1065: "NMR Time Avg", "NMR Time Max", "NMR Time Std",
1066: "Component Time Min", "Component Time Avg",
1067: "Component Time Max", "Component Time Std",
1068: "Channel Time Min", "Channel Time Avg", "Channel Time Max",
1069: "Channel Time Std" };
1070:
1071: /**
1072: * List of types of items for CompositeData construction.
1073: */
1074: private static final OpenType ITEM_TYPES[] = { SimpleType.LONG,
1075: SimpleType.LONG, SimpleType.LONG, SimpleType.LONG,
1076: SimpleType.LONG, SimpleType.LONG, SimpleType.LONG,
1077: SimpleType.LONG, SimpleType.LONG, SimpleType.LONG,
1078: SimpleType.LONG, SimpleType.LONG, SimpleType.LONG,
1079: SimpleType.LONG, SimpleType.LONG, SimpleType.LONG,
1080: SimpleType.LONG, SimpleType.LONG, SimpleType.LONG,
1081: SimpleType.LONG, SimpleType.LONG, SimpleType.LONG,
1082: SimpleType.LONG, SimpleType.LONG, SimpleType.LONG,
1083: SimpleType.LONG, SimpleType.LONG, SimpleType.LONG,
1084: SimpleType.LONG, SimpleType.LONG, SimpleType.LONG,
1085: SimpleType.LONG, SimpleType.LONG, SimpleType.LONG,
1086: SimpleType.LONG, SimpleType.LONG, SimpleType.LONG };
1087:
1088: public String getName() {
1089: return (mChannelId);
1090: }
1091:
1092: public String[] getEndpointNames() {
1093: String[] results;
1094: Object[] eps = mInternalEndpoints.toArray();
1095:
1096: results = new String[eps.length];
1097: for (int i = 0; i < eps.length; i++) {
1098: results[i] = ((RegisteredEndpoint) eps[i]).toExternalName();
1099: }
1100: return (results);
1101: }
1102:
1103: public String[] getConsumingEndpointNames() {
1104: return mRegistry.getLinkedEndpointsByChannel(mChannelId);
1105: }
1106:
1107: public EndpointStatistics getEndpointStatistics(String name) {
1108: return (getEndpointByExternalName(name));
1109: }
1110:
1111: public CompositeData getStatistics() {
1112: try {
1113: Object values[] = { (long) (mActive.size()),
1114: (long) (mInternalEndpoints.size()), mSendRequest,
1115: mReceiveRequest, mSendReply, mReceiveReply,
1116: mSendFault, mReceiveFault, mSendDONE, mReceiveDONE,
1117: mSendERROR, mReceiveERROR, mDeadWait, mTimeOut,
1118: mSendCount, mSendSyncCount, mAcceptCount,
1119: mResponseTime.getMin(),
1120: (long) mResponseTime.getAverage(),
1121: mResponseTime.getMax(),
1122: (long) mResponseTime.getSd(), mStatusTime.getMin(),
1123: (long) mStatusTime.getAverage(),
1124: mStatusTime.getMax(), (long) mStatusTime.getSd(),
1125: mNMRTime.getMin(), (long) mNMRTime.getAverage(),
1126: mNMRTime.getMax(), (long) mNMRTime.getSd(),
1127: mComponentTime.getMin(),
1128: (long) mComponentTime.getAverage(),
1129: mComponentTime.getMax(),
1130: (long) mComponentTime.getSd(),
1131: mChannelTime.getMin(),
1132: (long) mChannelTime.getAverage(),
1133: mChannelTime.getMax(), (long) mChannelTime.getSd() };
1134:
1135: return new CompositeDataSupport(new CompositeType(
1136: "DeliveryChannelStatistics",
1137: "DeliveryChannel statistics", ITEM_NAMES,
1138: ITEM_DESCRIPTIONS, ITEM_TYPES), ITEM_NAMES, values);
1139: } catch (javax.management.openmbean.OpenDataException odEx) {
1140: ; // ignore this for now
1141: }
1142: return (null);
1143: }
1144:
1145: //-------------------------------Object-------------------------------------
1146:
1147: public String toString() {
1148: StringBuilder sb = new StringBuilder();
1149:
1150: sb.append(" DeliveryChannel for Component: ");
1151: sb.append(mChannelId);
1152: sb.append("\n State: ");
1153: sb.append(mIsClosed ? "CLOSED" : "OPEN");
1154: sb.append(" Transactional: ");
1155: sb.append(mTransactional ? "YES" : "NO");
1156: sb.append(" Acceptors: " + mAcceptors);
1157: sb.append("\n DeadWaitCount: " + mDeadWait);
1158: sb.append(" TimeoutCount: " + mTimeOut);
1159: sb.append("\n SendCount: " + mSendCount);
1160: sb.append(" SendSyncCount:" + mSendSyncCount);
1161: sb.append(" AcceptCount: " + mAcceptCount);
1162: sb.append("\n SendRequest: " + mSendRequest);
1163: sb.append(" RecvRequest: " + mReceiveRequest);
1164: sb.append(" SendReply: " + mSendReply);
1165: sb.append(" RecvReply: " + mReceiveReply);
1166: sb.append("\n SendDONE: " + mSendDONE);
1167: sb.append(" RecvDONE: " + mReceiveDONE);
1168: sb.append(" SendERROR: " + mSendERROR);
1169: sb.append(" RecvERROR: " + mReceiveERROR);
1170: sb.append("\n SendFault: " + mSendFault);
1171: sb.append(" RecvFault: " + mReceiveFault);
1172: sb.append("\n ResponseTime: " + mResponseTime.toString());
1173: sb.append("\n StatusTime: " + mStatusTime.toString());
1174: sb.append("\n ComponentTime: " + mComponentTime.toString());
1175: sb.append("\n ChannelTime: " + mChannelTime.toString());
1176: sb.append("\n NMRTime: " + mNMRTime.toString());
1177: sb.append("\n InternalEndpoints Count: ");
1178: sb.append(mInternalEndpoints.size());
1179: sb.append("\n");
1180: for (Iterator i = mInternalEndpoints.iterator(); i.hasNext();) {
1181: sb.append(i.next().toString());
1182: }
1183: sb.append(" ExternalEndpoints Count: ");
1184: sb.append(mExternalEndpoints.size());
1185: sb.append("\n");
1186: for (Iterator i = mExternalEndpoints.iterator(); i.hasNext();) {
1187: sb.append(i.next().toString());
1188: sb.append("\n");
1189: }
1190: sb.append(" Active MEP's Count: ");
1191: sb.append(mActive.size());
1192: sb.append("\n");
1193: for (Iterator m = mActive.values().iterator(); m.hasNext();) {
1194: MessageExchangeProxy mep = (MessageExchangeProxy) m.next();
1195: sb.append(mep.toString());
1196: sb.append("\n");
1197: }
1198: sb.append(" Queued MEP's Count: ");
1199: sb.append(mQueue.size());
1200: sb.append("\n");
1201: for (Iterator m = mQueue.iterator(); m.hasNext();) {
1202: MessageExchangeProxy mep = (MessageExchangeProxy) m.next();
1203: sb.append(" Exchange Id(");
1204: sb.append(mep.getExchangeId());
1205: sb.append(")\n");
1206: }
1207: return (sb.toString());
1208: }
1209: }
|