001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: *
019: */
020: package org.apache.mina.transport.vmpipe;
021:
022: import java.util.ArrayList;
023: import java.util.List;
024: import java.util.Queue;
025: import java.util.concurrent.ConcurrentLinkedQueue;
026:
027: import org.apache.mina.common.AbstractIoSession;
028: import org.apache.mina.common.DefaultIoFilterChain;
029: import org.apache.mina.common.IdleStatus;
030: import org.apache.mina.common.IoBuffer;
031: import org.apache.mina.common.IoEvent;
032: import org.apache.mina.common.IoEventType;
033: import org.apache.mina.common.IoProcessor;
034: import org.apache.mina.common.IoSession;
035: import org.apache.mina.common.WriteRequest;
036: import org.apache.mina.common.WriteRequestQueue;
037: import org.apache.mina.common.WriteToClosedSessionException;
038:
039: /**
040: * @author The Apache MINA Project (dev@mina.apache.org)
041: * @version $Rev: 598504 $, $Date: 2007-11-26 20:08:12 -0700 (Mon, 26 Nov 2007) $
042: */
043: class VmPipeFilterChain extends DefaultIoFilterChain {
044:
045: private final Queue<IoEvent> eventQueue = new ConcurrentLinkedQueue<IoEvent>();
046: private final IoProcessor<VmPipeSessionImpl> processor = new VmPipeIoProcessor();
047:
048: private volatile boolean flushEnabled;
049: private volatile boolean sessionOpened;
050:
051: VmPipeFilterChain(AbstractIoSession session) {
052: super (session);
053: }
054:
055: IoProcessor<VmPipeSessionImpl> getProcessor() {
056: return processor;
057: }
058:
059: public void start() {
060: flushEnabled = true;
061: flushEvents();
062: flushPendingDataQueues((VmPipeSessionImpl) getSession());
063: }
064:
065: private void pushEvent(IoEvent e) {
066: eventQueue.add(e);
067: if (flushEnabled) {
068: flushEvents();
069: }
070: }
071:
072: private void flushEvents() {
073: IoEvent e;
074: while ((e = eventQueue.poll()) != null) {
075: fireEvent(e);
076: }
077: }
078:
079: private void fireEvent(IoEvent e) {
080: IoSession session = getSession();
081: IoEventType type = e.getType();
082: Object data = e.getParameter();
083:
084: if (type == IoEventType.MESSAGE_RECEIVED) {
085: VmPipeSessionImpl s = (VmPipeSessionImpl) session;
086: if (sessionOpened && s.getTrafficMask().isReadable()
087: && s.getLock().tryLock()) {
088: try {
089: if (!s.getTrafficMask().isReadable()) {
090: s.receivedMessageQueue.add(data);
091: } else {
092: super .fireMessageReceived(data);
093: }
094: } finally {
095: s.getLock().unlock();
096: }
097: } else {
098: s.receivedMessageQueue.add(data);
099: }
100: } else if (type == IoEventType.WRITE) {
101: super .fireFilterWrite((WriteRequest) data);
102: } else if (type == IoEventType.MESSAGE_SENT) {
103: super .fireMessageSent((WriteRequest) data);
104: } else if (type == IoEventType.EXCEPTION_CAUGHT) {
105: super .fireExceptionCaught((Throwable) data);
106: } else if (type == IoEventType.SESSION_IDLE) {
107: super .fireSessionIdle((IdleStatus) data);
108: } else if (type == IoEventType.SESSION_OPENED) {
109: super .fireSessionOpened();
110: sessionOpened = true;
111: } else if (type == IoEventType.SESSION_CREATED) {
112: super .fireSessionCreated();
113: } else if (type == IoEventType.SESSION_CLOSED) {
114: super .fireSessionClosed();
115: } else if (type == IoEventType.CLOSE) {
116: super .fireFilterClose();
117: }
118: }
119:
120: private static void flushPendingDataQueues(VmPipeSessionImpl s) {
121: s.getProcessor().updateTrafficMask(s);
122: s.getRemoteSession().getProcessor().updateTrafficMask(s);
123: }
124:
125: @Override
126: public void fireFilterClose() {
127: pushEvent(new IoEvent(IoEventType.CLOSE, getSession(), null));
128: }
129:
130: @Override
131: public void fireFilterWrite(WriteRequest writeRequest) {
132: pushEvent(new IoEvent(IoEventType.WRITE, getSession(),
133: writeRequest));
134: }
135:
136: @Override
137: public void fireExceptionCaught(Throwable cause) {
138: pushEvent(new IoEvent(IoEventType.EXCEPTION_CAUGHT,
139: getSession(), cause));
140: }
141:
142: @Override
143: public void fireMessageSent(WriteRequest request) {
144: pushEvent(new IoEvent(IoEventType.MESSAGE_SENT, getSession(),
145: request));
146: }
147:
148: @Override
149: public void fireSessionClosed() {
150: pushEvent(new IoEvent(IoEventType.SESSION_CLOSED, getSession(),
151: null));
152: }
153:
154: @Override
155: public void fireSessionCreated() {
156: pushEvent(new IoEvent(IoEventType.SESSION_CREATED,
157: getSession(), null));
158: }
159:
160: @Override
161: public void fireSessionIdle(IdleStatus status) {
162: pushEvent(new IoEvent(IoEventType.SESSION_IDLE, getSession(),
163: status));
164: }
165:
166: @Override
167: public void fireSessionOpened() {
168: pushEvent(new IoEvent(IoEventType.SESSION_OPENED, getSession(),
169: null));
170: }
171:
172: @Override
173: public void fireMessageReceived(Object message) {
174: pushEvent(new IoEvent(IoEventType.MESSAGE_RECEIVED,
175: getSession(), message));
176: }
177:
178: private class VmPipeIoProcessor implements
179: IoProcessor<VmPipeSessionImpl> {
180: public void flush(VmPipeSessionImpl session) {
181: WriteRequestQueue queue = session.getWriteRequestQueue0();
182: if (queue.isEmpty(session)) {
183: return;
184: }
185: if (session.isConnected()) {
186: if (session.getLock().tryLock()) {
187: try {
188: WriteRequest req;
189: while ((req = queue.poll(session)) != null) {
190: Object message = req.getMessage();
191: Object messageCopy = message;
192: if (message instanceof IoBuffer) {
193: IoBuffer rb = (IoBuffer) message;
194: rb.mark();
195: IoBuffer wb = IoBuffer.allocate(rb
196: .remaining());
197: wb.put(rb);
198: wb.flip();
199: rb.reset();
200: messageCopy = wb;
201: }
202:
203: session.getRemoteSession().getFilterChain()
204: .fireMessageReceived(messageCopy);
205: session.getFilterChain().fireMessageSent(
206: req);
207: }
208: } finally {
209: session.getLock().unlock();
210: }
211:
212: flushPendingDataQueues(session);
213: }
214: } else {
215: List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();
216: WriteRequest req;
217: while ((req = queue.poll(session)) != null) {
218: failedRequests.add(req);
219: }
220:
221: if (!failedRequests.isEmpty()) {
222: WriteToClosedSessionException cause = new WriteToClosedSessionException(
223: failedRequests);
224: for (WriteRequest r : failedRequests) {
225: r.getFuture().setException(cause);
226: }
227: session.getFilterChain().fireExceptionCaught(cause);
228: }
229: }
230: }
231:
232: public void remove(VmPipeSessionImpl session) {
233: try {
234: session.getLock().lock();
235: if (!session.getCloseFuture().isClosed()) {
236: session.getServiceListeners().fireSessionDestroyed(
237: session);
238: session.getRemoteSession().close();
239: }
240: } finally {
241: session.getLock().unlock();
242: }
243: }
244:
245: public void add(VmPipeSessionImpl session) {
246: }
247:
248: public void updateTrafficMask(VmPipeSessionImpl session) {
249: if (session.getTrafficMask().isReadable()) {
250: List<Object> data = new ArrayList<Object>();
251: session.receivedMessageQueue.drainTo(data);
252: for (Object aData : data) {
253: VmPipeFilterChain.this .fireMessageReceived(aData);
254: }
255: }
256:
257: if (session.getTrafficMask().isWritable()) {
258: flush(session); // The second parameter is unused.
259: }
260: }
261:
262: public void dispose() {
263: }
264:
265: public boolean isDisposed() {
266: return false;
267: }
268:
269: public boolean isDisposing() {
270: return false;
271: }
272: }
273: }
|