001: /*
002: * $Header: /home/cvs/jakarta-tomcat-4.0/catalina/src/share/org/apache/catalina/cluster/MulticastReceiver.java,v 1.5 2002/01/03 08:52:56 remm Exp $
003: * $Revision: 1.5 $
004: * $Date: 2002/01/03 08:52:56 $
005: *
006: * ====================================================================
007: *
008: * The Apache Software License, Version 1.1
009: *
010: * Copyright (c) 1999 The Apache Software Foundation. All rights
011: * reserved.
012: *
013: * Redistribution and use in source and binary forms, with or without
014: * modification, are permitted provided that the following conditions
015: * are met:
016: *
017: * 1. Redistributions of source code must retain the above copyright
018: * notice, this list of conditions and the following disclaimer.
019: *
020: * 2. Redistributions in binary form must reproduce the above copyright
021: * notice, this list of conditions and the following disclaimer in
022: * the documentation and/or other materials provided with the
023: * distribution.
024: *
025: * 3. The end-user documentation included with the redistribution, if
026: * any, must include the following acknowlegement:
027: * "This product includes software developed by the
028: * Apache Software Foundation (http://www.apache.org/)."
029: * Alternately, this acknowlegement may appear in the software itself,
030: * if and wherever such third-party acknowlegements normally appear.
031: *
032: * 4. The names "The Jakarta Project", "Tomcat", and "Apache Software
033: * Foundation" must not be used to endorse or promote products derived
034: * from this software without prior written permission. For written
035: * permission, please contact apache@apache.org.
036: *
037: * 5. Products derived from this software may not be called "Apache"
038: * nor may "Apache" appear in their names without prior written
039: * permission of the Apache Group.
040: *
041: * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
042: * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
043: * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
044: * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
045: * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
046: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
047: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
048: * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
049: * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
050: * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
051: * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
052: * SUCH DAMAGE.
053: * ====================================================================
054: *
055: * This software consists of voluntary contributions made by many
056: * individuals on behalf of the Apache Software Foundation. For more
057: * information on the Apache Software Foundation, please see
058: * <http://www.apache.org/>.
059: *
060: * [Additional notices, if required by prior licensing conditions]
061: *
062: */
063:
064: package org.apache.catalina.cluster;
065:
066: import java.net.DatagramPacket;
067: import java.net.InetAddress;
068: import java.net.MulticastSocket;
069: import java.io.InputStream;
070: import java.io.OutputStream;
071: import java.io.BufferedOutputStream;
072: import java.io.ByteArrayInputStream;
073: import java.io.ByteArrayOutputStream;
074: import java.io.IOException;
075: import java.io.ObjectInputStream;
076: import java.io.ObjectOutputStream;
077: import java.io.ObjectStreamClass;
078: import java.util.Vector;
079:
080: /**
081: * This class is responsible for checking for incoming multicast
082: * data and determine if the data belongs to us and if so push
083: * it onto an internal stack and let it be picked up when needed.
084: *
085: * @author Bip Thelin
086: * @version $Revision: 1.5 $, $Date: 2002/01/03 08:52:56 $
087: */
088:
089: public final class MulticastReceiver extends ClusterSessionBase
090: implements ClusterReceiver {
091:
092: // ----------------------------------------------------- Instance Variables
093:
094: /**
095: * The unique message ID
096: */
097: private static String senderId = null;
098:
099: /**
100: * The MulticastSocket to use
101: */
102: private MulticastSocket multicastSocket = null;
103:
104: /**
105: * Our Thread name
106: */
107: private String threadName = "MulticastReceiver";
108:
109: /**
110: * The name of our component, used for logging.
111: */
112: private String receiverName = "MulticastReceiver";
113:
114: /**
115: * The stack that keeps incoming requests
116: */
117: private static Vector stack = new Vector();
118:
119: /**
120: * Has this component been started?
121: */
122: private boolean started = false;
123:
124: /**
125: * The background thread.
126: */
127: private Thread thread = null;
128:
129: /**
130: * The background thread completion semaphore.
131: */
132: protected boolean threadDone = false;
133:
134: /**
135: * The interval for the background thread to sleep
136: */
137: private int checkInterval = 5;
138:
139: // --------------------------------------------------------- Public Methods
140:
141: /**
142: * Create a new MulticastReceiver.
143: *
144: * @param senderId The unique senderId
145: * @param multicastSocket The MulticastSocket to use
146: */
147: MulticastReceiver(String senderId, MulticastSocket multicastSocket,
148: InetAddress multicastAddress, int multicastPort) {
149: this .multicastSocket = multicastSocket;
150: this .senderId = senderId;
151: }
152:
153: /**
154: * Return a <code>String</code> containing the name of this
155: * implementation, used for logging
156: *
157: * @return The name of the implementation
158: */
159: public String getName() {
160: return (this .receiverName);
161: }
162:
163: /**
164: * Set the time in seconds for this component to
165: * Sleep before it checks for new received data in the Cluster
166: *
167: * @param checkInterval The time to sleep
168: */
169: public void setCheckInterval(int checkInterval) {
170: this .checkInterval = checkInterval;
171: }
172:
173: /**
174: * Get the time in seconds this Cluster sleeps
175: *
176: * @return The time in seconds this Cluster sleeps
177: */
178: public int getCheckInterval() {
179: return (this .checkInterval);
180: }
181:
182: /**
183: * Receive the objects currently in our stack and clear
184: * if afterwards.
185: *
186: * @return An array with objects
187: */
188: public Object[] getObjects() {
189: synchronized (stack) {
190: Object[] objs = stack.toArray();
191: stack.removeAllElements();
192: return (objs);
193: }
194: }
195:
196: /**
197: * Start our component
198: */
199: public void start() {
200: started = true;
201:
202: // Start the background reaper thread
203: threadStart();
204: }
205:
206: /**
207: * Stop our component
208: */
209: public void stop() {
210: started = false;
211:
212: // Stop the background reaper thread
213: threadStop();
214: }
215:
216: // -------------------------------------------------------- Private Methods
217:
218: /**
219: * Check our multicast socket for new data and determine if the
220: * data matches us(senderId) and if so push it onto the stack,
221: */
222: private void receive() {
223: try {
224: byte[] buf = new byte[5000];
225: DatagramPacket recv = new DatagramPacket(buf, buf.length);
226: ByteArrayInputStream ips = null;
227: ObjectInputStream ois = null;
228:
229: multicastSocket.receive(recv);
230: ips = new ByteArrayInputStream(buf, 0, buf.length);
231: ois = new ObjectInputStream(ips);
232: ReplicationWrapper obj = (ReplicationWrapper) ois
233: .readObject();
234:
235: if (obj.getSenderId().equals(this .senderId))
236: stack.add(obj);
237: } catch (IOException e) {
238: log("An error occurred when trying to replicate: "
239: + e.toString());
240: } catch (ClassNotFoundException e) {
241: log("An error occurred when trying to replicate: "
242: + e.toString());
243: }
244: }
245:
246: // ------------------------------------------------------ Background Thread
247:
248: /**
249: * The background thread.
250: */
251: public void run() {
252: // Loop until the termination semaphore is set
253: while (!threadDone) {
254: receive();
255: threadSleep();
256: }
257: }
258:
259: /**
260: * Sleep for the duration specified by the <code>checkInterval</code>
261: * property.
262: */
263: private void threadSleep() {
264: try {
265: Thread.sleep(checkInterval * 1000L);
266: } catch (InterruptedException e) {
267: ;
268: }
269: }
270:
271: /**
272: * Start the background thread.
273: */
274: private void threadStart() {
275: if (thread != null)
276: return;
277:
278: threadDone = false;
279: threadName = threadName + "[" + senderId + "]";
280: thread = new Thread(this , threadName);
281: thread.setDaemon(true);
282: thread.start();
283: }
284:
285: /**
286: * Stop the background thread.
287: */
288: private void threadStop() {
289: if (thread == null)
290: return;
291:
292: threadDone = true;
293: thread.interrupt();
294: try {
295: thread.join();
296: } catch (InterruptedException e) {
297: ;
298: }
299:
300: thread = null;
301: }
302: }
|