001: /*
002: * BEGIN_HEADER - DO NOT EDIT
003: *
004: * The contents of this file are subject to the terms
005: * of the Common Development and Distribution License
006: * (the "License"). You may not use this file except
007: * in compliance with the License.
008: *
009: * You can obtain a copy of the license at
010: * https://open-esb.dev.java.net/public/CDDLv1.0.html.
011: * See the License for the specific language governing
012: * permissions and limitations under the License.
013: *
014: * When distributing Covered Code, include this CDDL
015: * HEADER in each file and include the License file at
016: * https://open-esb.dev.java.net/public/CDDLv1.0.html.
017: * If applicable add the following below this CDDL HEADER,
018: * with the fields enclosed by brackets "[]" replaced with
019: * your own identifying information: Portions Copyright
020: * [year] [name of copyright owner]
021: */
022:
023: /*
024: * @(#)MessageProcessor.java
025: * Copyright 2004-2007 Sun Microsystems, Inc. All Rights Reserved.
026: *
027: * END_HEADER - DO NOT EDIT
028: */
029: package com.sun.jbi.engine.sequencing;
030:
031: import com.sun.jbi.engine.sequencing.framework.Servicelist;
032: import com.sun.jbi.engine.sequencing.servicelist.ServicelistBean;
033: import com.sun.jbi.engine.sequencing.servicelist.SimpleServicelist;
034: import com.sun.jbi.engine.sequencing.util.ConfigData;
035: import com.sun.jbi.engine.sequencing.util.StringTranslator;
036:
037: import java.net.URI;
038:
039: import java.util.Vector;
040: import java.util.logging.Logger;
041:
042: import javax.jbi.messaging.InOnly;
043: import javax.jbi.messaging.InOut;
044: import javax.jbi.messaging.InOptionalOut;
045: import javax.jbi.messaging.RobustInOnly;
046: import javax.jbi.messaging.DeliveryChannel;
047: import javax.jbi.messaging.ExchangeStatus;
048: import javax.jbi.messaging.MessageExchange;
049:
050: /**
051: * MessageProcessor.
052: *
053: * @author Sun Microsystems, Inc.
054: */
055: public class MessageProcessor implements SequencingEngineResources {
056: /**
057: * Engine channel.
058: */
059: private DeliveryChannel mChannel;
060:
061: /**
062: * Deployment Registry.
063: */
064: private DeploymentRegistry mRegistry;
065:
066: /**
067: * Logger.
068: */
069: private Logger mLog;
070:
071: /**
072: * Message exchange
073: */
074: private MessageExchange mExchange;
075:
076: /**
077: * Message registry.
078: */
079: private MessageRegistry mMessageReg;
080:
081: /**
082: * Error holder
083: */
084: private String mError = "";
085:
086: /**
087: * Pattern of current exchange.
088: */
089: private String mPattern;
090:
091: /**
092: * Translator object for internationalization.
093: */
094: private StringTranslator mTranslator;
095:
096: /**
097: * Access List which maintains a list of services to avoid.
098: */
099: private Vector mAccessList;
100:
101: /**
102: * Flag to check
103: */
104: private boolean mCanAccept = true;
105:
106: /**
107: * Denotes valdiity of the exchange.
108: */
109: private boolean mValid = true;
110:
111: /**
112: * Creates a new MessageProcessor object.
113: *
114: * @param chnl engine channel
115: */
116: public MessageProcessor(DeliveryChannel chnl) {
117: mChannel = chnl;
118: mRegistry = DeploymentRegistry.getInstance();
119: mMessageReg = MessageRegistry.getInstance();
120: mCanAccept = true;
121: mLog = SequencingEngineContext.getInstance().getLogger();
122: mTranslator = new StringTranslator();
123: mAccessList = new Vector();
124: }
125:
126: /**
127: * Sets the message exchange.
128: *
129: * @param exchange message exchange
130: */
131: public void setExchange(MessageExchange exchange) {
132: mValid = true;
133: mExchange = exchange;
134: }
135:
136: /**
137: * Gets the pattern for the current exchange.
138: *
139: * @return string pattern name
140: */
141: public String getPattern() {
142: return mPattern;
143: }
144:
145: /**
146: * Method to add a service to accesslist.
147: *
148: * @param servicename service name
149: */
150: public void addToAccessList(String servicename) {
151: try {
152: mAccessList.add(servicename);
153: } catch (Exception e) {
154: ;
155: }
156: }
157:
158: /**
159: * Method to parse the exchange and identify a servicelist.
160: *
161: * @return Serviclist list object.
162: */
163: public Servicelist process() {
164: URI pattern = mExchange.getPattern();
165: Servicelist list = null;
166: //mLog.info("** ACEPT - MSG REG *** ");
167: //mMessageReg.dump();
168: String pat = pattern.toString().trim();
169: String sname = mExchange.getEndpoint().getServiceName()
170: .getNamespaceURI()
171: + mExchange.getEndpoint().getServiceName()
172: .getLocalPart();
173:
174: /* Check deployment registry for service name.
175: * If its not there in deployment registry, then this service is not
176: * serviced by us. We do not know how the NMR routed the message to
177: * us , but anyway we check.
178: */
179: ServicelistBean slb = null;
180:
181: /* In out messages are requests from other
182: * engines or bindings to invoke a servicelist
183: * So, process it accordingly , See whether our list operation
184: * supports the pattern.
185: * In Out can also be responses after from other components.
186: */
187: mLog.info(mTranslator.getString(SEQ_RECEIVED_MESSAGE, mExchange
188: .getPattern(), mExchange.getExchangeId()));
189:
190: if (mExchange instanceof InOut || mExchange instanceof InOnly
191: || mExchange instanceof RobustInOnly) {
192: list = processInbound();
193: } else if (mExchange instanceof InOptionalOut) {
194: mLog.severe(mTranslator.getString(
195: SEQ_RECEIVED_INOPTIONALOUT, mExchange
196: .getExchangeId()));
197: mValid = false;
198: } else if (pat.equals(PatternRegistry.OUT_OPTIONAL_IN.trim())) {
199: mLog.severe(mTranslator.getString(
200: SEQ_RECEIVED_OUTOPTIONALIN, mExchange
201: .getExchangeId()));
202: mValid = false;
203: } else {
204: mLog.severe(mTranslator.getString(
205: SEQ_RECEIVED_UNSUPPORTED_MEP, mExchange
206: .getExchangeId()));
207: mValid = false;
208: }
209:
210: if (list != null) {
211: list.setMessageExchange(mExchange);
212:
213: javax.transaction.Transaction trans = (javax.transaction.Transaction) mExchange
214: .getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME);
215:
216: try {
217: if ((trans != null)
218: && (trans == SequencingEngineContext
219: .getInstance().getTransactionManager()
220: .getTransaction())) {
221: SequencingEngineContext.getInstance()
222: .getTransactionManager().suspend();
223: mLog.fine(mTranslator.getString(SEQ_SUSPEND_TX,
224: mExchange.getExchangeId()));
225: }
226: } catch (javax.transaction.SystemException se) {
227: mLog.severe(mTranslator.getString(
228: SEQ_SUSPEND_TX_FAILED, mExchange
229: .getExchangeId()));
230: }
231:
232: list.setTransactionContext(trans);
233: }
234:
235: setPattern(pat);
236:
237: return list;
238: }
239:
240: /**
241: * Removes a service from access list.
242: *
243: * @param servicename service name to be removed.
244: */
245: public void removeFromAccessList(String servicename) {
246: try {
247: mAccessList.remove(servicename);
248: } catch (Exception e) {
249: ;
250: }
251: }
252:
253: /**
254: * This method stops the engine from processing any more new requests.
255: */
256: public void stopNewRequests() {
257: mCanAccept = false;
258: mLog.info(mTranslator.getString(SEQ_ENGINE_SHUTTING_DOWN));
259: }
260:
261: /**
262: * Validity.
263: *
264: * @return true if valid
265: */
266: public boolean valid() {
267: return mValid;
268: }
269:
270: /**
271: * Sets the error.
272: *
273: * @param er error message.
274: */
275: private void setError(String er) {
276: mError = er;
277: }
278:
279: /**
280: * Sets the pattern.
281: *
282: * @param pat mep
283: */
284: private void setPattern(String pat) {
285: mPattern = pat;
286: }
287:
288: /**
289: * Returns true if a message for the service can be accepted. Access lists
290: * are not implemented now.
291: *
292: * @param servicename service name to be checked.
293: *
294: * @return true if can be accepted.
295: */
296: private boolean canAccept(String servicename) {
297: return mCanAccept;
298: }
299:
300: /**
301: * Checks if the operation supports the pattern of exchange received.
302: *
303: * @param slb list bean
304: * @param pat pattern
305: * @param sname service name
306: */
307: private void checkSupport(ServicelistBean slb, String pat,
308: String sname) {
309: if (slb == null) {
310: mLog.severe(mTranslator.getString(SEQ_SERVICE_NOT_HOSTED,
311: sname));
312: setError(mTranslator.getString(SEQ_SERVICE_NOT_HOSTED,
313: sname));
314: mValid = false;
315:
316: return;
317: }
318:
319: String oper = mExchange.getOperation().getLocalPart();
320:
321: if (!oper.trim().equals(slb.getOperation().trim())) {
322: mLog.severe(mTranslator.getString(SEQ_UNSUPPORTED_OPER, slb
323: .getOperation(), slb.getServicename(), mExchange
324: .getExchangeId()));
325: setError(mTranslator.getString(SEQ_UNSUPPORTED_OPER, slb
326: .getOperation(), slb.getServicename(), mExchange
327: .getExchangeId()));
328: mValid = false;
329:
330: return;
331: }
332:
333: if (!slb.getMep().trim().equals(pat.trim())) {
334: mLog.severe(mTranslator.getString(
335: SEQ_UNSUPPORTED_PATTERN_OPER, slb.getOperation(),
336: slb.getServicename(), mExchange.getExchangeId()));
337: setError(mTranslator.getString(
338: SEQ_UNSUPPORTED_PATTERN_OPER, slb.getOperation(),
339: slb.getServicename(), mExchange.getExchangeId()));
340: mValid = false;
341: }
342: }
343:
344: /**
345: * Creates a nee service list instance.
346: *
347: * @param sname service name
348: * @param pat pattern
349: *
350: * @return list
351: */
352: private Servicelist newList(String sname, String pat) {
353: // definitely a new request no doubt about it
354: ServicelistBean slb = mRegistry.findService(sname);
355: Servicelist list = null;
356: checkSupport(slb, pat, sname);
357:
358: if (mValid && canAccept(sname)) {
359: list = new SimpleServicelist();
360: list.setServicelistBean(slb);
361: } else {
362: sendUnsupportedError();
363: }
364:
365: return list;
366: }
367:
368: /**
369: * Processes all messages.
370: *
371: * @return list
372: */
373: private Servicelist processInbound() {
374: String requestseqid = (String) mExchange
375: .getProperty(ConfigData.REQUEST_SEQ_ID);
376: String responseseqid = (String) mExchange
377: .getProperty(ConfigData.RESPONSE_SEQ_ID);
378: String sname = mExchange.getEndpoint().getServiceName()
379: .getNamespaceURI()
380: + mExchange.getEndpoint().getServiceName()
381: .getLocalPart()
382: + mExchange.getEndpoint().getEndpointName()
383: + mExchange.getOperation().getLocalPart();
384: String pat = "";
385: Servicelist list = null;
386: int type = -1;
387:
388: if (mExchange instanceof InOut) {
389: pat = PatternRegistry.IN_OUT;
390: } else if (mExchange instanceof InOnly) {
391: pat = PatternRegistry.IN_ONLY;
392: } else if (mExchange instanceof RobustInOnly) {
393: pat = PatternRegistry.ROBUST_IN_ONLY;
394: } else if (mExchange instanceof InOptionalOut) {
395: pat = PatternRegistry.IN_OPTIONAL_OUT;
396: }
397:
398: /* There could be other efficient ways to express the same logic
399: * but to maintain clarity of whats being done
400: * below is followed
401: */
402: String messageid = null;
403: if (mExchange.getStatus() == ExchangeStatus.ACTIVE) {
404: if (requestseqid == null) {
405: list = newList(sname, pat);
406: type = ConfigData.REQUEST_TYPE;
407: } else if ((requestseqid != null)
408: && (responseseqid != null)) {
409: /**
410: * this should be a response from sequencing engine itself
411: */
412: if (mMessageReg.isTimedOut(mExchange.getExchangeId()
413: + requestseqid)) {
414: mLog.info(mTranslator.getString(
415: SEQ_SERVICE_TIMED_OUT, mExchange
416: .getExchangeId()));
417: mValid = false;
418: } else {
419: messageid = mExchange.getExchangeId()
420: + requestseqid;
421: list = mMessageReg.getServicelist(messageid);
422: type = ConfigData.RESPONSE_TYPE;
423: }
424: } else if ((requestseqid != null)
425: && (responseseqid == null)) {
426: if (mExchange.getRole().equals(
427: MessageExchange.Role.PROVIDER)) {
428: /*
429: * This is definitely a request from sequencing engine
430: */
431: list = newList(sname, pat);
432: type = ConfigData.REQUEST_TYPE;
433: } else {
434: if (mMessageReg.isTimedOut(mExchange
435: .getExchangeId()
436: + requestseqid)) {
437: mLog.info(mTranslator.getString(
438: SEQ_SERVICE_TIMED_OUT, mExchange
439: .getExchangeId()));
440: mValid = false;
441: } else {
442: /*
443: * If req id not nul then it cud a response or a new
444: * request from seq engine itself
445: */
446: messageid = mExchange.getExchangeId()
447: + requestseqid;
448: list = mMessageReg.getServicelist(messageid);
449: type = ConfigData.RESPONSE_TYPE;
450: }
451: }
452: }
453: } else if (mExchange.getStatus() == ExchangeStatus.DONE) {
454: if ((requestseqid != null) && (responseseqid != null)) {
455: if (mExchange instanceof InOut) {
456: messageid = mExchange.getExchangeId()
457: + responseseqid;
458: list = mMessageReg.getServicelist(messageid);
459: type = ConfigData.REQUEST_TYPE;
460: } else {
461: messageid = mExchange.getExchangeId()
462: + requestseqid;
463: list = mMessageReg.getServicelist(messageid);
464: type = ConfigData.RESPONSE_TYPE;
465: }
466: } else if ((requestseqid == null)
467: && (responseseqid != null)) {
468: /* This is a case when seq engine receives a request
469: * from another compontn
470: */
471: messageid = mExchange.getExchangeId() + responseseqid;
472: list = mMessageReg.getServicelist(messageid);
473: type = ConfigData.REQUEST_TYPE;
474: } else {
475: messageid = mExchange.getExchangeId() + requestseqid;
476: list = mMessageReg.getServicelist(messageid);
477: type = ConfigData.RESPONSE_TYPE;
478: }
479: } else {
480: if (requestseqid != null) {
481: messageid = mExchange.getExchangeId() + requestseqid;
482: list = mMessageReg.getServicelist(messageid);
483: type = ConfigData.RESPONSE_TYPE;
484: } else if (responseseqid != null) {
485: messageid = mExchange.getExchangeId() + responseseqid;
486: list = mMessageReg.getServicelist(messageid);
487:
488: type = ConfigData.REQUEST_TYPE;
489: } else {
490: mValid = false;
491: }
492: }
493:
494: if (list == null) {
495: mLog.info(mTranslator.getString(SEQ_SERVICE_TIMED_OUT,
496: mExchange.getExchangeId()));
497: mValid = false;
498: } else {
499: /* We found the list, so deregister the message now.
500: * After all we registered it only to find the list the next time.
501: */
502: if (messageid != null) {
503: mMessageReg.deregisterExchange(messageid);
504: }
505: list.setType(type);
506: }
507:
508: return list;
509: }
510:
511: /**
512: * Sends an error if servicename or operationb is not supported.
513: */
514: private void sendUnsupportedError() {
515: Exception exp = new Exception(mError);
516:
517: try {
518: mExchange.setError(exp);
519: mChannel.send(mExchange);
520: } catch (Exception e) {
521: e.printStackTrace();
522: }
523: }
524: }
|