001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import java.util.Iterator; 020import java.util.Set; 021 022import javax.jms.JMSException; 023 024import org.apache.activemq.broker.ConnectionContext; 025import org.apache.activemq.broker.region.policy.PolicyEntry; 026import org.apache.activemq.command.ActiveMQDestination; 027import org.apache.activemq.command.ConsumerInfo; 028import org.apache.activemq.command.Message; 029import org.apache.activemq.command.MessageDispatchNotification; 030import org.apache.activemq.thread.TaskRunnerFactory; 031import org.apache.activemq.usage.SystemUsage; 032 033/** 034 * 035 * 036 */ 037public class QueueRegion extends AbstractRegion { 038 039 public QueueRegion(RegionBroker broker, DestinationStatistics destinationStatistics, 040 SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, 041 DestinationFactory destinationFactory) { 042 super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 043 } 044 045 public String toString() { 046 return "QueueRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() 047 + ", memory=" + usageManager.getMemoryUsage().getPercentUsage() + "%"; 048 } 049 050 protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) 051 throws JMSException { 052 ActiveMQDestination destination = info.getDestination(); 053 PolicyEntry entry = null; 054 if (destination != null && broker.getDestinationPolicy() != null) { 055 entry = broker.getDestinationPolicy().getEntryFor(destination); 056 057 } 058 if (info.isBrowser()) { 059 QueueBrowserSubscription sub = new QueueBrowserSubscription(broker,usageManager, context, info); 060 if (entry != null) { 061 entry.configure(broker, usageManager, sub); 062 } 063 return sub; 064 } else { 065 QueueSubscription sub = new QueueSubscription(broker, usageManager,context, info); 066 if (entry != null) { 067 entry.configure(broker, usageManager, sub); 068 } 069 return sub; 070 } 071 } 072 073 protected Set<ActiveMQDestination> getInactiveDestinations() { 074 Set<ActiveMQDestination> inactiveDestinations = super.getInactiveDestinations(); 075 for (Iterator<ActiveMQDestination> iter = inactiveDestinations.iterator(); iter.hasNext();) { 076 ActiveMQDestination dest = iter.next(); 077 if (!dest.isQueue()) { 078 iter.remove(); 079 } 080 } 081 return inactiveDestinations; 082 } 083 084 /* 085 * For a Queue, dispatch order is imperative to match acks, so the dispatch is deferred till 086 * the notification to ensure that the subscription chosen by the master is used. 087 * 088 * (non-Javadoc) 089 * @see org.apache.activemq.broker.region.AbstractRegion#processDispatchNotification(org.apache.activemq.command.MessageDispatchNotification) 090 */ 091 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { 092 processDispatchNotificationViaDestination(messageDispatchNotification); 093 } 094}