001: /*
002: * hgcommons 7
003: * Hammurapi Group Common Library
004: * Copyright (C) 2003 Hammurapi Group
005: *
006: * This program is free software; you can redistribute it and/or
007: * modify it under the terms of the GNU Lesser General Public
008: * License as published by the Free Software Foundation; either
009: * version 2 of the License, or (at your option) any later version.
010: *
011: * This program is distributed in the hope that it will be useful,
012: * but WITHOUT ANY WARRANTY; without even the implied warranty of
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
014: * Lesser General Public License for more details.
015: *
016: * You should have received a copy of the GNU Lesser General Public
017: * License along with this library; if not, write to the Free Software
018: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
019: *
020: * URL: http://www.hammurapi.biz/hammurapi-biz/ef/xmenu/hammurapi-group/products/products/hgcommons/index.html
021: * e-Mail: support@hammurapi.biz
022: */
023:
024: package biz.hammurapi.util;
025:
026: import java.io.IOException;
027: import java.io.InputStream;
028: import java.io.OutputStream;
029: import java.util.ArrayList;
030: import java.util.List;
031:
032: /**
033: *
034: * Copies all data from an input stream to an output stream.
035: * @author Pavel Vlasov
036: * @version $Revision: 1.3 $
037: */
038: public class StreamPumper implements Runnable {
039:
040: private final static int SIZE = 1024;
041: private InputStream is;
042: private OutputStream os;
043:
044: private ExceptionSink sink;
045:
046: private boolean closeStreams = false;
047:
048: /**
049: * Create a new stream pumper.
050: *
051: * @param is input stream to read data from
052: * @param os output stream to write data to.
053: */
054: public StreamPumper(InputStream is, OutputStream os,
055: ExceptionSink sink, boolean closeStreams) {
056: this .is = is;
057: this .os = os;
058: this .closeStreams = closeStreams;
059: this .sink = sink;
060: }
061:
062: private List listeners = new ArrayList();
063:
064: public void addListener(StreamPumpListener listener, int tickSize) {
065: synchronized (listeners) {
066: listeners.add(new StreamPumpListenerEntry(listener,
067: tickSize));
068: }
069: }
070:
071: public void removeListener(StreamPumpListener listener) {
072: synchronized (listeners) {
073: listeners.remove(listener);
074: }
075: }
076:
077: /**
078: * Copies data from the input stream to the output stream.
079: * Creates a copy of listeners collection before pumping.
080: * addListener() and removeListener() have no effect once pumping has started.
081: * Terminates as soon as the input stream is closed or an error occurs.
082: */
083: public void run() {
084: StreamPumpListenerEntry[] listenersArray;
085: synchronized (listeners) {
086: listenersArray = (StreamPumpListenerEntry[]) listeners
087: .toArray(new StreamPumpListenerEntry[listeners
088: .size()]);
089: }
090:
091: for (int i = 0; i < listenersArray.length; i++) {
092: listenersArray[i].listener.pumpStarted(this );
093: }
094:
095: long counter = 0;
096: try {
097: final byte[] buf = new byte[SIZE];
098: int length;
099: while ((length = is.read(buf)) != -1) {
100: os.write(buf, 0, length);
101: counter += length;
102: for (int i = 0; i < listenersArray.length; i++) {
103: listenersArray[i].counter += length;
104: if (listenersArray[i].counter >= listenersArray[i].tickSize) {
105: listenersArray[i].counter = 0;
106: listenersArray[i].listener.tick(this , counter);
107: }
108: }
109: }
110: } catch (IOException e) {
111: handleException(e, listenersArray);
112: } finally {
113: for (int i = 0; i < listenersArray.length; i++) {
114: listenersArray[i].listener.pumpFinished(this );
115: }
116: if (closeStreams) {
117: try {
118: is.close();
119: } catch (IOException ie) {
120: handleException(ie, listenersArray);
121: }
122:
123: try {
124: os.close();
125: } catch (IOException ie) {
126: handleException(ie, listenersArray);
127: }
128: }
129: }
130: }
131:
132: /**
133: * @param e
134: * @param listenersArray
135: */
136: private void handleException(Exception e,
137: StreamPumpListenerEntry[] listenersArray) {
138: for (int i = 0; i < listenersArray.length; i++) {
139: listenersArray[i].listener.pumpingError(this, e);
140: }
141:
142: if (sink == null) {
143: e.printStackTrace();
144: } else {
145: sink.consume(this, e);
146: }
147: }
148: }
|