001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.lib.web.axis.distance;
028:
029: import javax.xml.namespace.QName;
030: import javax.xml.rpc.ParameterMode;
031: import java.net.URL;
032: import java.util.ArrayList;
033: import java.util.Collections;
034: import java.util.Enumeration;
035: import java.util.List;
036:
037: import org.cougaar.core.blackboard.IncrementalSubscription;
038: import org.cougaar.core.service.LoggingService;
039: import org.cougaar.core.plugin.ComponentPlugin;
040: import org.cougaar.core.thread.SchedulableStatus;
041: import org.cougaar.util.Arguments;
042: import org.cougaar.util.UnaryPredicate;
043:
044: import org.apache.axis.AxisFault;
045: import org.apache.axis.client.Call;
046: import org.apache.axis.client.Service;
047: import org.apache.axis.client.async.AsyncCall;
048: import org.apache.axis.client.async.IAsyncCallback;
049: import org.apache.axis.client.async.IAsyncResult;
050: import org.apache.axis.client.async.Status;
051: import org.apache.axis.encoding.XMLType;
052: import org.apache.axis.utils.Options;
053: import org.apache.axis.encoding.ser.BeanDeserializerFactory;
054: import org.apache.axis.encoding.ser.BeanSerializerFactory;
055:
056: /**
057: * This component subscribes to {@link DistanceQuery} objects
058: * and sets the "distance" between the two ZIP codes by asking
059: * a remote SOAP server.
060: * <p>
061: * Load with:<pre>
062: * <component
063: * class="org.cougaar.lib.web.axis.distance.DistanceQueryPlugin">
064: * <argument>async=false</argument>
065: * </component>
066: * </pre>
067: * <p>
068: * Change the above to "async=true" to enable asynchronous SOAP
069: * calls, which avoids blocking Cougaar's pooled threads during
070: * the SOAP I/O.
071: * <p>
072: * This example uses the <a href=
073: * "http://webservices.imacination.com/distance/"
074: * >imacination.com</a> "ZIP Distance Calculator" web service.
075: */
076: public class DistanceQueryPlugin extends ComponentPlugin {
077:
078: private static final String DEFAULT_URL = "http://webservices.imacination.com/distance/Distance.jws";
079:
080: // if the plugin lacks an "async=boolean" parameter, use
081: // this default:
082: private static final boolean DEFAULT_ASYNC = true;
083:
084: private static final UnaryPredicate DISTANCE_QUERY_PREDICATE = new DistanceQueryPredicate();
085:
086: private boolean async;
087:
088: // if async, this is the pending result queue:
089: private final List queue = new ArrayList();
090:
091: private LoggingService log;
092:
093: private IncrementalSubscription distSub;
094:
095: public void setLoggingService(LoggingService log) {
096: this .log = log;
097: }
098:
099: protected void setupSubscriptions() {
100: Arguments args = new Arguments(getParameters());
101: async = args.getBoolean("async", DEFAULT_ASYNC);
102:
103: distSub = (IncrementalSubscription) blackboard
104: .subscribe(DISTANCE_QUERY_PREDICATE);
105: }
106:
107: protected void execute() {
108: // handle new distance requests
109: for (Enumeration en = distSub.getAddedList(); en
110: .hasMoreElements();) {
111: DistanceQuery dq = (DistanceQuery) en.nextElement();
112: if (log.isInfoEnabled()) {
113: log.info("Handle added " + dq);
114: }
115: if (async) {
116: // send asynchronous distance lookup, look for an
117: // answer the next time we "execute()"
118: getDistanceAsync(dq);
119: continue;
120: }
121: // do blocking SOAP call, set distance now
122: double distance = getDistance(dq);
123: setDistance(dq, distance);
124: }
125:
126: if (async) {
127: // check for asynchronous callback results
128: DQResult dqr;
129: while ((dqr = nextResult()) != null) {
130: DistanceQuery dq = dqr.getDistanceQuery();
131: double distance = dqr.getDistance();
132: setDistance(dq, distance);
133: }
134: }
135: }
136:
137: private void setDistance(DistanceQuery dq, double distance) {
138: if (log.isInfoEnabled()) {
139: log.info("Setting distance to " + distance + " for " + dq);
140: }
141: dq.setDistance(distance);
142: blackboard.publishChange(dq);
143: }
144:
145: // blocking distance lookup
146: private double getDistance(DistanceQuery dq) {
147: try {
148: Call call = prepareCall();
149: Object[] params = createParams(dq);
150:
151: if (log.isInfoEnabled()) {
152: log.info("Invoking blocking SOAP call");
153: }
154:
155: // invoke, tell the thread service that we'll block
156: Object ret;
157: try {
158: SchedulableStatus.beginNetIO("SOAP call");
159: ret = call.invoke(params);
160: } finally {
161: SchedulableStatus.endBlocking();
162: }
163:
164: if (ret instanceof Double) {
165: return ((Double) ret).doubleValue();
166: }
167: throw new RuntimeException(
168: "Expecting a \"double\" response, not " + ret);
169: } catch (Exception e) {
170: if (log.isErrorEnabled()) {
171: log.error("getDistance(" + dq + ") failed", e);
172: }
173: return -1;
174: }
175: }
176:
177: // non-blocking
178: private void getDistanceAsync(DistanceQuery dq) {
179: try {
180: Call call = prepareCall();
181: Object[] params = createParams(dq);
182:
183: if (log.isInfoEnabled()) {
184: log.info("Submitting asynchronous SOAP call for " + dq);
185: }
186:
187: IAsyncCallback iac = new MyAsyncCallback(dq);
188: AsyncCall ac = new AsyncCall(call, iac);
189: IAsyncResult result = ac.invoke(params);
190:
191: // no result yet, "execute()" later
192: } catch (Exception e) {
193: if (log.isErrorEnabled()) {
194: log.error("getDistance(" + dq + ") failed", e);
195: }
196: setDistance(dq, -1);
197: }
198: }
199:
200: // axis boilerplate.
201: //
202: // There are other ways to do this, e.g.
203: // - ask the service for its WSLD
204: // - create proxy classes
205: // - etc
206: // See the Axis examples for pointers, e.g.
207: // $AXIS_HOME/samples/stock/GetQuote*.java
208: private Call prepareCall() throws Exception {
209: String surl = DEFAULT_URL;
210: URL url = new URL(surl);
211:
212: Service service = new Service();
213:
214: Call call = (Call) service.createCall();
215:
216: call.setTargetEndpointAddress(url);
217: //call.setTimeout(new Integer(however_many_millis));
218: call.setOperationName(new QName("urn:DistanceService",
219: "getDistance"));
220: call.addParameter("fromZip", XMLType.XSD_STRING,
221: ParameterMode.IN);
222: call
223: .addParameter("toZip", XMLType.XSD_STRING,
224: ParameterMode.IN);
225: call.setReturnType(XMLType.XSD_DOUBLE);
226:
227: return call;
228: }
229:
230: private Object[] createParams(DistanceQuery dq) {
231: return new Object[] { dq.getFromZip(), dq.getToZip() };
232: }
233:
234: private DQResult nextResult() {
235: synchronized (queue) {
236: if (queue.isEmpty()) {
237: return null;
238: }
239: // take the head of the queue.
240: // don't bother to optimize with a linked list.
241: return (DQResult) queue.remove(0);
242: }
243: }
244:
245: private static final class DistanceQueryPredicate implements
246: UnaryPredicate {
247: public boolean execute(Object o) {
248: return (o instanceof DistanceQuery);
249: }
250: }
251:
252: private static final class DQResult {
253: private final DistanceQuery dq;
254: private final double distance;
255:
256: public DQResult(DistanceQuery dq, double distance) {
257: this .dq = dq;
258: this .distance = distance;
259: }
260:
261: public DistanceQuery getDistanceQuery() {
262: return dq;
263: }
264:
265: public double getDistance() {
266: return distance;
267: }
268: }
269:
270: private final class MyAsyncCallback implements IAsyncCallback {
271: private final DistanceQuery dq;
272:
273: public MyAsyncCallback(DistanceQuery dq) {
274: this .dq = dq;
275: }
276:
277: public void onCompletion(IAsyncResult result) {
278: if (log.isInfoEnabled()) {
279: log
280: .info("Receiving asynchronous SOAP result for "
281: + dq);
282: }
283: double distance = extractDistance(result);
284: synchronized (queue) {
285: queue.add(new DQResult(dq, distance));
286: }
287: // request "execute()" in the plugin's thread.
288: // We need to do this to do the "publishChange", plus
289: // (in general) to avoid blocking the SOAP callback thread.
290: blackboard.signalClientActivity();
291: }
292:
293: private double extractDistance(IAsyncResult result) {
294: Status status = result.getStatus();
295: if (status == Status.COMPLETED) {
296: Object o = result.getResponse();
297: if (o instanceof Double) {
298: // good!
299: return ((Double) o).doubleValue();
300: }
301: }
302: if (log.isErrorEnabled()) {
303: String s = (status == Status.COMPLETED ? ("unexpected return type: " + result
304: .getResponse())
305: : (status == Status.EXCEPTION ? "exception"
306: : "invalid status: " + status));
307: Throwable t = (status == Status.EXCEPTION ? result
308: .getException() : null);
309: log.error("getDistance(" + dq + ") failed: " + s, t);
310: }
311: return -1;
312: }
313: }
314: }
|