001: /*
002: * <copyright>
003: * Copyright 1999-2004 Cougaar Software, Inc.
004: * under sponsorship of the Defense Advanced Research Projects
005: * Agency (DARPA).
006: *
007: * You can redistribute this software and/or modify it under the
008: * terms of the Cougaar Open Source License as published on the
009: * Cougaar Open Source Website (www.cougaar.org).
010: *
011: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
012: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
013: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
014: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
015: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
016: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
017: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
018: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
019: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
020: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
021: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
022: *
023: * </copyright>
024: */
025:
026: package org.cougaar.lib.web.axis.mts;
027:
028: import java.io.ByteArrayOutputStream;
029: import java.io.FileInputStream;
030: import java.io.IOException;
031: import java.io.ObjectInputStream;
032: import java.io.ObjectOutputStream;
033: import java.lang.reflect.Method;
034: import java.net.InetAddress;
035: import java.net.URI;
036: import java.net.URL;
037: import java.net.UnknownHostException;
038:
039: import javax.activation.DataHandler;
040: import javax.xml.namespace.QName;
041: import javax.xml.rpc.ParameterMode;
042: import javax.xml.rpc.ServiceException;
043:
044: import org.apache.axis.attachments.OctetStream;
045: import org.apache.axis.attachments.OctetStreamDataSource;
046: import org.apache.axis.client.Call;
047: import org.apache.axis.encoding.ser.BeanDeserializerFactory;
048: import org.apache.axis.encoding.ser.BeanSerializerFactory;
049: import org.apache.axis.encoding.ser.JAFDataHandlerDeserializerFactory;
050: import org.apache.axis.encoding.ser.JAFDataHandlerSerializerFactory;
051: import org.cougaar.core.component.ServiceAvailableEvent;
052: import org.cougaar.core.component.ServiceAvailableListener;
053: import org.cougaar.core.component.ServiceBroker;
054: import org.cougaar.core.mts.MessageAddress;
055: import org.cougaar.core.mts.MessageAttributes;
056: import org.cougaar.core.service.LoggingService;
057: import org.cougaar.core.service.ServletService;
058: import org.cougaar.core.service.WebServicesService;
059: import org.cougaar.core.thread.SchedulableStatus;
060: import org.cougaar.mts.base.CommFailureException;
061: import org.cougaar.mts.base.DestinationLink;
062: import org.cougaar.mts.base.LinkProtocol;
063: import org.cougaar.mts.base.MisdeliveredMessageException;
064: import org.cougaar.mts.base.NameLookupException;
065: import org.cougaar.mts.base.RPCLinkProtocol;
066: import org.cougaar.mts.base.UnregisteredNameException;
067: import org.cougaar.mts.std.AttributedMessage;
068:
069: /**
070: * This component is a SOAP-based {@link LinkProtocol} that uses
071: * the {@link WebServicesService} to receive messages and Axis
072: * {@link Call}s to send messages.
073: * <p>
074: * Load with:<pre>
075: * <component
076: * class='org.cougaar.lib.web.axis.mts.SOAPLinkProtocol'
077: * insertionpoint='Node.AgentManager.Agent.MessageTransport.Component'/>
078: * </pre>
079: * <p>
080: * The current code is only partially an "open-messaging" format,
081: * since the SOAP XML contains serialized Java Objects that are
082: * equivalent to the objects passed by the RMI-based LinkProtocol.
083: * <p>
084: * Performance is about 5x slower than RMI. In a two-node localhost
085: * "ping" test sending 2k messages back and forth as fast as possible,
086: * the time to send a message in milliseconds were:
087: * mean=15, stddev=14, min=9, max=122
088: * as compared to RMI:
089: * mean= 3, stddev= 2, min=2, max= 54
090: * This matches Axis performance metrics found in several research
091: * papers, e.g. <a
092: * href="http://www.mathematik.uni-ulm.de/sai/ws03/webserv/PerfWS.pdf"
093: * >Performance of Web Services</a>.
094: */
095: public class SOAPLinkProtocol extends RPCLinkProtocol {
096:
097: /**
098: * Our servlet path registered by the WebServicesService.
099: */
100: private static final String SERVLET_URI = "/axis/services";
101:
102: /**
103: * Maximum byte size for messages that can be sent as SOAPData,
104: * where larger messages must be sent as attachments.
105: * <p>
106: * The SOAP XML parser throws an OutOfMemoryError for large
107: * messages (> ~5mb) sent in the XML as SOAPData. Also,
108: * attachments are more efficient for larger messages, since
109: * the XML parser adds base64 encoding (+33% bloat) and
110: * inefficient string buffering. We could always use attachments,
111: * for smaller messages this seems wasteful, since attachments
112: * are copied to the disk and require file I/O to read them, plus
113: * periodic Axis directly cleanup costs.
114: * <p>
115: * Here we use 64k, which minimal testing has found to be a
116: * pretty good tradeoff.
117: */
118: private static final int BIG_MESSAGE_LENGTH = 1 << 16;
119:
120: /**
121: * Our WSDD to register in the {@link WebServicesService}.
122: * <p>
123: * This is a giant string, but we use ".class.getName()" to make
124: * sure the referenced classnames are correct.
125: */
126: private static final String MT_WSDD = "<deployment name=\"test\" xmlns=\"http://xml.apache.org/axis/wsdd/\" \n"
127: + " xmlns:java=\"http://xml.apache.org/axis/wsdd/providers/java\">\n"
128: + " <service name=\"urn:Cougaar-MTS\" provider=\"java:RPC\">\n"
129: + " <parameter name=\"className\"\n" + " value=\""
130: + SOAPMTHook.class.getName()
131: + "\"/>\n"
132: + " <parameter name=\"allowedMethods\"\n"
133: + " value=\"rerouteMessage rerouteMessageAsAttachment "
134: + "getMessageAddress\"/>\n"
135: + " <parameter name=\"wsdlServicePort\" value=\"SOAPMT\"/>\n"
136: + " <operation name=\"rerouteMessage\"\n"
137: + " returnQName=\"returnqname\" returnType=\"SOAPData\">\n"
138: + " <parameter name=\"small_message\" type=\"SOAPData\"/>\n"
139: + " </operation>\n"
140: + " <operation name=\"rerouteMessageAsAttachment\"\n"
141: + " returnQName=\"returnqname\" returnType=\"SOAPData\">\n"
142: + " <parameter name=\"big_message\" type=\"DataHandler\"/>\n"
143: + " </operation>\n"
144: + " <operation name=\"getMessageAddress\"\n"
145: + " returnQName=\"returnqname\" returnType=\"SOAPData\"/>\n"
146: + " <beanMapping qname=\"myNS:SOAPData\"\n"
147: + " xmlns:myNS=\"urn:BeanService\"\n"
148: + " languageSpecificType=\"java:"
149: + SOAPData.class.getName()
150: + "\"/>\n"
151: + " <typeMapping\n"
152: + " deserializer=\""
153: + JAFDataHandlerDeserializerFactory.class.getName()
154: + "\"\n"
155: + " languageSpecificType=\"java:"
156: + DataHandler.class.getName()
157: + "\"\n"
158: + " qname=\"DataHandler\"\n"
159: + " serializer=\""
160: + JAFDataHandlerSerializerFactory.class.getName()
161: + "\"\n"
162: + " encodingStyle=\"http://schemas.xmlsoap.org/soap/encoding/\"/>\n"
163: + " </service>\n" + "</deployment>";
164:
165: /**
166: * The preferred ServletService API to get the http/https port,
167: * which we obtain through reflection to avoid a "webserver"
168: * module dependency.
169: */
170: private static final String ROOT_SERVLET_SERVICE_CLASS = "org.cougaar.lib.web.service.RootServletService";
171:
172: private LoggingService logger;
173: private WebServicesService webServicesService;
174:
175: private boolean servant_made = false;
176:
177: public void load() {
178: super .load();
179: logger = getLoggingService();
180: if (logger.isDebugEnabled()) {
181: logger.debug("Loading");
182: }
183:
184: // when an agent registers on our node, our RPCLinkProtocol
185: // base class will call "findOrMakeNodeServant()", which
186: // will call "registerWebService()" to deploy our WSDD and
187: // register us to receive messages. Also, we will register
188: // in the WP so other nodes can find us.
189: }
190:
191: /**
192: * If we registered a WSDD, unregister here.
193: */
194: public void unload() {
195: ServiceBroker sb = getServiceBroker();
196: if (webServicesService != null) {
197: sb.releaseService(this , WebServicesService.class,
198: webServicesService);
199: webServicesService = null;
200: }
201: super .unload();
202: }
203:
204: /** @return the naming service "AddressEntry" type */
205: public String getProtocolType() {
206: return "-SOAP";
207: }
208:
209: //
210: // protected methods for SSLSOAPLinkProtocol use:
211: //
212:
213: /** @return HTTP */
214: protected String getProtocol() {
215: return "http";
216: }
217:
218: /** @return servlet path */
219: protected String getPath() {
220: return SERVLET_URI;
221: }
222:
223: /** @return true of the socket is encrypted */
224: protected Boolean usesEncryptedSocket() {
225: return Boolean.FALSE;
226: }
227:
228: /**
229: * @return estimated cost for HTTP-based SOAP, which is simply
230: * hard-coded to be more than the RMI & HTTP LinkProtocols.
231: */
232: protected int computeCost(AttributedMessage message) {
233: return 1500;
234: }
235:
236: /** @return outgoing link to the target address */
237: protected DestinationLink createDestinationLink(MessageAddress addr) {
238: return new SOAPDestinationLink(addr);
239: }
240:
241: /**
242: * A local agent has registered in the MTS, so deploy our WSDD,
243: * advertise ourselves in the WP, and get ready to receive messages.
244: * <p>
245: * If we've already registered then this is a no-op.
246: * <p>
247: * The one snag is that the node may register early on, before
248: * the WebServicesService is available, so we use a
249: * ServiceAvailabiltyListener in case the service isn't available
250: * yet.
251: */
252: protected void findOrMakeNodeServant() {
253: if (logger.isDebugEnabled()) {
254: logger.debug("findOrMakeNodeServant, servant_made="
255: + servant_made);
256: }
257:
258: if (servant_made) {
259: // already created node webservice
260: return;
261: }
262:
263: final ServiceBroker sb = getServiceBroker();
264:
265: // use the servlet service to get our local servlet port
266: int port = -1;
267: Class ssClass;
268: try {
269: ssClass = Class.forName(ROOT_SERVLET_SERVICE_CLASS);
270: } catch (Exception e) {
271: ssClass = ServletService.class;
272: }
273: Object ss = sb.getService(this , ssClass, null);
274: if (ss != null) {
275: // port = ss.get<Protocol>Port();
276: try {
277: String s = getProtocol();
278: s = Character.toUpperCase(s.charAt(0)) + s.substring(1);
279: s = "get" + s + "Port";
280: Method m = ssClass.getMethod(s, null);
281: Object ret = m.invoke(ss, null);
282: port = ((Integer) ret).intValue();
283: } catch (Exception e) {
284: if (logger.isWarnEnabled()) {
285: logger.warn("Unable to get " + getProtocol()
286: + " port", e);
287: }
288: }
289: sb.releaseService(this , ssClass, ss);
290: }
291: if (port < 0) {
292: if (logger.isWarnEnabled()) {
293: logger.warn(getProtocol() + " port is disabled,"
294: + " not registering to receive SOAP messages");
295: }
296: servant_made = true;
297: return;
298: }
299:
300: // get the WebServicesService. If it's not available yet, set a
301: // callback to tell us when it's available.
302: if (sb.hasService(WebServicesService.class)) {
303: registerWebService();
304: } else {
305: sb.addServiceListener(new ServiceAvailableListener() {
306: public void serviceAvailable(ServiceAvailableEvent ae) {
307: Class cl = ae.getService();
308: if (WebServicesService.class.isAssignableFrom(cl)) {
309: sb.removeServiceListener(this );
310: registerWebService();
311: }
312: }
313: });
314: }
315:
316: //
317: // ideally we could return here if we haven't registered
318: // our WSDD yet, but our RPCLinkProtocol needs us to
319: // set our "nodeURI" now to support the WP, so we must
320: // do this now.
321: //
322:
323: // set our node's servlet URI for later binding in the white pages,
324: // so all other nodes can find us
325: MessageAddress node_addr = getNameSupport()
326: .getNodeMessageAddress();
327: String node_name = node_addr.toAddress();
328: URI nodeURI;
329: try {
330: InetAddress me = InetAddress.getLocalHost();
331: nodeURI = new URI(getProtocol() + "://" + me.getHostName()
332: + ':' + port + getPath());
333: setNodeURI(nodeURI);
334: } catch (Exception e) {
335: if (logger.isErrorEnabled()) {
336: logger.error("createURI failed", e);
337: }
338: nodeURI = null;
339: }
340:
341: if (logger.isDebugEnabled()) {
342: logger.debug("Registered in WP with URI: " + nodeURI);
343: }
344:
345: servant_made = true;
346: }
347:
348: /**
349: * Handle IP address change.
350: * <p>
351: * Servlets handle the new-address case automatically, so this is
352: * a no-op.
353: */
354: protected void remakeNodeServant() {
355: }
356:
357: /**
358: * Register our WebService that will handle the messages on the
359: * receiving end.
360: */
361: private void registerWebService() {
362: if (logger.isDebugEnabled()) {
363: logger.debug("registerWebServices");
364: }
365:
366: // get webservices api
367: ServiceBroker sb = getServiceBroker();
368: webServicesService = (WebServicesService) sb.getService(this ,
369: WebServicesService.class, null);
370: if (webServicesService == null) {
371: throw new RuntimeException(
372: "Unable to obtain WebServicesService");
373: }
374:
375: // set a static in the SOAPMTHook, to forward method calls into
376: // this link protocol
377: SOAPMT mt = new SOAPMT() {
378: public SOAPData rerouteMessage(SOAPData small_message)
379: throws Exception {
380: AttributedMessage message = (AttributedMessage) small_message
381: .toObject();
382: Object result = SOAPLinkProtocol.this
383: .receiveMessage(message);
384: return new SOAPData(result);
385: }
386:
387: public SOAPData rerouteMessageAsAttachment(
388: DataHandler big_message) throws Exception {
389: AttributedMessage message = (AttributedMessage) readFromDataHandler(big_message);
390: Object result = SOAPLinkProtocol.this
391: .receiveMessage(message);
392: return new SOAPData(result);
393: }
394:
395: public SOAPData getMessageAddress() {
396: MessageAddress addr = SOAPLinkProtocol.this
397: .getMessageAddress();
398: return new SOAPData(addr);
399: }
400: };
401: SOAPMTHook.setStatic(mt);
402:
403: // register our wsdd
404: if (logger.isDebugEnabled()) {
405: logger.debug("processWSDD(\n" + MT_WSDD + "\n):");
406: }
407: try {
408: webServicesService.processWSDD(MT_WSDD);
409: } catch (Exception e) {
410: throw new RuntimeException("Unable to processWSDD(\n"
411: + MT_WSDD + "\n)", e);
412: }
413:
414: // ready to receive messages
415: }
416:
417: private Object receiveMessage(AttributedMessage message) {
418: Object result;
419: try {
420: // deliver the message by obtaining the
421: // MessageDeliverer from the LinkProtocol
422: result = getDeliverer().deliverMessage(message,
423: message.getTarget());
424: // the result should be MessageAttributes!
425: } catch (MisdeliveredMessageException e) {
426: result = e;
427: } catch (Exception e) {
428: result = new CommFailureException(e);
429: }
430: return result;
431: }
432:
433: private MessageAddress getMessageAddress() {
434: return getNameSupport().getNodeMessageAddress();
435: }
436:
437: private Object readFromDataHandler(DataHandler dh) throws Exception {
438: String filename = dh.getName();
439:
440: // read object from file
441: Object obj = null;
442: FileInputStream fis = new FileInputStream(filename);
443: ObjectInputStream ois = new ObjectInputStream(fis);
444: obj = ois.readObject();
445: ois.close();
446:
447: return obj;
448: }
449:
450: /**
451: * Our per-destination outgoing link, where we make our call.
452: */
453: protected class SOAPDestinationLink extends Link {
454:
455: // our generic SOAP client-sie Service engine instance.
456: //
457: // By saving this instead of creating an instance per "invoke",
458: // we avoid about 5 millis per call, which would be 33% of the
459: // 15 millis messaging delay for the average 2k localhost
460: // message.
461: //
462: // We must specify the full package name to avoid confusion with
463: // our RPCLinkProtocol's inner "Service" interface
464: private org.apache.axis.client.Service service;
465:
466: public SOAPDestinationLink(MessageAddress target) {
467: super (target);
468: }
469:
470: public Class getProtocolClass() {
471: return SOAPLinkProtocol.this .getClass();
472: }
473:
474: /**
475: * We found this URI in the WP, decode it if necessary.
476: * <p>
477: * For us this is the Axis URI, which is fine as-is.
478: */
479: protected Object decodeRemoteRef(URI ref) throws Exception {
480: return (ref == null ? null : ref.toURL());
481: }
482:
483: /**
484: * Send the message to the target Agent's SOAPLinkProtocol
485: * via SOAP.
486: */
487: protected MessageAttributes forwardByProtocol(
488: Object remote_ref, AttributedMessage message)
489: throws NameLookupException, UnregisteredNameException,
490: CommFailureException, MisdeliveredMessageException {
491: try {
492: // loopback:
493: MessageAddress target = message.getTarget();
494: if (getRegistry().isLocalClient(target)) {
495: return getDeliverer().deliverMessage(message,
496: target);
497: }
498: // send remote:
499: Object response = sendMessage((URL) remote_ref, message);
500: if (response instanceof MessageAttributes) {
501: return (MessageAttributes) response;
502: } else if (response instanceof MisdeliveredMessageException) {
503: decache();
504: throw (MisdeliveredMessageException) response;
505: } else {
506: throw new CommFailureException((Exception) response);
507: }
508: } catch (Exception e) {
509: //e.printStackTrace();
510: throw new CommFailureException(e);
511: }
512: }
513:
514: /**
515: * This method streams serialized java objects over SOAP.
516: */
517: private Object sendMessage(URL url, AttributedMessage message)
518: throws IOException, ClassNotFoundException,
519: UnknownHostException {
520: // write object to byte array
521: ByteArrayOutputStream bos = new ByteArrayOutputStream();
522: ObjectOutputStream oos = new ObjectOutputStream(bos);
523: oos.writeObject(message);
524: oos.close();
525: byte[] messageBytes = bos.toByteArray();
526: int messageLength = (messageBytes == null ? 0
527: : messageBytes.length);
528:
529: // choose to send either inlined or as an attachment
530: boolean isBigMessage = (messageLength >= BIG_MESSAGE_LENGTH);
531:
532: if (logger.isDetailEnabled()) {
533: logger.detail("sendMessage(" + url + ", " + message
534: + ") length=" + messageLength);
535: }
536:
537: Object sendObj;
538: if (isBigMessage) {
539: // wrap inside DataHandler to send as attachement
540: sendObj = new DataHandler(new OctetStreamDataSource(
541: "source", new OctetStream(messageBytes)));
542: } else {
543: // send inline as xml encoded binary
544: sendObj = new SOAPData(messageBytes);
545: }
546:
547: if (service == null) {
548: service = new org.apache.axis.client.Service();
549: }
550:
551: Call call;
552: try {
553: call = (Call) service.createCall();
554: } catch (ServiceException se) {
555: throw new RuntimeException(
556: "Unable to create SOAP call", se);
557: }
558:
559: // register type mappings
560: QName dataHandlerQN = (isBigMessage ? (new QName(
561: "urn:Cougaar-MTS", "DataHandler")) : null);
562: QName dataQN = new QName("urn:BeanService", "SOAPData");
563: Class cl = SOAPData.class;
564: QName qn = dataQN;
565: call.registerTypeMapping(cl, qn, new BeanSerializerFactory(
566: cl, qn), new BeanDeserializerFactory(cl, qn));
567: if (isBigMessage) {
568: cl = DataHandler.class;
569: qn = dataHandlerQN;
570: call.registerTypeMapping(cl, qn,
571: JAFDataHandlerSerializerFactory.class,
572: JAFDataHandlerDeserializerFactory.class);
573: }
574: call.setTargetEndpointAddress(url);
575:
576: if (isBigMessage) {
577: call.setOperationName(new QName("urn:Cougaar-MTS",
578: "rerouteMessageAsAttachment"));
579: call.addParameter("big_message", dataHandlerQN,
580: ParameterMode.IN);
581: } else {
582: call.setOperationName(new QName("urn:Cougaar-MTS",
583: "rerouteMessage"));
584: call.addParameter("small_message", dataQN,
585: ParameterMode.IN);
586: }
587: call.setReturnType(dataQN);
588:
589: // invoke
590: Object ret;
591: try {
592: SchedulableStatus.beginNetIO("SOAP call");
593: ret = call.invoke(new Object[] { sendObj });
594: } finally {
595: SchedulableStatus.endBlocking();
596: }
597:
598: if (ret instanceof SOAPData) {
599: // usual case, MTS works
600: Object o = ((SOAPData) ret).toObject();
601: if (o instanceof MessageAttributes) {
602: // good case, should be typical case
603: return (MessageAttributes) o;
604: } else if (o instanceof MisdeliveredMessageException) {
605: // remote MTS exception, rethrow
606: throw (MisdeliveredMessageException) o;
607: } else {
608: throw new IllegalArgumentException(
609: "Invalid data type: "
610: + (o == null ? "null" : o
611: .getClass().getName()));
612: }
613: }
614:
615: // check for Axis/SOAP error
616: if (ret instanceof String) {
617: // SOAP failure, e.g. I/O?? Treat as CommFailure
618: throw new IOException("SOAP failure: " + ret);
619: }
620:
621: // other case? error!
622: throw new RuntimeException("Invalid SOAP return type: "
623: + (ret == null ? "null" : ret.getClass().getName()));
624: }
625: }
626:
627: protected void releaseNodeServant() {
628: }
629: }
|