001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import java.io.IOException; 020import java.util.ArrayList; 021import java.util.Iterator; 022import java.util.LinkedList; 023import java.util.List; 024import java.util.concurrent.CountDownLatch; 025import java.util.concurrent.TimeUnit; 026 027import javax.jms.JMSException; 028 029import org.apache.activemq.broker.Broker; 030import org.apache.activemq.broker.ConnectionContext; 031import org.apache.activemq.broker.region.cursors.PendingMessageCursor; 032import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; 033import org.apache.activemq.command.ConsumerControl; 034import org.apache.activemq.command.ConsumerInfo; 035import org.apache.activemq.command.Message; 036import org.apache.activemq.command.MessageAck; 037import org.apache.activemq.command.MessageDispatch; 038import org.apache.activemq.command.MessageDispatchNotification; 039import org.apache.activemq.command.MessageId; 040import org.apache.activemq.command.MessagePull; 041import org.apache.activemq.command.Response; 042import org.apache.activemq.thread.Scheduler; 043import org.apache.activemq.transaction.Synchronization; 044import org.apache.activemq.transport.TransmitCallback; 045import org.apache.activemq.usage.SystemUsage; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049/** 050 * A subscription that honors the pre-fetch option of the ConsumerInfo. 051 */ 052public abstract class PrefetchSubscription extends AbstractSubscription { 053 054 private static final Logger LOG = LoggerFactory.getLogger(PrefetchSubscription.class); 055 protected final Scheduler scheduler; 056 057 protected PendingMessageCursor pending; 058 protected final List<MessageReference> dispatched = new ArrayList<MessageReference>(); 059 protected boolean usePrefetchExtension = true; 060 private int maxProducersToAudit=32; 061 private int maxAuditDepth=2048; 062 protected final SystemUsage usageManager; 063 protected final Object pendingLock = new Object(); 064 protected final Object dispatchLock = new Object(); 065 private final CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1); 066 067 public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws JMSException { 068 super(broker,context, info); 069 this.usageManager=usageManager; 070 pending = cursor; 071 try { 072 pending.start(); 073 } catch (Exception e) { 074 throw new JMSException(e.getMessage()); 075 } 076 this.scheduler = broker.getScheduler(); 077 } 078 079 public PrefetchSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws JMSException { 080 this(broker,usageManager,context, info, new VMPendingMessageCursor(false)); 081 } 082 083 /** 084 * Allows a message to be pulled on demand by a client 085 */ 086 @Override 087 public Response pullMessage(ConnectionContext context, final MessagePull pull) throws Exception { 088 // The slave should not deliver pull messages. 089 // TODO: when the slave becomes a master, He should send a NULL message to all the 090 // consumers to 'wake them up' in case they were waiting for a message. 091 if (getPrefetchSize() == 0) { 092 prefetchExtension.set(pull.getQuantity()); 093 final long dispatchCounterBeforePull = getSubscriptionStatistics().getDispatched().getCount(); 094 095 // Have the destination push us some messages. 096 for (Destination dest : destinations) { 097 dest.iterate(); 098 } 099 dispatchPending(); 100 101 synchronized(this) { 102 // If there was nothing dispatched.. we may need to setup a timeout. 103 if (dispatchCounterBeforePull == getSubscriptionStatistics().getDispatched().getCount() || pull.isAlwaysSignalDone()) { 104 // immediate timeout used by receiveNoWait() 105 if (pull.getTimeout() == -1) { 106 // Null message indicates the pull is done or did not have pending. 107 prefetchExtension.set(1); 108 add(QueueMessageReference.NULL_MESSAGE); 109 dispatchPending(); 110 } 111 if (pull.getTimeout() > 0) { 112 scheduler.executeAfterDelay(new Runnable() { 113 @Override 114 public void run() { 115 pullTimeout(dispatchCounterBeforePull, pull.isAlwaysSignalDone()); 116 } 117 }, pull.getTimeout()); 118 } 119 } 120 } 121 } 122 return null; 123 } 124 125 /** 126 * Occurs when a pull times out. If nothing has been dispatched since the 127 * timeout was setup, then send the NULL message. 128 */ 129 final void pullTimeout(long dispatchCounterBeforePull, boolean alwaysSignalDone) { 130 synchronized (pendingLock) { 131 if (dispatchCounterBeforePull == getSubscriptionStatistics().getDispatched().getCount() || alwaysSignalDone) { 132 try { 133 prefetchExtension.set(1); 134 add(QueueMessageReference.NULL_MESSAGE); 135 dispatchPending(); 136 } catch (Exception e) { 137 context.getConnection().serviceException(e); 138 } finally { 139 prefetchExtension.set(0); 140 } 141 } 142 } 143 } 144 145 @Override 146 public void add(MessageReference node) throws Exception { 147 synchronized (pendingLock) { 148 // The destination may have just been removed... 149 if (!destinations.contains(node.getRegionDestination()) && node != QueueMessageReference.NULL_MESSAGE) { 150 // perhaps we should inform the caller that we are no longer valid to dispatch to? 151 return; 152 } 153 154 // Don't increment for the pullTimeout control message. 155 if (!node.equals(QueueMessageReference.NULL_MESSAGE)) { 156 getSubscriptionStatistics().getEnqueues().increment(); 157 } 158 pending.addMessageLast(node); 159 } 160 dispatchPending(); 161 } 162 163 @Override 164 public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception { 165 synchronized(pendingLock) { 166 try { 167 pending.reset(); 168 while (pending.hasNext()) { 169 MessageReference node = pending.next(); 170 node.decrementReferenceCount(); 171 if (node.getMessageId().equals(mdn.getMessageId())) { 172 // Synchronize between dispatched list and removal of messages from pending list 173 // related to remove subscription action 174 synchronized(dispatchLock) { 175 pending.remove(); 176 createMessageDispatch(node, node.getMessage()); 177 dispatched.add(node); 178 getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize()); 179 onDispatch(node, node.getMessage()); 180 } 181 return; 182 } 183 } 184 } finally { 185 pending.release(); 186 } 187 } 188 throw new JMSException( 189 "Slave broker out of sync with master: Dispatched message (" 190 + mdn.getMessageId() + ") was not in the pending list for " 191 + mdn.getConsumerId() + " on " + mdn.getDestination().getPhysicalName()); 192 } 193 194 @Override 195 public final void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception { 196 // Handle the standard acknowledgment case. 197 boolean callDispatchMatched = false; 198 Destination destination = null; 199 200 if (!okForAckAsDispatchDone.await(0l, TimeUnit.MILLISECONDS)) { 201 // suppress unexpected ack exception in this expected case 202 LOG.warn("Ignoring ack received before dispatch; result of failover with an outstanding ack. Acked messages will be replayed if present on this broker. Ignored ack: {}", ack); 203 return; 204 } 205 206 LOG.trace("ack: {}", ack); 207 208 synchronized(dispatchLock) { 209 if (ack.isStandardAck()) { 210 // First check if the ack matches the dispatched. When using failover this might 211 // not be the case. We don't ever want to ack the wrong messages. 212 assertAckMatchesDispatched(ack); 213 214 // Acknowledge all dispatched messages up till the message id of 215 // the acknowledgment. 216 boolean inAckRange = false; 217 List<MessageReference> removeList = new ArrayList<MessageReference>(); 218 for (final MessageReference node : dispatched) { 219 MessageId messageId = node.getMessageId(); 220 if (ack.getFirstMessageId() == null 221 || ack.getFirstMessageId().equals(messageId)) { 222 inAckRange = true; 223 } 224 if (inAckRange) { 225 // Don't remove the nodes until we are committed. 226 if (!context.isInTransaction()) { 227 getSubscriptionStatistics().getDequeues().increment(); 228 ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement(); 229 removeList.add(node); 230 } else { 231 registerRemoveSync(context, node); 232 } 233 acknowledge(context, ack, node); 234 if (ack.getLastMessageId().equals(messageId)) { 235 destination = (Destination) node.getRegionDestination(); 236 callDispatchMatched = true; 237 break; 238 } 239 } 240 } 241 for (final MessageReference node : removeList) { 242 dispatched.remove(node); 243 getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize()); 244 } 245 // this only happens after a reconnect - get an ack which is not 246 // valid 247 if (!callDispatchMatched) { 248 LOG.warn("Could not correlate acknowledgment with dispatched message: {}", ack); 249 } 250 } else if (ack.isIndividualAck()) { 251 // Message was delivered and acknowledge - but only delete the 252 // individual message 253 for (final MessageReference node : dispatched) { 254 MessageId messageId = node.getMessageId(); 255 if (ack.getLastMessageId().equals(messageId)) { 256 // Don't remove the nodes until we are committed - immediateAck option 257 if (!context.isInTransaction()) { 258 getSubscriptionStatistics().getDequeues().increment(); 259 ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement(); 260 dispatched.remove(node); 261 getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize()); 262 } else { 263 registerRemoveSync(context, node); 264 } 265 266 if (usePrefetchExtension && getPrefetchSize() != 0 && ack.isInTransaction()) { 267 // allow transaction batch to exceed prefetch 268 while (true) { 269 int currentExtension = prefetchExtension.get(); 270 int newExtension = Math.max(currentExtension, currentExtension + 1); 271 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { 272 break; 273 } 274 } 275 } 276 277 acknowledge(context, ack, node); 278 destination = (Destination) node.getRegionDestination(); 279 callDispatchMatched = true; 280 break; 281 } 282 } 283 }else if (ack.isDeliveredAck()) { 284 // Message was delivered but not acknowledged: update pre-fetch 285 // counters. 286 int index = 0; 287 for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) { 288 final MessageReference node = iter.next(); 289 Destination nodeDest = (Destination) node.getRegionDestination(); 290 if (ack.getLastMessageId().equals(node.getMessageId())) { 291 if (usePrefetchExtension && getPrefetchSize() != 0) { 292 // allow batch to exceed prefetch 293 while (true) { 294 int currentExtension = prefetchExtension.get(); 295 int newExtension = Math.max(currentExtension, index + 1); 296 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { 297 break; 298 } 299 } 300 } 301 destination = nodeDest; 302 callDispatchMatched = true; 303 break; 304 } 305 } 306 if (!callDispatchMatched) { 307 throw new JMSException( 308 "Could not correlate acknowledgment with dispatched message: " 309 + ack); 310 } 311 } else if (ack.isExpiredAck()) { 312 // Message was expired 313 int index = 0; 314 boolean inAckRange = false; 315 for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) { 316 final MessageReference node = iter.next(); 317 Destination nodeDest = (Destination) node.getRegionDestination(); 318 MessageId messageId = node.getMessageId(); 319 if (ack.getFirstMessageId() == null || ack.getFirstMessageId().equals(messageId)) { 320 inAckRange = true; 321 } 322 if (inAckRange) { 323 Destination regionDestination = nodeDest; 324 if (broker.isExpired(node)) { 325 regionDestination.messageExpired(context, this, node); 326 } 327 iter.remove(); 328 nodeDest.getDestinationStatistics().getInflight().decrement(); 329 330 if (ack.getLastMessageId().equals(messageId)) { 331 if (usePrefetchExtension && getPrefetchSize() != 0) { 332 // allow batch to exceed prefetch 333 while (true) { 334 int currentExtension = prefetchExtension.get(); 335 int newExtension = Math.max(currentExtension, index + 1); 336 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { 337 break; 338 } 339 } 340 } 341 342 destination = (Destination) node.getRegionDestination(); 343 callDispatchMatched = true; 344 break; 345 } 346 } 347 } 348 if (!callDispatchMatched) { 349 throw new JMSException( 350 "Could not correlate expiration acknowledgment with dispatched message: " 351 + ack); 352 } 353 } else if (ack.isRedeliveredAck()) { 354 // Message was re-delivered but it was not yet considered to be 355 // a DLQ message. 356 boolean inAckRange = false; 357 for (final MessageReference node : dispatched) { 358 MessageId messageId = node.getMessageId(); 359 if (ack.getFirstMessageId() == null 360 || ack.getFirstMessageId().equals(messageId)) { 361 inAckRange = true; 362 } 363 if (inAckRange) { 364 if (ack.getLastMessageId().equals(messageId)) { 365 destination = (Destination) node.getRegionDestination(); 366 callDispatchMatched = true; 367 break; 368 } 369 } 370 } 371 if (!callDispatchMatched) { 372 throw new JMSException( 373 "Could not correlate acknowledgment with dispatched message: " 374 + ack); 375 } 376 } else if (ack.isPoisonAck()) { 377 // TODO: what if the message is already in a DLQ??? 378 // Handle the poison ACK case: we need to send the message to a 379 // DLQ 380 if (ack.isInTransaction()) { 381 throw new JMSException("Poison ack cannot be transacted: " 382 + ack); 383 } 384 int index = 0; 385 boolean inAckRange = false; 386 List<MessageReference> removeList = new ArrayList<MessageReference>(); 387 for (final MessageReference node : dispatched) { 388 MessageId messageId = node.getMessageId(); 389 if (ack.getFirstMessageId() == null 390 || ack.getFirstMessageId().equals(messageId)) { 391 inAckRange = true; 392 } 393 if (inAckRange) { 394 sendToDLQ(context, node, ack.getPoisonCause()); 395 Destination nodeDest = (Destination) node.getRegionDestination(); 396 nodeDest.getDestinationStatistics() 397 .getInflight().decrement(); 398 removeList.add(node); 399 getSubscriptionStatistics().getDequeues().increment(); 400 index++; 401 acknowledge(context, ack, node); 402 if (ack.getLastMessageId().equals(messageId)) { 403 while (true) { 404 int currentExtension = prefetchExtension.get(); 405 int newExtension = Math.max(0, currentExtension - (index + 1)); 406 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { 407 break; 408 } 409 } 410 destination = nodeDest; 411 callDispatchMatched = true; 412 break; 413 } 414 } 415 } 416 for (final MessageReference node : removeList) { 417 dispatched.remove(node); 418 getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize()); 419 } 420 if (!callDispatchMatched) { 421 throw new JMSException( 422 "Could not correlate acknowledgment with dispatched message: " 423 + ack); 424 } 425 } 426 } 427 if (callDispatchMatched && destination != null) { 428 destination.wakeup(); 429 dispatchPending(); 430 431 if (pending.isEmpty()) { 432 wakeupDestinationsForDispatch(); 433 } 434 } else { 435 LOG.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): {}", ack); 436 } 437 } 438 439 private void registerRemoveSync(ConnectionContext context, final MessageReference node) { 440 // setup a Synchronization to remove nodes from the 441 // dispatched list. 442 context.getTransaction().addSynchronization( 443 new Synchronization() { 444 445 @Override 446 public void beforeEnd() { 447 if (usePrefetchExtension && getPrefetchSize() != 0) { 448 while (true) { 449 int currentExtension = prefetchExtension.get(); 450 int newExtension = Math.max(0, currentExtension - 1); 451 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { 452 break; 453 } 454 } 455 } 456 } 457 458 @Override 459 public void afterCommit() 460 throws Exception { 461 Destination nodeDest = (Destination) node.getRegionDestination(); 462 synchronized(dispatchLock) { 463 getSubscriptionStatistics().getDequeues().increment(); 464 dispatched.remove(node); 465 getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize()); 466 nodeDest.getDestinationStatistics().getInflight().decrement(); 467 } 468 nodeDest.wakeup(); 469 dispatchPending(); 470 } 471 472 @Override 473 public void afterRollback() throws Exception { 474 synchronized(dispatchLock) { 475 // poisionAck will decrement - otherwise still inflight on client 476 } 477 } 478 }); 479 } 480 481 /** 482 * Checks an ack versus the contents of the dispatched list. 483 * called with dispatchLock held 484 * @param ack 485 * @throws JMSException if it does not match 486 */ 487 protected void assertAckMatchesDispatched(MessageAck ack) throws JMSException { 488 MessageId firstAckedMsg = ack.getFirstMessageId(); 489 MessageId lastAckedMsg = ack.getLastMessageId(); 490 int checkCount = 0; 491 boolean checkFoundStart = false; 492 boolean checkFoundEnd = false; 493 for (MessageReference node : dispatched) { 494 495 if (firstAckedMsg == null) { 496 checkFoundStart = true; 497 } else if (!checkFoundStart && firstAckedMsg.equals(node.getMessageId())) { 498 checkFoundStart = true; 499 } 500 501 if (checkFoundStart) { 502 checkCount++; 503 } 504 505 if (lastAckedMsg != null && lastAckedMsg.equals(node.getMessageId())) { 506 checkFoundEnd = true; 507 break; 508 } 509 } 510 if (!checkFoundStart && firstAckedMsg != null) 511 throw new JMSException("Unmatched acknowledge: " + ack 512 + "; Could not find Message-ID " + firstAckedMsg 513 + " in dispatched-list (start of ack)"); 514 if (!checkFoundEnd && lastAckedMsg != null) 515 throw new JMSException("Unmatched acknowledge: " + ack 516 + "; Could not find Message-ID " + lastAckedMsg 517 + " in dispatched-list (end of ack)"); 518 if (ack.getMessageCount() != checkCount && !ack.isInTransaction()) { 519 throw new JMSException("Unmatched acknowledge: " + ack 520 + "; Expected message count (" + ack.getMessageCount() 521 + ") differs from count in dispatched-list (" + checkCount 522 + ")"); 523 } 524 } 525 526 /** 527 * 528 * @param context 529 * @param node 530 * @param poisonCause 531 * @throws IOException 532 * @throws Exception 533 */ 534 protected void sendToDLQ(final ConnectionContext context, final MessageReference node, Throwable poisonCause) throws IOException, Exception { 535 broker.getRoot().sendToDeadLetterQueue(context, node, this, poisonCause); 536 } 537 538 @Override 539 public int getInFlightSize() { 540 return dispatched.size(); 541 } 542 543 /** 544 * Used to determine if the broker can dispatch to the consumer. 545 * 546 * @return true if the subscription is full 547 */ 548 @Override 549 public boolean isFull() { 550 return getPrefetchSize() == 0 ? prefetchExtension.get() == 0 : dispatched.size() - prefetchExtension.get() >= info.getPrefetchSize(); 551 } 552 553 /** 554 * @return true when 60% or more room is left for dispatching messages 555 */ 556 @Override 557 public boolean isLowWaterMark() { 558 return (dispatched.size() - prefetchExtension.get()) <= (info.getPrefetchSize() * .4); 559 } 560 561 /** 562 * @return true when 10% or less room is left for dispatching messages 563 */ 564 @Override 565 public boolean isHighWaterMark() { 566 return (dispatched.size() - prefetchExtension.get()) >= (info.getPrefetchSize() * .9); 567 } 568 569 @Override 570 public int countBeforeFull() { 571 return getPrefetchSize() == 0 ? prefetchExtension.get() : info.getPrefetchSize() + prefetchExtension.get() - dispatched.size(); 572 } 573 574 @Override 575 public int getPendingQueueSize() { 576 return pending.size(); 577 } 578 579 @Override 580 public long getPendingMessageSize() { 581 synchronized (pendingLock) { 582 return pending.messageSize(); 583 } 584 } 585 586 @Override 587 public int getDispatchedQueueSize() { 588 return dispatched.size(); 589 } 590 591 @Override 592 public long getDequeueCounter() { 593 return getSubscriptionStatistics().getDequeues().getCount(); 594 } 595 596 @Override 597 public long getDispatchedCounter() { 598 return getSubscriptionStatistics().getDispatched().getCount(); 599 } 600 601 @Override 602 public long getEnqueueCounter() { 603 return getSubscriptionStatistics().getEnqueues().getCount(); 604 } 605 606 @Override 607 public boolean isRecoveryRequired() { 608 return pending.isRecoveryRequired(); 609 } 610 611 public PendingMessageCursor getPending() { 612 return this.pending; 613 } 614 615 public void setPending(PendingMessageCursor pending) { 616 this.pending = pending; 617 if (this.pending!=null) { 618 this.pending.setSystemUsage(usageManager); 619 this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); 620 } 621 } 622 623 @Override 624 public void add(ConnectionContext context, Destination destination) throws Exception { 625 synchronized(pendingLock) { 626 super.add(context, destination); 627 pending.add(context, destination); 628 } 629 } 630 631 @Override 632 public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception { 633 return remove(context, destination, dispatched); 634 } 635 636 public List<MessageReference> remove(ConnectionContext context, Destination destination, List<MessageReference> dispatched) throws Exception { 637 LinkedList<MessageReference> redispatch = new LinkedList<MessageReference>(); 638 synchronized(pendingLock) { 639 super.remove(context, destination); 640 // Here is a potential problem concerning Inflight stat: 641 // Messages not already committed or rolled back may not be removed from dispatched list at the moment 642 // Except if each commit or rollback callback action comes before remove of subscriber. 643 redispatch.addAll(pending.remove(context, destination)); 644 645 if (dispatched == null) { 646 return redispatch; 647 } 648 649 // Synchronized to DispatchLock if necessary 650 if (dispatched == this.dispatched) { 651 synchronized(dispatchLock) { 652 addReferencesAndUpdateRedispatch(redispatch, destination, dispatched); 653 } 654 } else { 655 addReferencesAndUpdateRedispatch(redispatch, destination, dispatched); 656 } 657 } 658 659 return redispatch; 660 } 661 662 private void addReferencesAndUpdateRedispatch(LinkedList<MessageReference> redispatch, Destination destination, List<MessageReference> dispatched) { 663 ArrayList<MessageReference> references = new ArrayList<MessageReference>(); 664 for (MessageReference r : dispatched) { 665 if (r.getRegionDestination() == destination) { 666 references.add(r); 667 getSubscriptionStatistics().getInflightMessageSize().addSize(-r.getSize()); 668 } 669 } 670 redispatch.addAll(0, references); 671 destination.getDestinationStatistics().getInflight().subtract(references.size()); 672 dispatched.removeAll(references); 673 } 674 675 // made public so it can be used in MQTTProtocolConverter 676 public void dispatchPending() throws IOException { 677 List<Destination> slowConsumerTargets = null; 678 679 synchronized(pendingLock) { 680 try { 681 int numberToDispatch = countBeforeFull(); 682 if (numberToDispatch > 0) { 683 setSlowConsumer(false); 684 setPendingBatchSize(pending, numberToDispatch); 685 int count = 0; 686 pending.reset(); 687 while (pending.hasNext() && !isFull() && count < numberToDispatch) { 688 MessageReference node = pending.next(); 689 if (node == null) { 690 break; 691 } 692 693 // Synchronize between dispatched list and remove of message from pending list 694 // related to remove subscription action 695 synchronized(dispatchLock) { 696 pending.remove(); 697 if (!isDropped(node) && canDispatch(node)) { 698 699 // Message may have been sitting in the pending 700 // list a while waiting for the consumer to ak the message. 701 if (node != QueueMessageReference.NULL_MESSAGE && node.isExpired()) { 702 //increment number to dispatch 703 numberToDispatch++; 704 if (broker.isExpired(node)) { 705 ((Destination)node.getRegionDestination()).messageExpired(context, this, node); 706 } 707 708 if (!isBrowser()) { 709 node.decrementReferenceCount(); 710 continue; 711 } 712 } 713 dispatch(node); 714 count++; 715 } 716 } 717 // decrement after dispatch has taken ownership to avoid usage jitter 718 node.decrementReferenceCount(); 719 } 720 } else if (!isSlowConsumer()) { 721 setSlowConsumer(true); 722 slowConsumerTargets = destinations; 723 } 724 } finally { 725 pending.release(); 726 } 727 } 728 729 if (slowConsumerTargets != null) { 730 for (Destination dest : slowConsumerTargets) { 731 dest.slowConsumer(context, this); 732 } 733 } 734 } 735 736 protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) { 737 pending.setMaxBatchSize(numberToDispatch); 738 } 739 740 // called with dispatchLock held 741 protected boolean dispatch(final MessageReference node) throws IOException { 742 final Message message = node.getMessage(); 743 if (message == null) { 744 return false; 745 } 746 747 okForAckAsDispatchDone.countDown(); 748 749 MessageDispatch md = createMessageDispatch(node, message); 750 if (node != QueueMessageReference.NULL_MESSAGE) { 751 getSubscriptionStatistics().getDispatched().increment(); 752 dispatched.add(node); 753 getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize()); 754 } 755 if (getPrefetchSize() == 0) { 756 while (true) { 757 int currentExtension = prefetchExtension.get(); 758 int newExtension = Math.max(0, currentExtension - 1); 759 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { 760 break; 761 } 762 } 763 } 764 if (info.isDispatchAsync()) { 765 md.setTransmitCallback(new TransmitCallback() { 766 767 @Override 768 public void onSuccess() { 769 // Since the message gets queued up in async dispatch, we don't want to 770 // decrease the reference count until it gets put on the wire. 771 onDispatch(node, message); 772 } 773 774 @Override 775 public void onFailure() { 776 Destination nodeDest = (Destination) node.getRegionDestination(); 777 if (nodeDest != null) { 778 if (node != QueueMessageReference.NULL_MESSAGE) { 779 nodeDest.getDestinationStatistics().getDispatched().increment(); 780 nodeDest.getDestinationStatistics().getInflight().increment(); 781 LOG.trace("{} failed to dispatch: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), getSubscriptionStatistics().getDispatched().getCount(), dispatched.size() }); 782 } 783 } 784 if (node instanceof QueueMessageReference) { 785 ((QueueMessageReference) node).unlock(); 786 } 787 } 788 }); 789 context.getConnection().dispatchAsync(md); 790 } else { 791 context.getConnection().dispatchSync(md); 792 onDispatch(node, message); 793 } 794 return true; 795 } 796 797 protected void onDispatch(final MessageReference node, final Message message) { 798 Destination nodeDest = (Destination) node.getRegionDestination(); 799 if (nodeDest != null) { 800 if (node != QueueMessageReference.NULL_MESSAGE) { 801 nodeDest.getDestinationStatistics().getDispatched().increment(); 802 nodeDest.getDestinationStatistics().getInflight().increment(); 803 LOG.trace("{} dispatched: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), getSubscriptionStatistics().getDispatched().getCount(), dispatched.size() }); 804 } 805 } 806 807 if (info.isDispatchAsync()) { 808 try { 809 dispatchPending(); 810 } catch (IOException e) { 811 context.getConnection().serviceExceptionAsync(e); 812 } 813 } 814 } 815 816 /** 817 * inform the MessageConsumer on the client to change it's prefetch 818 * 819 * @param newPrefetch 820 */ 821 @Override 822 public void updateConsumerPrefetch(int newPrefetch) { 823 if (context != null && context.getConnection() != null && context.getConnection().isManageable()) { 824 ConsumerControl cc = new ConsumerControl(); 825 cc.setConsumerId(info.getConsumerId()); 826 cc.setPrefetch(newPrefetch); 827 context.getConnection().dispatchAsync(cc); 828 } 829 } 830 831 /** 832 * @param node 833 * @param message 834 * @return MessageDispatch 835 */ 836 protected MessageDispatch createMessageDispatch(MessageReference node, Message message) { 837 MessageDispatch md = new MessageDispatch(); 838 md.setConsumerId(info.getConsumerId()); 839 840 if (node == QueueMessageReference.NULL_MESSAGE) { 841 md.setMessage(null); 842 md.setDestination(null); 843 } else { 844 Destination regionDestination = (Destination) node.getRegionDestination(); 845 md.setDestination(regionDestination.getActiveMQDestination()); 846 md.setMessage(message); 847 md.setRedeliveryCounter(node.getRedeliveryCounter()); 848 } 849 850 return md; 851 } 852 853 /** 854 * Use when a matched message is about to be dispatched to the client. 855 * 856 * @param node 857 * @return false if the message should not be dispatched to the client 858 * (another sub may have already dispatched it for example). 859 * @throws IOException 860 */ 861 protected abstract boolean canDispatch(MessageReference node) throws IOException; 862 863 protected abstract boolean isDropped(MessageReference node); 864 865 /** 866 * Used during acknowledgment to remove the message. 867 * 868 * @throws IOException 869 */ 870 protected abstract void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference node) throws IOException; 871 872 873 public int getMaxProducersToAudit() { 874 return maxProducersToAudit; 875 } 876 877 public void setMaxProducersToAudit(int maxProducersToAudit) { 878 this.maxProducersToAudit = maxProducersToAudit; 879 if (this.pending != null) { 880 this.pending.setMaxProducersToAudit(maxProducersToAudit); 881 } 882 } 883 884 public int getMaxAuditDepth() { 885 return maxAuditDepth; 886 } 887 888 public void setMaxAuditDepth(int maxAuditDepth) { 889 this.maxAuditDepth = maxAuditDepth; 890 if (this.pending != null) { 891 this.pending.setMaxAuditDepth(maxAuditDepth); 892 } 893 } 894 895 public boolean isUsePrefetchExtension() { 896 return usePrefetchExtension; 897 } 898 899 public void setUsePrefetchExtension(boolean usePrefetchExtension) { 900 this.usePrefetchExtension = usePrefetchExtension; 901 } 902 903 @Override 904 public void setPrefetchSize(int prefetchSize) { 905 this.info.setPrefetchSize(prefetchSize); 906 try { 907 this.dispatchPending(); 908 } catch (Exception e) { 909 LOG.trace("Caught exception during dispatch after prefetch change.", e); 910 } 911 } 912}