1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.mina.transport.vmpipe;
20
21 import static org.junit.Assert.fail;
22
23 import java.lang.management.ManagementFactory;
24 import java.lang.management.ThreadInfo;
25 import java.lang.management.ThreadMXBean;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.atomic.AtomicReference;
29
30 import org.apache.mina.core.future.ConnectFuture;
31 import org.apache.mina.core.service.IoAcceptor;
32 import org.apache.mina.core.service.IoConnector;
33 import org.apache.mina.core.service.IoHandlerAdapter;
34 import org.apache.mina.core.session.IoSession;
35 import org.junit.Test;
36
37
38
39
40
41
42 public class VmPipeSessionCrossCommunicationTest {
43 @Test
44 public void testOneSessionTalkingBackAndForthDoesNotDeadlock() throws Exception {
45 final VmPipeAddress address = new VmPipeAddress(1);
46 final IoConnector connector = new VmPipeConnector();
47 final AtomicReference<IoSession> c1 = new AtomicReference<>();
48 final CountDownLatch latch = new CountDownLatch(1);
49 final CountDownLatch messageCount = new CountDownLatch(2);
50 IoAcceptor acceptor = new VmPipeAcceptor();
51
52 acceptor.setHandler(new IoHandlerAdapter() {
53 @Override
54 public void messageReceived(IoSession session, Object message) throws Exception {
55
56
57 if ("start".equals(message)) {
58 session.write("open new");
59 } else if ("re-use c1".equals(message)) {
60 session.write("tell me something on c1 now");
61 } else if (((String) message).startsWith("please don't deadlock")) {
62 messageCount.countDown();
63 } else {
64 fail("unexpected message received " + message);
65 }
66 }
67 });
68 acceptor.bind(address);
69
70 connector.setHandler(new IoHandlerAdapter() {
71 @Override
72 public void messageReceived(IoSession session, Object message) throws Exception {
73
74
75 if ("open new".equals(message)) {
76
77
78 IoConnector c2 = new VmPipeConnector();
79 c2.setHandler(new IoHandlerAdapter() {
80 @Override
81 public void sessionOpened(IoSession session) throws Exception {
82 session.write("re-use c1");
83 }
84
85 @Override
86 public void messageReceived(IoSession session, Object message) throws Exception {
87
88
89 if ("tell me something on c1 now".equals(message)) {
90 latch.countDown();
91 c1.get().write("please don't deadlock via c1");
92 } else {
93 fail("unexpected message received " + message);
94 }
95 }
96 });
97
98 ConnectFuture c2Future = c2.connect(address);
99
100 c2Future.await();
101
102 latch.await();
103
104 c2Future.getSession().write("please don't deadlock via c2");
105 } else {
106 fail("unexpeced message received " + message);
107 }
108 }
109 });
110
111 ConnectFuture future = connector.connect(address);
112
113 future.await();
114
115 c1.set(future.getSession());
116 c1.get().write("start");
117
118 ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
119
120 while (!messageCount.await(100, TimeUnit.MILLISECONDS)) {
121 long[] threads = threadMXBean.findMonitorDeadlockedThreads();
122
123 if (null != threads) {
124 StringBuffer sb = new StringBuffer(256);
125 ThreadInfo[] infos = threadMXBean.getThreadInfo(threads, Integer.MAX_VALUE);
126
127 for (ThreadInfo info : infos) {
128 sb.append(info.getThreadName()).append(" blocked on ").append(info.getLockName())
129 .append(" owned by ").append(info.getLockOwnerName()).append("\n");
130 }
131
132 for (ThreadInfo info : infos) {
133 sb.append("\nStack for ").append(info.getThreadName()).append("\n");
134 for (StackTraceElement element : info.getStackTrace()) {
135 sb.append("\t").append(element).append("\n");
136 }
137 }
138
139 fail("deadlocked! \n" + sb);
140 }
141 }
142
143 acceptor.setCloseOnDeactivation(false);
144 acceptor.dispose();
145 }
146 }