001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import java.io.IOException; 020import java.util.ArrayList; 021import java.util.HashSet; 022import java.util.Iterator; 023import java.util.List; 024import java.util.Map; 025import java.util.Set; 026import java.util.Timer; 027import java.util.TimerTask; 028import java.util.concurrent.ConcurrentHashMap; 029import java.util.concurrent.ConcurrentMap; 030 031import javax.jms.InvalidDestinationException; 032import javax.jms.JMSException; 033 034import org.apache.activemq.advisory.AdvisorySupport; 035import org.apache.activemq.broker.ConnectionContext; 036import org.apache.activemq.broker.region.policy.PolicyEntry; 037import org.apache.activemq.command.ActiveMQDestination; 038import org.apache.activemq.command.ConnectionId; 039import org.apache.activemq.command.ConsumerId; 040import org.apache.activemq.command.ConsumerInfo; 041import org.apache.activemq.command.RemoveSubscriptionInfo; 042import org.apache.activemq.command.SessionId; 043import org.apache.activemq.command.SubscriptionInfo; 044import org.apache.activemq.store.NoLocalSubscriptionAware; 045import org.apache.activemq.store.PersistenceAdapter; 046import org.apache.activemq.store.TopicMessageStore; 047import org.apache.activemq.thread.TaskRunnerFactory; 048import org.apache.activemq.usage.SystemUsage; 049import org.apache.activemq.util.LongSequenceGenerator; 050import org.apache.activemq.util.SubscriptionKey; 051import org.slf4j.Logger; 052import org.slf4j.LoggerFactory; 053 054/** 055 * 056 */ 057public class TopicRegion extends AbstractRegion { 058 private static final Logger LOG = LoggerFactory.getLogger(TopicRegion.class); 059 protected final ConcurrentMap<SubscriptionKey, DurableTopicSubscription> durableSubscriptions = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>(); 060 private final LongSequenceGenerator recoveredDurableSubIdGenerator = new LongSequenceGenerator(); 061 private final SessionId recoveredDurableSubSessionId = new SessionId(new ConnectionId("OFFLINE"), recoveredDurableSubIdGenerator.getNextSequenceId()); 062 private boolean keepDurableSubsActive; 063 064 private Timer cleanupTimer; 065 private TimerTask cleanupTask; 066 067 public TopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, 068 DestinationFactory destinationFactory) { 069 super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 070 if (broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule() != -1 && broker.getBrokerService().getOfflineDurableSubscriberTimeout() != -1) { 071 this.cleanupTimer = new Timer("ActiveMQ Durable Subscriber Cleanup Timer", true); 072 this.cleanupTask = new TimerTask() { 073 @Override 074 public void run() { 075 doCleanup(); 076 } 077 }; 078 this.cleanupTimer.schedule(cleanupTask, broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule(), broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule()); 079 } 080 } 081 082 @Override 083 public void stop() throws Exception { 084 super.stop(); 085 if (cleanupTimer != null) { 086 cleanupTimer.cancel(); 087 } 088 } 089 090 public void doCleanup() { 091 long now = System.currentTimeMillis(); 092 for (Map.Entry<SubscriptionKey, DurableTopicSubscription> entry : durableSubscriptions.entrySet()) { 093 DurableTopicSubscription sub = entry.getValue(); 094 if (!sub.isActive()) { 095 long offline = sub.getOfflineTimestamp(); 096 if (offline != -1 && now - offline >= broker.getBrokerService().getOfflineDurableSubscriberTimeout()) { 097 LOG.info("Destroying durable subscriber due to inactivity: {}", sub); 098 try { 099 RemoveSubscriptionInfo info = new RemoveSubscriptionInfo(); 100 info.setClientId(entry.getKey().getClientId()); 101 info.setSubscriptionName(entry.getKey().getSubscriptionName()); 102 ConnectionContext context = new ConnectionContext(); 103 context.setBroker(broker); 104 context.setClientId(entry.getKey().getClientId()); 105 removeSubscription(context, info); 106 } catch (Exception e) { 107 LOG.error("Failed to remove inactive durable subscriber", e); 108 } 109 } 110 } 111 } 112 } 113 114 @Override 115 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 116 if (info.isDurable()) { 117 if (broker.getBrokerService().isRejectDurableConsumers()) { 118 throw new JMSException("Durable Consumers are not allowed"); 119 } 120 ActiveMQDestination destination = info.getDestination(); 121 if (!destination.isPattern()) { 122 // Make sure the destination is created. 123 lookup(context, destination,true); 124 } 125 String clientId = context.getClientId(); 126 String subscriptionName = info.getSubscriptionName(); 127 SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName); 128 DurableTopicSubscription sub = durableSubscriptions.get(key); 129 if (sub != null) { 130 // throw this exception only if link stealing is off 131 if (!context.isAllowLinkStealing() && sub.isActive()) { 132 throw new JMSException("Durable consumer is in use for client: " + clientId + 133 " and subscriptionName: " + subscriptionName); 134 } 135 // Has the selector changed?? 136 if (hasDurableSubChanged(info, sub.getConsumerInfo())) { 137 // Remove the consumer first then add it. 138 durableSubscriptions.remove(key); 139 destinationsLock.readLock().lock(); 140 try { 141 for (Destination dest : destinations.values()) { 142 // Account for virtual destinations 143 if (dest instanceof Topic){ 144 Topic topic = (Topic)dest; 145 topic.deleteSubscription(context, key); 146 } 147 } 148 } finally { 149 destinationsLock.readLock().unlock(); 150 } 151 super.removeConsumer(context, sub.getConsumerInfo()); 152 super.addConsumer(context, info); 153 sub = durableSubscriptions.get(key); 154 } else { 155 // Change the consumer id key of the durable sub. 156 if (sub.getConsumerInfo().getConsumerId() != null) { 157 subscriptions.remove(sub.getConsumerInfo().getConsumerId()); 158 } 159 // set the info and context to the new ones. 160 // this is set in the activate() call below, but 161 // that call is a NOP if it is already active. 162 // hence need to set here and deactivate it first 163 if ((sub.context != context) || (sub.info != info)) { 164 sub.info = info; 165 sub.context = context; 166 sub.deactivate(keepDurableSubsActive, info.getLastDeliveredSequenceId()); 167 } 168 //If NoLocal we need to update the NoLocal selector with the new connectionId 169 //Simply setting the selector with the current one will trigger a 170 //refresh of of the connectionId for the NoLocal expression 171 if (info.isNoLocal()) { 172 sub.setSelector(sub.getSelector()); 173 } 174 subscriptions.put(info.getConsumerId(), sub); 175 } 176 } else { 177 super.addConsumer(context, info); 178 sub = durableSubscriptions.get(key); 179 if (sub == null) { 180 throw new JMSException("Cannot use the same consumerId: " + info.getConsumerId() + 181 " for two different durable subscriptions clientID: " + key.getClientId() + 182 " subscriberName: " + key.getSubscriptionName()); 183 } 184 } 185 sub.activate(usageManager, context, info, broker); 186 return sub; 187 } else { 188 return super.addConsumer(context, info); 189 } 190 } 191 192 @Override 193 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 194 if (info.isDurable()) { 195 SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName()); 196 DurableTopicSubscription sub = durableSubscriptions.get(key); 197 if (sub != null) { 198 // deactivate only if given context is same 199 // as what is in the sub. otherwise, during linksteal 200 // sub will get new context, but will be removed here 201 if (sub.getContext() == context) { 202 sub.deactivate(keepDurableSubsActive, info.getLastDeliveredSequenceId()); 203 } 204 } 205 } else { 206 super.removeConsumer(context, info); 207 } 208 } 209 210 @Override 211 public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { 212 SubscriptionKey key = new SubscriptionKey(info.getClientId(), info.getSubscriptionName()); 213 DurableTopicSubscription sub = durableSubscriptions.get(key); 214 if (sub == null) { 215 throw new InvalidDestinationException("No durable subscription exists for clientID: " + 216 info.getClientId() + " and subscriptionName: " + 217 info.getSubscriptionName()); 218 } 219 if (sub.isActive()) { 220 throw new JMSException("Durable consumer is in use"); 221 } else { 222 durableSubscriptions.remove(key); 223 } 224 225 destinationsLock.readLock().lock(); 226 try { 227 for (Destination dest : destinations.values()) { 228 if (dest instanceof Topic){ 229 Topic topic = (Topic)dest; 230 topic.deleteSubscription(context, key); 231 } else if (dest instanceof DestinationFilter) { 232 DestinationFilter filter = (DestinationFilter) dest; 233 filter.deleteSubscription(context, key); 234 } 235 } 236 } finally { 237 destinationsLock.readLock().unlock(); 238 } 239 240 if (subscriptions.get(sub.getConsumerInfo().getConsumerId()) != null) { 241 super.removeConsumer(context, sub.getConsumerInfo()); 242 } else { 243 // try destroying inactive subscriptions 244 destroySubscription(sub); 245 } 246 } 247 248 @Override 249 public String toString() { 250 return "TopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + usageManager.getMemoryUsage().getPercentUsage() + "%"; 251 } 252 253 @Override 254 protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest) throws Exception { 255 List<Subscription> rc = super.addSubscriptionsForDestination(context, dest); 256 Set<Subscription> dupChecker = new HashSet<Subscription>(rc); 257 258 TopicMessageStore store = (TopicMessageStore)dest.getMessageStore(); 259 // Eagerly recover the durable subscriptions 260 if (store != null) { 261 SubscriptionInfo[] infos = store.getAllSubscriptions(); 262 for (int i = 0; i < infos.length; i++) { 263 264 SubscriptionInfo info = infos[i]; 265 LOG.debug("Restoring durable subscription: {}", info); 266 SubscriptionKey key = new SubscriptionKey(info); 267 268 // A single durable sub may be subscribing to multiple topics. 269 // so it might exist already. 270 DurableTopicSubscription sub = durableSubscriptions.get(key); 271 ConsumerInfo consumerInfo = createInactiveConsumerInfo(info); 272 if (sub == null) { 273 ConnectionContext c = new ConnectionContext(); 274 c.setBroker(context.getBroker()); 275 c.setClientId(key.getClientId()); 276 c.setConnectionId(consumerInfo.getConsumerId().getParentId().getParentId()); 277 sub = (DurableTopicSubscription)createSubscription(c, consumerInfo); 278 sub.setOfflineTimestamp(System.currentTimeMillis()); 279 } 280 281 if (dupChecker.contains(sub)) { 282 continue; 283 } 284 285 dupChecker.add(sub); 286 rc.add(sub); 287 dest.addSubscription(context, sub); 288 } 289 290 // Now perhaps there other durable subscriptions (via wild card) 291 // that would match this destination.. 292 durableSubscriptions.values(); 293 for (DurableTopicSubscription sub : durableSubscriptions.values()) { 294 // Skip over subscriptions that we already added.. 295 if (dupChecker.contains(sub)) { 296 continue; 297 } 298 299 if (sub.matches(dest.getActiveMQDestination())) { 300 rc.add(sub); 301 dest.addSubscription(context, sub); 302 } 303 } 304 } 305 return rc; 306 } 307 308 public ConsumerInfo createInactiveConsumerInfo(SubscriptionInfo info) { 309 ConsumerInfo rc = new ConsumerInfo(); 310 rc.setSelector(info.getSelector()); 311 rc.setSubscriptionName(info.getSubscriptionName()); 312 rc.setDestination(info.getSubscribedDestination()); 313 rc.setConsumerId(createConsumerId()); 314 rc.setNoLocal(info.isNoLocal()); 315 return rc; 316 } 317 318 private ConsumerId createConsumerId() { 319 return new ConsumerId(recoveredDurableSubSessionId, recoveredDurableSubIdGenerator.getNextSequenceId()); 320 } 321 322 protected void configureTopic(Topic topic, ActiveMQDestination destination) { 323 if (broker.getDestinationPolicy() != null) { 324 PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); 325 if (entry != null) { 326 entry.configure(broker,topic); 327 } 328 } 329 } 330 331 @Override 332 protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException { 333 ActiveMQDestination destination = info.getDestination(); 334 335 if (info.isDurable()) { 336 if (AdvisorySupport.isAdvisoryTopic(info.getDestination())) { 337 throw new JMSException("Cannot create a durable subscription for an advisory Topic"); 338 } 339 SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName()); 340 DurableTopicSubscription sub = durableSubscriptions.get(key); 341 342 if (sub == null) { 343 344 sub = new DurableTopicSubscription(broker, usageManager, context, info, keepDurableSubsActive); 345 346 if (destination != null && broker.getDestinationPolicy() != null) { 347 PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); 348 if (entry != null) { 349 entry.configure(broker, usageManager, sub); 350 } 351 } 352 durableSubscriptions.put(key, sub); 353 } else { 354 throw new JMSException("Durable subscription is already active for clientID: " + 355 context.getClientId() + " and subscriptionName: " + 356 info.getSubscriptionName()); 357 } 358 return sub; 359 } 360 try { 361 TopicSubscription answer = new TopicSubscription(broker, context, info, usageManager); 362 // lets configure the subscription depending on the destination 363 if (destination != null && broker.getDestinationPolicy() != null) { 364 PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); 365 if (entry != null) { 366 entry.configure(broker, usageManager, answer); 367 } 368 } 369 answer.init(); 370 return answer; 371 } catch (Exception e) { 372 LOG.error("Failed to create TopicSubscription ", e); 373 JMSException jmsEx = new JMSException("Couldn't create TopicSubscription"); 374 jmsEx.setLinkedException(e); 375 throw jmsEx; 376 } 377 } 378 379 private boolean hasDurableSubChanged(ConsumerInfo info1, ConsumerInfo info2) throws IOException { 380 if (info1.getSelector() != null ^ info2.getSelector() != null) { 381 return true; 382 } 383 if (info1.getSelector() != null && !info1.getSelector().equals(info2.getSelector())) { 384 return true; 385 } 386 //Not all persistence adapters store the noLocal value for a subscription 387 PersistenceAdapter adapter = broker.getBrokerService().getPersistenceAdapter(); 388 if (adapter instanceof NoLocalSubscriptionAware) { 389 if (info1.isNoLocal() ^ info2.isNoLocal()) { 390 return true; 391 } 392 } 393 return !info1.getDestination().equals(info2.getDestination()); 394 } 395 396 @Override 397 protected Set<ActiveMQDestination> getInactiveDestinations() { 398 Set<ActiveMQDestination> inactiveDestinations = super.getInactiveDestinations(); 399 for (Iterator<ActiveMQDestination> iter = inactiveDestinations.iterator(); iter.hasNext();) { 400 ActiveMQDestination dest = iter.next(); 401 if (!dest.isTopic()) { 402 iter.remove(); 403 } 404 } 405 return inactiveDestinations; 406 } 407 408 public DurableTopicSubscription lookupSubscription(String subscriptionName, String clientId) { 409 SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName); 410 if (durableSubscriptions.containsKey(key)) { 411 return durableSubscriptions.get(key); 412 } 413 414 return null; 415 } 416 417 public List<DurableTopicSubscription> lookupSubscriptions(String clientId) { 418 List<DurableTopicSubscription> result = new ArrayList<DurableTopicSubscription>(); 419 420 for (Map.Entry<SubscriptionKey, DurableTopicSubscription> subscriptionEntry : durableSubscriptions.entrySet()) { 421 if (subscriptionEntry.getKey().getClientId().equals(clientId)) { 422 result.add(subscriptionEntry.getValue()); 423 } 424 } 425 426 return result; 427 } 428 429 public boolean isKeepDurableSubsActive() { 430 return keepDurableSubsActive; 431 } 432 433 public void setKeepDurableSubsActive(boolean keepDurableSubsActive) { 434 this.keepDurableSubsActive = keepDurableSubsActive; 435 } 436 437 public boolean durableSubscriptionExists(SubscriptionKey key) { 438 return this.durableSubscriptions.containsKey(key); 439 } 440 441 public DurableTopicSubscription getDurableSubscription(SubscriptionKey key) { 442 return durableSubscriptions.get(key); 443 } 444 445 public Map<SubscriptionKey, DurableTopicSubscription> getDurableSubscriptions() { 446 return durableSubscriptions; 447 } 448}