001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.cursors;
018
019import static org.apache.activemq.broker.region.cursors.OrderedPendingList.getValues;
020
021import java.util.ArrayDeque;
022import java.util.Collection;
023import java.util.Deque;
024import java.util.HashMap;
025import java.util.Iterator;
026import java.util.List;
027import java.util.Map;
028
029import org.apache.activemq.broker.region.MessageReference;
030import org.apache.activemq.command.MessageId;
031import org.apache.activemq.management.SizeStatisticImpl;
032
033public class PrioritizedPendingList implements PendingList {
034
035    private static final Integer MAX_PRIORITY = 10;
036    private final OrderedPendingList[] lists = new OrderedPendingList[MAX_PRIORITY];
037    private final Map<MessageId, PendingNode> map = new HashMap<MessageId, PendingNode>();
038    private final SizeStatisticImpl messageSize;
039    private final PendingMessageHelper pendingMessageHelper;
040
041
042    public PrioritizedPendingList() {
043        for (int i = 0; i < MAX_PRIORITY; i++) {
044            this.lists[i] = new OrderedPendingList();
045        }
046        messageSize = new SizeStatisticImpl("messageSize", "The size in bytes of the pending messages");
047        messageSize.setEnabled(true);
048        pendingMessageHelper = new PendingMessageHelper(map, messageSize);
049    }
050
051    @Override
052    public PendingNode addMessageFirst(MessageReference message) {
053        PendingNode node = getList(message).addMessageFirst(message);
054        this.pendingMessageHelper.addToMap(message, node);
055        return node;
056    }
057
058    @Override
059    public PendingNode addMessageLast(MessageReference message) {
060        PendingNode node = getList(message).addMessageLast(message);
061        this.pendingMessageHelper.addToMap(message, node);
062        return node;
063    }
064
065    @Override
066    public void clear() {
067        for (int i = 0; i < MAX_PRIORITY; i++) {
068            this.lists[i].clear();
069        }
070        this.map.clear();
071        this.messageSize.reset();
072    }
073
074    @Override
075    public boolean isEmpty() {
076        return this.map.isEmpty();
077    }
078
079    @Override
080    public Iterator<MessageReference> iterator() {
081        return new PrioritizedPendingListIterator();
082    }
083
084    @Override
085    public PendingNode remove(MessageReference message) {
086        PendingNode node = null;
087        if (message != null) {
088            node = this.pendingMessageHelper.removeFromMap(message);
089            if (node != null) {
090                node.getList().removeNode(node);
091            }
092        }
093        return node;
094    }
095
096    @Override
097    public int size() {
098        return this.map.size();
099    }
100
101    @Override
102    public long messageSize() {
103        return this.messageSize.getTotalSize();
104    }
105
106    @Override
107    public String toString() {
108        return "PrioritizedPendingList(" + System.identityHashCode(this) + ")";
109    }
110
111    protected int getPriority(MessageReference message) {
112        int priority = javax.jms.Message.DEFAULT_PRIORITY;
113        if (message.getMessageId() != null) {
114            priority = Math.max(message.getMessage().getPriority(), 0);
115            priority = Math.min(priority, 9);
116        }
117        return priority;
118    }
119
120    protected OrderedPendingList getList(MessageReference msg) {
121        return lists[getPriority(msg)];
122    }
123
124    private final class PrioritizedPendingListIterator implements Iterator<MessageReference> {
125
126        private final Deque<Iterator<MessageReference>> iterators = new ArrayDeque<Iterator<MessageReference>>();
127
128        private Iterator<MessageReference> current;
129        private MessageReference currentMessage;
130
131        PrioritizedPendingListIterator() {
132            for (OrderedPendingList list : lists) {
133                if (!list.isEmpty()) {
134                    iterators.push(list.iterator());
135                }
136            }
137
138            current = iterators.poll();
139        }
140
141        @Override
142        public boolean hasNext() {
143            while (current != null) {
144                if (current.hasNext()) {
145                    return true;
146                } else {
147                    current = iterators.poll();
148                }
149            }
150
151            return false;
152        }
153
154        @Override
155        public MessageReference next() {
156            MessageReference result = null;
157
158            while (current != null) {
159                if (current.hasNext()) {
160                    result = currentMessage = current.next();
161                    break;
162                } else {
163                    current = iterators.poll();
164                }
165            }
166
167            return result;
168        }
169
170        @Override
171        public void remove() {
172            if (currentMessage != null) {
173                pendingMessageHelper.removeFromMap(currentMessage);
174                current.remove();
175                currentMessage = null;
176            }
177        }
178    }
179
180    @Override
181    public boolean contains(MessageReference message) {
182        if (message != null) {
183            return this.map.containsKey(message.getMessageId());
184        }
185        return false;
186    }
187
188    @Override
189    public Collection<MessageReference> values() {
190        return getValues(this);
191    }
192
193    @Override
194    public void addAll(PendingList pendingList) {
195        for(MessageReference messageReference : pendingList) {
196            addMessageLast(messageReference);
197        }
198    }
199
200    @Override
201    public MessageReference get(MessageId messageId) {
202        PendingNode node = map.get(messageId);
203        if (node != null) {
204            return node.getMessage();
205        }
206        return null;
207    }
208
209}