001: /*
002: * The contents of this file are subject to the terms of the Common Development
003: * and Distribution License (the License). You may not use this file except in
004: * compliance with the License.
005: *
006: * You can obtain a copy of the License at http://www.netbeans.org/cddl.html
007: * or http://www.netbeans.org/cddl.txt.
008: *
009: * When distributing Covered Code, include this CDDL Header Notice in each file
010: * and include the License file at http://www.netbeans.org/cddl.txt.
011: * If applicable, add the following below the CDDL Header, with the fields
012: * enclosed by brackets [] replaced by your own identifying information:
013: * "Portions Copyrighted [year] [name of copyright owner]"
014: *
015: * The Original Software is NetBeans. The Initial Developer of the Original
016: * Software is Sun Microsystems, Inc. Portions Copyright 1997-2007 Sun
017: * Microsystems, Inc. All Rights Reserved.
018: */
019: package org.netbeans.modules.wsdlextensions.jms.validator;
020:
021: import java.util.HashMap;
022: import java.util.regex.Matcher;
023: import java.util.regex.Pattern;
024:
025: import org.netbeans.modules.wsdlextensions.jms.JMSConstants;
026:
027: /**
028: * Is used to deal with "poisonous messages". A poison message is a message that fails
029: * to be processed time and time again, thereby stopping other messages from being
030: * processed.
031: *
032: * Is invoked for each message. Maintains a cache of msgids of messages that have the
033: * JMSRedelivered flag set, and keeps a count of these messages. Based on the number
034: * of times a message was redelivered, a particular Action can be invoked. Actions are
035: * delay, moving or deleting the message.
036: *
037: * The msgid cache is not persistent, nor is it shared between multiple activations. This
038: * means that if a message was seen 10 times with the redelivered flag set, and the
039: * project is undeployed, the redelivery count will be reset to zero. Also, if there are
040: * multiple application servers reading from the same queue, a message may be redelivered
041: * 10 times to one application server, and 10 times to the other application server, and
042: * both activations will see a count of 10 instead of 20.
043: *
044: * The msgid cache is limited to 5000 (check source); when this limit is reached, the
045: * oldest msgids are flushed from the cache. "Oldest" means least recently seen.
046: *
047: * Specification of the actions is done through a specially formatted string. The string
048: * has this format:
049: * format := entry[; entry]*
050: * entry := idx ":" action
051: * idx := number (denotes the n-th time a msg was seen)
* action := number (denotes delay in ms) | "delete" | "move(" args ")"
* args := ("queue" | "topic" | "same") ":" destname
054: * destname := any string, may include "$" which will be replaced with the original
055: * destination name.
056: *
057: * Examples:
058: * 5:1000; 10:5000; 50:move(queue:mydlq)
059: * This causes no delay up to the 5th delivery; a 1000 ms delay is invoked when the
060: * message is seen the 5th, 6th, 7th, 8th, and 9th time. A 5 second delay is invoked
* when the msg is seen the 10th, 11th, ..., 49th time. When the msg is seen the 50th
062: * time the msg is moved to a queue with the name "mydlq".
063: *
064: * If the messages were received from "Queue1" and if the string was specified as
065: * 5:1000; 10:5000; 50:move(queue:dlq$oops)
066: * the messages would be moved to the destination "dlqQueue1oops".
067: *
068: * Another example:
069: * 5:1000; 10:5000
070: * This causes no delay up to the 5th delivery; a 1000 ms delay is invoked when the
071: * message is seen the 5th, 6th, 7th, 8th, and 9th time. A 5 second delay is invoked
072: * for each time the message is seen thereafter.
073: *
074: * Moving messages is done in the same transaction if the transaction is XA. Moving
075: * messages is done using auto-commit if the delivery is non-XA.
076: *
077: * Moving messages is done by creating a new message of the same type unless the
078: * property JMSJCA.redeliveryRedirect is set to true in which case the messages are
079: * simply redirected. In the first case, the payload of the new message is set as follows:
* - for an ObjectMessage this will be done through getObject(), setObject();
081: * - for a StreamMessage through readObject/writeObject,
082: * - for a BytesMessage through readBytes() and writeBytes()
083: * (avoiding the getBodyLength() method new in JMS 1.1)
084: * Copying the payload of an ObjectMessage may cause classloader problems since the
085: * context classloader is not properly set. In this case the redelivery handler should
086: * be configured to redirect the message instead.
087: * The new message will have properties as follows:
088: * * JMS properties
089: * - JMSCorrelationID: copied
090: * - JMSDestination: see above; set by JMS provider
091: * - JMSExpiration: copied through the send method
092: * - JMSMessageID: set by the JMS provider
093: * - JMSPriority: set by the JMS provider; propagated through the send() method
094: * - JMSRedelivered: NOT copied
095: * - JMSReplyTo: copied
096: * - JMSTimestamp: copied into the user property field JMSJCATimestamp
097: * - JMSType: copied
098: * - JMSDeliveryMode: set by the JMS provider; propagated through the send() method
099: * * All user defined properties: copied
100: * * Additional properties:
101: * - JMS_Sun-JMSJCA.RedeliveryCount: number of times the message was seen with the redelivered
102: * flag set by JMSJCA. Will accurately reflect the total number of redelivery attempts
103: * only if there's one instance of the inbound adapter, and the inbound adapter was
104: * not redeployed.
105: * - JMS_Sun-JMSJCA.OriginalDestinationName: name of the destination as specified in the
106: * activation spec
107: * - JMS_Sun-JMSJCA.OriginalDestinationType: either "javax.jms.Queue" or "javax.jms.Topic"
108: * - JMS_Sun-JMSJCA.SubscriberName: as specified in the activation spec
109: * - JMS_Sun-JMSJCA.ContextName: as specified in the activation spec
110: *
111: * Invoking a delay takes place by holding the processing thread occupied, that means
112: * that while the thread is sleeping, this thread will not be used to process any other
113: * messages. Undeployment interrupts threads that are delaying message delivery. If a
114: * msg delay is divisible by 1000, an INFO message is written to the log indicating that
* the thread is delaying message delivery.
116: *
117: * There is a default behavior for message redelivery handling: see source.
118: *
* Implementation notes: this class is made abstract to enhance testability.
* (NOTE: the class as declared below is not abstract; this remark appears to be outdated.)
120: *
121: */
122: public class RedeliveryHandlingParser {
123:
124: /**
125: * A baseclass of all actions that could happen in response to a a repeated
126: * redelivered message
127: */
128: public abstract static class Action {
129: private int mAt;
130:
131: /**
132: * Constructor
133: *
134: * @param at at which encounter to invoke
135: */
136: public Action(int at) throws ValidationException {
137: if (at <= 0) {
138: throw new ValidationException("Index " + at
139: + " should be > 0");
140: }
141: if (at > 5000) {
142: throw new ValidationException("Index " + at
143: + " should be <= 5000");
144: }
145: mAt = at;
146: }
147:
148: /**
149: * @return at which encounter to invoke
150: */
151: public int getAt() {
152: return mAt;
153: }
154:
155: /**
156: * Asserts that the next value is greater than the previous value
157: *
158: * @param lastAt last value
159: * @return current value of lastAt
160: * @throws Exception on assertion failure
161: */
162: public int checkLast(int lastAt) throws Exception {
163: if (lastAt == mAt) {
164: throw new Exception("Duplicate entry at: " + lastAt);
165: }
166: if (lastAt >= mAt) {
167: throw new Exception("Should be properly ordered: "
168: + lastAt + " >= " + mAt);
169: }
170: return mAt;
171: }
172: }
173:
174: /**
175: * No action; always at the beginning
176: */
177: public static class VoidAction extends Action {
178: /**
179: * Constructor
180: */
181: public VoidAction() throws ValidationException {
182: super (1);
183: }
184:
185: /**
186: * @see java.lang.Object#toString()
187: */
188: public String toString() {
189: return "Void";
190: }
191: }
192:
193: /**
194: * A delay action
195: */
196: public static class Delay extends Action {
197: /**
198: * How to recognize a delay
199: */
200: public static final String PATTERN = "(\\d+):\\s?(\\d+)";
201: /**
202: * Compiled regex pattern
203: */
204: public static Pattern sPattern = Pattern.compile(PATTERN);
205: private long mDelay;
206:
207: /**
208: * Constructor
209: *
210: * @param at when
211: * @param delay how long (ms)
212: * @throws Exception on invalid arguments
213: */
214: public Delay(int at, long delay) throws Exception {
215: super (at);
216: if (delay > 5000 && delay != Integer.MAX_VALUE) {
217: // Note: max_value is used for testing
218: throw new Exception("Delay of [" + delay
219: + "] exceeds maximum of 5000 ms");
220: }
221: mDelay = delay;
222: }
223:
224: /**
225: * @return delay time in ms
226: */
227: public long getHowLong() {
228: return mDelay;
229: }
230:
231: /**
232: * @see java.lang.Object#toString()
233: */
234: public String toString() {
235: return "At " + getAt() + ": delay for " + mDelay + " ms";
236: }
237: }
238:
239: /**
240: * Moves a msg to a different queue or topic
241: */
242: public static class Move extends Action {
243: /**
244: * How to recognize a delay
245: */
246: public static final String PATTERN = "(\\d+):\\s?move\\((.*)\\)";
247: /**
248: * Compiled version
249: */
250: public static Pattern sPattern = Pattern.compile(PATTERN);
251:
252: /**
253: * The argument pattern
254: */
255: public static String ARGPATTERN = "(queue|same|topic)\\s?:\\s?(.+)";
256: /**
257: * Compiled version
258: */
259: public static Pattern sArgPattern = Pattern.compile(ARGPATTERN);
260:
261: private String mType; // either Queue or Topic
262: private String mName;
263:
264: /**
265: * @param at when to invoke
266: * @param type destination type
267: * @param name destination name
268: * @param destinationType type from wsdl
269: * @throws Exception on failure
270: */
271: public Move(int at, String type, String name,
272: String destinationType) throws ValidationException {
273: super (at);
274: mName = name;
275: if (!JMSConstants.QUEUE.equals(destinationType)
276: && !JMSConstants.TOPIC.equals(destinationType)) {
277: throw new ValidationException(
278: "Invalid destination type [" + destinationType
279: + "]");
280: }
281: if ("same".equals(type)) {
282: mType = destinationType;
283: } else if ("queue".equals(type)) {
284: mType = JMSConstants.QUEUE;
285: } else if ("topic".equals(type)) {
286: mType = JMSConstants.TOPIC;
287: } else {
288: throw new ValidationException("Invalid type [" + type
289: + "]");
290: }
291: }
292:
293: /**
294: * @return javax.jms.Queue or javax.jms.Topic
295: */
296: public String getDestinationType() {
297: return mType;
298: }
299:
300: /**
301: * @return true if Queue
302: */
303: public boolean isQueue() {
304: return mType.equals(JMSConstants.QUEUE);
305: }
306:
307: /**
308: * @return true if topic
309: */
310: public boolean isTopic() {
311: return mType.equals(JMSConstants.TOPIC);
312: }
313:
314: /**
315: * @return destination name to use for DLQ
316: */
317: public String getDestinationName() {
318: return mName;
319: }
320:
321: /**
322: * @see java.lang.Object#toString()
323: */
324: public String toString() {
325: return "At " + getAt() + ": move to " + mType
326: + " with name [" + mName + "]";
327: }
328: }
329:
330: /**
331: * Deletes a msg
332: */
333: public static class Delete extends Action {
334: /**
335: * How to recognize a delete
336: */
337: public static final String PATTERN = "(\\d+):\\s?delete";
338: /**
339: * Compiled
340: */
341: public static Pattern sPattern = Pattern.compile(PATTERN);
342:
343: /**
344: * Constructor
345: *
346: * @param at when
347: * @throws Exception on illegal argument
348: */
349: public Delete(int at) throws Exception {
350: super (at);
351: }
352:
353: /**
354: * @see java.lang.Object#toString()
355: */
356: public String toString() {
357: return "At " + getAt() + ": delete";
358: }
359: }
360:
361: /**
362: * @param actions action string
363: * @return true if can be parsed properly
364: */
365: public static boolean checkValid(String actions) {
366: try {
367: parse(actions, "nothing", JMSConstants.QUEUE);
368: return true;
369: } catch (Exception e) {
370: return false;
371: }
372: }
373:
374: /**
375: * Parses an action string into separate actions and performs validations.
376: * The returned action array is guaranteed to be ordered and without duplicates.
377: *
378: * @param s string to be parsed
379: * @param destName destination name being used (for dlq name construction)
380: * @param destType type from activation spec (javax.jms.Queue or javax.jms.Topic)
381: * @return array of actions
382: * @throws Exception upon parsing failure
383: */
384: public static Action[] parse(String s, String destName,
385: String destType) throws Exception {
386: if (s.trim().length() == 0) {
387: return new Action[] { new VoidAction() };
388: }
389:
390: // Split the string in different actions
391: String[] actions = s.split("\\s*;\\s*");
392: Action[] ret = new Action[actions.length];
393:
394: // Go over all actions and try to parse each action
395: int lastAt = 0;
396: for (int i = 0; i < actions.length; i++) {
397:
398: try {
399: Matcher m;
400: boolean last = i == (actions.length - 1);
401:
402: // Delay
403: m = Delay.sPattern.matcher(actions[i]);
404: if (m.matches()) {
405: String at = m.group(1);
406: String delay = m.group(2);
407: ret[i] = new Delay(Integer.parseInt(at), Long
408: .parseLong(delay));
409: lastAt = ret[i].checkLast(lastAt);
410: continue;
411: }
412:
413: // Delete
414: m = Delete.sPattern.matcher(actions[i]);
415: if (m.matches()) {
416: String at = m.group(1);
417:
418: if (!last) {
419: throw new Exception(
420: "Move command should be last command");
421: }
422:
423: ret[i] = new Delete(Integer.parseInt(at));
424: lastAt = ret[i].checkLast(lastAt);
425: continue;
426: }
427:
428: // Move
429: m = Move.sPattern.matcher(actions[i]);
430: if (m.matches()) {
431: String at = m.group(1);
432: String guts = m.group(2);
433: Matcher g = Move.sArgPattern.matcher(guts);
434: if (!g.matches()) {
435: throw new Exception(
436: "Wrong arguments: should match "
437: + Move.ARGPATTERN);
438: }
439: String type = g.group(1);
440: String name = g.group(2);
441:
442: if (!last) {
443: throw new Exception(
444: "Move command should be last command");
445: }
446:
447: name = name.replaceAll("\\$", destName);
448:
449: ret[i] = new Move(Integer.parseInt(at), type, name,
450: destType);
451: lastAt = ret[i].checkLast(lastAt);
452: continue;
453: }
454:
455: throw new ValidationException("Action '" + actions[i]
456: + "' is not a valid action");
457: } catch (Exception e) {
458: throw new ValidationException("Could not parse [" + s
459: + "]: error [" + e + "] in element number " + i
460: + ": [" + actions[i] + "]", e);
461: }
462: }
463:
464: return ret;
465: }
466: }
|