001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region.cursors; 018 019import java.util.Collections; 020import java.util.HashMap; 021import java.util.List; 022import java.util.Map; 023import java.util.concurrent.CopyOnWriteArrayList; 024 025import org.apache.activemq.advisory.AdvisorySupport; 026import org.apache.activemq.broker.Broker; 027import org.apache.activemq.broker.ConnectionContext; 028import org.apache.activemq.broker.region.Destination; 029import org.apache.activemq.broker.region.DurableTopicSubscription; 030import org.apache.activemq.broker.region.MessageReference; 031import org.apache.activemq.broker.region.Topic; 032import org.apache.activemq.command.Message; 033import org.apache.activemq.usage.SystemUsage; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036 037/** 038 * persist pending messages pending message (messages awaiting dispatch to a 039 * consumer) cursor 040 */ 041public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { 042 043 private static final Logger LOG = LoggerFactory.getLogger(StoreDurableSubscriberCursor.class); 044 private final String clientId; 045 private final String subscriberName; 046 private final Map<Destination, TopicStorePrefetch> topics = new HashMap<Destination, TopicStorePrefetch>(); 047 private final List<PendingMessageCursor> storePrefetches = new CopyOnWriteArrayList<PendingMessageCursor>(); 048 private final PendingMessageCursor nonPersistent; 049 private PendingMessageCursor currentCursor; 050 private final DurableTopicSubscription subscription; 051 private boolean immediatePriorityDispatch = true; 052 053 /** 054 * @param broker Broker for this cursor 055 * @param clientId clientId for this cursor 056 * @param subscriberName subscriber name for this cursor 057 * @param maxBatchSize currently ignored 058 * @param subscription subscription for this cursor 059 */ 060 public StoreDurableSubscriberCursor(Broker broker,String clientId, String subscriberName,int maxBatchSize, DurableTopicSubscription subscription) { 061 super(AbstractPendingMessageCursor.isPrioritizedMessageSubscriber(broker,subscription)); 062 this.subscription=subscription; 063 this.clientId = clientId; 064 this.subscriberName = subscriberName; 065 if (broker.getBrokerService().isPersistent()) { 066 this.nonPersistent = new FilePendingMessageCursor(broker,clientId + subscriberName,this.prioritizedMessages); 067 } else { 068 this.nonPersistent = new VMPendingMessageCursor(this.prioritizedMessages); 069 } 070 071 this.nonPersistent.setMaxBatchSize(maxBatchSize); 072 this.nonPersistent.setSystemUsage(systemUsage); 073 this.storePrefetches.add(this.nonPersistent); 074 075 if (prioritizedMessages) { 076 setMaxAuditDepth(10*getMaxAuditDepth()); 077 } 078 } 079 080 @Override 081 public synchronized void start() throws Exception { 082 if (!isStarted()) { 083 super.start(); 084 for (PendingMessageCursor tsp : storePrefetches) { 085 tsp.setMessageAudit(getMessageAudit()); 086 tsp.start(); 087 } 088 } 089 } 090 091 @Override 092 public synchronized void stop() throws Exception { 093 if (isStarted()) { 094 if (subscription.isKeepDurableSubsActive()) { 095 super.gc(); 096 for (PendingMessageCursor tsp : storePrefetches) { 097 tsp.gc(); 098 } 099 } else { 100 super.stop(); 101 for (PendingMessageCursor tsp : storePrefetches) { 102 tsp.stop(); 103 } 104 getMessageAudit().clear(); 105 } 106 } 107 } 108 109 /** 110 * Add a destination 111 * 112 * @param context 113 * @param destination 114 * @throws Exception 115 */ 116 @Override 117 public synchronized void add(ConnectionContext context, Destination destination) throws Exception { 118 if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())) { 119 TopicStorePrefetch tsp = new TopicStorePrefetch(this.subscription,(Topic)destination, clientId, subscriberName); 120 tsp.setMaxBatchSize(destination.getMaxPageSize()); 121 tsp.setSystemUsage(systemUsage); 122 tsp.setMessageAudit(getMessageAudit()); 123 tsp.setEnableAudit(isEnableAudit()); 124 tsp.setMemoryUsageHighWaterMark(getMemoryUsageHighWaterMark()); 125 tsp.setUseCache(isUseCache()); 126 tsp.setCacheEnabled(isUseCache() && tsp.isEmpty()); 127 topics.put(destination, tsp); 128 storePrefetches.add(tsp); 129 if (isStarted()) { 130 tsp.start(); 131 } 132 } 133 } 134 135 /** 136 * remove a destination 137 * 138 * @param context 139 * @param destination 140 * @throws Exception 141 */ 142 @Override 143 public synchronized List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception { 144 PendingMessageCursor tsp = topics.remove(destination); 145 if (tsp != null) { 146 storePrefetches.remove(tsp); 147 } 148 return Collections.EMPTY_LIST; 149 } 150 151 /** 152 * @return true if there are no pending messages 153 */ 154 @Override 155 public synchronized boolean isEmpty() { 156 for (PendingMessageCursor tsp : storePrefetches) { 157 if( !tsp.isEmpty() ) 158 return false; 159 } 160 return true; 161 } 162 163 @Override 164 public synchronized boolean isEmpty(Destination destination) { 165 boolean result = true; 166 TopicStorePrefetch tsp = topics.get(destination); 167 if (tsp != null) { 168 result = tsp.isEmpty(); 169 } 170 return result; 171 } 172 173 /** 174 * Informs the Broker if the subscription needs to intervention to recover 175 * it's state e.g. DurableTopicSubscriber may do 176 * 177 * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor 178 * @return true if recovery required 179 */ 180 @Override 181 public boolean isRecoveryRequired() { 182 return false; 183 } 184 185 @Override 186 public synchronized boolean tryAddMessageLast(MessageReference node, long wait) throws Exception { 187 if (node != null) { 188 Message msg = node.getMessage(); 189 if (isStarted()) { 190 if (!msg.isPersistent()) { 191 nonPersistent.addMessageLast(node); 192 } 193 } 194 if (msg.isPersistent()) { 195 Destination dest = (Destination) msg.getRegionDestination(); 196 TopicStorePrefetch tsp = topics.get(dest); 197 if (tsp != null) { 198 tsp.addMessageLast(node); 199 if (prioritizedMessages && immediatePriorityDispatch && tsp.isPaging()) { 200 if (msg.getPriority() > tsp.getLastRecoveredPriority()) { 201 tsp.recoverMessage(node.getMessage(), true); 202 LOG.trace("cached high priority ({} message: {}, current paged batch priority: {}, cache size: {}", new Object[]{ msg.getPriority(), msg.getMessageId(), tsp.getLastRecoveredPriority(), tsp.batchList.size()}); 203 } 204 } 205 } 206 } 207 208 } 209 return true; 210 } 211 212 @Override 213 public boolean isTransient() { 214 return subscription.isKeepDurableSubsActive(); 215 } 216 217 @Override 218 public void addMessageFirst(MessageReference node) throws Exception { 219 // for keep durable subs active, need to deal with redispatch 220 if (node != null) { 221 Message msg = node.getMessage(); 222 if (!msg.isPersistent()) { 223 nonPersistent.addMessageFirst(node); 224 } else { 225 Destination dest = (Destination) msg.getRegionDestination(); 226 TopicStorePrefetch tsp = topics.get(dest); 227 if (tsp != null) { 228 tsp.addMessageFirst(node); 229 } 230 } 231 } 232 } 233 234 @Override 235 public synchronized void addRecoveredMessage(MessageReference node) throws Exception { 236 nonPersistent.addMessageLast(node); 237 } 238 239 @Override 240 public synchronized void clear() { 241 for (PendingMessageCursor tsp : storePrefetches) { 242 tsp.clear(); 243 } 244 } 245 246 @Override 247 public synchronized boolean hasNext() { 248 boolean result = true; 249 if (result) { 250 try { 251 currentCursor = getNextCursor(); 252 } catch (Exception e) { 253 LOG.error("Failed to get current cursor ", e); 254 throw new RuntimeException(e); 255 } 256 result = currentCursor != null ? currentCursor.hasNext() : false; 257 } 258 return result; 259 } 260 261 @Override 262 public synchronized MessageReference next() { 263 MessageReference result = currentCursor != null ? currentCursor.next() : null; 264 return result; 265 } 266 267 @Override 268 public synchronized void remove() { 269 if (currentCursor != null) { 270 currentCursor.remove(); 271 } 272 } 273 274 @Override 275 public synchronized void remove(MessageReference node) { 276 for (PendingMessageCursor tsp : storePrefetches) { 277 tsp.remove(node); 278 } 279 } 280 281 @Override 282 public synchronized void reset() { 283 for (PendingMessageCursor storePrefetch : storePrefetches) { 284 storePrefetch.reset(); 285 } 286 } 287 288 @Override 289 public synchronized void release() { 290 this.currentCursor = null; 291 for (PendingMessageCursor storePrefetch : storePrefetches) { 292 storePrefetch.release(); 293 } 294 } 295 296 @Override 297 public synchronized int size() { 298 int pendingCount=0; 299 for (PendingMessageCursor tsp : storePrefetches) { 300 pendingCount += tsp.size(); 301 } 302 return pendingCount; 303 } 304 305 @Override 306 public synchronized long messageSize() { 307 long pendingSize=0; 308 for (PendingMessageCursor tsp : storePrefetches) { 309 pendingSize += tsp.messageSize(); 310 } 311 return pendingSize; 312 } 313 314 @Override 315 public void setMaxBatchSize(int newMaxBatchSize) { 316 for (PendingMessageCursor storePrefetch : storePrefetches) { 317 storePrefetch.setMaxBatchSize(newMaxBatchSize); 318 } 319 super.setMaxBatchSize(newMaxBatchSize); 320 } 321 322 @Override 323 public synchronized void gc() { 324 for (PendingMessageCursor tsp : storePrefetches) { 325 tsp.gc(); 326 } 327 } 328 329 @Override 330 public void setSystemUsage(SystemUsage usageManager) { 331 super.setSystemUsage(usageManager); 332 for (PendingMessageCursor tsp : storePrefetches) { 333 tsp.setSystemUsage(usageManager); 334 } 335 } 336 337 @Override 338 public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) { 339 super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark); 340 for (PendingMessageCursor cursor : storePrefetches) { 341 cursor.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark); 342 } 343 } 344 345 @Override 346 public void setMaxProducersToAudit(int maxProducersToAudit) { 347 super.setMaxProducersToAudit(maxProducersToAudit); 348 for (PendingMessageCursor cursor : storePrefetches) { 349 cursor.setMaxProducersToAudit(maxProducersToAudit); 350 } 351 } 352 353 @Override 354 public void setMaxAuditDepth(int maxAuditDepth) { 355 super.setMaxAuditDepth(maxAuditDepth); 356 for (PendingMessageCursor cursor : storePrefetches) { 357 cursor.setMaxAuditDepth(maxAuditDepth); 358 } 359 } 360 361 @Override 362 public void setEnableAudit(boolean enableAudit) { 363 super.setEnableAudit(enableAudit); 364 for (PendingMessageCursor cursor : storePrefetches) { 365 cursor.setEnableAudit(enableAudit); 366 } 367 } 368 369 @Override 370 public void setUseCache(boolean useCache) { 371 super.setUseCache(useCache); 372 for (PendingMessageCursor cursor : storePrefetches) { 373 cursor.setUseCache(useCache); 374 } 375 } 376 377 protected synchronized PendingMessageCursor getNextCursor() throws Exception { 378 if (currentCursor == null || currentCursor.isEmpty()) { 379 currentCursor = null; 380 for (PendingMessageCursor tsp : storePrefetches) { 381 if (tsp.hasNext()) { 382 currentCursor = tsp; 383 break; 384 } 385 } 386 // round-robin 387 if (storePrefetches.size()>1) { 388 PendingMessageCursor first = storePrefetches.remove(0); 389 storePrefetches.add(first); 390 } 391 } 392 return currentCursor; 393 } 394 395 @Override 396 public String toString() { 397 return "StoreDurableSubscriber(" + clientId + ":" + subscriberName + ")"; 398 } 399 400 public boolean isImmediatePriorityDispatch() { 401 return immediatePriorityDispatch; 402 } 403 404 public void setImmediatePriorityDispatch(boolean immediatePriorityDispatch) { 405 this.immediatePriorityDispatch = immediatePriorityDispatch; 406 } 407 408}