001: /*
002: * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
003: *
004: * Copyright 1997-2007 Sun Microsystems, Inc. All rights reserved.
005: *
006: * The contents of this file are subject to the terms of either the GNU
007: * General Public License Version 2 only ("GPL") or the Common Development
008: * and Distribution License("CDDL") (collectively, the "License"). You
009: * may not use this file except in compliance with the License. You can obtain
010: * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
011: * or glassfish/bootstrap/legal/LICENSE.txt. See the License for the specific
012: * language governing permissions and limitations under the License.
013: *
014: * When distributing the software, include this License Header Notice in each
015: * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
016: * Sun designates this particular file as subject to the "Classpath" exception
017: * as provided by Sun in the GPL Version 2 section of the License file that
018: * accompanied this code. If applicable, add the following below the License
019: * Header, with the fields enclosed by brackets [] replaced by your own
020: * identifying information: "Portions Copyrighted [year]
021: * [name of copyright owner]"
022: *
023: * Contributor(s):
024: *
025: * If you wish your version of this file to be governed by only the CDDL or
026: * only the GPL Version 2, indicate your decision by adding "[Contributor]
027: * elects to include this software in this distribution under the [CDDL or GPL
028: * Version 2] license." If you don't indicate a single choice of license, a
029: * recipient has the option to distribute your version of this file under
030: * either the CDDL, the GPL Version 2 or to extend the choice of license to
031: * its licensees as provided above. However, if you add GPL Version 2 code
032: * and therefore, elected the GPL Version 2 license, then the option applies
033: * only if the new code is made subject to such option by the copyright
034: * holder.
035: */
036:
037: package com.sun.xml.ws.transport.tcp.io;
038:
039: import com.sun.xml.ws.transport.tcp.pool.LifeCycle;
040: import com.sun.xml.ws.transport.tcp.util.ByteBufferFactory;
041: import com.sun.xml.ws.transport.tcp.util.FrameType;
042: import com.sun.xml.ws.transport.tcp.util.TCPConstants;
043: import java.io.IOException;
044: import java.io.OutputStream;
045: import java.nio.ByteBuffer;
046: import java.nio.channels.SocketChannel;
047: import java.util.HashMap;
048: import java.util.Map;
049:
050: /**
051: * @author Alexey Stashok
052: */
053: public final class FramedMessageOutputStream extends OutputStream
054: implements LifeCycle {
055: private static final int HEADER_BUFFER_SIZE = 10;
056: private boolean useDirectBuffer;
057:
058: private ByteBuffer outputBuffer;
059:
060: private SocketChannel socketChannel;
061: private int frameNumber;
062: private int frameSize;
063: private boolean isFlushLast;
064:
065: // Fragment header attributes
066: private int channelId;
067: private int messageId;
068: private int contentId;
069: private Map<Integer, String> contentProps = new HashMap<Integer, String>(
070: 8);
071: private int payloadlengthLength;
072:
073: /** is message framed or direct mode is used */
074: private boolean isDirectMode;
075: // ByteBuffer for channel_id and message_id, which present in all messages
076: private final ByteBuffer headerBuffer;
077:
078: private final ByteBuffer[] frame = new ByteBuffer[2];
079:
080: /**
081: * could be useful for debug reasons
082: */
083: private long sentMessageLength;
084:
085: public FramedMessageOutputStream() {
086: this (TCPConstants.DEFAULT_FRAME_SIZE,
087: TCPConstants.DEFAULT_USE_DIRECT_BUFFER);
088: }
089:
090: public FramedMessageOutputStream(int frameSize) {
091: this (frameSize, TCPConstants.DEFAULT_USE_DIRECT_BUFFER);
092: }
093:
094: public FramedMessageOutputStream(int frameSize,
095: boolean useDirectBuffer) {
096: this .useDirectBuffer = useDirectBuffer;
097: headerBuffer = ByteBufferFactory.allocateView(frameSize,
098: useDirectBuffer);
099: setFrameSize(frameSize);
100: }
101:
102: public void setFrameSize(final int frameSize) {
103: this .frameSize = frameSize;
104: payloadlengthLength = (int) Math.ceil(Math.log(frameSize)
105: / Math.log(2));
106: outputBuffer = ByteBufferFactory.allocateView(frameSize,
107: useDirectBuffer);
108: formFrameBufferArray();
109: }
110:
111: public boolean isDirectMode() {
112: return isDirectMode;
113: }
114:
115: public void setDirectMode(final boolean isDirectMode) {
116: reset();
117: this .isDirectMode = isDirectMode;
118: }
119:
120: public void setSocketChannel(final SocketChannel socketChannel) {
121: this .socketChannel = socketChannel;
122: }
123:
124: public void setChannelId(final int channelId) {
125: this .channelId = channelId;
126: }
127:
128: public void setMessageId(final int messageId) {
129: this .messageId = messageId;
130: }
131:
132: public void setContentId(final int contentId) {
133: this .contentId = contentId;
134: }
135:
136: public void setContentProperty(int key, String value) {
137: this .contentProps.put(key, value);
138: }
139:
140: public void addAllContentProperties(Map<Integer, String> properties) {
141: this .contentProps.putAll(properties);
142: }
143:
144: public void write(final int data) throws IOException {
145: if (!outputBuffer.hasRemaining()) {
146: flushFrame();
147: }
148:
149: outputBuffer.put((byte) data);
150: }
151:
152: public void write(final byte[] data, int offset, int size)
153: throws IOException {
154: while (size > 0) {
155: final int bytesToWrite = Math.min(size, outputBuffer
156: .remaining());
157: outputBuffer.put(data, offset, bytesToWrite);
158: size -= bytesToWrite;
159: offset += bytesToWrite;
160: if (!outputBuffer.hasRemaining() && size > 0) {
161: flushFrame();
162: }
163: }
164: }
165:
166: public void flushLast() throws IOException {
167: if (!isFlushLast) {
168: outputBuffer.flip();
169: isFlushLast = true;
170:
171: do {
172: flushBuffer();
173: } while (outputBuffer.hasRemaining());
174: outputBuffer.clear();
175: }
176: }
177:
178: private void flushBuffer() throws IOException {
179: final int payloadLength = outputBuffer.remaining();
180: if (!isDirectMode) {
181: headerBuffer.clear();
182: // Write channel-id
183: int frameMessageIdHighValue = DataInOutUtils.writeInt4(
184: headerBuffer, channelId, 0, false);
185: int frameMessageIdPosition = headerBuffer.position();
186: boolean isFrameWithParameters = FrameType
187: .isFrameContainsParams(messageId)
188: && frameNumber == 0;
189:
190: // Write message-id without counting with possible chunking
191: int highValue = DataInOutUtils.writeInt4(headerBuffer,
192: messageId, frameMessageIdHighValue,
193: !isFrameWithParameters);
194:
195: if (isFrameWithParameters) {
196: // If required - serialize frame content-id, content-parameters
197: // Write content-id
198: highValue = DataInOutUtils.writeInt4(headerBuffer,
199: contentId, highValue, false);
200:
201: final int propsCount = contentProps.size();
202: // Write number-of-parameters
203: highValue = DataInOutUtils.writeInt4(headerBuffer,
204: propsCount, highValue, propsCount == 0);
205:
206: for (Map.Entry<Integer, String> entry : contentProps
207: .entrySet()) {
208: final String value = entry.getValue();
209: byte[] valueBytes = value
210: .getBytes(TCPConstants.UTF8);
211: // Write parameter-id
212: highValue = DataInOutUtils.writeInt4(headerBuffer,
213: entry.getKey(), highValue, false);
214: // Write parameter-value buffer length
215: DataInOutUtils.writeInt4(headerBuffer,
216: valueBytes.length, highValue, true);
217: // Write parameter-value
218: headerBuffer.put(valueBytes);
219: highValue = 0;
220: }
221: }
222:
223: int readyBytesToSend = headerBuffer.position()
224: + payloadlengthLength + payloadLength;
225:
226: if (messageId == FrameType.MESSAGE) {
227: // If message will be chunked - update message-id
228: updateMessageIdIfRequired(frameMessageIdPosition,
229: frameMessageIdHighValue, isFlushLast
230: && readyBytesToSend <= frameSize);
231: }
232:
233: final int sendingPayloadLength = calcPayloadSizeToSend(readyBytesToSend);
234:
235: // Write payload-length
236: DataInOutUtils
237: .writeInt8(headerBuffer, sendingPayloadLength);
238: headerBuffer.flip();
239: final int payloadLimit = outputBuffer.limit();
240: if (sendingPayloadLength < payloadLength) {
241: // check to change for outputBuffer.limit(sendingPayloadLength);
242: outputBuffer.limit(outputBuffer.limit()
243: - (payloadLength - sendingPayloadLength));
244: }
245:
246: OutputWriter.flushChannel(socketChannel, frame);
247: outputBuffer.limit(payloadLimit);
248: sentMessageLength += sendingPayloadLength;
249: frameNumber++;
250: } else {
251: OutputWriter.flushChannel(socketChannel, outputBuffer);
252: }
253: }
254:
255: private void updateMessageIdIfRequired(int frameMessageIdPosition,
256: int frameMessageIdHighValue, boolean isLastFrame) {
257:
258: int frameMessageId;
259: if (isLastFrame) {
260: if (frameNumber != 0) {
261: frameMessageId = FrameType.MESSAGE_END_CHUNK;
262: } else {
263: // Serialized message-id is correct
264: return;
265: }
266: } else if (frameNumber == 0) {
267: frameMessageId = FrameType.MESSAGE_START_CHUNK;
268: } else {
269: frameMessageId = FrameType.MESSAGE_CHUNK;
270: }
271:
272: // merge message-id Integer4 data with next value
273: if (frameMessageIdHighValue != 0) {
274: // merge message-id as lower octet nibble
275: headerBuffer
276: .put(
277: frameMessageIdPosition,
278: (byte) ((frameMessageIdHighValue & 0x70) | frameMessageId));
279: } else {
280: // merge message-id as higher octet nibble
281: int value = headerBuffer.get(frameMessageIdPosition);
282: headerBuffer.put(frameMessageIdPosition,
283: (byte) ((frameMessageId << 4) | (value & 0xF)));
284: }
285: }
286:
287: private int calcPayloadSizeToSend(final int readyBytesToSend)
288: throws IOException {
289: int payloadLength = outputBuffer.remaining();
290: if (readyBytesToSend > frameSize) {
291: payloadLength -= (readyBytesToSend - frameSize);
292: }
293:
294: return payloadLength;
295: }
296:
297: private void formFrameBufferArray() {
298: frame[0] = headerBuffer;
299: frame[1] = outputBuffer;
300: }
301:
302: public void reset() {
303: outputBuffer.clear();
304: headerBuffer.clear();
305: messageId = -1;
306: contentId = -1;
307: contentProps.clear();
308: frameNumber = 0;
309: isFlushLast = false;
310: sentMessageLength = 0;
311: }
312:
313: public void activate() {
314: }
315:
316: public void passivate() {
317: reset();
318: socketChannel = null;
319: }
320:
321: public void close() {
322: }
323:
324: private void flushFrame() throws IOException {
325: outputBuffer.flip();
326: flushBuffer();
327: outputBuffer.compact();
328: }
329:
330: }
|