001: /*
002: * BEGIN_HEADER - DO NOT EDIT
003: *
004: * The contents of this file are subject to the terms
005: * of the Common Development and Distribution License
006: * (the "License"). You may not use this file except
007: * in compliance with the License.
008: *
009: * You can obtain a copy of the license at
010: * https://open-esb.dev.java.net/public/CDDLv1.0.html.
011: * See the License for the specific language governing
012: * permissions and limitations under the License.
013: *
014: * When distributing Covered Code, include this CDDL
015: * HEADER in each file and include the License file at
016: * https://open-esb.dev.java.net/public/CDDLv1.0.html.
017: * If applicable add the following below this CDDL HEADER,
018: * with the fields enclosed by brackets "[]" replaced with
019: * your own identifying information: Portions Copyright
020: * [year] [name of copyright owner]
021: */
022:
023: /*
024: * @(#)NMRProcessor.java
025: * Copyright 2004-2007 Sun Microsystems, Inc. All Rights Reserved.
026: *
027: * END_HEADER - DO NOT EDIT
028: */
029: package com.sun.jbi.binding.proxy;
030:
031: import com.sun.jbi.binding.proxy.connection.ConnectionManager;
032: import com.sun.jbi.binding.proxy.connection.ConnectionManagerFactory;
033: import com.sun.jbi.binding.proxy.connection.ClientConnection;
034: import com.sun.jbi.binding.proxy.connection.EventConnection;
035: import com.sun.jbi.binding.proxy.connection.ServerConnection;
036: import com.sun.jbi.binding.proxy.connection.EventInfo;
037: import com.sun.jbi.binding.proxy.connection.EventInfoFactory;
038:
039: import com.sun.jbi.binding.proxy.util.MEPInputStream;
040: import com.sun.jbi.binding.proxy.util.MEPOutputStream;
041: import com.sun.jbi.binding.proxy.util.Translator;
042:
043: import com.sun.jbi.messaging.DeliveryChannel;
044: import com.sun.jbi.messaging.MessageExchange;
045:
046: import java.io.ByteArrayInputStream;
047: import java.io.ByteArrayOutputStream;
048:
049: import java.util.HashMap;
050: import java.util.Iterator;
051: import java.util.Vector;
052: import java.util.logging.Logger;
053:
054: import javax.jbi.messaging.ExchangeStatus;
055: import javax.jbi.messaging.InOnly;
056: import javax.jbi.messaging.InOut;
057: import javax.jbi.messaging.MessageExchangeFactory;
058: import javax.jbi.messaging.NormalizedMessage;
059:
060: import javax.jbi.servicedesc.ServiceEndpoint;
061:
062: import javax.xml.namespace.QName;
063:
064: import javax.xml.transform.dom.DOMSource;
065:
066: import org.w3c.dom.Document;
067: import org.w3c.dom.DocumentFragment;
068: import org.w3c.dom.NodeList;
069:
070: /**
071: *
072: * Runnable class that implements the NMR receiver. All traffic from the NMR that is associated
073: * with endpoints for which the ProxyBinding is acting as a proxy, are processed here. Also, the
074: * ProxyBinding exposes a handful of services used locally and remotely, and these are handled
075: * here.
076: * The basic flow is:
077: * Decide if message is for us or for a proxy.
078: * If the exchange is for us, process it.
079: * Else, lookup the instance that hosts the service.
080: * Send the exchange to the remote instance that is hosting the service.
081: * Remember the exchange if new, forget the exchange is this is the last send.
082: *
083: * @author Sun Microsystems, Inc
084: */
085: class NMRProcessor implements java.lang.Runnable,
086: com.sun.jbi.messaging.EndpointListener,
087: com.sun.jbi.messaging.TimeoutListener
088:
089: {
090: private Logger mLog;
091: private ProxyBinding mPB;
092: private ConnectionManager mCM;
093: private boolean mRunning;
094: private DeliveryChannel mChannel;
095: private MessageExchangeFactory mFactory;
096: private MEPOutputStream mMOS;
097:
098: /**
099: * Operation names.
100: */
101: static final String ACTIVATE = "Activate";
102: static final String DEACTIVATE = "Deactivate";
103: static final String SERVICE_DESCRIPTION = "ServiceDescription";
104: static final String RESOLVE_ENDPOINT = "ResolveEndpoint";
105: static final String ISEXCHANGEOKAY = "IsExchangeOkay";
106: static final String ISEXCHANGEOKAY2 = "IsExchangeOkay2";
107: static final String TIMEOUT = "Timeout";
108:
109: /**
110: * Property names used in above operations.
111: */
112: static final String EXCHANGEID = "ExchangeId";
113: static final String TIMEOUTCHECK = "TimeoutCheck";
114: static final String CLASSLOADER = "ClassLoader";
115: static final String SERVICENAME = "ServiceName";
116: static final String ENDPOINTNAME = "EndpointName";
117: static final String INSTANCENAME = "InstanceName";
118: static final String EXCHANGE = "Exchange";
119: static final String EXCHANGEOKAY = "ExchangeOkay";
120:
121: int mOperationsReceived;
122: int mOperationsSent;
123:
124: NMRProcessor(ProxyBinding proxyBinding) {
125: mPB = proxyBinding;
126: mLog = mPB.getLogger("nmr");
127: mChannel = mPB.getDeliveryChannel();
128: mFactory = mChannel.createExchangeFactoryForService(mPB
129: .getService().getServiceName());
130: mCM = mPB.getConnectionManager();
131: mRunning = true;
132: mChannel.setEndpointListener(this );
133: mChannel.setTimeoutListener(this );
134: }
135:
136: void stop() {
137: mRunning = false;
138: }
139:
140: public void run() {
141: MessageExchange me = null;
142:
143: //
144: // Set up JMS Queue based on our instance-id for inbound messages.
145: //
146: mLog.info("PB:NMRProcessor starting.");
147: try {
148: mMOS = new MEPOutputStream(mPB);
149: } catch (javax.jbi.messaging.MessagingException mEx) {
150: mLog.info("PB:NMRProcessor Init MessagingException:" + mEx);
151: }
152:
153: for (; mRunning;) {
154: try {
155: ExchangeEntry ee;
156: ServiceEndpoint se;
157:
158: //
159: // Wait for a message exchange.
160: //
161: me = null;
162: me = (MessageExchange) mChannel.accept();
163: if (me == null) {
164: continue;
165: }
166:
167: //
168: // See if the ProxyBinding itself is the target of the request.
169: //
170: if ((se = me.getEndpoint()).getServiceName()
171: .getLocalPart().equals(
172: mPB.SUNPROXYBINDINGSERVICE)) {
173: processProxyServiceRequest(me);
174: } else {
175: //
176: // See if this is a new or known message exchange.
177: // A new message is tracked for future use.
178: //
179: String id = me.getExchangeId();
180:
181: ee = mPB.getExchangeEntry(id);
182: if (ee == null) {
183: ClientConnection cc;
184: String target;
185:
186: target = mPB.getInstanceForEndpoint(se);
187: if (target == null) {
188: throw new javax.jbi.messaging.MessagingException(
189: Translator
190: .translate(
191: LocalStringKeys.NO_ENDPOINT_AVAILABLE,
192: se.toString()));
193: }
194: cc = mCM.getClientConnection(target);
195: ee = mPB.trackExchange(id, me, true);
196: ee.setClientConnection(cc);
197: ee.setState(ExchangeEntry.STATE_ONGOING);
198: } else {
199: ee.updateExchange(me);
200: }
201:
202: //
203: // Send the MEP to its target PB.
204: //
205: writeMEP(ee);
206: }
207: } catch (javax.jbi.messaging.MessagingException mEx) {
208: //
209: // We will get MessagingException from an InterruptedException at shutdown.
210: //
211: if (mRunning) {
212: logExceptionInfo(
213: "PB:NMRProcessor MessagingException.", mEx);
214: if (me != null) {
215: handleError(me, mEx);
216: } else {
217: mPB.stop();
218: }
219: }
220: } catch (Exception e) {
221: logExceptionInfo(
222: "PB:NMRProcessor Unexpected Exception.", e);
223:
224: //
225: // Fail fast in the face of unanticipated problems.
226: //
227: mPB.stop();
228: }
229: }
230: }
231:
232: void handleError(MessageExchange me, Exception e) {
233: String id = me.getExchangeId();
234: ExchangeEntry ee = mPB.getExchangeEntry(id);
235:
236: //
237: // Ignore this message if we know nothing about it. It could be a stale message from a
238: // restart.
239: //
240: if (ee != null) {
241: if (mChannel.activeReference(me)) {
242: //
243: // Send error indication back to the local NMR based component.
244: // This is only possible if the ME is still active (I.E. final send() hasn't happened.)
245: //
246: try {
247: me.setError(e);
248: mChannel.send(me);
249: } catch (javax.jbi.JBIException jEx) {
250:
251: }
252: }
253:
254: //
255: // Send error indication back to remote NMR based component.
256: // This is only possible if we have an ongoing conversation with this ME.
257: //
258: ClientConnection cc = ee.getClientConnection();
259:
260: try {
261: cc.send(new MEPOutputStream(mPB).writeException(id, e));
262: } catch (javax.jbi.messaging.MessagingException mEx) {
263:
264: }
265: }
266: mLog.info("PB:NMRProcessor drop Id(" + id + ") Reason("
267: + e.toString() + ")");
268: mPB.purgeExchange(id);
269: }
270:
271: void logExceptionInfo(String reason, Throwable t) {
272: StringBuffer sb = new StringBuffer();
273: java.io.ByteArrayOutputStream b;
274: Throwable nextT = t;
275:
276: sb.append(reason);
277: while (t != null) {
278: if (nextT != null) {
279: java.io.PrintStream ps = new java.io.PrintStream(
280: b = new java.io.ByteArrayOutputStream());
281: t.printStackTrace(ps);
282: sb.append(" Exception (");
283: sb.append(b.toString());
284: sb.append(") ");
285: }
286: nextT = t.getCause();
287: if (nextT != null) {
288: sb.append("Caused by: ");
289: }
290: t = nextT;
291: }
292:
293: mLog.warning(sb.toString());
294: }
295:
296: void writeMEP(ExchangeEntry ee)
297: throws javax.jbi.messaging.MessagingException {
298: MessageExchange me = ee.getMessageExchange();
299: ClientConnection cc = ee.getClientConnection();
300: String id = me.getExchangeId();
301: byte[] bytes;
302:
303: //
304: // Send message exchange to target.
305: //
306: bytes = mMOS.writeMEP(me, ee);
307: mLog.fine("PB:NMRProcessor send Id(" + id + ") Target("
308: + cc.getInstanceId() + ") Bytes(" + bytes.length + ")");
309: cc.send(bytes);
310:
311: //
312: // Check to see if the Channel has completed all processing
313: // on this message exchange.
314: //
315: if (!mChannel.activeReference(me)) {
316: mLog.fine("PB:NMRProcessor forget Id(" + id + ")");
317: mPB.purgeExchange(id);
318: }
319:
320: }
321:
322: void writeIsMEPOk(ExchangeEntry ee)
323: throws javax.jbi.messaging.MessagingException {
324: MessageExchange me = ee.getMessageExchange();
325: ClientConnection cc = ee.getClientConnection();
326: String id = me.getExchangeId();
327: byte[] bytes;
328:
329: //
330: // Send message exchange to target.
331: //
332: bytes = mMOS.writeIsMEPOk(ee);
333: mLog.fine("PB:NMRProcessor IsMEPOk send Id("
334: + ee.getRelatedExchange().getExchangeId() + ") Target("
335: + cc.getInstanceId() + ") Bytes(" + bytes.length + ")");
336: cc.send(bytes);
337:
338: // //
339: // // Check to see if the Channel has completed all processing
340: // // on this message exchange.
341: // //
342: // if (!mChannel.activeReference(me))
343: // {
344: // mLog.info("PB:NMRProcessor forget Id(" + id + ")");
345: // mPB.purgeExchange(id);
346: // }
347:
348: }
349:
350: void writeMEPOk(ExchangeEntry ee, boolean okay)
351: throws javax.jbi.messaging.MessagingException {
352: MessageExchange me = ee.getMessageExchange();
353: ClientConnection cc = ee.getClientConnection();
354: String id = me.getExchangeId();
355: byte[] bytes;
356:
357: //
358: // Send message exchange to target.
359: //
360: bytes = mMOS.writeMEPOk(id, okay);
361: mLog.fine("PB:NMRProcessor MEPOk send Id(" + id + ") Target("
362: + cc.getInstanceId() + ") Bytes(" + bytes.length + ")");
363: cc.send(bytes);
364: if (!okay) {
365: mLog.fine("PB:NMRProcessor writeMEPOk forget Id(" + id
366: + ")");
367: mPB.purgeExchange(id);
368: }
369: }
370:
371: //
372: // ------------------- Methods that deal with ProxyBinding services ------------------
373: //
374:
375: /**
376: * Handle any operations that are targeted at the service exposed by the ProxyBinding.
377: * @param me containing the messageExchange that should be handled.
378: */
379: void processProxyServiceRequest(MessageExchange me) {
380: String operation = me.getOperation().getLocalPart();
381:
382: mLog.fine("PB:ProxyBinding Request Operation(" + operation
383: + ")");
384: mOperationsReceived++;
385:
386: //
387: // Determine action needed.
388: //
389: if (operation.equals(ACTIVATE)) {
390: doActivate(me);
391: } else if (operation.equals(DEACTIVATE)) {
392: doDeactivate(me);
393: } else if (operation.equals(TIMEOUT)) {
394: doTimeout(me);
395: } else if (operation.equals(SERVICE_DESCRIPTION)) {
396: doServiceDescription(me);
397: } else if (operation.equals(RESOLVE_ENDPOINT)) {
398: doResolveEndpoint(me);
399: } else if (operation.equals(ISEXCHANGEOKAY)) {
400: doIsExchangeOkay(me);
401: } else if (operation.equals(ISEXCHANGEOKAY2)) {
402: doIsExchangeOkay2(me);
403: } else {
404: try {
405: mLog.fine("PB:ProxyBinding Unknown Operation("
406: + operation + ")");
407: {
408: me.setStatus(ExchangeStatus.ERROR);
409: mChannel.send(me);
410: }
411: } catch (javax.jbi.messaging.MessagingException mEx) {
412:
413: }
414: }
415: }
416:
417: void doActivate(MessageExchange me) {
418: try {
419: RegistrationInfo ri = mPB.addLocalEndpoint(
420: (ServiceEndpoint) me.getProperty(ACTIVATE),
421: (ClassLoader) me.getProperty(CLASSLOADER));
422: me.setStatus(ExchangeStatus.DONE);
423: mChannel.send(me);
424: // register a local stats MBean for service stats
425: mPB.registerServiceMBean(ri.getServiceName().toString());
426: } catch (javax.jbi.JBIException jEx) {
427: try {
428: me.setStatus(ExchangeStatus.ERROR);
429: mChannel.send(me);
430: } catch (javax.jbi.messaging.MessagingException mEx) {
431:
432: }
433: }
434: }
435:
436: void doDeactivate(MessageExchange me) {
437: try {
438: ServiceEndpoint er = (ServiceEndpoint) me
439: .getProperty(DEACTIVATE);
440: RegistrationInfo ri = mPB.removeLocalEndpoint(er);
441: me.setStatus(ExchangeStatus.DONE);
442: mChannel.send(me);
443:
444: // unregister the local stats MBean for service stats
445: mPB.unregisterServiceMBean(ri.getServiceName().toString());
446: } catch (javax.jbi.JBIException jEx) {
447: try {
448: me.setStatus(ExchangeStatus.ERROR);
449: mChannel.send(me);
450: } catch (javax.jbi.messaging.MessagingException mEx) {
451:
452: }
453: }
454: }
455:
456: void doTimeout(MessageExchange me) {
457: String id = (String) me.getProperty(EXCHANGEID);
458: ExchangeEntry ee = mPB.getExchangeEntry(id);
459:
460: try {
461: if (me.isRemoteInvocation()) {
462: if (me.getRole().equals(MessageExchange.Role.PROVIDER)) {
463: MessageExchange oldme;
464: boolean timedout;
465:
466: mLog.fine("PB:Timeout check Id(" + id + ")");
467: if (ee != null) {
468: oldme = ee.getMessageExchange();
469: if (oldme != null) {
470: me.setProperty(TIMEOUTCHECK, new Boolean(
471: timedout = oldme.checkTimeout()));
472: if (timedout) {
473: //
474: // Cleanup any ExchangeEntry for the message that has timed out.
475: //
476: mLog.fine("PB:Timeout terminate Id("
477: + id + ")");
478: oldme.terminate();
479: mPB
480: .purgeExchange(oldme
481: .getExchangeId());
482: }
483: }
484: }
485: me.setStatus(ExchangeStatus.DONE);
486: mChannel.send(me);
487: } else {
488: ee = mPB.getExchangeEntry(me.getExchangeId());
489: if (ee != null) {
490: mLog.fine("PB:Timeout remote Id("
491: + id
492: + ") Target("
493: + ee.getClientConnection()
494: .getInstanceId() + ")");
495: writeMEP(ee);
496: }
497: }
498: } else {
499: ExchangeEntry tee;
500:
501: //
502: // We use the instance determined by the exchange information. This allows the
503: // timeout message to be sent to the same instance as the original request
504: // instead of being subject to a load-balancing decision.
505: //
506: tee = mPB.trackExchange(me.getExchangeId(), me, true);
507: tee.setClientConnection(ee.getClientConnection());
508: tee.setState(ExchangeEntry.STATE_ONGOING);
509: mLog.fine("PB:Timeout local Id(" + id + ") Target("
510: + ee.getClientConnection().getInstanceId()
511: + ")");
512: writeMEP(tee);
513: }
514: } catch (javax.jbi.JBIException jEx) {
515: try {
516: mLog.fine("PB:Timeout Id(" + id + ") Exception(" + jEx
517: + ")");
518: me.setStatus(ExchangeStatus.ERROR);
519: mChannel.send(me);
520: } catch (javax.jbi.messaging.MessagingException mEx) {
521:
522: }
523: }
524: }
525:
526: void doServiceDescription(MessageExchange me) {
527: try {
528: QName service = (QName) me.getProperty(SERVICENAME);
529: String endpoint = (String) me.getProperty(ENDPOINTNAME);
530: ServiceEndpoint se = ((com.sun.jbi.messaging.DeliveryChannel) mChannel)
531: .createEndpoint(service, endpoint);
532:
533: if (me.isRemoteInvocation()) {
534: if (me.getRole().equals(MessageExchange.Role.PROVIDER)) {
535: //
536: // This is an InOut request. We should get a DONE/ERROR back when the request is
537: // acknowledged. There is nothing to do with the acknowledgement so just return
538: // (and swallow the message.)
539: //
540: if (me.getStatus().equals(ExchangeStatus.ACTIVE)) {
541: Document document;
542: NormalizedMessage nm;
543:
544: document = mPB.getComponentContext()
545: .getEndpointDescriptor(se);
546: nm = me.createMessage();
547: nm.setContent(new DOMSource(document));
548: ((InOut) me).setOutMessage(nm);
549: mChannel.send(me);
550: }
551: } else {
552: ExchangeEntry ee = mPB.getExchangeEntry(me
553: .getExchangeId());
554: if (ee != null) {
555: mLog.fine("PB:ServiceDescription Target("
556: + ee.getClientConnection()
557: .getInstanceId() + ")");
558: writeMEP(ee);
559: }
560: }
561: } else {
562: ExchangeEntry ee = mPB.getExchangeEntry(me
563: .getExchangeId());
564:
565: //
566: // Select the Instance based on the Service+Endpoint information.
567: // Note: This ME is InOut so we have to correctly handle the DONE message
568: // that most go back to the provoder.
569: //
570: if (ee == null) {
571: ee = mPB
572: .trackExchange(me.getExchangeId(), me, true);
573: ee.setClientConnection(mCM.getClientConnection(mPB
574: .getInstanceForEndpoint(se)));
575: ee.setState(ExchangeEntry.STATE_ONGOING);
576: }
577: mLog.fine("PB:ServiceDescription Target("
578: + ee.getClientConnection().getInstanceId()
579: + ")");
580: writeMEP(ee);
581: }
582: } catch (javax.jbi.JBIException jEx) {
583: try {
584: mLog.fine("PB:ServiceDescription Exception(" + jEx
585: + ")");
586: me.setStatus(ExchangeStatus.ERROR);
587: mChannel.send(me);
588: } catch (javax.jbi.messaging.MessagingException mEx) {
589:
590: }
591: }
592: }
593:
594: void doResolveEndpoint(MessageExchange me) {
595: try {
596: if (me.isRemoteInvocation()) {
597: if (me.getRole().equals(MessageExchange.Role.PROVIDER)) {
598: Document document = (Document) ((DOMSource) ((InOnly) me)
599: .getInMessage().getContent()).getNode();
600: ServiceEndpoint se;
601: DocumentFragment df = document
602: .createDocumentFragment();
603: NodeList nodes = document.getChildNodes();
604:
605: for (int i = 0; i < nodes.getLength();) {
606: df.appendChild(nodes.item(i));
607: }
608: se = mPB.getComponentContext()
609: .resolveEndpointReference(df);
610: me.setProperty(SERVICENAME, se.getServiceName());
611: me.setProperty(ENDPOINTNAME, se.getEndpointName());
612: me.setStatus(ExchangeStatus.DONE);
613: mChannel.send(me);
614: } else {
615: ExchangeEntry ee = mPB.getExchangeEntry(me
616: .getExchangeId());
617: if (ee != null) {
618: mLog.fine("PB:ResolveEndpoint Target("
619: + ee.getClientConnection()
620: .getInstanceId() + ")");
621: writeMEP(ee);
622: }
623: }
624: } else {
625: ExchangeEntry ee;
626:
627: ee = mPB.trackExchange(me.getExchangeId(), me, true);
628: ee.setClientConnection(mCM
629: .getClientConnection((String) me
630: .getProperty(INSTANCENAME)));
631: ee.setState(ExchangeEntry.STATE_ONGOING);
632: mLog.fine("PB:ResolveEndpoint Target("
633: + ee.getClientConnection().getInstanceId()
634: + ")");
635: writeMEP(ee);
636: }
637: } catch (javax.jbi.JBIException jEx) {
638: try {
639: mLog.fine("PB:ResolveEndpoint Exception(" + jEx + ")");
640: me.setStatus(ExchangeStatus.ERROR);
641: mChannel.send(me);
642: } catch (javax.jbi.messaging.MessagingException mEx) {
643:
644: }
645: }
646: }
647:
648: void doIsExchangeOkay(MessageExchange me) {
649: try {
650: MessageExchange rme = (MessageExchange) me
651: .getProperty(EXCHANGE);
652: ExchangeEntry ee;
653: ClientConnection cc;
654:
655: cc = mCM.getClientConnection((String) me
656: .getProperty(INSTANCENAME));
657:
658: ee = mPB.trackExchange(rme.getExchangeId(), rme, true);
659: ee.setRelatedExchange(me);
660: ee.setClientConnection(cc);
661: ee.setState(ExchangeEntry.STATE_ONGOING);
662: mLog.fine("PB:IsExchangeOkay Target("
663: + ee.getClientConnection().getInstanceId() + ")");
664: writeIsMEPOk(ee);
665: } catch (javax.jbi.JBIException jEx) {
666: }
667: }
668:
669: void doIsExchangeOkay2(MessageExchange me) {
670: try {
671: if (me.getStatus().equals(ExchangeStatus.ACTIVE)) {
672: MessageExchange rme = (MessageExchange) me
673: .getProperty(EXCHANGE);
674: ExchangeEntry ee = mPB.getExchangeEntry(rme
675: .getExchangeId());
676:
677: writeMEPOk(ee, mChannel.isExchangeOkay(rme));
678: me.setStatus(ExchangeStatus.DONE);
679: mChannel.send(me);
680: }
681: } catch (javax.jbi.JBIException jEx) {
682: try {
683: mLog.fine("PB:IsExchangeOkay2 Exception(" + jEx + ")");
684: me.setStatus(ExchangeStatus.ERROR);
685: mChannel.send(me);
686: } catch (javax.jbi.messaging.MessagingException mEx) {
687:
688: }
689: }
690: }
691:
692: //
693: // ------------------- Methods defined in com.sun.jbi.messaging.EndpointListener ------------------
694: //
695:
696: /**
697: * An endpoint is being activated on the given channel. We process this by sending a
698: * InOnly message exchange to the ProxyBinding (remember we are running in the context of a
699: * component that is activating an endpoint) with the endpoint information.
700: * @param channel that is activating an endpoint.
701: * @param endpoint that is being activated.
702: */
703: public void activate(DeliveryChannel channel,
704: ServiceEndpoint endpoint) {
705: InOnly io;
706:
707: try {
708: io = mFactory.createInOnlyExchange();
709: io.setOperation(new QName(ACTIVATE));
710: io.setProperty(ACTIVATE, endpoint);
711: io.setProperty(CLASSLOADER, channel.getClassLoader());
712: mOperationsSent++;
713: channel.sendSync(io);
714: if (!io.getStatus().equals(ExchangeStatus.DONE)) {
715: mLog.fine("PB:ProxyBinding activate failure");
716: }
717: } catch (javax.jbi.messaging.MessagingException mEx) {
718:
719: }
720: }
721:
722: /**
723: * An endpoint is being deactivated on the given channel. We process this by sending a
724: * InOnly message exchange to the ProxyBinding (remember we are running in the context of a
725: * component that is activating an endpoint) with the endpoint information.
726: * @param channel that is deactivating an endpoint.
727: * @param endpoint that is being deactivated.
728: */
729: public void deactivate(DeliveryChannel channel,
730: ServiceEndpoint endpoint) {
731: InOnly io;
732:
733: try {
734: io = mFactory.createInOnlyExchange();
735: io.setOperation(new QName(DEACTIVATE));
736: io.setProperty(DEACTIVATE, endpoint);
737: mOperationsSent++;
738: channel.sendSync(io);
739: if (!io.getStatus().equals(ExchangeStatus.DONE)) {
740: mLog.fine("PB:ProxyBinding deactivate failure");
741: }
742: } catch (javax.jbi.messaging.MessagingException mEx) {
743:
744: }
745: }
746:
747: //
748: // ---------------- Methods defined in com.sun.jbi.messaging.TimeoutListener --------------
749: //
750:
751: /**
752: * A message exchange has timed out on a sendSynch operation. We use an InOnly message exchange
753: * to convey the request to the ProxyBinding (remember we are running in the context of a
754: * component that is waiting in sendSynch().)
755: * @param channel that is performing a timeout on an endpoint.
756: * @param endpoint that is being timed out.
757: */
758: public boolean checkTimeout(DeliveryChannel channel,
759: javax.jbi.messaging.MessageExchange exchange) {
760: InOnly io;
761: boolean timedout = false;
762:
763: try {
764: io = mFactory.createInOnlyExchange();
765: io.setOperation(new QName(TIMEOUT));
766: io.setProperty(EXCHANGEID, exchange.getExchangeId());
767: mOperationsSent++;
768: channel.sendSync(io);
769: if (io.getStatus().equals(ExchangeStatus.DONE)) {
770: timedout = ((Boolean) io.getProperty(TIMEOUTCHECK))
771: .booleanValue();
772: if (timedout) {
773: ((com.sun.jbi.messaging.MessageExchange) exchange)
774: .terminate();
775: mPB.purgeExchange(exchange.getExchangeId());
776: }
777: } else {
778: mLog.fine("PB:ProxyBinding timeout failure");
779: }
780: } catch (javax.jbi.messaging.MessagingException mEx) {
781:
782: }
783:
784: return (timedout);
785: }
786:
787: /**
788: * A service description has been requested for a remote service. We process this by sending a
789: * InOnly message exchange to the ProxyBinding (remember we are running in the context of a
790: * component that asking for the service description) with the endpoint information.
791: * @param endpoint that is being queried for its description
792: */
793: public Document getDescription(ServiceEndpoint endpoint) {
794: InOut io;
795: Document document = null;
796: try {
797: io = mFactory.createInOutExchange();
798: io.setOperation(new QName(SERVICE_DESCRIPTION));
799: io.setProperty(SERVICENAME, endpoint.getServiceName());
800: io.setProperty(ENDPOINTNAME, endpoint.getEndpointName());
801: mOperationsSent++;
802: mChannel.sendSync(io);
803: if (!io.getStatus().equals(ExchangeStatus.ACTIVE)) {
804: mLog.fine("PB:ProxyBinding serviceDescription failure");
805: } else {
806: document = (Document) ((DOMSource) ((InOut) io)
807: .getOutMessage().getContent()).getNode();
808: io.setStatus(ExchangeStatus.DONE);
809: mChannel.send(io);
810: }
811: } catch (javax.jbi.messaging.MessagingException mEx) {
812:
813: }
814:
815: return (document);
816: }
817:
818: /**
819: * An endpoint reference needs to be resolved. We send this to each known instance and wait for an
820: * answer. We process this by sending a InOnly message exchange to the remote ProxyBinding
821: * (remember we are running in the context of a component that is asking for the endpoint resolution)
822: * with the endpoint information.
823: * @param endpoint that is being queried for its description
824: */
825: ServiceEndpoint resolveEndpointReference(DocumentFragment document) {
826: InOnly io;
827: ServiceEndpoint se = null;
828: HashMap instances = mPB.getInstances();
829:
830: for (Iterator i = instances.keySet().iterator(); i.hasNext();) {
831: String instance = (String) i.next();
832:
833: //
834: // Ignore ourselves...
835: //
836: if (instance.equals(mPB.getInstanceId())) {
837: continue;
838: }
839:
840: try {
841: NormalizedMessage nm;
842: io = mFactory.createInOnlyExchange();
843: nm = io.createMessage();
844: nm.setContent(new DOMSource(document));
845: io.setInMessage(nm);
846: io.setOperation(new QName(RESOLVE_ENDPOINT));
847: io.setProperty(INSTANCENAME, instance);
848: mOperationsSent++;
849: mChannel.sendSync(io);
850: if (!io.getStatus().equals(ExchangeStatus.DONE)) {
851: mLog
852: .fine("PB:ProxyBinding resolveEndpoint failure");
853: } else {
854: if (io.getProperty(SERVICENAME) != null) {
855: se = mChannel.createEndpoint((QName) io
856: .getProperty(SERVICENAME), (String) io
857: .getProperty(ENDPOINTNAME));
858: }
859: }
860: } catch (javax.jbi.messaging.MessagingException mEx) {
861:
862: }
863: if (se != null) {
864: break;
865: }
866: }
867:
868: return (se);
869: }
870:
871: /**
872: * A provider of a service needs to be queried about the suitability of the consumer request.
873: * We process this by sending a InOnly message exchange to the remote ProxyBinding
874: * (remember we are running in the context of a component that is attempting to perform a send to the
875: * provider)
876: * @param endpoint that is being queried for its description
877: */
878: boolean isExchangeOkay(ServiceEndpoint endpoint,
879: javax.jbi.messaging.MessageExchange exchange) {
880: InOnly io;
881: ServiceEndpoint se = null;
882: HashMap instances = mPB.getInstances();
883: Boolean okay;
884:
885: for (Iterator i = instances.keySet().iterator(); i.hasNext();) {
886: String instance = (String) i.next();
887:
888: //
889: // Ignore ourselves...
890: //
891: if (instance.equals(mPB.getInstanceId())) {
892: continue;
893: }
894:
895: try {
896: io = mFactory.createInOnlyExchange();
897: io.setOperation(new QName(ISEXCHANGEOKAY));
898: io.setProperty(INSTANCENAME, instance);
899: io.setProperty(EXCHANGE, exchange);
900: mOperationsSent++;
901: mChannel.sendSync(io);
902: if (!io.getStatus().equals(ExchangeStatus.DONE)) {
903: mLog
904: .fine("PB:ProxyBinding resolveEndpoint failure");
905: } else {
906: if ((okay = (Boolean) io.getProperty(EXCHANGEOKAY)) != null) {
907: return (okay.booleanValue());
908: }
909: }
910: } catch (javax.jbi.messaging.MessagingException mEx) {
911:
912: }
913: }
914:
915: return (false);
916: }
917:
918: }
|