001: /* ====================================================================
002: * The Apache Software License, Version 1.1
003: *
004: * Copyright (c) 1997-2003 The Apache Software Foundation. All rights
005: * reserved.
006: *
007: * Redistribution and use in source and binary forms, with or without
008: * modification, are permitted provided that the following conditions
009: * are met:
010: *
011: * 1. Redistributions of source code must retain the above copyright
012: * notice, this list of conditions and the following disclaimer.
013: *
014: * 2. Redistributions in binary form must reproduce the above copyright
015: * notice, this list of conditions and the following disclaimer in
016: * the documentation and/or other materials provided with the
017: * distribution.
018: *
019: * 3. The end-user documentation included with the redistribution,
020: * if any, must include the following acknowledgment:
021: * "This product includes software developed by the
022: * Apache Software Foundation (http://www.apache.org/)."
023: * Alternately, this acknowledgment may appear in the software
024: * itself, if and wherever such third-party acknowledgments
025: * normally appear.
026: *
027: * 4. The names "Avalon", and "Apache Software Foundation"
028: * must not be used to endorse or promote products derived from this
029: * software without prior written permission. For written
030: * permission, please contact apache@apache.org.
031: *
032: * 5. Products derived from this software may not be called "Apache",
033: * nor may "Apache" appear in their name, without prior written
034: * permission of the Apache Software Foundation.
035: *
036: * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
037: * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
038: * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
039: * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
040: * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
041: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
042: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
043: * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
044: * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
045: * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
046: * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
047: * SUCH DAMAGE.
048: * ====================================================================
049: *
050: * This software consists of voluntary contributions made by many
051: * individuals on behalf of the Apache Software Foundation. For more
052: * information on the Apache Software Foundation, please see
053: * <http://www.apache.org/>.
054: */
055: package org.jicarilla.plumbing;
056:
057: import org.jicarilla.plumbing.Sink;
058: import org.jicarilla.plumbing.Alternator;
059: import org.jicarilla.lang.Assert;
060:
061: import java.util.ArrayList;
062: import java.util.List;
063:
064: /**
065: * A list-backed {@link org.jicarilla.plumbing.Alternator} which uses the priority as a weight. In
066: * other words, a sink with priority <i>2*n</i> has twice the chance of being
067: * selected compared to a sink with priority <i>n</i>).
068: *
069: * @author <a href="lsimons at jicarilla dot org">Leo Simons</a>
070: * @version $Id: SimpleAlternator.java,v 1.2 2004/03/23 13:37:58 lsimons Exp $
071: */
072: public class SimpleAlternator implements Alternator {
073: /** the sinks to alternate through. */
074: protected final List m_sinks;
075: /** synchronization point for {@link m_sinks}. */
076: protected final Object m_mutex = new Object();
077:
078: /**
079: * Create a new instance backed by an {@link ArrayList}.
080: */
081: public SimpleAlternator() {
082: m_sinks = new ArrayList();
083: }
084:
085: /**
086: * Add a sink with the default priority.
087: *
088: * @param sink the sink to add
089: */
090: public void addSink(final Sink sink) {
091: addSink(sink, DEFAULT_PRIORITY);
092: }
093:
094: /**
095: * Add a sink with the specified priority.
096: *
097: * @param sink the sink to add
098: * @param priority the weight of the sink in determining how often
099: * it receives a message
100: */
101: public void addSink(final Sink sink, final int priority) {
102: Assert.assertNotNull(sink);
103: Assert.assertTrue(priority >= MIN_PRIORITY);
104: Assert.assertTrue(priority <= MAX_PRIORITY);
105:
106: synchronized (m_mutex) {
107: for (int i = 0; i < priority; i++) {
108: m_sinks.add(sink);
109: }
110: }
111: }
112:
113: /**
114: * See {@link org.jicarilla.plumbing.Stage#put(Object)}.
115: *
116: * @param o the object to add
117: * @throws InterruptedException if the current thread has been
118: * {@link java.lang.Thread#interrupt()}ed
119: */
120: public void put(final Object o) throws InterruptedException {
121: final Sink sink = randomSink();
122: sink.put(o);
123: }
124:
125: /**
126: * See {@link org.jicarilla.plumbing.Stage#offer(Object,long)}.
127: *
128: * @param o the object to add
129: * @param l how long to try adding the object before returning
130: * @return true if the object was added, false if it wasn't
131: * @throws InterruptedException if the current thread has been
132: * {@link java.lang.Thread#interrupt()}ed
133: */
134: public boolean offer(final Object o, final long l)
135: throws InterruptedException {
136: final Sink sink = randomSink();
137: return sink.offer(o, l);
138: }
139:
140: /**
141: * Select a random sink from all referenced sinks.
142: *
143: * @return the selected sink
144: */
145: protected Sink randomSink() {
146: synchronized (m_mutex) {
147: final int selected = (int) (Math.random() * m_sinks.size());
148: return (Sink) m_sinks.get(selected);
149: }
150: }
151: }
|