001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb.plist;
018
019import org.apache.activemq.broker.BrokerService;
020import org.apache.activemq.broker.BrokerServiceAware;
021import org.apache.activemq.openwire.OpenWireFormat;
022import org.apache.activemq.store.JournaledStore;
023import org.apache.activemq.store.PList;
024import org.apache.activemq.store.PListStore;
025import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
026import org.apache.activemq.store.kahadb.disk.journal.Journal;
027import org.apache.activemq.store.kahadb.disk.journal.Location;
028import org.apache.activemq.store.kahadb.disk.page.Page;
029import org.apache.activemq.store.kahadb.disk.page.PageFile;
030import org.apache.activemq.store.kahadb.disk.page.Transaction;
031import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
032import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
033import org.apache.activemq.thread.Scheduler;
034import org.apache.activemq.util.*;
035import org.apache.activemq.wireformat.WireFormat;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039import java.io.DataInput;
040import java.io.DataOutput;
041import java.io.File;
042import java.io.IOException;
043import java.util.*;
044import java.util.Map.Entry;
045
046/**
047 * @org.apache.xbean.XBean
048 */
049public class PListStoreImpl extends ServiceSupport implements BrokerServiceAware, Runnable, PListStore, JournaledStore {
050    static final Logger LOG = LoggerFactory.getLogger(PListStoreImpl.class);
051    private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
052
053    static final int CLOSED_STATE = 1;
054    static final int OPEN_STATE = 2;
055
056    private File directory;
057    private File indexDirectory;
058    PageFile pageFile;
059    private Journal journal;
060    private LockFile lockFile;
061    private boolean failIfDatabaseIsLocked;
062    private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
063    private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
064    private boolean enableIndexWriteAsync = false;
065    private boolean initialized = false;
066    private boolean lazyInit = true;
067    // private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
068    MetaData metaData = new MetaData(this);
069    final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
070    Map<String, PListImpl> persistentLists = new HashMap<String, PListImpl>();
071    final Object indexLock = new Object();
072    private Scheduler scheduler;
073    private long cleanupInterval = 30000;
074
075    private int indexPageSize = PageFile.DEFAULT_PAGE_SIZE;
076    private int indexCacheSize = PageFile.DEFAULT_PAGE_CACHE_SIZE;
077    private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
078    private boolean indexEnablePageCaching = true;
079
080    public Object getIndexLock() {
081        return indexLock;
082    }
083
084    @Override
085    public void setBrokerService(BrokerService brokerService) {
086        this.scheduler = brokerService.getScheduler();
087    }
088
089    public int getIndexPageSize() {
090        return indexPageSize;
091    }
092
093    public int getIndexCacheSize() {
094        return indexCacheSize;
095    }
096
097    public int getIndexWriteBatchSize() {
098        return indexWriteBatchSize;
099    }
100
101    public void setIndexPageSize(int indexPageSize) {
102        this.indexPageSize = indexPageSize;
103    }
104
105    public void setIndexCacheSize(int indexCacheSize) {
106        this.indexCacheSize = indexCacheSize;
107    }
108
109    public void setIndexWriteBatchSize(int indexWriteBatchSize) {
110        this.indexWriteBatchSize = indexWriteBatchSize;
111    }
112
113    public boolean getIndexEnablePageCaching() {
114        return indexEnablePageCaching;
115    }
116
117    public void setIndexEnablePageCaching(boolean indexEnablePageCaching) {
118        this.indexEnablePageCaching = indexEnablePageCaching;
119    }
120
121    protected class MetaData {
122        protected MetaData(PListStoreImpl store) {
123            this.store = store;
124        }
125
126        private final PListStoreImpl store;
127        Page<MetaData> page;
128        BTreeIndex<String, PListImpl> lists;
129
130        void createIndexes(Transaction tx) throws IOException {
131            this.lists = new BTreeIndex<String, PListImpl>(pageFile, tx.allocate().getPageId());
132        }
133
134        void load(Transaction tx) throws IOException {
135            this.lists.setKeyMarshaller(StringMarshaller.INSTANCE);
136            this.lists.setValueMarshaller(new PListMarshaller(this.store));
137            this.lists.load(tx);
138        }
139
140        void loadLists(Transaction tx, Map<String, PListImpl> lists) throws IOException {
141            for (Iterator<Entry<String, PListImpl>> i = this.lists.iterator(tx); i.hasNext();) {
142                Entry<String, PListImpl> entry = i.next();
143                entry.getValue().load(tx);
144                lists.put(entry.getKey(), entry.getValue());
145            }
146        }
147
148        public void read(DataInput is) throws IOException {
149            this.lists = new BTreeIndex<String, PListImpl>(pageFile, is.readLong());
150            this.lists.setKeyMarshaller(StringMarshaller.INSTANCE);
151            this.lists.setValueMarshaller(new PListMarshaller(this.store));
152        }
153
154        public void write(DataOutput os) throws IOException {
155            os.writeLong(this.lists.getPageId());
156        }
157    }
158
159    class MetaDataMarshaller extends VariableMarshaller<MetaData> {
160        private final PListStoreImpl store;
161
162        MetaDataMarshaller(PListStoreImpl store) {
163            this.store = store;
164        }
165        @Override
166        public MetaData readPayload(DataInput dataIn) throws IOException {
167            MetaData rc = new MetaData(this.store);
168            rc.read(dataIn);
169            return rc;
170        }
171
172        @Override
173        public void writePayload(MetaData object, DataOutput dataOut) throws IOException {
174            object.write(dataOut);
175        }
176    }
177
178    class PListMarshaller extends VariableMarshaller<PListImpl> {
179        private final PListStoreImpl store;
180        PListMarshaller(PListStoreImpl store) {
181            this.store = store;
182        }
183        @Override
184        public PListImpl readPayload(DataInput dataIn) throws IOException {
185            PListImpl result = new PListImpl(this.store);
186            result.read(dataIn);
187            return result;
188        }
189
190        @Override
191        public void writePayload(PListImpl list, DataOutput dataOut) throws IOException {
192            list.write(dataOut);
193        }
194    }
195
196    public Journal getJournal() {
197        return this.journal;
198    }
199
200    @Override
201    public File getDirectory() {
202        return directory;
203    }
204
205    @Override
206    public void setDirectory(File directory) {
207        this.directory = directory;
208    }
209
210    public File getIndexDirectory() {
211        return indexDirectory != null ? indexDirectory : directory;
212    }
213
214    public void setIndexDirectory(File indexDirectory) {
215        this.indexDirectory = indexDirectory;
216    }
217
218    @Override
219    public long size() {
220        synchronized (this) {
221            if (!initialized) {
222                return 0;
223            }
224        }
225        try {
226            return journal.getDiskSize() + pageFile.getDiskSize();
227        } catch (IOException e) {
228            throw new RuntimeException(e);
229        }
230    }
231
232    @Override
233    public PListImpl getPList(final String name) throws Exception {
234        if (!isStarted()) {
235            throw new IllegalStateException("Not started");
236        }
237        intialize();
238        synchronized (indexLock) {
239            synchronized (this) {
240                PListImpl result = this.persistentLists.get(name);
241                if (result == null) {
242                    final PListImpl pl = new PListImpl(this);
243                    pl.setName(name);
244                    getPageFile().tx().execute(new Transaction.Closure<IOException>() {
245                        @Override
246                        public void execute(Transaction tx) throws IOException {
247                            pl.setHeadPageId(tx.allocate().getPageId());
248                            pl.load(tx);
249                            metaData.lists.put(tx, name, pl);
250                        }
251                    });
252                    result = pl;
253                    this.persistentLists.put(name, pl);
254                }
255                final PListImpl toLoad = result;
256                getPageFile().tx().execute(new Transaction.Closure<IOException>() {
257                    @Override
258                    public void execute(Transaction tx) throws IOException {
259                        toLoad.load(tx);
260                    }
261                });
262
263                return result;
264            }
265        }
266    }
267
268    @Override
269    public boolean removePList(final String name) throws Exception {
270        boolean result = false;
271        synchronized (indexLock) {
272            synchronized (this) {
273                final PList pl = this.persistentLists.remove(name);
274                result = pl != null;
275                if (result) {
276                    getPageFile().tx().execute(new Transaction.Closure<IOException>() {
277                        @Override
278                        public void execute(Transaction tx) throws IOException {
279                            metaData.lists.remove(tx, name);
280                            pl.destroy();
281                        }
282                    });
283                }
284            }
285        }
286        return result;
287    }
288
289    protected synchronized void intialize() throws Exception {
290        if (isStarted()) {
291            if (this.initialized == false) {
292                if (this.directory == null) {
293                    this.directory = getDefaultDirectory();
294                }
295                IOHelper.mkdirs(this.directory);
296                IOHelper.deleteChildren(this.directory);
297                if (this.indexDirectory != null) {
298                    IOHelper.mkdirs(this.indexDirectory);
299                    IOHelper.deleteChildren(this.indexDirectory);
300                }
301                lock();
302                this.journal = new Journal();
303                this.journal.setDirectory(directory);
304                this.journal.setMaxFileLength(getJournalMaxFileLength());
305                this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize());
306                this.journal.start();
307                this.pageFile = new PageFile(getIndexDirectory(), "tmpDB");
308                this.pageFile.setEnablePageCaching(getIndexEnablePageCaching());
309                this.pageFile.setPageSize(getIndexPageSize());
310                this.pageFile.setWriteBatchSize(getIndexWriteBatchSize());
311                this.pageFile.setPageCacheSize(getIndexCacheSize());
312                this.pageFile.load();
313
314                this.pageFile.tx().execute(new Transaction.Closure<IOException>() {
315                    @Override
316                    public void execute(Transaction tx) throws IOException {
317                        if (pageFile.getPageCount() == 0) {
318                            Page<MetaData> page = tx.allocate();
319                            assert page.getPageId() == 0;
320                            page.set(metaData);
321                            metaData.page = page;
322                            metaData.createIndexes(tx);
323                            tx.store(metaData.page, metaDataMarshaller, true);
324
325                        } else {
326                            Page<MetaData> page = tx.load(0, metaDataMarshaller);
327                            metaData = page.get();
328                            metaData.page = page;
329                        }
330                        metaData.load(tx);
331                        metaData.loadLists(tx, persistentLists);
332                    }
333                });
334                this.pageFile.flush();
335
336                if (cleanupInterval > 0) {
337                    if (scheduler == null) {
338                        scheduler = new Scheduler(PListStoreImpl.class.getSimpleName());
339                        scheduler.start();
340                    }
341                    scheduler.executePeriodically(this, cleanupInterval);
342                }
343                this.initialized = true;
344                LOG.info(this + " initialized");
345            }
346        }
347    }
348
349    protected File getDefaultDirectory() {
350        return new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB");
351    }
352
353    protected void cleanupDirectory(final File dir) {
354        if (dir != null && dir.exists()) {
355            IOHelper.delete(dir);
356        }
357    }
358
359    @Override
360    protected synchronized void doStart() throws Exception {
361        if (!lazyInit) {
362            intialize();
363        } else {
364            if (this.directory == null) {
365                this.directory = getDefaultDirectory();
366            }
367            //Go ahead and clean up previous data on start up
368            cleanupDirectory(this.directory);
369            cleanupDirectory(this.indexDirectory);
370        }
371        LOG.info(this + " started");
372    }
373
374    @Override
375    protected synchronized void doStop(ServiceStopper stopper) throws Exception {
376        if (scheduler != null) {
377            if (PListStoreImpl.class.getSimpleName().equals(scheduler.getName())) {
378                scheduler.stop();
379                scheduler = null;
380            }
381        }
382        for (PListImpl pl : this.persistentLists.values()) {
383            pl.unload(null);
384        }
385        if (this.pageFile != null) {
386            this.pageFile.unload();
387        }
388        if (this.journal != null) {
389            journal.close();
390        }
391        if (this.lockFile != null) {
392            this.lockFile.unlock();
393        }
394        this.lockFile = null;
395        this.initialized = false;
396        LOG.info(this + " stopped");
397
398    }
399
400    @Override
401    public void run() {
402        try {
403            if (isStopping()) {
404                return;
405            }
406            final int lastJournalFileId = journal.getLastAppendLocation().getDataFileId();
407            final Set<Integer> candidates = journal.getFileMap().keySet();
408            LOG.trace("Full gc candidate set:" + candidates);
409            if (candidates.size() > 1) {
410                // prune current write
411                for (Iterator<Integer> iterator = candidates.iterator(); iterator.hasNext();) {
412                    if (iterator.next() >= lastJournalFileId) {
413                        iterator.remove();
414                    }
415                }
416                List<PListImpl> plists = null;
417                synchronized (indexLock) {
418                    synchronized (this) {
419                        plists = new ArrayList<PListImpl>(persistentLists.values());
420                    }
421                }
422                for (PListImpl list : plists) {
423                    list.claimFileLocations(candidates);
424                    if (isStopping()) {
425                        return;
426                    }
427                    LOG.trace("Remaining gc candidate set after refs from: " + list.getName() + ":" + candidates);
428                }
429                LOG.trace("GC Candidate set:" + candidates);
430                this.journal.removeDataFiles(candidates);
431            }
432        } catch (IOException e) {
433            LOG.error("Exception on periodic cleanup: " + e, e);
434        }
435    }
436
437    ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
438        ByteSequence result = null;
439        result = this.journal.read(location);
440        return result;
441    }
442
443    Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException {
444        return this.journal.write(payload, sync);
445    }
446
447    private void lock() throws IOException {
448        if (lockFile == null) {
449            File lockFileName = new File(directory, "lock");
450            lockFile = new LockFile(lockFileName, true);
451            if (failIfDatabaseIsLocked) {
452                lockFile.lock();
453            } else {
454                while (true) {
455                    try {
456                        lockFile.lock();
457                        break;
458                    } catch (IOException e) {
459                        LOG.info("Database " + lockFileName + " is locked... waiting "
460                                + (DATABASE_LOCKED_WAIT_DELAY / 1000)
461                                + " seconds for the database to be unlocked. Reason: " + e);
462                        try {
463                            Thread.sleep(DATABASE_LOCKED_WAIT_DELAY);
464                        } catch (InterruptedException e1) {
465                        }
466                    }
467                }
468            }
469        }
470    }
471
472    PageFile getPageFile() {
473        this.pageFile.isLoaded();
474        return this.pageFile;
475    }
476
477    public boolean isFailIfDatabaseIsLocked() {
478        return failIfDatabaseIsLocked;
479    }
480
481    public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
482        this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
483    }
484
485    @Override
486    public int getJournalMaxFileLength() {
487        return journalMaxFileLength;
488    }
489
490    public void setJournalMaxFileLength(int journalMaxFileLength) {
491        this.journalMaxFileLength = journalMaxFileLength;
492    }
493
494    public int getJournalMaxWriteBatchSize() {
495        return journalMaxWriteBatchSize;
496    }
497
498    public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
499        this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
500    }
501
502    public boolean isEnableIndexWriteAsync() {
503        return enableIndexWriteAsync;
504    }
505
506    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
507        this.enableIndexWriteAsync = enableIndexWriteAsync;
508    }
509
510    public long getCleanupInterval() {
511        return cleanupInterval;
512    }
513
514    public void setCleanupInterval(long cleanupInterval) {
515        this.cleanupInterval = cleanupInterval;
516    }
517
518    public boolean isLazyInit() {
519        return lazyInit;
520    }
521
522    public void setLazyInit(boolean lazyInit) {
523        this.lazyInit = lazyInit;
524    }
525
526    @Override
527    public String toString() {
528        String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
529        if (indexDirectory != null) {
530            path += "|" + indexDirectory.getAbsolutePath();
531        }
532        return "PListStore:[" + path + "]";
533    }
534}