001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.tcp;
018
019import java.io.IOException;
020import java.net.InetAddress;
021import java.net.InetSocketAddress;
022import java.net.ServerSocket;
023import java.net.Socket;
024import java.net.SocketException;
025import java.net.SocketTimeoutException;
026import java.net.URI;
027import java.net.URISyntaxException;
028import java.net.UnknownHostException;
029import java.nio.channels.ClosedChannelException;
030import java.nio.channels.SelectionKey;
031import java.nio.channels.Selector;
032import java.nio.channels.ServerSocketChannel;
033import java.nio.channels.SocketChannel;
034import java.util.HashMap;
035import java.util.Iterator;
036import java.util.Set;
037import java.util.concurrent.BlockingQueue;
038import java.util.concurrent.LinkedBlockingQueue;
039import java.util.concurrent.TimeUnit;
040import java.util.concurrent.atomic.AtomicInteger;
041
042import javax.net.ServerSocketFactory;
043import javax.net.ssl.SSLParameters;
044import javax.net.ssl.SSLServerSocket;
045
046import org.apache.activemq.Service;
047import org.apache.activemq.ThreadPriorities;
048import org.apache.activemq.TransportLoggerSupport;
049import org.apache.activemq.command.BrokerInfo;
050import org.apache.activemq.openwire.OpenWireFormatFactory;
051import org.apache.activemq.transport.Transport;
052import org.apache.activemq.transport.TransportFactory;
053import org.apache.activemq.transport.TransportServer;
054import org.apache.activemq.transport.TransportServerThreadSupport;
055import org.apache.activemq.util.IOExceptionSupport;
056import org.apache.activemq.util.InetAddressUtil;
057import org.apache.activemq.util.IntrospectionSupport;
058import org.apache.activemq.util.ServiceListener;
059import org.apache.activemq.util.ServiceStopper;
060import org.apache.activemq.util.ServiceSupport;
061import org.apache.activemq.wireformat.WireFormat;
062import org.apache.activemq.wireformat.WireFormatFactory;
063import org.slf4j.Logger;
064import org.slf4j.LoggerFactory;
065
066/**
067 * A TCP based implementation of {@link TransportServer}
068 */
069public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener {
070
071    private static final Logger LOG = LoggerFactory.getLogger(TcpTransportServer.class);
072
073    protected volatile ServerSocket serverSocket;
074    protected volatile Selector selector;
075    protected int backlog = 5000;
076    protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
077    protected final TcpTransportFactory transportFactory;
078    protected long maxInactivityDuration = 30000;
079    protected long maxInactivityDurationInitalDelay = 10000;
080    protected int minmumWireFormatVersion;
081    protected boolean useQueueForAccept = true;
082    protected boolean allowLinkStealing;
083    protected boolean verifyHostName = false;
084
085    /**
086     * trace=true -> the Transport stack where this TcpTransport object will be, will have a TransportLogger layer
087     * trace=false -> the Transport stack where this TcpTransport object will be, will NOT have a TransportLogger layer,
088     * and therefore will never be able to print logging messages. This parameter is most probably set in Connection or
089     * TransportConnector URIs.
090     */
091    protected boolean trace = false;
092
093    protected int soTimeout = 0;
094    protected int socketBufferSize = 64 * 1024;
095    protected int connectionTimeout = 30000;
096
097    /**
098     * Name of the LogWriter implementation to use. Names are mapped to classes in the
099     * resources/META-INF/services/org/apache/activemq/transport/logwriters directory. This parameter is most probably
100     * set in Connection or TransportConnector URIs.
101     */
102    protected String logWriterName = TransportLoggerSupport.defaultLogWriterName;
103
    /**
     * Specifies if the TransportLogger will be manageable by JMX or not. Also, as long as there is at least 1
     * TransportLogger which is manageable, a TransportLoggerControl MBean will be created.
     */
108    protected boolean dynamicManagement = false;
109
110    /**
111     * startLogging=true -> the TransportLogger object of the Transport stack will initially write messages to the log.
112     * startLogging=false -> the TransportLogger object of the Transport stack will initially NOT write messages to the
113     * log. This parameter only has an effect if trace == true. This parameter is most probably set in Connection or
114     * TransportConnector URIs.
115     */
116    protected boolean startLogging = true;
117    protected final ServerSocketFactory serverSocketFactory;
118    protected final BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
119    protected Thread socketHandlerThread;
120
121    /**
122     * The maximum number of sockets allowed for this server
123     */
124    protected int maximumConnections = Integer.MAX_VALUE;
125    protected final AtomicInteger currentTransportCount = new AtomicInteger();
126
127    public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException,
128        URISyntaxException {
129        super(location);
130        this.transportFactory = transportFactory;
131        this.serverSocketFactory = serverSocketFactory;
132    }
133
134    public void bind() throws IOException {
135        URI bind = getBindLocation();
136
137        String host = bind.getHost();
138        host = (host == null || host.length() == 0) ? "localhost" : host;
139        InetAddress addr = InetAddress.getByName(host);
140
141        try {
142            serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr);
143            configureServerSocket(serverSocket);
144        } catch (IOException e) {
145            throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
146        }
147        try {
148            setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(),
149                bind.getQuery(), bind.getFragment()));
150        } catch (URISyntaxException e) {
151            // it could be that the host name contains invalid characters such
152            // as _ on unix platforms so lets try use the IP address instead
153            try {
154                setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(),
155                    bind.getQuery(), bind.getFragment()));
156            } catch (URISyntaxException e2) {
157                throw IOExceptionSupport.create(e2);
158            }
159        }
160    }
161
162    private void configureServerSocket(ServerSocket socket) throws SocketException {
163        socket.setSoTimeout(2000);
164        if (transportOptions != null) {
165
166            // If the enabledCipherSuites option is invalid we don't want to ignore it as the call
167            // to SSLServerSocket to configure it has a side effect on the socket rendering it
168            // useless as all suites are enabled many of which are considered as insecure.  We
169            // instead trap that option here and throw an exception.  We should really consider
170            // all invalid options as breaking and not start the transport but the current design
171            // doesn't really allow for this.
172            //
173            //  see: https://issues.apache.org/jira/browse/AMQ-4582
174            //
175            if (socket instanceof SSLServerSocket) {
176                if (transportOptions.containsKey("verifyHostName")) {
177                    verifyHostName = Boolean.parseBoolean(transportOptions.get("verifyHostName").toString());
178                } else {
179                    transportOptions.put("verifyHostName", verifyHostName);
180                }
181
182                if (verifyHostName) {
183                    SSLParameters sslParams = new SSLParameters();
184                    sslParams.setEndpointIdentificationAlgorithm("HTTPS");
185                    ((SSLServerSocket)this.serverSocket).setSSLParameters(sslParams);
186                }
187
188
189                if (transportOptions.containsKey("enabledCipherSuites")) {
190                    Object cipherSuites = transportOptions.remove("enabledCipherSuites");
191
192                    if (!IntrospectionSupport.setProperty(socket, "enabledCipherSuites", cipherSuites)) {
193                        throw new SocketException(String.format(
194                            "Invalid transport options {enabledCipherSuites=%s}", cipherSuites));
195                    }
196                }
197            }
198
199            IntrospectionSupport.setProperties(socket, transportOptions);
200        }
201    }
202
203    /**
204     * @return Returns the wireFormatFactory.
205     */
206    public WireFormatFactory getWireFormatFactory() {
207        return wireFormatFactory;
208    }
209
210    /**
211     * @param wireFormatFactory
212     *            The wireFormatFactory to set.
213     */
214    public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
215        this.wireFormatFactory = wireFormatFactory;
216    }
217
218    /**
219     * Associates a broker info with the transport server so that the transport can do discovery advertisements of the
220     * broker.
221     *
222     * @param brokerInfo
223     */
224    @Override
225    public void setBrokerInfo(BrokerInfo brokerInfo) {
226    }
227
228    public long getMaxInactivityDuration() {
229        return maxInactivityDuration;
230    }
231
232    public void setMaxInactivityDuration(long maxInactivityDuration) {
233        this.maxInactivityDuration = maxInactivityDuration;
234    }
235
236    public long getMaxInactivityDurationInitalDelay() {
237        return this.maxInactivityDurationInitalDelay;
238    }
239
240    public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) {
241        this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay;
242    }
243
244    public int getMinmumWireFormatVersion() {
245        return minmumWireFormatVersion;
246    }
247
248    public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
249        this.minmumWireFormatVersion = minmumWireFormatVersion;
250    }
251
252    public boolean isTrace() {
253        return trace;
254    }
255
256    public void setTrace(boolean trace) {
257        this.trace = trace;
258    }
259
260    public String getLogWriterName() {
261        return logWriterName;
262    }
263
264    public void setLogWriterName(String logFormat) {
265        this.logWriterName = logFormat;
266    }
267
268    public boolean isDynamicManagement() {
269        return dynamicManagement;
270    }
271
272    public void setDynamicManagement(boolean useJmx) {
273        this.dynamicManagement = useJmx;
274    }
275
276    public boolean isStartLogging() {
277        return startLogging;
278    }
279
280    public void setStartLogging(boolean startLogging) {
281        this.startLogging = startLogging;
282    }
283
284    /**
285     * @return the backlog
286     */
287    public int getBacklog() {
288        return backlog;
289    }
290
291    /**
292     * @param backlog
293     *            the backlog to set
294     */
295    public void setBacklog(int backlog) {
296        this.backlog = backlog;
297    }
298
299    /**
300     * @return the useQueueForAccept
301     */
302    public boolean isUseQueueForAccept() {
303        return useQueueForAccept;
304    }
305
306    /**
307     * @param useQueueForAccept
308     *            the useQueueForAccept to set
309     */
310    public void setUseQueueForAccept(boolean useQueueForAccept) {
311        this.useQueueForAccept = useQueueForAccept;
312    }
313
314    /**
315     * pull Sockets from the ServerSocket
316     */
317    @Override
318    public void run() {
319        if (!isStopped() && !isStopping()) {
320            final ServerSocket serverSocket = this.serverSocket;
321            if (serverSocket == null) {
322                onAcceptError(new IOException("Server started without a valid ServerSocket"));
323            }
324
325            final ServerSocketChannel channel = serverSocket.getChannel();
326            if (channel != null) {
327                doRunWithServerSocketChannel(channel);
328            } else {
329                doRunWithServerSocket(serverSocket);
330            }
331        }
332    }
333
334    private void doRunWithServerSocketChannel(final ServerSocketChannel channel) {
335        try {
336            channel.configureBlocking(false);
337            final Selector selector = Selector.open();
338
339            try {
340                channel.register(selector, SelectionKey.OP_ACCEPT);
341            } catch (ClosedChannelException ex) {
342                try {
343                    selector.close();
344                } catch (IOException ignore) {}
345
346                throw ex;
347            }
348
349            // Update object instance for later cleanup.
350            this.selector = selector;
351
352            while (!isStopped()) {
353                int count = selector.select(10);
354
355                if (count == 0) {
356                    continue;
357                }
358
359                Set<SelectionKey> keys = selector.selectedKeys();
360
361                for (Iterator<SelectionKey> i = keys.iterator(); i.hasNext(); ) {
362                    final SelectionKey key = i.next();
363                    if (key.isAcceptable()) {
364                        try {
365                            SocketChannel sc = channel.accept();
366                            if (sc != null) {
367                                if (isStopped() || getAcceptListener() == null) {
368                                    sc.close();
369                                } else {
370                                    if (useQueueForAccept) {
371                                        socketQueue.put(sc.socket());
372                                    } else {
373                                        handleSocket(sc.socket());
374                                    }
375                                }
376                            }
377
378                        } catch (SocketTimeoutException ste) {
379                            // expect this to happen
380                        } catch (Exception e) {
381                            e.printStackTrace();
382                            if (!isStopping()) {
383                                onAcceptError(e);
384                            } else if (!isStopped()) {
385                                LOG.warn("run()", e);
386                                onAcceptError(e);
387                            }
388                        }
389                    }
390                    i.remove();
391                }
392            }
393        } catch (IOException ex) {
394            if (!isStopping()) {
395                onAcceptError(ex);
396            } else if (!isStopped()) {
397                LOG.warn("run()", ex);
398                onAcceptError(ex);
399            }
400        }
401    }
402
403    private void doRunWithServerSocket(final ServerSocket serverSocket) {
404        while (!isStopped()) {
405            Socket socket = null;
406            try {
407                socket = serverSocket.accept();
408                if (socket != null) {
409                    if (isStopped() || getAcceptListener() == null) {
410                        socket.close();
411                    } else {
412                        if (useQueueForAccept) {
413                            socketQueue.put(socket);
414                        } else {
415                            handleSocket(socket);
416                        }
417                    }
418                }
419            } catch (SocketTimeoutException ste) {
420                // expect this to happen
421            } catch (Exception e) {
422                if (!isStopping()) {
423                    onAcceptError(e);
424                } else if (!isStopped()) {
425                    LOG.warn("run()", e);
426                    onAcceptError(e);
427                }
428            }
429        }
430    }
431
432    /**
433     * Allow derived classes to override the Transport implementation that this transport server creates.
434     *
435     * @param socket
436     * @param format
437     *
438     * @return a new Transport instance.
439     *
440     * @throws IOException
441     */
442    protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
443        return new TcpTransport(format, socket);
444    }
445
446    /**
447     * @return pretty print of this
448     */
449    @Override
450    public String toString() {
451        return "" + getBindLocation();
452    }
453
454    /**
455     * @param socket
456     * @param bindAddress
457     * @return real hostName
458     * @throws UnknownHostException
459     */
460    protected String resolveHostName(ServerSocket socket, InetAddress bindAddress) throws UnknownHostException {
461        String result = null;
462        if (socket.isBound()) {
463            if (socket.getInetAddress().isAnyLocalAddress()) {
464                // make it more human readable and useful, an alternative to 0.0.0.0
465                result = InetAddressUtil.getLocalHostName();
466            } else {
467                result = socket.getInetAddress().getCanonicalHostName();
468            }
469        } else {
470            result = bindAddress.getCanonicalHostName();
471        }
472        return result;
473    }
474
475    @Override
476    protected void doStart() throws Exception {
477        if (useQueueForAccept) {
478            Runnable run = new Runnable() {
479                @Override
480                public void run() {
481                    try {
482                        while (!isStopped() && !isStopping()) {
483                            Socket sock = socketQueue.poll(1, TimeUnit.SECONDS);
484                            if (sock != null) {
485                                try {
486                                    handleSocket(sock);
487                                } catch (Throwable thrown) {
488                                    if (!isStopping()) {
489                                        onAcceptError(new Exception(thrown));
490                                    } else if (!isStopped()) {
491                                        LOG.warn("Unexpected error thrown during accept handling: ", thrown);
492                                        onAcceptError(new Exception(thrown));
493                                    }
494                                }
495                            }
496                        }
497
498                    } catch (InterruptedException e) {
499                        if (!isStopped() || !isStopping()) {
500                            LOG.info("socketQueue interrupted - stopping");
501                            onAcceptError(e);
502                        }
503                    }
504                }
505            };
506            socketHandlerThread = new Thread(null, run, "ActiveMQ Transport Server Thread Handler: " + toString(), getStackSize());
507            socketHandlerThread.setDaemon(true);
508            socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT - 1);
509            socketHandlerThread.start();
510        }
511        super.doStart();
512    }
513
514    @Override
515    protected void doStop(ServiceStopper stopper) throws Exception {
516        Exception firstFailure = null;
517
518        try {
519            if (selector != null) {
520                selector.close();
521                selector = null;
522            }
523        } catch (Exception error) {
524        }
525
526        try {
527            final ServerSocket serverSocket = this.serverSocket;
528            if (serverSocket != null) {
529                this.serverSocket = null;
530                serverSocket.close();
531            }
532        } catch (Exception error) {
533            firstFailure = error;
534        }
535
536        if (socketHandlerThread != null) {
537            socketHandlerThread.interrupt();
538            socketHandlerThread = null;
539        }
540
541        try {
542            super.doStop(stopper);
543        } catch (Exception error) {
544            if (firstFailure != null) {
545                firstFailure = error;
546            }
547        }
548
549        if (firstFailure != null) {
550            throw firstFailure;
551        }
552    }
553
554    @Override
555    public InetSocketAddress getSocketAddress() {
556        return (InetSocketAddress) serverSocket.getLocalSocketAddress();
557    }
558
559    protected void handleSocket(Socket socket) {
560        doHandleSocket(socket);
561    }
562
563    final protected void doHandleSocket(Socket socket) {
564        boolean closeSocket = true;
565        boolean countIncremented = false;
566        try {
567            int currentCount;
568            do {
569                currentCount = currentTransportCount.get();
570                if (currentCount >= this.maximumConnections) {
571                     throw new ExceededMaximumConnectionsException(
572                         "Exceeded the maximum number of allowed client connections. See the '" +
573                         "maximumConnections' property on the TCP transport configuration URI " +
574                         "in the ActiveMQ configuration file (e.g., activemq.xml)");
575                 }
576
577            //Increment this value before configuring the transport
578            //This is necessary because some of the transport servers must read from the
579            //socket during configureTransport() so we want to make sure this value is
580            //accurate as the transport server could pause here waiting for data to be sent from a client
581            } while(!currentTransportCount.compareAndSet(currentCount, currentCount + 1));
582            countIncremented = true;
583
584            HashMap<String, Object> options = new HashMap<String, Object>();
585            options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration));
586            options.put("maxInactivityDurationInitalDelay", Long.valueOf(maxInactivityDurationInitalDelay));
587            options.put("minmumWireFormatVersion", Integer.valueOf(minmumWireFormatVersion));
588            options.put("trace", Boolean.valueOf(trace));
589            options.put("soTimeout", Integer.valueOf(soTimeout));
590            options.put("socketBufferSize", Integer.valueOf(socketBufferSize));
591            options.put("connectionTimeout", Integer.valueOf(connectionTimeout));
592            options.put("logWriterName", logWriterName);
593            options.put("dynamicManagement", Boolean.valueOf(dynamicManagement));
594            options.put("startLogging", Boolean.valueOf(startLogging));
595            options.putAll(transportOptions);
596
597            TransportInfo transportInfo = configureTransport(this, socket);
598            closeSocket = false;
599
600            if (transportInfo.transport instanceof ServiceSupport) {
601                ((ServiceSupport) transportInfo.transport).addServiceListener(this);
602            }
603
604            Transport configuredTransport = transportInfo.transportFactory.serverConfigure(
605                    transportInfo.transport, transportInfo.format, options);
606
607            getAcceptListener().onAccept(configuredTransport);
608
609        } catch (SocketTimeoutException ste) {
610            // expect this to happen
611        } catch (Exception e) {
612            if (closeSocket) {
613                try {
614                    //if closing the socket, only decrement the count it was actually incremented
615                    //where it was incremented
616                    if (countIncremented) {
617                        currentTransportCount.decrementAndGet();
618                    }
619                    socket.close();
620                } catch (Exception ignore) {
621                }
622            }
623
624            if (!isStopping()) {
625                onAcceptError(e);
626            } else if (!isStopped()) {
627                LOG.warn("run()", e);
628                onAcceptError(e);
629            }
630        }
631    }
632
633    protected TransportInfo configureTransport(final TcpTransportServer server, final Socket socket) throws Exception {
634        WireFormat format = wireFormatFactory.createWireFormat();
635        Transport transport = createTransport(socket, format);
636        return new TransportInfo(format, transport, transportFactory);
637    }
638
639    protected class TransportInfo {
640        final WireFormat format;
641        final Transport transport;
642        final TransportFactory transportFactory;
643
644        public TransportInfo(WireFormat format, Transport transport, TransportFactory transportFactory) {
645            this.format = format;
646            this.transport = transport;
647            this.transportFactory = transportFactory;
648        }
649    }
650
651    public int getSoTimeout() {
652        return soTimeout;
653    }
654
655    public void setSoTimeout(int soTimeout) {
656        this.soTimeout = soTimeout;
657    }
658
659    public int getSocketBufferSize() {
660        return socketBufferSize;
661    }
662
663    public void setSocketBufferSize(int socketBufferSize) {
664        this.socketBufferSize = socketBufferSize;
665    }
666
667    public int getConnectionTimeout() {
668        return connectionTimeout;
669    }
670
671    public void setConnectionTimeout(int connectionTimeout) {
672        this.connectionTimeout = connectionTimeout;
673    }
674
675    /**
676     * @return the maximumConnections
677     */
678    public int getMaximumConnections() {
679        return maximumConnections;
680    }
681
682    /**
683     * @param maximumConnections
684     *            the maximumConnections to set
685     */
686    public void setMaximumConnections(int maximumConnections) {
687        this.maximumConnections = maximumConnections;
688    }
689
690    public AtomicInteger getCurrentTransportCount() {
691        return currentTransportCount;
692    }
693
694    @Override
695    public void started(Service service) {
696    }
697
698    @Override
699    public void stopped(Service service) {
700        this.currentTransportCount.decrementAndGet();
701    }
702
703    @Override
704    public boolean isSslServer() {
705        return false;
706    }
707
708    @Override
709    public boolean isAllowLinkStealing() {
710        return allowLinkStealing;
711    }
712
713    @Override
714    public void setAllowLinkStealing(boolean allowLinkStealing) {
715        this.allowLinkStealing = allowLinkStealing;
716    }
717}