View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.codec;
21  
22  import java.util.Queue;
23  import java.util.concurrent.ConcurrentLinkedQueue;
24  
25  import org.apache.mina.core.buffer.IoBuffer;
26  
27  /**
28   * A {@link ProtocolEncoderOutput} based on queue.
29   *
30   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
31   */
32  public abstract class AbstractProtocolEncoderOutput implements ProtocolEncoderOutput {
33      /** The queue where the decoded messages are stored */
34      private final Queue<Object> messageQueue = new ConcurrentLinkedQueue<>();
35  
36      private boolean buffersOnly = true;
37  
38      /**
39       * Creates an instance of AbstractProtocolEncoderOutput
40       */
41      public AbstractProtocolEncoderOutput() {
42          // Do nothing
43      }
44  
45      /**
46       * @return The message queue
47       */
48      public Queue<Object> getMessageQueue() {
49          return messageQueue;
50      }
51  
52      /**
53       * {@inheritDoc}
54       */
55      @Override
56      public void write(Object encodedMessage) {
57          if (encodedMessage instanceof IoBuffer) {
58              IoBuffer buf = (IoBuffer) encodedMessage;
59              if (buf.hasRemaining()) {
60                  messageQueue.offer(buf);
61              } else {
62                  throw new IllegalArgumentException("buf is empty. Forgot to call flip()?");
63              }
64          } else {
65              messageQueue.offer(encodedMessage);
66              buffersOnly = false;
67          }
68      }
69  
70      /**
71       * {@inheritDoc}
72       */
73      @Override
74      public void mergeAll() {
75          if (!buffersOnly) {
76              throw new IllegalStateException("the encoded message list contains a non-buffer.");
77          }
78  
79          final int size = messageQueue.size();
80  
81          if (size < 2) {
82              // no need to merge!
83              return;
84          }
85  
86          // Get the size of merged BB
87          int sum = 0;
88          for (Object b : messageQueue) {
89              sum += ((IoBuffer) b).remaining();
90          }
91  
92          // Allocate a new BB that will contain all fragments
93          IoBuffer newBuf = IoBuffer.allocate(sum);
94  
95          // and merge all.
96          for (;;) {
97              IoBuffer buf = (IoBuffer) messageQueue.poll();
98              if (buf == null) {
99                  break;
100             }
101 
102             newBuf.put(buf);
103         }
104 
105         // Push the new buffer finally.
106         newBuf.flip();
107         messageQueue.add(newBuf);
108     }
109 }