/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport.tcp;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.net.ServerSocketFactory;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.SSLServerSocket;

import org.apache.activemq.Service;
import org.apache.activemq.ThreadPriorities;
import org.apache.activemq.TransportLoggerSupport;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.openwire.OpenWireFormatFactory;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.TransportServerThreadSupport;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.InetAddressUtil;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.ServiceListener;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.wireformat.WireFormatFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A TCP based implementation of {@link TransportServer}.
 * <p>
 * The server accepts sockets either from a plain {@link ServerSocket} or, when the socket is backed by a
 * {@link ServerSocketChannel}, through a {@link Selector}. Accepted sockets are handed to
 * {@link #handleSocket(Socket)} either directly or through an intermediate queue drained by a dedicated
 * handler thread (see {@link #setUseQueueForAccept(boolean)}).
 */
public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener {

    private static final Logger LOG = LoggerFactory.getLogger(TcpTransportServer.class);

    /** The listening socket; assigned by {@link #bind()} and cleared by {@link #doStop(ServiceStopper)}. */
    protected volatile ServerSocket serverSocket;
    /** Selector used by the channel based accept loop; closed and cleared by {@link #doStop(ServiceStopper)}. */
    protected volatile Selector selector;
    /** Backlog value passed to the {@link ServerSocketFactory} when the server socket is created. */
    protected int backlog = 5000;
    protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
    protected final TcpTransportFactory transportFactory;
    protected long maxInactivityDuration = 30000;
    protected long maxInactivityDurationInitalDelay = 10000;
    protected int minmumWireFormatVersion;
    /** When true, accepted sockets are queued and processed by a separate handler thread. */
    protected boolean useQueueForAccept = true;
    protected boolean allowLinkStealing;
    protected boolean verifyHostName = false;

    /**
     * trace=true -> the Transport stack where this TcpTransport object will be, will have a TransportLogger layer
     * trace=false -> the Transport stack where this TcpTransport object will be, will NOT have a TransportLogger layer,
     * and therefore will never be able to print logging messages. This parameter is most probably set in Connection or
     * TransportConnector URIs.
     */
    protected boolean trace = false;

    protected int soTimeout = 0;
    protected int socketBufferSize = 64 * 1024;
    protected int connectionTimeout = 30000;

    /**
     * Name of the LogWriter implementation to use. Names are mapped to classes in the
     * resources/META-INF/services/org/apache/activemq/transport/logwriters directory. This parameter is most probably
     * set in Connection or TransportConnector URIs.
     */
    protected String logWriterName = TransportLoggerSupport.defaultLogWriterName;

    /**
     * Specifies if the TransportLogger will be manageable by JMX or not. Also, as long as there is at least 1
     * TransportLogger which is manageable, a TransportLoggerControl MBean will be created.
     */
    protected boolean dynamicManagement = false;

    /**
     * startLogging=true -> the TransportLogger object of the Transport stack will initially write messages to the log.
     * startLogging=false -> the TransportLogger object of the Transport stack will initially NOT write messages to the
     * log. This parameter only has an effect if trace == true. This parameter is most probably set in Connection or
     * TransportConnector URIs.
     */
    protected boolean startLogging = true;
    protected final ServerSocketFactory serverSocketFactory;
    /** Hand-off queue between the accept loop and the socket handler thread. */
    protected final BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
    protected Thread socketHandlerThread;

    /**
     * The maximum number of sockets allowed for this server
     */
    protected int maximumConnections = Integer.MAX_VALUE;
    /** Number of transports currently counted against {@link #maximumConnections}. */
    protected final AtomicInteger currentTransportCount = new AtomicInteger();

    /**
     * Creates a server for the given bind location. The server socket itself is not created until
     * {@link #bind()} is called.
     *
     * @param transportFactory
     *        factory later used to configure accepted transports
     * @param location
     *        the URI this server binds to
     * @param serverSocketFactory
     *        factory used by {@link #bind()} to create the server socket
     */
    public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException,
        URISyntaxException {
        super(location);
        this.transportFactory = transportFactory;
        this.serverSocketFactory = serverSocketFactory;
    }

    /**
     * Creates and configures the server socket for the bind location and publishes the resulting connect URI.
     * An empty or missing host in the bind location is treated as "localhost".
     *
     * @throws IOException
     *         if the socket cannot be created or configured, or no valid connect URI can be built
     */
    public void bind() throws IOException {
        URI bind = getBindLocation();

        String host = bind.getHost();
        host = (host == null || host.length() == 0) ? "localhost" : host;
        InetAddress addr = InetAddress.getByName(host);

        try {
            serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr);
            configureServerSocket(serverSocket);
        } catch (IOException e) {
            throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
        }
        try {
            setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(),
                bind.getQuery(), bind.getFragment()));
        } catch (URISyntaxException e) {
            // it could be that the host name contains invalid characters such
            // as _ on unix platforms so lets try use the IP address instead
            try {
                setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(),
                    bind.getQuery(), bind.getFragment()));
            } catch (URISyntaxException e2) {
                throw IOExceptionSupport.create(e2);
            }
        }
    }

    /**
     * Applies the accept timeout and any configured transport options to the given server socket.
     *
     * @param socket
     *        the freshly created server socket to configure
     * @throws SocketException
     *         if the timeout cannot be set or the enabledCipherSuites option is invalid
     */
    private void configureServerSocket(ServerSocket socket) throws SocketException {
        // A finite accept timeout lets the blocking accept loop re-check the stopped flag periodically.
        socket.setSoTimeout(2000);
        if (transportOptions != null) {

            if (socket instanceof SSLServerSocket) {
                if (transportOptions.containsKey("verifyHostName")) {
                    verifyHostName = Boolean.parseBoolean(transportOptions.get("verifyHostName").toString());
                } else {
                    transportOptions.put("verifyHostName", verifyHostName);
                }

                if (verifyHostName) {
                    SSLParameters sslParams = new SSLParameters();
                    sslParams.setEndpointIdentificationAlgorithm("HTTPS");
                    // Configure the socket that was handed in rather than reaching for the field.
                    ((SSLServerSocket) socket).setSSLParameters(sslParams);
                }

                // If the enabledCipherSuites option is invalid we don't want to ignore it as the call
                // to SSLServerSocket to configure it has a side effect on the socket rendering it
                // useless as all suites are enabled many of which are considered as insecure.  We
                // instead trap that option here and throw an exception.  We should really consider
                // all invalid options as breaking and not start the transport but the current design
                // doesn't really allow for this.
                //
                //  see: https://issues.apache.org/jira/browse/AMQ-4582
                //
                if (transportOptions.containsKey("enabledCipherSuites")) {
                    Object cipherSuites = transportOptions.remove("enabledCipherSuites");

                    if (!IntrospectionSupport.setProperty(socket, "enabledCipherSuites", cipherSuites)) {
                        throw new SocketException(String.format(
                            "Invalid transport options {enabledCipherSuites=%s}", cipherSuites));
                    }
                }
            }

            IntrospectionSupport.setProperties(socket, transportOptions);
        }
    }

    /**
     * @return Returns the wireFormatFactory.
     */
    public WireFormatFactory getWireFormatFactory() {
        return wireFormatFactory;
    }

    /**
     * @param wireFormatFactory
     *        The wireFormatFactory to set.
     */
    public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
        this.wireFormatFactory = wireFormatFactory;
    }

    /**
     * Associates a broker info with the transport server so that the transport can do discovery advertisements of the
     * broker. This implementation ignores the value.
     *
     * @param brokerInfo
     *        the broker info (unused here)
     */
    @Override
    public void setBrokerInfo(BrokerInfo brokerInfo) {
    }

    public long getMaxInactivityDuration() {
        return maxInactivityDuration;
    }

    public void setMaxInactivityDuration(long maxInactivityDuration) {
        this.maxInactivityDuration = maxInactivityDuration;
    }

    public long getMaxInactivityDurationInitalDelay() {
        return this.maxInactivityDurationInitalDelay;
    }

    public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) {
        this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay;
    }

    public int getMinmumWireFormatVersion() {
        return minmumWireFormatVersion;
    }

    public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
        this.minmumWireFormatVersion = minmumWireFormatVersion;
    }

    public boolean isTrace() {
        return trace;
    }

    public void setTrace(boolean trace) {
        this.trace = trace;
    }

    public String getLogWriterName() {
        return logWriterName;
    }

    public void setLogWriterName(String logFormat) {
        this.logWriterName = logFormat;
    }

    public boolean isDynamicManagement() {
        return dynamicManagement;
    }

    public void setDynamicManagement(boolean useJmx) {
        this.dynamicManagement = useJmx;
    }

    public boolean isStartLogging() {
        return startLogging;
    }

    public void setStartLogging(boolean startLogging) {
        this.startLogging = startLogging;
    }

    /**
     * @return the backlog
     */
    public int getBacklog() {
        return backlog;
    }

    /**
     * @param backlog
     *        the backlog to set
     */
    public void setBacklog(int backlog) {
        this.backlog = backlog;
    }

    /**
     * @return the useQueueForAccept
     */
    public boolean isUseQueueForAccept() {
        return useQueueForAccept;
    }

    /**
     * @param useQueueForAccept
     *        the useQueueForAccept to set
     */
    public void setUseQueueForAccept(boolean useQueueForAccept) {
        this.useQueueForAccept = useQueueForAccept;
    }

    /**
     * pull Sockets from the ServerSocket
     * <p>
     * Uses the selector based loop when the server socket has an associated channel, otherwise the blocking
     * accept loop. If no server socket is available the error is reported and the method returns.
     */
    @Override
    public void run() {
        if (!isStopped() && !isStopping()) {
            final ServerSocket serverSocket = this.serverSocket;
            if (serverSocket == null) {
                onAcceptError(new IOException("Server started without a valid ServerSocket"));
                // Nothing to accept from; continuing would dereference null below.
                return;
            }

            final ServerSocketChannel channel = serverSocket.getChannel();
            if (channel != null) {
                doRunWithServerSocketChannel(channel);
            } else {
                doRunWithServerSocket(serverSocket);
            }
        }
    }

    /**
     * Accept loop for channel backed server sockets: registers the channel with a new selector in non-blocking
     * mode and dispatches each accepted socket until the server is stopped.
     *
     * @param channel
     *        the channel of the bound server socket
     */
    private void doRunWithServerSocketChannel(final ServerSocketChannel channel) {
        try {
            channel.configureBlocking(false);
            final Selector selector = Selector.open();

            try {
                channel.register(selector, SelectionKey.OP_ACCEPT);
            } catch (ClosedChannelException ex) {
                // Registration failed, so the selector would never be published for cleanup; close it here.
                try {
                    selector.close();
                } catch (IOException ignore) {}

                throw ex;
            }

            // Update object instance for later cleanup.
            this.selector = selector;

            while (!isStopped()) {
                int count = selector.select(10);

                if (count == 0) {
                    continue;
                }

                Set<SelectionKey> keys = selector.selectedKeys();

                for (Iterator<SelectionKey> i = keys.iterator(); i.hasNext(); ) {
                    final SelectionKey key = i.next();
                    if (key.isAcceptable()) {
                        try {
                            SocketChannel sc = channel.accept();
                            if (sc != null) {
                                if (isStopped() || getAcceptListener() == null) {
                                    sc.close();
                                } else {
                                    if (useQueueForAccept) {
                                        socketQueue.put(sc.socket());
                                    } else {
                                        handleSocket(sc.socket());
                                    }
                                }
                            }

                        } catch (SocketTimeoutException ste) {
                            // expect this to happen
                        } catch (Exception e) {
                            reportAcceptFailure(e);
                        }
                    }
                    // Selected keys must be removed explicitly or they are reported again on the next select.
                    i.remove();
                }
            }
        } catch (IOException ex) {
            reportAcceptFailure(ex);
        }
    }

    /**
     * Accept loop for plain server sockets: blocks in {@link ServerSocket#accept()} and dispatches each accepted
     * socket until the server is stopped.
     *
     * @param serverSocket
     *        the bound server socket to accept from
     */
    private void doRunWithServerSocket(final ServerSocket serverSocket) {
        while (!isStopped()) {
            Socket socket = null;
            try {
                socket = serverSocket.accept();
                if (socket != null) {
                    if (isStopped() || getAcceptListener() == null) {
                        socket.close();
                    } else {
                        if (useQueueForAccept) {
                            socketQueue.put(socket);
                        } else {
                            handleSocket(socket);
                        }
                    }
                }
            } catch (SocketTimeoutException ste) {
                // expect this to happen
            } catch (Exception e) {
                reportAcceptFailure(e);
            }
        }
    }

    /**
     * Reports a failure raised while accepting or handling a socket. While the server is running the error goes
     * straight to {@link #onAcceptError(Exception)}; while it is stopping but not yet stopped the error is also
     * logged as a warning; once stopped it is dropped.
     *
     * @param error
     *        the failure to report
     */
    private void reportAcceptFailure(Exception error) {
        if (!isStopping()) {
            onAcceptError(error);
        } else if (!isStopped()) {
            LOG.warn("run()", error);
            onAcceptError(error);
        }
    }

    /**
     * Allow derived classes to override the Transport implementation that this transport server creates.
     *
     * @param socket
     *        the accepted client socket
     * @param format
     *        the wire format the transport should use
     *
     * @return a new Transport instance.
     *
     * @throws IOException
     */
    protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
        return new TcpTransport(format, socket);
    }

    /**
     * @return pretty print of this
     */
    @Override
    public String toString() {
        return "" + getBindLocation();
    }

    /**
     * Determines the host name to advertise in the connect URI.
     *
     * @param socket
     *        the server socket
     * @param bindAddress
     *        the address that was requested for binding; used when the socket is not bound
     * @return real hostName
     * @throws UnknownHostException
     */
    protected String resolveHostName(ServerSocket socket, InetAddress bindAddress) throws UnknownHostException {
        String result = null;
        if (socket.isBound()) {
            if (socket.getInetAddress().isAnyLocalAddress()) {
                // make it more human readable and useful, an alternative to 0.0.0.0
                result = InetAddressUtil.getLocalHostName();
            } else {
                result = socket.getInetAddress().getCanonicalHostName();
            }
        } else {
            result = bindAddress.getCanonicalHostName();
        }
        return result;
    }

    /**
     * Starts the socket handler thread (when {@link #useQueueForAccept} is enabled) and then delegates to the
     * superclass start logic.
     */
    @Override
    protected void doStart() throws Exception {
        if (useQueueForAccept) {
            Runnable run = new Runnable() {
                @Override
                public void run() {
                    try {
                        while (!isStopped() && !isStopping()) {
                            // Poll with a timeout so the stop flags are re-evaluated at least once a second.
                            Socket sock = socketQueue.poll(1, TimeUnit.SECONDS);
                            if (sock != null) {
                                try {
                                    handleSocket(sock);
                                } catch (Throwable thrown) {
                                    if (!isStopping()) {
                                        onAcceptError(new Exception(thrown));
                                    } else if (!isStopped()) {
                                        LOG.warn("Unexpected error thrown during accept handling: ", thrown);
                                        onAcceptError(new Exception(thrown));
                                    }
                                }
                            }
                        }

                    } catch (InterruptedException e) {
                        if (!isStopped() || !isStopping()) {
                            LOG.info("socketQueue interrupted - stopping");
                            onAcceptError(e);
                        }
                    }
                }
            };
            socketHandlerThread = new Thread(null, run, "ActiveMQ Transport Server Thread Handler: " + toString(), getStackSize());
            socketHandlerThread.setDaemon(true);
            socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT - 1);
            socketHandlerThread.start();
        }
        super.doStart();
    }

    /**
     * Releases the selector, the server socket and the handler thread, then delegates to the superclass stop
     * logic. Every step is attempted even if an earlier one fails; the first failure from closing the server
     * socket or from the superclass is rethrown at the end. Selector close failures are only logged.
     */
    @Override
    protected void doStop(ServiceStopper stopper) throws Exception {
        Exception firstFailure = null;

        final Selector selector = this.selector;
        if (selector != null) {
            this.selector = null;
            try {
                selector.close();
            } catch (Exception error) {
                // Best effort: a selector that fails to close must not prevent the rest of the shutdown.
                LOG.debug("Ignoring error while closing accept selector", error);
            }
        }

        try {
            final ServerSocket serverSocket = this.serverSocket;
            if (serverSocket != null) {
                this.serverSocket = null;
                serverSocket.close();
            }
        } catch (Exception error) {
            firstFailure = error;
        }

        if (socketHandlerThread != null) {
            socketHandlerThread.interrupt();
            socketHandlerThread = null;
        }

        try {
            super.doStop(stopper);
        } catch (Exception error) {
            // Keep the earliest failure; only record this one if nothing failed before.
            if (firstFailure == null) {
                firstFailure = error;
            }
        }

        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * @return the local socket address of the server socket. Throws a NullPointerException if the server socket
     *         has not been created by {@link #bind()} yet or was already released by
     *         {@link #doStop(ServiceStopper)}.
     */
    @Override
    public InetSocketAddress getSocketAddress() {
        return (InetSocketAddress) serverSocket.getLocalSocketAddress();
    }

    /**
     * Processes a newly accepted socket. Overridable hook; the default delegates to
     * {@link #doHandleSocket(Socket)}.
     *
     * @param socket
     *        the accepted client socket
     */
    protected void handleSocket(Socket socket) {
        doHandleSocket(socket);
    }

    /**
     * Enforces the connection limit, builds the transport for the socket and passes it to the accept listener.
     * If a failure occurs before the transport has been created, the socket is closed and the connection count
     * is restored.
     *
     * @param socket
     *        the accepted client socket
     */
    final protected void doHandleSocket(Socket socket) {
        boolean closeSocket = true;
        boolean countIncremented = false;
        try {
            int currentCount;
            do {
                currentCount = currentTransportCount.get();
                if (currentCount >= this.maximumConnections) {
                    throw new ExceededMaximumConnectionsException(
                        "Exceeded the maximum number of allowed client connections. See the '" +
                        "maximumConnections' property on the TCP transport configuration URI " +
                        "in the ActiveMQ configuration file (e.g., activemq.xml)");
                }

            //Increment this value before configuring the transport
            //This is necessary because some of the transport servers must read from the
            //socket during configureTransport() so we want to make sure this value is
            //accurate as the transport server could pause here waiting for data to be sent from a client
            } while(!currentTransportCount.compareAndSet(currentCount, currentCount + 1));
            countIncremented = true;

            HashMap<String, Object> options = new HashMap<String, Object>();
            options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration));
            options.put("maxInactivityDurationInitalDelay", Long.valueOf(maxInactivityDurationInitalDelay));
            options.put("minmumWireFormatVersion", Integer.valueOf(minmumWireFormatVersion));
            options.put("trace", Boolean.valueOf(trace));
            options.put("soTimeout", Integer.valueOf(soTimeout));
            options.put("socketBufferSize", Integer.valueOf(socketBufferSize));
            options.put("connectionTimeout", Integer.valueOf(connectionTimeout));
            options.put("logWriterName", logWriterName);
            options.put("dynamicManagement", Boolean.valueOf(dynamicManagement));
            options.put("startLogging", Boolean.valueOf(startLogging));
            // transportOptions is optional (configureServerSocket also tolerates null); explicitly
            // configured options override the defaults assembled above.
            if (transportOptions != null) {
                options.putAll(transportOptions);
            }

            TransportInfo transportInfo = configureTransport(this, socket);
            // From here on the socket is no longer closed by the error handling below.
            closeSocket = false;

            if (transportInfo.transport instanceof ServiceSupport) {
                // stopped(Service) decrements the connection count when the transport shuts down.
                ((ServiceSupport) transportInfo.transport).addServiceListener(this);
            }

            Transport configuredTransport = transportInfo.transportFactory.serverConfigure(
                    transportInfo.transport, transportInfo.format, options);

            getAcceptListener().onAccept(configuredTransport);

        } catch (SocketTimeoutException ste) {
            // expect this to happen
        } catch (Exception e) {
            if (closeSocket) {
                try {
                    //if closing the socket, only decrement the count it was actually incremented
                    //where it was incremented
                    if (countIncremented) {
                        currentTransportCount.decrementAndGet();
                    }
                    socket.close();
                } catch (Exception ignore) {
                }
            }

            reportAcceptFailure(e);
        }
    }

    /**
     * Creates the wire format and transport for an accepted socket.
     *
     * @param server
     *        the server that accepted the socket (not used by this implementation)
     * @param socket
     *        the accepted client socket
     * @return the wire format, transport and transport factory bundled together
     * @throws Exception
     *         if the transport cannot be created
     */
    protected TransportInfo configureTransport(final TcpTransportServer server, final Socket socket) throws Exception {
        WireFormat format = wireFormatFactory.createWireFormat();
        Transport transport = createTransport(socket, format);
        return new TransportInfo(format, transport, transportFactory);
    }

    /**
     * Holder for the objects produced by
     * {@link TcpTransportServer#configureTransport(TcpTransportServer, Socket)}.
     */
    protected class TransportInfo {
        final WireFormat format;
        final Transport transport;
        final TransportFactory transportFactory;

        public TransportInfo(WireFormat format, Transport transport, TransportFactory transportFactory) {
            this.format = format;
            this.transport = transport;
            this.transportFactory = transportFactory;
        }
    }

    public int getSoTimeout() {
        return soTimeout;
    }

    public void setSoTimeout(int soTimeout) {
        this.soTimeout = soTimeout;
    }

    public int getSocketBufferSize() {
        return socketBufferSize;
    }

    public void setSocketBufferSize(int socketBufferSize) {
        this.socketBufferSize = socketBufferSize;
    }

    public int getConnectionTimeout() {
        return connectionTimeout;
    }

    public void setConnectionTimeout(int connectionTimeout) {
        this.connectionTimeout = connectionTimeout;
    }

    /**
     * @return the maximumConnections
     */
    public int getMaximumConnections() {
        return maximumConnections;
    }

    /**
     * @param maximumConnections
     *        the maximumConnections to set
     */
    public void setMaximumConnections(int maximumConnections) {
        this.maximumConnections = maximumConnections;
    }

    /**
     * @return the live counter of transports counted against the connection limit
     */
    public AtomicInteger getCurrentTransportCount() {
        return currentTransportCount;
    }

    @Override
    public void started(Service service) {
    }

    /**
     * Invoked when a transport this server registered itself on has stopped; releases its connection slot.
     */
    @Override
    public void stopped(Service service) {
        this.currentTransportCount.decrementAndGet();
    }

    @Override
    public boolean isSslServer() {
        return false;
    }

    @Override
    public boolean isAllowLinkStealing() {
        return allowLinkStealing;
    }

    @Override
    public void setAllowLinkStealing(boolean allowLinkStealing) {
        this.allowLinkStealing = allowLinkStealing;
    }
}