View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.service;
21  
22  import java.util.AbstractSet;
23  import java.util.Iterator;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.Set;
27  import java.util.concurrent.Executor;
28  import java.util.concurrent.ExecutorService;
29  import java.util.concurrent.Executors;
30  import java.util.concurrent.TimeUnit;
31  import java.util.concurrent.atomic.AtomicInteger;
32  
33  import org.apache.mina.core.IoUtil;
34  import org.apache.mina.core.filterchain.DefaultIoFilterChain;
35  import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
36  import org.apache.mina.core.filterchain.IoFilterChainBuilder;
37  import org.apache.mina.core.future.ConnectFuture;
38  import org.apache.mina.core.future.DefaultIoFuture;
39  import org.apache.mina.core.future.IoFuture;
40  import org.apache.mina.core.future.WriteFuture;
41  import org.apache.mina.core.session.AbstractIoSession;
42  import org.apache.mina.core.session.DefaultIoSessionDataStructureFactory;
43  import org.apache.mina.core.session.IdleStatus;
44  import org.apache.mina.core.session.IoSession;
45  import org.apache.mina.core.session.IoSessionConfig;
46  import org.apache.mina.core.session.IoSessionDataStructureFactory;
47  import org.apache.mina.core.session.IoSessionInitializationException;
48  import org.apache.mina.core.session.IoSessionInitializer;
49  import org.apache.mina.util.ExceptionMonitor;
50  import org.apache.mina.util.NamePreservingRunnable;
51  import org.slf4j.Logger;
52  import org.slf4j.LoggerFactory;
53  
54  /**
55   * Base implementation of {@link IoService}s.
56   * 
57   * An instance of IoService contains an Executor which will handle the incoming
58   * events.
59   *
60   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
61   */
62  public abstract class AbstractIoService implements IoService {
63  
64      private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIoService.class);
65      /** 
66       * The unique number identifying the Service. It's incremented
67       * for each new IoService created.
68       */
69      private static final AtomicInteger id = new AtomicInteger();
70  
71      /** 
72       * The thread name built from the IoService inherited 
73       * instance class name and the IoService Id 
74       **/
75      private final String threadName;
76  
77      /**
78       * The associated executor, responsible for handling execution of I/O events.
79       */
80      private final Executor executor;
81  
82      /**
83       * A flag used to indicate that the local executor has been created
84       * inside this instance, and not passed by a caller.
85       * 
86       * If the executor is locally created, then it will be an instance
87       * of the ThreadPoolExecutor class.
88       */
89      private final boolean createdExecutor;
90  
91      /**
92       * The IoHandler in charge of managing all the I/O Events. It is 
93       */
94      private IoHandler handler;
95  
96      /**
97       * The default {@link IoSessionConfig} which will be used to configure new sessions.
98       */
99      private final IoSessionConfig sessionConfig;
100 
101     private final IoServiceListener serviceActivationListener = new IoServiceListener() {
102         public void serviceActivated(IoService service) {
103             // Update lastIoTime.
104             AbstractIoService s = (AbstractIoService) service;
105             IoServiceStatistics _stats = s.getStatistics();
106             _stats.setLastReadTime(s.getActivationTime());
107             _stats.setLastWriteTime(s.getActivationTime());
108             _stats.setLastThroughputCalculationTime(s.getActivationTime());
109 
110         }
111 
112         public void serviceDeactivated(IoService service) {
113             // Empty handler
114         }
115 
116         public void serviceIdle(IoService service, IdleStatus idleStatus) {
117             // Empty handler
118         }
119 
120         public void sessionCreated(IoSession session) {
121             // Empty handler
122         }
123 
124         public void sessionDestroyed(IoSession session) {
125             // Empty handler
126         }
127     };
128 
129     /**
130      * Current filter chain builder.
131      */
132     private IoFilterChainBuilder filterChainBuilder = new DefaultIoFilterChainBuilder();
133 
134     private IoSessionDataStructureFactory sessionDataStructureFactory = new DefaultIoSessionDataStructureFactory();
135 
136     /**
137      * Maintains the {@link IoServiceListener}s of this service.
138      */
139     private final IoServiceListenerSupport listeners;
140 
141     /**
142      * A lock object which must be acquired when related resources are
143      * destroyed.
144      */
145     protected final Object disposalLock = new Object();
146 
147     private volatile boolean disposing;
148 
149     private volatile boolean disposed;
150 
151     /**
152      * {@inheritDoc}
153      */
154     private IoServiceStatistics stats = new IoServiceStatistics(this);
155     
156 
157     /**
158      * Constructor for {@link AbstractIoService}. You need to provide a default
159      * session configuration and an {@link Executor} for handling I/O events. If
160      * a null {@link Executor} is provided, a default one will be created using
161      * {@link Executors#newCachedThreadPool()}.
162      * 
163      * @param sessionConfig
164      *            the default configuration for the managed {@link IoSession}
165      * @param executor
166      *            the {@link Executor} used for handling execution of I/O
167      *            events. Can be <code>null</code>.
168      */
169     protected AbstractIoService(IoSessionConfig sessionConfig, Executor executor) {
170         if (sessionConfig == null) {
171             throw new IllegalArgumentException("sessionConfig");
172         }
173 
174         if (getTransportMetadata() == null) {
175             throw new IllegalArgumentException("TransportMetadata");
176         }
177 
178         if (!getTransportMetadata().getSessionConfigType().isAssignableFrom(
179                 sessionConfig.getClass())) {
180             throw new IllegalArgumentException("sessionConfig type: "
181                     + sessionConfig.getClass() + " (expected: "
182                     + getTransportMetadata().getSessionConfigType() + ")");
183         }
184 
185         // Create the listeners, and add a first listener : a activation listener
186         // for this service, which will give information on the service state.
187         listeners = new IoServiceListenerSupport(this);
188         listeners.add(serviceActivationListener);
189 
190         // Stores the given session configuration
191         this.sessionConfig = sessionConfig;
192 
193         // Make JVM load the exception monitor before some transports
194         // change the thread context class loader.
195         ExceptionMonitor.getInstance();
196 
197         if (executor == null) {
198             this.executor = Executors.newCachedThreadPool();
199             createdExecutor = true;
200         } else {
201             this.executor = executor;
202             createdExecutor = false;
203         }
204 
205         threadName = getClass().getSimpleName() + '-' + id.incrementAndGet();
206     }
207 
208     /**
209      * {@inheritDoc}
210      */
211     public final IoFilterChainBuilder getFilterChainBuilder() {
212         return filterChainBuilder;
213     }
214 
215     /**
216      * {@inheritDoc}
217      */
218     public final void setFilterChainBuilder(IoFilterChainBuilder builder) {
219         if (builder == null) {
220             builder = new DefaultIoFilterChainBuilder();
221         }
222         filterChainBuilder = builder;
223     }
224 
225     /**
226      * {@inheritDoc}
227      */
228     public final DefaultIoFilterChainBuilder getFilterChain() {
229         if (filterChainBuilder instanceof DefaultIoFilterChainBuilder) {
230             return (DefaultIoFilterChainBuilder) filterChainBuilder;
231         }
232         
233         
234         throw new IllegalStateException(
235                     "Current filter chain builder is not a DefaultIoFilterChainBuilder.");
236     }
237 
238     /**
239      * {@inheritDoc}
240      */
241     public final void addListener(IoServiceListener listener) {
242         listeners.add(listener);
243     }
244 
245     /**
246      * {@inheritDoc}
247      */
248     public final void removeListener(IoServiceListener listener) {
249         listeners.remove(listener);
250     }
251 
252     /**
253      * {@inheritDoc}
254      */
255     public final boolean isActive() {
256         return listeners.isActive();
257     }
258 
259     /**
260      * {@inheritDoc}
261      */
262     public final boolean isDisposing() {
263         return disposing;
264     }
265 
266     /**
267      * {@inheritDoc}
268      */
269     public final boolean isDisposed() {
270         return disposed;
271     }
272 
273     /**
274      * {@inheritDoc}
275      */
276     public final void dispose() {
277       dispose(false);
278     }
279 
280   /**
281    * {@inheritDoc}
282    */
283     public final void dispose(boolean awaitTermination) {
284       if (disposed) {
285           return;
286       }
287 
288       synchronized (disposalLock) {
289           if (!disposing) {
290               disposing = true;
291 
292               try {
293                   dispose0();
294               } catch (Exception e) {
295                   ExceptionMonitor.getInstance().exceptionCaught(e);
296               }
297           }
298       }
299 
300       if (createdExecutor) {
301           ExecutorService e = (ExecutorService) executor;
302           e.shutdownNow();
303           if (awaitTermination) {
304 
305             //Thread.currentThread().setName();
306 
307             try {
308               LOGGER.debug("awaitTermination on {} called by thread=[{}]", this, Thread.currentThread().getName());
309               e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
310               LOGGER.debug("awaitTermination on {} finished", this);
311             } catch (InterruptedException e1) {
312               LOGGER.warn("awaitTermination on [{}] was interrupted", this);
313               // Restore the interrupted status
314               Thread.currentThread().interrupt();
315             }
316           }
317       }
318       disposed = true;
319     }
320 
321     /**
322      * Implement this method to release any acquired resources.  This method
323      * is invoked only once by {@link #dispose()}.
324      */
325     protected abstract void dispose0() throws Exception;
326 
327     /**
328      * {@inheritDoc}
329      */
330     public final Map<Long, IoSession> getManagedSessions() {
331         return listeners.getManagedSessions();
332     }
333 
334     /**
335      * {@inheritDoc}
336      */
337     public final int getManagedSessionCount() {
338         return listeners.getManagedSessionCount();
339     }
340 
341     /**
342      * {@inheritDoc}
343      */
344     public final IoHandler getHandler() {
345         return handler;
346     }
347 
348     /**
349      * {@inheritDoc}
350      */
351     public final void setHandler(IoHandler handler) {
352         if (handler == null) {
353             throw new IllegalArgumentException("handler cannot be null");
354         }
355 
356         if (isActive()) {
357             throw new IllegalStateException(
358                     "handler cannot be set while the service is active.");
359         }
360 
361         this.handler = handler;
362     }
363 
364     /**
365      * {@inheritDoc}
366      */
367     public IoSessionConfig getSessionConfig() {
368         return sessionConfig;
369     }
370 
371     /**
372      * {@inheritDoc}
373      */
374     public final IoSessionDataStructureFactory getSessionDataStructureFactory() {
375         return sessionDataStructureFactory;
376     }
377 
378     /**
379      * {@inheritDoc}
380      */
381     public final void setSessionDataStructureFactory(
382             IoSessionDataStructureFactory sessionDataStructureFactory) {
383         if (sessionDataStructureFactory == null) {
384             throw new IllegalArgumentException("sessionDataStructureFactory");
385         }
386 
387         if (isActive()) {
388             throw new IllegalStateException(
389                     "sessionDataStructureFactory cannot be set while the service is active.");
390         }
391 
392         this.sessionDataStructureFactory = sessionDataStructureFactory;
393     }
394 
395     /**
396      * {@inheritDoc}
397      */
398     public IoServiceStatistics getStatistics() {
399         return stats;
400     }
401 
402     /**
403      * {@inheritDoc}
404      */
405     public final long getActivationTime() {
406         return listeners.getActivationTime();
407     }
408 
409     /**
410      * {@inheritDoc}
411      */
412     public final Set<WriteFuture> broadcast(Object message) {
413         // Convert to Set.  We do not return a List here because only the
414         // direct caller of MessageBroadcaster knows the order of write
415         // operations.
416         final List<WriteFuture> futures = IoUtil.broadcast(message,
417                 getManagedSessions().values());
418         return new AbstractSet<WriteFuture>() {
419             @Override
420             public Iterator<WriteFuture> iterator() {
421                 return futures.iterator();
422             }
423 
424             @Override
425             public int size() {
426                 return futures.size();
427             }
428         };
429     }
430 
431     public final IoServiceListenerSupport getListeners() {
432         return listeners;
433     }
434 
435 
436     protected final void executeWorker(Runnable worker) {
437         executeWorker(worker, null);
438     }
439 
440     protected final void executeWorker(Runnable worker, String suffix) {
441         String actualThreadName = threadName;
442         if (suffix != null) {
443             actualThreadName = actualThreadName + '-' + suffix;
444         }
445         executor.execute(new NamePreservingRunnable(worker, actualThreadName));
446     }
447 
448     // TODO Figure out make it work without causing a compiler error / warning.
449     @SuppressWarnings("unchecked")
450     protected final void initSession(IoSession session,
451             IoFuture future, IoSessionInitializer sessionInitializer) {
452         // Update lastIoTime if needed.
453         if (stats.getLastReadTime() == 0) {
454             stats.setLastReadTime(getActivationTime());
455         }
456         
457         if (stats.getLastWriteTime() == 0) {
458             stats.setLastWriteTime(getActivationTime());
459         }
460 
461         // Every property but attributeMap should be set now.
462         // Now initialize the attributeMap.  The reason why we initialize
463         // the attributeMap at last is to make sure all session properties
464         // such as remoteAddress are provided to IoSessionDataStructureFactory.
465         try {
466             ((AbstractIoSession) session).setAttributeMap(session.getService()
467                     .getSessionDataStructureFactory().getAttributeMap(session));
468         } catch (IoSessionInitializationException e) {
469             throw e;
470         } catch (Exception e) {
471             throw new IoSessionInitializationException(
472                     "Failed to initialize an attributeMap.", e);
473         }
474 
475         try {
476             ((AbstractIoSession) session).setWriteRequestQueue(session
477                     .getService().getSessionDataStructureFactory()
478                     .getWriteRequestQueue(session));
479         } catch (IoSessionInitializationException e) {
480             throw e;
481         } catch (Exception e) {
482             throw new IoSessionInitializationException(
483                     "Failed to initialize a writeRequestQueue.", e);
484         }
485 
486         if ((future != null) && (future instanceof ConnectFuture)) {
487             // DefaultIoFilterChain will notify the future. (We support ConnectFuture only for now).
488             session.setAttribute(DefaultIoFilterChain.SESSION_CREATED_FUTURE,
489                     future);
490         }
491 
492         if (sessionInitializer != null) {
493             sessionInitializer.initializeSession(session, future);
494         }
495 
496         finishSessionInitialization0(session, future);
497     }
498 
499     /**
500      * Implement this method to perform additional tasks required for session
501      * initialization. Do not call this method directly;
502      * {@link #initSession(IoSession, IoFuture, IoSessionInitializer)} will call
503      * this method instead.
504      */
505     protected void finishSessionInitialization0(IoSession session,
506             IoFuture future) {
507         // Do nothing. Extended class might add some specific code 
508     }
509 
510     protected static class ServiceOperationFuture extends DefaultIoFuture {
511         public ServiceOperationFuture() {
512             super(null);
513         }
514 
515         public final boolean isDone() {
516             return getValue() == Boolean.TRUE;
517         }
518 
519         public final void setDone() {
520             setValue(Boolean.TRUE);
521         }
522 
523         public final Exception getException() {
524             if (getValue() instanceof Exception) {
525                 return (Exception) getValue();
526             }
527             
528             return null;
529         }
530 
531         public final void setException(Exception exception) {
532             if (exception == null) {
533                 throw new IllegalArgumentException("exception");
534             }
535             setValue(exception);
536         }
537     }
538 
539     /**
540      * {@inheritDoc}
541      */
542     public int getScheduledWriteBytes() {
543         return stats.getScheduledWriteBytes();
544     }
545 
546     /**
547      * {@inheritDoc}
548      */
549     public int getScheduledWriteMessages() {
550         return stats.getScheduledWriteMessages();
551     }
552     
553 }