001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.cursors;
018
019import java.io.IOException;
020import java.util.LinkedList;
021import java.util.List;
022import org.apache.activemq.ActiveMQMessageAudit;
023import org.apache.activemq.Service;
024import org.apache.activemq.broker.ConnectionContext;
025import org.apache.activemq.broker.region.Destination;
026import org.apache.activemq.broker.region.MessageReference;
027import org.apache.activemq.command.MessageId;
028import org.apache.activemq.usage.SystemUsage;
029
030/**
031 * Interface to pending message (messages awaiting disptach to a consumer)
032 * cursor
033 *
034 *
035 */
036public interface PendingMessageCursor extends Service {
037
038    static final long INFINITE_WAIT = 0;
039
040    /**
041     * Add a destination
042     *
043     * @param context
044     * @param destination
045     * @throws Exception
046     */
047    void add(ConnectionContext context, Destination destination) throws Exception;
048
049    /**
050     * remove a destination
051     *
052     * @param context
053     * @param destination
054     * @throws Exception
055     */
056    List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception;
057
058    /**
059     * @return true if there are no pending messages
060     */
061    boolean isEmpty();
062
063    /**
064     * check if a Destination is Empty for this cursor
065     *
066     * @param destination
067     * @return true id the Destination is empty
068     */
069    boolean isEmpty(Destination destination);
070
071    /**
072     * reset the cursor
073     */
074    void reset();
075
076    /**
077     * hint to the cursor to release any locks it might have grabbed after a
078     * reset
079     */
080    void release();
081
082    /**
083     * add message to await dispatch
084     *
085     * @param node
086     * @return boolean true if successful, false if cursor traps a duplicate
087     * @throws IOException
088     * @throws Exception
089     */
090    boolean addMessageLast(MessageReference node) throws Exception;
091
092    /**
093     * add message to await dispatch - if it can
094     *
095     * @param node
096     * @param maxWaitTime
097     * @return true if successful
098     * @throws IOException
099     * @throws Exception
100     */
101    boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception;
102
103    /**
104     * add message to await dispatch
105     *
106     * @param node
107     * @throws Exception
108     */
109    void addMessageFirst(MessageReference node) throws Exception;
110
111    /**
112     * Add a message recovered from a retroactive policy
113     *
114     * @param node
115     * @throws Exception
116     */
117    void addRecoveredMessage(MessageReference node) throws Exception;
118
119    /**
120     * @return true if there pending messages to dispatch
121     */
122    boolean hasNext();
123
124    /**
125     * @return the next pending message with its reference count increment
126     */
127    MessageReference next();
128
129    /**
130     * remove the message at the cursor position
131     */
132    void remove();
133
134    /**
135     * @return the number of pending messages
136     */
137    int size();
138
139    long messageSize();
140
141    /**
142     * clear all pending messages
143     */
144    void clear();
145
146    /**
147     * Informs the Broker if the subscription needs to intervention to recover
148     * it's state e.g. DurableTopicSubscriber may do
149     *
150     * @return true if recovery required
151     */
152    boolean isRecoveryRequired();
153
154    /**
155     * @return the maximum batch size
156     */
157    int getMaxBatchSize();
158
159    /**
160     * Set the max batch size
161     *
162     * @param maxBatchSize
163     */
164    void setMaxBatchSize(int maxBatchSize);
165
166    /**
167     * Give the cursor a hint that we are about to remove messages from memory
168     * only
169     */
170    void resetForGC();
171
172    /**
173     * remove a node
174     *
175     * @param node
176     */
177    void remove(MessageReference node);
178
179    /**
180     * free up any internal buffers
181     */
182    void gc();
183
184    /**
185     * Set the UsageManager
186     *
187     * @param systemUsage
188     * @see org.apache.activemq.usage.SystemUsage
189     */
190    void setSystemUsage(SystemUsage systemUsage);
191
192    /**
193     * @return the usageManager
194     */
195    SystemUsage getSystemUsage();
196
197    /**
198     * @return the memoryUsageHighWaterMark
199     */
200    int getMemoryUsageHighWaterMark();
201
202    /**
203     * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
204     */
205    void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark);
206
207    /**
208     * @return true if the cursor is full
209     */
210    boolean isFull();
211
212    /**
213     * @return true if the cursor has space to page messages into
214     */
215    public boolean hasSpace();
216
217    /**
218     * @return true if the cursor has buffered messages ready to deliver
219     */
220    boolean hasMessagesBufferedToDeliver();
221
222    /**
223     * destroy the cursor
224     *
225     * @throws Exception
226     */
227    void destroy() throws Exception;
228
229    /**
230     * Page in a restricted number of messages and increment the reference count
231     *
232     * @param maxItems
233     * @return a list of paged in messages
234     */
235    LinkedList<MessageReference> pageInList(int maxItems);
236
237    /**
238     * set the maximum number of producers to track at one time
239     * @param value
240     */
241    void setMaxProducersToAudit(int value);
242
243    /**
244     * @return the maximum number of producers to audit
245     */
246    int getMaxProducersToAudit();
247
248    /**
249     * Set the maximum depth of message ids to track
250     * @param depth
251     */
252    void setMaxAuditDepth(int depth);
253
254    /**
255     * @return the audit depth
256     */
257    int getMaxAuditDepth();
258
259    /**
260     * @return the enableAudit
261     */
262    public boolean isEnableAudit();
263    /**
264     * @param enableAudit the enableAudit to set
265     */
266    public void setEnableAudit(boolean enableAudit);
267
268    /**
269     * @return true if the underlying state of this cursor
270     * disappears when the broker shuts down
271     */
272    public boolean isTransient();
273
274
275    /**
276     * set the audit
277     * @param audit
278     */
279    public void setMessageAudit(ActiveMQMessageAudit audit);
280
281
282    /**
283     * @return the audit - could be null
284     */
285    public ActiveMQMessageAudit getMessageAudit();
286
287    /**
288     * use a cache to improve performance
289     * @param useCache
290     */
291    public void setUseCache(boolean useCache);
292
293    /**
294     * @return true if a cache may be used
295     */
296    public boolean isUseCache();
297
298    /**
299     * remove from auditing the message id
300     * @param id
301     */
302    public void rollback(MessageId id);
303
304    /**
305     * @return true if cache is being used
306     */
307    public boolean isCacheEnabled();
308
309    public void rebase();
310
311}