diff --git a/pom.xml b/pom.xml
index d3208524..7689636a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -78,12 +78,18 @@
+
org.junit.jupiter
junit-jupiter
5.9.3
test
+
+ org.mockito
+ mockito-core
+ 5.2.0
+
diff --git a/src/main/java/com/microsoft/lst_bench/common/LSTBenchmarkExecutor.java b/src/main/java/com/microsoft/lst_bench/common/LSTBenchmarkExecutor.java
index f44ca1b0..239054f3 100644
--- a/src/main/java/com/microsoft/lst_bench/common/LSTBenchmarkExecutor.java
+++ b/src/main/java/com/microsoft/lst_bench/common/LSTBenchmarkExecutor.java
@@ -41,7 +41,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -61,8 +60,8 @@ public class LSTBenchmarkExecutor extends BenchmarkRunnable {
private final BenchmarkConfig config;
private final JDBCTelemetryRegistry telemetryRegistry;
- // UUID to identify the experiment run. The experiment telemetry will be tagged with this UUID.
- private final UUID experimentRunId;
+ // timestamp of the start of the first iteration of an experiment.
+ private String experimentStartTime;
public LSTBenchmarkExecutor(
Map idToConnectionManager,
@@ -72,16 +71,14 @@ public LSTBenchmarkExecutor(
this.idToConnectionManager = Collections.unmodifiableMap(idToConnectionManager);
this.config = config;
this.telemetryRegistry = telemetryRegistry;
- this.experimentRunId = UUID.randomUUID();
}
/** This method runs the experiment. */
public void execute() throws Exception {
- LOGGER.info("Running experiment: {}, run-id: {}", config.getId(), experimentRunId);
+ this.experimentStartTime = DateTimeFormatter.U_FORMATTER.format(Instant.now());
+ LOGGER.info("Running experiment: {}, start-time: {}", config.getId(), experimentStartTime);
final WorkloadExec workload = config.getWorkload();
- final String experimentStartTimeStr = DateTimeFormatter.U_FORMATTER.format(Instant.now());
- LOGGER.info("Experiment start time: {}", experimentStartTimeStr);
for (int i = 0; i < config.getRepetitions(); i++) {
LOGGER.info("Starting repetition: {}", i);
@@ -99,7 +96,7 @@ public void execute() throws Exception {
// Fill in specific runtime parameter values
Map runtimeParameterValues = new HashMap<>();
runtimeParameterValues.put("repetition", i);
- runtimeParameterValues.put("experiment_start_time", experimentStartTimeStr);
+ runtimeParameterValues.put("experiment_start_time", experimentStartTime);
experimentMetadata.putAll(runtimeParameterValues);
// Go over phases and execute
Map phaseIdToEndTime = new HashMap<>();
@@ -173,7 +170,12 @@ private EventInfo writeExperimentEvent(
Instant startTime, String id, Status status, String payload) {
EventInfo eventInfo =
ImmutableEventInfo.of(
- experimentRunId, startTime, Instant.now(), id, EventType.EXEC_EXPERIMENT, status)
+ experimentStartTime,
+ startTime,
+ Instant.now(),
+ id,
+ EventType.EXEC_EXPERIMENT,
+ status)
.withPayload(payload);
telemetryRegistry.writeEvent(eventInfo);
return eventInfo;
@@ -182,7 +184,7 @@ private EventInfo writeExperimentEvent(
private EventInfo writePhaseEvent(Instant startTime, String id, Status status) {
EventInfo eventInfo =
ImmutableEventInfo.of(
- experimentRunId, startTime, Instant.now(), id, EventType.EXEC_PHASE, status);
+ experimentStartTime, startTime, Instant.now(), id, EventType.EXEC_PHASE, status);
telemetryRegistry.writeEvent(eventInfo);
return eventInfo;
}
@@ -190,7 +192,7 @@ private EventInfo writePhaseEvent(Instant startTime, String id, Status status) {
private EventInfo writeSessionEvent(Instant startTime, String id, Status status) {
EventInfo eventInfo =
ImmutableEventInfo.of(
- experimentRunId, startTime, Instant.now(), id, EventType.EXEC_SESSION, status);
+ experimentStartTime, startTime, Instant.now(), id, EventType.EXEC_SESSION, status);
telemetryRegistry.writeEvent(eventInfo);
return eventInfo;
}
@@ -198,7 +200,7 @@ private EventInfo writeSessionEvent(Instant startTime, String id, Status status)
private EventInfo writeTaskEvent(Instant startTime, String id, Status status) {
EventInfo eventInfo =
ImmutableEventInfo.of(
- experimentRunId, startTime, Instant.now(), id, EventType.EXEC_TASK, status);
+ experimentStartTime, startTime, Instant.now(), id, EventType.EXEC_TASK, status);
telemetryRegistry.writeEvent(eventInfo);
return eventInfo;
}
@@ -206,7 +208,7 @@ private EventInfo writeTaskEvent(Instant startTime, String id, Status status) {
private EventInfo writeFileEvent(Instant startTime, String id, Status status) {
EventInfo eventInfo =
ImmutableEventInfo.of(
- experimentRunId, startTime, Instant.now(), id, EventType.EXEC_FILE, status);
+ experimentStartTime, startTime, Instant.now(), id, EventType.EXEC_FILE, status);
telemetryRegistry.writeEvent(eventInfo);
return eventInfo;
}
@@ -214,7 +216,7 @@ private EventInfo writeFileEvent(Instant startTime, String id, Status status) {
private EventInfo writeStatementEvent(Instant startTime, String id, Status status) {
EventInfo eventInfo =
ImmutableEventInfo.of(
- experimentRunId, startTime, Instant.now(), id, EventType.EXEC_STATEMENT, status);
+ experimentStartTime, startTime, Instant.now(), id, EventType.EXEC_STATEMENT, status);
telemetryRegistry.writeEvent(eventInfo);
return eventInfo;
}
diff --git a/src/main/java/com/microsoft/lst_bench/telemetry/EventInfo.java b/src/main/java/com/microsoft/lst_bench/telemetry/EventInfo.java
index 5227eb2b..05bd303f 100644
--- a/src/main/java/com/microsoft/lst_bench/telemetry/EventInfo.java
+++ b/src/main/java/com/microsoft/lst_bench/telemetry/EventInfo.java
@@ -16,7 +16,6 @@
package com.microsoft.lst_bench.telemetry;
import java.time.Instant;
-import java.util.UUID;
import javax.annotation.Nullable;
import org.immutables.value.Value;
@@ -26,25 +25,48 @@
public interface EventInfo {
/**
* Returns the unique identifier for the experiment run. This identifier helps in distinguishing
- * events of one experiment run from another.
+ * events of one experiment run from another. Currently, the experiment run start timestamp is
+ * used as the unique identifier.
*/
- UUID getExperimentRunId();
+ String getExperimentId();
Instant getStartTime();
Instant getEndTime();
+ /**
+ * Returns the type of operation that generated the event, e.g., phase name, task name, etc.
+ * provided in the workload specification.
+ */
+ @Value.Parameter(false)
+ @Nullable String getOperationType();
+
+ /**
+ * Returns the unique identifier of the operation run that generated the event. Currently, the
+ * operation run start timestamp is used as the unique identifier.
+ */
+ @Value.Parameter(false)
+ @Nullable String getOperationId();
+
String getEventId();
EventType getEventType();
- Status getStatus();
+ @Nullable Status getStatus();
@Value.Parameter(false)
@Nullable String getPayload();
/** Enumerates the different types of events that can be captured. */
enum EventType {
+ // Event types related to workload execution timeline
+ EXPERIMENT_STARTED,
+ PHASE_STARTED,
+ SESSION_STARTED,
+ TASK_STARTED,
+ FILE_STARTED,
+ STATEMENT_STARTED,
+
EXEC_EXPERIMENT,
EXEC_PHASE,
EXEC_SESSION,
diff --git a/src/main/java/com/microsoft/lst_bench/telemetry/JDBCTelemetryRegistry.java b/src/main/java/com/microsoft/lst_bench/telemetry/JDBCTelemetryRegistry.java
index 8a23fa66..266c5504 100644
--- a/src/main/java/com/microsoft/lst_bench/telemetry/JDBCTelemetryRegistry.java
+++ b/src/main/java/com/microsoft/lst_bench/telemetry/JDBCTelemetryRegistry.java
@@ -53,10 +53,9 @@ public JDBCTelemetryRegistry(
this.connectionManager = connectionManager;
this.eventsStream = Collections.synchronizedList(new ArrayList<>());
this.insertFileStatements =
- Collections.unmodifiableList(
- SQLParser.getStatements(insertFile).getStatements().stream()
- .map(s -> StringUtils.replaceParameters(s, parameterValues))
- .collect(Collectors.toList()));
+ SQLParser.getStatements(insertFile).getStatements().stream()
+ .map(s -> StringUtils.replaceParameters(s, parameterValues))
+ .collect(Collectors.toUnmodifiableList());
// Create the tables if they don't exist.
if (executeDdl) {
executeDdl(ddlFile, parameterValues);
@@ -96,7 +95,7 @@ public void flush() throws EventException {
o ->
String.join(
",",
- StringUtils.quote(o.getExperimentRunId().toString()),
+ StringUtils.quote(o.getExperimentId()),
StringUtils.quote(o.getStartTime().toString()),
StringUtils.quote(o.getEndTime().toString()),
StringUtils.quote(o.getEventId()),
diff --git a/src/test/java/com/microsoft/lst_bench/common/LSTBenchmarkExecutorTest.java b/src/test/java/com/microsoft/lst_bench/common/LSTBenchmarkExecutorTest.java
index 71cc709e..b566f912 100644
--- a/src/test/java/com/microsoft/lst_bench/common/LSTBenchmarkExecutorTest.java
+++ b/src/test/java/com/microsoft/lst_bench/common/LSTBenchmarkExecutorTest.java
@@ -28,13 +28,20 @@
import com.microsoft.lst_bench.sql.ConnectionManager;
import com.microsoft.lst_bench.telemetry.JDBCTelemetryRegistry;
import java.io.File;
+import java.io.IOException;
import java.net.URL;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
import java.util.HashMap;
import java.util.UUID;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
class LSTBenchmarkExecutorTest {
@@ -51,6 +58,11 @@ void tearDown() {
if (telemetryDbFile.exists()) {
telemetryDbFile.delete();
}
+
+ File telemetryDbWalFile = new File(telemetryDbFileName.toString() + ".wal");
+ if (telemetryDbWalFile.exists()) {
+ telemetryDbWalFile.delete();
+ }
}
/**
@@ -62,12 +74,74 @@ void tearDown() {
void testNoOpSetup() throws Exception {
var idToConnectionManager = new HashMap();
ExperimentConfig experimentConfig =
- ImmutableExperimentConfig.builder().id("telemetryTest").version(1).repetitions(1).build();
+ ImmutableExperimentConfig.builder().id("nooptest").version(1).repetitions(1).build();
TaskLibrary taskLibrary = ImmutableTaskLibrary.builder().version(1).build();
- Workload workload = ImmutableWorkload.builder().id("telemetryTest").version(1).build();
+ Workload workload = ImmutableWorkload.builder().id("nooptest").version(1).build();
+
+ var config = BenchmarkConfig.from(experimentConfig, taskLibrary, workload);
+
+ JDBCTelemetryRegistry telemetryRegistry = getTelemetryRegistry();
+
+ LSTBenchmarkExecutor benchmark =
+ new LSTBenchmarkExecutor(idToConnectionManager, config, telemetryRegistry);
+ benchmark.run();
+ }
+
+ /**
+ * This test runs a sample benchmark workload with a mock connection manager. The mock connection
+ * manager does not execute any sql. As such the test can validate the telemetry events that are
+ * generated by the benchmark executor. The test events are fetched from the telemetry database.
+ */
+ @Test
+ void testExperimentTimelineTelemetry() throws Exception {
+ ObjectMapper mapper = new YAMLMapper();
+
+ Connection mockConnection = Mockito.mock(Connection.class);
+ Statement mockStatement = Mockito.mock(Statement.class);
+ Mockito.when(mockConnection.createStatement()).thenReturn(mockStatement);
+
+ ConnectionManager mockConnectionManager = Mockito.mock(ConnectionManager.class);
+ Mockito.when(mockConnectionManager.createConnection()).thenReturn(mockConnection);
+
+ var idToConnectionManager = new HashMap();
+ idToConnectionManager.put("telemetryTest", mockConnectionManager);
+
+ ExperimentConfig experimentConfig =
+ ImmutableExperimentConfig.builder().id("telemetryTest").version(1).repetitions(1).build();
+
+ URL taskLibFile =
+ getClass().getClassLoader().getResource("./config/samples/task_library_0.yaml");
+ Assertions.assertNotNull(taskLibFile);
+ TaskLibrary taskLibrary = mapper.readValue(new File(taskLibFile.getFile()), TaskLibrary.class);
+
+ URL workloadFile =
+ getClass().getClassLoader().getResource("./config/spark/w_all_tpcds_delta.yaml");
+ Assertions.assertNotNull(workloadFile);
+ Workload workload = mapper.readValue(new File(workloadFile.getFile()), Workload.class);
var config = BenchmarkConfig.from(experimentConfig, taskLibrary, workload);
+ JDBCTelemetryRegistry telemetryRegistry = getTelemetryRegistry();
+
+ LSTBenchmarkExecutor benchmark =
+ new LSTBenchmarkExecutor(idToConnectionManager, config, telemetryRegistry);
+ benchmark.run();
+
+ try (var validationConnection =
+ DriverManager.getConnection("jdbc:duckdb:./" + telemetryDbFileName)) {
+ ResultSet resultset =
+ validationConnection.createStatement().executeQuery("SELECT * FROM experiment_telemetry");
+ int totalEvents = 0;
+ while (resultset.next()) {
+ totalEvents++;
+ }
+ Assertions.assertEquals(165, totalEvents);
+
+ // TODO improve event validation
+ }
+ }
+
+ private JDBCTelemetryRegistry getTelemetryRegistry() throws IOException, SQLException {
URL telemetryConfigFile =
getClass().getClassLoader().getResource("./config/spark/telemetry_config.yaml");
Assertions.assertNotNull(telemetryConfigFile);
@@ -81,16 +155,11 @@ void testNoOpSetup() throws Exception {
.url("jdbc:duckdb:./" + telemetryDbFileName)
.build();
- final JDBCTelemetryRegistry telemetryRegistry =
- new JDBCTelemetryRegistry(
- ConnectionManager.from(uniqueTelemetryDbName),
- telemetryConfig.isExecuteDDL(),
- telemetryConfig.getDDLFile(),
- telemetryConfig.getInsertFile(),
- telemetryConfig.getParameterValues());
-
- LSTBenchmarkExecutor benchmark =
- new LSTBenchmarkExecutor(idToConnectionManager, config, telemetryRegistry);
- benchmark.run();
+ return new JDBCTelemetryRegistry(
+ ConnectionManager.from(uniqueTelemetryDbName),
+ telemetryConfig.isExecuteDDL(),
+ telemetryConfig.getDDLFile(),
+ telemetryConfig.getInsertFile(),
+ telemetryConfig.getParameterValues());
}
}
diff --git a/src/test/resources/config/samples/task_library_0.yaml b/src/test/resources/config/samples/task_library_0.yaml
new file mode 100644
index 00000000..a147db11
--- /dev/null
+++ b/src/test/resources/config/samples/task_library_0.yaml
@@ -0,0 +1,36 @@
+# Description: Tasks Library
+---
+version: 1
+task_templates:
+# Create external tables needed for benchmark
+- id: setup
+ files:
+ - src/main/resources/scripts/tpcds/setup/spark/ddl-external-tables.sql
+# Create data maintenance external tables needed for benchmark
+- id: setup_data_maintenance
+ files:
+ - src/main/resources/scripts/tpcds/setup_data_maintenance/spark/ddl-external-tables-refresh.sql
+ parameter_values_file: src/main/resources/auxiliary/tpcds/setup_data_maintenance/parameter_values.dat
+# Create schema and drop existing tables
+- id: init
+ files:
+ - src/main/resources/scripts/tpcds/init/spark/init.sql
+# Create benchmark tables and load data into them
+- id: build
+ files:
+ - src/main/resources/scripts/tpcds/build/spark/1_create_call_center.sql
+ - src/main/resources/scripts/tpcds/build/spark/1_create_catalog_page.sql
+ - src/main/resources/scripts/tpcds/build/spark/2_load_customer.sql
+# Execution of TPC-DS queries (possibly in a previous point-in-time)
+- id: single_user
+ files:
+ - src/main/resources/scripts/tpcds/single_user/spark/query7.sql
+ - src/main/resources/scripts/tpcds/single_user/spark/query15.sql
+ supports_time_travel: true
+# Execution of TPC-DS data maintenance queries (Delta)
+- id: data_maintenance_delta
+ files:
+ - src/main/resources/scripts/tpcds/data_maintenance/spark/LF_CS.sql
+- id: optimize_delta
+ files:
+ - src/main/resources/scripts/tpcds/optimize/spark/o_ship_mode-delta.sql
\ No newline at end of file