001: /*
002: * $Id: FunctionalStreamingTestComponent.java 10529 2008-01-25 05:58:36Z dfeist $
003: * --------------------------------------------------------------------------------------
004: * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
005: *
006: * The software in this package is published under the terms of the CPAL v1.0
007: * license, a copy of which has been included with this distribution in the
008: * LICENSE.txt file.
009: */
010:
011: package org.mule.tck.functional;
012:
013: import org.mule.api.MuleEventContext;
014: import org.mule.api.lifecycle.Callable;
015: import org.mule.util.ClassUtils;
016: import org.mule.util.StringMessageUtils;
017:
018: import java.io.IOException;
019: import java.io.InputStream;
020:
021: import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
022:
023: import org.apache.commons.logging.Log;
024: import org.apache.commons.logging.LogFactory;
025:
026: /**
027: * A service that can be used by streaming functional tests. This service accepts an
028: * EventCallback that can be used to assert the state of the current event. To access the
029: * service when embedded in an (XML) model, make sure that the descriptor sets the
030: * singleton attribute true - see uses in TCP and FTP.
031: *
032: * Note that although this implements the full StreamingService interface, nothing is
033: * written to the output stream - this is intended as a final sink.
034: *
035: * @see org.mule.tck.functional.EventCallback
036: */
037:
038: public class FunctionalStreamingTestComponent implements Callable {
039: protected transient Log logger = LogFactory.getLog(getClass());
040:
041: private static AtomicInteger count = new AtomicInteger(0);
042: private int number = count.incrementAndGet();
043:
044: public static final int STREAM_SAMPLE_SIZE = 4;
045: public static final int STREAM_BUFFER_SIZE = 4096;
046: private EventCallback eventCallback;
047: private String summary = null;
048: private long targetSize = -1;
049:
050: public FunctionalStreamingTestComponent() {
051: logger.debug("creating " + toString());
052: }
053:
054: public void setEventCallback(EventCallback eventCallback,
055: long targetSize) {
056: logger.debug("setting callback: " + eventCallback + " in "
057: + toString());
058: this .eventCallback = eventCallback;
059: this .targetSize = targetSize;
060: }
061:
062: public String getSummary() {
063: return summary;
064: }
065:
066: public int getNumber() {
067: return number;
068: }
069:
070: public Object onCall(MuleEventContext context) throws Exception {
071: InputStream in = (InputStream) context.getMessage().getPayload(
072: InputStream.class);
073: try {
074: logger.debug("arrived at " + toString());
075: byte[] startData = new byte[STREAM_SAMPLE_SIZE];
076: long startDataSize = 0;
077: byte[] endData = new byte[STREAM_SAMPLE_SIZE]; // ring buffer
078: long endDataSize = 0;
079: long endRingPointer = 0;
080: long streamLength = 0;
081: byte[] buffer = new byte[STREAM_BUFFER_SIZE];
082:
083: // throw data on the floor, but keep a record of size, start and end values
084: long bytesRead = 0;
085: while (bytesRead >= 0) {
086: bytesRead = read(in, buffer);
087: if (bytesRead > 0) {
088: if (logger.isDebugEnabled()) {
089: logger.debug("read " + bytesRead + " bytes");
090: }
091:
092: streamLength += bytesRead;
093: long startOfEndBytes = 0;
094: for (long i = 0; startDataSize < STREAM_SAMPLE_SIZE
095: && i < bytesRead; ++i) {
096: startData[(int) startDataSize++] = buffer[(int) i];
097: ++startOfEndBytes; // skip data included in startData
098: }
099: startOfEndBytes = Math.max(startOfEndBytes,
100: bytesRead - STREAM_SAMPLE_SIZE);
101: for (long i = startOfEndBytes; i < bytesRead; ++i) {
102: ++endDataSize;
103: endData[(int) (endRingPointer++ % STREAM_SAMPLE_SIZE)] = buffer[(int) i];
104: }
105: if (streamLength >= targetSize) {
106: doCallback(startData, startDataSize, endData,
107: endDataSize, endRingPointer,
108: streamLength, context);
109: }
110: }
111: }
112:
113: in.close();
114: } catch (Exception e) {
115: in.close();
116:
117: e.printStackTrace();
118: if (logger.isDebugEnabled()) {
119: logger.debug(e);
120: }
121: throw e;
122: }
123:
124: return null;
125: }
126:
127: protected int read(InputStream in, byte[] buffer)
128: throws IOException {
129: return in.read(buffer);
130: }
131:
132: private void doCallback(byte[] startData, long startDataSize,
133: byte[] endData, long endDataSize, long endRingPointer,
134: long streamLength, MuleEventContext context)
135: throws Exception {
136: // make a nice summary of the data
137: StringBuffer result = new StringBuffer("Received stream");
138: result.append("; length: ");
139: result.append(streamLength);
140: result.append("; '");
141:
142: for (long i = 0; i < startDataSize; ++i) {
143: result.append((char) startData[(int) i]);
144: }
145:
146: long endSize = Math.min(endDataSize, STREAM_SAMPLE_SIZE);
147: if (endSize > 0) {
148: result.append("...");
149: for (long i = 0; i < endSize; ++i) {
150: result
151: .append((char) endData[(int) ((endRingPointer + i) % STREAM_SAMPLE_SIZE)]);
152: }
153: }
154: result.append("'");
155:
156: summary = result.toString();
157:
158: String msg = StringMessageUtils.getBoilerPlate(
159: "Message Received in service: "
160: + context.getService().getName() + ". "
161: + summary + "\n callback: " + eventCallback,
162: '*', 80);
163:
164: logger.info(msg);
165:
166: if (eventCallback != null) {
167: eventCallback.eventReceived(context, this );
168: }
169: }
170:
171: public String toString() {
172: return ClassUtils.getSimpleName(getClass()) + "/" + number;
173: }
174:
175: }
|