001: /* ----- BEGIN LICENSE BLOCK -----
002: * Version: MPL 1.1
003: *
004: * The contents of this file are subject to the Mozilla Public License Version
005: * 1.1 (the "License"); you may not use this file except in compliance with
006: * the License. You may obtain a copy of the License at
007: * http://www.mozilla.org/MPL/
008: *
009: * Software distributed under the License is distributed on an "AS IS" basis,
010: * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
011: * for the specific language governing rights and limitations under the
012: * License.
013: *
014: * The Original Code is the DataShare server.
015: *
016: * The Initial Developer of the Original Code is
017: * Ball Aerospace & Technologies Corp, Fairborn, Ohio
018: * Portions created by the Initial Developer are Copyright (C) 2001
019: * the Initial Developer. All Rights Reserved.
020: *
021: * Contributor(s): Charles Wood <cwood@ball.com>
022: *
023: * ----- END LICENSE BLOCK ----- */
024: /* RCS $Id: FifoQueue.java,v 1.2 2002/01/29 20:50:17 lizellaman Exp $
025: * $Log: FifoQueue.java,v $
026: * Revision 1.2 2002/01/29 20:50:17 lizellaman
027: * Added LoggingInterface, modified the PropertiesInterface implementation
028: *
029: * Revision 1.1.1.1 2001/10/23 13:37:19 lizellaman
030: * initial sourceforge release
031: *
032: */
033:
034: package org.datashare;
035:
036: import java.util.LinkedList;
037:
038: /**
039: * this class is used where we have serializable objects that we want to pass between two
040: * threads in a FIFO/QUEUE fashion, one thread will source the data, and another thread to sink
041: * the data. NOTE!!!! The read() method will block if no object is available to read.
042: * Note that this class can be used to provide the sink thread to feed a consumer by
043: * calling setConsumer() to give it a class/method to call when data is available.
044: */
045: public class FifoQueue implements Runnable {
046: //private int bufferedObjectCount = 0;
047: private LinkedList list = new LinkedList();
048: private boolean closed = false; // set to true if we close this pipe
049:
050: //private int count = 0;
051: private FifoConsumer consumer = null;
052: private Thread myThread = null;
053:
054: /**
055: * constructor
056: */
057: public FifoQueue() {
058: }
059:
060: /**
061: * used only if a setConsumer() call is made, calls the newFifoDataAvailable method of
062: * the consumer (set in setConsumer) whenever objects are put into the FIFO via the write method.
063: */
064: public void run() {
065: while (!closed) {
066: try {
067: Object object = read();
068: if (consumer != null)
069: consumer.newFifoDataAvailable(object);
070: } catch (Exception e) {
071: SessionUtilities
072: .getLoggingInterface()
073: .debugMsg(
074: SessionUtilities.getLoggingInterface().ERROR,
075: SessionUtilities.getLoggingInterface().GENERALSTATUS,
076: "Problems in FifoQueue, thread named "
077: + this .myThread.getName());
078: e.printStackTrace();
079: }
080: }
081: }
082:
083: /**
084: * used only if this class is to be used as a Thread to wait for data that will then
085: * be supplied to this consumer as it becomes available, will start thread that supplies
086: * the consumer with objects as they are put into the FIFO with the write method.
087: */
088: public void setConsumer(FifoConsumer consumer) {
089: if (this .consumer == null) {
090: this .consumer = consumer;
091: myThread = new Thread(this , "FifoQueueThread-for-"
092: + consumer);
093: myThread.setDaemon(true);
094: myThread.start();
095: } else {
096: SessionUtilities
097: .getLoggingInterface()
098: .debugMsg(
099: SessionUtilities.getLoggingInterface().DEBUG,
100: SessionUtilities.getLoggingInterface().GENERALSTATUS,
101: "FifoQueue.setConsumer, queue already has a consumer-> "
102: + this .consumer);
103: Thread.dumpStack();
104: }
105: }
106:
107: /**
108: * used to put the Objects into a buffer
109: * @param object Object to be added to our FIFO
110: */
111: public void write(Object object) {
112: //SessionUtilities.getLoggingInterface().debugMsg(SessionUtilities.getLoggingInterface().DEBUG,
113: // SessionUtilities.getLoggingInterface().GENERALSTATUS,
114: // "FifoQueue writing an object for thread " + Thread.currentThread().getName());
115: synchronized (list) {
116: list.add(object);
117: list.notifyAll();
118: }
119: }
120:
121: /**
122: * Returns the oldest object in our FIFO, or blocks until an object is available, may issue null object when FifoQueue is closed
123: */
124: public Object read() {
125: Object object = null;
126: if (!closed) {
127: synchronized (list) {
128: // if list is empty, wait for notify
129: if (list.isEmpty()) {
130: try {
131: //SessionUtilities.getLoggingInterface().debugMsg(SessionUtilities.getLoggingInterface().DEBUG,
132: // SessionUtilities.getLoggingInterface().GENERALSTATUS,
133: // "FifoQueue waiting for an object to read by thread " + Thread.currentThread().getName());
134: list.wait();
135: } catch (InterruptedException ie) {
136: SessionUtilities
137: .getLoggingInterface()
138: .debugMsg(
139: SessionUtilities
140: .getLoggingInterface().WARNING,
141: SessionUtilities
142: .getLoggingInterface().GENERALSTATUS,
143: "FifoQueue had interrupted exception...");
144: ie.printStackTrace();
145: }
146: }
147: try {
148: if (!closed) // if closed, we want to issue a null object
149: object = list.remove(0); // remove first element
150: } catch (Exception e) {
151: SessionUtilities
152: .getLoggingInterface()
153: .debugMsg(
154: SessionUtilities
155: .getLoggingInterface().ERROR,
156: SessionUtilities
157: .getLoggingInterface().GENERALSTATUS,
158: "Problems getting object from pipe for reader "
159: + Thread.currentThread()
160: .getName());
161: e.printStackTrace();
162: object = null;
163: }
164: }
165: }
166:
167: if (object == null)
168: SessionUtilities
169: .getLoggingInterface()
170: .debugMsg(
171: SessionUtilities.getLoggingInterface().DEBUG,
172: SessionUtilities.getLoggingInterface().GENERALSTATUS,
173: "FifoQueue issuing null object to thread "
174: + Thread.currentThread().getName()
175: + " (to close queue)");
176:
177: return object;
178: }
179:
180: /**
181: * used to return the number of objects currently in the buffer
182: */
183: public int size() {
184: return list.size();
185: }
186:
187: /**
188: * used when the pipe is no longer needed, will cause a blocking read to release
189: * with a null object returned, will also stop the thread if a consumer was set
190: * with setConsumer().
191: */
192: public void close() {
193: SessionUtilities.getLoggingInterface().debugMsg(
194: SessionUtilities.getLoggingInterface().DEBUG,
195: SessionUtilities.getLoggingInterface().GENERALSTATUS,
196: "Closing FifoQueue");
197: closed = true;
198: consumer = null;
199: reset();
200: synchronized (list) {
201: list.notifyAll(); // should release read and may issue null object
202: }
203: }
204:
205: /**
206: * empties the FIFO
207: */
208: public void reset() {
209: synchronized (list) {
210: if (!list.isEmpty()) {
211: list.clear();
212: }
213: }
214: }
215:
216: }
|