001: // Copyright (c) 2004-2005 Sun Microsystems Inc., All Rights Reserved.
002:
003: /*
004: * MessageProcessor.java
005: *
006: * SUN PROPRIETARY/CONFIDENTIAL.
007: * This software is the proprietary information of Sun Microsystems, Inc.
008: * Use is subject to license terms.
009: *
010: */
011: package com.sun.jbi.engine.sequencing;
012:
013: import com.sun.jbi.engine.sequencing.framework.Servicelist;
014: import com.sun.jbi.engine.sequencing.servicelist.ServicelistBean;
015: import com.sun.jbi.engine.sequencing.servicelist.SimpleServicelist;
016: import com.sun.jbi.engine.sequencing.util.ConfigData;
017: import com.sun.jbi.engine.sequencing.util.StringTranslator;
018:
019: import java.net.URI;
020:
021: import java.util.Vector;
022: import java.util.logging.Logger;
023:
024: import javax.jbi.messaging.DeliveryChannel;
025: import javax.jbi.messaging.ExchangeStatus;
026: import javax.jbi.messaging.MessageExchange;
027:
028: /**
029: * MessageProcessor.
030: *
031: * @author Sun Microsystems, Inc.
032: */
033: public class MessageProcessor implements SequencingEngineResources {
034: /**
035: * Engine channel.
036: */
037: private DeliveryChannel mChannel;
038:
039: /**
040: * Deployment Registry.
041: */
042: private DeploymentRegistry mRegistry;
043:
044: /**
045: * Logger.
046: */
047: private Logger mLog;
048:
049: /**
050: * Message exchange
051: */
052: private MessageExchange mExchange;
053:
054: /**
055: * Message registry.
056: */
057: private MessageRegistry mMessageReg;
058:
059: /**
060: * Error holder
061: */
062: private String mError = "";
063:
064: /**
065: * Pattern of current exchange.
066: */
067: private String mPattern;
068:
069: /**
070: * Translator object for internationalization.
071: */
072: private StringTranslator mTranslator;
073:
074: /**
075: * Access List which maintains a list of services to avoid.
076: */
077: private Vector mAccessList;
078:
079: /**
080: * Flag to check
081: */
082: private boolean mCanAccept = true;
083:
084: /**
085: * Denotes valdiity of the exchange.
086: */
087: private boolean mValid = true;
088:
089: /**
090: * Creates a new MessageProcessor object.
091: *
092: * @param chnl engine channel
093: */
094: public MessageProcessor(DeliveryChannel chnl) {
095: mChannel = chnl;
096: mRegistry = DeploymentRegistry.getInstance();
097: mMessageReg = MessageRegistry.getInstance();
098: mCanAccept = true;
099: mLog = SequencingEngineContext.getInstance().getLogger();
100: mTranslator = new StringTranslator();
101: mAccessList = new Vector();
102: }
103:
104: /**
105: * Sets the message exchange.
106: *
107: * @param exchange message exchange
108: */
109: public void setExchange(MessageExchange exchange) {
110: mValid = true;
111: mExchange = exchange;
112: }
113:
114: /**
115: * Gets the pattern for the current exchange.
116: *
117: * @return string pattern name
118: */
119: public String getPattern() {
120: return mPattern;
121: }
122:
123: /**
124: * Method to add a service to accesslist.
125: *
126: * @param servicename service name
127: */
128: public void addToAccessList(String servicename) {
129: try {
130: mAccessList.add(servicename);
131: } catch (Exception e) {
132: ;
133: }
134: }
135:
136: /**
137: * Method to parse the exchange and identify a servicelist.
138: *
139: * @return Serviclist list object.
140: */
141: public Servicelist process() {
142: URI pattern = mExchange.getPattern();
143: Servicelist list = null;
144:
145: String pat = pattern.toString().trim();
146: String sname = mExchange.getEndpoint().getServiceName()
147: .getNamespaceURI()
148: + mExchange.getEndpoint().getServiceName()
149: .getLocalPart();
150:
151: /* Check deployment registry for service name.
152: * If its not there in deployment registry, then this service is not
153: * serviced by us. We do not know how the NMR routed the message to
154: * us , but anyway we check.
155: */
156: ServicelistBean slb = null;
157:
158: /* In out messages are requests from other
159: * engines or bindings to invoke a servicelist
160: * So, process it accordingly , See whether our list operation
161: * supports the pattern.
162: * In Out can also be responses after from other components.
163: */
164: mLog.info(mTranslator.getString(SEQ_RECEIVED_MESSAGE, mExchange
165: .getPattern(), mExchange.getExchangeId()));
166:
167: if ((pat.equals(PatternRegistry.IN_OUT.trim()))
168: || (pat.equals(PatternRegistry.IN_ONLY.trim()))
169: || (pat.equals(PatternRegistry.ROBUST_IN_ONLY.trim()))) {
170: list = processInbound();
171: } else if (pat.equals(PatternRegistry.IN_OPTIONAL_OUT.trim())) {
172: mLog.severe(mTranslator.getString(
173: SEQ_RECEIVED_INOPTIONALOUT, mExchange
174: .getExchangeId()));
175: mValid = false;
176: } else if (pat.equals(PatternRegistry.OUT_OPTIONAL_IN.trim())) {
177: mLog.severe(mTranslator.getString(
178: SEQ_RECEIVED_OUTOPTIONALIN, mExchange
179: .getExchangeId()));
180: mValid = false;
181: } else {
182: mLog.severe(mTranslator.getString(
183: SEQ_RECEIVED_UNSUPPORTED_MEP, mExchange
184: .getExchangeId()));
185: mValid = false;
186: }
187:
188: if (list != null) {
189: list.setMessageExchange(mExchange);
190:
191: javax.transaction.Transaction trans = (javax.transaction.Transaction) mExchange
192: .getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME);
193:
194: try {
195: if ((trans != null)
196: && (trans == SequencingEngineContext
197: .getInstance().getTransactionManager()
198: .getTransaction())) {
199: SequencingEngineContext.getInstance()
200: .getTransactionManager().suspend();
201: mLog.fine(mTranslator.getString(SEQ_SUSPEND_TX,
202: mExchange.getExchangeId()));
203: }
204: } catch (javax.transaction.SystemException se) {
205: mLog.severe(mTranslator.getString(
206: SEQ_SUSPEND_TX_FAILED, mExchange
207: .getExchangeId()));
208: }
209:
210: list.setTransactionContext(trans);
211: }
212:
213: setPattern(pat);
214:
215: return list;
216: }
217:
218: /**
219: * Removes a service from access list.
220: *
221: * @param servicename service name to be removed.
222: */
223: public void removeFromAccessList(String servicename) {
224: try {
225: mAccessList.remove(servicename);
226: } catch (Exception e) {
227: ;
228: }
229: }
230:
231: /**
232: * This method stops the engine from processing any more new requests.
233: */
234: public void stopNewRequests() {
235: mCanAccept = false;
236: mLog.info(mTranslator.getString(SEQ_ENGINE_SHUTTING_DOWN));
237: }
238:
239: /**
240: * Validity.
241: *
242: * @return true if valid
243: */
244: public boolean valid() {
245: return mValid;
246: }
247:
248: /**
249: * Sets the error.
250: *
251: * @param er error message.
252: */
253: private void setError(String er) {
254: mError = er;
255: }
256:
257: /**
258: * Sets the pattern.
259: *
260: * @param pat mep
261: */
262: private void setPattern(String pat) {
263: mPattern = pat;
264: }
265:
266: /**
267: * Returns true if a message for the service can be accepted. Access lists
268: * are not implemented now.
269: *
270: * @param servicename service name to be checked.
271: *
272: * @return true if can be accepted.
273: */
274: private boolean canAccept(String servicename) {
275: return mCanAccept;
276: }
277:
278: /**
279: * Checks if the operation supports the pattern of exchange received.
280: *
281: * @param slb list bean
282: * @param pat pattern
283: * @param sname service name
284: */
285: private void checkSupport(ServicelistBean slb, String pat,
286: String sname) {
287: if (slb == null) {
288: mLog.severe(mTranslator.getString(SEQ_SERVICE_NOT_HOSTED,
289: sname));
290: setError(mTranslator.getString(SEQ_SERVICE_NOT_HOSTED,
291: sname));
292: mValid = false;
293:
294: return;
295: }
296:
297: String oper = mExchange.getOperation().getLocalPart();
298:
299: if (!oper.trim().equals(slb.getOperation().trim())) {
300: mLog.severe(mTranslator.getString(SEQ_UNSUPPORTED_OPER, slb
301: .getOperation(), slb.getServicename(), mExchange
302: .getExchangeId()));
303: setError(mTranslator.getString(SEQ_UNSUPPORTED_OPER, slb
304: .getOperation(), slb.getServicename(), mExchange
305: .getExchangeId()));
306: mValid = false;
307:
308: return;
309: }
310:
311: if (!slb.getMep().trim().equals(pat.trim())) {
312: mLog.severe(mTranslator.getString(
313: SEQ_UNSUPPORTED_PATTERN_OPER, slb.getOperation(),
314: slb.getServicename(), mExchange.getExchangeId()));
315: setError(mTranslator.getString(
316: SEQ_UNSUPPORTED_PATTERN_OPER, slb.getOperation(),
317: slb.getServicename(), mExchange.getExchangeId()));
318: mValid = false;
319: }
320: }
321:
322: /**
323: * Creates a nee service list instance.
324: *
325: * @param sname service name
326: * @param pat pattern
327: *
328: * @return list
329: */
330: private Servicelist newList(String sname, String pat) {
331: // definitely a new request no doubt about it
332: ServicelistBean slb = mRegistry.findService(sname);
333: Servicelist list = null;
334: checkSupport(slb, pat, sname);
335:
336: if (mValid && canAccept(sname)) {
337: list = new SimpleServicelist();
338: list.setServicelistBean(slb);
339: } else {
340: sendUnsupportedError();
341: }
342:
343: return list;
344: }
345:
346: /**
347: * Processes all messages.
348: *
349: * @return list
350: */
351: private Servicelist processInbound() {
352: String requestseqid = (String) mExchange
353: .getProperty(ConfigData.REQUEST_SEQ_ID);
354: String responseseqid = (String) mExchange
355: .getProperty(ConfigData.RESPONSE_SEQ_ID);
356: String sname = mExchange.getEndpoint().getServiceName()
357: .getNamespaceURI()
358: + mExchange.getEndpoint().getServiceName()
359: .getLocalPart()
360: + mExchange.getEndpoint().getEndpointName()
361: + mExchange.getOperation().getLocalPart();
362: String pat = mExchange.getPattern().toString().trim();
363: Servicelist list = null;
364: int type = -1;
365:
366: /* There could be other efficient ways to express the same logic
367: * but to maintain clarity of whats being done
368: * below is followed
369: */
370: if (mExchange.getStatus() == ExchangeStatus.ACTIVE) {
371: if (requestseqid == null) {
372: list = newList(sname, pat);
373: type = ConfigData.REQUEST_TYPE;
374: } else if ((requestseqid != null)
375: && (responseseqid != null)) {
376: /**
377: * this should be a response from sequencing engine itself
378: */
379: if (mMessageReg.isTimedOut(mExchange.getExchangeId()
380: + requestseqid)) {
381: mLog.info(mTranslator.getString(
382: SEQ_SERVICE_TIMED_OUT, mExchange
383: .getExchangeId()));
384: mValid = false;
385: } else {
386: list = mMessageReg.getServicelist(mExchange
387: .getExchangeId()
388: + requestseqid);
389: type = ConfigData.RESPONSE_TYPE;
390: }
391: } else if ((requestseqid != null)
392: && (responseseqid == null)) {
393: if (mExchange.getRole().equals(
394: MessageExchange.Role.PROVIDER)) {
395: /*
396: * This is definitely a request from sequencing engine
397: */
398: list = newList(sname, pat);
399: type = ConfigData.REQUEST_TYPE;
400: } else {
401: if (mMessageReg.isTimedOut(mExchange
402: .getExchangeId()
403: + requestseqid)) {
404: mLog.info(mTranslator.getString(
405: SEQ_SERVICE_TIMED_OUT, mExchange
406: .getExchangeId()));
407: mValid = false;
408: } else {
409: /*
410: * If req id not nul then it cud a response or a new
411: * request from seq engine itself
412: */
413: list = mMessageReg.getServicelist(mExchange
414: .getExchangeId()
415: + requestseqid);
416: type = ConfigData.RESPONSE_TYPE;
417: }
418: }
419: }
420: } else if (mExchange.getStatus() == ExchangeStatus.DONE) {
421: if ((requestseqid != null) && (responseseqid != null)) {
422: if (mExchange.getPattern().toString().trim().equals(
423: PatternRegistry.IN_OUT)) {
424: list = mMessageReg.getServicelist(mExchange
425: .getExchangeId()
426: + responseseqid);
427: type = ConfigData.REQUEST_TYPE;
428: } else {
429: list = mMessageReg.getServicelist(mExchange
430: .getExchangeId()
431: + requestseqid);
432: type = ConfigData.RESPONSE_TYPE;
433: }
434: } else if ((requestseqid == null)
435: && (responseseqid != null)) {
436: /* This is a case when seq engine receives a request
437: * from another compontn
438: */
439: list = mMessageReg.getServicelist(mExchange
440: .getExchangeId()
441: + responseseqid);
442: type = ConfigData.REQUEST_TYPE;
443: } else {
444: list = mMessageReg.getServicelist(mExchange
445: .getExchangeId()
446: + requestseqid);
447: type = ConfigData.RESPONSE_TYPE;
448: }
449: } else {
450: if (requestseqid != null) {
451: list = mMessageReg.getServicelist(mExchange
452: .getExchangeId()
453: + requestseqid);
454: type = ConfigData.RESPONSE_TYPE;
455: } else if (responseseqid != null) {
456: list = mMessageReg.getServicelist(mExchange
457: .getExchangeId()
458: + responseseqid);
459:
460: type = ConfigData.REQUEST_TYPE;
461: } else {
462: mValid = false;
463: }
464: }
465:
466: if (list == null) {
467: mLog.info(mTranslator.getString(SEQ_SERVICE_TIMED_OUT,
468: mExchange.getExchangeId()));
469: mValid = false;
470: } else {
471: list.setType(type);
472: }
473:
474: return list;
475: }
476:
477: /**
478: * Sends an error if servicename or operationb is not supported.
479: */
480: private void sendUnsupportedError() {
481: Exception exp = new Exception(mError);
482:
483: try {
484: mExchange.setError(exp);
485: mChannel.send(mExchange);
486: } catch (Exception e) {
487: e.printStackTrace();
488: }
489: }
490: }
|