Merge pull request #192 from ldbc/feature/async-update-streams
Asynchronous update streams
szarnyasg authored Sep 24, 2022
2 parents 88b3fea + 7e876c8 commit d972622
Showing 25 changed files with 180 additions and 1,335 deletions.
18 changes: 17 additions & 1 deletion paramgen/paramgen-queries/pg-13b.sql
@@ -4,10 +4,26 @@ SELECT
     person2Id AS 'person2Id',
     GREATEST(Person1CreationDate, Person2CreationDate) AS 'useFrom',
     LEAST(Person1DeletionDate, Person2DeletionDate) AS 'useUntil'
-FROM people4Hops
+FROM people4Hops,
+(
+    SELECT Person1Id AS personId,
+           numFriendsOfFriends,
+           abs(numFriendsOfFriends - (
+               SELECT percentile_disc(0.65)
+               WITHIN GROUP (ORDER BY numFriendsOfFriends)
+               FROM personNumFriendsOfFriendsOfFriends)
+           ) AS diff,
+           creationDate AS useFrom,
+           deletionDate AS useUntil
+    FROM personNumFriendsOfFriendsOfFriends
+    WHERE numFriends > 0 AND deletionDate - INTERVAL 1 DAY > :date_limit_filter AND creationDate + INTERVAL 1 DAY < :date_limit_filter
+    ORDER BY diff, md5(Person1Id)
+    LIMIT 100
+) personIds
 WHERE Person1CreationDate + INTERVAL 1 DAY < :date_limit_filter
   AND Person2CreationDate + INTERVAL 1 DAY < :date_limit_filter
   AND Person1DeletionDate - INTERVAL 1 DAY > :date_limit_filter
   AND Person2DeletionDate - INTERVAL 1 DAY > :date_limit_filter
+  AND people4Hops.Person1Id = personIds.personId
 ORDER BY md5(concat(person1Id + 1, person2Id + 2))
 LIMIT 500
20 changes: 18 additions & 2 deletions paramgen/paramgen-queries/pg-14b.sql
@@ -4,10 +4,26 @@ SELECT
     person2Id AS 'person2Id',
     GREATEST(Person1CreationDate, Person2CreationDate) AS 'useFrom',
     LEAST(Person1DeletionDate, Person2DeletionDate) AS 'useUntil'
-FROM people4Hops
+FROM people4Hops,
+(
+    SELECT Person1Id AS personId,
+           numFriendsOfFriends,
+           abs(numFriendsOfFriends - (
+               SELECT percentile_disc(0.65)
+               WITHIN GROUP (ORDER BY numFriendsOfFriends)
+               FROM personNumFriendsOfFriendsOfFriends)
+           ) AS diff,
+           creationDate AS useFrom,
+           deletionDate AS useUntil
+    FROM personNumFriendsOfFriendsOfFriends
+    WHERE numFriends > 0 AND deletionDate - INTERVAL 1 DAY > :date_limit_filter AND creationDate + INTERVAL 1 DAY < :date_limit_filter
+    ORDER BY diff, md5(Person1Id)
+    LIMIT 100
+) personIds
 WHERE Person1CreationDate + INTERVAL 1 DAY < :date_limit_filter
   AND Person2CreationDate + INTERVAL 1 DAY < :date_limit_filter
   AND Person1DeletionDate - INTERVAL 1 DAY > :date_limit_filter
   AND Person2DeletionDate - INTERVAL 1 DAY > :date_limit_filter
-ORDER BY md5(concat(person1Id + 3, person2Id + 4))
+  AND people4Hops.Person1Id = personIds.personId
+ORDER BY md5(concat(person1Id + 1, person2Id + 2))
 LIMIT 500
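
Both pg-13b.sql and pg-14b.sql gain the same inner subquery: rather than drawing Person pairs from all of people4Hops, the parameter generator first selects the 100 Persons whose numFriendsOfFriends value is closest to the 65th percentile (percentile_disc(0.65) WITHIN GROUP), restricted to Persons that exist for at least one day on either side of :date_limit_filter, and only then joins people4Hops against that candidate set. Ordering by md5(Person1Id) breaks ties deterministically without favouring low IDs. A minimal sketch of just the selection step, assuming paramgen runs on DuckDB (its COPY ... WITH (FORMAT PARQUET) syntax points that way) and using invented toy data:

import duckdb

con = duckdb.connect()

# Toy stand-in for the personNumFriendsOfFriendsOfFriends table used by paramgen.
con.execute("""
    CREATE TABLE personNumFriendsOfFriendsOfFriends AS
    SELECT
        i AS Person1Id,
        i * 7 % 100 AS numFriends,
        i * 13 % 1000 AS numFriendsOfFriends,
        TIMESTAMP '2010-01-01' AS creationDate,
        TIMESTAMP '2013-01-01' AS deletionDate
    FROM range(1, 501) t(i)
""")

# Pick the 100 Persons whose friends-of-friends count is closest to the
# 65th percentile -- the same selection the new subquery performs
# (date-window predicates omitted here for brevity).
candidates = con.execute("""
    SELECT Person1Id AS personId,
           numFriendsOfFriends,
           abs(numFriendsOfFriends - (
               SELECT percentile_disc(0.65) WITHIN GROUP (ORDER BY numFriendsOfFriends)
               FROM personNumFriendsOfFriendsOfFriends)
           ) AS diff
    FROM personNumFriendsOfFriendsOfFriends
    WHERE numFriends > 0
    ORDER BY diff, md5(Person1Id)
    LIMIT 100
""").fetchall()
print(candidates[:5])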
28 changes: 25 additions & 3 deletions paramgen/paramgen.py
@@ -5,7 +5,7 @@
 from pathlib import Path
 import argparse
 
-remove_duplicates_dict = {
+remove_lower_times_dict = {
     "Q_1" : "DELETE FROM Q_1 t1 WHERE t1.useUntil < (SELECT max(t2.useUntil) FROM Q_1 t2 WHERE t2.personId = t1.personId AND t2.firstName = t1.firstName);",
     "Q_2" : "DELETE FROM Q_2 t1 WHERE t1.useUntil < (SELECT max(t2.useUntil) FROM Q_2 t2 WHERE t2.personId = t1.personId AND t2.maxDate = t1.maxDate);",
     "Q_3a" : "DELETE FROM Q_3a t1 WHERE t1.useUntil < (SELECT max(t2.useUntil) FROM Q_3a t2 WHERE t2.personId = t1.personId AND t2.countryXName = t1.countryXName AND t2.countryYName = t1.countryYName AND t2.startDate = t1.startDate AND t2.durationDays = t1.durationDays);",
@@ -25,6 +25,25 @@
     "Q_14b" : "DELETE FROM Q_14b t1 WHERE t1.useUntil < (SELECT max(t2.useUntil) FROM Q_14b t2 WHERE t2.person1Id = t1.person1Id AND t2.person2Id = t1.person2Id);"
 }
 
+remove_duplicates = {
+    "Q_1" : "CREATE TABLE Q_1_filtered AS SELECT personId, firstName, useFrom, useUntil FROM Q_1 GROUP BY personId, firstName, useFrom, useUntil;",
+    "Q_2" : "CREATE TABLE Q_2_filtered AS SELECT personId, maxDate, useFrom, useUntil FROM Q_2 GROUP BY personId, maxDate, useFrom, useUntil;",
+    "Q_3a" : "CREATE TABLE Q_3a_filtered AS SELECT personId, countryXName, countryYName, startDate, durationDays, useFrom, useUntil FROM Q_3a GROUP BY personId, countryXName, countryYName, startDate, durationDays, useFrom, useUntil;",
+    "Q_3b" : "CREATE TABLE Q_3b_filtered AS SELECT personId, countryXName, countryYName, startDate, durationDays, useFrom, useUntil FROM Q_3b GROUP BY personId, countryXName, countryYName, startDate, durationDays, useFrom, useUntil;",
+    "Q_4" : "CREATE TABLE Q_4_filtered AS SELECT personId, startDate, durationDays, useFrom, useUntil FROM Q_4 GROUP BY personId, startDate, durationDays, useFrom, useUntil;",
+    "Q_5" : "CREATE TABLE Q_5_filtered AS SELECT personId, minDate, useFrom, useUntil FROM Q_5 GROUP BY personId, minDate, useFrom, useUntil;",
+    "Q_6" : "CREATE TABLE Q_6_filtered AS SELECT personId, tagName, useFrom, useUntil FROM Q_6 GROUP BY personId, tagName, useFrom, useUntil;",
+    "Q_7" : "CREATE TABLE Q_7_filtered AS SELECT personId, useFrom, useUntil FROM Q_7 GROUP BY personId, useFrom, useUntil;",
+    "Q_8" : "CREATE TABLE Q_8_filtered AS SELECT personId, useFrom, useUntil FROM Q_8 GROUP BY personId, useFrom, useUntil;",
+    "Q_9" : "CREATE TABLE Q_9_filtered AS SELECT personId, maxDate, useFrom, useUntil FROM Q_9 GROUP BY personId, maxDate, useFrom, useUntil;",
+    "Q_10" : "CREATE TABLE Q_10_filtered AS SELECT personId, month, useFrom, useUntil FROM Q_10 GROUP BY personId, month, useFrom, useUntil;",
+    "Q_11" : "CREATE TABLE Q_11_filtered AS SELECT personId, countryName, workFromYear, useFrom, useUntil FROM Q_11 GROUP BY personId, countryName, workFromYear, useFrom, useUntil;",
+    "Q_12" : "CREATE TABLE Q_12_filtered AS SELECT personId, tagClassName, useFrom, useUntil FROM Q_12 GROUP BY personId, tagClassName, useFrom, useUntil;",
+    "Q_13a" : "CREATE TABLE Q_13a_filtered AS SELECT person1Id, person2Id, useFrom, useUntil FROM Q_13a GROUP BY person1Id, person2Id, useFrom, useUntil;",
+    "Q_13b" : "CREATE TABLE Q_13b_filtered AS SELECT person1Id, person2Id, useFrom, useUntil FROM Q_13b GROUP BY person1Id, person2Id, useFrom, useUntil;",
+    "Q_14a" : "CREATE TABLE Q_14a_filtered AS SELECT person1Id, person2Id, useFrom, useUntil FROM Q_14a GROUP BY person1Id, person2Id, useFrom, useUntil;",
+    "Q_14b" : "CREATE TABLE Q_14b_filtered AS SELECT person1Id, person2Id, useFrom, useUntil FROM Q_14b GROUP BY person1Id, person2Id, useFrom, useUntil;"
+}
+
 def generate_parameter_for_query_type(cursor, date_limit, date_start, create_tables, query_variant):
     """
@@ -111,9 +130,12 @@ def export_parameters(cursor):
     print("============ Output parameters ============")
     for query_variant in ["1", "2", "3a", "3b", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13a", "13b", "14a", "14b"]:
         print(f"- Q{query_variant} TO parameters/interactive-{query_variant}.parquet")
-        query = remove_duplicates_dict[f"Q_{query_variant}"]
+        query = remove_lower_times_dict[f"Q_{query_variant}"]
         cursor.execute(query)
-        cursor.execute(f"COPY 'Q_{query_variant}' TO 'parameters/interactive-{query_variant}.parquet' WITH (FORMAT PARQUET);")
+        query = remove_duplicates[f"Q_{query_variant}"]
+        cursor.execute(query)
+        cursor.execute(f"COPY 'Q_{query_variant}_filtered' TO 'parameters/interactive-{query_variant}.parquet' WITH (FORMAT PARQUET);")
 
 
 def generate_short_parameters(cursor, date_start):
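
export_parameters now deduplicates in two phases before writing Parquet: the DELETE statements in remove_lower_times_dict drop rows that share a parameter key with a row whose useUntil is later, and the CREATE TABLE ... GROUP BY statements in remove_duplicates then collapse exact duplicates into Q_*_filtered tables, which are what gets copied out. A minimal sketch of the two phases for Q_7, assuming the cursor is a DuckDB connection and using invented sample rows:

import duckdb

con = duckdb.connect()
con.execute("CREATE TABLE Q_7 (personId BIGINT, useFrom TIMESTAMP, useUntil TIMESTAMP)")
con.execute("""
    INSERT INTO Q_7 VALUES
        (1, TIMESTAMP '2010-01-01', TIMESTAMP '2011-01-01'),  -- dominated: same personId, earlier useUntil
        (1, TIMESTAMP '2010-01-01', TIMESTAMP '2012-01-01'),
        (1, TIMESTAMP '2010-01-01', TIMESTAMP '2012-01-01')   -- exact duplicate
""")

# Phase 1: keep only rows with the latest useUntil per parameter key
# (remove_lower_times_dict["Q_7"]).
con.execute("""
    DELETE FROM Q_7 t1 WHERE t1.useUntil <
        (SELECT max(t2.useUntil) FROM Q_7 t2 WHERE t2.personId = t1.personId)
""")

# Phase 2: collapse exact duplicates into the *_filtered table
# (remove_duplicates["Q_7"]), which is what gets exported.
con.execute("""
    CREATE TABLE Q_7_filtered AS
    SELECT personId, useFrom, useUntil FROM Q_7
    GROUP BY personId, useFrom, useUntil
""")
con.execute("COPY Q_7_filtered TO 'interactive-7.parquet' WITH (FORMAT PARQUET)")
print(con.execute("SELECT * FROM Q_7_filtered").fetchall())  # one row remains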
113 changes: 2 additions & 111 deletions src/main/java/org/ldbcouncil/snb/driver/WorkloadStreams.java
@@ -26,7 +26,6 @@
 public class WorkloadStreams
 {
     private WorkloadStreamDefinition asynchronousStream = null;
-    private List<WorkloadStreamDefinition> blockingStreams = new ArrayList<>();
 
     public static WorkloadStreams timeOffsetAndCompressWorkloadStreams(
             WorkloadStreams originalWorkloadStreams,
@@ -66,44 +65,7 @@ public static WorkloadStreams timeOffsetAndCompressWorkloadStreams(
         List<PeekingIterator<Operation>> peekingBlockingDependencyOperationStreams = new ArrayList<>();
         List<Long> peekingBlockingNonDependencyOperationStreamsAheadOfMinByMillis = new ArrayList<>();
         List<PeekingIterator<Operation>> peekingBlockingNonDependencyOperationStreams = new ArrayList<>();
-        List<WorkloadStreamDefinition> blockingStreams = originalWorkloadStreams.blockingStreamDefinitions();
-        for ( int i = 0; i < blockingStreams.size(); i++ )
-        {
-            PeekingIterator<Operation> peekingBlockingDependencyOperationStream =
-                    Iterators.peekingIterator( blockingStreams.get( i ).dependencyOperations() );
-            try
-            {
-                long firstAsMilli = peekingBlockingDependencyOperationStream.peek().scheduledStartTimeAsMilli();
-                if ( firstAsMilli < minScheduledStartTimeAsMilli )
-                {
-                    minScheduledStartTimeAsMilli = firstAsMilli;
-                }
-            }
-            catch ( NoSuchElementException e )
-            {
-                // do nothing, just means stream was empty
-            }
-            peekingBlockingDependencyOperationStreamsAheadOfMinByMillis.add( 0l );
-            peekingBlockingDependencyOperationStreams.add( peekingBlockingDependencyOperationStream );
-
-            PeekingIterator<Operation> peekingBlockingNonDependencyOperationStream =
-                    Iterators.peekingIterator( blockingStreams.get( i ).nonDependencyOperations() );
-            try
-            {
-                long firstAsMilli = peekingBlockingNonDependencyOperationStream.peek().scheduledStartTimeAsMilli();
-                if ( firstAsMilli < minScheduledStartTimeAsMilli )
-                {
-                    minScheduledStartTimeAsMilli = firstAsMilli;
-                }
-            }
-            catch ( NoSuchElementException e )
-            {
-                // do nothing, just means stream was empty
-            }
-            peekingBlockingNonDependencyOperationStreamsAheadOfMinByMillis.add( 0l );
-            peekingBlockingNonDependencyOperationStreams.add( peekingBlockingNonDependencyOperationStream );
-        }
 
         if ( Long.MAX_VALUE == minScheduledStartTimeAsMilli )
         {
             minScheduledStartTimeAsMilli = newStartTimeAsMilli;
@@ -193,26 +155,6 @@ public static WorkloadStreams timeOffsetAndCompressWorkloadStreams(
                 originalWorkloadStreams.asynchronousStream().childOperationGenerator()
         );
 
-        for ( int i = 0; i < blockingStreams.size(); i++ )
-        {
-            timeOffsetAndCompressedWorkloadStreams.addBlockingStream(
-                    blockingStreams.get( i ).dependentOperationTypes(),
-                    blockingStreams.get( i ).dependencyOperationTypes(),
-                    gf.timeOffsetAndCompress(
-                            peekingBlockingDependencyOperationStreams.get( i ),
-                            newStartTimeAsMilli + peekingBlockingDependencyOperationStreamsAheadOfMinByMillis.get( i ),
-                            compressionRatio
-                    ),
-                    gf.timeOffsetAndCompress(
-                            peekingBlockingNonDependencyOperationStreams.get( i ),
-                            newStartTimeAsMilli +
-                            peekingBlockingNonDependencyOperationStreamsAheadOfMinByMillis.get( i ),
-                            compressionRatio
-                    ),
-                    blockingStreams.get( i ).childOperationGenerator()
-            );
-        }
-
         return timeOffsetAndCompressedWorkloadStreams;
     }
 
@@ -266,15 +208,6 @@ public static Tuple3<WorkloadStreams,Workload,Long> createNewWorkloadWithOffsetA
         streams.add( unlimitedWorkloadStreams.asynchronousStream().nonDependencyOperations() );
         childOperationGenerators.add( unlimitedWorkloadStreams.asynchronousStream().childOperationGenerator() );
 
-        for ( WorkloadStreamDefinition stream : unlimitedWorkloadStreams.blockingStreamDefinitions() )
-        {
-            streams.add( stream.dependencyOperations() );
-            childOperationGenerators.add( stream.childOperationGenerator() );
-
-            streams.add( stream.nonDependencyOperations() );
-            childOperationGenerators.add( stream.childOperationGenerator() );
-        }
-
         // stream through streams once, to calculate how many operations are needed from each,
         // to get operation_count in total
         Tuple3<long[],long[],Long> limitsAndMinimumsForStream =
@@ -303,16 +236,11 @@ public static Tuple3<WorkloadStreams,Workload,Long> createNewWorkloadWithOffsetA
 
         // retrieve unbounded streams
         unlimitedWorkloadStreams = workload.streams( gf, returnStreamsWithDbConnector );
-        List<WorkloadStreamDefinition> unlimitedBlockingStreams = unlimitedWorkloadStreams.blockingStreamDefinitions();
 
         // advance to offsets
         gf.consume( unlimitedWorkloadStreams.asynchronousStream().dependencyOperations(), startForStream[0] );
         gf.consume( unlimitedWorkloadStreams.asynchronousStream().nonDependencyOperations(), startForStream[1] );
-        for ( int i = 0; i < unlimitedBlockingStreams.size(); i++ )
-        {
-            gf.consume( unlimitedBlockingStreams.get( i ).dependencyOperations(), startForStream[i * 2 + 2] );
-            gf.consume( unlimitedBlockingStreams.get( i ).nonDependencyOperations(), startForStream[i * 2 + 3] );
-        }
 
         // copy unbounded streams to new workload streams instance, from offsets, applying limits
         workloadStreams.setAsynchronousStream(
@@ -322,16 +250,6 @@ public static Tuple3<WorkloadStreams,Workload,Long> createNewWorkloadWithOffsetA
                 gf.limit( unlimitedWorkloadStreams.asynchronousStream().nonDependencyOperations(), limitForStream[1] ),
                 unlimitedWorkloadStreams.asynchronousStream().childOperationGenerator()
         );
-        for ( int i = 0; i < unlimitedBlockingStreams.size(); i++ )
-        {
-            workloadStreams.addBlockingStream(
-                    unlimitedBlockingStreams.get( i ).dependentOperationTypes(),
-                    unlimitedBlockingStreams.get( i ).dependencyOperationTypes(),
-                    gf.limit( unlimitedBlockingStreams.get( i ).dependencyOperations(), limitForStream[i * 2 + 2] ),
-                    gf.limit( unlimitedBlockingStreams.get( i ).nonDependencyOperations(), limitForStream[i * 2 + 3] ),
-                    unlimitedBlockingStreams.get( i ).childOperationGenerator()
-            );
-        }
 
         return Tuple.tuple3(
                 workloadStreams,
@@ -584,38 +502,11 @@ public void setAsynchronousStream(
         );
     }
 
-    public List<WorkloadStreamDefinition> blockingStreamDefinitions()
-    {
-        return blockingStreams;
-    }
-
-    public void addBlockingStream(
-            Set<Class<? extends Operation>> dependentOperationTypes,
-            Set<Class<? extends Operation>> dependencyOperationTypes,
-            Iterator<Operation> dependencyOperations,
-            Iterator<Operation> nonDependencyOperations,
-            ChildOperationGenerator childOperationGenerator )
-    {
-        WorkloadStreamDefinition blockingStream = new WorkloadStreamDefinition(
-                dependentOperationTypes,
-                dependencyOperationTypes,
-                dependencyOperations,
-                nonDependencyOperations,
-                childOperationGenerator
-        );
-        this.blockingStreams.add( blockingStream );
-    }
-
     public static Iterator<Operation> mergeSortedByStartTimeExcludingChildOperationGenerators(
             GeneratorFactory gf,
             WorkloadStreams workloadStreams )
     {
         List<Iterator<Operation>> allStreams = new ArrayList<>();
-        for ( WorkloadStreamDefinition streamDefinition : workloadStreams.blockingStreamDefinitions() )
-        {
-            allStreams.add( streamDefinition.dependencyOperations() );
-            allStreams.add( streamDefinition.nonDependencyOperations() );
-        }
         allStreams.add( workloadStreams.asynchronousStream().dependencyOperations() );
         allStreams.add( workloadStreams.asynchronousStream().nonDependencyOperations() );
         return gf.mergeSortOperationsByTimeStamp( allStreams.toArray( new Iterator[allStreams.size()] ) );
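
With the blocking streams removed, WorkloadStreams carries a single asynchronous stream, and mergeSortedByStartTimeExcludingChildOperationGenerators reduces to a two-way merge of its dependency and non-dependency operation streams by scheduled start time. A language-neutral sketch of that merge in Python, with heapq.merge standing in for gf.mergeSortOperationsByTimeStamp and the Operation tuple being an invented stand-in:

import heapq
from typing import Iterator, NamedTuple

class Operation(NamedTuple):
    scheduled_start_ms: int
    name: str

def merge_sorted_by_start_time(*streams: Iterator[Operation]) -> Iterator[Operation]:
    # k-way merge of per-stream iterators that are already sorted by
    # scheduled start time -- the role gf.mergeSortOperationsByTimeStamp plays.
    return heapq.merge(*streams, key=lambda op: op.scheduled_start_ms)

dependency = iter([Operation(10, "update-1"), Operation(30, "update-2")])
non_dependency = iter([Operation(5, "read-1"), Operation(20, "read-2")])

# After this commit only the asynchronous stream's two iterators are merged;
# there is no longer a list of blocking streams to fold in.
for op in merge_sorted_by_start_time(dependency, non_dependency):
    print(op.scheduled_start_ms, op.name)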
@@ -22,6 +22,11 @@ public BufferedIterator(
         this.operationStreamBuffer = operationStreamBuffer;
     }
 
+    public void init()
+    {
+        currentOperationStream = operationStreamBuffer.next();
+    }
+
     @Override
     public boolean hasNext()
     {
@@ -33,6 +38,10 @@ public boolean hasNext()
                 currentOperationStream = Collections.emptyIterator();
                 isEmpty = true;
             }
+            if (!currentOperationStream.hasNext())
+            {
+                isEmpty = true;
+            }
         }
         return currentOperationStream.hasNext();
     }
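
The BufferedIterator changes make the flattening behaviour explicit: the new init() eagerly pulls the first inner operation stream from the buffer, and hasNext() now also reports exhaustion when the current inner stream runs dry rather than only when the buffer itself is drained. A rough sketch of the same flattening pattern in Python (not a line-for-line port), assuming the buffer yields successive operation iterators:

def buffered_iterator(buffer):
    """Flatten successive inner streams pulled from a buffer of iterators."""
    current = next(buffer, None)      # counterpart of init(): fetch first stream
    while current is not None:
        yield from current            # exhaust the current inner stream
        current = next(buffer, None)  # then ask the buffer for the next one

streams = iter([iter([1, 2]), iter([3])])
print(list(buffered_iterator(streams)))  # [1, 2, 3]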
@@ -11,7 +11,6 @@
 import org.ldbcouncil.snb.driver.runtime.executor.OperationExecutor;
 import org.ldbcouncil.snb.driver.runtime.executor.OperationExecutorException;
 import org.ldbcouncil.snb.driver.runtime.executor.OperationStreamExecutorService;
-import org.ldbcouncil.snb.driver.runtime.executor.SameThreadOperationExecutor;
 import org.ldbcouncil.snb.driver.runtime.executor.ThreadPoolOperationExecutor;
 import org.ldbcouncil.snb.driver.runtime.metrics.MetricsCollectionException;
 import org.ldbcouncil.snb.driver.runtime.metrics.MetricsService;
@@ -347,44 +346,6 @@ public WorkloadRunnerThread( TimeSource timeSource,
                     completionTimeWriterForAsynchronous,
                     completionTimeService
             );
-
-            for ( WorkloadStreamDefinition blockingStream : workloadStreams.blockingStreamDefinitions() )
-            {
-                // only create a completion time writer for an executor if it contains at least one READ_WRITE operation
-                // otherwise it will cause completion time to stall
-                CompletionTimeWriter completionTimeWriterForBlocking;
-                try
-                {
-                    completionTimeWriterForBlocking = (blockingStream.dependencyOperations().hasNext())
-                            ? completionTimeService.newCompletionTimeWriter()
-                            : DUMMY_COMPLETION_TIME_WRITER;
-                }
-                catch ( CompletionTimeException e )
-                {
-                    throw new WorkloadException( "Error while attempting to create completion time writer", e );
-                }
-                OperationExecutor executorForBlocking = new SameThreadOperationExecutor(
-                        db,
-                        blockingStream,
-                        completionTimeWriterForBlocking,
-                        completionTimeService,
-                        spinner,
-                        timeSource,
-                        errorReporter,
-                        metricsService,
-                        blockingStream.childOperationGenerator()
-                );
-                this.executorsForBlocking.add( executorForBlocking );
-                this.blockingStreamExecutorServices.add(
-                        new OperationStreamExecutorService(
-                                errorReporter,
-                                blockingStream,
-                                executorForBlocking,
-                                completionTimeWriterForBlocking,
-                                completionTimeService
-                        )
-                );
-            }
             this.stateRef = new AtomicReference<>( WorkloadRunnerThreadState.NOT_STARTED );
         }
 
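
The runner now builds executors only for the asynchronous stream; the per-blocking-stream SameThreadOperationExecutor instances are gone. The rule in the deleted comment still governs the remaining executor: a stream gets a real completion time writer only if it contains at least one dependency (READ_WRITE) operation, otherwise a dummy writer is used so completion time tracking does not stall. A toy Python illustration of that rule, with all names invented:

class DummyCompletionTimeWriter:
    """Discards submissions so an executor with no dependency operations
    cannot stall completion time tracking."""
    def submit_completed_time(self, time_ms):
        pass

class RecordingCompletionTimeWriter:
    def __init__(self):
        self.times = []
    def submit_completed_time(self, time_ms):
        self.times.append(time_ms)

def completion_time_writer_for(stream_has_dependency_ops):
    # Mirrors the driver's rule: only streams with at least one
    # dependency (READ_WRITE) operation get a real writer.
    return RecordingCompletionTimeWriter() if stream_has_dependency_ops else DummyCompletionTimeWriter()

writer = completion_time_writer_for(stream_has_dependency_ops=False)
writer.submit_completed_time(42)   # silently discarded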