001: /*
002: * BEGIN_HEADER - DO NOT EDIT
003: *
004: * The contents of this file are subject to the terms
005: * of the Common Development and Distribution License
006: * (the "License"). You may not use this file except
007: * in compliance with the License.
008: *
009: * You can obtain a copy of the license at
010: * https://open-esb.dev.java.net/public/CDDLv1.0.html.
011: * See the License for the specific language governing
012: * permissions and limitations under the License.
013: *
014: * When distributing Covered Code, include this CDDL
015: * HEADER in each file and include the License file at
016: * https://open-esb.dev.java.net/public/CDDLv1.0.html.
017: * If applicable add the following below this CDDL HEADER,
018: * with the fields enclosed by brackets "[]" replaced with
019: * your own identifying information: Portions Copyright
020: * [year] [name of copyright owner]
021: */
022:
023: /*
024: * @(#)RemoteProcessor.java
025: * Copyright 2004-2007 Sun Microsystems, Inc. All Rights Reserved.
026: *
027: * END_HEADER - DO NOT EDIT
028: */
029: package com.sun.jbi.binding.proxy;
030:
031: import com.sun.jbi.binding.proxy.connection.ConnectionManager;
032: import com.sun.jbi.binding.proxy.connection.ConnectionManagerFactory;
033: import com.sun.jbi.binding.proxy.connection.ClientConnection;
034: import com.sun.jbi.binding.proxy.connection.EventConnection;
035: import com.sun.jbi.binding.proxy.connection.ServerConnection;
036: import com.sun.jbi.binding.proxy.connection.EventInfo;
037: import com.sun.jbi.binding.proxy.connection.EventInfoFactory;
038:
039: import com.sun.jbi.binding.proxy.util.MEPInputStream;
040: import com.sun.jbi.binding.proxy.util.MEPOutputStream;
041: import com.sun.jbi.binding.proxy.util.Translator;
042:
043: import com.sun.jbi.messaging.DeliveryChannel;
044: import com.sun.jbi.messaging.EndpointListener;
045: import com.sun.jbi.messaging.ExchangeFactory;
046: import com.sun.jbi.messaging.MessageExchange;
047:
048: import java.io.ByteArrayInputStream;
049: import java.io.ByteArrayOutputStream;
050:
051: import java.util.HashMap;
052: import java.util.Vector;
053: import java.util.logging.Logger;
054:
055: import javax.jbi.messaging.ExchangeStatus;
056: import javax.jbi.messaging.InOnly;
057: import javax.jbi.messaging.MessageExchangeFactory;
058: import javax.jbi.messaging.NormalizedMessage;
059:
060: import javax.jbi.servicedesc.ServiceEndpoint;
061:
062: import javax.xml.namespace.QName;
063:
064: /**
065: * Runnable class that implements the Remote receiver. All traffic from any remote instance is
066: * first handled here. The remote traffic is either directed at this ProxyBinding instance or at
067: * an endpoint for a local component.
068: * The basic flow is:
069: * Decide if message is for us or for local compoent.
070: * If the exchange is for us, process it.
071: * Else, lookup the local compoent that hosts the service.
072: * Send the to the NMR for local routing
073: * Remember the exchange if new, forget the exchange is this is the last send.
074: *
075: * @author Sun Microsystems, Inc
076: */
077: class RemoteProcessor implements java.lang.Runnable {
078: private Logger mLog;
079: private ProxyBinding mPB;
080: private ConnectionManager mCM;
081: private boolean mRunning;
082: private DeliveryChannel mChannel;
083: private MEPInputStream mMIS;
084: private MessageExchangeFactory mFactory;
085:
086: RemoteProcessor(ProxyBinding proxyBinding) {
087: mPB = proxyBinding;
088: mLog = mPB.getLogger("remote");
089: mChannel = mPB.getDeliveryChannel();
090: mCM = mPB.getConnectionManager();
091: mRunning = true;
092: mFactory = mPB.getDeliveryChannel().createExchangeFactory();
093: }
094:
095: void stop() {
096: mRunning = false;
097: }
098:
099: public void run() {
100: ServerConnection sc;
101:
102: //
103: // Set up JMS Queue based on our instance-id for inbound messages.
104: //
105: try {
106: sc = mCM.getServerConnection();
107: mLog.info("PB:RemoteProcessor starting as ("
108: + sc.getInstanceId() + ")");
109: mMIS = new MEPInputStream(mPB);
110: } catch (javax.jbi.messaging.MessagingException mEx) {
111: mLog.warning("PB:RemoteProcessor Init MessagingException:"
112: + mEx);
113: return;
114: } catch (Exception ex) {
115: mLog.warning("PB:RemoteProcessor Init Exception:" + ex);
116: return;
117: }
118:
119: //
120: // Basic processing loop.
121: //
122: for (; mRunning;) {
123: try {
124: ClientConnection cc = null;
125: int msgType;
126:
127: //
128: // Accept a new client connection.
129: //
130: cc = sc.accept();
131: if (cc == null) {
132: continue;
133: }
134:
135: //
136: // Read the message from the connection.
137: //
138: msgType = mMIS.readMessage(cc);
139:
140: //
141: // Process based on type of message.
142: //
143: if (msgType == MEPInputStream.TYPE_MEP) {
144: doMEP();
145: } else if (msgType == MEPInputStream.TYPE_EXCEPTION) {
146: doException();
147: } else if (msgType == MEPInputStream.TYPE_ISMEPOK) {
148: doISMEPOk();
149: } else if (msgType == MEPInputStream.TYPE_MEPOK) {
150: doMEPOK();
151: } else {
152: }
153:
154: } catch (javax.jbi.messaging.MessagingException mEx) {
155: if (mRunning) {
156: logExceptionInfo(
157: "PB:RemoteProcessor MessagingException.",
158: mEx);
159: handleError(mEx);
160: }
161: } catch (Exception ex) {
162: logExceptionInfo(
163: "PB:RemoteProcessor Unexpected Exception.", ex);
164:
165: //
166: // Fail fast in the face of unanticipated problems.
167: //
168: mPB.stop();
169: }
170: }
171: }
172:
173: void doException() {
174: mLog.info("PB:RemoteProcessor Exception Id("
175: + mMIS.getExchangeId() + ")");
176: handleError(mMIS.getException());
177: }
178:
179: void doISMEPOk() {
180: ExchangeEntry ee = mMIS.getExchangeEntry();
181: MessageExchange me = ee.getMessageExchange();
182: InOnly io;
183:
184: mLog.fine("PB:RemoteProcessor IsMEPOk Id(" + me.getExchangeId()
185: + ") From (" + ee.getClientConnection().getInstanceId()
186: + ") Bytes(" + ee.getBytesReceived() + ")");
187: try {
188: io = mFactory.createInOnlyExchange();
189: io.setService(mPB.getService().getServiceName());
190: io.setOperation(new QName(NMRProcessor.ISEXCHANGEOKAY2));
191: io.setProperty(NMRProcessor.EXCHANGE, me);
192: mChannel.send(io);
193: } catch (javax.jbi.messaging.MessagingException mEx) {
194: mLog
195: .info("PB:RemoteProcessor doISMEPOk MessagingException:"
196: + mEx);
197: }
198: }
199:
200: void doMEPOK() {
201: String id = mMIS.getExchangeId();
202: ExchangeEntry ee = mPB.getExchangeEntry(id);
203: MessageExchange me = ee.getRelatedExchange();
204: boolean ok = mMIS.getMEPOk();
205:
206: //
207: // Forward MEP OK status to requestor.
208: //
209: mLog.fine("PB:RemoteProcessor MEPOk Id(" + id + ") From ("
210: + ee.getClientConnection().getInstanceId() + ") Bytes("
211: + ee.getBytesReceived() + ")");
212: try {
213: me.setProperty(NMRProcessor.EXCHANGEOKAY, Boolean
214: .valueOf(ok));
215: me.setStatus(ExchangeStatus.DONE);
216: mChannel.send(me);
217: } catch (javax.jbi.messaging.MessagingException mEx) {
218: mLog.info("PB:RemoteProcessor doMEPOk MessagingException:"
219: + mEx);
220: }
221:
222: //
223: // If the exchange is going to fail, we need to cleanup allocated resources.
224: //
225: if (!ok) {
226: mPB.purgeExchange(id);
227: }
228: }
229:
230: void doMEP() throws javax.jbi.messaging.MessagingException {
231: ExchangeEntry ee = mMIS.getExchangeEntry();
232: MessageExchange me = ee.getMessageExchange();
233: ClientConnection cc = ee.getClientConnection();
234: String id = me.getExchangeId();
235: long byteCount = ee.getBytesReceived();
236:
237: //
238: // Perform any first time processing on the exchange.
239: //
240: if (ee.checkState(ExchangeEntry.STATE_FIRST)) {
241: mLog.fine("PB:RemoteProcessor exchange Id(" + id
242: + ") From (" + cc.getInstanceId() + ") Bytes("
243: + byteCount + ")");
244: if (me.getEndpoint() == null) {
245: throw new javax.jbi.messaging.MessagingException(
246: Translator.translate(
247: LocalStringKeys.NO_ENDPOINT_AVAILABLE,
248: ee.getService()));
249: }
250: ee.setState(ExchangeEntry.STATE_ONGOING);
251: } else {
252: mLog.fine("PB:RemoteProcessor message Id(" + id
253: + ") From (" + cc.getInstanceId() + ") Bytes("
254: + byteCount + ")");
255: }
256:
257: //
258: // Send it on its merry way.
259: //
260: mLog.fine("PB:RemoteProcessor send Id(" + id + ")");
261: mChannel.send(me);
262:
263: //
264: // Check to see if the Channel has completed all processing
265: // on this message exchange.
266: //
267: if (!mChannel.activeReference(me)) {
268: mLog.fine("PB:RemoteProcessor forget Id(" + id + ")");
269: mPB.purgeExchange(id);
270: }
271:
272: }
273:
274: void handleError(Exception e) {
275: ExchangeEntry ee;
276: ClientConnection cc;
277: String id;
278:
279: id = mMIS.getExchangeId();
280: ee = mPB.getExchangeEntry(id);
281:
282: //
283: // Ignore this message if we don't know anything about it. Could be a stale message from
284: // the most recent restart.
285: //
286: if (ee != null) {
287: cc = ee.getClientConnection();
288: if (cc != null) {
289: //
290: // Send error indication back to the remote NMR based component.
291: //
292: try {
293: mLog
294: .info("PB:RemoteProcessor forward exception Id("
295: + id
296: + ") Reason("
297: + e.toString()
298: + ")");
299: cc.send(new MEPOutputStream(mPB).writeException(id,
300: e));
301: } catch (javax.jbi.JBIException jEx) {
302: //
303: // Ignore problems sending.
304: //
305: }
306: }
307:
308: //
309: // Send error indication back to local NMR based component.
310: // This is only possible if we have an ongoing conversation with this ME.
311: //
312: try {
313: MessageExchange me = ee.getMessageExchange();
314:
315: mLog.warning("PB:RemoteProcessor set exception Id("
316: + id + ") Reason(" + e.toString() + ")");
317: if (me != null) {
318: me.setError(e);
319: mChannel.send(me);
320: }
321: } catch (javax.jbi.JBIException jEx) {
322: //
323: // Ignore problems sending.
324: //
325: }
326: }
327: mLog.info("PB:RemoteProcessor dropping exchange Id(" + id
328: + ") Reason(" + e.toString() + ")");
329: mPB.purgeExchange(id);
330: }
331:
332: void logExceptionInfo(String reason, Throwable t) {
333: StringBuffer sb = new StringBuffer();
334: java.io.ByteArrayOutputStream b;
335: Throwable nextT = t;
336:
337: sb.append(reason);
338: while (t != null) {
339: if (nextT != null) {
340: java.io.PrintStream ps = new java.io.PrintStream(
341: b = new java.io.ByteArrayOutputStream());
342: t.printStackTrace(ps);
343: sb.append(" Exception (");
344: sb.append(b.toString());
345: sb.append(") ");
346: }
347: nextT = t.getCause();
348: if (nextT != null) {
349: sb.append("Caused by: ");
350: }
351: t = nextT;
352: }
353:
354: mLog.warning(sb.toString());
355: }
356:
357: }
|