Centralized page manager (#1853)
* feat: first step of adding the database to PageId in order to use a centralized PageManager instead of one per database

* feat: refactor PageManager to be centralized (one instance per JVM)

* feat: forced page flush at database close with the new PageManager architecture

* fix: fixed page manager with new architecture

* feat: fixed all concurrency issues between the flush thread and a concurrent database close

The solution was to park the list of pages drained from the queue into an atomic reference, so that a concurrent database close can pick them up and flush them (see the sketch below)
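
A minimal sketch of the parking pattern described above, assuming a JVM-wide flush coordinator; the names FlushCoordinator, PageRef, drainAndFlush and flushOnClose are invented for illustration and do not mirror ArcadeDB's actual PageManager API:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;

class FlushCoordinator {
  // Hypothetical page handle: which database owns the page and where it lives.
  record PageRef(Object database, int fileId, int pageNumber) {}

  private final ConcurrentLinkedQueue<PageRef> queue = new ConcurrentLinkedQueue<>();
  // Batch currently taken off the queue by the flush thread; it is parked here so a
  // racing database close can still see it instead of losing the pages mid-flight.
  private final AtomicReference<List<PageRef>> parked = new AtomicReference<>(List.of());

  void enqueue(final PageRef page) {
    queue.offer(page);
  }

  // Flush-thread loop body: drain the queue, publish the batch, then try to claim it back.
  void drainAndFlush() {
    final List<PageRef> batch = new ArrayList<>();
    for (PageRef p; (p = queue.poll()) != null; )
      batch.add(p);
    parked.set(batch);
    // Whoever wins the getAndSet (flush thread or a concurrent close) flushes the batch.
    for (final PageRef p : parked.getAndSet(List.of()))
      flushToDisk(p);
  }

  // Called while closing a database: claim whatever is parked, flush the pages of the
  // closing database and return the others to the queue.
  void flushOnClose(final Object closingDatabase) {
    for (final PageRef p : parked.getAndSet(List.of())) {
      if (p.database() == closingDatabase)
        flushToDisk(p);
      else
        queue.offer(p);
    }
  }

  private void flushToDisk(final PageRef page) {
    // A real implementation would write the page through the file manager here.
  }
}

In the actual commit, LocalDatabase.closeInternal() calls PageManager.INSTANCE.flushModifiedPagesOfDatabase(this) (or removeModifiedPagesOfDatabase(this) on drop), as shown in the LocalDatabase hunks below.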
lvca authored Dec 6, 2024
1 parent 277c558 commit df74de0
Showing 41 changed files with 502 additions and 354 deletions.
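
Most hunks below change new PageId(fileId, pageNumber) into new PageId(database, fileId, pageNumber): once a single PageManager serves every database in the JVM, the owning database has to be part of the cache key, otherwise pages of different databases with the same file id would collide. A hypothetical sketch of that keying, using invented names (SharedPageCache, CachedPage) that are not ArcadeDB classes:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class SharedPageCache {
  // Two databases can both own "file 0, page 0"; the database reference disambiguates them.
  record Key(Object database, int fileId, int pageNumber) {}

  record CachedPage(Key key, byte[] content, int version) {}

  private final Map<Key, CachedPage> pages = new ConcurrentHashMap<>();

  CachedPage get(final Object database, final int fileId, final int pageNumber) {
    return pages.get(new Key(database, fileId, pageNumber));
  }

  void put(final CachedPage page) {
    pages.put(page.key(), page);
  }

  // On close or drop, only the entries of that database are evicted; the JVM-wide cache survives.
  void removeAllPagesOf(final Object database) {
    pages.keySet().removeIf(k -> k.database() == database);
  }
}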
3 changes: 3 additions & 0 deletions engine/src/main/java/com/arcadedb/GlobalConfiguration.java
@@ -18,6 +18,7 @@
*/
package com.arcadedb;

import com.arcadedb.engine.PageManager;
import com.arcadedb.exception.ConfigurationException;
import com.arcadedb.log.LogManager;
import com.arcadedb.serializer.BinaryComparator;
@@ -113,6 +114,8 @@ public Object call(final Object value) {
ASYNC_OPERATIONS_QUEUE_IMPL.setValue("standard");
SERVER_HTTP_IO_THREADS.setValue(cores > 8 ? 4 : 2);

PageManager.INSTANCE.configure();

} else if (v.equalsIgnoreCase("low-cpu")) {
ASYNC_WORKER_THREADS.setValue(1);
ASYNC_OPERATIONS_QUEUE_IMPL.setValue("standard");
10 changes: 2 additions & 8 deletions engine/src/main/java/com/arcadedb/Profiler.java
@@ -51,7 +51,6 @@ public synchronized JSONObject toJSON() {
final JSONObject json = new JSONObject();

long readCacheUsed = 0;
long writeCacheUsed = 0;
long cacheMax = 0;
long pagesRead = 0;
long pagesWritten = 0;
@@ -109,7 +108,6 @@ public synchronized JSONObject toJSON() {

final PageManager.PPageManagerStats pStats = db.getPageManager().getStats();
readCacheUsed += pStats.readCacheRAM;
writeCacheUsed += pStats.writeCacheRAM;
cacheMax += pStats.maxRAM;
pagesRead += pStats.pagesRead;
pagesReadSize += pStats.pagesReadSize;
@@ -138,7 +136,6 @@ public synchronized JSONObject toJSON() {
}

json.put("readCacheUsed", new JSONObject().put("space", readCacheUsed));
json.put("writeCacheUsed", new JSONObject().put("space", writeCacheUsed));
json.put("cacheMax", new JSONObject().put("space", cacheMax));
json.put("pagesRead", new JSONObject().put("count", pagesRead));
json.put("pagesWritten", new JSONObject().put("count", pagesWritten));
@@ -243,7 +240,6 @@ public synchronized void dumpMetrics(final PrintStream out) {
final long totalSpaceInMB = new File(".").getTotalSpace();

long readCacheUsed = 0;
long writeCacheUsed = 0;
long cacheMax = 0;
long pagesRead = 0;
long pagesWritten = 0;
@@ -278,7 +274,6 @@ public synchronized void dumpMetrics(final PrintStream out) {
long evictionRuns = 0;
long pagesEvicted = 0;
int readCachePages = 0;
int writeCachePages = 0;
long indexCompactions = 0;

try {
@@ -302,7 +297,6 @@ public synchronized void dumpMetrics(final PrintStream out) {

final PageManager.PPageManagerStats pStats = db.getPageManager().getStats();
readCacheUsed += pStats.readCacheRAM;
writeCacheUsed += pStats.writeCacheRAM;
cacheMax += pStats.maxRAM;
pagesRead += pStats.pagesRead;
pagesReadSize += pStats.pagesReadSize;
@@ -361,8 +355,8 @@ public synchronized void dumpMetrics(final PrintStream out) {
String.format("%n JVM heap=%s/%s gc=%dms", FileUtils.getSizeAsString(runtime.totalMemory() - runtime.freeMemory()),
FileUtils.getSizeAsString(runtime.maxMemory()), gcTime));

buffer.append(String.format("%n PAGE-CACHE read=%s (pages=%d) write=%s (pages=%d) max=%s readOps=%d (%s) writeOps=%d (%s)",
FileUtils.getSizeAsString(readCacheUsed), readCachePages, FileUtils.getSizeAsString(writeCacheUsed), writeCachePages,
buffer.append(String.format("%n PAGE-CACHE read=%s (pages=%d) max=%s readOps=%d (%s) writeOps=%d (%s)",
FileUtils.getSizeAsString(readCacheUsed), readCachePages,
FileUtils.getSizeAsString(cacheMax), pagesRead, FileUtils.getSizeAsString(pagesReadSize), pagesWritten,
FileUtils.getSizeAsString(pagesWrittenSize)));

@@ -86,33 +86,34 @@ public void compareBuckets(final DatabaseInternal db1, final DatabaseInternal db
// AT THIS POINT BOTH BUCKETS HAVE THE SAME PAGES
final int pageSize = bucket1.getPageSize();
for (int i = 0; i < bucket1.getTotalPages(); ++i) {
final PageId pageId = new PageId(bucket1.getFileId(), i);
final PageId pageId1 = new PageId(bucket1.getDatabase(), bucket1.getFileId(), i);
final PageId pageId2 = new PageId(bucket2.getDatabase(), bucket2.getFileId(), i);

final ImmutablePage page1;
final ImmutablePage page2;

try {
page1 = db1.getPageManager().getImmutablePage(pageId, pageSize, false, true);
page1 = db1.getPageManager().getImmutablePage(pageId1, pageSize, false, true);
} catch (final IOException e) {
throw new DatabaseAreNotIdentical("Error on reading page %s from bucket '%s' DB1 (cause=%s)", pageId, bucket1.getName(),
throw new DatabaseAreNotIdentical("Error on reading page %s from bucket '%s' DB1 (cause=%s)", pageId1, bucket1.getName(),
e.toString());
}

try {
page2 = db2.getPageManager().getImmutablePage(pageId, pageSize, false, true);
page2 = db2.getPageManager().getImmutablePage(pageId2, pageSize, false, true);
} catch (final IOException e) {
throw new DatabaseAreNotIdentical("Error on reading page %s from bucket '%s' DB2 (cause=%s)", pageId, bucket2.getName(),
throw new DatabaseAreNotIdentical("Error on reading page %s from bucket '%s' DB2 (cause=%s)", pageId2, bucket2.getName(),
e.toString());
}

final boolean sameContent = Arrays.equals(page1.getContent().array(), page2.getContent().array());

if (page1.getVersion() != page2.getVersion())
throw new DatabaseAreNotIdentical("Page %s has different versions on databases. DB1 %d <> DB2 %d (sameContent=%s)",
pageId, page1.getVersion(), page2.getVersion(), sameContent);
pageId1, page1.getVersion(), page2.getVersion(), sameContent);

if (!sameContent)
throw new DatabaseAreNotIdentical("Page %s has different content on databases", pageId);
throw new DatabaseAreNotIdentical("Page %s has different content on databases", pageId1);

db2.getPageManager().removePageFromCache(page2.getPageId());
}
14 changes: 11 additions & 3 deletions engine/src/main/java/com/arcadedb/database/DatabaseFactory.java
@@ -20,6 +20,7 @@

import com.arcadedb.ContextConfiguration;
import com.arcadedb.engine.ComponentFile;
import com.arcadedb.engine.PageManager;
import com.arcadedb.exception.DatabaseOperationException;
import com.arcadedb.schema.LocalSchema;
import com.arcadedb.security.SecurityManager;
@@ -73,6 +74,9 @@ public Database open() {
public synchronized Database open(final ComponentFile.MODE mode) {
checkForActiveInstance(databasePath);

if (ACTIVE_INSTANCES.isEmpty())
PageManager.INSTANCE.configure();

final LocalDatabase database = new LocalDatabase(databasePath, mode, contextConfiguration, security, callbacks);
database.setAutoTransaction(autoTransaction);
database.open();
@@ -85,7 +89,11 @@ public synchronized Database open(final ComponentFile.MODE mode) {
public synchronized Database create() {
checkForActiveInstance(databasePath);

final LocalDatabase database = new LocalDatabase(databasePath, ComponentFile.MODE.READ_WRITE, contextConfiguration, security, callbacks);
if (ACTIVE_INSTANCES.isEmpty())
PageManager.INSTANCE.configure();

final LocalDatabase database = new LocalDatabase(databasePath, ComponentFile.MODE.READ_WRITE, contextConfiguration, security,
callbacks);
database.setAutoTransaction(autoTransaction);
database.create();

@@ -128,15 +136,15 @@ private static Path getNormalizedPath(final String path) {
return Paths.get(path).toAbsolutePath().normalize();
}


public static Database getActiveDatabaseInstance(final String databasePath) {
var normalizedPath = getNormalizedPath(databasePath);
return ACTIVE_INSTANCES.get(normalizedPath);
}

protected static void removeActiveDatabaseInstance(final String databasePath) {
protected static boolean removeActiveDatabaseInstance(final String databasePath) {
var normalizedPath = getNormalizedPath(databasePath);
ACTIVE_INSTANCES.remove(normalizedPath);
return ACTIVE_INSTANCES.isEmpty();
}

public static Collection<Database> getActiveDatabaseInstances() {
27 changes: 16 additions & 11 deletions engine/src/main/java/com/arcadedb/database/LocalDatabase.java
@@ -114,7 +114,6 @@ public class LocalDatabase extends RWLockContext implements DatabaseInternal {
protected final QueryEngineManager queryEngineManager;
protected final DatabaseStats stats = new DatabaseStats();
protected FileManager fileManager;
protected PageManager pageManager;
protected LocalSchema schema;
protected TransactionManager transactionManager;
protected volatile DatabaseAsyncExecutorImpl async = null;
@@ -224,7 +223,7 @@ public void drop() {
if (mode == ComponentFile.MODE.READ_ONLY)
throw new DatabaseIsReadOnlyException("Cannot drop database");

internalClose(true);
closeInternal(true);

executeInWriteLock(() -> {
FileUtils.deleteRecursively(new File(databasePath));
@@ -234,7 +233,7 @@

@Override
public void close() {
internalClose(false);
closeInternal(false);
}

/**
@@ -251,7 +250,7 @@ public void kill() {

try {
schema.close();
pageManager.kill();
PageManager.INSTANCE.simulateKillOfDatabase(this);
fileManager.close();
transactionManager.kill();

@@ -1124,7 +1123,7 @@ public BinarySerializer getSerializer() {
@Override
public PageManager getPageManager() {
checkDatabaseIsOpen();
return pageManager;
return PageManager.INSTANCE;
}

@Override
@@ -1649,7 +1648,7 @@ private void checkDatabaseName() {
throw new IllegalArgumentException("Invalid characters used in database name '" + name + "'");
}

private void internalClose(final boolean drop) {
private void closeInternal(final boolean drop) {
if (async != null) {
try {
// EXECUTE OUTSIDE LOCK
@@ -1674,8 +1673,15 @@ private void internalClose(final boolean drop) {
name);
}

if (drop)
PageManager.INSTANCE.removeModifiedPagesOfDatabase(this);
else
PageManager.INSTANCE.flushModifiedPagesOfDatabase(this);

open = false;

PageManager.INSTANCE.removeAllReadPagesOfDatabase(this);

try {
final List<DatabaseContext.DatabaseContextTL> dbContexts = DatabaseContext.INSTANCE.removeAllContexts(databasePath);
for (DatabaseContext.DatabaseContextTL dbContext : dbContexts) {
Expand All @@ -1700,7 +1706,6 @@ private void internalClose(final boolean drop) {

try {
schema.close();
pageManager.close();
fileManager.close();
transactionManager.close(drop);
statementCache.clear();
@@ -1738,7 +1743,8 @@ private void internalClose(final boolean drop) {
return null;
});

DatabaseFactory.removeActiveDatabaseInstance(databasePath);
if (DatabaseFactory.removeActiveDatabaseInstance(databasePath))
PageManager.INSTANCE.close();
}

private void checkForRecovery() throws IOException {
@@ -1776,7 +1782,6 @@ private void openInternal() {

fileManager = new FileManager(databasePath, mode, SUPPORTED_FILE_EXT);
transactionManager = new TransactionManager(wrappedDatabaseInstance);
pageManager = new PageManager(fileManager, transactionManager, configuration, name);

open = true;

Expand All @@ -1801,11 +1806,11 @@ private void openInternal() {

} catch (final RuntimeException e) {
open = false;
pageManager.close();
PageManager.INSTANCE.removeAllReadPagesOfDatabase(this);
throw e;
} catch (final Exception e) {
open = false;
pageManager.close();
PageManager.INSTANCE.removeAllReadPagesOfDatabase(this);
throw new DatabaseOperationException("Error on creating new database instance", e);
}
} catch (final Exception e) {
2 changes: 1 addition & 1 deletion engine/src/main/java/com/arcadedb/database/RID.java
@@ -216,7 +216,7 @@ public boolean isValid() {
}

public PageId getPageId() {
return new PageId(bucketId,
return new PageId(database, bucketId,
(int) (getPosition() / ((LocalBucket) database.getSchema().getBucketById(bucketId)).getMaxRecordsInPage()));
}
}
@@ -359,7 +359,7 @@ public MutablePage addPage(final PageId pageId, final int pageSize) {
assureIsActive();

// CREATE A PAGE ID BASED ON NEW PAGES IN TX. IN CASE OF ROLLBACK THEY ARE SIMPLY REMOVED AND THE GLOBAL PAGE COUNT IS UNCHANGED
final MutablePage page = new MutablePage(database.getPageManager(), pageId, pageSize);
final MutablePage page = new MutablePage(pageId, pageSize);
newPages.put(pageId, page);

final Integer indexCounter = newPageCounters.get(pageId.getFileId());
@@ -492,7 +492,7 @@ public void commitFromReplica(final WALFile.WALTransaction buffer,
final PaginatedComponentFile file = (PaginatedComponentFile) database.getFileManager().getFile(p.fileId);
final int pageSize = file.getPageSize();

final PageId pageId = new PageId(p.fileId, p.pageNumber);
final PageId pageId = new PageId(database, p.fileId, p.pageNumber);

final boolean isNew = p.pageNumber >= file.getTotalPages();

@@ -86,8 +86,10 @@ private AsyncThread(final DatabaseInternal database, final int id) {
super("AsyncExecutor-" + database.getName() + "-" + id);
this.database = database;

final int queueSize =
int queueSize =
database.getConfiguration().getValueAsInteger(GlobalConfiguration.ASYNC_OPERATIONS_QUEUE_SIZE) / parallelLevel;
if (queueSize < 1)
queueSize = 1;

final String cfgQueueImpl = database.getConfiguration().getValueAsString(GlobalConfiguration.ASYNC_OPERATIONS_QUEUE_IMPL);
if ("fast".equalsIgnoreCase(cfgQueueImpl))
6 changes: 2 additions & 4 deletions engine/src/main/java/com/arcadedb/engine/BasePage.java
@@ -36,15 +36,13 @@ public abstract class BasePage {
protected static final int PAGE_CONTENTSIZE_OFFSET = Binary.INT_SERIALIZED_SIZE;
public static final int PAGE_HEADER_SIZE = Binary.INT_SERIALIZED_SIZE + Binary.INT_SERIALIZED_SIZE;

protected final PageManager manager;

protected final PageId pageId;
protected Binary content;
protected final int size;
protected int version;

protected BasePage(final PageManager manager, final PageId pageId, final int size, final byte[] buffer, final int version, final int contentSize) {
this.manager = manager;
protected BasePage(final PageId pageId, final int size, final byte[] buffer, final int version,
final int contentSize) {
this.pageId = pageId;
this.size = size;
this.content = new Binary(buffer, contentSize);
11 changes: 6 additions & 5 deletions engine/src/main/java/com/arcadedb/engine/BucketIterator.java
@@ -34,9 +34,9 @@

public class BucketIterator implements Iterator<Record> {
private final static int PREFETCH_SIZE = 1_024;
private final DatabaseInternal database;
private final LocalBucket bucket;
final Record[] nextBatch = new Record[PREFETCH_SIZE];
private final DatabaseInternal database;
private final LocalBucket bucket;
final Record[] nextBatch = new Record[PREFETCH_SIZE];
private int prefetchIndex = 0;
final long limit;
int nextPageNumber = 0;
@@ -67,7 +67,7 @@ public void setPosition(final RID position) throws IOException {
nextBatch[prefetchIndex] = position.getRecord();
nextPageNumber = (int) (position.getPosition() / bucket.getMaxRecordsInPage());
currentRecordInPage = (int) (position.getPosition() % bucket.getMaxRecordsInPage()) + 1;
currentPage = database.getTransaction().getPage(new PageId(position.getBucketId(), nextPageNumber), bucket.pageSize);
currentPage = database.getTransaction().getPage(new PageId(database, position.getBucketId(), nextPageNumber), bucket.pageSize);
recordCountInCurrentPage = currentPage.readShort(LocalBucket.PAGE_RECORD_COUNT_IN_PAGE_OFFSET);
}

@@ -104,7 +104,8 @@ private void fetchNext() {
if (nextPageNumber > totalPages) {
return null;
}
currentPage = database.getTransaction().getPage(new PageId(bucket.file.getFileId(), nextPageNumber), bucket.pageSize);
currentPage = database.getTransaction()
.getPage(new PageId(database, bucket.file.getFileId(), nextPageNumber), bucket.pageSize);
recordCountInCurrentPage = currentPage.readShort(LocalBucket.PAGE_RECORD_COUNT_IN_PAGE_OFFSET);
}
