001: /*
002: * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
003: *
004: * Copyright 1997-2007 Sun Microsystems, Inc. All rights reserved.
005: *
006: * The contents of this file are subject to the terms of either the GNU
007: * General Public License Version 2 only ("GPL") or the Common Development
008: * and Distribution License("CDDL") (collectively, the "License"). You
009: * may not use this file except in compliance with the License. You can obtain
010: * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
011: * or glassfish/bootstrap/legal/LICENSE.txt. See the License for the specific
012: * language governing permissions and limitations under the License.
013: *
014: * When distributing the software, include this License Header Notice in each
015: * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
016: * Sun designates this particular file as subject to the "Classpath" exception
017: * as provided by Sun in the GPL Version 2 section of the License file that
018: * accompanied this code. If applicable, add the following below the License
019: * Header, with the fields enclosed by brackets [] replaced by your own
020: * identifying information: "Portions Copyrighted [year]
021: * [name of copyright owner]"
022: *
023: * Contributor(s):
024: *
025: * If you wish your version of this file to be governed by only the CDDL or
026: * only the GPL Version 2, indicate your decision by adding "[Contributor]
027: * elects to include this software in this distribution under the [CDDL or GPL
028: * Version 2] license." If you don't indicate a single choice of license, a
029: * recipient has the option to distribute your version of this file under
030: * either the CDDL, the GPL Version 2 or to extend the choice of license to
031: * its licensees as provided above. However, if you add GPL Version 2 code
032: * and therefore, elected the GPL Version 2 license, then the option applies
033: * only if the new code is made subject to such option by the copyright
034: * holder.
035: */
036:
037: package com.sun.xml.ws.transport.tcp.io;
038:
039: import com.sun.xml.ws.transport.tcp.pool.LifeCycle;
040: import com.sun.xml.ws.transport.tcp.resources.MessagesMessages;
041: import com.sun.xml.ws.transport.tcp.util.FrameType;
042: import com.sun.xml.ws.transport.tcp.util.SelectorFactory;
043: import com.sun.xml.ws.transport.tcp.util.TCPConstants;
044: import java.io.EOFException;
045: import java.io.IOException;
046: import java.io.InputStream;
047: import java.nio.ByteBuffer;
048: import java.nio.channels.SelectionKey;
049: import java.nio.channels.Selector;
050: import java.nio.channels.SocketChannel;
051: import java.util.HashMap;
052: import java.util.Map;
053: import java.util.logging.Level;
054: import java.util.logging.Logger;
055:
056: /**
057: * Stream wrapper around a <code>ByteBuffer</code>
058: */
059: public final class FramedMessageInputStream extends InputStream
060: implements LifeCycle {
061: private static final Logger logger = Logger
062: .getLogger(com.sun.xml.ws.transport.tcp.util.TCPConstants.LoggingDomain
063: + ".streams");
064:
065: private ByteBuffer byteBuffer;
066:
067: private SocketChannel socketChannel;
068:
069: private int bufferClearLimit; // clear buffer before read from socket if its size already more than this value
070: /**
071: * The time to wait before timing out when reading bytes
072: */
073: final private static int READ_TIMEOUT = 1000;
074:
075: /**
076: * Number of times to retry before return EOF
077: */
078: final static int READ_TRY = 10;
079:
080: private final int[] headerTmpArray = new int[2];
081:
082: /** is message framed or direct mode is used */
083: private boolean isDirectMode;
084:
085: private int frameSize;
086: private int frameBytesRead;
087: private boolean isLastFrame;
088: private int currentFrameDataSize; // for last frame actual data size could be smaller than frame size
089:
090: private int channelId;
091: private int contentId;
092: private int messageId;
093: private final Map<Integer, String> contentProps = new HashMap<Integer, String>(
094: 8);
095:
096: private boolean isReadingHeader;
097:
098: /**
099: * could be useful for debug reasons
100: */
101: private long receivedMessageLength;
102:
103: // ------------------------------------------------- Constructor -------//
104:
105: public FramedMessageInputStream() {
106: this (TCPConstants.DEFAULT_FRAME_SIZE);
107: }
108:
109: public FramedMessageInputStream(int frameSize) {
110: setFrameSize(frameSize);
111: }
112:
113: // ---------------------------------------------------------------------//
114:
115: public void setSocketChannel(final SocketChannel socketChannel) {
116: this .socketChannel = socketChannel;
117: }
118:
119: public int getChannelId() {
120: return channelId;
121: }
122:
123: public int getMessageId() {
124: return messageId;
125: }
126:
127: public int getContentId() {
128: return contentId;
129: }
130:
131: public Map<Integer, String> getContentProperties() {
132: return contentProps;
133: }
134:
135: public boolean isDirectMode() {
136: return isDirectMode;
137: }
138:
139: public void setDirectMode(final boolean isDirectMode) {
140: reset();
141: this .isDirectMode = isDirectMode;
142: }
143:
144: public void setFrameSize(final int frameSize) {
145: this .frameSize = frameSize;
146: }
147:
148: public void setByteBuffer(final ByteBuffer byteBuffer) {
149: this .byteBuffer = byteBuffer;
150: if (byteBuffer != null) {
151: bufferClearLimit = byteBuffer.capacity() * 3 / 4;
152: }
153: }
154:
155: /**
156: * Return the available bytes
157: * @return the wrapped byteBuffer.remaining()
158: */
159: public int available() {
160: return remaining();
161: }
162:
163: /**
164: * Close this stream.
165: */
166: public void close() {
167: }
168:
169: /**
170: * Return true if mark is supported.
171: */
172: public boolean markSupported() {
173: return false;
174: }
175:
176: /**
177: * Read the first byte from the wrapped <code>ByteBuffer</code>.
178: */
179: public int read() {
180: int eof = 0;
181: if (!byteBuffer.hasRemaining()) {
182: eof = readFromChannel();
183: } else if (remaining() == 0 && isLastFrame) { // if in buffer there is last frame's tale only
184: return -1;
185: }
186:
187: if (eof == -1 || readFrameHeaderIfRequired() == -1)
188: return -1;
189:
190: if (byteBuffer.hasRemaining()) {
191: frameBytesRead++;
192: receivedMessageLength++;
193: return byteBuffer.get() & 0xff;
194: }
195:
196: return read();
197: }
198:
199: /**
200: * Read the bytes from the wrapped <code>ByteBuffer</code>.
201: */
202: public int read(final byte[] b) {
203: return (read(b, 0, b.length));
204: }
205:
206: /**
207: * Read the first byte of the wrapped <code>ByteBuffer</code>.
208: */
209: public int read(final byte[] b, final int offset, int length) {
210: if (!byteBuffer.hasRemaining()) {
211: final int eof = readFromChannel();
212: if (eof <= 0) {
213: return -1;
214: }
215: } else if (remaining() == 0 && isLastFrame) { // if in buffer there is last frame's tale only
216: return -1;
217: }
218:
219: if (readFrameHeaderIfRequired() == -1)
220: return -1;
221:
222: //@TODO add logic for reading from several frames if required
223: final int remaining = remaining();
224: if (length > remaining) {
225: length = remaining;
226: }
227:
228: byteBuffer.get(b, offset, length);
229: frameBytesRead += length;
230: receivedMessageLength += length;
231:
232: return length;
233: }
234:
235: public void forceHeaderRead() throws IOException {
236: readHeader();
237: }
238:
239: private int readFrameHeaderIfRequired() {
240: if (!isDirectMode
241: && !isLastFrame
242: && !isReadingHeader
243: && (frameBytesRead == 0 || frameBytesRead == currentFrameDataSize)) {
244: try {
245: readHeader();
246: } catch (IOException ex) {
247: return -1;
248: }
249: }
250:
251: return 0;
252: }
253:
254: private void readHeader() throws IOException {
255: if (logger.isLoggable(Level.FINEST)) {
256: logger.log(Level.FINEST, MessagesMessages
257: .WSTCP_1060_FRAMED_MESSAGE_IS_READ_HEADER_ENTER());
258: }
259: frameBytesRead = 0;
260: isReadingHeader = true;
261: // Read channel-id and message-id
262: int lowNeebleValue = DataInOutUtils.readInts4(this ,
263: headerTmpArray, 2, 0);
264: channelId = headerTmpArray[0];
265: messageId = headerTmpArray[1];
266:
267: if (FrameType.isFrameContainsParams(messageId)) { //message types have description
268: // Read content-id and number-of-parameters
269: lowNeebleValue = DataInOutUtils.readInts4(this ,
270: headerTmpArray, 2, lowNeebleValue);
271: contentId = headerTmpArray[0];
272: final int paramNumber = headerTmpArray[1];
273: for (int i = 0; i < paramNumber; i++) {
274: // Read parameter-id and length of parameter-value buffer
275: DataInOutUtils.readInts4(this , headerTmpArray, 2,
276: lowNeebleValue);
277: final int paramId = headerTmpArray[0];
278: final int paramValueLen = headerTmpArray[1];
279: byte[] paramValueBytes = new byte[paramValueLen];
280: // Read parameter-value
281: DataInOutUtils.readFully(this , paramValueBytes);
282: final String paramValue = new String(paramValueBytes,
283: TCPConstants.UTF8);
284: contentProps.put(paramId, paramValue);
285: lowNeebleValue = 0;
286: }
287: }
288:
289: // Read payload-size
290: currentFrameDataSize = DataInOutUtils.readInt8(this );
291: isLastFrame = FrameType.isLastFrame(messageId);
292: currentFrameDataSize += frameBytesRead;
293: isReadingHeader = false;
294: if (logger.isLoggable(Level.FINEST)) {
295: logger.log(Level.FINEST, MessagesMessages
296: .WSTCP_1061_FRAMED_MESSAGE_IS_READ_HEADER_DONE(
297: channelId, messageId, contentId,
298: contentProps, currentFrameDataSize,
299: isLastFrame));
300: }
301: }
302:
303: private int readFromChannel() {
304: int eof = 0;
305: for (int i = 0; i < READ_TRY; i++) {
306: eof = doRead();
307:
308: if (eof != 0) {
309: break;
310: }
311: }
312:
313: return eof;
314: }
315:
316: public void skipToEndOfMessage() throws EOFException {
317: do {
318: readFrameHeaderIfRequired();
319: skipToEndOfFrame();
320: frameBytesRead = 0;
321: } while (!isLastFrame);
322: }
323:
324: private void skipToEndOfFrame() throws EOFException {
325: if (currentFrameDataSize > 0) {
326: if (byteBuffer.hasRemaining()) {
327: final int remainFrameBytes = currentFrameDataSize
328: - frameBytesRead;
329: if (remainFrameBytes <= byteBuffer.remaining()) {
330: byteBuffer.position(byteBuffer.position()
331: + remainFrameBytes);
332: return;
333: }
334:
335: frameBytesRead += byteBuffer.remaining();
336: byteBuffer.position(byteBuffer.limit());
337: }
338:
339: while (frameBytesRead < currentFrameDataSize) {
340: final int eof = readFromChannel();
341: if (eof == -1) {
342: String errorMessage = MessagesMessages
343: .WSTCP_1062_FRAMED_MESSAGE_IS_READ_UNEXPECTED_EOF(
344: isLastFrame, frameBytesRead,
345: frameSize, currentFrameDataSize);
346: logger.log(Level.SEVERE, errorMessage);
347: throw new EOFException(errorMessage);
348: }
349: frameBytesRead += eof;
350: byteBuffer.position(byteBuffer.position() + eof);
351: }
352:
353: // if extra frame bytes were read - move position backwards
354: byteBuffer.position(byteBuffer.position()
355: - (frameBytesRead - currentFrameDataSize));
356: }
357: }
358:
359: /**
360: * Read bytes using the read <code>ReadSelector</code>
361: */
362: private int doRead() {
363: if (socketChannel == null)
364: return -1;
365: if (isEOF()) {
366: return -1;
367: }
368:
369: if (!byteBuffer.hasRemaining()
370: && byteBuffer.position() >= bufferClearLimit) {
371: byteBuffer.clear();
372: }
373:
374: final int bbPosition = byteBuffer.position();
375: byteBuffer.limit(byteBuffer.capacity());
376:
377: int count;
378: int byteRead = 0;
379: Selector readSelector = null;
380: SelectionKey tmpKey = null;
381:
382: try {
383: do {
384: count = socketChannel.read(byteBuffer);
385: byteRead += count;
386: } while (count > 0);
387:
388: if (count == -1 && byteRead >= 0)
389: byteRead++;
390:
391: if (byteRead == 0) {
392: readSelector = SelectorFactory.getSelector();
393:
394: if (readSelector == null) {
395: return 0;
396: }
397: tmpKey = socketChannel.register(readSelector,
398: SelectionKey.OP_READ);
399: tmpKey.interestOps(tmpKey.interestOps()
400: | SelectionKey.OP_READ);
401: final int code = readSelector.select(READ_TIMEOUT);
402:
403: //Nothing so return.
404: tmpKey.interestOps(tmpKey.interestOps()
405: & (~SelectionKey.OP_READ));
406: if (code == 0) {
407: return 0;
408: }
409:
410: do {
411: count = socketChannel.read(byteBuffer);
412: byteRead += count;
413: } while (count > 0);
414: if (count == -1 && byteRead >= 0)
415: byteRead++;
416: }
417: } catch (Exception e) {
418: logger.log(Level.SEVERE, MessagesMessages
419: .WSTCP_0018_ERROR_READING_FROM_SOCKET(), e);
420: return -1;
421: } finally {
422: if (tmpKey != null)
423: tmpKey.cancel();
424:
425: if (readSelector != null) {
426: try {
427: readSelector.selectNow();
428: } catch (IOException ex) {
429: }
430: SelectorFactory.returnSelector(readSelector);
431: }
432:
433: byteBuffer.flip();
434: byteBuffer.position(bbPosition);
435: }
436:
437: return byteRead;
438: }
439:
440: public boolean isMessageInProcess() {
441: if (currentFrameDataSize == 0 || isEOF())
442: return false;
443:
444: return true;
445: }
446:
447: private boolean isEOF() {
448: return isLastFrame
449: && frameBytesRead >= currentFrameDataSize - 1;
450: }
451:
452: private int remaining() {
453: if (isReadingHeader || isDirectMode) {
454: return byteBuffer.remaining();
455: }
456:
457: return Math.min(currentFrameDataSize - frameBytesRead,
458: byteBuffer.remaining());
459: }
460:
461: public void reset() {
462: frameBytesRead = 0;
463: currentFrameDataSize = 0;
464: isLastFrame = false;
465: isReadingHeader = false;
466: contentId = -1;
467: messageId = -1;
468: contentProps.clear();
469: receivedMessageLength = 0;
470: }
471:
472: public void activate() {
473: }
474:
475: public void passivate() {
476: reset();
477: setSocketChannel(null);
478: setByteBuffer(null);
479: }
480:
481: public String toString() {
482: final StringBuffer buffer = new StringBuffer(100);
483: buffer.append("ByteBuffer: ");
484: buffer.append(byteBuffer);
485: buffer.append(" FrameBytesRead: ");
486: buffer.append(frameBytesRead);
487: buffer.append(" CurrentFrameDataSize: ");
488: buffer.append(currentFrameDataSize);
489: buffer.append(" isLastFrame: ");
490: buffer.append(isLastFrame);
491: buffer.append(" isDirectMode: ");
492: buffer.append(isDirectMode);
493: buffer.append(" isReadingHeader: ");
494: buffer.append(isReadingHeader);
495:
496: return buffer.toString();
497: }
498: }
|