1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.service;
21
22 import java.util.AbstractSet;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Set;
27 import java.util.concurrent.Executor;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicInteger;
32
33 import org.apache.mina.core.IoUtil;
34 import org.apache.mina.core.filterchain.DefaultIoFilterChain;
35 import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
36 import org.apache.mina.core.filterchain.IoFilterChainBuilder;
37 import org.apache.mina.core.future.ConnectFuture;
38 import org.apache.mina.core.future.DefaultIoFuture;
39 import org.apache.mina.core.future.IoFuture;
40 import org.apache.mina.core.future.WriteFuture;
41 import org.apache.mina.core.session.AbstractIoSession;
42 import org.apache.mina.core.session.DefaultIoSessionDataStructureFactory;
43 import org.apache.mina.core.session.IdleStatus;
44 import org.apache.mina.core.session.IoSession;
45 import org.apache.mina.core.session.IoSessionConfig;
46 import org.apache.mina.core.session.IoSessionDataStructureFactory;
47 import org.apache.mina.core.session.IoSessionInitializationException;
48 import org.apache.mina.core.session.IoSessionInitializer;
49 import org.apache.mina.util.ExceptionMonitor;
50 import org.apache.mina.util.NamePreservingRunnable;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54
55
56
57
58
59
60
61
62 public abstract class AbstractIoService implements IoService {
63
64 private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIoService.class);
65
66
67
68
69
70 private static final AtomicInteger id = new AtomicInteger();
71
72
73
74
75
76 private final String threadName;
77
78
79
80
81 private final Executor executor;
82
83
84
85
86
87
88
89
90 private final boolean createdExecutor;
91
92
93
94
95 private IoHandler handler;
96
97
98
99
100 protected final IoSessionConfig sessionConfig;
101
102 private final IoServiceListener serviceActivationListener = new IoServiceListener() {
103 public void serviceActivated(IoService service) {
104
105 AbstractIoService s = (AbstractIoService) service;
106 IoServiceStatistics _stats = s.getStatistics();
107 _stats.setLastReadTime(s.getActivationTime());
108 _stats.setLastWriteTime(s.getActivationTime());
109 _stats.setLastThroughputCalculationTime(s.getActivationTime());
110
111 }
112
113 public void serviceDeactivated(IoService service) {
114
115 }
116
117 public void serviceIdle(IoService service, IdleStatus idleStatus) {
118
119 }
120
121 public void sessionCreated(IoSession session) {
122
123 }
124
125 public void sessionDestroyed(IoSession session) {
126
127 }
128 };
129
130
131
132
133 private IoFilterChainBuilder filterChainBuilder = new DefaultIoFilterChainBuilder();
134
135 private IoSessionDataStructureFactory sessionDataStructureFactory = new DefaultIoSessionDataStructureFactory();
136
137
138
139
140 private final IoServiceListenerSupport listeners;
141
142
143
144
145
146 protected final Object disposalLock = new Object();
147
148 private volatile boolean disposing;
149
150 private volatile boolean disposed;
151
152
153
154
155 private IoServiceStatistics stats = new IoServiceStatistics(this);
156
157
158
159
160
161
162
163
164
165
166
167
168
169 protected AbstractIoService(IoSessionConfig sessionConfig, Executor executor) {
170 if (sessionConfig == null) {
171 throw new IllegalArgumentException("sessionConfig");
172 }
173
174 if (getTransportMetadata() == null) {
175 throw new IllegalArgumentException("TransportMetadata");
176 }
177
178 if (!getTransportMetadata().getSessionConfigType().isAssignableFrom(sessionConfig.getClass())) {
179 throw new IllegalArgumentException("sessionConfig type: " + sessionConfig.getClass() + " (expected: "
180 + getTransportMetadata().getSessionConfigType() + ")");
181 }
182
183
184
185 listeners = new IoServiceListenerSupport(this);
186 listeners.add(serviceActivationListener);
187
188
189 this.sessionConfig = sessionConfig;
190
191
192
193 ExceptionMonitor.getInstance();
194
195 if (executor == null) {
196 this.executor = Executors.newCachedThreadPool();
197 createdExecutor = true;
198 } else {
199 this.executor = executor;
200 createdExecutor = false;
201 }
202
203 threadName = getClass().getSimpleName() + '-' + id.incrementAndGet();
204 }
205
206
207
208
209 public final IoFilterChainBuilder getFilterChainBuilder() {
210 return filterChainBuilder;
211 }
212
213
214
215
216 public final void setFilterChainBuilder(IoFilterChainBuilder builder) {
217 if (builder == null) {
218 builder = new DefaultIoFilterChainBuilder();
219 }
220 filterChainBuilder = builder;
221 }
222
223
224
225
226 public final DefaultIoFilterChainBuilder getFilterChain() {
227 if (filterChainBuilder instanceof DefaultIoFilterChainBuilder) {
228 return (DefaultIoFilterChainBuilder) filterChainBuilder;
229 }
230
231 throw new IllegalStateException("Current filter chain builder is not a DefaultIoFilterChainBuilder.");
232 }
233
234
235
236
237 public final void addListener(IoServiceListener listener) {
238 listeners.add(listener);
239 }
240
241
242
243
244 public final void removeListener(IoServiceListener listener) {
245 listeners.remove(listener);
246 }
247
248
249
250
251 public final boolean isActive() {
252 return listeners.isActive();
253 }
254
255
256
257
258 public final boolean isDisposing() {
259 return disposing;
260 }
261
262
263
264
265 public final boolean isDisposed() {
266 return disposed;
267 }
268
269
270
271
272 public final void dispose() {
273 dispose(false);
274 }
275
276
277
278
279 public final void dispose(boolean awaitTermination) {
280 if (disposed) {
281 return;
282 }
283
284 synchronized (disposalLock) {
285 if (!disposing) {
286 disposing = true;
287
288 try {
289 dispose0();
290 } catch (Exception e) {
291 ExceptionMonitor.getInstance().exceptionCaught(e);
292 }
293 }
294 }
295
296 if (createdExecutor) {
297 ExecutorService e = (ExecutorService) executor;
298 e.shutdownNow();
299 if (awaitTermination) {
300
301
302
303 try {
304 LOGGER.debug("awaitTermination on {} called by thread=[{}]", this, Thread.currentThread().getName());
305 e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
306 LOGGER.debug("awaitTermination on {} finished", this);
307 } catch (InterruptedException e1) {
308 LOGGER.warn("awaitTermination on [{}] was interrupted", this);
309
310 Thread.currentThread().interrupt();
311 }
312 }
313 }
314 disposed = true;
315 }
316
317
318
319
320
321 protected abstract void dispose0() throws Exception;
322
323
324
325
326 public final Map<Long, IoSession> getManagedSessions() {
327 return listeners.getManagedSessions();
328 }
329
330
331
332
333 public final int getManagedSessionCount() {
334 return listeners.getManagedSessionCount();
335 }
336
337
338
339
340 public final IoHandler getHandler() {
341 return handler;
342 }
343
344
345
346
347 public final void setHandler(IoHandler handler) {
348 if (handler == null) {
349 throw new IllegalArgumentException("handler cannot be null");
350 }
351
352 if (isActive()) {
353 throw new IllegalStateException("handler cannot be set while the service is active.");
354 }
355
356 this.handler = handler;
357 }
358
359
360
361
362 public final IoSessionDataStructureFactory getSessionDataStructureFactory() {
363 return sessionDataStructureFactory;
364 }
365
366
367
368
369 public final void setSessionDataStructureFactory(IoSessionDataStructureFactory sessionDataStructureFactory) {
370 if (sessionDataStructureFactory == null) {
371 throw new IllegalArgumentException("sessionDataStructureFactory");
372 }
373
374 if (isActive()) {
375 throw new IllegalStateException("sessionDataStructureFactory cannot be set while the service is active.");
376 }
377
378 this.sessionDataStructureFactory = sessionDataStructureFactory;
379 }
380
381
382
383
384 public IoServiceStatistics getStatistics() {
385 return stats;
386 }
387
388
389
390
391 public final long getActivationTime() {
392 return listeners.getActivationTime();
393 }
394
395
396
397
398 public final Set<WriteFuture> broadcast(Object message) {
399
400
401
402 final List<WriteFuture> futures = IoUtil.broadcast(message, getManagedSessions().values());
403 return new AbstractSet<WriteFuture>() {
404 @Override
405 public Iterator<WriteFuture> iterator() {
406 return futures.iterator();
407 }
408
409 @Override
410 public int size() {
411 return futures.size();
412 }
413 };
414 }
415
416 public final IoServiceListenerSupport getListeners() {
417 return listeners;
418 }
419
420 protected final void executeWorker(Runnable worker) {
421 executeWorker(worker, null);
422 }
423
424 protected final void executeWorker(Runnable worker, String suffix) {
425 String actualThreadName = threadName;
426 if (suffix != null) {
427 actualThreadName = actualThreadName + '-' + suffix;
428 }
429 executor.execute(new NamePreservingRunnable(worker, actualThreadName));
430 }
431
432
433 @SuppressWarnings("unchecked")
434 protected final void initSession(IoSession session, IoFuture future, IoSessionInitializer sessionInitializer) {
435
436 if (stats.getLastReadTime() == 0) {
437 stats.setLastReadTime(getActivationTime());
438 }
439
440 if (stats.getLastWriteTime() == 0) {
441 stats.setLastWriteTime(getActivationTime());
442 }
443
444
445
446
447
448 try {
449 ((AbstractIoSession) session).setAttributeMap(session.getService().getSessionDataStructureFactory()
450 .getAttributeMap(session));
451 } catch (IoSessionInitializationException e) {
452 throw e;
453 } catch (Exception e) {
454 throw new IoSessionInitializationException("Failed to initialize an attributeMap.", e);
455 }
456
457 try {
458 ((AbstractIoSession) session).setWriteRequestQueue(session.getService().getSessionDataStructureFactory()
459 .getWriteRequestQueue(session));
460 } catch (IoSessionInitializationException e) {
461 throw e;
462 } catch (Exception e) {
463 throw new IoSessionInitializationException("Failed to initialize a writeRequestQueue.", e);
464 }
465
466 if ((future != null) && (future instanceof ConnectFuture)) {
467
468 session.setAttribute(DefaultIoFilterChain.SESSION_CREATED_FUTURE, future);
469 }
470
471 if (sessionInitializer != null) {
472 sessionInitializer.initializeSession(session, future);
473 }
474
475 finishSessionInitialization0(session, future);
476 }
477
478
479
480
481
482
483
484 protected void finishSessionInitialization0(IoSession session, IoFuture future) {
485
486 }
487
488 protected static class ServiceOperationFuture extends DefaultIoFuture {
489 public ServiceOperationFuture() {
490 super(null);
491 }
492
493 public final boolean isDone() {
494 return getValue() == Boolean.TRUE;
495 }
496
497 public final void setDone() {
498 setValue(Boolean.TRUE);
499 }
500
501 public final Exception getException() {
502 if (getValue() instanceof Exception) {
503 return (Exception) getValue();
504 }
505
506 return null;
507 }
508
509 public final void setException(Exception exception) {
510 if (exception == null) {
511 throw new IllegalArgumentException("exception");
512 }
513 setValue(exception);
514 }
515 }
516
517
518
519
520 public int getScheduledWriteBytes() {
521 return stats.getScheduledWriteBytes();
522 }
523
524
525
526
527 public int getScheduledWriteMessages() {
528 return stats.getScheduledWriteMessages();
529 }
530
531 }