001: /* ====================================================================
002: The Jicarilla Software License
003:
004: Copyright (c) 2003 Leo Simons.
005: All rights reserved.
006:
007: Permission is hereby granted, free of charge, to any person obtaining
008: a copy of this software and associated documentation files (the
009: "Software"), to deal in the Software without restriction, including
010: without limitation the rights to use, copy, modify, merge, publish,
011: distribute, sublicense, and/or sell copies of the Software, and to
012: permit persons to whom the Software is furnished to do so, subject to
013: the following conditions:
014:
015: The above copyright notice and this permission notice shall be
016: included in all copies or substantial portions of the Software.
017:
018: THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
019: EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
020: MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
021: IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
022: CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
023: TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
024: SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
025: ==================================================================== */
026: package org.jicarilla.plumbing;
027:
028: import EDU.oswego.cs.dl.util.concurrent.Channel;
029:
030: import java.util.ArrayList;
031: import java.util.Iterator;
032: import java.util.List;
033:
034: import org.jicarilla.plumbing.Sink;
035: import org.jicarilla.plumbing.Stage;
036: import org.jicarilla.plumbing.DefaultStage;
037: import org.jicarilla.lang.Assert;
038:
039: /**
040: * <p>A Postprocessor runs a message through several stages <i>before</i>
041: * dequeueing it. In a diagram:</p>
042: *
043: * <pre>
044: * +-------------------------------------------+
045: * |Postprocessor |
046: * put() |+---------+ +------+ +------+ +------+ take()
047: * ------>|Channel |-->|Stage1|--->|Stage2|-->|Stage3|----->
048: * |+---------+ +------+ +------+ +------+
049: * +-------------------------------------------+
050: * </pre>
051: *
052: * <p>Use a Postprocessor when it does not make sense to put any kind of
053: * (asynchronous) buffering between plumbing and it is acceptable for
054: * {@link #take()} to take a long time (no pun intended).</p>
055: *
056: * @author <a href="lsimons at jicarilla dot org">Leo Simons</a>
057: * @version $Id: PostProcessor.java,v 1.2 2004/03/23 13:37:58 lsimons Exp $
058: */
059: public class PostProcessor extends DefaultStage {
060: /** The stages to run before returning from {@link #take()}. */
061: protected final List m_stages;
062: /**
063: * The mutex to synchronize on for operations involving
064: * {@link m_stages}.
065: */
066: protected final Object m_mutex = new Object();
067:
068: /**
069: * Create a new instance.
070: *
071: * @param channel the channel to pass to {@link DefaultStage}
072: * @param errorHandler the error handler to pass to {@link org.jicarilla.plumbing.DefaultStage}
073: */
074: public PostProcessor(final Channel channel, final Sink errorHandler) {
075: super (channel, errorHandler);
076: m_stages = new ArrayList();
077: }
078:
079: /**
080: * Add a stage to the processor. It's appended at the end of the current
081: * set of stages.
082: *
083: * @param stage the stage to add
084: */
085: public void addStage(final Stage stage) {
086: Assert.assertNotNull(stage);
087:
088: synchronized (m_mutex) {
089: m_stages.add(stage);
090: }
091: }
092:
093: /**
094: * See {@link org.jicarilla.plumbing.Stage#take()}.
095: *
096: * @return the message retrieved
097: * @throws InterruptedException if the current thread has been
098: * {@link java.lang.Thread#interrupt()}ed
099: */
100: public Object take() throws InterruptedException {
101: final Object o = super .take();
102: process(o);
103: return o;
104: }
105:
106: /**
107: * See {@link org.jicarilla.plumbing.Stage#poll(long)}.
108: *
109: * @param pollingTime how long to try and fetch before giving up
110: * @return the message retrieved, or null if none was retrieved within
111: * the specified time interval
112: * @throws InterruptedException if the current thread has been
113: * {@link java.lang.Thread#interrupt()}ed
114: */
115: public Object poll(final long pollingTime)
116: throws InterruptedException {
117: final Object o = super .poll(pollingTime);
118: process(o);
119: return o;
120: }
121:
122: /**
123: * Runs the message through the set of post processing stages.
124: *
125: * @param o the message to process
126: * @throws InterruptedException if the current thread has been
127: * {@link java.lang.Thread#interrupt()}ed
128: */
129: protected void process(final Object o) throws InterruptedException {
130: synchronized (m_mutex) {
131: Object ob = o;
132:
133: final Iterator it = m_stages.iterator();
134: while (it.hasNext()) {
135: final Stage stage = (Stage) it.next();
136: stage.put(ob);
137: ob = stage.take();
138: }
139: }
140: }
141: }
|