001: /*
002: *
003: * The DbUnit Database Testing Framework
004: * Copyright (C)2002-2004, DbUnit.org
005: *
006: * This library is free software; you can redistribute it and/or
007: * modify it under the terms of the GNU Lesser General Public
008: * License as published by the Free Software Foundation; either
009: * version 2.1 of the License, or (at your option) any later version.
010: *
011: * This library is distributed in the hope that it will be useful,
012: * but WITHOUT ANY WARRANTY; without even the implied warranty of
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
014: * Lesser General Public License for more details.
015: *
016: * You should have received a copy of the GNU Lesser General Public
017: * License along with this library; if not, write to the Free Software
018: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
019: *
020: */
021: package org.dbunit.dataset.stream;
022:
023: import org.slf4j.Logger;
024: import org.slf4j.LoggerFactory;
025:
026: import org.dbunit.DatabaseUnitRuntimeException;
027: import org.dbunit.dataset.*;
028: import org.dbunit.util.concurrent.BoundedBuffer;
029: import org.dbunit.util.concurrent.Channel;
030: import org.dbunit.util.concurrent.Puttable;
031: import org.dbunit.util.concurrent.Takable;
032:
033: /**
034: * @author Manuel Laflamme
035: * @since Apr 17, 2003
036: * @version $Revision: 554 $
037: */
038: public class StreamingIterator implements ITableIterator {
039:
040: /**
041: * Logger for this class
042: */
043: private static final Logger logger = LoggerFactory
044: .getLogger(StreamingIterator.class);
045:
046: private static final Object EOD = new Object(); // end of dataset marker
047:
048: private final Takable _channel;
049: private StreamingTable _activeTable;
050: private Object _taken = null;
051: private boolean _eod = false;
052:
053: public StreamingIterator(IDataSetProducer source)
054: throws DataSetException {
055: Channel channel = new BoundedBuffer(30);
056: _channel = channel;
057:
058: AsynchronousConsumer consumer = new AsynchronousConsumer(
059: source, channel);
060: Thread thread = new Thread(consumer);
061: thread.setDaemon(true);
062: thread.start();
063:
064: // Take first element from asyncronous handler
065: try {
066: _taken = _channel.take();
067: } catch (InterruptedException e) {
068: logger.error("StreamingIterator()", e);
069:
070: throw new DataSetException(e);
071: }
072: }
073:
074: ////////////////////////////////////////////////////////////////////////////
075: // ITableIterator interface
076:
077: public boolean next() throws DataSetException {
078: logger.debug("next() - start");
079:
080: // End of dataset has previously been reach
081: if (_eod) {
082: return false;
083: }
084:
085: // Iterate to the end of current table.
086: while (_activeTable != null && _activeTable.next())
087: ;
088:
089: // End of dataset is reach
090: if (_taken == EOD) {
091: _eod = true;
092: _activeTable = null;
093:
094: // System.out.println("End of iterator! - " + System.currentTimeMillis());
095: return false;
096: }
097:
098: // New table
099: if (_taken instanceof ITableMetaData) {
100: _activeTable = new StreamingTable((ITableMetaData) _taken);
101: return true;
102: }
103:
104: throw new IllegalStateException(
105: "Unexpected object taken from asyncronous handler: "
106: + _taken);
107: }
108:
109: public ITableMetaData getTableMetaData() throws DataSetException {
110: logger.debug("getTableMetaData() - start");
111:
112: return _activeTable.getTableMetaData();
113: }
114:
115: public ITable getTable() throws DataSetException {
116: logger.debug("getTable() - start");
117:
118: return _activeTable;
119: }
120:
121: ////////////////////////////////////////////////////////////////////////////
122: // StreamingTable class
123:
124: private class StreamingTable extends AbstractTable {
125:
126: /**
127: * Logger for this class
128: */
129: private final Logger logger = LoggerFactory
130: .getLogger(StreamingTable.class);
131:
132: private ITableMetaData _metaData;
133: private int _lastRow = -1;
134: private boolean _eot = false;
135: private Object[] _rowValues;
136:
137: public StreamingTable(ITableMetaData metaData) {
138: _metaData = metaData;
139: }
140:
141: boolean next() throws DataSetException {
142: logger.debug("next() - start");
143:
144: // End of table has previously been reach
145: if (_eot) {
146: return false;
147: }
148:
149: try {
150: _taken = _channel.take();
151: if (!(_taken instanceof Object[])) {
152: _eot = true;
153: return false;
154: }
155:
156: _lastRow++;
157: _rowValues = (Object[]) _taken;
158: return true;
159: } catch (InterruptedException e) {
160: logger.error("next()", e);
161:
162: throw new DataSetException();
163: }
164: }
165:
166: ////////////////////////////////////////////////////////////////////////
167: // ITable interface
168:
169: public ITableMetaData getTableMetaData() {
170: logger.debug("getTableMetaData() - start");
171:
172: return _metaData;
173: }
174:
175: public int getRowCount() {
176: logger.debug("getRowCount() - start");
177:
178: throw new UnsupportedOperationException();
179: }
180:
181: public Object getValue(int row, String column)
182: throws DataSetException {
183: logger.debug("getValue(row=" + row + ", column=" + column
184: + ") - start");
185:
186: // Iterate up to specified row
187: while (!_eot && row > _lastRow) {
188: next();
189: }
190:
191: if (row < _lastRow) {
192: throw new UnsupportedOperationException(
193: "Cannot go backward!");
194: }
195:
196: if (_eot || row > _lastRow) {
197: throw new RowOutOfBoundsException(row + " > "
198: + _lastRow);
199: }
200:
201: return _rowValues[getColumnIndex(column)];
202: }
203:
204: }
205:
206: ////////////////////////////////////////////////////////////////////////////
207: // AsynchronousConsumer class
208:
209: private static class AsynchronousConsumer implements Runnable,
210: IDataSetConsumer {
211:
212: /**
213: * Logger for this class
214: */
215: private static final Logger logger = LoggerFactory
216: .getLogger(AsynchronousConsumer.class);
217:
218: private final IDataSetProducer _producer;
219: private final Puttable _channel;
220:
221: public AsynchronousConsumer(IDataSetProducer source,
222: Puttable channel) {
223: _producer = source;
224: _channel = channel;
225: }
226:
227: ////////////////////////////////////////////////////////////////////////
228: // Runnable interface
229:
230: public void run() {
231: logger.debug("run() - start");
232:
233: try {
234: _producer.setConsumer(this );
235: _producer.produce();
236: // System.out.println("End of thread! - " + System.currentTimeMillis());
237: } catch (DataSetException e) {
238: logger.error("run()", e);
239:
240: throw new DatabaseUnitRuntimeException(e);
241: }
242: }
243:
244: ////////////////////////////////////////////////////////////////////////
245: // IDataSetConsumer interface
246:
247: public void startDataSet() throws DataSetException {
248: }
249:
250: public void endDataSet() throws DataSetException {
251: logger.debug("endDataSet() - start");
252:
253: try {
254: _channel.put(EOD);
255: } catch (InterruptedException e) {
256: logger.error("endDataSet()", e);
257:
258: throw new DataSetException();
259: }
260: }
261:
262: public void startTable(ITableMetaData metaData)
263: throws DataSetException {
264: logger.debug("startTable(metaData=" + metaData
265: + ") - start");
266:
267: try {
268: _channel.put(metaData);
269: } catch (InterruptedException e) {
270: logger.error("startTable()", e);
271:
272: throw new DataSetException();
273: }
274: }
275:
276: public void endTable() throws DataSetException {
277: }
278:
279: public void row(Object[] values) throws DataSetException {
280: logger.debug("row(values=" + values + ") - start");
281:
282: try {
283: _channel.put(values);
284: } catch (InterruptedException e) {
285: logger.error("row()", e);
286:
287: throw new DataSetException();
288: }
289: }
290: }
291: }
|