001: package org.objectweb.celtix.bindings;
002:
003: import java.util.HashMap;
004: import java.util.Map;
005: import java.util.logging.Level;
006: import java.util.logging.Logger;
007:
008: import org.objectweb.celtix.common.logging.LogUtils;
009: import org.objectweb.celtix.context.InputStreamMessageContext;
010: import org.objectweb.celtix.handlers.HandlerInvoker;
011:
012: /**
013: * Class to manage correlation of decoupled responses.
014: */
015: public class ResponseCorrelator implements ResponseCallback {
016:
017: private static final Logger LOG = LogUtils
018: .getL7dLogger(ResponseCorrelator.class);
019:
020: private HandlerInvoker fixedHandlerInvoker;
021: private Map<String, Response> responseMap;
022: private Map<String, HandlerInvoker> relatedRequestMap;
023: private AbstractBindingBase binding;
024:
025: protected ResponseCorrelator(AbstractBindingBase b) {
026: // a fixed snap-shot of the stream and system handler chains
027: // are used, as the incoming (possibly asynchronous) response
028: // cannot yet be corellated with a particular request, hence
029: // may not include any dynamic (i.e. programmatic) changes
030: // made to the handler chains
031: fixedHandlerInvoker = b.createHandlerInvoker();
032: responseMap = new HashMap<String, Response>();
033: relatedRequestMap = new HashMap<String, HandlerInvoker>();
034: binding = b;
035: }
036:
037: /**
038: * Used by the ClientTransport to dispatch decoupled responses.
039: *
040: * @param responseContext context with InputStream containing the
041: * incoming the response
042: */
043:
044: public void dispatch(InputStreamMessageContext responseContext) {
045: assert responseContext != null;
046:
047: Response response = new Response(binding, fixedHandlerInvoker);
048: response.processProtocol(responseContext);
049:
050: synchronized (this ) {
051: String inCorrelation = response.getCorrelationId();
052: if (inCorrelation != null) {
053: HandlerInvoker alternate = relatedRequestMap
054: .remove(inCorrelation);
055: if (alternate == null) {
056: LOG.log(Level.INFO, "response correlation ID: {0}",
057: inCorrelation);
058: responseMap.put(inCorrelation, response);
059: notifyAll();
060: } else {
061: DataBindingCallback callback = BindingContextUtils
062: .retrieveDataBindingCallback(responseContext);
063: if (callback != null) {
064: response.getHandlerInvoker()
065: .adoptLogicalHandlers(alternate);
066: response.processLogical(callback);
067: }
068: }
069: } else {
070: // this is expected for partial responses
071: LOG.info("no correlation ID in incoming message");
072: DataBindingCallback callback = BindingContextUtils
073: .retrieveDataBindingCallback(responseContext);
074: if (callback != null) {
075: response.processLogical(callback);
076: }
077: }
078: }
079: }
080:
081: /**
082: * Wait for a correlated response.
083: *
084: * @param outContext outgoing context containing correlation ID property
085: * @return binding-specific context for the correlated response
086: */
087: public Response getResponse(Request request) {
088:
089: String outCorrelation = request.getCorrelationId();
090: Response response = null;
091: if (outCorrelation != null) {
092: LOG.log(Level.INFO, "request correlation ID: {0}",
093: outCorrelation);
094: synchronized (this ) {
095: response = responseMap.remove(outCorrelation);
096: int count = 0;
097: while (response == null && count < 10) {
098: try {
099: wait();
100: response = responseMap.remove(outCorrelation);
101: if (request.isRelatedRequestExpected()) {
102: relatedRequestMap.put(outCorrelation,
103: request.getHandlerInvoker());
104: }
105: } catch (InterruptedException ie) {
106: // ignore
107: }
108:
109: count++;
110: }
111: }
112: } else {
113: LOG.warning("NO_OUTGOING_CORRELATION_ID_MSG");
114: }
115: return response;
116: }
117: }
|