001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.catalina.tribes.io;
018:
019: import java.io.IOException;
020: import java.net.Socket;
021: import java.nio.ByteBuffer;
022: import java.nio.channels.SocketChannel;
023:
024: import org.apache.catalina.tribes.ChannelMessage;
025:
026: /**
027: * The object reader object is an object used in conjunction with
028: * java.nio TCP messages. This object stores the message bytes in a
029: * <code>XByteBuffer</code> until a full package has been received.
030: * This object uses an XByteBuffer which is an extendable object buffer that also allows
031: * for message encoding and decoding.
032: *
033: * @author Filip Hanik
034: * @version $Revision: 467173 $, $Date: 2006-10-24 01:12:17 +0200 (mar., 24 oct. 2006) $
035: */
036: public class ObjectReader {
037:
038: protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory
039: .getLog(ObjectReader.class);
040:
041: private XByteBuffer buffer;
042:
043: protected long lastAccess = System.currentTimeMillis();
044:
045: protected boolean accessed = false;
046: private boolean cancelled;
047:
048: /**
049: * Creates an <code>ObjectReader</code> for a TCP NIO socket channel
050: * @param channel - the channel to be read.
051: */
052: public ObjectReader(SocketChannel channel) {
053: this (channel.socket());
054: }
055:
056: /**
057: * Creates an <code>ObjectReader</code> for a TCP socket
058: * @param socket Socket
059: */
060: public ObjectReader(Socket socket) {
061: try {
062: this .buffer = new XByteBuffer(
063: socket.getReceiveBufferSize(), true);
064: } catch (IOException x) {
065: //unable to get buffer size
066: log
067: .warn("Unable to retrieve the socket receiver buffer size, setting to default 43800 bytes.");
068: this .buffer = new XByteBuffer(43800, true);
069: }
070: }
071:
072: public synchronized void access() {
073: this .accessed = true;
074: this .lastAccess = System.currentTimeMillis();
075: }
076:
077: public synchronized void finish() {
078: this .accessed = false;
079: this .lastAccess = System.currentTimeMillis();
080: }
081:
082: public boolean isAccessed() {
083: return this .accessed;
084: }
085:
086: /**
087: * Append new bytes to buffer.
088: * @see XByteBuffer#countPackages()
089: * @param data new transfer buffer
090: * @param off offset
091: * @param len length in buffer
092: * @return number of messages that sended to callback
093: * @throws java.io.IOException
094: */
095: public int append(ByteBuffer data, int len, boolean count)
096: throws java.io.IOException {
097: buffer.append(data, len);
098: int pkgCnt = -1;
099: if (count)
100: pkgCnt = buffer.countPackages();
101: return pkgCnt;
102: }
103:
104: public int append(byte[] data, int off, int len, boolean count)
105: throws java.io.IOException {
106: buffer.append(data, off, len);
107: int pkgCnt = -1;
108: if (count)
109: pkgCnt = buffer.countPackages();
110: return pkgCnt;
111: }
112:
113: /**
114: * Send buffer to cluster listener (callback).
115: * Is message complete receiver send message to callback?
116: *
117: * @see org.apache.catalina.tribes.transport.ClusterReceiverBase#messageDataReceived(ChannelMessage)
118: * @see XByteBuffer#doesPackageExist()
119: * @see XByteBuffer#extractPackage(boolean)
120: *
121: * @return number of received packages/messages
122: * @throws java.io.IOException
123: */
124: public ChannelMessage[] execute() throws java.io.IOException {
125: int pkgCnt = buffer.countPackages();
126: ChannelMessage[] result = new ChannelMessage[pkgCnt];
127: for (int i = 0; i < pkgCnt; i++) {
128: ChannelMessage data = buffer.extractPackage(true);
129: result[i] = data;
130: }
131: return result;
132: }
133:
134: public int bufferSize() {
135: return buffer.getLength();
136: }
137:
138: public boolean hasPackage() {
139: return buffer.countPackages(true) > 0;
140: }
141:
142: /**
143: * Returns the number of packages that the reader has read
144: * @return int
145: */
146: public int count() {
147: return buffer.countPackages();
148: }
149:
150: public void close() {
151: this .buffer = null;
152: }
153:
154: public long getLastAccess() {
155: return lastAccess;
156: }
157:
158: public boolean isCancelled() {
159: return cancelled;
160: }
161:
162: public void setLastAccess(long lastAccess) {
163: this .lastAccess = lastAccess;
164: }
165:
166: public void setCancelled(boolean cancelled) {
167: this.cancelled = cancelled;
168: }
169:
170: }
|