View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.vmpipe;
21  
22  import java.io.IOException;
23  import java.net.SocketAddress;
24  import java.util.HashSet;
25  import java.util.Set;
26  import java.util.concurrent.Executor;
27  
28  import org.apache.mina.core.filterchain.IoFilterChain;
29  import org.apache.mina.core.future.ConnectFuture;
30  import org.apache.mina.core.future.DefaultConnectFuture;
31  import org.apache.mina.core.future.IoFuture;
32  import org.apache.mina.core.future.IoFutureListener;
33  import org.apache.mina.core.service.AbstractIoConnector;
34  import org.apache.mina.core.service.IoHandler;
35  import org.apache.mina.core.service.TransportMetadata;
36  import org.apache.mina.core.session.IdleStatusChecker;
37  import org.apache.mina.core.session.IoSessionInitializer;
38  import org.apache.mina.util.ExceptionMonitor;
39  
40  /**
41   * Connects to {@link IoHandler}s which is bound on the specified
42   * {@link VmPipeAddress}.
43   *
44   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
45   */
46  public final class VmPipeConnector extends AbstractIoConnector {
47  
48      // object used for checking session idle
49      private IdleStatusChecker idleChecker;
50  
51      /**
52       * Creates a new instance.
53       */
54      public VmPipeConnector() {
55          this(null);
56      }
57  
58      /**
59       * Creates a new instance.
60       * 
61       * @param executor The executor to use
62       */
63      public VmPipeConnector(Executor executor) {
64          super(new DefaultVmPipeSessionConfig(), executor);
65          idleChecker = new IdleStatusChecker();
66          // we schedule the idle status checking task in this service exceutor
67          // it will be woke up every seconds
68          executeWorker(idleChecker.getNotifyingTask(), "idleStatusChecker");
69      }
70  
71      /**
72       * {@inheritDoc}
73       */
74      public TransportMetadata getTransportMetadata() {
75          return VmPipeSession.METADATA;
76      }
77  
78      /**
79       * {@inheritDoc}
80       */
81      public VmPipeSessionConfig getSessionConfig() {
82          return (VmPipeSessionConfig) sessionConfig;
83      }
84  
85      /**
86       * {@inheritDoc}
87       */
88      @Override
89      protected ConnectFuture connect0(SocketAddress remoteAddress, SocketAddress localAddress,
90              IoSessionInitializer<? extends ConnectFuture> sessionInitializer) {
91          VmPipe entry = VmPipeAcceptor.boundHandlers.get(remoteAddress);
92          if (entry == null) {
93              return DefaultConnectFuture.newFailedFuture(new IOException("Endpoint unavailable: " + remoteAddress));
94          }
95  
96          DefaultConnectFuture future = new DefaultConnectFuture();
97  
98          // Assign the local address dynamically,
99          VmPipeAddress actualLocalAddress;
100         try {
101             actualLocalAddress = nextLocalAddress();
102         } catch (IOException e) {
103             return DefaultConnectFuture.newFailedFuture(e);
104         }
105 
106         VmPipeSession localSession = new VmPipeSession(this, getListeners(), actualLocalAddress, getHandler(), entry);
107 
108         initSession(localSession, future, sessionInitializer);
109 
110         // and reclaim the local address when the connection is closed.
111         localSession.getCloseFuture().addListener(LOCAL_ADDRESS_RECLAIMER);
112 
113         // initialize connector session
114         try {
115             IoFilterChain filterChain = localSession.getFilterChain();
116             this.getFilterChainBuilder().buildFilterChain(filterChain);
117 
118             // The following sentences don't throw any exceptions.
119             getListeners().fireSessionCreated(localSession);
120             idleChecker.addSession(localSession);
121         } catch (Exception e) {
122             future.setException(e);
123             return future;
124         }
125 
126         // initialize acceptor session
127         VmPipeSession remoteSession = localSession.getRemoteSession();
128         ((VmPipeAcceptor) remoteSession.getService()).doFinishSessionInitialization(remoteSession, null);
129         try {
130             IoFilterChain filterChain = remoteSession.getFilterChain();
131             entry.getAcceptor().getFilterChainBuilder().buildFilterChain(filterChain);
132 
133             // The following sentences don't throw any exceptions.
134             entry.getListeners().fireSessionCreated(remoteSession);
135             idleChecker.addSession(remoteSession);
136         } catch (Exception e) {
137             ExceptionMonitor.getInstance().exceptionCaught(e);
138             remoteSession.close(true);
139         }
140 
141         // Start chains, and then allow and messages read/written to be processed. This is to ensure that
142         // sessionOpened gets received before a messageReceived
143         ((VmPipeFilterChain) localSession.getFilterChain()).start();
144         ((VmPipeFilterChain) remoteSession.getFilterChain()).start();
145 
146         return future;
147     }
148 
149     /**
150      * {@inheritDoc}
151      */
152     @Override
153     protected void dispose0() throws Exception {
154         // stop the idle checking task
155         idleChecker.getNotifyingTask().cancel();
156     }
157 
158     private static final Set<VmPipeAddress> TAKEN_LOCAL_ADDRESSES = new HashSet<VmPipeAddress>();
159 
160     private static int nextLocalPort = -1;
161 
162     private static final IoFutureListener<IoFuture> LOCAL_ADDRESS_RECLAIMER = new LocalAddressReclaimer();
163 
164     private static VmPipeAddress nextLocalAddress() throws IOException {
165         synchronized (TAKEN_LOCAL_ADDRESSES) {
166             if (nextLocalPort >= 0) {
167                 nextLocalPort = -1;
168             }
169             for (int i = 0; i < Integer.MAX_VALUE; i++) {
170                 VmPipeAddress answer = new VmPipeAddress(nextLocalPort--);
171                 if (!TAKEN_LOCAL_ADDRESSES.contains(answer)) {
172                     TAKEN_LOCAL_ADDRESSES.add(answer);
173                     return answer;
174                 }
175             }
176         }
177 
178         throw new IOException("Can't assign a local VM pipe port.");
179     }
180 
181     private static class LocalAddressReclaimer implements IoFutureListener<IoFuture> {
182         public void operationComplete(IoFuture future) {
183             synchronized (TAKEN_LOCAL_ADDRESSES) {
184                 TAKEN_LOCAL_ADDRESSES.remove(future.getSession().getLocalAddress());
185             }
186         }
187     }
188 }