001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */
019: package org.apache.mina.transport.vmpipe;
020:
021: import java.lang.management.ManagementFactory;
022: import java.lang.management.ThreadInfo;
023: import java.lang.management.ThreadMXBean;
024: import java.util.concurrent.CountDownLatch;
025: import java.util.concurrent.TimeUnit;
026: import java.util.concurrent.atomic.AtomicReference;
027:
028: import junit.framework.TestCase;
029:
030: import org.apache.mina.common.ConnectFuture;
031: import org.apache.mina.common.IoAcceptor;
032: import org.apache.mina.common.IoConnector;
033: import org.apache.mina.common.IoHandlerAdapter;
034: import org.apache.mina.common.IoSession;
035:
036: /**
037: * @author Apache Mina Project (dev@mina.apache.org)
038: * @version $Rev: $, $Date: $
039: */
040: public class VmPipeSessionCrossCommunicationTest extends TestCase {
041: public void testOneSessionTalkingBackAndForthDoesNotDeadlock()
042: throws Exception {
043: final VmPipeAddress address = new VmPipeAddress(1);
044: final IoConnector connector = new VmPipeConnector();
045: final AtomicReference<IoSession> c1 = new AtomicReference<IoSession>();
046: final CountDownLatch latch = new CountDownLatch(1);
047: final CountDownLatch messageCount = new CountDownLatch(2);
048: IoAcceptor acceptor = new VmPipeAcceptor();
049:
050: acceptor.setHandler(new IoHandlerAdapter() {
051: @Override
052: public void messageReceived(IoSession session,
053: Object message) throws Exception {
054: System.out.println(Thread.currentThread().getName()
055: + ": " + message);
056:
057: if ("start".equals(message)) {
058: session.write("open new");
059: } else if ("re-use c1".equals(message)) {
060: session.write("tell me something on c1 now");
061: } else if (((String) message)
062: .startsWith("please don't deadlock")) {
063: messageCount.countDown();
064: } else {
065: fail("unexpected message received " + message);
066: }
067: }
068: });
069: acceptor.bind(address);
070:
071: connector.setHandler(new IoHandlerAdapter() {
072: @Override
073: public void messageReceived(IoSession session,
074: Object message) throws Exception {
075: System.out.println(Thread.currentThread().getName()
076: + ": " + message);
077:
078: if ("open new".equals(message)) {
079: System.out.println("opening c2 from "
080: + Thread.currentThread().getName());
081:
082: IoConnector c2 = new VmPipeConnector();
083: c2.setHandler(new IoHandlerAdapter() {
084: @Override
085: public void sessionOpened(IoSession session)
086: throws Exception {
087: session.write("re-use c1");
088: }
089:
090: @Override
091: public void messageReceived(IoSession session,
092: Object message) throws Exception {
093: System.out.println(Thread.currentThread()
094: .getName()
095: + ": " + message);
096:
097: if ("tell me something on c1 now"
098: .equals(message)) {
099: latch.countDown();
100: c1.get().write(
101: "please don't deadlock via c1");
102: } else {
103: fail("unexpected message received "
104: + message);
105: }
106: }
107: });
108:
109: ConnectFuture c2Future = c2.connect(address);
110:
111: c2Future.await();
112:
113: latch.await();
114:
115: c2Future.getSession().write(
116: "please don't deadlock via c2");
117: } else {
118: fail("unexpeced message received " + message);
119: }
120: }
121: });
122:
123: ConnectFuture future = connector.connect(address);
124:
125: future.await();
126:
127: c1.set(future.getSession());
128: c1.get().write("start");
129:
130: ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
131:
132: while (!messageCount.await(100, TimeUnit.MILLISECONDS)) {
133: long[] threads = threadMXBean
134: .findMonitorDeadlockedThreads();
135:
136: if (null != threads) {
137: StringBuffer sb = new StringBuffer(256);
138: ThreadInfo[] infos = threadMXBean.getThreadInfo(
139: threads, Integer.MAX_VALUE);
140:
141: for (ThreadInfo info : infos) {
142: sb.append(info.getThreadName()).append(
143: " blocked on ").append(info.getLockName())
144: .append(" owned by ").append(
145: info.getLockOwnerName()).append(
146: "\n");
147: }
148:
149: for (ThreadInfo info : infos) {
150: sb.append("\nStack for ").append(
151: info.getThreadName()).append("\n");
152: for (StackTraceElement element : info
153: .getStackTrace()) {
154: sb.append("\t").append(element).append("\n");
155: }
156: }
157:
158: fail("deadlocked! \n" + sb);
159: }
160: }
161:
162: acceptor.setCloseOnDeactivation(false);
163: acceptor.dispose();
164: }
165: }
|