001: /* ====================================================================
002: * The Apache Software License, Version 1.1
003: *
004: * Copyright (c) 1997-2003 The Apache Software Foundation. All rights
005: * reserved.
006: *
007: * Redistribution and use in source and binary forms, with or without
008: * modification, are permitted provided that the following conditions
009: * are met:
010: *
011: * 1. Redistributions of source code must retain the above copyright
012: * notice, this list of conditions and the following disclaimer.
013: *
014: * 2. Redistributions in binary form must reproduce the above copyright
015: * notice, this list of conditions and the following disclaimer in
016: * the documentation and/or other materials provided with the
017: * distribution.
018: *
019: * 3. The end-user documentation included with the redistribution,
020: * if any, must include the following acknowledgment:
021: * "This product includes software developed by the
022: * Apache Software Foundation (http://www.apache.org/)."
023: * Alternately, this acknowledgment may appear in the software
024: * itself, if and wherever such third-party acknowledgments
025: * normally appear.
026: *
027: * 4. The names "Avalon", and "Apache Software Foundation"
028: * must not be used to endorse or promote products derived from this
029: * software without prior written permission. For written
030: * permission, please contact apache@apache.org.
031: *
032: * 5. Products derived from this software may not be called "Apache",
033: * nor may "Apache" appear in their name, without prior written
034: * permission of the Apache Software Foundation.
035: *
036: * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
037: * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
038: * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
039: * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
040: * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
041: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
042: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
043: * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
044: * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
045: * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
046: * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
047: * SUCH DAMAGE.
048: * ====================================================================
049: *
050: * This software consists of voluntary contributions made by many
051: * individuals on behalf of the Apache Software Foundation. For more
052: * information on the Apache Software Foundation, please see
053: * <http://www.apache.org/>.
054: */
055: package org.jicarilla.plumbing;
056:
057: import org.jicarilla.lang.Assert;
058: import org.jicarilla.lang.Selector;
059: import org.jicarilla.lang.SelectorSwitch;
060: import org.jicarilla.lang.Switch;
061:
062: import java.util.Iterator;
063: import java.util.List;
064:
065: /**
066: * A screener which tries sinks in the order they were added, or falls back on
067: * the sink that was added last when none of the sinks will accept a message.
068: *
069: * @author <a href="lsimons at jicarilla dot org">Leo Simons</a>
070: * @version $Id: SimpleScreener.java,v 1.2 2004/03/23 13:37:58 lsimons Exp $
071: */
072: public class SimpleScreener implements Screener {
073: protected final Switch m_switch;
074: /** synchronization point for {@link m_switch}. */
075: protected final Object m_mutex = new Object();
076:
077: /**
078: * Create a new instance backed by a {@link SelectorSwitch}.
079: */
080: public SimpleScreener() {
081: m_switch = new SelectorSwitch();
082: }
083:
084: /**
085: * Add a sink.
086: *
087: * @param sink the sink to add
088: * @param selector the selector that determines whether this sink will
089: * receive a particular message
090: */
091: public void addSink(final Selector selector, final Sink sink) {
092: Assert.assertNotNull(selector);
093: Assert.assertNotNull(sink);
094:
095: synchronized (m_mutex) {
096: m_switch.put(selector, sink);
097: }
098: }
099:
100: /**
101: * See {@link org.jicarilla.plumbing.Stage#put(Object)}.
102: *
103: * @param o the object to add
104: * @throws InterruptedException if the current thread has been
105: * {@link java.lang.Thread#interrupt()}ed
106: */
107: public void put(final Object o) throws InterruptedException {
108: final Sink sink = selectSink(o);
109: sink.put(o);
110: }
111:
112: /**
113: * See {@link org.jicarilla.plumbing.Stage#offer(Object,long)}.
114: *
115: * @param o the object to add
116: * @param l how long to try adding the object before returning
117: * @return true if the object was added, false if it wasn't
118: * @throws InterruptedException if the current thread has been
119: * {@link java.lang.Thread#interrupt()}ed
120: */
121: public boolean offer(final Object o, final long l)
122: throws InterruptedException {
123: final Sink sink = selectSink(o);
124: return sink.offer(o, l);
125: }
126:
127: /**
128: * Select a sink from all referenced sinks based on the provided selection
129: * criterium.
130: *
131: * @param o the selection criterium
132: * @return the selected sink
133: * @throws IllegalStateException if no sink can be found for this
134: * message
135: */
136: protected Sink selectSink(final Object o) {
137: synchronized (m_mutex) {
138: Iterator it = m_switch.keySet().iterator();
139: while (it.hasNext()) {
140: Selector selector = (Selector) it.next();
141: final boolean selected = selector.select(o);
142: if (selected)
143: return (Sink) m_switch.get(selector);
144: }
145:
146: List switchList = m_switch.entryList();
147: Switch.Entry entry = (Switch.Entry) switchList.get(m_switch
148: .size() - 1);
149: return (Sink) entry.getValue();
150: }
151: }
152: }
|