001: /* ====================================================================
002: The Jicarilla Software License
003:
004: Copyright (c) 2003 Leo Simons.
005: All rights reserved.
006:
007: Permission is hereby granted, free of charge, to any person obtaining
008: a copy of this software and associated documentation files (the
009: "Software"), to deal in the Software without restriction, including
010: without limitation the rights to use, copy, modify, merge, publish,
011: distribute, sublicense, and/or sell copies of the Software, and to
012: permit persons to whom the Software is furnished to do so, subject to
013: the following conditions:
014:
015: The above copyright notice and this permission notice shall be
016: included in all copies or substantial portions of the Software.
017:
018: THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
019: EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
020: MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
021: IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
022: CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
023: TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
024: SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
025: ==================================================================== */
026: package org.jicarilla.plumbing;
027:
028: import EDU.oswego.cs.dl.util.concurrent.Channel;
029:
030: import java.util.ArrayList;
031: import java.util.Iterator;
032: import java.util.List;
033:
034: import org.jicarilla.plumbing.Sink;
035: import org.jicarilla.plumbing.Stage;
036: import org.jicarilla.plumbing.DefaultStage;
037: import org.jicarilla.lang.Assert;
038:
039: /**
040: * A Preprocessor runs a series of plumbing over a message before enqueueing it:
041: *
042: * <pre>
043: * +-----------------------------------------+
044: * |Preprocessor |
045: * put() |+------+ +------+ +------+ +---------+ take()
046: * ------>|Stage1|-->|Stage2|-->|Stage3|-->| Channel |----->
047: * |+------+ +------+ +------+ +---------+
048: * +-----------------------------------------+
049: * </pre>
050: *
051: * Use a Preprocessor when it does not make sense to put any kind of
052: * (asynchronous) buffering between plumbing and it is acceptable for put() to
053: * take a long time.
054: *
055: * @author <a href="lsimons at jicarilla dot org">Leo Simons</a>
056: * @version $Id: PreProcessor.java,v 1.2 2004/03/23 13:37:58 lsimons Exp $
057: */
058: public class PreProcessor extends DefaultStage {
059: /** The stages to run before returning from {@link #put(Object)}. */
060: protected final List m_stages;
061: /**
062: * The mutex to synchronize on for operations involving
063: * {@link m_stages}.
064: */
065: protected final Object m_mutex = new Object();
066:
067: /**
068: * Create a new instance.
069: *
070: * @param channel the channel to pass to {@link DefaultStage}
071: * @param errorHandler the error handler to pass to {@link DefaultStage}
072: */
073: public PreProcessor(final Channel channel, final Sink errorHandler) {
074: super (channel, errorHandler);
075: m_stages = new ArrayList();
076: }
077:
078: /**
079: * Add a stage to the processor. It's appended at the end of the current
080: * set of stages.
081: *
082: * @param stage the stage to add
083: */
084: public void addStage(final Stage stage) {
085: Assert.assertNotNull(stage);
086: synchronized (m_mutex) {
087: m_stages.add(stage);
088: }
089: }
090:
091: /**
092: * See {@link Stage#put(Object)}.
093: *
094: * @param o the object to add
095: * @throws InterruptedException if the current thread has been
096: * {@link java.lang.Thread#interrupt()}ed
097: */
098: public void put(final Object o) throws InterruptedException {
099: process(o);
100: super .put(o);
101: }
102:
103: /**
104: * See {@link org.jicarilla.plumbing.Stage#offer(Object,long)}.
105: *
106: * @param o the object to add
107: * @param l how long to try adding the object before returning
108: * @return true if the object was added, false if it wasn't
109: * @throws InterruptedException if the current thread has been
110: * {@link java.lang.Thread#interrupt()}ed
111: */
112: public boolean offer(final Object o, final long l)
113: throws InterruptedException {
114: process(o);
115: return super .offer(o, l);
116: }
117:
118: /**
119: * Runs the message through the set of post processing stages.
120: *
121: * @param o the message to process
122: * @throws InterruptedException if the current thread has been
123: * {@link java.lang.Thread#interrupt()}ed
124: */
125: protected void process(final Object o) throws InterruptedException {
126: synchronized (m_mutex) {
127:
128: Object ob = o;
129:
130: final Iterator it = m_stages.iterator();
131: while (it.hasNext()) {
132: final Stage stage = (Stage) it.next();
133: stage.put(ob);
134: ob = stage.take();
135: }
136: }
137: }
138: }
|