Commit
fix: fixed concurrency with page flush
lvca committed Dec 10, 2024
1 parent e7e5c87 commit 412a45d
Showing 1 changed file with 44 additions and 34 deletions.
@@ -20,6 +20,7 @@

import com.arcadedb.ContextConfiguration;
import com.arcadedb.GlobalConfiguration;
+ import com.arcadedb.database.BasicDatabase;
import com.arcadedb.database.Database;
import com.arcadedb.database.DatabaseInternal;
import com.arcadedb.exception.DatabaseMetadataException;
@@ -35,13 +36,23 @@
* Flushes pages to disk asynchronously.
*/
public class PageManagerFlushThread extends Thread {
- private final PageManager pageManager;
- public final ArrayBlockingQueue<List<MutablePage>> queue;
- private final String logContext;
- private volatile boolean running = true;
- private final ConcurrentHashMap<Database, Boolean> suspended = new ConcurrentHashMap<>(); // USED DURING BACKUP
- private final static List<MutablePage> SHUTDOWN_THREAD = new ArrayList<>();
- private final AtomicReference<List<MutablePage>> nextPagesToFlush = new AtomicReference<>();
+ private final PageManager pageManager;
+ public final ArrayBlockingQueue<PagesToFlush> queue;
+ private final String logContext;
+ private volatile boolean running = true;
+ private final ConcurrentHashMap<Database, Boolean> suspended = new ConcurrentHashMap<>(); // USED DURING BACKUP
+ private final static PagesToFlush SHUTDOWN_THREAD = new PagesToFlush(null);
+ private final AtomicReference<PagesToFlush> nextPagesToFlush = new AtomicReference<>();
+
+ public static class PagesToFlush {
+ public final BasicDatabase database;
+ public final List<MutablePage> pages;
+
+ public PagesToFlush(final List<MutablePage> pages) {
+ this.pages = pages;
+ this.database = pages == null || pages.isEmpty() ? null : pages.get(0).pageId.getDatabase();
+ }
+ }

public PageManagerFlushThread(final PageManager pageManager, final ContextConfiguration configuration) {
super("ArcadeDB AsyncFlush");
@@ -58,7 +69,7 @@ public void scheduleFlushOfPages(final List<MutablePage> pages) throws Interrupt

// TRY TO INSERT THE PAGE IN THE QUEUE UNTIL THE THREAD IS STILL RUNNING
while (running) {
- if (queue.offer(pages, 1, TimeUnit.SECONDS))
+ if (queue.offer(new PagesToFlush(pages), 1, TimeUnit.SECONDS))
return;
}

@@ -92,22 +103,22 @@ public void run() {
*/
protected void flushAllPagesOfDatabase(final Database database) {
// FLUSH PENDING PAGES FROM THREAD
- final List<MutablePage> pending = nextPagesToFlush.get();
+ final PagesToFlush pending = nextPagesToFlush.get();
if (pending != null)
flushPagesOfDatabase(database, pending);

if (queue.isEmpty())
return;

- for (final List<MutablePage> pages : queue.stream().toList())
+ for (final PagesToFlush pages : queue.stream().toList())
flushPagesOfDatabase(database, pages);
}

- private void flushPagesOfDatabase(Database database, List<MutablePage> pages) {
- if (!pages.isEmpty())
- synchronized (pages) {
- if (pages.get(0).getPageId().getDatabase().equals(database))
- for (final Iterator<MutablePage> it = pages.iterator(); it.hasNext(); ) {
+ private void flushPagesOfDatabase(final Database database, final PagesToFlush pagesToFlush) {
+ if (pagesToFlush.database.equals(database))
+ if (!pagesToFlush.pages.isEmpty())
+ synchronized (pagesToFlush.pages) {
+ for (final Iterator<MutablePage> it = pagesToFlush.pages.iterator(); it.hasNext(); ) {
final MutablePage page = it.next();
try {
pageManager.flushPage(page);
@@ -116,26 +127,25 @@ private void flushPagesOfDatabase(Database database, List<MutablePage> pages) {
LogManager.instance().log(this, Level.WARNING, "Error on flushing page '%s' to disk", e, page);
}
}
- }
+ }
}

protected void flushPagesFromQueueToDisk(final Database database, final long timeout) throws InterruptedException, IOException {
- final List<MutablePage> pages = queue.poll(timeout, TimeUnit.MILLISECONDS);
+ final PagesToFlush pagesToFlush = queue.poll(timeout, TimeUnit.MILLISECONDS);

- if (pages != null) {
- if (pages == SHUTDOWN_THREAD)
+ if (pagesToFlush != null) {
+ if (pagesToFlush == SHUTDOWN_THREAD)
// SPECIAL CONTENT FOR SHUTDOWN
running = false;
- else if (!pages.isEmpty()) {
- final PageId firstPage = pages.get(0).pageId;
- if (database == null || firstPage.getDatabase().equals(database)) {
+ else if (!pagesToFlush.pages.isEmpty()) {
+ if (database == null || pagesToFlush.database.equals(database)) {
// SET THE PAGES TO FLUSH TO BE RETRIEVED BY A CONCURRENT DB CLOSE = FORCE FLUSH OF PAGES
- nextPagesToFlush.set(pages);
+ nextPagesToFlush.set(pagesToFlush);
try {
// EXECUTE THE FLUSH IN A DB READ LOCK TO PREVENT CONCURRENT CLOSING
- ((DatabaseInternal) firstPage.getDatabase()).executeInReadLock(() -> {
- synchronized (pages) {
- for (final MutablePage page : pages) {
+ ((DatabaseInternal) pagesToFlush.database).executeInReadLock(() -> {
+ synchronized (pagesToFlush.pages) {
+ for (final MutablePage page : pagesToFlush.pages) {
try {
pageManager.flushPage(page);
} catch (final DatabaseMetadataException e) {
@@ -175,11 +185,11 @@ public void closeAndJoin() throws InterruptedException {
public CachedPage getCachedPageFromMutablePageInQueue(final PageId pageId) {
final Object[] content = queue.toArray();
for (int i = 0; i < content.length; i++) {
- final List<MutablePage> pages = (List<MutablePage>) content[i];
- if (pages != null) {
- synchronized (pages) {
- for (int j = 0; j < pages.size(); j++) {
- final MutablePage page = pages.get(j);
+ final PagesToFlush pagesToFlush = (PagesToFlush) content[i];
+ if (pagesToFlush != null) {
+ synchronized (pagesToFlush.pages) {
+ for (int j = 0; j < pagesToFlush.pages.size(); j++) {
+ final MutablePage page = pagesToFlush.pages.get(j);
if (page.getPageId().equals(pageId))
return new CachedPage(page, true);
}
@@ -190,9 +200,9 @@ public CachedPage getCachedPageFromMutablePageInQueue(final PageId pageId) {
}

public void removeAllPagesOfDatabase(final Database database) {
- for (final List<MutablePage> pages : queue.stream().toList())
- synchronized (pages) {
- pages.removeIf(page -> page.getPageId().getDatabase().equals(database));
+ for (final PagesToFlush pagesToFlush : queue.stream().toList())
+ synchronized (pagesToFlush.pages) {
+ pagesToFlush.pages.removeIf(page -> page.getPageId().getDatabase().equals(database));
}
}
}
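
The shape of the fix can be summarized outside of ArcadeDB's own classes. The sketch below is a minimal, hypothetical approximation of the pattern, not the real API: Db, Page and Batch stand in for Database, MutablePage and PagesToFlush, a plain ReentrantReadWriteLock stands in for the database's internal read lock, and flushPage only simulates I/O. It illustrates the two ideas the commit relies on: every queued batch carries a reference to the database that owns its pages, so consumers can match or skip a batch without peeking at pages.get(0), and a batch is flushed while holding that database's read lock and synchronizing on its page list, so a concurrent close or backup can drain the same list without racing the flush thread.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-ins for Database, MutablePage and PagesToFlush (not ArcadeDB's API).
final class Db {
  final String name;
  final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  Db(final String name) { this.name = name; }
}

final class Page {
  final Db db;
  final int id;
  Page(final Db db, final int id) { this.db = db; this.id = id; }
}

// Like PagesToFlush: the batch remembers which database its pages belong to.
final class Batch {
  final Db db;
  final List<Page> pages;
  Batch(final List<Page> pages) {
    this.pages = pages;
    this.db = pages.isEmpty() ? null : pages.get(0).db;
  }
}

public class FlushSketch {
  static final ArrayBlockingQueue<Batch> queue = new ArrayBlockingQueue<>(16);

  // Producer side: wrap the pages once, so consumers never inspect pages.get(0) again.
  static void schedule(final List<Page> pages) throws InterruptedException {
    while (!queue.offer(new Batch(pages), 1, TimeUnit.SECONDS)) {
      // keep retrying while the flush thread drains the queue
    }
  }

  // Flush-thread side: flush one batch under the owning database's read lock,
  // synchronizing on the page list so a concurrent close can drain the same list.
  static void flushOne(final long timeoutMs) throws InterruptedException {
    final Batch batch = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
    if (batch == null || batch.pages.isEmpty())
      return;

    batch.db.lock.readLock().lock(); // a concurrent close would take the write lock
    try {
      synchronized (batch.pages) {
        for (final Page page : batch.pages)
          flushPage(page);
        batch.pages.clear();
      }
    } finally {
      batch.db.lock.readLock().unlock();
    }
  }

  // Close/backup side: only touch batches that belong to the database being closed.
  static void flushAllPagesOf(final Db db) {
    for (final Batch batch : queue.stream().toList()) {
      if (!db.equals(batch.db)) // O(1) ownership check on the batch itself
        continue;
      synchronized (batch.pages) {
        for (final Page page : batch.pages)
          flushPage(page);
        batch.pages.clear();
      }
    }
  }

  static void flushPage(final Page page) {
    System.out.println("flushed page " + page.id + " of " + page.db.name);
  }

  public static void main(final String[] args) throws InterruptedException {
    final Db db = new Db("demo");
    schedule(new ArrayList<>(List.of(new Page(db, 1), new Page(db, 2))));
    flushOne(100);
  }
}

In the real class the same ownership check is done through PagesToFlush.database, and the flush runs inside DatabaseInternal.executeInReadLock() rather than an explicit ReentrantReadWriteLock as in this sketch.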
