001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.cursors;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.Iterator;
022import java.util.LinkedList;
023import java.util.List;
024import java.util.concurrent.atomic.AtomicBoolean;
025import java.util.concurrent.atomic.AtomicLong;
026
027import org.apache.activemq.broker.Broker;
028import org.apache.activemq.broker.ConnectionContext;
029import org.apache.activemq.broker.region.Destination;
030import org.apache.activemq.broker.region.IndirectMessageReference;
031import org.apache.activemq.broker.region.MessageReference;
032import org.apache.activemq.broker.region.QueueMessageReference;
033import org.apache.activemq.command.Message;
034import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
035import org.apache.activemq.openwire.OpenWireFormat;
036import org.apache.activemq.store.PList;
037import org.apache.activemq.store.PListEntry;
038import org.apache.activemq.store.PListStore;
039import org.apache.activemq.usage.SystemUsage;
040import org.apache.activemq.usage.Usage;
041import org.apache.activemq.usage.UsageListener;
042import org.apache.activemq.util.ByteSequence;
043import org.apache.activemq.wireformat.WireFormat;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047/**
048 * persist pending messages pending message (messages awaiting dispatch to a
049 * consumer) cursor
050 */
051public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener {
052
053    static final Logger LOG = LoggerFactory.getLogger(FilePendingMessageCursor.class);
054
055    private static final AtomicLong NAME_COUNT = new AtomicLong();
056
057    protected Broker broker;
058    private final PListStore store;
059    private final String name;
060    private PendingList memoryList;
061    private PList diskList;
062    private Iterator<MessageReference> iter;
063    private Destination regionDestination;
064    private boolean iterating;
065    private boolean flushRequired;
066    private final AtomicBoolean started = new AtomicBoolean();
067    private final WireFormat wireFormat = new OpenWireFormat();
068
069    /**
070     * @param broker
071     * @param name
072     * @param prioritizedMessages
073     */
074    public FilePendingMessageCursor(Broker broker, String name, boolean prioritizedMessages) {
075        super(prioritizedMessages);
076        if (this.prioritizedMessages) {
077            this.memoryList = new PrioritizedPendingList();
078        } else {
079            this.memoryList = new OrderedPendingList();
080        }
081        this.broker = broker;
082        // the store can be null if the BrokerService has persistence
083        // turned off
084        this.store = broker.getTempDataStore();
085        this.name = NAME_COUNT.incrementAndGet() + "_" + name;
086    }
087
088    @Override
089    public void start() throws Exception {
090        if (started.compareAndSet(false, true)) {
091            if( this.broker != null) {
092                wireFormat.setVersion(this.broker.getBrokerService().getStoreOpenWireVersion());
093            }
094            super.start();
095            if (systemUsage != null) {
096                systemUsage.getMemoryUsage().addUsageListener(this);
097            }
098        }
099    }
100
101    @Override
102    public void stop() throws Exception {
103        if (started.compareAndSet(true, false)) {
104            super.stop();
105            if (systemUsage != null) {
106                systemUsage.getMemoryUsage().removeUsageListener(this);
107            }
108        }
109    }
110
111    /**
112     * @return true if there are no pending messages
113     */
114    @Override
115    public synchronized boolean isEmpty() {
116        if (memoryList.isEmpty() && isDiskListEmpty()) {
117            return true;
118        }
119        for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
120            MessageReference node = iterator.next();
121            if (node == QueueMessageReference.NULL_MESSAGE) {
122                continue;
123            }
124            if (!node.isDropped()) {
125                return false;
126            }
127            // We can remove dropped references.
128            iterator.remove();
129        }
130        return isDiskListEmpty();
131    }
132
133    /**
134     * reset the cursor
135     */
136    @Override
137    public synchronized void reset() {
138        iterating = true;
139        last = null;
140        if (isDiskListEmpty()) {
141            this.iter = this.memoryList.iterator();
142        } else {
143            this.iter = new DiskIterator();
144        }
145    }
146
147    @Override
148    public synchronized void release() {
149        iterating = false;
150        if (iter instanceof DiskIterator) {
151           ((DiskIterator)iter).release();
152        };
153        if (flushRequired) {
154            flushRequired = false;
155            if (!hasSpace()) {
156                flushToDisk();
157            }
158        }
159        // ensure any memory ref is released
160        iter = null;
161    }
162
163    @Override
164    public synchronized void destroy() throws Exception {
165        stop();
166        for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext();) {
167            MessageReference node = i.next();
168            node.decrementReferenceCount();
169        }
170        memoryList.clear();
171        destroyDiskList();
172    }
173
174    private void destroyDiskList() throws Exception {
175        if (diskList != null) {
176            store.removePList(name);
177            diskList = null;
178        }
179    }
180
181    @Override
182    public synchronized LinkedList<MessageReference> pageInList(int maxItems) {
183        LinkedList<MessageReference> result = new LinkedList<MessageReference>();
184        int count = 0;
185        for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext() && count < maxItems;) {
186            MessageReference ref = i.next();
187            ref.incrementReferenceCount();
188            result.add(ref);
189            count++;
190        }
191        if (count < maxItems && !isDiskListEmpty()) {
192            for (Iterator<MessageReference> i = new DiskIterator(); i.hasNext() && count < maxItems;) {
193                Message message = (Message) i.next();
194                message.setRegionDestination(regionDestination);
195                message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
196                message.incrementReferenceCount();
197                result.add(message);
198                count++;
199            }
200        }
201        return result;
202    }
203
204    /**
205     * add message to await dispatch
206     *
207     * @param node
208     * @throws Exception
209     */
210    @Override
211    public synchronized boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception {
212        if (!node.isExpired()) {
213            try {
214                regionDestination = (Destination) node.getMessage().getRegionDestination();
215                if (isDiskListEmpty()) {
216                    if (hasSpace() || this.store == null) {
217                        memoryList.addMessageLast(node);
218                        node.incrementReferenceCount();
219                        setCacheEnabled(true);
220                        return true;
221                    }
222                }
223                if (!hasSpace()) {
224                    if (isDiskListEmpty()) {
225                        expireOldMessages();
226                        if (hasSpace()) {
227                            memoryList.addMessageLast(node);
228                            node.incrementReferenceCount();
229                            return true;
230                        } else {
231                            flushToDisk();
232                        }
233                    }
234                }
235                if (systemUsage.getTempUsage().waitForSpace(maxWaitTime)) {
236                    ByteSequence bs = getByteSequence(node.getMessage());
237                    getDiskList().addLast(node.getMessageId().toString(), bs);
238                    return true;
239                }
240                return false;
241
242            } catch (Exception e) {
243                LOG.error("Caught an Exception adding a message: {} first to FilePendingMessageCursor ", node, e);
244                throw new RuntimeException(e);
245            }
246        } else {
247            discardExpiredMessage(node);
248        }
249        //message expired
250        return true;
251    }
252
253    /**
254     * add message to await dispatch
255     *
256     * @param node
257     */
258    @Override
259    public synchronized void addMessageFirst(MessageReference node) {
260        if (!node.isExpired()) {
261            try {
262                regionDestination = (Destination) node.getMessage().getRegionDestination();
263                if (isDiskListEmpty()) {
264                    if (hasSpace()) {
265                        memoryList.addMessageFirst(node);
266                        node.incrementReferenceCount();
267                        setCacheEnabled(true);
268                        return;
269                    }
270                }
271                if (!hasSpace()) {
272                    if (isDiskListEmpty()) {
273                        expireOldMessages();
274                        if (hasSpace()) {
275                            memoryList.addMessageFirst(node);
276                            node.incrementReferenceCount();
277                            return;
278                        } else {
279                            flushToDisk();
280                        }
281                    }
282                }
283                systemUsage.getTempUsage().waitForSpace();
284                node.decrementReferenceCount();
285                ByteSequence bs = getByteSequence(node.getMessage());
286                Object locator = getDiskList().addFirst(node.getMessageId().toString(), bs);
287                node.getMessageId().setPlistLocator(locator);
288
289            } catch (Exception e) {
290                LOG.error("Caught an Exception adding a message: {} first to FilePendingMessageCursor ", node, e);
291                throw new RuntimeException(e);
292            }
293        } else {
294            discardExpiredMessage(node);
295        }
296    }
297
298    /**
299     * @return true if there pending messages to dispatch
300     */
301    @Override
302    public synchronized boolean hasNext() {
303        return iter.hasNext();
304    }
305
306    /**
307     * @return the next pending message
308     */
309    @Override
310    public synchronized MessageReference next() {
311        MessageReference reference = iter.next();
312        last = reference;
313        if (!isDiskListEmpty()) {
314            // got from disk
315            reference.getMessage().setRegionDestination(regionDestination);
316            reference.getMessage().setMemoryUsage(this.getSystemUsage().getMemoryUsage());
317        }
318        reference.incrementReferenceCount();
319        return reference;
320    }
321
322    /**
323     * remove the message at the cursor position
324     */
325    @Override
326    public synchronized void remove() {
327        iter.remove();
328        if (last != null) {
329            last.decrementReferenceCount();
330        }
331    }
332
333    /**
334     * @param node
335     * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference)
336     */
337    @Override
338    public synchronized void remove(MessageReference node) {
339        if (memoryList.remove(node) != null) {
340            node.decrementReferenceCount();
341        }
342        if (!isDiskListEmpty()) {
343            try {
344                getDiskList().remove(node.getMessageId().getPlistLocator());
345            } catch (IOException e) {
346                throw new RuntimeException(e);
347            }
348        }
349    }
350
351    /**
352     * @return the number of pending messages
353     */
354    @Override
355    public synchronized int size() {
356        return memoryList.size() + (isDiskListEmpty() ? 0 : (int)getDiskList().size());
357    }
358
359    @Override
360    public synchronized long messageSize() {
361        return memoryList.messageSize() + (isDiskListEmpty() ? 0 : getDiskList().messageSize());
362    }
363
364    /**
365     * clear all pending messages
366     */
367    @Override
368    public synchronized void clear() {
369        memoryList.clear();
370        if (!isDiskListEmpty()) {
371            try {
372                getDiskList().destroy();
373            } catch (IOException e) {
374                throw new RuntimeException(e);
375            }
376        }
377        last = null;
378    }
379
380    @Override
381    public synchronized boolean isFull() {
382        return super.isFull() || (!isDiskListEmpty() && systemUsage != null && systemUsage.getTempUsage().isFull());
383    }
384
385    @Override
386    public boolean hasMessagesBufferedToDeliver() {
387        return !isEmpty();
388    }
389
390    @Override
391    public void setSystemUsage(SystemUsage usageManager) {
392        super.setSystemUsage(usageManager);
393    }
394
395    @Override
396    public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
397        if (newPercentUsage >= getMemoryUsageHighWaterMark()) {
398            List<MessageReference> expiredMessages = null;
399            synchronized (this) {
400                if (!flushRequired && size() != 0) {
401                    flushRequired =true;
402                    if (!iterating) {
403                        expiredMessages = expireOldMessages();
404                        if (!hasSpace()) {
405                            flushToDisk();
406                            flushRequired = false;
407                        }
408                    }
409                }
410            }
411
412            if (expiredMessages != null) {
413                for (MessageReference node : expiredMessages) {
414                    discardExpiredMessage(node);
415                }
416            }
417        }
418    }
419
420    @Override
421    public boolean isTransient() {
422        return true;
423    }
424
425    private synchronized List<MessageReference> expireOldMessages() {
426        List<MessageReference> expired = new ArrayList<MessageReference>();
427        if (!memoryList.isEmpty()) {
428            for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
429                MessageReference node = iterator.next();
430                if (node.isExpired()) {
431                    node.decrementReferenceCount();
432                    expired.add(node);
433                    iterator.remove();
434                }
435            }
436        }
437
438        return expired;
439    }
440
441    protected synchronized void flushToDisk() {
442        if (!memoryList.isEmpty() && store != null) {
443            long start = 0;
444            if (LOG.isTraceEnabled()) {
445                start = System.currentTimeMillis();
446                LOG.trace("{}, flushToDisk() mem list size: {} {}", new Object[] { name, memoryList.size(),
447                    (systemUsage != null ? systemUsage.getMemoryUsage() : "") });
448            }
449            for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
450                MessageReference node = iterator.next();
451                node.decrementReferenceCount();
452                ByteSequence bs;
453                try {
454                    bs = getByteSequence(node.getMessage());
455                    getDiskList().addLast(node.getMessageId().toString(), bs);
456                } catch (IOException e) {
457                    LOG.error("Failed to write to disk list", e);
458                    throw new RuntimeException(e);
459                }
460
461            }
462            memoryList.clear();
463            setCacheEnabled(false);
464            LOG.trace("{}, flushToDisk() done - {} ms {}", new Object[]{ name, (System.currentTimeMillis() - start), (systemUsage != null ? systemUsage.getMemoryUsage() : "") });
465        }
466    }
467
468    protected boolean isDiskListEmpty() {
469        return diskList == null || diskList.isEmpty();
470    }
471
472    public PList getDiskList() {
473        if (diskList == null) {
474            try {
475                diskList = store.getPList(name);
476            } catch (Exception e) {
477                LOG.error("Caught an IO Exception getting the DiskList {}", name, e);
478                throw new RuntimeException(e);
479            }
480        }
481        return diskList;
482    }
483
484    private void discardExpiredMessage(MessageReference reference) {
485        LOG.debug("Discarding expired message {}", reference);
486        if (reference.isExpired() && broker.isExpired(reference)) {
487            ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
488            context.setBroker(broker);
489            ((Destination)reference.getRegionDestination()).messageExpired(context, null, new IndirectMessageReference(reference.getMessage()));
490        }
491    }
492
493    protected ByteSequence getByteSequence(Message message) throws IOException {
494        org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
495        return new ByteSequence(packet.data, packet.offset, packet.length);
496    }
497
498    protected Message getMessage(ByteSequence bs) throws IOException {
499        org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(bs.getData(), bs
500                .getOffset(), bs.getLength());
501        return (Message) this.wireFormat.unmarshal(packet);
502
503    }
504
505    final class DiskIterator implements Iterator<MessageReference> {
506        private final PList.PListIterator iterator;
507        DiskIterator() {
508            try {
509                iterator = getDiskList().iterator();
510            } catch (Exception e) {
511                throw new RuntimeException(e);
512            }
513        }
514
515        @Override
516        public boolean hasNext() {
517            return iterator.hasNext();
518        }
519
520        @Override
521        public MessageReference next() {
522            try {
523                PListEntry entry = iterator.next();
524                Message message = getMessage(entry.getByteSequence());
525                message.getMessageId().setPlistLocator(entry.getLocator());
526                return message;
527            } catch (IOException e) {
528                LOG.error("I/O error", e);
529                throw new RuntimeException(e);
530            }
531        }
532
533        @Override
534        public void remove() {
535            iterator.remove();
536        }
537
538        public void release() {
539            iterator.release();
540        }
541    }
542}