Skip to content

Commit

Permalink
perf: supported inverse iteration on the bucket
Browse files Browse the repository at this point in the history
This was a leftover from the beginning of ArcadeDB: the bucket iterator didn't support browsing in reverse order. With ArcadeBrain, a specific query (select from Chat order by @Rid desc) was getting very slow, so I dug into it and found out that the SQL engine was sorting the records instead of using an inverse iterator at the source.

Issue #1854
  • Loading branch information
lvca committed Dec 6, 2024
1 parent df74de0 commit 0809e36
Show file tree
Hide file tree
Showing 8 changed files with 143 additions and 52 deletions.
2 changes: 2 additions & 0 deletions engine/src/main/java/com/arcadedb/engine/Bucket.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public interface Bucket {

Iterator<Record> iterator();

Iterator<Record> inverseIterator();

long count();

int getFileId();
Expand Down
64 changes: 49 additions & 15 deletions engine/src/main/java/com/arcadedb/engine/BucketIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package com.arcadedb.engine;

import com.arcadedb.database.Binary;
import com.arcadedb.database.Database;
import com.arcadedb.database.DatabaseInternal;
import com.arcadedb.database.RID;
import com.arcadedb.database.Record;
Expand All @@ -39,26 +38,38 @@ public class BucketIterator implements Iterator<Record> {
final Record[] nextBatch = new Record[PREFETCH_SIZE];
private int prefetchIndex = 0;
final long limit;
int nextPageNumber = 0;
BasePage currentPage = null;
int nextPageNumber;
BasePage currentPage = null;
short recordCountInCurrentPage;
int totalPages;
int currentRecordInPage = 0;
long browsed = 0;
private int writeIndex = 0;
int currentRecordInPage;
long browsed = 0;
private int writeIndex = 0;
private boolean forwardDirection = true;

BucketIterator(final LocalBucket bucket, final Database db) {
((DatabaseInternal) db).checkPermissionsOnFile(bucket.fileId, SecurityDatabaseUser.ACCESS.READ_RECORD);
BucketIterator(final LocalBucket bucket, final boolean forwardDirection) {
final DatabaseInternal db = bucket.getDatabase();
db.checkPermissionsOnFile(bucket.fileId, SecurityDatabaseUser.ACCESS.READ_RECORD);

this.database = (DatabaseInternal) db;
this.database = db;
this.bucket = bucket;
this.forwardDirection = forwardDirection;
this.totalPages = bucket.pageCount.get();

final Integer txPageCounter = database.getTransaction().getPageCounter(bucket.fileId);
if (txPageCounter != null && txPageCounter > totalPages)
this.totalPages = txPageCounter;

limit = database.getResultSetLimit();

if (forwardDirection) {
currentRecordInPage = 0;
nextPageNumber = 0;
} else {
nextPageNumber = this.totalPages - 1;
currentRecordInPage = Integer.MAX_VALUE;
}

fetchNext();
}

Expand Down Expand Up @@ -101,15 +112,28 @@ private void fetchNext() {

for (writeIndex = 0; writeIndex < nextBatch.length; ) {
if (currentPage == null) {
if (nextPageNumber > totalPages) {
return null;
if (forwardDirection) {
// MOVE FORWARD
if (nextPageNumber > totalPages)
return null;
} else {
// MOVE BACKWARDS
if (nextPageNumber < 0)
return null;
}

currentPage = database.getTransaction()
.getPage(new PageId(database, bucket.file.getFileId(), nextPageNumber), bucket.pageSize);
recordCountInCurrentPage = currentPage.readShort(LocalBucket.PAGE_RECORD_COUNT_IN_PAGE_OFFSET);

if (!forwardDirection && currentRecordInPage == Integer.MAX_VALUE)
currentRecordInPage = recordCountInCurrentPage - 1;
}

if (recordCountInCurrentPage > 0 && currentRecordInPage < recordCountInCurrentPage) {
if (recordCountInCurrentPage > 0 &&
(forwardDirection && currentRecordInPage < recordCountInCurrentPage) ||
(!forwardDirection && currentRecordInPage > -1)
) {
try {
final int recordPositionInPage = (int) currentPage.readUnsignedInt(
LocalBucket.PAGE_RECORD_TABLE_OFFSET + currentRecordInPage * INT_SERIALIZED_SIZE);
Expand Down Expand Up @@ -147,15 +171,25 @@ private void fetchNext() {
(nextPageNumber * bucket.getMaxRecordsInPage()) + currentRecordInPage, e.getMessage());
LogManager.instance().log(this, Level.SEVERE, msg);
} finally {
currentRecordInPage++;
if (forwardDirection)
currentRecordInPage++;
else
currentRecordInPage--;
}

} else if (currentRecordInPage == recordCountInCurrentPage) {
} else if (forwardDirection && currentRecordInPage == recordCountInCurrentPage) {
currentRecordInPage = 0;
currentPage = null;
nextPageNumber++;
} else if (!forwardDirection && currentRecordInPage < 0) {
currentRecordInPage = Integer.MAX_VALUE;
currentPage = null;
nextPageNumber--;
} else {
currentRecordInPage++;
if (forwardDirection)
currentRecordInPage++;
else
currentRecordInPage--;
}
}
return null;
Expand Down
8 changes: 7 additions & 1 deletion engine/src/main/java/com/arcadedb/engine/LocalBucket.java
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,13 @@ public void fetchPageInTransaction(final RID rid) throws IOException {
@Override
public Iterator<Record> iterator() {
database.checkPermissionsOnFile(fileId, SecurityDatabaseUser.ACCESS.READ_RECORD);
return new BucketIterator(this, database);
return new BucketIterator(this, true);
}

/**
 * Creates an iterator over the records of this bucket that browses them in reverse direction.
 * <p>
 * Read access on the bucket's file is checked before the iterator is created.
 *
 * @return a new {@link BucketIterator} configured for backward browsing
 */
@Override
public Iterator<Record> inverseIterator() {
  database.checkPermissionsOnFile(fileId, SecurityDatabaseUser.ACCESS.READ_RECORD);

  // false = do not move forward, i.e. browse the bucket backwards
  final boolean forwardDirection = false;
  return new BucketIterator(this, forwardDirection);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@ public class FetchFromClusterExecutionStep extends AbstractExecutionStep {
private Iterator<Record> iterator;

public FetchFromClusterExecutionStep(final int bucketId, final CommandContext context) {
this(bucketId, null, context);
this(bucketId, null, null, context);
}

public FetchFromClusterExecutionStep(final int bucketId, final QueryPlanningInfo queryPlanning, final CommandContext context) {
public FetchFromClusterExecutionStep(final int bucketId, final QueryPlanningInfo queryPlanning, final Object order,
final CommandContext context) {
super(context);
this.bucketId = bucketId;
this.queryPlanning = queryPlanning;
this.order = order;
}

@Override
Expand All @@ -53,16 +55,16 @@ public ResultSet syncPull(final CommandContext context, final int nRecords) thro
final long begin = context.isProfiling() ? System.nanoTime() : 0;
try {
if (iterator == null) {
iterator = context.getDatabase().getSchema().getBucketById(bucketId).iterator();
if (order == ORDER_DESC)
iterator = context.getDatabase().getSchema().getBucketById(bucketId).inverseIterator();
else
iterator = context.getDatabase().getSchema().getBucketById(bucketId).iterator();

//TODO check how to support ranges and DESC
//TODO check how to support ranges
// long minClusterPosition = calculateMinClusterPosition();
// long maxClusterPosition = calculateMaxClusterPosition();
// new ORecordIteratorCluster((ODatabaseDocumentInternal) context.getDatabase(),
// (ODatabaseDocumentInternal) context.getDatabase(), bucketId, minClusterPosition, maxClusterPosition);
// if (ORDER_DESC == order) {
// iterator.last();
// }
}
return new ResultSet() {
int nFetched = 0;
Expand All @@ -81,7 +83,7 @@ public boolean hasNext() {
return iterator.hasNext();
// }
} finally {
if( context.isProfiling() ) {
if (context.isProfiling()) {
cost += (System.nanoTime() - begin1);
}
}
Expand Down Expand Up @@ -116,14 +118,14 @@ record = iterator.next();
return result;

} finally {
if( context.isProfiling() ) {
if (context.isProfiling()) {
cost += (System.nanoTime() - begin1);
}
}
}
};
} finally {
if( context.isProfiling() ) {
if (context.isProfiling()) {
cost += (System.nanoTime() - begin);
}
}
Expand Down Expand Up @@ -192,9 +194,10 @@ record = iterator.next();
@Override
public String prettyPrint(final int depth, final int indent) {
String result =
ExecutionStepInternal.getIndent(depth, indent) + "+ FETCH FROM BUCKET " + bucketId + " (" + context.getDatabase().getSchema().getBucketById(bucketId)
ExecutionStepInternal.getIndent(depth, indent) + "+ FETCH FROM BUCKET " + bucketId + " (" + context.getDatabase()
.getSchema().getBucketById(bucketId)
.getName() + ") " + (ORDER_DESC.equals(order) ? "DESC" : "ASC" + " = " + totalFetched + " RECORDS");
if ( context.isProfiling() )
if (context.isProfiling())
result += " (" + getCostFormatted() + ")";
return result;
}
Expand All @@ -210,6 +213,7 @@ public boolean canBeCached() {

@Override
public ExecutionStep copy(final CommandContext context) {
return new FetchFromClusterExecutionStep(this.bucketId, this.queryPlanning == null ? null : this.queryPlanning.copy(), context);
return new FetchFromClusterExecutionStep(this.bucketId, this.queryPlanning == null ? null : this.queryPlanning.copy(),
this.order, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ else if (Boolean.FALSE.equals(ridOrder))
sortBuckets(bucketIds);
for (final int bucketId : bucketIds) {
if (bucketId > 0) {
final FetchFromClusterExecutionStep step = new FetchFromClusterExecutionStep(bucketId, planningInfo, context);
final FetchFromClusterExecutionStep step = new FetchFromClusterExecutionStep(bucketId, planningInfo, null, context);
if (orderByRidAsc)
step.setOrder(FetchFromClusterExecutionStep.ORDER_ASC);
else if (orderByRidDesc)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1273,13 +1273,12 @@ private void handleClassAsTarget(final SelectExecutionPlan plan, final Set<Strin
return;
}

Boolean orderByRidAsc = null;//null: no order. true: asc, false:desc
if (isOrderByRidAsc(info)) {
Boolean orderByRidAsc = null; // null: no order. true: asc, false:desc
if (isOrderByRidAsc(info))
orderByRidAsc = true;
}
// else if (isOrderByRidDesc(info)) {
// orderByRidAsc = false;
// }
else if (isOrderByRidDesc(info))
orderByRidAsc = false;

final FetchFromTypeExecutionStep fetcher = new FetchFromTypeExecutionStep(identifier.getStringValue(), filterClusters, info,
context, orderByRidAsc);
if (orderByRidAsc != null)
Expand Down Expand Up @@ -2255,8 +2254,9 @@ private void handleClustersAsTarget(final SelectExecutionPlan plan, final QueryP
Boolean orderByRidAsc = null;//null: no order. true: asc, false:desc
if (isOrderByRidAsc(info))
orderByRidAsc = true;
// else if (isOrderByRidDesc(info))
// orderByRidAsc = false;
else if (isOrderByRidDesc(info))
orderByRidAsc = false;

if (orderByRidAsc != null)
info.orderApplied = true;

Expand Down Expand Up @@ -2310,21 +2310,18 @@ private void handleSubqueryAsTarget(final SelectExecutionPlan plan, final Statem
}

/**
 * Tells whether the query is ordered only by {@code @rid} in descending direction.
 * <p>
 * The result is {@code true} only when all of the following hold:
 * <ul>
 *   <li>{@code hasTargetWithSortedRids(info)} is true,</li>
 *   <li>an ORDER BY clause is present and contains exactly one item,</li>
 *   <li>that item refers to the {@code @rid} record attribute (case-insensitive),</li>
 *   <li>the item type equals {@code OrderByItem.DESC}.</li>
 * </ul>
 *
 * @param info planning information of the query being evaluated
 *
 * @return {@code true} if the query is an "order by @rid desc" on a target with sorted RIDs, otherwise {@code false}
 */
private boolean isOrderByRidDesc(final QueryPlanningInfo info) {
  if (!hasTargetWithSortedRids(info))
    return false;

  if (info.orderBy == null)
    return false;

  // Only a lone "@rid DESC" item qualifies: any additional sort key requires a real sort
  if (info.orderBy.getItems().size() != 1)
    return false;

  final OrderByItem item = info.orderBy.getItems().get(0);
  final String recordAttr = item.getRecordAttr();
  return recordAttr != null && recordAttr.equalsIgnoreCase("@rid") && OrderByItem.DESC.equals(item.getType());
}

private boolean isOrderByRidAsc(final QueryPlanningInfo info) {
Expand Down
5 changes: 5 additions & 0 deletions network/src/main/java/com/arcadedb/remote/RemoteBucket.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ public Iterator<Record> iterator() {
throw new UnsupportedOperationException();
}

/**
 * Not supported by this bucket implementation.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public Iterator<Record> inverseIterator() {
throw new UnsupportedOperationException();
}

@Override
public long count() {
throw new UnsupportedOperationException();
Expand Down
43 changes: 43 additions & 0 deletions server/src/test/java/com/arcadedb/server/SelectOrderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import com.arcadedb.database.DatabaseFactory;
import com.arcadedb.database.RID;
import com.arcadedb.engine.Bucket;
import com.arcadedb.graph.MutableVertex;
import com.arcadedb.graph.Vertex;
import com.arcadedb.integration.misc.IntegrationUtils;
import com.arcadedb.query.sql.executor.Result;
import com.arcadedb.query.sql.executor.ResultSet;
Expand Down Expand Up @@ -282,6 +284,47 @@ public void testLocalDateTimeOrderBy() {
}
}

/**
 * Inserts 1,000 "Product" vertices with an increasing "id" and checks, ten times in a row, that
 * "select from Product order by @rid desc" returns all of them with strictly decreasing ids.
 */
@Test
public void testRIDOrderingDesc() {
  final ContextConfiguration serverConfiguration = new ContextConfiguration();
  final String databasePath = IntegrationUtils.setRootPath(serverConfiguration) + "/databases/" + DATABASE_NAME;

  try (DatabaseFactory factory = new DatabaseFactory(databasePath)) {
    // Start from a clean database
    if (factory.exists())
      factory.open().drop();

    try (Database db = factory.create()) {
      // Schema: one vertex type backed by a single bucket
      db.transaction(() -> {
        final DocumentType productType = db.getSchema().createVertexType("Product", 1);
        productType.createProperty("name", Type.STRING);
      });

      db.transaction(() -> {
        // Records are created with increasing ids
        int nextId = 0;
        while (nextId < 1_000) {
          final MutableVertex product = db.newVertex("Product");
          product.set("id", nextId);
          product.save();
          nextId++;
        }

        // The same query is executed repeatedly within the same transaction
        for (int run = 0; run < 10; run++) {
          int previousId = Integer.MAX_VALUE;
          int fetched = 0;

          final ResultSet rows = db.query("sql", "select from Product order by @rid desc");
          while (rows.hasNext()) {
            final Vertex product = rows.next().getVertex().get();
            final Integer currentId = product.getInteger("id");

            // Descending RID order must yield strictly decreasing ids
            assertThat(currentId).isLessThan(previousId);

            previousId = currentId;
            fetched++;
          }

          assertThat(fetched).isEqualTo(1_000);
        }
      });
    }
  }
}

@BeforeEach
public void beginTests() {
final ContextConfiguration serverConfiguration = new ContextConfiguration();
Expand Down

0 comments on commit 0809e36

Please sign in to comment.