001: /* ====================================================================
002: * The Apache Software License, Version 1.1
003: *
004: * Copyright (c) 1997-2003 The Apache Software Foundation. All rights
005: * reserved.
006: *
007: * Redistribution and use in source and binary forms, with or without
008: * modification, are permitted provided that the following conditions
009: * are met:
010: *
011: * 1. Redistributions of source code must retain the above copyright
012: * notice, this list of conditions and the following disclaimer.
013: *
014: * 2. Redistributions in binary form must reproduce the above copyright
015: * notice, this list of conditions and the following disclaimer in
016: * the documentation and/or other materials provided with the
017: * distribution.
018: *
019: * 3. The end-user documentation included with the redistribution,
020: * if any, must include the following acknowledgment:
021: * "This product includes software developed by the
022: * Apache Software Foundation (http://www.apache.org/)."
023: * Alternately, this acknowledgment may appear in the software
024: * itself, if and wherever such third-party acknowledgments
025: * normally appear.
026: *
027: * 4. The names "Avalon", and "Apache Software Foundation"
028: * must not be used to endorse or promote products derived from this
029: * software without prior written permission. For written
030: * permission, please contact apache@apache.org.
031: *
032: * 5. Products derived from this software may not be called "Apache",
033: * nor may "Apache" appear in their name, without prior written
034: * permission of the Apache Software Foundation.
035: *
036: * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
037: * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
038: * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
039: * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
040: * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
041: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
042: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
043: * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
044: * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
045: * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
046: * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
047: * SUCH DAMAGE.
048: * ====================================================================
049: *
050: * This software consists of voluntary contributions made by many
051: * individuals on behalf of the Apache Software Foundation. For more
052: * information on the Apache Software Foundation, please see
053: * <http://www.apache.org/>.
054: */
055: package org.jicarilla.plumbing;
056:
057: import org.jicarilla.plumbing.Multicaster;
058: import org.jicarilla.plumbing.Sink;
059: import org.jicarilla.lang.Assert;
060:
061: import java.lang.reflect.InvocationTargetException;
062: import java.util.ArrayList;
063: import java.util.List;
064:
065: /**
066: * A {@link Multicaster} which clones {@link Cloneable} messages before sending
067: * them to all its sinks, but makes no attempt to do so for objects that do not
068: * implement Cloneable.
069: *
070: * @author <a href="lsimons at jicarilla dot org">Leo Simons</a>
071: * @version $Id: SimpleMulticaster.java,v 1.4 2003/11/18 16:09:11 lsimons Exp
072: * $
073: */
074: public class SimpleMulticaster implements Multicaster {
075: /** the referenced sinks. */
076: protected final List m_sinks;
077: /** synchronization point for {@link m_sinks}. */
078: protected final Object m_mutex = new Object();
079:
080: /**
081: * Create a new instance backed by an {@link ArrayList}.
082: */
083: public SimpleMulticaster() {
084: m_sinks = new ArrayList();
085: }
086:
087: /**
088: * Add a sink.
089: *
090: * @param sink the sink to add
091: */
092: public void addSink(final Sink sink) {
093: Assert.assertNotNull(sink);
094: synchronized (m_mutex) {
095: m_sinks.add(sink);
096: }
097: }
098:
099: /**
100: * See {@link org.jicarilla.plumbing.Stage#put(Object)}.
101: *
102: * @param o the object to add
103: * @throws InterruptedException if the current thread has been
104: * {@link java.lang.Thread#interrupt()}ed
105: */
106: public void put(final Object o) throws InterruptedException {
107: synchronized (m_mutex) {
108: for (int i = 0; i < m_sinks.size(); i++) {
109: final Object puttableObject = tryClone(o);
110: ((Sink) m_sinks.get(i)).put(puttableObject);
111: }
112: }
113: }
114:
115: /**
116: * See {@link org.jicarilla.plumbing.Stage#offer(Object,long)}.
117: *
118: * @param o the object to add
119: * @param l how long to try adding the object before returning
120: * @return true if the object was added, false if it wasn't
121: * @throws InterruptedException if the current thread has been
122: * {@link java.lang.Thread#interrupt()}ed
123: */
124: public boolean offer(final Object o, final long l)
125: throws InterruptedException {
126: boolean accept = false;
127:
128: synchronized (m_mutex) {
129: for (int i = 0; i < m_sinks.size(); i++) {
130: final Object puttableObject = tryClone(o);
131: final boolean accepted = ((Sink) m_sinks.get(i)).offer(
132: puttableObject, l);
133: accept = (accept || accepted);
134: }
135: }
136:
137: return accept;
138: }
139:
140: /**
141: * Try and create a clone of an object. If the object is not {@link
142: * Cloneable} or if the cloning fails, don't clone.
143: *
144: * @param o the object to clone
145: * @return the new clone, or the object itself if the object wasn't cloned
146: */
147: protected Object tryClone(final Object o) {
148: if (o instanceof Cloneable) {
149: try {
150: final Object clone = o.getClass().getMethod("clone",
151: new Class[0]).invoke(o, new Object[0]);
152: return clone;
153: } catch (NoSuchMethodException nsme) {
154: } catch (IllegalAccessException iae) {
155: } catch (InvocationTargetException ite) {
156: }
157: }
158:
159: return o;
160: }
161: }
|