001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb; 018 019import static org.apache.activemq.store.kahadb.disk.journal.Location.NOT_SET; 020 021import java.io.ByteArrayInputStream; 022import java.io.ByteArrayOutputStream; 023import java.io.DataInput; 024import java.io.DataOutput; 025import java.io.EOFException; 026import java.io.File; 027import java.io.IOException; 028import java.io.InputStream; 029import java.io.InterruptedIOException; 030import java.io.ObjectInputStream; 031import java.io.ObjectOutputStream; 032import java.io.OutputStream; 033import java.util.ArrayList; 034import java.util.Arrays; 035import java.util.Collection; 036import java.util.Collections; 037import java.util.Date; 038import java.util.HashMap; 039import java.util.HashSet; 040import java.util.Iterator; 041import java.util.LinkedHashMap; 042import java.util.LinkedHashSet; 043import java.util.LinkedList; 044import java.util.List; 045import java.util.Map; 046import java.util.Map.Entry; 047import java.util.Set; 048import java.util.SortedSet; 049import java.util.TreeMap; 050import java.util.TreeSet; 051import java.util.concurrent.ConcurrentHashMap; 052import java.util.concurrent.ConcurrentMap; 053import java.util.concurrent.Executors; 054import java.util.concurrent.ScheduledExecutorService; 055import java.util.concurrent.ThreadFactory; 056import java.util.concurrent.TimeUnit; 057import java.util.concurrent.atomic.AtomicBoolean; 058import java.util.concurrent.atomic.AtomicLong; 059import java.util.concurrent.atomic.AtomicReference; 060import java.util.concurrent.locks.ReentrantReadWriteLock; 061 062import org.apache.activemq.ActiveMQMessageAuditNoSync; 063import org.apache.activemq.broker.BrokerService; 064import org.apache.activemq.broker.BrokerServiceAware; 065import org.apache.activemq.broker.region.Destination; 066import org.apache.activemq.broker.region.Queue; 067import org.apache.activemq.broker.region.Topic; 068import org.apache.activemq.command.MessageAck; 069import org.apache.activemq.command.TransactionId; 070import org.apache.activemq.openwire.OpenWireFormat; 071import org.apache.activemq.protobuf.Buffer; 072import org.apache.activemq.store.MessageStore; 073import org.apache.activemq.store.MessageStoreStatistics; 074import org.apache.activemq.store.MessageStoreSubscriptionStatistics; 075import org.apache.activemq.store.TopicMessageStore; 076import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand; 077import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; 078import org.apache.activemq.store.kahadb.data.KahaCommitCommand; 079import org.apache.activemq.store.kahadb.data.KahaDestination; 080import org.apache.activemq.store.kahadb.data.KahaEntryType; 081import org.apache.activemq.store.kahadb.data.KahaPrepareCommand; 082import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand; 083import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; 084import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; 085import org.apache.activemq.store.kahadb.data.KahaRewrittenDataFileCommand; 086import org.apache.activemq.store.kahadb.data.KahaRollbackCommand; 087import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; 088import org.apache.activemq.store.kahadb.data.KahaTraceCommand; 089import org.apache.activemq.store.kahadb.data.KahaTransactionInfo; 090import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand; 091import org.apache.activemq.store.kahadb.disk.index.BTreeIndex; 092import org.apache.activemq.store.kahadb.disk.index.BTreeVisitor; 093import org.apache.activemq.store.kahadb.disk.index.ListIndex; 094import org.apache.activemq.store.kahadb.disk.journal.DataFile; 095import org.apache.activemq.store.kahadb.disk.journal.Journal; 096import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy; 097import org.apache.activemq.store.kahadb.disk.journal.Location; 098import org.apache.activemq.store.kahadb.disk.journal.TargetedDataFileAppender; 099import org.apache.activemq.store.kahadb.disk.page.Page; 100import org.apache.activemq.store.kahadb.disk.page.PageFile; 101import org.apache.activemq.store.kahadb.disk.page.Transaction; 102import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller; 103import org.apache.activemq.store.kahadb.disk.util.LongMarshaller; 104import org.apache.activemq.store.kahadb.disk.util.Marshaller; 105import org.apache.activemq.store.kahadb.disk.util.Sequence; 106import org.apache.activemq.store.kahadb.disk.util.SequenceSet; 107import org.apache.activemq.store.kahadb.disk.util.StringMarshaller; 108import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller; 109import org.apache.activemq.util.ByteSequence; 110import org.apache.activemq.util.DataByteArrayInputStream; 111import org.apache.activemq.util.DataByteArrayOutputStream; 112import org.apache.activemq.util.IOExceptionSupport; 113import org.apache.activemq.util.IOHelper; 114import org.apache.activemq.util.ServiceStopper; 115import org.apache.activemq.util.ServiceSupport; 116import org.apache.activemq.util.ThreadPoolUtils; 117import org.slf4j.Logger; 118import org.slf4j.LoggerFactory; 119import org.slf4j.MDC; 120 121public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware { 122 123 protected BrokerService brokerService; 124 125 public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME"; 126 public static final int LOG_SLOW_ACCESS_TIME = Integer.getInteger(PROPERTY_LOG_SLOW_ACCESS_TIME, 0); 127 public static final File DEFAULT_DIRECTORY = new File("KahaDB"); 128 protected static final Buffer UNMATCHED; 129 static { 130 UNMATCHED = new Buffer(new byte[]{}); 131 } 132 private static final Logger LOG = LoggerFactory.getLogger(MessageDatabase.class); 133 134 static final int CLOSED_STATE = 1; 135 static final int OPEN_STATE = 2; 136 static final long NOT_ACKED = -1; 137 138 static final int VERSION = 6; 139 140 static final byte COMPACTED_JOURNAL_FILE = DataFile.STANDARD_LOG_FILE + 1; 141 142 protected class Metadata { 143 protected Page<Metadata> page; 144 protected int state; 145 protected BTreeIndex<String, StoredDestination> destinations; 146 protected Location lastUpdate; 147 protected Location firstInProgressTransactionLocation; 148 protected Location producerSequenceIdTrackerLocation = null; 149 protected Location ackMessageFileMapLocation = null; 150 protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync(); 151 protected transient Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>(); 152 protected int version = VERSION; 153 protected int openwireVersion = OpenWireFormat.DEFAULT_STORE_VERSION; 154 155 public void read(DataInput is) throws IOException { 156 state = is.readInt(); 157 destinations = new BTreeIndex<String, StoredDestination>(pageFile, is.readLong()); 158 if (is.readBoolean()) { 159 lastUpdate = LocationMarshaller.INSTANCE.readPayload(is); 160 } else { 161 lastUpdate = null; 162 } 163 if (is.readBoolean()) { 164 firstInProgressTransactionLocation = LocationMarshaller.INSTANCE.readPayload(is); 165 } else { 166 firstInProgressTransactionLocation = null; 167 } 168 try { 169 if (is.readBoolean()) { 170 producerSequenceIdTrackerLocation = LocationMarshaller.INSTANCE.readPayload(is); 171 } else { 172 producerSequenceIdTrackerLocation = null; 173 } 174 } catch (EOFException expectedOnUpgrade) { 175 } 176 try { 177 version = is.readInt(); 178 } catch (EOFException expectedOnUpgrade) { 179 version = 1; 180 } 181 if (version >= 5 && is.readBoolean()) { 182 ackMessageFileMapLocation = LocationMarshaller.INSTANCE.readPayload(is); 183 } else { 184 ackMessageFileMapLocation = null; 185 } 186 try { 187 openwireVersion = is.readInt(); 188 } catch (EOFException expectedOnUpgrade) { 189 openwireVersion = OpenWireFormat.DEFAULT_LEGACY_VERSION; 190 } 191 LOG.info("KahaDB is version " + version); 192 } 193 194 public void write(DataOutput os) throws IOException { 195 os.writeInt(state); 196 os.writeLong(destinations.getPageId()); 197 198 if (lastUpdate != null) { 199 os.writeBoolean(true); 200 LocationMarshaller.INSTANCE.writePayload(lastUpdate, os); 201 } else { 202 os.writeBoolean(false); 203 } 204 205 if (firstInProgressTransactionLocation != null) { 206 os.writeBoolean(true); 207 LocationMarshaller.INSTANCE.writePayload(firstInProgressTransactionLocation, os); 208 } else { 209 os.writeBoolean(false); 210 } 211 212 if (producerSequenceIdTrackerLocation != null) { 213 os.writeBoolean(true); 214 LocationMarshaller.INSTANCE.writePayload(producerSequenceIdTrackerLocation, os); 215 } else { 216 os.writeBoolean(false); 217 } 218 os.writeInt(VERSION); 219 if (ackMessageFileMapLocation != null) { 220 os.writeBoolean(true); 221 LocationMarshaller.INSTANCE.writePayload(ackMessageFileMapLocation, os); 222 } else { 223 os.writeBoolean(false); 224 } 225 os.writeInt(this.openwireVersion); 226 } 227 } 228 229 class MetadataMarshaller extends VariableMarshaller<Metadata> { 230 @Override 231 public Metadata readPayload(DataInput dataIn) throws IOException { 232 Metadata rc = createMetadata(); 233 rc.read(dataIn); 234 return rc; 235 } 236 237 @Override 238 public void writePayload(Metadata object, DataOutput dataOut) throws IOException { 239 object.write(dataOut); 240 } 241 } 242 243 protected PageFile pageFile; 244 protected Journal journal; 245 protected Metadata metadata = new Metadata(); 246 247 protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller(); 248 249 protected boolean failIfDatabaseIsLocked; 250 251 protected boolean deleteAllMessages; 252 protected File directory = DEFAULT_DIRECTORY; 253 protected File indexDirectory = null; 254 protected ScheduledExecutorService scheduler; 255 private final Object schedulerLock = new Object(); 256 257 protected String journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS.name(); 258 protected boolean archiveDataLogs; 259 protected File directoryArchive; 260 protected AtomicLong journalSize = new AtomicLong(0); 261 long journalDiskSyncInterval = 1000; 262 long checkpointInterval = 5*1000; 263 long cleanupInterval = 30*1000; 264 int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; 265 int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; 266 boolean enableIndexWriteAsync = false; 267 int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; 268 private String preallocationScope = Journal.PreallocationScope.ENTIRE_JOURNAL.name(); 269 private String preallocationStrategy = Journal.PreallocationStrategy.SPARSE_FILE.name(); 270 271 protected AtomicBoolean opened = new AtomicBoolean(); 272 private boolean ignoreMissingJournalfiles = false; 273 private int indexCacheSize = 10000; 274 private boolean checkForCorruptJournalFiles = false; 275 private boolean checksumJournalFiles = true; 276 protected boolean forceRecoverIndex = false; 277 private boolean archiveCorruptedIndex = false; 278 private boolean useIndexLFRUEviction = false; 279 private float indexLFUEvictionFactor = 0.2f; 280 private boolean enableIndexDiskSyncs = true; 281 private boolean enableIndexRecoveryFile = true; 282 private boolean enableIndexPageCaching = true; 283 ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock(); 284 285 private boolean enableAckCompaction = true; 286 private int compactAcksAfterNoGC = 10; 287 private boolean compactAcksIgnoresStoreGrowth = false; 288 private int checkPointCyclesWithNoGC; 289 private int journalLogOnLastCompactionCheck; 290 private boolean enableSubscriptionStatistics = false; 291 292 //only set when using JournalDiskSyncStrategy.PERIODIC 293 protected final AtomicReference<Location> lastAsyncJournalUpdate = new AtomicReference<>(); 294 295 @Override 296 public void doStart() throws Exception { 297 load(); 298 } 299 300 @Override 301 public void doStop(ServiceStopper stopper) throws Exception { 302 unload(); 303 } 304 305 private void loadPageFile() throws IOException { 306 this.indexLock.writeLock().lock(); 307 try { 308 final PageFile pageFile = getPageFile(); 309 pageFile.load(); 310 pageFile.tx().execute(new Transaction.Closure<IOException>() { 311 @Override 312 public void execute(Transaction tx) throws IOException { 313 if (pageFile.getPageCount() == 0) { 314 // First time this is created.. Initialize the metadata 315 Page<Metadata> page = tx.allocate(); 316 assert page.getPageId() == 0; 317 page.set(metadata); 318 metadata.page = page; 319 metadata.state = CLOSED_STATE; 320 metadata.destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId()); 321 322 tx.store(metadata.page, metadataMarshaller, true); 323 } else { 324 Page<Metadata> page = tx.load(0, metadataMarshaller); 325 metadata = page.get(); 326 metadata.page = page; 327 } 328 metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE); 329 metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller()); 330 metadata.destinations.load(tx); 331 } 332 }); 333 // Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted. 334 // Perhaps we should just keep an index of file 335 storedDestinations.clear(); 336 pageFile.tx().execute(new Transaction.Closure<IOException>() { 337 @Override 338 public void execute(Transaction tx) throws IOException { 339 for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) { 340 Entry<String, StoredDestination> entry = iterator.next(); 341 StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null); 342 storedDestinations.put(entry.getKey(), sd); 343 344 if (checkForCorruptJournalFiles) { 345 // sanity check the index also 346 if (!entry.getValue().locationIndex.isEmpty(tx)) { 347 if (entry.getValue().orderIndex.nextMessageId <= 0) { 348 throw new IOException("Detected uninitialized orderIndex nextMessageId with pending messages for " + entry.getKey()); 349 } 350 } 351 } 352 } 353 } 354 }); 355 pageFile.flush(); 356 } finally { 357 this.indexLock.writeLock().unlock(); 358 } 359 } 360 361 private void startCheckpoint() { 362 if (checkpointInterval == 0 && cleanupInterval == 0) { 363 LOG.info("periodic checkpoint/cleanup disabled, will ocurr on clean shutdown/restart"); 364 return; 365 } 366 synchronized (schedulerLock) { 367 if (scheduler == null || scheduler.isShutdown()) { 368 scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { 369 370 @Override 371 public Thread newThread(Runnable r) { 372 Thread schedulerThread = new Thread(r); 373 374 schedulerThread.setName("ActiveMQ Journal Checkpoint Worker"); 375 schedulerThread.setDaemon(true); 376 377 return schedulerThread; 378 } 379 }); 380 381 // Short intervals for check-point and cleanups 382 long delay; 383 if (journal.isJournalDiskSyncPeriodic()) { 384 delay = Math.min(journalDiskSyncInterval > 0 ? journalDiskSyncInterval : checkpointInterval, 500); 385 } else { 386 delay = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500); 387 } 388 389 scheduler.scheduleWithFixedDelay(new CheckpointRunner(), 0, delay, TimeUnit.MILLISECONDS); 390 } 391 } 392 } 393 394 private final class CheckpointRunner implements Runnable { 395 396 private long lastCheckpoint = System.currentTimeMillis(); 397 private long lastCleanup = System.currentTimeMillis(); 398 private long lastSync = System.currentTimeMillis(); 399 private Location lastAsyncUpdate = null; 400 401 @Override 402 public void run() { 403 try { 404 // Decide on cleanup vs full checkpoint here. 405 if (opened.get()) { 406 long now = System.currentTimeMillis(); 407 if (journal.isJournalDiskSyncPeriodic() && 408 journalDiskSyncInterval > 0 && (now - lastSync >= journalDiskSyncInterval)) { 409 Location currentUpdate = lastAsyncJournalUpdate.get(); 410 if (currentUpdate != null && !currentUpdate.equals(lastAsyncUpdate)) { 411 lastAsyncUpdate = currentUpdate; 412 if (LOG.isTraceEnabled()) { 413 LOG.trace("Writing trace command to trigger journal sync"); 414 } 415 store(new KahaTraceCommand(), true, null, null); 416 } 417 lastSync = now; 418 } 419 if (cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval)) { 420 checkpointCleanup(true); 421 lastCleanup = now; 422 lastCheckpoint = now; 423 } else if (checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval)) { 424 checkpointCleanup(false); 425 lastCheckpoint = now; 426 } 427 } 428 } catch (IOException ioe) { 429 LOG.error("Checkpoint failed", ioe); 430 brokerService.handleIOException(ioe); 431 } catch (Throwable e) { 432 LOG.error("Checkpoint failed", e); 433 brokerService.handleIOException(IOExceptionSupport.create(e)); 434 } 435 } 436 } 437 438 public void open() throws IOException { 439 if( opened.compareAndSet(false, true) ) { 440 getJournal().start(); 441 try { 442 loadPageFile(); 443 } catch (Throwable t) { 444 LOG.warn("Index corrupted. Recovering the index through journal replay. Cause:" + t); 445 if (LOG.isDebugEnabled()) { 446 LOG.debug("Index load failure", t); 447 } 448 // try to recover index 449 try { 450 pageFile.unload(); 451 } catch (Exception ignore) {} 452 if (archiveCorruptedIndex) { 453 pageFile.archive(); 454 } else { 455 pageFile.delete(); 456 } 457 metadata = createMetadata(); 458 //The metadata was recreated after a detect corruption so we need to 459 //reconfigure anything that was configured on the old metadata on startup 460 configureMetadata(); 461 pageFile = null; 462 loadPageFile(); 463 } 464 recover(); 465 startCheckpoint(); 466 } 467 } 468 469 public void load() throws IOException { 470 this.indexLock.writeLock().lock(); 471 IOHelper.mkdirs(directory); 472 try { 473 if (deleteAllMessages) { 474 getJournal().start(); 475 getJournal().delete(); 476 getJournal().close(); 477 journal = null; 478 getPageFile().delete(); 479 LOG.info("Persistence store purged."); 480 deleteAllMessages = false; 481 } 482 483 open(); 484 store(new KahaTraceCommand().setMessage("LOADED " + new Date())); 485 } finally { 486 this.indexLock.writeLock().unlock(); 487 } 488 } 489 490 public void close() throws IOException, InterruptedException { 491 if( opened.compareAndSet(true, false)) { 492 checkpointLock.writeLock().lock(); 493 try { 494 if (metadata.page != null) { 495 checkpointUpdate(true); 496 } 497 pageFile.unload(); 498 metadata = createMetadata(); 499 } finally { 500 checkpointLock.writeLock().unlock(); 501 } 502 journal.close(); 503 synchronized(schedulerLock) { 504 if (scheduler != null) { 505 ThreadPoolUtils.shutdownGraceful(scheduler, -1); 506 scheduler = null; 507 } 508 } 509 // clear the cache and journalSize on shutdown of the store 510 storeCache.clear(); 511 journalSize.set(0); 512 } 513 } 514 515 public void unload() throws IOException, InterruptedException { 516 this.indexLock.writeLock().lock(); 517 try { 518 if( pageFile != null && pageFile.isLoaded() ) { 519 metadata.state = CLOSED_STATE; 520 metadata.firstInProgressTransactionLocation = getInProgressTxLocationRange()[0]; 521 522 if (metadata.page != null) { 523 pageFile.tx().execute(new Transaction.Closure<IOException>() { 524 @Override 525 public void execute(Transaction tx) throws IOException { 526 tx.store(metadata.page, metadataMarshaller, true); 527 } 528 }); 529 } 530 } 531 } finally { 532 this.indexLock.writeLock().unlock(); 533 } 534 close(); 535 } 536 537 // public for testing 538 @SuppressWarnings("rawtypes") 539 public Location[] getInProgressTxLocationRange() { 540 Location[] range = new Location[]{null, null}; 541 synchronized (inflightTransactions) { 542 if (!inflightTransactions.isEmpty()) { 543 for (List<Operation> ops : inflightTransactions.values()) { 544 if (!ops.isEmpty()) { 545 trackMaxAndMin(range, ops); 546 } 547 } 548 } 549 if (!preparedTransactions.isEmpty()) { 550 for (List<Operation> ops : preparedTransactions.values()) { 551 if (!ops.isEmpty()) { 552 trackMaxAndMin(range, ops); 553 } 554 } 555 } 556 } 557 return range; 558 } 559 560 @SuppressWarnings("rawtypes") 561 private void trackMaxAndMin(Location[] range, List<Operation> ops) { 562 Location t = ops.get(0).getLocation(); 563 if (range[0] == null || t.compareTo(range[0]) <= 0) { 564 range[0] = t; 565 } 566 t = ops.get(ops.size() -1).getLocation(); 567 if (range[1] == null || t.compareTo(range[1]) >= 0) { 568 range[1] = t; 569 } 570 } 571 572 class TranInfo { 573 TransactionId id; 574 Location location; 575 576 class opCount { 577 int add; 578 int remove; 579 } 580 HashMap<KahaDestination, opCount> destinationOpCount = new HashMap<KahaDestination, opCount>(); 581 582 @SuppressWarnings("rawtypes") 583 public void track(Operation operation) { 584 if (location == null ) { 585 location = operation.getLocation(); 586 } 587 KahaDestination destination; 588 boolean isAdd = false; 589 if (operation instanceof AddOperation) { 590 AddOperation add = (AddOperation) operation; 591 destination = add.getCommand().getDestination(); 592 isAdd = true; 593 } else { 594 RemoveOperation removeOpperation = (RemoveOperation) operation; 595 destination = removeOpperation.getCommand().getDestination(); 596 } 597 opCount opCount = destinationOpCount.get(destination); 598 if (opCount == null) { 599 opCount = new opCount(); 600 destinationOpCount.put(destination, opCount); 601 } 602 if (isAdd) { 603 opCount.add++; 604 } else { 605 opCount.remove++; 606 } 607 } 608 609 @Override 610 public String toString() { 611 StringBuffer buffer = new StringBuffer(); 612 buffer.append(location).append(";").append(id).append(";\n"); 613 for (Entry<KahaDestination, opCount> op : destinationOpCount.entrySet()) { 614 buffer.append(op.getKey()).append('+').append(op.getValue().add).append(',').append('-').append(op.getValue().remove).append(';'); 615 } 616 return buffer.toString(); 617 } 618 } 619 620 @SuppressWarnings("rawtypes") 621 public String getTransactions() { 622 623 ArrayList<TranInfo> infos = new ArrayList<TranInfo>(); 624 synchronized (inflightTransactions) { 625 if (!inflightTransactions.isEmpty()) { 626 for (Entry<TransactionId, List<Operation>> entry : inflightTransactions.entrySet()) { 627 TranInfo info = new TranInfo(); 628 info.id = entry.getKey(); 629 for (Operation operation : entry.getValue()) { 630 info.track(operation); 631 } 632 infos.add(info); 633 } 634 } 635 } 636 synchronized (preparedTransactions) { 637 if (!preparedTransactions.isEmpty()) { 638 for (Entry<TransactionId, List<Operation>> entry : preparedTransactions.entrySet()) { 639 TranInfo info = new TranInfo(); 640 info.id = entry.getKey(); 641 for (Operation operation : entry.getValue()) { 642 info.track(operation); 643 } 644 infos.add(info); 645 } 646 } 647 } 648 return infos.toString(); 649 } 650 651 /** 652 * Move all the messages that were in the journal into long term storage. We 653 * just replay and do a checkpoint. 654 * 655 * @throws IOException 656 * @throws IOException 657 * @throws IllegalStateException 658 */ 659 private void recover() throws IllegalStateException, IOException { 660 this.indexLock.writeLock().lock(); 661 try { 662 663 long start = System.currentTimeMillis(); 664 Location afterProducerAudit = recoverProducerAudit(); 665 Location afterAckMessageFile = recoverAckMessageFileMap(); 666 Location lastIndoubtPosition = getRecoveryPosition(); 667 668 if (afterProducerAudit != null && afterProducerAudit.equals(metadata.ackMessageFileMapLocation)) { 669 // valid checkpoint, possible recover from afterAckMessageFile 670 afterProducerAudit = null; 671 } 672 Location recoveryPosition = minimum(afterProducerAudit, afterAckMessageFile); 673 recoveryPosition = minimum(recoveryPosition, lastIndoubtPosition); 674 675 if (recoveryPosition != null) { 676 int redoCounter = 0; 677 int dataFileRotationTracker = recoveryPosition.getDataFileId(); 678 LOG.info("Recovering from the journal @" + recoveryPosition); 679 while (recoveryPosition != null) { 680 try { 681 JournalCommand<?> message = load(recoveryPosition); 682 metadata.lastUpdate = recoveryPosition; 683 process(message, recoveryPosition, lastIndoubtPosition); 684 redoCounter++; 685 } catch (IOException failedRecovery) { 686 if (isIgnoreMissingJournalfiles()) { 687 LOG.debug("Failed to recover data at position:" + recoveryPosition, failedRecovery); 688 // track this dud location 689 journal.corruptRecoveryLocation(recoveryPosition); 690 } else { 691 throw new IOException("Failed to recover data at position:" + recoveryPosition, failedRecovery); 692 } 693 } 694 recoveryPosition = journal.getNextLocation(recoveryPosition); 695 // hold on to the minimum number of open files during recovery 696 if (recoveryPosition != null && dataFileRotationTracker != recoveryPosition.getDataFileId()) { 697 dataFileRotationTracker = recoveryPosition.getDataFileId(); 698 journal.cleanup(); 699 } 700 if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) { 701 LOG.info("@" + recoveryPosition + ", " + redoCounter + " entries recovered .."); 702 } 703 } 704 if (LOG.isInfoEnabled()) { 705 long end = System.currentTimeMillis(); 706 LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds."); 707 } 708 } 709 710 // We may have to undo some index updates. 711 pageFile.tx().execute(new Transaction.Closure<IOException>() { 712 @Override 713 public void execute(Transaction tx) throws IOException { 714 recoverIndex(tx); 715 } 716 }); 717 718 // rollback any recovered inflight local transactions, and discard any inflight XA transactions. 719 Set<TransactionId> toRollback = new HashSet<TransactionId>(); 720 Set<TransactionId> toDiscard = new HashSet<TransactionId>(); 721 synchronized (inflightTransactions) { 722 for (Iterator<TransactionId> it = inflightTransactions.keySet().iterator(); it.hasNext(); ) { 723 TransactionId id = it.next(); 724 if (id.isLocalTransaction()) { 725 toRollback.add(id); 726 } else { 727 toDiscard.add(id); 728 } 729 } 730 for (TransactionId tx: toRollback) { 731 if (LOG.isDebugEnabled()) { 732 LOG.debug("rolling back recovered indoubt local transaction " + tx); 733 } 734 store(new KahaRollbackCommand().setTransactionInfo(TransactionIdConversion.convertToLocal(tx)), false, null, null); 735 } 736 for (TransactionId tx: toDiscard) { 737 if (LOG.isDebugEnabled()) { 738 LOG.debug("discarding recovered in-flight XA transaction " + tx); 739 } 740 inflightTransactions.remove(tx); 741 } 742 } 743 744 synchronized (preparedTransactions) { 745 for (TransactionId txId : preparedTransactions.keySet()) { 746 LOG.warn("Recovered prepared XA TX: [{}]", txId); 747 } 748 } 749 750 } finally { 751 this.indexLock.writeLock().unlock(); 752 } 753 } 754 755 @SuppressWarnings("unused") 756 private KahaTransactionInfo createLocalTransactionInfo(TransactionId tx) { 757 return TransactionIdConversion.convertToLocal(tx); 758 } 759 760 private Location minimum(Location x, 761 Location y) { 762 Location min = null; 763 if (x != null) { 764 min = x; 765 if (y != null) { 766 int compare = y.compareTo(x); 767 if (compare < 0) { 768 min = y; 769 } 770 } 771 } else { 772 min = y; 773 } 774 return min; 775 } 776 777 private Location recoverProducerAudit() throws IOException { 778 if (metadata.producerSequenceIdTrackerLocation != null) { 779 try { 780 KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation); 781 ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput()); 782 int maxNumProducers = getMaxFailoverProducersToTrack(); 783 int maxAuditDepth = getFailoverProducersAuditDepth(); 784 metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject(); 785 metadata.producerSequenceIdTracker.setAuditDepth(maxAuditDepth); 786 metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxNumProducers); 787 return getNextInitializedLocation(metadata.producerSequenceIdTrackerLocation); 788 } catch (Exception e) { 789 LOG.warn("Cannot recover message audit", e); 790 return journal.getNextLocation(null); 791 } 792 } else { 793 // got no audit stored so got to recreate via replay from start of the journal 794 return journal.getNextLocation(null); 795 } 796 } 797 798 @SuppressWarnings("unchecked") 799 private Location recoverAckMessageFileMap() throws IOException { 800 if (metadata.ackMessageFileMapLocation != null) { 801 try { 802 KahaAckMessageFileMapCommand audit = (KahaAckMessageFileMapCommand) load(metadata.ackMessageFileMapLocation); 803 ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput()); 804 metadata.ackMessageFileMap = (Map<Integer, Set<Integer>>) objectIn.readObject(); 805 return getNextInitializedLocation(metadata.ackMessageFileMapLocation); 806 } catch (Exception e) { 807 LOG.warn("Cannot recover ackMessageFileMap", e); 808 return journal.getNextLocation(null); 809 } 810 } else { 811 // got no ackMessageFileMap stored so got to recreate via replay from start of the journal 812 return journal.getNextLocation(null); 813 } 814 } 815 816 protected void recoverIndex(Transaction tx) throws IOException { 817 long start = System.currentTimeMillis(); 818 // It is possible index updates got applied before the journal updates.. 819 // in that case we need to removed references to messages that are not in the journal 820 final Location lastAppendLocation = journal.getLastAppendLocation(); 821 long undoCounter=0; 822 823 // Go through all the destinations to see if they have messages past the lastAppendLocation 824 for (String key : storedDestinations.keySet()) { 825 StoredDestination sd = storedDestinations.get(key); 826 827 final ArrayList<Long> matches = new ArrayList<Long>(); 828 // Find all the Locations that are >= than the last Append Location. 829 sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) { 830 @Override 831 protected void matched(Location key, Long value) { 832 matches.add(value); 833 } 834 }); 835 836 for (Long sequenceId : matches) { 837 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); 838 if (keys != null) { 839 sd.locationIndex.remove(tx, keys.location); 840 sd.messageIdIndex.remove(tx, keys.messageId); 841 metadata.producerSequenceIdTracker.rollback(keys.messageId); 842 undoCounter++; 843 decrementAndSubSizeToStoreStat(key, keys.location.getSize()); 844 // TODO: do we need to modify the ack positions for the pub sub case? 845 } 846 } 847 } 848 849 if (undoCounter > 0) { 850 // The rolledback operations are basically in flight journal writes. To avoid getting 851 // these the end user should do sync writes to the journal. 852 if (LOG.isInfoEnabled()) { 853 long end = System.currentTimeMillis(); 854 LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds."); 855 } 856 } 857 858 undoCounter = 0; 859 start = System.currentTimeMillis(); 860 861 // Lets be extra paranoid here and verify that all the datafiles being referenced 862 // by the indexes still exists. 863 864 final SequenceSet ss = new SequenceSet(); 865 for (StoredDestination sd : storedDestinations.values()) { 866 // Use a visitor to cut down the number of pages that we load 867 sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() { 868 int last=-1; 869 870 @Override 871 public boolean isInterestedInKeysBetween(Location first, Location second) { 872 if( first==null ) { 873 return !ss.contains(0, second.getDataFileId()); 874 } else if( second==null ) { 875 return true; 876 } else { 877 return !ss.contains(first.getDataFileId(), second.getDataFileId()); 878 } 879 } 880 881 @Override 882 public void visit(List<Location> keys, List<Long> values) { 883 for (Location l : keys) { 884 int fileId = l.getDataFileId(); 885 if( last != fileId ) { 886 ss.add(fileId); 887 last = fileId; 888 } 889 } 890 } 891 892 }); 893 } 894 HashSet<Integer> missingJournalFiles = new HashSet<Integer>(); 895 while (!ss.isEmpty()) { 896 missingJournalFiles.add((int) ss.removeFirst()); 897 } 898 899 for (Entry<Integer, Set<Integer>> entry : metadata.ackMessageFileMap.entrySet()) { 900 missingJournalFiles.add(entry.getKey()); 901 for (Integer i : entry.getValue()) { 902 missingJournalFiles.add(i); 903 } 904 } 905 906 missingJournalFiles.removeAll(journal.getFileMap().keySet()); 907 908 if (!missingJournalFiles.isEmpty()) { 909 LOG.warn("Some journal files are missing: " + missingJournalFiles); 910 } 911 912 ArrayList<BTreeVisitor.Predicate<Location>> knownCorruption = new ArrayList<BTreeVisitor.Predicate<Location>>(); 913 ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>(); 914 for (Integer missing : missingJournalFiles) { 915 missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing, 0), new Location(missing + 1, 0))); 916 } 917 918 if (checkForCorruptJournalFiles) { 919 Collection<DataFile> dataFiles = journal.getFileMap().values(); 920 for (DataFile dataFile : dataFiles) { 921 int id = dataFile.getDataFileId(); 922 // eof to next file id 923 missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, dataFile.getLength()), new Location(id + 1, 0))); 924 Sequence seq = dataFile.getCorruptedBlocks().getHead(); 925 while (seq != null) { 926 BTreeVisitor.BetweenVisitor<Location, Long> visitor = 927 new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1)); 928 missingPredicates.add(visitor); 929 knownCorruption.add(visitor); 930 seq = seq.getNext(); 931 } 932 } 933 } 934 935 if (!missingPredicates.isEmpty()) { 936 for (Entry<String, StoredDestination> sdEntry : storedDestinations.entrySet()) { 937 final StoredDestination sd = sdEntry.getValue(); 938 final LinkedHashMap<Long, Location> matches = new LinkedHashMap<Long, Location>(); 939 sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) { 940 @Override 941 protected void matched(Location key, Long value) { 942 matches.put(value, key); 943 } 944 }); 945 946 // If some message references are affected by the missing data files... 947 if (!matches.isEmpty()) { 948 949 // We either 'gracefully' recover dropping the missing messages or 950 // we error out. 951 if( ignoreMissingJournalfiles ) { 952 // Update the index to remove the references to the missing data 953 for (Long sequenceId : matches.keySet()) { 954 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); 955 sd.locationIndex.remove(tx, keys.location); 956 sd.messageIdIndex.remove(tx, keys.messageId); 957 LOG.info("[" + sdEntry.getKey() + "] dropped: " + keys.messageId + " at corrupt location: " + keys.location); 958 undoCounter++; 959 decrementAndSubSizeToStoreStat(sdEntry.getKey(), keys.location.getSize()); 960 // TODO: do we need to modify the ack positions for the pub sub case? 961 } 962 } else { 963 LOG.error("[" + sdEntry.getKey() + "] references corrupt locations: " + matches); 964 throw new IOException("Detected missing/corrupt journal files referenced by:[" + sdEntry.getKey() + "] " +matches.size()+" messages affected."); 965 } 966 } 967 } 968 } 969 970 if (!ignoreMissingJournalfiles) { 971 if (!knownCorruption.isEmpty()) { 972 LOG.error("Detected corrupt journal files. " + knownCorruption); 973 throw new IOException("Detected corrupt journal files. " + knownCorruption); 974 } 975 976 if (!missingJournalFiles.isEmpty()) { 977 LOG.error("Detected missing journal files. " + missingJournalFiles); 978 throw new IOException("Detected missing journal files. " + missingJournalFiles); 979 } 980 } 981 982 if (undoCounter > 0) { 983 // The rolledback operations are basically in flight journal writes. To avoid getting these the end user 984 // should do sync writes to the journal. 985 if (LOG.isInfoEnabled()) { 986 long end = System.currentTimeMillis(); 987 LOG.info("Detected missing/corrupt journal files. Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds."); 988 } 989 } 990 } 991 992 private Location nextRecoveryPosition; 993 private Location lastRecoveryPosition; 994 995 public void incrementalRecover() throws IOException { 996 this.indexLock.writeLock().lock(); 997 try { 998 if( nextRecoveryPosition == null ) { 999 if( lastRecoveryPosition==null ) { 1000 nextRecoveryPosition = getRecoveryPosition(); 1001 } else { 1002 nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition); 1003 } 1004 } 1005 while (nextRecoveryPosition != null) { 1006 lastRecoveryPosition = nextRecoveryPosition; 1007 metadata.lastUpdate = lastRecoveryPosition; 1008 JournalCommand<?> message = load(lastRecoveryPosition); 1009 process(message, lastRecoveryPosition, (IndexAware) null); 1010 nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition); 1011 } 1012 } finally { 1013 this.indexLock.writeLock().unlock(); 1014 } 1015 } 1016 1017 public Location getLastUpdatePosition() throws IOException { 1018 return metadata.lastUpdate; 1019 } 1020 1021 private Location getRecoveryPosition() throws IOException { 1022 1023 if (!this.forceRecoverIndex) { 1024 1025 // If we need to recover the transactions.. 1026 if (metadata.firstInProgressTransactionLocation != null) { 1027 return metadata.firstInProgressTransactionLocation; 1028 } 1029 1030 // Perhaps there were no transactions... 1031 if( metadata.lastUpdate!=null) { 1032 // Start replay at the record after the last one recorded in the index file. 1033 return getNextInitializedLocation(metadata.lastUpdate); 1034 } 1035 } 1036 // This loads the first position. 1037 return journal.getNextLocation(null); 1038 } 1039 1040 private Location getNextInitializedLocation(Location location) throws IOException { 1041 Location mayNotBeInitialized = journal.getNextLocation(location); 1042 if (location.getSize() == NOT_SET && mayNotBeInitialized.getSize() != NOT_SET) { 1043 // need to init size and type to skip 1044 return journal.getNextLocation(mayNotBeInitialized); 1045 } else { 1046 return mayNotBeInitialized; 1047 } 1048 } 1049 1050 protected void checkpointCleanup(final boolean cleanup) throws IOException { 1051 long start; 1052 this.indexLock.writeLock().lock(); 1053 try { 1054 start = System.currentTimeMillis(); 1055 if( !opened.get() ) { 1056 return; 1057 } 1058 } finally { 1059 this.indexLock.writeLock().unlock(); 1060 } 1061 checkpointUpdate(cleanup); 1062 long end = System.currentTimeMillis(); 1063 if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) { 1064 if (LOG.isInfoEnabled()) { 1065 LOG.info("Slow KahaDB access: cleanup took " + (end - start)); 1066 } 1067 } 1068 } 1069 1070 public ByteSequence toByteSequence(JournalCommand<?> data) throws IOException { 1071 int size = data.serializedSizeFramed(); 1072 DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1); 1073 os.writeByte(data.type().getNumber()); 1074 data.writeFramed(os); 1075 return os.toByteSequence(); 1076 } 1077 1078 // ///////////////////////////////////////////////////////////////// 1079 // Methods call by the broker to update and query the store. 1080 // ///////////////////////////////////////////////////////////////// 1081 public Location store(JournalCommand<?> data) throws IOException { 1082 return store(data, false, null,null); 1083 } 1084 1085 public Location store(JournalCommand<?> data, Runnable onJournalStoreComplete) throws IOException { 1086 return store(data, false, null, null, onJournalStoreComplete); 1087 } 1088 1089 public Location store(JournalCommand<?> data, boolean sync, IndexAware before,Runnable after) throws IOException { 1090 return store(data, sync, before, after, null); 1091 } 1092 1093 /** 1094 * All updated are are funneled through this method. The updates are converted 1095 * to a JournalMessage which is logged to the journal and then the data from 1096 * the JournalMessage is used to update the index just like it would be done 1097 * during a recovery process. 1098 */ 1099 public Location store(JournalCommand<?> data, boolean sync, IndexAware before, Runnable after, Runnable onJournalStoreComplete) throws IOException { 1100 try { 1101 ByteSequence sequence = toByteSequence(data); 1102 Location location; 1103 1104 checkpointLock.readLock().lock(); 1105 try { 1106 1107 long start = System.currentTimeMillis(); 1108 location = onJournalStoreComplete == null ? journal.write(sequence, sync) : journal.write(sequence, onJournalStoreComplete) ; 1109 long start2 = System.currentTimeMillis(); 1110 //Track the last async update so we know if we need to sync at the next checkpoint 1111 if (!sync && journal.isJournalDiskSyncPeriodic()) { 1112 lastAsyncJournalUpdate.set(location); 1113 } 1114 process(data, location, before); 1115 1116 long end = System.currentTimeMillis(); 1117 if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) { 1118 if (LOG.isInfoEnabled()) { 1119 LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms"); 1120 } 1121 } 1122 } finally { 1123 checkpointLock.readLock().unlock(); 1124 } 1125 1126 if (after != null) { 1127 after.run(); 1128 } 1129 1130 if (scheduler == null && opened.get()) { 1131 startCheckpoint(); 1132 } 1133 return location; 1134 } catch (IOException ioe) { 1135 LOG.error("KahaDB failed to store to Journal", ioe); 1136 brokerService.handleIOException(ioe); 1137 throw ioe; 1138 } 1139 } 1140 1141 /** 1142 * Loads a previously stored JournalMessage 1143 * 1144 * @param location 1145 * @return 1146 * @throws IOException 1147 */ 1148 public JournalCommand<?> load(Location location) throws IOException { 1149 long start = System.currentTimeMillis(); 1150 ByteSequence data = journal.read(location); 1151 long end = System.currentTimeMillis(); 1152 if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) { 1153 if (LOG.isInfoEnabled()) { 1154 LOG.info("Slow KahaDB access: Journal read took: "+(end-start)+" ms"); 1155 } 1156 } 1157 DataByteArrayInputStream is = new DataByteArrayInputStream(data); 1158 byte readByte = is.readByte(); 1159 KahaEntryType type = KahaEntryType.valueOf(readByte); 1160 if( type == null ) { 1161 try { 1162 is.close(); 1163 } catch (IOException e) {} 1164 throw new IOException("Could not load journal record. Invalid location: "+location); 1165 } 1166 JournalCommand<?> message = (JournalCommand<?>)type.createMessage(); 1167 message.mergeFramed(is); 1168 return message; 1169 } 1170 1171 /** 1172 * do minimal recovery till we reach the last inDoubtLocation 1173 * @param data 1174 * @param location 1175 * @param inDoubtlocation 1176 * @throws IOException 1177 */ 1178 void process(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException { 1179 if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) { 1180 process(data, location, (IndexAware) null); 1181 } else { 1182 // just recover producer audit 1183 data.visit(new Visitor() { 1184 @Override 1185 public void visit(KahaAddMessageCommand command) throws IOException { 1186 metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId()); 1187 } 1188 }); 1189 } 1190 } 1191 1192 // ///////////////////////////////////////////////////////////////// 1193 // Journaled record processing methods. Once the record is journaled, 1194 // these methods handle applying the index updates. These may be called 1195 // from the recovery method too so they need to be idempotent 1196 // ///////////////////////////////////////////////////////////////// 1197 1198 void process(JournalCommand<?> data, final Location location, final IndexAware onSequenceAssignedCallback) throws IOException { 1199 data.visit(new Visitor() { 1200 @Override 1201 public void visit(KahaAddMessageCommand command) throws IOException { 1202 process(command, location, onSequenceAssignedCallback); 1203 } 1204 1205 @Override 1206 public void visit(KahaRemoveMessageCommand command) throws IOException { 1207 process(command, location); 1208 } 1209 1210 @Override 1211 public void visit(KahaPrepareCommand command) throws IOException { 1212 process(command, location); 1213 } 1214 1215 @Override 1216 public void visit(KahaCommitCommand command) throws IOException { 1217 process(command, location, onSequenceAssignedCallback); 1218 } 1219 1220 @Override 1221 public void visit(KahaRollbackCommand command) throws IOException { 1222 process(command, location); 1223 } 1224 1225 @Override 1226 public void visit(KahaRemoveDestinationCommand command) throws IOException { 1227 process(command, location); 1228 } 1229 1230 @Override 1231 public void visit(KahaSubscriptionCommand command) throws IOException { 1232 process(command, location); 1233 } 1234 1235 @Override 1236 public void visit(KahaProducerAuditCommand command) throws IOException { 1237 processLocation(location); 1238 } 1239 1240 @Override 1241 public void visit(KahaAckMessageFileMapCommand command) throws IOException { 1242 processLocation(location); 1243 } 1244 1245 @Override 1246 public void visit(KahaTraceCommand command) { 1247 processLocation(location); 1248 } 1249 1250 @Override 1251 public void visit(KahaUpdateMessageCommand command) throws IOException { 1252 process(command, location); 1253 } 1254 1255 @Override 1256 public void visit(KahaRewrittenDataFileCommand command) throws IOException { 1257 process(command, location); 1258 } 1259 }); 1260 } 1261 1262 @SuppressWarnings("rawtypes") 1263 protected void process(final KahaAddMessageCommand command, final Location location, final IndexAware runWithIndexLock) throws IOException { 1264 if (command.hasTransactionInfo()) { 1265 List<Operation> inflightTx = getInflightTx(command.getTransactionInfo()); 1266 inflightTx.add(new AddOperation(command, location, runWithIndexLock)); 1267 } else { 1268 this.indexLock.writeLock().lock(); 1269 try { 1270 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1271 @Override 1272 public void execute(Transaction tx) throws IOException { 1273 long assignedIndex = updateIndex(tx, command, location); 1274 if (runWithIndexLock != null) { 1275 runWithIndexLock.sequenceAssignedWithIndexLocked(assignedIndex); 1276 } 1277 } 1278 }); 1279 1280 } finally { 1281 this.indexLock.writeLock().unlock(); 1282 } 1283 } 1284 } 1285 1286 protected void process(final KahaUpdateMessageCommand command, final Location location) throws IOException { 1287 this.indexLock.writeLock().lock(); 1288 try { 1289 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1290 @Override 1291 public void execute(Transaction tx) throws IOException { 1292 updateIndex(tx, command, location); 1293 } 1294 }); 1295 } finally { 1296 this.indexLock.writeLock().unlock(); 1297 } 1298 } 1299 1300 @SuppressWarnings("rawtypes") 1301 protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException { 1302 if (command.hasTransactionInfo()) { 1303 List<Operation> inflightTx = getInflightTx(command.getTransactionInfo()); 1304 inflightTx.add(new RemoveOperation(command, location)); 1305 } else { 1306 this.indexLock.writeLock().lock(); 1307 try { 1308 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1309 @Override 1310 public void execute(Transaction tx) throws IOException { 1311 updateIndex(tx, command, location); 1312 } 1313 }); 1314 } finally { 1315 this.indexLock.writeLock().unlock(); 1316 } 1317 } 1318 } 1319 1320 protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException { 1321 this.indexLock.writeLock().lock(); 1322 try { 1323 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1324 @Override 1325 public void execute(Transaction tx) throws IOException { 1326 updateIndex(tx, command, location); 1327 } 1328 }); 1329 } finally { 1330 this.indexLock.writeLock().unlock(); 1331 } 1332 } 1333 1334 protected void process(final KahaSubscriptionCommand command, final Location location) throws IOException { 1335 this.indexLock.writeLock().lock(); 1336 try { 1337 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1338 @Override 1339 public void execute(Transaction tx) throws IOException { 1340 updateIndex(tx, command, location); 1341 } 1342 }); 1343 } finally { 1344 this.indexLock.writeLock().unlock(); 1345 } 1346 } 1347 1348 protected void processLocation(final Location location) { 1349 this.indexLock.writeLock().lock(); 1350 try { 1351 metadata.lastUpdate = location; 1352 } finally { 1353 this.indexLock.writeLock().unlock(); 1354 } 1355 } 1356 1357 @SuppressWarnings("rawtypes") 1358 protected void process(KahaCommitCommand command, final Location location, final IndexAware before) throws IOException { 1359 TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo()); 1360 List<Operation> inflightTx; 1361 synchronized (inflightTransactions) { 1362 inflightTx = inflightTransactions.remove(key); 1363 if (inflightTx == null) { 1364 inflightTx = preparedTransactions.remove(key); 1365 } 1366 } 1367 if (inflightTx == null) { 1368 // only non persistent messages in this tx 1369 if (before != null) { 1370 before.sequenceAssignedWithIndexLocked(-1); 1371 } 1372 return; 1373 } 1374 1375 final List<Operation> messagingTx = inflightTx; 1376 indexLock.writeLock().lock(); 1377 try { 1378 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1379 @Override 1380 public void execute(Transaction tx) throws IOException { 1381 for (Operation op : messagingTx) { 1382 op.execute(tx); 1383 } 1384 } 1385 }); 1386 metadata.lastUpdate = location; 1387 } finally { 1388 indexLock.writeLock().unlock(); 1389 } 1390 } 1391 1392 @SuppressWarnings("rawtypes") 1393 protected void process(KahaPrepareCommand command, Location location) { 1394 TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo()); 1395 synchronized (inflightTransactions) { 1396 List<Operation> tx = inflightTransactions.remove(key); 1397 if (tx != null) { 1398 preparedTransactions.put(key, tx); 1399 } 1400 } 1401 } 1402 1403 @SuppressWarnings("rawtypes") 1404 protected void process(KahaRollbackCommand command, Location location) throws IOException { 1405 TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo()); 1406 List<Operation> updates = null; 1407 synchronized (inflightTransactions) { 1408 updates = inflightTransactions.remove(key); 1409 if (updates == null) { 1410 updates = preparedTransactions.remove(key); 1411 } 1412 } 1413 } 1414 1415 protected void process(KahaRewrittenDataFileCommand command, Location location) throws IOException { 1416 final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet()); 1417 1418 // Mark the current journal file as a compacted file so that gc checks can skip 1419 // over logs that are smaller compaction type logs. 1420 DataFile current = journal.getDataFileById(location.getDataFileId()); 1421 current.setTypeCode(command.getRewriteType()); 1422 1423 if (completeFileSet.contains(command.getSourceDataFileId()) && command.getSkipIfSourceExists()) { 1424 // Move offset so that next location read jumps to next file. 1425 location.setOffset(journalMaxFileLength); 1426 } 1427 } 1428 1429 // ///////////////////////////////////////////////////////////////// 1430 // These methods do the actual index updates. 1431 // ///////////////////////////////////////////////////////////////// 1432 1433 protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock(); 1434 private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>(); 1435 1436 long updateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException { 1437 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 1438 1439 // Skip adding the message to the index if this is a topic and there are 1440 // no subscriptions. 1441 if (sd.subscriptions != null && sd.subscriptions.isEmpty(tx)) { 1442 return -1; 1443 } 1444 1445 // Add the message. 1446 int priority = command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY; 1447 long id = sd.orderIndex.getNextMessageId(); 1448 Long previous = sd.locationIndex.put(tx, location, id); 1449 if (previous == null) { 1450 previous = sd.messageIdIndex.put(tx, command.getMessageId(), id); 1451 if (previous == null) { 1452 incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize()); 1453 sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location)); 1454 if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) { 1455 addAckLocationForNewMessage(tx, command.getDestination(), sd, id); 1456 } 1457 metadata.lastUpdate = location; 1458 } else { 1459 1460 MessageKeys messageKeys = sd.orderIndex.get(tx, previous); 1461 if (messageKeys != null && messageKeys.location.compareTo(location) < 0) { 1462 // If the message ID is indexed, then the broker asked us to store a duplicate before the message was dispatched and acked, we ignore this add attempt 1463 LOG.warn("Duplicate message add attempt rejected. Destination: {}://{}, Message id: {}", command.getDestination().getType(), command.getDestination().getName(), command.getMessageId()); 1464 } 1465 sd.messageIdIndex.put(tx, command.getMessageId(), previous); 1466 sd.locationIndex.remove(tx, location); 1467 id = -1; 1468 } 1469 } else { 1470 // restore the previous value.. Looks like this was a redo of a previously 1471 // added message. We don't want to assign it a new id as the other indexes would 1472 // be wrong.. 1473 sd.locationIndex.put(tx, location, previous); 1474 // ensure sequence is not broken 1475 sd.orderIndex.revertNextMessageId(); 1476 metadata.lastUpdate = location; 1477 } 1478 // record this id in any event, initial send or recovery 1479 metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId()); 1480 1481 return id; 1482 } 1483 1484 void trackPendingAdd(KahaDestination destination, Long seq) { 1485 StoredDestination sd = storedDestinations.get(key(destination)); 1486 if (sd != null) { 1487 sd.trackPendingAdd(seq); 1488 } 1489 } 1490 1491 void trackPendingAddComplete(KahaDestination destination, Long seq) { 1492 StoredDestination sd = storedDestinations.get(key(destination)); 1493 if (sd != null) { 1494 sd.trackPendingAddComplete(seq); 1495 } 1496 } 1497 1498 void updateIndex(Transaction tx, KahaUpdateMessageCommand updateMessageCommand, Location location) throws IOException { 1499 KahaAddMessageCommand command = updateMessageCommand.getMessage(); 1500 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 1501 1502 Long id = sd.messageIdIndex.get(tx, command.getMessageId()); 1503 if (id != null) { 1504 MessageKeys previousKeys = sd.orderIndex.put( 1505 tx, 1506 command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY, 1507 id, 1508 new MessageKeys(command.getMessageId(), location) 1509 ); 1510 sd.locationIndex.put(tx, location, id); 1511 incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize()); 1512 1513 if (previousKeys != null) { 1514 //Remove the existing from the size 1515 decrementAndSubSizeToStoreStat(command.getDestination(), previousKeys.location.getSize()); 1516 1517 //update all the subscription metrics 1518 if (enableSubscriptionStatistics && location.getSize() != previousKeys.location.getSize()) { 1519 Iterator<Entry<String, SequenceSet>> iter = sd.ackPositions.iterator(tx); 1520 while (iter.hasNext()) { 1521 Entry<String, SequenceSet> e = iter.next(); 1522 if (e.getValue().contains(id)) { 1523 incrementAndAddSizeToStoreStat(key(command.getDestination()), e.getKey(), location.getSize()); 1524 decrementAndSubSizeToStoreStat(key(command.getDestination()), e.getKey(), previousKeys.location.getSize()); 1525 } 1526 } 1527 } 1528 1529 // on first update previous is original location, on recovery/replay it may be the updated location 1530 if(!previousKeys.location.equals(location)) { 1531 sd.locationIndex.remove(tx, previousKeys.location); 1532 } 1533 } 1534 metadata.lastUpdate = location; 1535 } else { 1536 //Add the message if it can't be found 1537 this.updateIndex(tx, command, location); 1538 } 1539 } 1540 1541 void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException { 1542 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 1543 if (!command.hasSubscriptionKey()) { 1544 1545 // In the queue case we just remove the message from the index.. 1546 Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId()); 1547 if (sequenceId != null) { 1548 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); 1549 if (keys != null) { 1550 sd.locationIndex.remove(tx, keys.location); 1551 decrementAndSubSizeToStoreStat(command.getDestination(), keys.location.getSize()); 1552 recordAckMessageReferenceLocation(ackLocation, keys.location); 1553 metadata.lastUpdate = ackLocation; 1554 } else if (LOG.isDebugEnabled()) { 1555 LOG.debug("message not found in order index: " + sequenceId + " for: " + command.getMessageId()); 1556 } 1557 } else if (LOG.isDebugEnabled()) { 1558 LOG.debug("message not found in sequence id index: " + command.getMessageId()); 1559 } 1560 } else { 1561 // In the topic case we need remove the message once it's been acked 1562 // by all the subs 1563 Long sequence = sd.messageIdIndex.get(tx, command.getMessageId()); 1564 1565 // Make sure it's a valid message id... 1566 if (sequence != null) { 1567 String subscriptionKey = command.getSubscriptionKey(); 1568 if (command.getAck() != UNMATCHED) { 1569 sd.orderIndex.get(tx, sequence); 1570 byte priority = sd.orderIndex.lastGetPriority(); 1571 sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(sequence, priority)); 1572 } 1573 1574 MessageKeys keys = sd.orderIndex.get(tx, sequence); 1575 if (keys != null) { 1576 recordAckMessageReferenceLocation(ackLocation, keys.location); 1577 } 1578 // The following method handles deleting un-referenced messages. 1579 removeAckLocation(command, tx, sd, subscriptionKey, sequence); 1580 metadata.lastUpdate = ackLocation; 1581 } else if (LOG.isDebugEnabled()) { 1582 LOG.debug("on ack, no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey()); 1583 } 1584 1585 } 1586 } 1587 1588 private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) { 1589 Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId())); 1590 if (referenceFileIds == null) { 1591 referenceFileIds = new HashSet<Integer>(); 1592 referenceFileIds.add(messageLocation.getDataFileId()); 1593 metadata.ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds); 1594 } else { 1595 Integer id = Integer.valueOf(messageLocation.getDataFileId()); 1596 if (!referenceFileIds.contains(id)) { 1597 referenceFileIds.add(id); 1598 } 1599 } 1600 } 1601 1602 void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException { 1603 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 1604 sd.orderIndex.remove(tx); 1605 1606 sd.locationIndex.clear(tx); 1607 sd.locationIndex.unload(tx); 1608 tx.free(sd.locationIndex.getPageId()); 1609 1610 sd.messageIdIndex.clear(tx); 1611 sd.messageIdIndex.unload(tx); 1612 tx.free(sd.messageIdIndex.getPageId()); 1613 1614 if (sd.subscriptions != null) { 1615 sd.subscriptions.clear(tx); 1616 sd.subscriptions.unload(tx); 1617 tx.free(sd.subscriptions.getPageId()); 1618 1619 sd.subscriptionAcks.clear(tx); 1620 sd.subscriptionAcks.unload(tx); 1621 tx.free(sd.subscriptionAcks.getPageId()); 1622 1623 sd.ackPositions.clear(tx); 1624 sd.ackPositions.unload(tx); 1625 tx.free(sd.ackPositions.getHeadPageId()); 1626 1627 sd.subLocations.clear(tx); 1628 sd.subLocations.unload(tx); 1629 tx.free(sd.subLocations.getHeadPageId()); 1630 } 1631 1632 String key = key(command.getDestination()); 1633 storedDestinations.remove(key); 1634 metadata.destinations.remove(tx, key); 1635 clearStoreStats(command.getDestination()); 1636 storeCache.remove(key(command.getDestination())); 1637 } 1638 1639 void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException { 1640 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 1641 final String subscriptionKey = command.getSubscriptionKey(); 1642 1643 // If set then we are creating it.. otherwise we are destroying the sub 1644 if (command.hasSubscriptionInfo()) { 1645 Location existing = sd.subLocations.get(tx, subscriptionKey); 1646 if (existing != null && existing.compareTo(location) == 0) { 1647 // replay on recovery, ignore 1648 LOG.trace("ignoring journal replay of replay of sub from: " + location); 1649 return; 1650 } 1651 1652 sd.subscriptions.put(tx, subscriptionKey, command); 1653 sd.subLocations.put(tx, subscriptionKey, location); 1654 long ackLocation=NOT_ACKED; 1655 if (!command.getRetroactive()) { 1656 ackLocation = sd.orderIndex.nextMessageId-1; 1657 } else { 1658 addAckLocationForRetroactiveSub(tx, sd, subscriptionKey); 1659 } 1660 sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation)); 1661 sd.subscriptionCache.add(subscriptionKey); 1662 } else { 1663 // delete the sub... 1664 sd.subscriptions.remove(tx, subscriptionKey); 1665 sd.subLocations.remove(tx, subscriptionKey); 1666 sd.subscriptionAcks.remove(tx, subscriptionKey); 1667 sd.subscriptionCache.remove(subscriptionKey); 1668 removeAckLocationsForSub(command, tx, sd, subscriptionKey); 1669 MessageStoreSubscriptionStatistics subStats = getSubStats(key(command.getDestination())); 1670 if (subStats != null) { 1671 subStats.removeSubscription(subscriptionKey); 1672 } 1673 1674 if (sd.subscriptions.isEmpty(tx)) { 1675 // remove the stored destination 1676 KahaRemoveDestinationCommand removeDestinationCommand = new KahaRemoveDestinationCommand(); 1677 removeDestinationCommand.setDestination(command.getDestination()); 1678 updateIndex(tx, removeDestinationCommand, null); 1679 clearStoreStats(command.getDestination()); 1680 } 1681 } 1682 } 1683 1684 private void checkpointUpdate(final boolean cleanup) throws IOException { 1685 checkpointLock.writeLock().lock(); 1686 try { 1687 this.indexLock.writeLock().lock(); 1688 try { 1689 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1690 @Override 1691 public void execute(Transaction tx) throws IOException { 1692 checkpointUpdate(tx, cleanup); 1693 } 1694 }); 1695 } finally { 1696 this.indexLock.writeLock().unlock(); 1697 } 1698 1699 } finally { 1700 checkpointLock.writeLock().unlock(); 1701 } 1702 } 1703 1704 /** 1705 * @param tx 1706 * @throws IOException 1707 */ 1708 void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException { 1709 MDC.put("activemq.persistenceDir", getDirectory().getName()); 1710 LOG.debug("Checkpoint started."); 1711 1712 // reflect last update exclusive of current checkpoint 1713 Location lastUpdate = metadata.lastUpdate; 1714 1715 metadata.state = OPEN_STATE; 1716 metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit(); 1717 metadata.ackMessageFileMapLocation = checkpointAckMessageFileMap(); 1718 Location[] inProgressTxRange = getInProgressTxLocationRange(); 1719 metadata.firstInProgressTransactionLocation = inProgressTxRange[0]; 1720 tx.store(metadata.page, metadataMarshaller, true); 1721 pageFile.flush(); 1722 1723 if (cleanup) { 1724 1725 final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet()); 1726 final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(completeFileSet); 1727 1728 if (LOG.isTraceEnabled()) { 1729 LOG.trace("Last update: " + lastUpdate + ", full gc candidates set: " + gcCandidateSet); 1730 } 1731 1732 if (lastUpdate != null) { 1733 gcCandidateSet.remove(lastUpdate.getDataFileId()); 1734 } 1735 1736 // Don't GC files under replication 1737 if( journalFilesBeingReplicated!=null ) { 1738 gcCandidateSet.removeAll(journalFilesBeingReplicated); 1739 } 1740 1741 if (metadata.producerSequenceIdTrackerLocation != null) { 1742 int dataFileId = metadata.producerSequenceIdTrackerLocation.getDataFileId(); 1743 if (gcCandidateSet.contains(dataFileId) && gcCandidateSet.first() == dataFileId) { 1744 // rewrite so we don't prevent gc 1745 metadata.producerSequenceIdTracker.setModified(true); 1746 if (LOG.isTraceEnabled()) { 1747 LOG.trace("rewriting producerSequenceIdTracker:" + metadata.producerSequenceIdTrackerLocation); 1748 } 1749 } 1750 gcCandidateSet.remove(dataFileId); 1751 if (LOG.isTraceEnabled()) { 1752 LOG.trace("gc candidates after producerSequenceIdTrackerLocation:" + dataFileId + ", " + gcCandidateSet); 1753 } 1754 } 1755 1756 if (metadata.ackMessageFileMapLocation != null) { 1757 int dataFileId = metadata.ackMessageFileMapLocation.getDataFileId(); 1758 gcCandidateSet.remove(dataFileId); 1759 if (LOG.isTraceEnabled()) { 1760 LOG.trace("gc candidates after ackMessageFileMapLocation:" + dataFileId + ", " + gcCandidateSet); 1761 } 1762 } 1763 1764 // Don't GC files referenced by in-progress tx 1765 if (inProgressTxRange[0] != null) { 1766 for (int pendingTx=inProgressTxRange[0].getDataFileId(); pendingTx <= inProgressTxRange[1].getDataFileId(); pendingTx++) { 1767 gcCandidateSet.remove(pendingTx); 1768 } 1769 } 1770 if (LOG.isTraceEnabled()) { 1771 LOG.trace("gc candidates after tx range:" + Arrays.asList(inProgressTxRange) + ", " + gcCandidateSet); 1772 } 1773 1774 // Go through all the destinations to see if any of them can remove GC candidates. 1775 for (Entry<String, StoredDestination> entry : storedDestinations.entrySet()) { 1776 if( gcCandidateSet.isEmpty() ) { 1777 break; 1778 } 1779 1780 // Use a visitor to cut down the number of pages that we load 1781 entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>() { 1782 int last=-1; 1783 @Override 1784 public boolean isInterestedInKeysBetween(Location first, Location second) { 1785 if( first==null ) { 1786 SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1); 1787 if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) { 1788 subset.remove(second.getDataFileId()); 1789 } 1790 return !subset.isEmpty(); 1791 } else if( second==null ) { 1792 SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId()); 1793 if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) { 1794 subset.remove(first.getDataFileId()); 1795 } 1796 return !subset.isEmpty(); 1797 } else { 1798 SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1); 1799 if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) { 1800 subset.remove(first.getDataFileId()); 1801 } 1802 if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) { 1803 subset.remove(second.getDataFileId()); 1804 } 1805 return !subset.isEmpty(); 1806 } 1807 } 1808 1809 @Override 1810 public void visit(List<Location> keys, List<Long> values) { 1811 for (Location l : keys) { 1812 int fileId = l.getDataFileId(); 1813 if( last != fileId ) { 1814 gcCandidateSet.remove(fileId); 1815 last = fileId; 1816 } 1817 } 1818 } 1819 }); 1820 1821 // Durable Subscription 1822 if (entry.getValue().subLocations != null) { 1823 Iterator<Entry<String, Location>> iter = entry.getValue().subLocations.iterator(tx); 1824 while (iter.hasNext()) { 1825 Entry<String, Location> subscription = iter.next(); 1826 int dataFileId = subscription.getValue().getDataFileId(); 1827 1828 // Move subscription along if it has no outstanding messages that need ack'd 1829 // and its in the last log file in the journal. 1830 if (!gcCandidateSet.isEmpty() && gcCandidateSet.first() == dataFileId) { 1831 final StoredDestination destination = entry.getValue(); 1832 final String subscriptionKey = subscription.getKey(); 1833 SequenceSet pendingAcks = destination.ackPositions.get(tx, subscriptionKey); 1834 1835 // When pending is size one that is the next message Id meaning there 1836 // are no pending messages currently. 1837 if (pendingAcks == null || pendingAcks.isEmpty() || 1838 (pendingAcks.size() == 1 && pendingAcks.getTail().range() == 1)) { 1839 1840 if (LOG.isTraceEnabled()) { 1841 LOG.trace("Found candidate for rewrite: {} from file {}", entry.getKey(), dataFileId); 1842 } 1843 1844 final KahaSubscriptionCommand kahaSub = 1845 destination.subscriptions.get(tx, subscriptionKey); 1846 destination.subLocations.put( 1847 tx, subscriptionKey, checkpointSubscriptionCommand(kahaSub)); 1848 1849 // Skips the remove from candidates if we rewrote the subscription 1850 // in order to prevent duplicate subscription commands on recover. 1851 // If another subscription is on the same file and isn't rewritten 1852 // than it will remove the file from the set. 1853 continue; 1854 } 1855 } 1856 1857 gcCandidateSet.remove(dataFileId); 1858 } 1859 } 1860 1861 if (LOG.isTraceEnabled()) { 1862 LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet); 1863 } 1864 } 1865 1866 // check we are not deleting file with ack for in-use journal files 1867 if (LOG.isTraceEnabled()) { 1868 LOG.trace("gc candidates: " + gcCandidateSet); 1869 LOG.trace("ackMessageFileMap: " + metadata.ackMessageFileMap); 1870 } 1871 1872 boolean ackMessageFileMapMod = false; 1873 Iterator<Integer> candidates = gcCandidateSet.iterator(); 1874 while (candidates.hasNext()) { 1875 Integer candidate = candidates.next(); 1876 Set<Integer> referencedFileIds = metadata.ackMessageFileMap.get(candidate); 1877 if (referencedFileIds != null) { 1878 for (Integer referencedFileId : referencedFileIds) { 1879 if (completeFileSet.contains(referencedFileId) && !gcCandidateSet.contains(referencedFileId)) { 1880 // active file that is not targeted for deletion is referenced so don't delete 1881 candidates.remove(); 1882 break; 1883 } 1884 } 1885 if (gcCandidateSet.contains(candidate)) { 1886 ackMessageFileMapMod |= (metadata.ackMessageFileMap.remove(candidate) != null); 1887 } else { 1888 if (LOG.isTraceEnabled()) { 1889 LOG.trace("not removing data file: " + candidate 1890 + " as contained ack(s) refer to referenced file: " + referencedFileIds); 1891 } 1892 } 1893 } 1894 } 1895 1896 if (!gcCandidateSet.isEmpty()) { 1897 LOG.debug("Cleanup removing the data files: {}", gcCandidateSet); 1898 journal.removeDataFiles(gcCandidateSet); 1899 for (Integer candidate : gcCandidateSet) { 1900 for (Set<Integer> ackFiles : metadata.ackMessageFileMap.values()) { 1901 ackMessageFileMapMod |= ackFiles.remove(candidate); 1902 } 1903 } 1904 if (ackMessageFileMapMod) { 1905 checkpointUpdate(tx, false); 1906 } 1907 } else if (isEnableAckCompaction()) { 1908 if (++checkPointCyclesWithNoGC >= getCompactAcksAfterNoGC()) { 1909 // First check length of journal to make sure it makes sense to even try. 1910 // 1911 // If there is only one journal file with Acks in it we don't need to move 1912 // it since it won't be chained to any later logs. 1913 // 1914 // If the logs haven't grown since the last time then we need to compact 1915 // otherwise there seems to still be room for growth and we don't need to incur 1916 // the overhead. Depending on configuration this check can be avoided and 1917 // Ack compaction will run any time the store has not GC'd a journal file in 1918 // the configured amount of cycles. 1919 if (metadata.ackMessageFileMap.size() > 1 && 1920 (journalLogOnLastCompactionCheck == journal.getCurrentDataFileId() || isCompactAcksIgnoresStoreGrowth())) { 1921 1922 LOG.trace("No files GC'd checking if threshold to ACK compaction has been met."); 1923 try { 1924 scheduler.execute(new AckCompactionRunner()); 1925 } catch (Exception ex) { 1926 LOG.warn("Error on queueing the Ack Compactor", ex); 1927 } 1928 } else { 1929 LOG.trace("Journal activity detected, no Ack compaction scheduled."); 1930 } 1931 1932 checkPointCyclesWithNoGC = 0; 1933 } else { 1934 LOG.trace("Not yet time to check for compaction: {} of {} cycles", 1935 checkPointCyclesWithNoGC, getCompactAcksAfterNoGC()); 1936 } 1937 1938 journalLogOnLastCompactionCheck = journal.getCurrentDataFileId(); 1939 } 1940 } 1941 MDC.remove("activemq.persistenceDir"); 1942 1943 LOG.debug("Checkpoint done."); 1944 } 1945 1946 private final class AckCompactionRunner implements Runnable { 1947 1948 @Override 1949 public void run() { 1950 1951 int journalToAdvance = -1; 1952 Set<Integer> journalLogsReferenced = new HashSet<Integer>(); 1953 1954 //flag to know whether the ack forwarding completed without an exception 1955 boolean forwarded = false; 1956 1957 try { 1958 //acquire the checkpoint lock to prevent other threads from 1959 //running a checkpoint while this is running 1960 // 1961 //Normally this task runs on the same executor as the checkpoint task 1962 //so this ack compaction runner wouldn't run at the same time as the checkpoint task. 1963 // 1964 //However, there are two cases where this isn't always true. 1965 //First, the checkpoint() method is public and can be called through the 1966 //PersistenceAdapter interface by someone at the same time this is running. 1967 //Second, a checkpoint is called during shutdown without using the executor. 1968 // 1969 //In the future it might be better to just remove the checkpointLock entirely 1970 //and only use the executor but this would need to be examined for any unintended 1971 //consequences 1972 checkpointLock.readLock().lock(); 1973 1974 try { 1975 1976 // Lock index to capture the ackMessageFileMap data 1977 indexLock.writeLock().lock(); 1978 1979 // Map keys might not be sorted, find the earliest log file to forward acks 1980 // from and move only those, future cycles can chip away at more as needed. 1981 // We won't move files that are themselves rewritten on a previous compaction. 1982 List<Integer> journalFileIds = new ArrayList<Integer>(metadata.ackMessageFileMap.keySet()); 1983 Collections.sort(journalFileIds); 1984 for (Integer journalFileId : journalFileIds) { 1985 DataFile current = journal.getDataFileById(journalFileId); 1986 if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) { 1987 journalToAdvance = journalFileId; 1988 break; 1989 } 1990 } 1991 1992 // Check if we found one, or if we only found the current file being written to. 1993 if (journalToAdvance == -1 || journalToAdvance == journal.getCurrentDataFileId()) { 1994 return; 1995 } 1996 1997 journalLogsReferenced.addAll(metadata.ackMessageFileMap.get(journalToAdvance)); 1998 1999 } finally { 2000 indexLock.writeLock().unlock(); 2001 } 2002 2003 try { 2004 // Background rewrite of the old acks 2005 forwardAllAcks(journalToAdvance, journalLogsReferenced); 2006 forwarded = true; 2007 } catch (IOException ioe) { 2008 LOG.error("Forwarding of acks failed", ioe); 2009 brokerService.handleIOException(ioe); 2010 } catch (Throwable e) { 2011 LOG.error("Forwarding of acks failed", e); 2012 brokerService.handleIOException(IOExceptionSupport.create(e)); 2013 } 2014 } finally { 2015 checkpointLock.readLock().unlock(); 2016 } 2017 2018 try { 2019 if (forwarded) { 2020 // Checkpoint with changes from the ackMessageFileMap 2021 checkpointUpdate(false); 2022 } 2023 } catch (IOException ioe) { 2024 LOG.error("Checkpoint failed", ioe); 2025 brokerService.handleIOException(ioe); 2026 } catch (Throwable e) { 2027 LOG.error("Checkpoint failed", e); 2028 brokerService.handleIOException(IOExceptionSupport.create(e)); 2029 } 2030 } 2031 } 2032 2033 private void forwardAllAcks(Integer journalToRead, Set<Integer> journalLogsReferenced) throws IllegalStateException, IOException { 2034 LOG.trace("Attempting to move all acks in journal:{} to the front.", journalToRead); 2035 2036 DataFile forwardsFile = journal.reserveDataFile(); 2037 forwardsFile.setTypeCode(COMPACTED_JOURNAL_FILE); 2038 LOG.trace("Reserved now file for forwarded acks: {}", forwardsFile); 2039 2040 Map<Integer, Set<Integer>> updatedAckLocations = new HashMap<Integer, Set<Integer>>(); 2041 2042 try (TargetedDataFileAppender appender = new TargetedDataFileAppender(journal, forwardsFile);) { 2043 KahaRewrittenDataFileCommand compactionMarker = new KahaRewrittenDataFileCommand(); 2044 compactionMarker.setSourceDataFileId(journalToRead); 2045 compactionMarker.setRewriteType(forwardsFile.getTypeCode()); 2046 2047 ByteSequence payload = toByteSequence(compactionMarker); 2048 appender.storeItem(payload, Journal.USER_RECORD_TYPE, false); 2049 LOG.trace("Marked ack rewrites file as replacing file: {}", journalToRead); 2050 2051 Location nextLocation = getNextLocationForAckForward(new Location(journalToRead, 0)); 2052 while (nextLocation != null && nextLocation.getDataFileId() == journalToRead) { 2053 JournalCommand<?> command = null; 2054 try { 2055 command = load(nextLocation); 2056 } catch (IOException ex) { 2057 LOG.trace("Error loading command during ack forward: {}", nextLocation); 2058 } 2059 2060 if (command != null && command instanceof KahaRemoveMessageCommand) { 2061 payload = toByteSequence(command); 2062 Location location = appender.storeItem(payload, Journal.USER_RECORD_TYPE, false); 2063 updatedAckLocations.put(location.getDataFileId(), journalLogsReferenced); 2064 } 2065 2066 nextLocation = getNextLocationForAckForward(nextLocation); 2067 } 2068 } 2069 2070 LOG.trace("ACKS forwarded, updates for ack locations: {}", updatedAckLocations); 2071 2072 // Lock index while we update the ackMessageFileMap. 2073 indexLock.writeLock().lock(); 2074 2075 // Update the ack map with the new locations of the acks 2076 for (Entry<Integer, Set<Integer>> entry : updatedAckLocations.entrySet()) { 2077 Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(entry.getKey()); 2078 if (referenceFileIds == null) { 2079 referenceFileIds = new HashSet<Integer>(); 2080 referenceFileIds.addAll(entry.getValue()); 2081 metadata.ackMessageFileMap.put(entry.getKey(), referenceFileIds); 2082 } else { 2083 referenceFileIds.addAll(entry.getValue()); 2084 } 2085 } 2086 2087 // remove the old location data from the ack map so that the old journal log file can 2088 // be removed on next GC. 2089 metadata.ackMessageFileMap.remove(journalToRead); 2090 2091 indexLock.writeLock().unlock(); 2092 2093 LOG.trace("ACK File Map following updates: {}", metadata.ackMessageFileMap); 2094 } 2095 2096 private Location getNextLocationForAckForward(final Location nextLocation) { 2097 //getNextLocation() can throw an IOException, we should handle it and set 2098 //nextLocation to null and abort gracefully 2099 //Should not happen in the normal case 2100 Location location = null; 2101 try { 2102 location = journal.getNextLocation(nextLocation); 2103 } catch (IOException e) { 2104 LOG.warn("Failed to load next journal location: {}", e.getMessage()); 2105 if (LOG.isDebugEnabled()) { 2106 LOG.debug("Failed to load next journal location", e); 2107 } 2108 } 2109 return location; 2110 } 2111 2112 final Runnable nullCompletionCallback = new Runnable() { 2113 @Override 2114 public void run() { 2115 } 2116 }; 2117 2118 private Location checkpointProducerAudit() throws IOException { 2119 if (metadata.producerSequenceIdTracker == null || metadata.producerSequenceIdTracker.modified()) { 2120 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 2121 ObjectOutputStream oout = new ObjectOutputStream(baos); 2122 oout.writeObject(metadata.producerSequenceIdTracker); 2123 oout.flush(); 2124 oout.close(); 2125 // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false 2126 Location location = store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), nullCompletionCallback); 2127 try { 2128 location.getLatch().await(); 2129 } catch (InterruptedException e) { 2130 throw new InterruptedIOException(e.toString()); 2131 } 2132 return location; 2133 } 2134 return metadata.producerSequenceIdTrackerLocation; 2135 } 2136 2137 private Location checkpointAckMessageFileMap() throws IOException { 2138 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 2139 ObjectOutputStream oout = new ObjectOutputStream(baos); 2140 oout.writeObject(metadata.ackMessageFileMap); 2141 oout.flush(); 2142 oout.close(); 2143 // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false 2144 Location location = store(new KahaAckMessageFileMapCommand().setAckMessageFileMap(new Buffer(baos.toByteArray())), nullCompletionCallback); 2145 try { 2146 location.getLatch().await(); 2147 } catch (InterruptedException e) { 2148 throw new InterruptedIOException(e.toString()); 2149 } 2150 return location; 2151 } 2152 2153 private Location checkpointSubscriptionCommand(KahaSubscriptionCommand subscription) throws IOException { 2154 2155 ByteSequence sequence = toByteSequence(subscription); 2156 Location location = journal.write(sequence, nullCompletionCallback) ; 2157 2158 try { 2159 location.getLatch().await(); 2160 } catch (InterruptedException e) { 2161 throw new InterruptedIOException(e.toString()); 2162 } 2163 return location; 2164 } 2165 2166 public HashSet<Integer> getJournalFilesBeingReplicated() { 2167 return journalFilesBeingReplicated; 2168 } 2169 2170 // ///////////////////////////////////////////////////////////////// 2171 // StoredDestination related implementation methods. 2172 // ///////////////////////////////////////////////////////////////// 2173 2174 protected final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>(); 2175 2176 static class MessageKeys { 2177 final String messageId; 2178 final Location location; 2179 2180 public MessageKeys(String messageId, Location location) { 2181 this.messageId=messageId; 2182 this.location=location; 2183 } 2184 2185 @Override 2186 public String toString() { 2187 return "["+messageId+","+location+"]"; 2188 } 2189 } 2190 2191 protected class MessageKeysMarshaller extends VariableMarshaller<MessageKeys> { 2192 final LocationSizeMarshaller locationSizeMarshaller = new LocationSizeMarshaller(); 2193 2194 @Override 2195 public MessageKeys readPayload(DataInput dataIn) throws IOException { 2196 return new MessageKeys(dataIn.readUTF(), locationSizeMarshaller.readPayload(dataIn)); 2197 } 2198 2199 @Override 2200 public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException { 2201 dataOut.writeUTF(object.messageId); 2202 locationSizeMarshaller.writePayload(object.location, dataOut); 2203 } 2204 } 2205 2206 class LastAck { 2207 long lastAckedSequence; 2208 byte priority; 2209 2210 public LastAck(LastAck source) { 2211 this.lastAckedSequence = source.lastAckedSequence; 2212 this.priority = source.priority; 2213 } 2214 2215 public LastAck() { 2216 this.priority = MessageOrderIndex.HI; 2217 } 2218 2219 public LastAck(long ackLocation) { 2220 this.lastAckedSequence = ackLocation; 2221 this.priority = MessageOrderIndex.LO; 2222 } 2223 2224 public LastAck(long ackLocation, byte priority) { 2225 this.lastAckedSequence = ackLocation; 2226 this.priority = priority; 2227 } 2228 2229 @Override 2230 public String toString() { 2231 return "[" + lastAckedSequence + ":" + priority + "]"; 2232 } 2233 } 2234 2235 protected class LastAckMarshaller implements Marshaller<LastAck> { 2236 2237 @Override 2238 public void writePayload(LastAck object, DataOutput dataOut) throws IOException { 2239 dataOut.writeLong(object.lastAckedSequence); 2240 dataOut.writeByte(object.priority); 2241 } 2242 2243 @Override 2244 public LastAck readPayload(DataInput dataIn) throws IOException { 2245 LastAck lastAcked = new LastAck(); 2246 lastAcked.lastAckedSequence = dataIn.readLong(); 2247 if (metadata.version >= 3) { 2248 lastAcked.priority = dataIn.readByte(); 2249 } 2250 return lastAcked; 2251 } 2252 2253 @Override 2254 public int getFixedSize() { 2255 return 9; 2256 } 2257 2258 @Override 2259 public LastAck deepCopy(LastAck source) { 2260 return new LastAck(source); 2261 } 2262 2263 @Override 2264 public boolean isDeepCopySupported() { 2265 return true; 2266 } 2267 } 2268 2269 class StoredDestination { 2270 2271 MessageOrderIndex orderIndex = new MessageOrderIndex(); 2272 BTreeIndex<Location, Long> locationIndex; 2273 BTreeIndex<String, Long> messageIdIndex; 2274 2275 // These bits are only set for Topics 2276 BTreeIndex<String, KahaSubscriptionCommand> subscriptions; 2277 BTreeIndex<String, LastAck> subscriptionAcks; 2278 HashMap<String, MessageOrderCursor> subscriptionCursors; 2279 ListIndex<String, SequenceSet> ackPositions; 2280 ListIndex<String, Location> subLocations; 2281 2282 // Transient data used to track which Messages are no longer needed. 2283 final TreeMap<Long, Long> messageReferences = new TreeMap<Long, Long>(); 2284 final HashSet<String> subscriptionCache = new LinkedHashSet<String>(); 2285 2286 public void trackPendingAdd(Long seq) { 2287 orderIndex.trackPendingAdd(seq); 2288 } 2289 2290 public void trackPendingAddComplete(Long seq) { 2291 orderIndex.trackPendingAddComplete(seq); 2292 } 2293 2294 @Override 2295 public String toString() { 2296 return "nextSeq:" + orderIndex.nextMessageId + ",lastRet:" + orderIndex.cursor + ",pending:" + orderIndex.pendingAdditions.size(); 2297 } 2298 } 2299 2300 protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> { 2301 2302 final MessageKeysMarshaller messageKeysMarshaller = new MessageKeysMarshaller(); 2303 2304 @Override 2305 public StoredDestination readPayload(final DataInput dataIn) throws IOException { 2306 final StoredDestination value = new StoredDestination(); 2307 value.orderIndex.defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong()); 2308 value.locationIndex = new BTreeIndex<Location, Long>(pageFile, dataIn.readLong()); 2309 value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong()); 2310 2311 if (dataIn.readBoolean()) { 2312 value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong()); 2313 value.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, dataIn.readLong()); 2314 if (metadata.version >= 4) { 2315 value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, dataIn.readLong()); 2316 } else { 2317 // upgrade 2318 pageFile.tx().execute(new Transaction.Closure<IOException>() { 2319 @Override 2320 public void execute(Transaction tx) throws IOException { 2321 LinkedHashMap<String, SequenceSet> temp = new LinkedHashMap<String, SequenceSet>(); 2322 2323 if (metadata.version >= 3) { 2324 // migrate 2325 BTreeIndex<Long, HashSet<String>> oldAckPositions = 2326 new BTreeIndex<Long, HashSet<String>>(pageFile, dataIn.readLong()); 2327 oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE); 2328 oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE); 2329 oldAckPositions.load(tx); 2330 2331 2332 // Do the initial build of the data in memory before writing into the store 2333 // based Ack Positions List to avoid a lot of disk thrashing. 2334 Iterator<Entry<Long, HashSet<String>>> iterator = oldAckPositions.iterator(tx); 2335 while (iterator.hasNext()) { 2336 Entry<Long, HashSet<String>> entry = iterator.next(); 2337 2338 for(String subKey : entry.getValue()) { 2339 SequenceSet pendingAcks = temp.get(subKey); 2340 if (pendingAcks == null) { 2341 pendingAcks = new SequenceSet(); 2342 temp.put(subKey, pendingAcks); 2343 } 2344 2345 pendingAcks.add(entry.getKey()); 2346 } 2347 } 2348 } 2349 // Now move the pending messages to ack data into the store backed 2350 // structure. 2351 value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate()); 2352 value.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE); 2353 value.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE); 2354 value.ackPositions.load(tx); 2355 for(String subscriptionKey : temp.keySet()) { 2356 value.ackPositions.put(tx, subscriptionKey, temp.get(subscriptionKey)); 2357 } 2358 2359 } 2360 }); 2361 } 2362 2363 if (metadata.version >= 5) { 2364 value.subLocations = new ListIndex<String, Location>(pageFile, dataIn.readLong()); 2365 } else { 2366 // upgrade 2367 pageFile.tx().execute(new Transaction.Closure<IOException>() { 2368 @Override 2369 public void execute(Transaction tx) throws IOException { 2370 value.subLocations = new ListIndex<String, Location>(pageFile, tx.allocate()); 2371 value.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE); 2372 value.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE); 2373 value.subLocations.load(tx); 2374 } 2375 }); 2376 } 2377 } 2378 if (metadata.version >= 2) { 2379 value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong()); 2380 value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong()); 2381 } else { 2382 // upgrade 2383 pageFile.tx().execute(new Transaction.Closure<IOException>() { 2384 @Override 2385 public void execute(Transaction tx) throws IOException { 2386 value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate()); 2387 value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); 2388 value.orderIndex.lowPriorityIndex.setValueMarshaller(messageKeysMarshaller); 2389 value.orderIndex.lowPriorityIndex.load(tx); 2390 2391 value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate()); 2392 value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); 2393 value.orderIndex.highPriorityIndex.setValueMarshaller(messageKeysMarshaller); 2394 value.orderIndex.highPriorityIndex.load(tx); 2395 } 2396 }); 2397 } 2398 2399 return value; 2400 } 2401 2402 @Override 2403 public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException { 2404 dataOut.writeLong(value.orderIndex.defaultPriorityIndex.getPageId()); 2405 dataOut.writeLong(value.locationIndex.getPageId()); 2406 dataOut.writeLong(value.messageIdIndex.getPageId()); 2407 if (value.subscriptions != null) { 2408 dataOut.writeBoolean(true); 2409 dataOut.writeLong(value.subscriptions.getPageId()); 2410 dataOut.writeLong(value.subscriptionAcks.getPageId()); 2411 dataOut.writeLong(value.ackPositions.getHeadPageId()); 2412 dataOut.writeLong(value.subLocations.getHeadPageId()); 2413 } else { 2414 dataOut.writeBoolean(false); 2415 } 2416 dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId()); 2417 dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId()); 2418 } 2419 } 2420 2421 static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> { 2422 final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller(); 2423 2424 @Override 2425 public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException { 2426 KahaSubscriptionCommand rc = new KahaSubscriptionCommand(); 2427 rc.mergeFramed((InputStream)dataIn); 2428 return rc; 2429 } 2430 2431 @Override 2432 public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException { 2433 object.writeFramed((OutputStream)dataOut); 2434 } 2435 } 2436 2437 protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException { 2438 String key = key(destination); 2439 StoredDestination rc = storedDestinations.get(key); 2440 if (rc == null) { 2441 boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC; 2442 rc = loadStoredDestination(tx, key, topic); 2443 // Cache it. We may want to remove/unload destinations from the 2444 // cache that are not used for a while 2445 // to reduce memory usage. 2446 storedDestinations.put(key, rc); 2447 } 2448 return rc; 2449 } 2450 2451 protected StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException { 2452 String key = key(destination); 2453 StoredDestination rc = storedDestinations.get(key); 2454 if (rc == null && metadata.destinations.containsKey(tx, key)) { 2455 rc = getStoredDestination(destination, tx); 2456 } 2457 return rc; 2458 } 2459 2460 /** 2461 * @param tx 2462 * @param key 2463 * @param topic 2464 * @return 2465 * @throws IOException 2466 */ 2467 private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException { 2468 // Try to load the existing indexes.. 2469 StoredDestination rc = metadata.destinations.get(tx, key); 2470 if (rc == null) { 2471 // Brand new destination.. allocate indexes for it. 2472 rc = new StoredDestination(); 2473 rc.orderIndex.allocate(tx); 2474 rc.locationIndex = new BTreeIndex<Location, Long>(pageFile, tx.allocate()); 2475 rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate()); 2476 2477 if (topic) { 2478 rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate()); 2479 rc.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, tx.allocate()); 2480 rc.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate()); 2481 rc.subLocations = new ListIndex<String, Location>(pageFile, tx.allocate()); 2482 } 2483 metadata.destinations.put(tx, key, rc); 2484 } 2485 2486 // Configure the marshalers and load. 2487 rc.orderIndex.load(tx); 2488 2489 // Figure out the next key using the last entry in the destination. 2490 rc.orderIndex.configureLast(tx); 2491 2492 rc.locationIndex.setKeyMarshaller(new LocationSizeMarshaller()); 2493 rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE); 2494 rc.locationIndex.load(tx); 2495 2496 rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE); 2497 rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE); 2498 rc.messageIdIndex.load(tx); 2499 2500 //go through an upgrade old index if older than version 6 2501 if (metadata.version < 6) { 2502 for (Iterator<Entry<Location, Long>> iterator = rc.locationIndex.iterator(tx); iterator.hasNext(); ) { 2503 Entry<Location, Long> entry = iterator.next(); 2504 // modify so it is upgraded 2505 rc.locationIndex.put(tx, entry.getKey(), entry.getValue()); 2506 } 2507 //upgrade the order index 2508 for (Iterator<Entry<Long, MessageKeys>> iterator = rc.orderIndex.iterator(tx); iterator.hasNext(); ) { 2509 Entry<Long, MessageKeys> entry = iterator.next(); 2510 //call get so that the last priority is updated 2511 rc.orderIndex.get(tx, entry.getKey()); 2512 rc.orderIndex.put(tx, rc.orderIndex.lastGetPriority(), entry.getKey(), entry.getValue()); 2513 } 2514 } 2515 2516 // If it was a topic... 2517 if (topic) { 2518 2519 rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE); 2520 rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE); 2521 rc.subscriptions.load(tx); 2522 2523 rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE); 2524 rc.subscriptionAcks.setValueMarshaller(new LastAckMarshaller()); 2525 rc.subscriptionAcks.load(tx); 2526 2527 rc.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE); 2528 rc.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE); 2529 rc.ackPositions.load(tx); 2530 2531 rc.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE); 2532 rc.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE); 2533 rc.subLocations.load(tx); 2534 2535 rc.subscriptionCursors = new HashMap<String, MessageOrderCursor>(); 2536 2537 if (metadata.version < 3) { 2538 2539 // on upgrade need to fill ackLocation with available messages past last ack 2540 for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) { 2541 Entry<String, LastAck> entry = iterator.next(); 2542 for (Iterator<Entry<Long, MessageKeys>> orderIterator = 2543 rc.orderIndex.iterator(tx, new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) { 2544 Long sequence = orderIterator.next().getKey(); 2545 addAckLocation(tx, rc, sequence, entry.getKey()); 2546 } 2547 // modify so it is upgraded 2548 rc.subscriptionAcks.put(tx, entry.getKey(), entry.getValue()); 2549 } 2550 } 2551 2552 // Configure the message references index 2553 Iterator<Entry<String, SequenceSet>> subscriptions = rc.ackPositions.iterator(tx); 2554 while (subscriptions.hasNext()) { 2555 Entry<String, SequenceSet> subscription = subscriptions.next(); 2556 SequenceSet pendingAcks = subscription.getValue(); 2557 if (pendingAcks != null && !pendingAcks.isEmpty()) { 2558 Long lastPendingAck = pendingAcks.getTail().getLast(); 2559 for (Long sequenceId : pendingAcks) { 2560 Long current = rc.messageReferences.get(sequenceId); 2561 if (current == null) { 2562 current = new Long(0); 2563 } 2564 2565 // We always add a trailing empty entry for the next position to start from 2566 // so we need to ensure we don't count that as a message reference on reload. 2567 if (!sequenceId.equals(lastPendingAck)) { 2568 current = current.longValue() + 1; 2569 } else { 2570 current = Long.valueOf(0L); 2571 } 2572 2573 rc.messageReferences.put(sequenceId, current); 2574 } 2575 } 2576 } 2577 2578 // Configure the subscription cache 2579 for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) { 2580 Entry<String, LastAck> entry = iterator.next(); 2581 rc.subscriptionCache.add(entry.getKey()); 2582 } 2583 2584 if (rc.orderIndex.nextMessageId == 0) { 2585 // check for existing durable sub all acked out - pull next seq from acks as messages are gone 2586 if (!rc.subscriptionAcks.isEmpty(tx)) { 2587 for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) { 2588 Entry<String, LastAck> entry = iterator.next(); 2589 rc.orderIndex.nextMessageId = 2590 Math.max(rc.orderIndex.nextMessageId, entry.getValue().lastAckedSequence +1); 2591 } 2592 } 2593 } else { 2594 // update based on ackPositions for unmatched, last entry is always the next 2595 if (!rc.messageReferences.isEmpty()) { 2596 Long nextMessageId = (Long) rc.messageReferences.keySet().toArray()[rc.messageReferences.size() - 1]; 2597 rc.orderIndex.nextMessageId = 2598 Math.max(rc.orderIndex.nextMessageId, nextMessageId); 2599 } 2600 } 2601 } 2602 2603 if (metadata.version < VERSION) { 2604 // store again after upgrade 2605 metadata.destinations.put(tx, key, rc); 2606 } 2607 return rc; 2608 } 2609 2610 /** 2611 * Clear the counter for the destination, if one exists. 2612 * 2613 * @param kahaDestination 2614 */ 2615 protected void clearStoreStats(KahaDestination kahaDestination) { 2616 String key = key(kahaDestination); 2617 MessageStoreStatistics storeStats = getStoreStats(key); 2618 MessageStoreSubscriptionStatistics subStats = getSubStats(key); 2619 if (storeStats != null) { 2620 storeStats.reset(); 2621 } 2622 if (subStats != null) { 2623 subStats.reset(); 2624 } 2625 } 2626 2627 /** 2628 * Update MessageStoreStatistics 2629 * 2630 * @param kahaDestination 2631 * @param size 2632 */ 2633 protected void incrementAndAddSizeToStoreStat(KahaDestination kahaDestination, long size) { 2634 incrementAndAddSizeToStoreStat(key(kahaDestination), size); 2635 } 2636 2637 protected void incrementAndAddSizeToStoreStat(String kahaDestKey, long size) { 2638 MessageStoreStatistics storeStats = getStoreStats(kahaDestKey); 2639 if (storeStats != null) { 2640 storeStats.getMessageCount().increment(); 2641 if (size > 0) { 2642 storeStats.getMessageSize().addSize(size); 2643 } 2644 } 2645 } 2646 2647 protected void decrementAndSubSizeToStoreStat(KahaDestination kahaDestination, long size) { 2648 decrementAndSubSizeToStoreStat(key(kahaDestination), size); 2649 } 2650 2651 protected void decrementAndSubSizeToStoreStat(String kahaDestKey, long size) { 2652 MessageStoreStatistics storeStats = getStoreStats(kahaDestKey); 2653 if (storeStats != null) { 2654 storeStats.getMessageCount().decrement(); 2655 if (size > 0) { 2656 storeStats.getMessageSize().addSize(-size); 2657 } 2658 } 2659 } 2660 2661 protected void incrementAndAddSizeToStoreStat(KahaDestination kahaDestination, String subKey, long size) { 2662 incrementAndAddSizeToStoreStat(key(kahaDestination), subKey, size); 2663 } 2664 2665 protected void incrementAndAddSizeToStoreStat(String kahaDestKey, String subKey, long size) { 2666 if (enableSubscriptionStatistics) { 2667 MessageStoreSubscriptionStatistics subStats = getSubStats(kahaDestKey); 2668 if (subStats != null && subKey != null) { 2669 subStats.getMessageCount(subKey).increment(); 2670 if (size > 0) { 2671 subStats.getMessageSize(subKey).addSize(size); 2672 } 2673 } 2674 } 2675 } 2676 2677 2678 protected void decrementAndSubSizeToStoreStat(String kahaDestKey, String subKey, long size) { 2679 if (enableSubscriptionStatistics) { 2680 MessageStoreSubscriptionStatistics subStats = getSubStats(kahaDestKey); 2681 if (subStats != null && subKey != null) { 2682 subStats.getMessageCount(subKey).decrement(); 2683 if (size > 0) { 2684 subStats.getMessageSize(subKey).addSize(-size); 2685 } 2686 } 2687 } 2688 } 2689 2690 protected void decrementAndSubSizeToStoreStat(KahaDestination kahaDestination, String subKey, long size) { 2691 decrementAndSubSizeToStoreStat(key(kahaDestination), subKey, size); 2692 } 2693 2694 /** 2695 * This is a map to cache MessageStores for a specific 2696 * KahaDestination key 2697 */ 2698 protected final ConcurrentMap<String, MessageStore> storeCache = 2699 new ConcurrentHashMap<String, MessageStore>(); 2700 2701 /** 2702 * Locate the storeMessageSize counter for this KahaDestination 2703 * @param kahaDestination 2704 * @return 2705 */ 2706 protected MessageStoreStatistics getStoreStats(String kahaDestKey) { 2707 MessageStoreStatistics storeStats = null; 2708 try { 2709 MessageStore messageStore = storeCache.get(kahaDestKey); 2710 if (messageStore != null) { 2711 storeStats = messageStore.getMessageStoreStatistics(); 2712 } 2713 } catch (Exception e1) { 2714 LOG.error("Getting size counter of destination failed", e1); 2715 } 2716 2717 return storeStats; 2718 } 2719 2720 protected MessageStoreSubscriptionStatistics getSubStats(String kahaDestKey) { 2721 MessageStoreSubscriptionStatistics subStats = null; 2722 try { 2723 MessageStore messageStore = storeCache.get(kahaDestKey); 2724 if (messageStore instanceof TopicMessageStore) { 2725 subStats = ((TopicMessageStore)messageStore).getMessageStoreSubStatistics(); 2726 } 2727 } catch (Exception e1) { 2728 LOG.error("Getting size counter of destination failed", e1); 2729 } 2730 2731 return subStats; 2732 } 2733 2734 /** 2735 * Determine whether this Destination matches the DestinationType 2736 * 2737 * @param destination 2738 * @param type 2739 * @return 2740 */ 2741 protected boolean matchType(Destination destination, 2742 KahaDestination.DestinationType type) { 2743 if (destination instanceof Topic 2744 && type.equals(KahaDestination.DestinationType.TOPIC)) { 2745 return true; 2746 } else if (destination instanceof Queue 2747 && type.equals(KahaDestination.DestinationType.QUEUE)) { 2748 return true; 2749 } 2750 return false; 2751 } 2752 2753 class LocationSizeMarshaller implements Marshaller<Location> { 2754 2755 public LocationSizeMarshaller() { 2756 2757 } 2758 2759 @Override 2760 public Location readPayload(DataInput dataIn) throws IOException { 2761 Location rc = new Location(); 2762 rc.setDataFileId(dataIn.readInt()); 2763 rc.setOffset(dataIn.readInt()); 2764 if (metadata.version >= 6) { 2765 rc.setSize(dataIn.readInt()); 2766 } 2767 return rc; 2768 } 2769 2770 @Override 2771 public void writePayload(Location object, DataOutput dataOut) 2772 throws IOException { 2773 dataOut.writeInt(object.getDataFileId()); 2774 dataOut.writeInt(object.getOffset()); 2775 dataOut.writeInt(object.getSize()); 2776 } 2777 2778 @Override 2779 public int getFixedSize() { 2780 return 12; 2781 } 2782 2783 @Override 2784 public Location deepCopy(Location source) { 2785 return new Location(source); 2786 } 2787 2788 @Override 2789 public boolean isDeepCopySupported() { 2790 return true; 2791 } 2792 } 2793 2794 private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException { 2795 SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey); 2796 if (sequences == null) { 2797 sequences = new SequenceSet(); 2798 sequences.add(messageSequence); 2799 sd.ackPositions.add(tx, subscriptionKey, sequences); 2800 } else { 2801 sequences.add(messageSequence); 2802 sd.ackPositions.put(tx, subscriptionKey, sequences); 2803 } 2804 2805 Long count = sd.messageReferences.get(messageSequence); 2806 if (count == null) { 2807 count = Long.valueOf(0L); 2808 } 2809 count = count.longValue() + 1; 2810 sd.messageReferences.put(messageSequence, count); 2811 } 2812 2813 // new sub is interested in potentially all existing messages 2814 private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { 2815 SequenceSet allOutstanding = new SequenceSet(); 2816 Iterator<Map.Entry<String, SequenceSet>> iterator = sd.ackPositions.iterator(tx); 2817 while (iterator.hasNext()) { 2818 SequenceSet set = iterator.next().getValue(); 2819 for (Long entry : set) { 2820 allOutstanding.add(entry); 2821 } 2822 } 2823 sd.ackPositions.put(tx, subscriptionKey, allOutstanding); 2824 2825 for (Long ackPosition : allOutstanding) { 2826 Long count = sd.messageReferences.get(ackPosition); 2827 2828 // There might not be a reference if the ackLocation was the last 2829 // one which is a placeholder for the next incoming message and 2830 // no value was added to the message references table. 2831 if (count != null) { 2832 count = count.longValue() + 1; 2833 sd.messageReferences.put(ackPosition, count); 2834 } 2835 } 2836 } 2837 2838 // on a new message add, all existing subs are interested in this message 2839 private void addAckLocationForNewMessage(Transaction tx, KahaDestination kahaDest, 2840 StoredDestination sd, Long messageSequence) throws IOException { 2841 for(String subscriptionKey : sd.subscriptionCache) { 2842 SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey); 2843 if (sequences == null) { 2844 sequences = new SequenceSet(); 2845 sequences.add(new Sequence(messageSequence, messageSequence + 1)); 2846 sd.ackPositions.add(tx, subscriptionKey, sequences); 2847 } else { 2848 sequences.add(new Sequence(messageSequence, messageSequence + 1)); 2849 sd.ackPositions.put(tx, subscriptionKey, sequences); 2850 } 2851 2852 MessageKeys key = sd.orderIndex.get(tx, messageSequence); 2853 incrementAndAddSizeToStoreStat(kahaDest, subscriptionKey, 2854 key.location.getSize()); 2855 2856 Long count = sd.messageReferences.get(messageSequence); 2857 if (count == null) { 2858 count = Long.valueOf(0L); 2859 } 2860 count = count.longValue() + 1; 2861 sd.messageReferences.put(messageSequence, count); 2862 sd.messageReferences.put(messageSequence + 1, Long.valueOf(0L)); 2863 } 2864 } 2865 2866 private void removeAckLocationsForSub(KahaSubscriptionCommand command, 2867 Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { 2868 if (!sd.ackPositions.isEmpty(tx)) { 2869 SequenceSet sequences = sd.ackPositions.remove(tx, subscriptionKey); 2870 if (sequences == null || sequences.isEmpty()) { 2871 return; 2872 } 2873 2874 ArrayList<Long> unreferenced = new ArrayList<Long>(); 2875 2876 for(Long sequenceId : sequences) { 2877 Long references = sd.messageReferences.get(sequenceId); 2878 if (references != null) { 2879 references = references.longValue() - 1; 2880 2881 if (references.longValue() > 0) { 2882 sd.messageReferences.put(sequenceId, references); 2883 } else { 2884 sd.messageReferences.remove(sequenceId); 2885 unreferenced.add(sequenceId); 2886 } 2887 } 2888 } 2889 2890 for(Long sequenceId : unreferenced) { 2891 // Find all the entries that need to get deleted. 2892 ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>(); 2893 sd.orderIndex.getDeleteList(tx, deletes, sequenceId); 2894 2895 // Do the actual deletes. 2896 for (Entry<Long, MessageKeys> entry : deletes) { 2897 sd.locationIndex.remove(tx, entry.getValue().location); 2898 sd.messageIdIndex.remove(tx, entry.getValue().messageId); 2899 sd.orderIndex.remove(tx, entry.getKey()); 2900 decrementAndSubSizeToStoreStat(command.getDestination(), entry.getValue().location.getSize()); 2901 } 2902 } 2903 } 2904 } 2905 2906 /** 2907 * @param tx 2908 * @param sd 2909 * @param subscriptionKey 2910 * @param messageSequence 2911 * @throws IOException 2912 */ 2913 private void removeAckLocation(KahaRemoveMessageCommand command, 2914 Transaction tx, StoredDestination sd, String subscriptionKey, 2915 Long messageSequence) throws IOException { 2916 // Remove the sub from the previous location set.. 2917 if (messageSequence != null) { 2918 SequenceSet range = sd.ackPositions.get(tx, subscriptionKey); 2919 if (range != null && !range.isEmpty()) { 2920 range.remove(messageSequence); 2921 if (!range.isEmpty()) { 2922 sd.ackPositions.put(tx, subscriptionKey, range); 2923 } else { 2924 sd.ackPositions.remove(tx, subscriptionKey); 2925 } 2926 2927 MessageKeys key = sd.orderIndex.get(tx, messageSequence); 2928 decrementAndSubSizeToStoreStat(command.getDestination(), subscriptionKey, 2929 key.location.getSize()); 2930 2931 // Check if the message is reference by any other subscription. 2932 Long count = sd.messageReferences.get(messageSequence); 2933 if (count != null) { 2934 long references = count.longValue() - 1; 2935 if (references > 0) { 2936 sd.messageReferences.put(messageSequence, Long.valueOf(references)); 2937 return; 2938 } else { 2939 sd.messageReferences.remove(messageSequence); 2940 } 2941 } 2942 2943 // Find all the entries that need to get deleted. 2944 ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>(); 2945 sd.orderIndex.getDeleteList(tx, deletes, messageSequence); 2946 2947 // Do the actual deletes. 2948 for (Entry<Long, MessageKeys> entry : deletes) { 2949 sd.locationIndex.remove(tx, entry.getValue().location); 2950 sd.messageIdIndex.remove(tx, entry.getValue().messageId); 2951 sd.orderIndex.remove(tx, entry.getKey()); 2952 decrementAndSubSizeToStoreStat(command.getDestination(), entry.getValue().location.getSize()); 2953 } 2954 } 2955 } 2956 } 2957 2958 public LastAck getLastAck(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { 2959 return sd.subscriptionAcks.get(tx, subscriptionKey); 2960 } 2961 2962 public long getStoredMessageCount(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { 2963 SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey); 2964 if (messageSequences != null) { 2965 long result = messageSequences.rangeSize(); 2966 // if there's anything in the range the last value is always the nextMessage marker, so remove 1. 2967 return result > 0 ? result - 1 : 0; 2968 } 2969 2970 return 0; 2971 } 2972 2973 public long getStoredMessageSize(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { 2974 //grab the messages attached to this subscription 2975 SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey); 2976 2977 long locationSize = 0; 2978 if (messageSequences != null) { 2979 Sequence head = messageSequences.getHead(); 2980 if (head != null) { 2981 //get an iterator over the order index starting at the first unacked message 2982 //and go over each message to add up the size 2983 Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, 2984 new MessageOrderCursor(head.getFirst())); 2985 2986 while (iterator.hasNext()) { 2987 Entry<Long, MessageKeys> entry = iterator.next(); 2988 locationSize += entry.getValue().location.getSize(); 2989 } 2990 } 2991 } 2992 2993 return locationSize; 2994 } 2995 2996 protected String key(KahaDestination destination) { 2997 return destination.getType().getNumber() + ":" + destination.getName(); 2998 } 2999 3000 // ///////////////////////////////////////////////////////////////// 3001 // Transaction related implementation methods. 3002 // ///////////////////////////////////////////////////////////////// 3003 @SuppressWarnings("rawtypes") 3004 private final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, List<Operation>>(); 3005 @SuppressWarnings("rawtypes") 3006 protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>(); 3007 protected final Set<String> ackedAndPrepared = new HashSet<String>(); 3008 protected final Set<String> rolledBackAcks = new HashSet<String>(); 3009 3010 // messages that have prepared (pending) acks cannot be re-dispatched unless the outcome is rollback, 3011 // till then they are skipped by the store. 3012 // 'at most once' XA guarantee 3013 public void trackRecoveredAcks(ArrayList<MessageAck> acks) { 3014 this.indexLock.writeLock().lock(); 3015 try { 3016 for (MessageAck ack : acks) { 3017 ackedAndPrepared.add(ack.getLastMessageId().toProducerKey()); 3018 } 3019 } finally { 3020 this.indexLock.writeLock().unlock(); 3021 } 3022 } 3023 3024 public void forgetRecoveredAcks(ArrayList<MessageAck> acks, boolean rollback) throws IOException { 3025 if (acks != null) { 3026 this.indexLock.writeLock().lock(); 3027 try { 3028 for (MessageAck ack : acks) { 3029 final String id = ack.getLastMessageId().toProducerKey(); 3030 ackedAndPrepared.remove(id); 3031 if (rollback) { 3032 rolledBackAcks.add(id); 3033 } 3034 } 3035 } finally { 3036 this.indexLock.writeLock().unlock(); 3037 } 3038 } 3039 } 3040 3041 @SuppressWarnings("rawtypes") 3042 private List<Operation> getInflightTx(KahaTransactionInfo info) { 3043 TransactionId key = TransactionIdConversion.convert(info); 3044 List<Operation> tx; 3045 synchronized (inflightTransactions) { 3046 tx = inflightTransactions.get(key); 3047 if (tx == null) { 3048 tx = Collections.synchronizedList(new ArrayList<Operation>()); 3049 inflightTransactions.put(key, tx); 3050 } 3051 } 3052 return tx; 3053 } 3054 3055 @SuppressWarnings("unused") 3056 private TransactionId key(KahaTransactionInfo transactionInfo) { 3057 return TransactionIdConversion.convert(transactionInfo); 3058 } 3059 3060 abstract class Operation <T extends JournalCommand<T>> { 3061 final T command; 3062 final Location location; 3063 3064 public Operation(T command, Location location) { 3065 this.command = command; 3066 this.location = location; 3067 } 3068 3069 public Location getLocation() { 3070 return location; 3071 } 3072 3073 public T getCommand() { 3074 return command; 3075 } 3076 3077 abstract public void execute(Transaction tx) throws IOException; 3078 } 3079 3080 class AddOperation extends Operation<KahaAddMessageCommand> { 3081 final IndexAware runWithIndexLock; 3082 public AddOperation(KahaAddMessageCommand command, Location location, IndexAware runWithIndexLock) { 3083 super(command, location); 3084 this.runWithIndexLock = runWithIndexLock; 3085 } 3086 3087 @Override 3088 public void execute(Transaction tx) throws IOException { 3089 long seq = updateIndex(tx, command, location); 3090 if (runWithIndexLock != null) { 3091 runWithIndexLock.sequenceAssignedWithIndexLocked(seq); 3092 } 3093 } 3094 } 3095 3096 class RemoveOperation extends Operation<KahaRemoveMessageCommand> { 3097 3098 public RemoveOperation(KahaRemoveMessageCommand command, Location location) { 3099 super(command, location); 3100 } 3101 3102 @Override 3103 public void execute(Transaction tx) throws IOException { 3104 updateIndex(tx, command, location); 3105 } 3106 } 3107 3108 // ///////////////////////////////////////////////////////////////// 3109 // Initialization related implementation methods. 3110 // ///////////////////////////////////////////////////////////////// 3111 3112 private PageFile createPageFile() throws IOException { 3113 if (indexDirectory == null) { 3114 indexDirectory = directory; 3115 } 3116 IOHelper.mkdirs(indexDirectory); 3117 PageFile index = new PageFile(indexDirectory, "db"); 3118 index.setEnableWriteThread(isEnableIndexWriteAsync()); 3119 index.setWriteBatchSize(getIndexWriteBatchSize()); 3120 index.setPageCacheSize(indexCacheSize); 3121 index.setUseLFRUEviction(isUseIndexLFRUEviction()); 3122 index.setLFUEvictionFactor(getIndexLFUEvictionFactor()); 3123 index.setEnableDiskSyncs(isEnableIndexDiskSyncs()); 3124 index.setEnableRecoveryFile(isEnableIndexRecoveryFile()); 3125 index.setEnablePageCaching(isEnableIndexPageCaching()); 3126 return index; 3127 } 3128 3129 private Journal createJournal() throws IOException { 3130 Journal manager = new Journal(); 3131 manager.setDirectory(directory); 3132 manager.setMaxFileLength(getJournalMaxFileLength()); 3133 manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles); 3134 manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles); 3135 manager.setWriteBatchSize(getJournalMaxWriteBatchSize()); 3136 manager.setArchiveDataLogs(isArchiveDataLogs()); 3137 manager.setSizeAccumulator(journalSize); 3138 manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs()); 3139 manager.setPreallocationScope(Journal.PreallocationScope.valueOf(preallocationScope.trim().toUpperCase())); 3140 manager.setPreallocationStrategy( 3141 Journal.PreallocationStrategy.valueOf(preallocationStrategy.trim().toUpperCase())); 3142 manager.setJournalDiskSyncStrategy( 3143 Journal.JournalDiskSyncStrategy.valueOf(journalDiskSyncStrategy.trim().toUpperCase())); 3144 if (getDirectoryArchive() != null) { 3145 IOHelper.mkdirs(getDirectoryArchive()); 3146 manager.setDirectoryArchive(getDirectoryArchive()); 3147 } 3148 return manager; 3149 } 3150 3151 private Metadata createMetadata() { 3152 Metadata md = new Metadata(); 3153 md.producerSequenceIdTracker.setAuditDepth(getFailoverProducersAuditDepth()); 3154 md.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(getMaxFailoverProducersToTrack()); 3155 return md; 3156 } 3157 3158 protected abstract void configureMetadata(); 3159 3160 public int getJournalMaxWriteBatchSize() { 3161 return journalMaxWriteBatchSize; 3162 } 3163 3164 public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) { 3165 this.journalMaxWriteBatchSize = journalMaxWriteBatchSize; 3166 } 3167 3168 public File getDirectory() { 3169 return directory; 3170 } 3171 3172 public void setDirectory(File directory) { 3173 this.directory = directory; 3174 } 3175 3176 public boolean isDeleteAllMessages() { 3177 return deleteAllMessages; 3178 } 3179 3180 public void setDeleteAllMessages(boolean deleteAllMessages) { 3181 this.deleteAllMessages = deleteAllMessages; 3182 } 3183 3184 public void setIndexWriteBatchSize(int setIndexWriteBatchSize) { 3185 this.setIndexWriteBatchSize = setIndexWriteBatchSize; 3186 } 3187 3188 public int getIndexWriteBatchSize() { 3189 return setIndexWriteBatchSize; 3190 } 3191 3192 public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) { 3193 this.enableIndexWriteAsync = enableIndexWriteAsync; 3194 } 3195 3196 boolean isEnableIndexWriteAsync() { 3197 return enableIndexWriteAsync; 3198 } 3199 3200 /** 3201 * @deprecated use {@link #getJournalDiskSyncStrategy} instead 3202 * @return 3203 */ 3204 public boolean isEnableJournalDiskSyncs() { 3205 return journalDiskSyncStrategy != null && JournalDiskSyncStrategy.ALWAYS.name().equals( 3206 journalDiskSyncStrategy.trim().toUpperCase()); 3207 } 3208 3209 /** 3210 * @deprecated use {@link #setEnableJournalDiskSyncs} instead 3211 * @param syncWrites 3212 */ 3213 public void setEnableJournalDiskSyncs(boolean syncWrites) { 3214 if (syncWrites) { 3215 journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS.name(); 3216 } else { 3217 journalDiskSyncStrategy = JournalDiskSyncStrategy.NEVER.name(); 3218 } 3219 } 3220 3221 public String getJournalDiskSyncStrategy() { 3222 return journalDiskSyncStrategy; 3223 } 3224 3225 public void setJournalDiskSyncStrategy(String journalDiskSyncStrategy) { 3226 this.journalDiskSyncStrategy = journalDiskSyncStrategy; 3227 } 3228 3229 public long getJournalDiskSyncInterval() { 3230 return journalDiskSyncInterval; 3231 } 3232 3233 public void setJournalDiskSyncInterval(long journalDiskSyncInterval) { 3234 this.journalDiskSyncInterval = journalDiskSyncInterval; 3235 } 3236 3237 public long getCheckpointInterval() { 3238 return checkpointInterval; 3239 } 3240 3241 public void setCheckpointInterval(long checkpointInterval) { 3242 this.checkpointInterval = checkpointInterval; 3243 } 3244 3245 public long getCleanupInterval() { 3246 return cleanupInterval; 3247 } 3248 3249 public void setCleanupInterval(long cleanupInterval) { 3250 this.cleanupInterval = cleanupInterval; 3251 } 3252 3253 public void setJournalMaxFileLength(int journalMaxFileLength) { 3254 this.journalMaxFileLength = journalMaxFileLength; 3255 } 3256 3257 public int getJournalMaxFileLength() { 3258 return journalMaxFileLength; 3259 } 3260 3261 public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) { 3262 this.metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxFailoverProducersToTrack); 3263 } 3264 3265 public int getMaxFailoverProducersToTrack() { 3266 return this.metadata.producerSequenceIdTracker.getMaximumNumberOfProducersToTrack(); 3267 } 3268 3269 public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) { 3270 this.metadata.producerSequenceIdTracker.setAuditDepth(failoverProducersAuditDepth); 3271 } 3272 3273 public int getFailoverProducersAuditDepth() { 3274 return this.metadata.producerSequenceIdTracker.getAuditDepth(); 3275 } 3276 3277 public PageFile getPageFile() throws IOException { 3278 if (pageFile == null) { 3279 pageFile = createPageFile(); 3280 } 3281 return pageFile; 3282 } 3283 3284 public Journal getJournal() throws IOException { 3285 if (journal == null) { 3286 journal = createJournal(); 3287 } 3288 return journal; 3289 } 3290 3291 protected Metadata getMetadata() { 3292 return metadata; 3293 } 3294 3295 public boolean isFailIfDatabaseIsLocked() { 3296 return failIfDatabaseIsLocked; 3297 } 3298 3299 public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) { 3300 this.failIfDatabaseIsLocked = failIfDatabaseIsLocked; 3301 } 3302 3303 public boolean isIgnoreMissingJournalfiles() { 3304 return ignoreMissingJournalfiles; 3305 } 3306 3307 public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) { 3308 this.ignoreMissingJournalfiles = ignoreMissingJournalfiles; 3309 } 3310 3311 public int getIndexCacheSize() { 3312 return indexCacheSize; 3313 } 3314 3315 public void setIndexCacheSize(int indexCacheSize) { 3316 this.indexCacheSize = indexCacheSize; 3317 } 3318 3319 public boolean isCheckForCorruptJournalFiles() { 3320 return checkForCorruptJournalFiles; 3321 } 3322 3323 public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) { 3324 this.checkForCorruptJournalFiles = checkForCorruptJournalFiles; 3325 } 3326 3327 public boolean isChecksumJournalFiles() { 3328 return checksumJournalFiles; 3329 } 3330 3331 public void setChecksumJournalFiles(boolean checksumJournalFiles) { 3332 this.checksumJournalFiles = checksumJournalFiles; 3333 } 3334 3335 @Override 3336 public void setBrokerService(BrokerService brokerService) { 3337 this.brokerService = brokerService; 3338 } 3339 3340 /** 3341 * @return the archiveDataLogs 3342 */ 3343 public boolean isArchiveDataLogs() { 3344 return this.archiveDataLogs; 3345 } 3346 3347 /** 3348 * @param archiveDataLogs the archiveDataLogs to set 3349 */ 3350 public void setArchiveDataLogs(boolean archiveDataLogs) { 3351 this.archiveDataLogs = archiveDataLogs; 3352 } 3353 3354 /** 3355 * @return the directoryArchive 3356 */ 3357 public File getDirectoryArchive() { 3358 return this.directoryArchive; 3359 } 3360 3361 /** 3362 * @param directoryArchive the directoryArchive to set 3363 */ 3364 public void setDirectoryArchive(File directoryArchive) { 3365 this.directoryArchive = directoryArchive; 3366 } 3367 3368 public boolean isArchiveCorruptedIndex() { 3369 return archiveCorruptedIndex; 3370 } 3371 3372 public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) { 3373 this.archiveCorruptedIndex = archiveCorruptedIndex; 3374 } 3375 3376 public float getIndexLFUEvictionFactor() { 3377 return indexLFUEvictionFactor; 3378 } 3379 3380 public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) { 3381 this.indexLFUEvictionFactor = indexLFUEvictionFactor; 3382 } 3383 3384 public boolean isUseIndexLFRUEviction() { 3385 return useIndexLFRUEviction; 3386 } 3387 3388 public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) { 3389 this.useIndexLFRUEviction = useIndexLFRUEviction; 3390 } 3391 3392 public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) { 3393 this.enableIndexDiskSyncs = enableIndexDiskSyncs; 3394 } 3395 3396 public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) { 3397 this.enableIndexRecoveryFile = enableIndexRecoveryFile; 3398 } 3399 3400 public void setEnableIndexPageCaching(boolean enableIndexPageCaching) { 3401 this.enableIndexPageCaching = enableIndexPageCaching; 3402 } 3403 3404 public boolean isEnableIndexDiskSyncs() { 3405 return enableIndexDiskSyncs; 3406 } 3407 3408 public boolean isEnableIndexRecoveryFile() { 3409 return enableIndexRecoveryFile; 3410 } 3411 3412 public boolean isEnableIndexPageCaching() { 3413 return enableIndexPageCaching; 3414 } 3415 3416 // ///////////////////////////////////////////////////////////////// 3417 // Internal conversion methods. 3418 // ///////////////////////////////////////////////////////////////// 3419 3420 class MessageOrderCursor{ 3421 long defaultCursorPosition; 3422 long lowPriorityCursorPosition; 3423 long highPriorityCursorPosition; 3424 MessageOrderCursor(){ 3425 } 3426 3427 MessageOrderCursor(long position){ 3428 this.defaultCursorPosition=position; 3429 this.lowPriorityCursorPosition=position; 3430 this.highPriorityCursorPosition=position; 3431 } 3432 3433 MessageOrderCursor(MessageOrderCursor other){ 3434 this.defaultCursorPosition=other.defaultCursorPosition; 3435 this.lowPriorityCursorPosition=other.lowPriorityCursorPosition; 3436 this.highPriorityCursorPosition=other.highPriorityCursorPosition; 3437 } 3438 3439 MessageOrderCursor copy() { 3440 return new MessageOrderCursor(this); 3441 } 3442 3443 void reset() { 3444 this.defaultCursorPosition=0; 3445 this.highPriorityCursorPosition=0; 3446 this.lowPriorityCursorPosition=0; 3447 } 3448 3449 void increment() { 3450 if (defaultCursorPosition!=0) { 3451 defaultCursorPosition++; 3452 } 3453 if (highPriorityCursorPosition!=0) { 3454 highPriorityCursorPosition++; 3455 } 3456 if (lowPriorityCursorPosition!=0) { 3457 lowPriorityCursorPosition++; 3458 } 3459 } 3460 3461 @Override 3462 public String toString() { 3463 return "MessageOrderCursor:[def:" + defaultCursorPosition 3464 + ", low:" + lowPriorityCursorPosition 3465 + ", high:" + highPriorityCursorPosition + "]"; 3466 } 3467 3468 public void sync(MessageOrderCursor other) { 3469 this.defaultCursorPosition=other.defaultCursorPosition; 3470 this.lowPriorityCursorPosition=other.lowPriorityCursorPosition; 3471 this.highPriorityCursorPosition=other.highPriorityCursorPosition; 3472 } 3473 } 3474 3475 class MessageOrderIndex { 3476 static final byte HI = 9; 3477 static final byte LO = 0; 3478 static final byte DEF = 4; 3479 3480 long nextMessageId; 3481 BTreeIndex<Long, MessageKeys> defaultPriorityIndex; 3482 BTreeIndex<Long, MessageKeys> lowPriorityIndex; 3483 BTreeIndex<Long, MessageKeys> highPriorityIndex; 3484 final MessageOrderCursor cursor = new MessageOrderCursor(); 3485 Long lastDefaultKey; 3486 Long lastHighKey; 3487 Long lastLowKey; 3488 byte lastGetPriority; 3489 final List<Long> pendingAdditions = new LinkedList<Long>(); 3490 final MessageKeysMarshaller messageKeysMarshaller = new MessageKeysMarshaller(); 3491 3492 MessageKeys remove(Transaction tx, Long key) throws IOException { 3493 MessageKeys result = defaultPriorityIndex.remove(tx, key); 3494 if (result == null && highPriorityIndex!=null) { 3495 result = highPriorityIndex.remove(tx, key); 3496 if (result ==null && lowPriorityIndex!=null) { 3497 result = lowPriorityIndex.remove(tx, key); 3498 } 3499 } 3500 return result; 3501 } 3502 3503 void load(Transaction tx) throws IOException { 3504 defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); 3505 defaultPriorityIndex.setValueMarshaller(messageKeysMarshaller); 3506 defaultPriorityIndex.load(tx); 3507 lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); 3508 lowPriorityIndex.setValueMarshaller(messageKeysMarshaller); 3509 lowPriorityIndex.load(tx); 3510 highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); 3511 highPriorityIndex.setValueMarshaller(messageKeysMarshaller); 3512 highPriorityIndex.load(tx); 3513 } 3514 3515 void allocate(Transaction tx) throws IOException { 3516 defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate()); 3517 if (metadata.version >= 2) { 3518 lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate()); 3519 highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate()); 3520 } 3521 } 3522 3523 void configureLast(Transaction tx) throws IOException { 3524 // Figure out the next key using the last entry in the destination. 3525 TreeSet<Long> orderedSet = new TreeSet<Long>(); 3526 3527 addLast(orderedSet, highPriorityIndex, tx); 3528 addLast(orderedSet, defaultPriorityIndex, tx); 3529 addLast(orderedSet, lowPriorityIndex, tx); 3530 3531 if (!orderedSet.isEmpty()) { 3532 nextMessageId = orderedSet.last() + 1; 3533 } 3534 } 3535 3536 private void addLast(TreeSet<Long> orderedSet, BTreeIndex<Long, MessageKeys> index, Transaction tx) throws IOException { 3537 if (index != null) { 3538 Entry<Long, MessageKeys> lastEntry = index.getLast(tx); 3539 if (lastEntry != null) { 3540 orderedSet.add(lastEntry.getKey()); 3541 } 3542 } 3543 } 3544 3545 void clear(Transaction tx) throws IOException { 3546 this.remove(tx); 3547 this.resetCursorPosition(); 3548 this.allocate(tx); 3549 this.load(tx); 3550 this.configureLast(tx); 3551 } 3552 3553 void remove(Transaction tx) throws IOException { 3554 defaultPriorityIndex.clear(tx); 3555 defaultPriorityIndex.unload(tx); 3556 tx.free(defaultPriorityIndex.getPageId()); 3557 if (lowPriorityIndex != null) { 3558 lowPriorityIndex.clear(tx); 3559 lowPriorityIndex.unload(tx); 3560 3561 tx.free(lowPriorityIndex.getPageId()); 3562 } 3563 if (highPriorityIndex != null) { 3564 highPriorityIndex.clear(tx); 3565 highPriorityIndex.unload(tx); 3566 tx.free(highPriorityIndex.getPageId()); 3567 } 3568 } 3569 3570 void resetCursorPosition() { 3571 this.cursor.reset(); 3572 lastDefaultKey = null; 3573 lastHighKey = null; 3574 lastLowKey = null; 3575 } 3576 3577 void setBatch(Transaction tx, Long sequence) throws IOException { 3578 if (sequence != null) { 3579 Long nextPosition = new Long(sequence.longValue() + 1); 3580 lastDefaultKey = sequence; 3581 cursor.defaultCursorPosition = nextPosition.longValue(); 3582 lastHighKey = sequence; 3583 cursor.highPriorityCursorPosition = nextPosition.longValue(); 3584 lastLowKey = sequence; 3585 cursor.lowPriorityCursorPosition = nextPosition.longValue(); 3586 } 3587 } 3588 3589 void setBatch(Transaction tx, LastAck last) throws IOException { 3590 setBatch(tx, last.lastAckedSequence); 3591 if (cursor.defaultCursorPosition == 0 3592 && cursor.highPriorityCursorPosition == 0 3593 && cursor.lowPriorityCursorPosition == 0) { 3594 long next = last.lastAckedSequence + 1; 3595 switch (last.priority) { 3596 case DEF: 3597 cursor.defaultCursorPosition = next; 3598 cursor.highPriorityCursorPosition = next; 3599 break; 3600 case HI: 3601 cursor.highPriorityCursorPosition = next; 3602 break; 3603 case LO: 3604 cursor.lowPriorityCursorPosition = next; 3605 cursor.defaultCursorPosition = next; 3606 cursor.highPriorityCursorPosition = next; 3607 break; 3608 } 3609 } 3610 } 3611 3612 void stoppedIterating() { 3613 if (lastDefaultKey!=null) { 3614 cursor.defaultCursorPosition=lastDefaultKey.longValue()+1; 3615 } 3616 if (lastHighKey!=null) { 3617 cursor.highPriorityCursorPosition=lastHighKey.longValue()+1; 3618 } 3619 if (lastLowKey!=null) { 3620 cursor.lowPriorityCursorPosition=lastLowKey.longValue()+1; 3621 } 3622 lastDefaultKey = null; 3623 lastHighKey = null; 3624 lastLowKey = null; 3625 } 3626 3627 void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, Long sequenceId) 3628 throws IOException { 3629 if (defaultPriorityIndex.containsKey(tx, sequenceId)) { 3630 getDeleteList(tx, deletes, defaultPriorityIndex, sequenceId); 3631 } else if (highPriorityIndex != null && highPriorityIndex.containsKey(tx, sequenceId)) { 3632 getDeleteList(tx, deletes, highPriorityIndex, sequenceId); 3633 } else if (lowPriorityIndex != null && lowPriorityIndex.containsKey(tx, sequenceId)) { 3634 getDeleteList(tx, deletes, lowPriorityIndex, sequenceId); 3635 } 3636 } 3637 3638 void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, 3639 BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException { 3640 3641 Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx, sequenceId, null); 3642 deletes.add(iterator.next()); 3643 } 3644 3645 long getNextMessageId() { 3646 return nextMessageId++; 3647 } 3648 3649 void revertNextMessageId() { 3650 nextMessageId--; 3651 } 3652 3653 MessageKeys get(Transaction tx, Long key) throws IOException { 3654 MessageKeys result = defaultPriorityIndex.get(tx, key); 3655 if (result == null) { 3656 result = highPriorityIndex.get(tx, key); 3657 if (result == null) { 3658 result = lowPriorityIndex.get(tx, key); 3659 lastGetPriority = LO; 3660 } else { 3661 lastGetPriority = HI; 3662 } 3663 } else { 3664 lastGetPriority = DEF; 3665 } 3666 return result; 3667 } 3668 3669 MessageKeys put(Transaction tx, int priority, Long key, MessageKeys value) throws IOException { 3670 if (priority == javax.jms.Message.DEFAULT_PRIORITY) { 3671 return defaultPriorityIndex.put(tx, key, value); 3672 } else if (priority > javax.jms.Message.DEFAULT_PRIORITY) { 3673 return highPriorityIndex.put(tx, key, value); 3674 } else { 3675 return lowPriorityIndex.put(tx, key, value); 3676 } 3677 } 3678 3679 Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx) throws IOException{ 3680 return new MessageOrderIterator(tx,cursor,this); 3681 } 3682 3683 Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx, MessageOrderCursor m) throws IOException{ 3684 return new MessageOrderIterator(tx,m,this); 3685 } 3686 3687 public byte lastGetPriority() { 3688 return lastGetPriority; 3689 } 3690 3691 public boolean alreadyDispatched(Long sequence) { 3692 return (cursor.highPriorityCursorPosition > 0 && cursor.highPriorityCursorPosition >= sequence) || 3693 (cursor.defaultCursorPosition > 0 && cursor.defaultCursorPosition >= sequence) || 3694 (cursor.lowPriorityCursorPosition > 0 && cursor.lowPriorityCursorPosition >= sequence); 3695 } 3696 3697 public void trackPendingAdd(Long seq) { 3698 synchronized (pendingAdditions) { 3699 pendingAdditions.add(seq); 3700 } 3701 } 3702 3703 public void trackPendingAddComplete(Long seq) { 3704 synchronized (pendingAdditions) { 3705 pendingAdditions.remove(seq); 3706 } 3707 } 3708 3709 public Long minPendingAdd() { 3710 synchronized (pendingAdditions) { 3711 if (!pendingAdditions.isEmpty()) { 3712 return pendingAdditions.get(0); 3713 } else { 3714 return null; 3715 } 3716 } 3717 } 3718 3719 class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{ 3720 Iterator<Entry<Long, MessageKeys>>currentIterator; 3721 final Iterator<Entry<Long, MessageKeys>>highIterator; 3722 final Iterator<Entry<Long, MessageKeys>>defaultIterator; 3723 final Iterator<Entry<Long, MessageKeys>>lowIterator; 3724 3725 MessageOrderIterator(Transaction tx, MessageOrderCursor m, MessageOrderIndex messageOrderIndex) throws IOException { 3726 Long pendingAddLimiter = messageOrderIndex.minPendingAdd(); 3727 this.defaultIterator = defaultPriorityIndex.iterator(tx, m.defaultCursorPosition, pendingAddLimiter); 3728 if (highPriorityIndex != null) { 3729 this.highIterator = highPriorityIndex.iterator(tx, m.highPriorityCursorPosition, pendingAddLimiter); 3730 } else { 3731 this.highIterator = null; 3732 } 3733 if (lowPriorityIndex != null) { 3734 this.lowIterator = lowPriorityIndex.iterator(tx, m.lowPriorityCursorPosition, pendingAddLimiter); 3735 } else { 3736 this.lowIterator = null; 3737 } 3738 } 3739 3740 @Override 3741 public boolean hasNext() { 3742 if (currentIterator == null) { 3743 if (highIterator != null) { 3744 if (highIterator.hasNext()) { 3745 currentIterator = highIterator; 3746 return currentIterator.hasNext(); 3747 } 3748 if (defaultIterator.hasNext()) { 3749 currentIterator = defaultIterator; 3750 return currentIterator.hasNext(); 3751 } 3752 if (lowIterator.hasNext()) { 3753 currentIterator = lowIterator; 3754 return currentIterator.hasNext(); 3755 } 3756 return false; 3757 } else { 3758 currentIterator = defaultIterator; 3759 return currentIterator.hasNext(); 3760 } 3761 } 3762 if (highIterator != null) { 3763 if (currentIterator.hasNext()) { 3764 return true; 3765 } 3766 if (currentIterator == highIterator) { 3767 if (defaultIterator.hasNext()) { 3768 currentIterator = defaultIterator; 3769 return currentIterator.hasNext(); 3770 } 3771 if (lowIterator.hasNext()) { 3772 currentIterator = lowIterator; 3773 return currentIterator.hasNext(); 3774 } 3775 return false; 3776 } 3777 3778 if (currentIterator == defaultIterator) { 3779 if (lowIterator.hasNext()) { 3780 currentIterator = lowIterator; 3781 return currentIterator.hasNext(); 3782 } 3783 return false; 3784 } 3785 } 3786 return currentIterator.hasNext(); 3787 } 3788 3789 @Override 3790 public Entry<Long, MessageKeys> next() { 3791 Entry<Long, MessageKeys> result = currentIterator.next(); 3792 if (result != null) { 3793 Long key = result.getKey(); 3794 if (highIterator != null) { 3795 if (currentIterator == defaultIterator) { 3796 lastDefaultKey = key; 3797 } else if (currentIterator == highIterator) { 3798 lastHighKey = key; 3799 } else { 3800 lastLowKey = key; 3801 } 3802 } else { 3803 lastDefaultKey = key; 3804 } 3805 } 3806 return result; 3807 } 3808 3809 @Override 3810 public void remove() { 3811 throw new UnsupportedOperationException(); 3812 } 3813 } 3814 } 3815 3816 private static class HashSetStringMarshaller extends VariableMarshaller<HashSet<String>> { 3817 final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller(); 3818 3819 @Override 3820 public void writePayload(HashSet<String> object, DataOutput dataOut) throws IOException { 3821 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 3822 ObjectOutputStream oout = new ObjectOutputStream(baos); 3823 oout.writeObject(object); 3824 oout.flush(); 3825 oout.close(); 3826 byte[] data = baos.toByteArray(); 3827 dataOut.writeInt(data.length); 3828 dataOut.write(data); 3829 } 3830 3831 @Override 3832 @SuppressWarnings("unchecked") 3833 public HashSet<String> readPayload(DataInput dataIn) throws IOException { 3834 int dataLen = dataIn.readInt(); 3835 byte[] data = new byte[dataLen]; 3836 dataIn.readFully(data); 3837 ByteArrayInputStream bais = new ByteArrayInputStream(data); 3838 ObjectInputStream oin = new ObjectInputStream(bais); 3839 try { 3840 return (HashSet<String>) oin.readObject(); 3841 } catch (ClassNotFoundException cfe) { 3842 IOException ioe = new IOException("Failed to read HashSet<String>: " + cfe); 3843 ioe.initCause(cfe); 3844 throw ioe; 3845 } 3846 } 3847 } 3848 3849 public File getIndexDirectory() { 3850 return indexDirectory; 3851 } 3852 3853 public void setIndexDirectory(File indexDirectory) { 3854 this.indexDirectory = indexDirectory; 3855 } 3856 3857 interface IndexAware { 3858 public void sequenceAssignedWithIndexLocked(long index); 3859 } 3860 3861 public String getPreallocationScope() { 3862 return preallocationScope; 3863 } 3864 3865 public void setPreallocationScope(String preallocationScope) { 3866 this.preallocationScope = preallocationScope; 3867 } 3868 3869 public String getPreallocationStrategy() { 3870 return preallocationStrategy; 3871 } 3872 3873 public void setPreallocationStrategy(String preallocationStrategy) { 3874 this.preallocationStrategy = preallocationStrategy; 3875 } 3876 3877 public int getCompactAcksAfterNoGC() { 3878 return compactAcksAfterNoGC; 3879 } 3880 3881 /** 3882 * Sets the number of GC cycles where no journal logs were removed before an attempt to 3883 * move forward all the acks in the last log that contains them and is otherwise unreferenced. 3884 * <p> 3885 * A value of -1 will disable this feature. 3886 * 3887 * @param compactAcksAfterNoGC 3888 * Number of empty GC cycles before we rewrite old ACKS. 3889 */ 3890 public void setCompactAcksAfterNoGC(int compactAcksAfterNoGC) { 3891 this.compactAcksAfterNoGC = compactAcksAfterNoGC; 3892 } 3893 3894 /** 3895 * Returns whether Ack compaction will ignore that the store is still growing 3896 * and run more often. 3897 * 3898 * @return the compactAcksIgnoresStoreGrowth current value. 3899 */ 3900 public boolean isCompactAcksIgnoresStoreGrowth() { 3901 return compactAcksIgnoresStoreGrowth; 3902 } 3903 3904 /** 3905 * Configure if Ack compaction will occur regardless of continued growth of the 3906 * journal logs meaning that the store has not run out of space yet. Because the 3907 * compaction operation can be costly this value is defaulted to off and the Ack 3908 * compaction is only done when it seems that the store cannot grow and larger. 3909 * 3910 * @param compactAcksIgnoresStoreGrowth the compactAcksIgnoresStoreGrowth to set 3911 */ 3912 public void setCompactAcksIgnoresStoreGrowth(boolean compactAcksIgnoresStoreGrowth) { 3913 this.compactAcksIgnoresStoreGrowth = compactAcksIgnoresStoreGrowth; 3914 } 3915 3916 /** 3917 * Returns whether Ack compaction is enabled 3918 * 3919 * @return enableAckCompaction 3920 */ 3921 public boolean isEnableAckCompaction() { 3922 return enableAckCompaction; 3923 } 3924 3925 /** 3926 * Configure if the Ack compaction task should be enabled to run 3927 * 3928 * @param enableAckCompaction 3929 */ 3930 public void setEnableAckCompaction(boolean enableAckCompaction) { 3931 this.enableAckCompaction = enableAckCompaction; 3932 } 3933 3934 /** 3935 * @return 3936 */ 3937 public boolean isEnableSubscriptionStatistics() { 3938 return enableSubscriptionStatistics; 3939 } 3940 3941 /** 3942 * Enable caching statistics for each subscription to allow non-blocking 3943 * retrieval of metrics. This could incur some overhead to compute if there are a lot 3944 * of subscriptions. 3945 * 3946 * @param enableSubscriptionStatistics 3947 */ 3948 public void setEnableSubscriptionStatistics(boolean enableSubscriptionStatistics) { 3949 this.enableSubscriptionStatistics = enableSubscriptionStatistics; 3950 } 3951}