Commit

Use existing timestamp as experiment id
This commit introduces two changes. First, it changes the type of the
experiment id from UUID to timestamp, mainly to reuse an existing
identifier and to improve the readability of events. This simplifies
experiment analysis when the database contains results from many tests.
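
A minimal sketch of the idea (not part of the diff; the concrete pattern
behind DateTimeFormatter.U_FORMATTER is an assumption here, since its
definition is not shown in this commit):

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class ExperimentIdSketch {
  // Assumed ISO-like pattern; the project uses its own DateTimeFormatter.U_FORMATTER constant.
  private static final DateTimeFormatter FORMATTER =
      DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS").withZone(ZoneOffset.UTC);

  public static void main(String[] args) {
    // Before: a random UUID, opaque and unrelated to the run.
    // String experimentRunId = UUID.randomUUID().toString();

    // After: the start time of the first iteration doubles as the experiment id,
    // so telemetry rows from one run sort naturally and are easy to tell apart
    // when the database holds results from many tests.
    String experimentStartTime = FORMATTER.format(Instant.now());
    System.out.println("experiment id: " + experimentStartTime);
  }
}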

Second, it adds an initial test framework to validate the event stream
generated by the benchmark executor. The test does not actually execute
any SQL statements; a mock simulates the execution so that only the
executor's behavior is validated.

Addresses microsoft#53
ashvina committed Jun 6, 2023
1 parent 3494cb2 commit 2a687c3
Showing 6 changed files with 170 additions and 36 deletions.
6 changes: 6 additions & 0 deletions pom.xml
@@ -78,12 +78,18 @@
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.9.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>5.2.0</version>
</dependency>
</dependencies>

<profiles>
@@ -41,7 +41,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -61,8 +60,8 @@ public class LSTBenchmarkExecutor extends BenchmarkRunnable {
private final BenchmarkConfig config;
private final JDBCTelemetryRegistry telemetryRegistry;

// UUID to identify the experiment run. The experiment telemetry will be tagged with this UUID.
private final UUID experimentRunId;
// timestamp of the start of the first iteration of an experiment.
private String experimentStartTime;

public LSTBenchmarkExecutor(
Map<String, ConnectionManager> idToConnectionManager,
@@ -72,16 +71,14 @@ public LSTBenchmarkExecutor(
this.idToConnectionManager = Collections.unmodifiableMap(idToConnectionManager);
this.config = config;
this.telemetryRegistry = telemetryRegistry;
this.experimentRunId = UUID.randomUUID();
}

/** This method runs the experiment. */
public void execute() throws Exception {
LOGGER.info("Running experiment: {}, run-id: {}", config.getId(), experimentRunId);
this.experimentStartTime = DateTimeFormatter.U_FORMATTER.format(Instant.now());
LOGGER.info("Running experiment: {}, start-time: {}", config.getId(), experimentStartTime);

final WorkloadExec workload = config.getWorkload();
final String experimentStartTimeStr = DateTimeFormatter.U_FORMATTER.format(Instant.now());
LOGGER.info("Experiment start time: {}", experimentStartTimeStr);

for (int i = 0; i < config.getRepetitions(); i++) {
LOGGER.info("Starting repetition: {}", i);
@@ -99,7 +96,7 @@ public void execute() throws Exception {
// Fill in specific runtime parameter values
Map<String, Object> runtimeParameterValues = new HashMap<>();
runtimeParameterValues.put("repetition", i);
runtimeParameterValues.put("experiment_start_time", experimentStartTimeStr);
runtimeParameterValues.put("experiment_start_time", experimentStartTime);
experimentMetadata.putAll(runtimeParameterValues);
// Go over phases and execute
Map<String, Instant> phaseIdToEndTime = new HashMap<>();
@@ -173,7 +170,12 @@ private EventInfo writeExperimentEvent(
Instant startTime, String id, Status status, String payload) {
EventInfo eventInfo =
ImmutableEventInfo.of(
experimentRunId, startTime, Instant.now(), id, EventType.EXEC_EXPERIMENT, status)
experimentStartTime,
startTime,
Instant.now(),
id,
EventType.EXEC_EXPERIMENT,
status)
.withPayload(payload);
telemetryRegistry.writeEvent(eventInfo);
return eventInfo;
@@ -182,39 +184,39 @@ private EventInfo writeExperimentEvent(
private EventInfo writePhaseEvent(Instant startTime, String id, Status status) {
EventInfo eventInfo =
ImmutableEventInfo.of(
experimentRunId, startTime, Instant.now(), id, EventType.EXEC_PHASE, status);
experimentStartTime, startTime, Instant.now(), id, EventType.EXEC_PHASE, status);
telemetryRegistry.writeEvent(eventInfo);
return eventInfo;
}

private EventInfo writeSessionEvent(Instant startTime, String id, Status status) {
EventInfo eventInfo =
ImmutableEventInfo.of(
experimentRunId, startTime, Instant.now(), id, EventType.EXEC_SESSION, status);
experimentStartTime, startTime, Instant.now(), id, EventType.EXEC_SESSION, status);
telemetryRegistry.writeEvent(eventInfo);
return eventInfo;
}

private EventInfo writeTaskEvent(Instant startTime, String id, Status status) {
EventInfo eventInfo =
ImmutableEventInfo.of(
experimentRunId, startTime, Instant.now(), id, EventType.EXEC_TASK, status);
experimentStartTime, startTime, Instant.now(), id, EventType.EXEC_TASK, status);
telemetryRegistry.writeEvent(eventInfo);
return eventInfo;
}

private EventInfo writeFileEvent(Instant startTime, String id, Status status) {
EventInfo eventInfo =
ImmutableEventInfo.of(
experimentRunId, startTime, Instant.now(), id, EventType.EXEC_FILE, status);
experimentStartTime, startTime, Instant.now(), id, EventType.EXEC_FILE, status);
telemetryRegistry.writeEvent(eventInfo);
return eventInfo;
}

private EventInfo writeStatementEvent(Instant startTime, String id, Status status) {
EventInfo eventInfo =
ImmutableEventInfo.of(
experimentRunId, startTime, Instant.now(), id, EventType.EXEC_STATEMENT, status);
experimentStartTime, startTime, Instant.now(), id, EventType.EXEC_STATEMENT, status);
telemetryRegistry.writeEvent(eventInfo);
return eventInfo;
}
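
For reference, a minimal sketch (not part of the diff) of how a single event
is now tagged with the timestamp-based experiment id; it mirrors the
ImmutableEventInfo.of(...) calls in the write*Event helpers above. The
Status.SUCCESS constant and the "single_user" event id are illustrative
assumptions, and the project's EventInfo, ImmutableEventInfo, EventType and
Status types are assumed to be available.

// Sketch only, inside a class that holds experimentStartTime and telemetryRegistry as fields.
Instant taskStart = Instant.now();
// ... execute the task against the system under test ...
EventInfo eventInfo =
    ImmutableEventInfo.of(
        experimentStartTime,  // experiment id: the run's start timestamp (previously a UUID)
        taskStart,            // event start time
        Instant.now(),        // event end time
        "single_user",        // event id, e.g. a task id from the workload spec (illustrative)
        EventType.EXEC_TASK,
        Status.SUCCESS);      // assumed Status constant
telemetryRegistry.writeEvent(eventInfo);
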
30 changes: 26 additions & 4 deletions src/main/java/com/microsoft/lst_bench/telemetry/EventInfo.java
@@ -16,7 +16,6 @@
package com.microsoft.lst_bench.telemetry;

import java.time.Instant;
import java.util.UUID;
import javax.annotation.Nullable;
import org.immutables.value.Value;

@@ -26,25 +25,48 @@
public interface EventInfo {
/**
* Returns the unique identifier for the experiment run. This identifier helps in distinguishing
* events of one experiment run from another.
* events of one experiment run from another. Currently, the experiment run start timestamp is
* used as the unique identifier.
*/
UUID getExperimentRunId();
String getExperimentId();

Instant getStartTime();

Instant getEndTime();

/**
* Returns the type of operation that generated the event, e.g., phase name, task name, etc.
* provided in the workload specification.
*/
@Value.Parameter(false)
@Nullable String getOperationType();

/**
* Returns the unique identifier of the operation run that generated the event. Currently, the
* operation run start timestamp is used as the unique identifier.
*/
@Value.Parameter(false)
@Nullable String getOperationId();

String getEventId();

EventType getEventType();

Status getStatus();
@Nullable Status getStatus();

@Value.Parameter(false)
@Nullable String getPayload();

/** Enumerates the different types of events that can be captured. */
enum EventType {
// Event types related to workload execution timeline
EXPERIMENT_STARTED,
PHASE_STARTED,
SESSION_STARTED,
TASK_STARTED,
FILE_STARTED,
STATEMENT_STARTED,

EXEC_EXPERIMENT,
EXEC_PHASE,
EXEC_SESSION,
@@ -53,10 +53,9 @@ public JDBCTelemetryRegistry(
this.connectionManager = connectionManager;
this.eventsStream = Collections.synchronizedList(new ArrayList<>());
this.insertFileStatements =
Collections.unmodifiableList(
SQLParser.getStatements(insertFile).getStatements().stream()
.map(s -> StringUtils.replaceParameters(s, parameterValues))
.collect(Collectors.toList()));
SQLParser.getStatements(insertFile).getStatements().stream()
.map(s -> StringUtils.replaceParameters(s, parameterValues))
.collect(Collectors.toUnmodifiableList());
// Create the tables if they don't exist.
if (executeDdl) {
executeDdl(ddlFile, parameterValues);
@@ -96,7 +95,7 @@ public void flush() throws EventException {
o ->
String.join(
",",
StringUtils.quote(o.getExperimentRunId().toString()),
StringUtils.quote(o.getExperimentId()),
StringUtils.quote(o.getStartTime().toString()),
StringUtils.quote(o.getEndTime().toString()),
StringUtils.quote(o.getEventId()),
@@ -28,13 +28,20 @@
import com.microsoft.lst_bench.sql.ConnectionManager;
import com.microsoft.lst_bench.telemetry.JDBCTelemetryRegistry;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.UUID;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

class LSTBenchmarkExecutorTest {

@@ -51,6 +58,11 @@ void tearDown() {
if (telemetryDbFile.exists()) {
telemetryDbFile.delete();
}

File telemetryDbWalFile = new File(telemetryDbFileName.toString() + ".wal");
if (telemetryDbWalFile.exists()) {
telemetryDbWalFile.delete();
}
}

/**
@@ -62,12 +74,74 @@ void testNoOpSetup() throws Exception {
void testNoOpSetup() throws Exception {
var idToConnectionManager = new HashMap<String, ConnectionManager>();
ExperimentConfig experimentConfig =
ImmutableExperimentConfig.builder().id("telemetryTest").version(1).repetitions(1).build();
ImmutableExperimentConfig.builder().id("nooptest").version(1).repetitions(1).build();
TaskLibrary taskLibrary = ImmutableTaskLibrary.builder().version(1).build();
Workload workload = ImmutableWorkload.builder().id("telemetryTest").version(1).build();
Workload workload = ImmutableWorkload.builder().id("nooptest").version(1).build();

var config = BenchmarkConfig.from(experimentConfig, taskLibrary, workload);

JDBCTelemetryRegistry telemetryRegistry = getTelemetryRegistry();

LSTBenchmarkExecutor benchmark =
new LSTBenchmarkExecutor(idToConnectionManager, config, telemetryRegistry);
benchmark.run();
}

/**
* This test runs a sample benchmark workload with a mock connection manager. The mock connection
* manager does not execute any SQL. As such, the test can validate the telemetry events that are
* generated by the benchmark executor. The test events are fetched from the telemetry database.
*/
@Test
void testExperimentTimelineTelemetry() throws Exception {
ObjectMapper mapper = new YAMLMapper();

Connection mockConnection = Mockito.mock(Connection.class);
Statement mockStatement = Mockito.mock(Statement.class);
Mockito.when(mockConnection.createStatement()).thenReturn(mockStatement);

ConnectionManager mockConnectionManager = Mockito.mock(ConnectionManager.class);
Mockito.when(mockConnectionManager.createConnection()).thenReturn(mockConnection);

var idToConnectionManager = new HashMap<String, ConnectionManager>();
idToConnectionManager.put("telemetryTest", mockConnectionManager);

ExperimentConfig experimentConfig =
ImmutableExperimentConfig.builder().id("telemetryTest").version(1).repetitions(1).build();

URL taskLibFile =
getClass().getClassLoader().getResource("./config/samples/task_library_0.yaml");
Assertions.assertNotNull(taskLibFile);
TaskLibrary taskLibrary = mapper.readValue(new File(taskLibFile.getFile()), TaskLibrary.class);

URL workloadFile =
getClass().getClassLoader().getResource("./config/spark/w_all_tpcds_delta.yaml");
Assertions.assertNotNull(workloadFile);
Workload workload = mapper.readValue(new File(workloadFile.getFile()), Workload.class);

var config = BenchmarkConfig.from(experimentConfig, taskLibrary, workload);

JDBCTelemetryRegistry telemetryRegistry = getTelemetryRegistry();

LSTBenchmarkExecutor benchmark =
new LSTBenchmarkExecutor(idToConnectionManager, config, telemetryRegistry);
benchmark.run();

try (var validationConnection =
DriverManager.getConnection("jdbc:duckdb:./" + telemetryDbFileName)) {
ResultSet resultset =
validationConnection.createStatement().executeQuery("SELECT * FROM experiment_telemetry");
int totalEvents = 0;
while (resultset.next()) {
totalEvents++;
}
Assertions.assertEquals(165, totalEvents);

// TODO improve event validation
}
}

private JDBCTelemetryRegistry getTelemetryRegistry() throws IOException, SQLException {
URL telemetryConfigFile =
getClass().getClassLoader().getResource("./config/spark/telemetry_config.yaml");
Assertions.assertNotNull(telemetryConfigFile);
@@ -81,16 +155,11 @@ void testNoOpSetup() throws Exception {
.url("jdbc:duckdb:./" + telemetryDbFileName)
.build();

final JDBCTelemetryRegistry telemetryRegistry =
new JDBCTelemetryRegistry(
ConnectionManager.from(uniqueTelemetryDbName),
telemetryConfig.isExecuteDDL(),
telemetryConfig.getDDLFile(),
telemetryConfig.getInsertFile(),
telemetryConfig.getParameterValues());

LSTBenchmarkExecutor benchmark =
new LSTBenchmarkExecutor(idToConnectionManager, config, telemetryRegistry);
benchmark.run();
return new JDBCTelemetryRegistry(
ConnectionManager.from(uniqueTelemetryDbName),
telemetryConfig.isExecuteDDL(),
telemetryConfig.getDDLFile(),
telemetryConfig.getInsertFile(),
telemetryConfig.getParameterValues());
}
}
36 changes: 36 additions & 0 deletions src/test/resources/config/samples/task_library_0.yaml
@@ -0,0 +1,36 @@
# Description: Tasks Library
---
version: 1
task_templates:
# Create external tables needed for benchmark
- id: setup
files:
- src/main/resources/scripts/tpcds/setup/spark/ddl-external-tables.sql
# Create data maintenance external tables needed for benchmark
- id: setup_data_maintenance
files:
- src/main/resources/scripts/tpcds/setup_data_maintenance/spark/ddl-external-tables-refresh.sql
parameter_values_file: src/main/resources/auxiliary/tpcds/setup_data_maintenance/parameter_values.dat
# Create schema and drop existing tables
- id: init
files:
- src/main/resources/scripts/tpcds/init/spark/init.sql
# Create benchmark tables and load data into them
- id: build
files:
- src/main/resources/scripts/tpcds/build/spark/1_create_call_center.sql
- src/main/resources/scripts/tpcds/build/spark/1_create_catalog_page.sql
- src/main/resources/scripts/tpcds/build/spark/2_load_customer.sql
# Execution of TPC-DS queries (possibly in a previous point-in-time)
- id: single_user
files:
- src/main/resources/scripts/tpcds/single_user/spark/query7.sql
- src/main/resources/scripts/tpcds/single_user/spark/query15.sql
supports_time_travel: true
# Execution of TPC-DS data maintenance queries (Delta)
- id: data_maintenance_delta
files:
- src/main/resources/scripts/tpcds/data_maintenance/spark/LF_CS.sql
- id: optimize_delta
files:
- src/main/resources/scripts/tpcds/optimize/spark/o_ship_mode-delta.sql
