View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.serial;
21  
22  import gnu.io.SerialPort;
23  import gnu.io.SerialPortEvent;
24  import gnu.io.SerialPortEventListener;
25  
26  import java.io.IOException;
27  import java.io.InputStream;
28  import java.io.OutputStream;
29  import java.util.TooManyListenersException;
30  
31  import org.apache.mina.core.session.AbstractIoSession;
32  import org.apache.mina.core.filterchain.DefaultIoFilterChain;
33  import org.apache.mina.core.service.DefaultTransportMetadata;
34  import org.apache.mina.util.ExceptionMonitor;
35  import org.apache.mina.core.buffer.IoBuffer;
36  import org.apache.mina.core.filterchain.IoFilterChain;
37  import org.apache.mina.core.service.IoHandler;
38  import org.apache.mina.core.service.IoProcessor;
39  import org.apache.mina.core.service.IoService;
40  import org.apache.mina.core.service.IoServiceListenerSupport;
41  import org.apache.mina.core.service.TransportMetadata;
42  import org.apache.mina.core.write.WriteRequest;
43  
44  import org.slf4j.Logger;
45  import org.slf4j.LoggerFactory;
46  
47  /**
48   * An imlpementation of {@link SerialSession}.
49   *
50   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
51   */
52  class SerialSessionImpl extends AbstractIoSession implements
53          SerialSession, SerialPortEventListener {
54  
55      static final TransportMetadata METADATA =
56          new DefaultTransportMetadata(
57                  "rxtx", "serial", false, true, SerialAddress.class,
58                  SerialSessionConfig.class, IoBuffer.class);
59  
60      private final SerialSessionConfig config = new DefaultSerialSessionConfig();
61      private final IoProcessor<SerialSessionImpl> processor = new SerialIoProcessor();
62      private final IoHandler ioHandler;
63      private final IoFilterChain filterChain;
64      private final SerialConnector service;
65      private final IoServiceListenerSupport serviceListeners;
66      private final SerialAddress address;
67      private final SerialPort port;
68      private final Logger log;
69  
70      private InputStream inputStream;
71      private OutputStream outputStream;
72  
73      SerialSessionImpl(
74              SerialConnector service, IoServiceListenerSupport serviceListeners,
75              SerialAddress address, SerialPort port) {
76          this.service = service;
77          this.serviceListeners = serviceListeners;
78          ioHandler = service.getHandler();
79          filterChain = new DefaultIoFilterChain(this);
80          this.port = port;
81          this.address = address;
82  
83          log = LoggerFactory.getLogger(SerialSessionImpl.class);
84      }
85  
86      public SerialSessionConfig getConfig() {
87          return config;
88      }
89  
90      public IoFilterChain getFilterChain() {
91          return filterChain;
92      }
93  
94      public IoHandler getHandler() {
95          return ioHandler;
96      }
97  
98      public TransportMetadata getTransportMetadata() {
99          return METADATA;
100     }
101 
102     public SerialAddress getLocalAddress() {
103         return null; // not applicable
104     }
105 
106     public SerialAddress getRemoteAddress() {
107         return address;
108     }
109 
110     @Override
111     public SerialAddress getServiceAddress() {
112         return (SerialAddress) super.getServiceAddress();
113     }
114 
115     public IoService getService() {
116         return service;
117     }
118 
119     public void setDTR(boolean dtr) {
120         port.setDTR(dtr);
121     }
122 
123     public boolean isDTR() {
124         return port.isDTR();
125     }
126 
127     public void setRTS(boolean rts) {
128         port.setRTS(rts);
129     }
130 
131     public boolean isRTS() {
132         return port.isRTS();
133     }
134 
135     /**
136      * start handling streams
137      *
138      * @throws IOException
139      * @throws TooManyListenersException
140      */
141     void start() throws IOException, TooManyListenersException {
142         inputStream = port.getInputStream();
143         outputStream = port.getOutputStream();
144         ReadWorker w = new ReadWorker();
145         w.start();
146         port.addEventListener(this);
147         service.getIdleStatusChecker0().addSession(this);
148         try {
149             getService().getFilterChainBuilder().buildFilterChain(getFilterChain());
150             serviceListeners.fireSessionCreated(this);
151         } catch (Throwable e) {
152             getFilterChain().fireExceptionCaught(e);
153             processor.remove(this);
154         }
155     }
156 
157     private final Object writeMonitor = new Object();
158     private WriteWorker writeWorker;
159 
160     private class WriteWorker extends Thread {
161         @Override
162         public void run() {
163             while (isConnected() && !isClosing()) {
164                 flushWrites();
165 
166                 // wait for more data
167                 synchronized (writeMonitor) {
168                     try {
169                         writeMonitor.wait();
170                     } catch (InterruptedException e) {
171                         log.error("InterruptedException", e);
172                     }
173                 }
174             }
175         }
176     }
177 
178     private void flushWrites() {
179         for (; ;) {
180             WriteRequest req = getCurrentWriteRequest();
181             if (req == null) {
182                 req = getWriteRequestQueue().poll(this);
183                 if (req == null) {
184                     break;
185                 }
186             }
187 
188             IoBuffer buf = (IoBuffer) req.getMessage();
189             if (buf.remaining() == 0) {
190                 setCurrentWriteRequest(null);
191                 buf.reset();
192                 this.getFilterChain().fireMessageSent(req);
193                 continue;
194             }
195 
196             int writtenBytes = buf.remaining();
197             try {
198                 outputStream.write(buf.array(), buf.position(), writtenBytes);
199                 buf.position(buf.position() + writtenBytes);
200                 
201                 // increase written bytes
202                 increaseWrittenBytes(writtenBytes, System.currentTimeMillis());
203                 
204                 setCurrentWriteRequest(null);
205                 buf.reset();
206                 
207                 // fire the message sent event
208                 getFilterChain().fireMessageSent(req);
209             } catch (IOException e) {
210                 this.getFilterChain().fireExceptionCaught(e);
211             }
212 
213         }
214     }
215 
216     private final Object readReadyMonitor = new Object();
217 
218     private class ReadWorker extends Thread {
219         @Override
220         public void run() {
221             while (isConnected() && !isClosing()) {
222                 synchronized (readReadyMonitor) {
223                     try {
224                         readReadyMonitor.wait();
225                     } catch (InterruptedException e) {
226                         log.error("InterruptedException", e);
227                     }
228                     if (isClosing() || !isConnected()) {
229                         break;
230                     }
231                     int dataSize;
232                     try {
233                         dataSize = inputStream.available();
234                         byte[] data = new byte[dataSize];
235                         int readBytes = inputStream.read(data);
236 
237                         if (readBytes > 0) {
238                             IoBuffer buf = IoBuffer
239                                     .wrap(data, 0, readBytes);
240                             buf.put(data, 0, readBytes);
241                             buf.flip();
242                             getFilterChain().fireMessageReceived(
243                                     buf);
244                         }
245                     } catch (IOException e) {
246                         getFilterChain().fireExceptionCaught(
247                                 e);
248                     }
249                 }
250             }
251         }
252     }
253 
254     public void serialEvent(SerialPortEvent evt) {
255         if (evt.getEventType() == SerialPortEvent.DATA_AVAILABLE) {
256             synchronized (readReadyMonitor) {
257                 readReadyMonitor.notifyAll();
258             }
259         }
260     }
261 
262     @Override
263     public IoProcessor<SerialSessionImpl> getProcessor() {
264         return processor;
265     }
266 
267     private class SerialIoProcessor implements IoProcessor<SerialSessionImpl> {
268         public void add(SerialSessionImpl session) {
269             // It's already added when the session is constructed.
270         }
271 
272         public void flush(SerialSessionImpl session) {
273             if (writeWorker == null) {
274                 writeWorker = new WriteWorker();
275                 writeWorker.start();
276             } else {
277                 synchronized (writeMonitor) {
278                     writeMonitor.notifyAll();
279                 }
280             }
281         }
282 
283         public void remove(SerialSessionImpl session) {
284             try {
285                 inputStream.close();
286             } catch (IOException e) {
287                 ExceptionMonitor.getInstance().exceptionCaught(e);
288             }
289             try {
290                 outputStream.close();
291             } catch (IOException e) {
292                 ExceptionMonitor.getInstance().exceptionCaught(e);
293             }
294 
295             port.close();
296             flush(session);
297             synchronized (readReadyMonitor) {
298                 readReadyMonitor.notifyAll();
299             }
300 
301             serviceListeners.fireSessionDestroyed(SerialSessionImpl.this);
302         }
303 
304         public void updateTrafficControl(SerialSessionImpl session) {
305             throw new UnsupportedOperationException();
306         }
307 
308         public void dispose() {
309             // Nothing to dispose
310         }
311 
312         public boolean isDisposed() {
313             return false;
314         }
315 
316         public boolean isDisposing() {
317             return false;
318         }
319     }
320 }