From b45d6dc4010b4babb82570e3f944002c507b549f Mon Sep 17 00:00:00 2001 From: Bahram Zaeri Date: Wed, 15 Jan 2025 16:14:23 +0000 Subject: [PATCH 1/5] add new field updatedRows to QueryStatistics.java where it's available in EventListener callbacks. It comes from TableMutationOperator.java where update/delete queries are issued. --- .../io/trino/dispatcher/FailedDispatchQuery.java | 1 + .../main/java/io/trino/event/QueryMonitor.java | 2 ++ .../io/trino/execution/DistributionSnapshot.java | 1 + .../io/trino/execution/QueryStateMachine.java | 4 ++++ .../main/java/io/trino/execution/QueryStats.java | 10 ++++++++++ .../io/trino/execution/StageStateMachine.java | 3 +++ .../main/java/io/trino/execution/StageStats.java | 11 +++++++++++ .../java/io/trino/operator/DriverContext.java | 5 +++++ .../main/java/io/trino/operator/DriverStats.java | 11 +++++++++++ .../java/io/trino/operator/OperatorContext.java | 7 +++++++ .../java/io/trino/operator/OperatorStats.java | 15 +++++++++++++++ .../java/io/trino/operator/PipelineContext.java | 5 +++++ .../java/io/trino/operator/PipelineStats.java | 12 ++++++++++++ .../io/trino/operator/TableMutationOperator.java | 2 ++ .../main/java/io/trino/operator/TaskContext.java | 3 +++ .../main/java/io/trino/operator/TaskStats.java | 14 ++++++++++++++ .../execution/MockManagedQueryExecution.java | 1 + .../java/io/trino/execution/TestQueryInfo.java | 1 + .../java/io/trino/execution/TestQueryStats.java | 4 ++++ .../io/trino/execution/TestStageStateMachine.java | 1 + .../java/io/trino/execution/TestStageStats.java | 1 + .../TestLeastWastedEffortTaskLowMemoryKiller.java | 1 + ...ervationOnBlockedNodesTaskLowMemoryKiller.java | 1 + .../java/io/trino/operator/TestDriverStats.java | 1 + .../java/io/trino/operator/TestOperatorStats.java | 2 ++ .../java/io/trino/operator/TestPipelineStats.java | 1 + .../java/io/trino/operator/TestTaskStats.java | 1 + .../java/io/trino/server/TestBasicQueryInfo.java | 1 + .../java/io/trino/server/TestQueryStateInfo.java | 1 + core/trino-spi/pom.xml | 14 ++++++++++++++ .../trino/spi/eventlistener/QueryStatistics.java | 11 +++++++++++ .../plugin/httpquery/TestHttpEventListener.java | 1 + .../httpquery/TestHttpServerEventListener.java | 1 + .../plugin/eventlistener/kafka/TestUtils.java | 1 + .../mysql/TestMysqlEventListener.java | 2 ++ .../trino/plugin/openlineage/TrinoEventData.java | 1 + 36 files changed, 154 insertions(+) diff --git a/core/trino-main/src/main/java/io/trino/dispatcher/FailedDispatchQuery.java b/core/trino-main/src/main/java/io/trino/dispatcher/FailedDispatchQuery.java index 1e1f00a645cb..a780aadac4a3 100644 --- a/core/trino-main/src/main/java/io/trino/dispatcher/FailedDispatchQuery.java +++ b/core/trino-main/src/main/java/io/trino/dispatcher/FailedDispatchQuery.java @@ -332,6 +332,7 @@ private static QueryStats immediateFailureQueryStats() DataSize.ofBytes(0), 0, 0, + 0, new Duration(0, MILLISECONDS), new Duration(0, MILLISECONDS), DataSize.ofBytes(0), diff --git a/core/trino-main/src/main/java/io/trino/event/QueryMonitor.java b/core/trino-main/src/main/java/io/trino/event/QueryMonitor.java index 9e0bc49c53bc..d3e9ee4fbc74 100644 --- a/core/trino-main/src/main/java/io/trino/event/QueryMonitor.java +++ b/core/trino-main/src/main/java/io/trino/event/QueryMonitor.java @@ -231,6 +231,7 @@ public void queryImmediateFailureEvent(BasicQueryInfo queryInfo, ExecutionFailur 0, 0, 0, + 0, ImmutableList.of(), 0, true, @@ -343,6 +344,7 @@ private QueryStatistics createQueryStatistics(QueryInfo queryInfo) queryStats.getRawInputPositions(), queryStats.getOutputDataSize().toBytes(), queryStats.getOutputPositions(), + queryStats.getUpdatedPositions(), queryStats.getLogicalWrittenDataSize().toBytes(), queryStats.getWrittenPositions(), queryStats.getSpilledDataSize().toBytes(), diff --git a/core/trino-main/src/main/java/io/trino/execution/DistributionSnapshot.java b/core/trino-main/src/main/java/io/trino/execution/DistributionSnapshot.java index 5323c5f08a54..98f6b35c0df8 100644 --- a/core/trino-main/src/main/java/io/trino/execution/DistributionSnapshot.java +++ b/core/trino-main/src/main/java/io/trino/execution/DistributionSnapshot.java @@ -71,6 +71,7 @@ public static OperatorStats pruneOperatorStats(OperatorStats operatorStats) operatorStats.getGetOutputCpu(), operatorStats.getOutputDataSize(), operatorStats.getOutputPositions(), + operatorStats.getUpdatedPositions(), operatorStats.getDynamicFilterSplitsProcessed(), pruneMetrics(operatorStats.getMetrics()), pruneMetrics(operatorStats.getConnectorMetrics()), diff --git a/core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java b/core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java index 83a754e92193..97615297d30c 100644 --- a/core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java +++ b/core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java @@ -686,6 +686,7 @@ private QueryStats getQueryStats(Optional rootStage, List long outputDataSize = 0; long failedOutputDataSize = 0; long outputPositions = 0; + long updatedPositions = 0; long failedOutputPositions = 0; long outputBlockedTime = 0; @@ -772,6 +773,7 @@ private QueryStats getQueryStats(Optional rootStage, List outputDataSize += outputStageStats.getOutputDataSize().toBytes(); failedOutputDataSize += outputStageStats.getFailedOutputDataSize().toBytes(); outputPositions += outputStageStats.getOutputPositions(); + updatedPositions += outputStageStats.getUpdatedPositions(); failedOutputPositions += outputStageStats.getFailedOutputPositions(); } @@ -898,6 +900,7 @@ private QueryStats getQueryStats(Optional rootStage, List succinctBytes(outputDataSize), succinctBytes(failedOutputDataSize), outputPositions, + updatedPositions, failedOutputPositions, new Duration(outputBlockedTime, NANOSECONDS).convertToMostSuccinctTimeUnit(), @@ -1502,6 +1505,7 @@ private static QueryStats pruneQueryStats(QueryStats queryStats) queryStats.getOutputDataSize(), queryStats.getFailedOutputDataSize(), queryStats.getOutputPositions(), + queryStats.getUpdatedPositions(), queryStats.getFailedOutputPositions(), queryStats.getOutputBlockedTime(), queryStats.getFailedOutputBlockedTime(), diff --git a/core/trino-main/src/main/java/io/trino/execution/QueryStats.java b/core/trino-main/src/main/java/io/trino/execution/QueryStats.java index bdb0b08a5431..6c7aec2e1ed6 100644 --- a/core/trino-main/src/main/java/io/trino/execution/QueryStats.java +++ b/core/trino-main/src/main/java/io/trino/execution/QueryStats.java @@ -118,6 +118,7 @@ public class QueryStats private final DataSize outputDataSize; private final DataSize failedOutputDataSize; private final long outputPositions; + private final long updatedPositions; private final long failedOutputPositions; private final Duration outputBlockedTime; @@ -213,6 +214,7 @@ public QueryStats( @JsonProperty("outputDataSize") DataSize outputDataSize, @JsonProperty("failedOutputDataSize") DataSize failedOutputDataSize, @JsonProperty("outputPositions") long outputPositions, + @JsonProperty("updatedPositions") long updatedPositions, @JsonProperty("failedOutputPositions") long failedOutputPositions, @JsonProperty("outputBlockedTime") Duration outputBlockedTime, @@ -323,6 +325,8 @@ public QueryStats( this.failedOutputDataSize = requireNonNull(failedOutputDataSize, "failedOutputDataSize is null"); checkArgument(outputPositions >= 0, "outputPositions is negative"); this.outputPositions = outputPositions; + checkArgument(updatedPositions >= 0, "updatedPositions is negative"); + this.updatedPositions = updatedPositions; checkArgument(failedOutputPositions >= 0, "failedOutputPositions is negative"); this.failedOutputPositions = failedOutputPositions; @@ -743,6 +747,12 @@ public long getOutputPositions() return outputPositions; } + @JsonProperty + public long getUpdatedPositions() + { + return updatedPositions; + } + @JsonProperty public long getFailedOutputPositions() { diff --git a/core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java b/core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java index 63a6ca7f8020..c610a8a22c64 100644 --- a/core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java +++ b/core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java @@ -473,6 +473,7 @@ public StageInfo getStageInfo(Supplier> taskInfosSupplier) long outputDataSize = 0; long failedOutputDataSize = 0; long outputPositions = 0; + long updatedPositions = 0; long failedOutputPositions = 0; Metrics.Accumulator outputBufferMetrics = Metrics.accumulator(); @@ -553,6 +554,7 @@ public StageInfo getStageInfo(Supplier> taskInfosSupplier) taskInfo.outputBuffers().getUtilization().ifPresent(bufferUtilizationHistograms::add); outputDataSize += taskStats.getOutputDataSize().toBytes(); outputPositions += taskStats.getOutputPositions(); + updatedPositions += taskStats.getUpdatedPositions(); bufferMetrics.ifPresent(outputBufferMetrics::add); outputBlockedTime += taskStats.getOutputBlockedTime().roundTo(NANOSECONDS); @@ -660,6 +662,7 @@ public StageInfo getStageInfo(Supplier> taskInfosSupplier) succinctBytes(outputDataSize), succinctBytes(failedOutputDataSize), outputPositions, + updatedPositions, failedOutputPositions, outputBufferMetrics.get(), succinctDuration(outputBlockedTime, NANOSECONDS), diff --git a/core/trino-main/src/main/java/io/trino/execution/StageStats.java b/core/trino-main/src/main/java/io/trino/execution/StageStats.java index f2bff77da178..5c971cc91081 100644 --- a/core/trino-main/src/main/java/io/trino/execution/StageStats.java +++ b/core/trino-main/src/main/java/io/trino/execution/StageStats.java @@ -106,6 +106,7 @@ public class StageStats private final DataSize outputDataSize; private final DataSize failedOutputDataSize; private final long outputPositions; + private final long updatedPositions; private final long failedOutputPositions; private final Metrics outputBufferMetrics; @@ -182,6 +183,7 @@ public StageStats( @JsonProperty("outputDataSize") DataSize outputDataSize, @JsonProperty("failedOutputDataSize") DataSize failedOutputDataSize, @JsonProperty("outputPositions") long outputPositions, + @JsonProperty("updatedPositions") long updatedPositions, @JsonProperty("failedOutputPositions") long failedOutputPositions, @JsonProperty("outputBufferMetrics") Metrics outputBufferMetrics, @@ -273,6 +275,8 @@ public StageStats( this.failedOutputDataSize = requireNonNull(failedOutputDataSize, "failedOutputDataSize is null"); checkArgument(outputPositions >= 0, "outputPositions is negative"); this.outputPositions = outputPositions; + checkArgument(updatedPositions >= 0, "updatedPositions is negative"); + this.updatedPositions = updatedPositions; checkArgument(failedOutputPositions >= 0, "failedOutputPositions is negative"); this.failedOutputPositions = failedOutputPositions; this.outputBufferMetrics = requireNonNull(outputBufferMetrics, "outputBufferMetrics is null"); @@ -588,6 +592,12 @@ public long getOutputPositions() return outputPositions; } + @JsonProperty + public long getUpdatedPositions() + { + return updatedPositions; + } + @JsonProperty public long getFailedOutputPositions() { @@ -736,6 +746,7 @@ public static StageStats createInitial() zeroBytes, 0, 0, + 0, Metrics.EMPTY, zeroSeconds, zeroSeconds, diff --git a/core/trino-main/src/main/java/io/trino/operator/DriverContext.java b/core/trino-main/src/main/java/io/trino/operator/DriverContext.java index e087c609bb39..0f20557793d6 100644 --- a/core/trino-main/src/main/java/io/trino/operator/DriverContext.java +++ b/core/trino-main/src/main/java/io/trino/operator/DriverContext.java @@ -346,6 +346,7 @@ public DriverStats getDriverStats() Duration inputBlockedTime; DataSize outputDataSize; long outputPositions; + long updatedPositions; Duration outputBlockedTime; if (inputOperator != null) { physicalInputDataSize = inputOperator.getPhysicalInputDataSize(); @@ -364,6 +365,8 @@ public DriverStats getDriverStats() inputBlockedTime = inputOperator.getBlockedWall(); + updatedPositions = inputOperator.getUpdatedPositions(); + OperatorStats outputOperator = requireNonNull(getLast(operators, null)); outputDataSize = outputOperator.getOutputDataSize(); outputPositions = outputOperator.getOutputPositions(); @@ -389,6 +392,7 @@ public DriverStats getDriverStats() outputDataSize = DataSize.ofBytes(0); outputPositions = 0; + updatedPositions = 0; outputBlockedTime = new Duration(0, MILLISECONDS); } @@ -428,6 +432,7 @@ public DriverStats getDriverStats() inputBlockedTime, outputDataSize.succinct(), outputPositions, + updatedPositions, outputBlockedTime, succinctBytes(physicalWrittenDataSize), operators); diff --git a/core/trino-main/src/main/java/io/trino/operator/DriverStats.java b/core/trino-main/src/main/java/io/trino/operator/DriverStats.java index 3c985a00a355..345b77e0d9bf 100644 --- a/core/trino-main/src/main/java/io/trino/operator/DriverStats.java +++ b/core/trino-main/src/main/java/io/trino/operator/DriverStats.java @@ -67,6 +67,7 @@ public class DriverStats private final DataSize outputDataSize; private final long outputPositions; + private final long updatedPositions; private final Duration outputBlockedTime; @@ -109,6 +110,7 @@ public DriverStats() this.outputDataSize = DataSize.ofBytes(0); this.outputPositions = 0; + this.updatedPositions = 0; this.outputBlockedTime = new Duration(0, MILLISECONDS); @@ -152,6 +154,7 @@ public DriverStats( @JsonProperty("outputDataSize") DataSize outputDataSize, @JsonProperty("outputPositions") long outputPositions, + @JsonProperty("updatedPositions") long updatedPositions, @JsonProperty("outputBlockedTime") Duration outputBlockedTime, @@ -197,6 +200,8 @@ public DriverStats( this.outputDataSize = requireNonNull(outputDataSize, "outputDataSize is null"); checkArgument(outputPositions >= 0, "outputPositions is negative"); this.outputPositions = outputPositions; + checkArgument(updatedPositions >= 0, "updatedPositions is negative"); + this.updatedPositions = updatedPositions; this.outputBlockedTime = requireNonNull(outputBlockedTime, "outputBlockedTime is null"); @@ -357,6 +362,12 @@ public long getOutputPositions() return outputPositions; } + @JsonProperty + public long getUpdatedPositions() + { + return updatedPositions; + } + @JsonProperty public Duration getOutputBlockedTime() { diff --git a/core/trino-main/src/main/java/io/trino/operator/OperatorContext.java b/core/trino-main/src/main/java/io/trino/operator/OperatorContext.java index 6b268c5849a0..3e7106f3005a 100644 --- a/core/trino-main/src/main/java/io/trino/operator/OperatorContext.java +++ b/core/trino-main/src/main/java/io/trino/operator/OperatorContext.java @@ -82,6 +82,7 @@ public class OperatorContext private final OperationTiming getOutputTiming = new OperationTiming(); private final CounterStat outputDataSize = new CounterStat(); private final CounterStat outputPositions = new CounterStat(); + private final CounterStat updatedPositions = new CounterStat(); private final AtomicLong dynamicFilterSplitsProcessed = new AtomicLong(); private final AtomicReference metrics = new AtomicReference<>(Metrics.EMPTY); // this is not incremental, but gets overwritten by the latest value. @@ -490,6 +491,11 @@ public CounterStat getOutputPositions() return outputPositions; } + public CounterStat getUpdatedPositions() + { + return updatedPositions; + } + public long getWriterInputDataSize() { return writerInputDataSize.get(); @@ -554,6 +560,7 @@ public OperatorStats getOperatorStats() new Duration(getOutputTiming.getCpuNanos(), NANOSECONDS).convertToMostSuccinctTimeUnit(), DataSize.ofBytes(outputDataSize.getTotalCount()), outputPositions.getTotalCount(), + updatedPositions.getTotalCount(), dynamicFilterSplitsProcessed.get(), getOperatorMetrics( diff --git a/core/trino-main/src/main/java/io/trino/operator/OperatorStats.java b/core/trino-main/src/main/java/io/trino/operator/OperatorStats.java index a0ec50f4e85d..d112acc97eaf 100644 --- a/core/trino-main/src/main/java/io/trino/operator/OperatorStats.java +++ b/core/trino-main/src/main/java/io/trino/operator/OperatorStats.java @@ -62,6 +62,7 @@ public class OperatorStats private final Duration getOutputCpu; private final DataSize outputDataSize; private final long outputPositions; + private final long updatedPositions; private final long dynamicFilterSplitsProcessed; private final Metrics metrics; @@ -117,6 +118,7 @@ public OperatorStats( @JsonProperty("getOutputCpu") Duration getOutputCpu, @JsonProperty("outputDataSize") DataSize outputDataSize, @JsonProperty("outputPositions") long outputPositions, + @JsonProperty("updatedPositions") long updatedPositions, @JsonProperty("dynamicFilterSplitsProcessed") long dynamicFilterSplitsProcessed, @JsonProperty("metrics") Metrics metrics, @@ -174,6 +176,8 @@ public OperatorStats( this.outputDataSize = requireNonNull(outputDataSize, "outputDataSize is null"); checkArgument(outputPositions >= 0, "outputPositions is negative"); this.outputPositions = outputPositions; + checkArgument(updatedPositions >= 0, "updatedPositions is negative"); + this.updatedPositions = updatedPositions; this.dynamicFilterSplitsProcessed = dynamicFilterSplitsProcessed; this.metrics = requireNonNull(metrics, "metrics is null"); @@ -340,6 +344,12 @@ public long getOutputPositions() return outputPositions; } + @JsonProperty + public long getUpdatedPositions() + { + return updatedPositions; + } + @JsonProperty public long getDynamicFilterSplitsProcessed() { @@ -485,6 +495,7 @@ private OperatorStats add(Iterable operators, Optional p long getOutputCpu = this.getOutputCpu.roundTo(NANOSECONDS); long outputDataSize = this.outputDataSize.toBytes(); long outputPositions = this.outputPositions; + long updatedPositions = this.updatedPositions; long dynamicFilterSplitsProcessed = this.dynamicFilterSplitsProcessed; Metrics.Accumulator metricsAccumulator = Metrics.accumulator().add(this.getMetrics()); @@ -535,6 +546,7 @@ private OperatorStats add(Iterable operators, Optional p getOutputCpu += operator.getGetOutputCpu().roundTo(NANOSECONDS); outputDataSize += operator.getOutputDataSize().toBytes(); outputPositions += operator.getOutputPositions(); + updatedPositions += operator.getUpdatedPositions(); dynamicFilterSplitsProcessed += operator.getDynamicFilterSplitsProcessed(); metricsAccumulator.add(operator.getMetrics()); @@ -596,6 +608,7 @@ private OperatorStats add(Iterable operators, Optional p new Duration(getOutputCpu, NANOSECONDS).convertToMostSuccinctTimeUnit(), DataSize.ofBytes(outputDataSize), outputPositions, + updatedPositions, dynamicFilterSplitsProcessed, metricsAccumulator.get(), @@ -672,6 +685,7 @@ public OperatorStats summarize() getOutputCpu, outputDataSize, outputPositions, + updatedPositions, dynamicFilterSplitsProcessed, metrics, connectorMetrics, @@ -717,6 +731,7 @@ public OperatorStats withPipelineMetrics(Metrics pipelineMetrics) getOutputCpu, outputDataSize, outputPositions, + updatedPositions, dynamicFilterSplitsProcessed, metrics, connectorMetrics, diff --git a/core/trino-main/src/main/java/io/trino/operator/PipelineContext.java b/core/trino-main/src/main/java/io/trino/operator/PipelineContext.java index 9bc61bc88708..ed14ddbdd0bb 100644 --- a/core/trino-main/src/main/java/io/trino/operator/PipelineContext.java +++ b/core/trino-main/src/main/java/io/trino/operator/PipelineContext.java @@ -98,6 +98,7 @@ public class PipelineContext private final CounterStat outputDataSize = new CounterStat(); private final CounterStat outputPositions = new CounterStat(); + private final CounterStat updatedPositions = new CounterStat(); private final AtomicLong outputBlockedTime = new AtomicLong(); @@ -237,6 +238,7 @@ public void driverFinished(DriverContext driverContext) outputDataSize.update(driverStats.getOutputDataSize().toBytes()); outputPositions.update(driverStats.getOutputPositions()); + updatedPositions.update(driverStats.getUpdatedPositions()); outputBlockedTime.getAndAdd(driverStats.getOutputBlockedTime().roundTo(NANOSECONDS)); @@ -403,6 +405,7 @@ public PipelineStats getPipelineStats() long outputDataSize = this.outputDataSize.getTotalCount(); long outputPositions = this.outputPositions.getTotalCount(); + long updatedPositions = this.updatedPositions.getTotalCount(); long outputBlockedTime = this.outputBlockedTime.get(); @@ -455,6 +458,7 @@ public PipelineStats getPipelineStats() outputDataSize += driverStats.getOutputDataSize().toBytes(); outputPositions += driverStats.getOutputPositions(); + updatedPositions += driverStats.getUpdatedPositions(); outputBlockedTime += driverStats.getOutputBlockedTime().roundTo(NANOSECONDS); @@ -538,6 +542,7 @@ else if (pipelineLevelMetrics != Metrics.EMPTY) { succinctBytes(outputDataSize), outputPositions, + updatedPositions, new Duration(outputBlockedTime, NANOSECONDS).convertToMostSuccinctTimeUnit(), diff --git a/core/trino-main/src/main/java/io/trino/operator/PipelineStats.java b/core/trino-main/src/main/java/io/trino/operator/PipelineStats.java index 72a3f823fb72..ab01404a0f7e 100644 --- a/core/trino-main/src/main/java/io/trino/operator/PipelineStats.java +++ b/core/trino-main/src/main/java/io/trino/operator/PipelineStats.java @@ -82,6 +82,7 @@ public class PipelineStats private final DataSize outputDataSize; private final long outputPositions; + private final long updatedPositions; private final Duration outputBlockedTime; @@ -140,6 +141,7 @@ public PipelineStats( @JsonProperty("outputDataSize") DataSize outputDataSize, @JsonProperty("outputPositions") long outputPositions, + @JsonProperty("updatedPositions") long updatedPositions, @JsonProperty("outputBlockedTime") Duration outputBlockedTime, @@ -210,6 +212,8 @@ public PipelineStats( this.outputDataSize = requireNonNull(outputDataSize, "outputDataSize is null"); checkArgument(outputPositions >= 0, "outputPositions is negative"); this.outputPositions = outputPositions; + checkArgument(updatedPositions >= 0, "updatedPositions is negative"); + this.updatedPositions = updatedPositions; this.outputBlockedTime = requireNonNull(outputBlockedTime, "outputBlockedTime is null"); @@ -438,6 +442,12 @@ public long getOutputPositions() return outputPositions; } + @JsonProperty + public long getUpdatedPositions() + { + return updatedPositions; + } + @JsonProperty public Duration getOutputBlockedTime() { @@ -501,6 +511,7 @@ public PipelineStats summarize() inputBlockedTime, outputDataSize, outputPositions, + updatedPositions, outputBlockedTime, physicalWrittenDataSize, summarizeOperatorStats(operatorSummaries), @@ -546,6 +557,7 @@ public PipelineStats pruneDigests() inputBlockedTime, outputDataSize, outputPositions, + updatedPositions, outputBlockedTime, physicalWrittenDataSize, operatorSummaries.stream() diff --git a/core/trino-main/src/main/java/io/trino/operator/TableMutationOperator.java b/core/trino-main/src/main/java/io/trino/operator/TableMutationOperator.java index 7c663af6525f..8931e64b0765 100644 --- a/core/trino-main/src/main/java/io/trino/operator/TableMutationOperator.java +++ b/core/trino-main/src/main/java/io/trino/operator/TableMutationOperator.java @@ -121,6 +121,8 @@ public Page getOutput() OptionalLong rowsUpdatedCount = operation.execute(); + operatorContext.getUpdatedPositions().update(rowsUpdatedCount.orElse(0L)); + return buildUpdatedCountPage(rowsUpdatedCount); } diff --git a/core/trino-main/src/main/java/io/trino/operator/TaskContext.java b/core/trino-main/src/main/java/io/trino/operator/TaskContext.java index e72670b2f4c6..9e64ec9c1457 100644 --- a/core/trino-main/src/main/java/io/trino/operator/TaskContext.java +++ b/core/trino-main/src/main/java/io/trino/operator/TaskContext.java @@ -482,6 +482,7 @@ public TaskStats getTaskStats() long outputDataSize = 0; long outputPositions = 0; + long updatedPositions = 0; long outputBlockedTime = 0; @@ -536,6 +537,7 @@ public TaskStats getTaskStats() if (pipeline.isOutputPipeline()) { outputDataSize += pipeline.getOutputDataSize().toBytes(); outputPositions += pipeline.getOutputPositions(); + updatedPositions += pipeline.getUpdatedPositions(); outputBlockedTime += pipeline.getOutputBlockedTime().roundTo(NANOSECONDS); } @@ -617,6 +619,7 @@ public TaskStats getTaskStats() new Duration(inputBlockedTime, NANOSECONDS).convertToMostSuccinctTimeUnit(), succinctBytes(outputDataSize), outputPositions, + updatedPositions, new Duration(outputBlockedTime, NANOSECONDS).convertToMostSuccinctTimeUnit(), succinctBytes(getWriterInputDataSize()), succinctBytes(physicalWrittenDataSize), diff --git a/core/trino-main/src/main/java/io/trino/operator/TaskStats.java b/core/trino-main/src/main/java/io/trino/operator/TaskStats.java index b8719dd2f7ee..45626902a605 100644 --- a/core/trino-main/src/main/java/io/trino/operator/TaskStats.java +++ b/core/trino-main/src/main/java/io/trino/operator/TaskStats.java @@ -81,6 +81,7 @@ public class TaskStats private final DataSize outputDataSize; private final long outputPositions; + private final long updatedPositions; private final Duration outputBlockedTime; @@ -133,6 +134,7 @@ public TaskStats(DateTime createTime, DateTime endTime) new Duration(0, MILLISECONDS), DataSize.ofBytes(0), 0, + 0, new Duration(0, MILLISECONDS), DataSize.ofBytes(0), DataSize.ofBytes(0), @@ -191,6 +193,7 @@ public TaskStats( @JsonProperty("outputDataSize") DataSize outputDataSize, @JsonProperty("outputPositions") long outputPositions, + @JsonProperty("updatedPositions") long updatedPositions, @JsonProperty("outputBlockedTime") Duration outputBlockedTime, @@ -267,6 +270,8 @@ public TaskStats( this.outputDataSize = requireNonNull(outputDataSize, "outputDataSize is null"); checkArgument(outputPositions >= 0, "outputPositions is negative"); this.outputPositions = outputPositions; + checkArgument(updatedPositions >= 0, "updatedPositions is negative"); + this.updatedPositions = updatedPositions; this.outputBlockedTime = requireNonNull(outputBlockedTime, "outputBlockedTime is null"); @@ -490,6 +495,12 @@ public long getOutputPositions() return outputPositions; } + @JsonProperty + public long getUpdatedPositions() + { + return updatedPositions; + } + @JsonProperty public Duration getOutputBlockedTime() { @@ -597,6 +608,7 @@ public TaskStats summarize() inputBlockedTime, outputDataSize, outputPositions, + updatedPositions, outputBlockedTime, writerInputDataSize, physicalWrittenDataSize, @@ -647,6 +659,7 @@ public TaskStats summarizeFinal() inputBlockedTime, outputDataSize, outputPositions, + updatedPositions, outputBlockedTime, writerInputDataSize, physicalWrittenDataSize, @@ -697,6 +710,7 @@ public TaskStats pruneDigests() inputBlockedTime, outputDataSize, outputPositions, + updatedPositions, outputBlockedTime, writerInputDataSize, physicalWrittenDataSize, diff --git a/core/trino-main/src/test/java/io/trino/execution/MockManagedQueryExecution.java b/core/trino-main/src/test/java/io/trino/execution/MockManagedQueryExecution.java index 4029f63085bc..c527129a7652 100644 --- a/core/trino-main/src/test/java/io/trino/execution/MockManagedQueryExecution.java +++ b/core/trino-main/src/test/java/io/trino/execution/MockManagedQueryExecution.java @@ -252,6 +252,7 @@ public QueryInfo getFullQueryInfo() DataSize.ofBytes(0), 30, 0, + 0, new Duration(223, NANOSECONDS), new Duration(224, NANOSECONDS), diff --git a/core/trino-main/src/test/java/io/trino/execution/TestQueryInfo.java b/core/trino-main/src/test/java/io/trino/execution/TestQueryInfo.java index eb53c6e26a72..5e03b1a538ea 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestQueryInfo.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestQueryInfo.java @@ -318,6 +318,7 @@ private static StageStats createStageStats(int value) succinctBytes(value), value, value, + value, Metrics.EMPTY, Duration.succinctDuration(value, NANOSECONDS), Duration.succinctDuration(value, NANOSECONDS), diff --git a/core/trino-main/src/test/java/io/trino/execution/TestQueryStats.java b/core/trino-main/src/test/java/io/trino/execution/TestQueryStats.java index c389d31bc680..6c156b797ac3 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestQueryStats.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestQueryStats.java @@ -66,6 +66,7 @@ public class TestQueryStats new Duration(114, NANOSECONDS), succinctBytes(116L), 117L, + 0, 1833, Metrics.EMPTY, Metrics.EMPTY, @@ -107,6 +108,7 @@ public class TestQueryStats new Duration(214, NANOSECONDS), succinctBytes(216L), 217L, + 0, 2833, Metrics.EMPTY, Metrics.EMPTY, @@ -148,6 +150,7 @@ public class TestQueryStats new Duration(314, NANOSECONDS), succinctBytes(316L), 317L, + 0, 3833, Metrics.EMPTY, Metrics.EMPTY, @@ -257,6 +260,7 @@ public class TestQueryStats DataSize.ofBytes(43), DataSize.ofBytes(44), 45, + 0, 46, new Duration(103, SECONDS), diff --git a/core/trino-main/src/test/java/io/trino/execution/TestStageStateMachine.java b/core/trino-main/src/test/java/io/trino/execution/TestStageStateMachine.java index b0f6114c08e4..08a22b83d242 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestStageStateMachine.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestStageStateMachine.java @@ -312,6 +312,7 @@ private static TaskStats taskStats(List pipelineContexts, int b new Duration(baseValue, MILLISECONDS), DataSize.ofBytes(baseValue), baseValue, + 0, new Duration(baseValue, MILLISECONDS), DataSize.ofBytes(baseValue), DataSize.ofBytes(baseValue), diff --git a/core/trino-main/src/test/java/io/trino/execution/TestStageStats.java b/core/trino-main/src/test/java/io/trino/execution/TestStageStats.java index df8cde38f918..4f2948c828d7 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestStageStats.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestStageStats.java @@ -96,6 +96,7 @@ public class TestStageStats DataSize.ofBytes(35), DataSize.ofBytes(36), 37, + 0, 38, Metrics.EMPTY, diff --git a/core/trino-main/src/test/java/io/trino/memory/TestLeastWastedEffortTaskLowMemoryKiller.java b/core/trino-main/src/test/java/io/trino/memory/TestLeastWastedEffortTaskLowMemoryKiller.java index 4917316a2958..ab41632c795c 100644 --- a/core/trino-main/src/test/java/io/trino/memory/TestLeastWastedEffortTaskLowMemoryKiller.java +++ b/core/trino-main/src/test/java/io/trino/memory/TestLeastWastedEffortTaskLowMemoryKiller.java @@ -324,6 +324,7 @@ private static TaskInfo buildTaskInfo(TaskId taskId, TaskState state, Duration s new Duration(0, MILLISECONDS), DataSize.ofBytes(0), 0, + 0, new Duration(0, MILLISECONDS), DataSize.ofBytes(0), DataSize.ofBytes(0), diff --git a/core/trino-main/src/test/java/io/trino/memory/TestTotalReservationOnBlockedNodesTaskLowMemoryKiller.java b/core/trino-main/src/test/java/io/trino/memory/TestTotalReservationOnBlockedNodesTaskLowMemoryKiller.java index 60f2552432dd..a60e8559f363 100644 --- a/core/trino-main/src/test/java/io/trino/memory/TestTotalReservationOnBlockedNodesTaskLowMemoryKiller.java +++ b/core/trino-main/src/test/java/io/trino/memory/TestTotalReservationOnBlockedNodesTaskLowMemoryKiller.java @@ -299,6 +299,7 @@ private static TaskInfo buildTaskInfo(TaskId taskId, boolean speculative) new Duration(0, MILLISECONDS), DataSize.ofBytes(0), 0, + 0, new Duration(0, MILLISECONDS), DataSize.ofBytes(0), DataSize.ofBytes(0), diff --git a/core/trino-main/src/test/java/io/trino/operator/TestDriverStats.java b/core/trino-main/src/test/java/io/trino/operator/TestDriverStats.java index 58f671a05a1c..cda2a454bc0f 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestDriverStats.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestDriverStats.java @@ -63,6 +63,7 @@ public class TestDriverStats DataSize.ofBytes(18), 19, + 0, new Duration(102, NANOSECONDS), diff --git a/core/trino-main/src/test/java/io/trino/operator/TestOperatorStats.java b/core/trino-main/src/test/java/io/trino/operator/TestOperatorStats.java index 0662bc263cb8..69b5d432baf5 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestOperatorStats.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestOperatorStats.java @@ -63,6 +63,7 @@ public class TestOperatorStats new Duration(11, NANOSECONDS), DataSize.ofBytes(12), 13, + 0, 533, new Metrics(ImmutableMap.of("metrics", new LongCount(42))), new Metrics(ImmutableMap.of("connectorMetrics", new LongCount(43))), @@ -112,6 +113,7 @@ public class TestOperatorStats new Duration(11, NANOSECONDS), DataSize.ofBytes(12), 13, + 0, 533, new Metrics(ImmutableMap.of("metrics", new LongCount(42))), new Metrics(ImmutableMap.of("connectorMetrics", new LongCount(43))), diff --git a/core/trino-main/src/test/java/io/trino/operator/TestPipelineStats.java b/core/trino-main/src/test/java/io/trino/operator/TestPipelineStats.java index 7c622f61a95c..d7c10d851616 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestPipelineStats.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestPipelineStats.java @@ -80,6 +80,7 @@ public class TestPipelineStats DataSize.ofBytes(18), 19, + 0, new Duration(102, NANOSECONDS), diff --git a/core/trino-main/src/test/java/io/trino/operator/TestTaskStats.java b/core/trino-main/src/test/java/io/trino/operator/TestTaskStats.java index bffb372a199b..9e47e7835695 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestTaskStats.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestTaskStats.java @@ -77,6 +77,7 @@ public class TestTaskStats DataSize.ofBytes(23), 24, + 0, new Duration(272, NANOSECONDS), diff --git a/core/trino-main/src/test/java/io/trino/server/TestBasicQueryInfo.java b/core/trino-main/src/test/java/io/trino/server/TestBasicQueryInfo.java index f543cdaac64b..b9ba4aed33e5 100644 --- a/core/trino-main/src/test/java/io/trino/server/TestBasicQueryInfo.java +++ b/core/trino-main/src/test/java/io/trino/server/TestBasicQueryInfo.java @@ -123,6 +123,7 @@ public void testConstructor() DataSize.valueOf("47GB"), DataSize.valueOf("48GB"), 49, + 0, 50, new Duration(103, SECONDS), new Duration(104, SECONDS), diff --git a/core/trino-main/src/test/java/io/trino/server/TestQueryStateInfo.java b/core/trino-main/src/test/java/io/trino/server/TestQueryStateInfo.java index 9e7ec6e246d7..3667840c68cf 100644 --- a/core/trino-main/src/test/java/io/trino/server/TestQueryStateInfo.java +++ b/core/trino-main/src/test/java/io/trino/server/TestQueryStateInfo.java @@ -175,6 +175,7 @@ private QueryInfo createQueryInfo(String queryId, QueryState state, String query DataSize.valueOf("36GB"), DataSize.valueOf("37GB"), 38, + 0, 39, new Duration(103, SECONDS), new Duration(104, SECONDS), diff --git a/core/trino-spi/pom.xml b/core/trino-spi/pom.xml index e6358e816586..e6bdbfbc5509 100644 --- a/core/trino-spi/pom.xml +++ b/core/trino-spi/pom.xml @@ -184,6 +184,20 @@ true + + true + java.method.numberOfParametersChanged + A new field updatedRows was added that holds the number of affected rows by DELETE/UPDATE queries + io.trino.spi.eventlistener + io.trino.spi.eventlistener.QueryStatistics + QueryStatistics + <init> + constructor + io.trino:trino-spi:jar:468 + primary + io.trino:trino-spi:jar:469-SNAPSHOT + primary + java.annotation.added diff --git a/core/trino-spi/src/main/java/io/trino/spi/eventlistener/QueryStatistics.java b/core/trino-spi/src/main/java/io/trino/spi/eventlistener/QueryStatistics.java index 72525f07e406..a46c90dd9742 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/eventlistener/QueryStatistics.java +++ b/core/trino-spi/src/main/java/io/trino/spi/eventlistener/QueryStatistics.java @@ -60,6 +60,7 @@ public class QueryStatistics private final long totalRows; private final long outputBytes; private final long outputRows; + private final long updatedRows; private final long writtenBytes; private final long writtenRows; private final long spilledBytes; @@ -122,6 +123,7 @@ public QueryStatistics( long totalRows, long outputBytes, long outputRows, + long updatedRows, long writtenBytes, long writtenRows, long spilledBytes, @@ -169,6 +171,7 @@ public QueryStatistics( totalRows, outputBytes, outputRows, + updatedRows, writtenBytes, writtenRows, spilledBytes, @@ -217,6 +220,7 @@ public QueryStatistics( long totalRows, long outputBytes, long outputRows, + long updatedRows, long writtenBytes, long writtenRows, long spilledBytes, @@ -263,6 +267,7 @@ public QueryStatistics( this.totalRows = totalRows; this.outputBytes = outputBytes; this.outputRows = outputRows; + this.updatedRows = updatedRows; this.writtenBytes = writtenBytes; this.writtenRows = writtenRows; this.spilledBytes = spilledBytes; @@ -460,6 +465,12 @@ public long getOutputRows() return outputRows; } + @JsonProperty + public long getUpdatedRows() + { + return updatedRows; + } + @JsonProperty public long getWrittenBytes() { diff --git a/plugin/trino-http-event-listener/src/test/java/io/trino/plugin/httpquery/TestHttpEventListener.java b/plugin/trino-http-event-listener/src/test/java/io/trino/plugin/httpquery/TestHttpEventListener.java index 8d5fe476e40f..33b15a3f1290 100644 --- a/plugin/trino-http-event-listener/src/test/java/io/trino/plugin/httpquery/TestHttpEventListener.java +++ b/plugin/trino-http-event-listener/src/test/java/io/trino/plugin/httpquery/TestHttpEventListener.java @@ -186,6 +186,7 @@ final class TestHttpEventListener 0L, 0L, 0L, + 0L, 0.0f, Collections.emptyList(), 0, diff --git a/plugin/trino-http-server-event-listener/src/test/java/io/trino/plugin/httpquery/TestHttpServerEventListener.java b/plugin/trino-http-server-event-listener/src/test/java/io/trino/plugin/httpquery/TestHttpServerEventListener.java index 8f2defd9c343..38bbfc198204 100644 --- a/plugin/trino-http-server-event-listener/src/test/java/io/trino/plugin/httpquery/TestHttpServerEventListener.java +++ b/plugin/trino-http-server-event-listener/src/test/java/io/trino/plugin/httpquery/TestHttpServerEventListener.java @@ -157,6 +157,7 @@ final class TestHttpServerEventListener 0L, 0L, 0L, + 0L, 0.0f, Collections.emptyList(), 0, diff --git a/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestUtils.java b/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestUtils.java index e9bfa85475f4..55f02bed1da0 100644 --- a/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestUtils.java +++ b/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestUtils.java @@ -148,6 +148,7 @@ private TestUtils() {} 0L, 0L, 0L, + 0L, 0.0f, Collections.emptyList(), 0, diff --git a/plugin/trino-mysql-event-listener/src/test/java/io/trino/plugin/eventlistener/mysql/TestMysqlEventListener.java b/plugin/trino-mysql-event-listener/src/test/java/io/trino/plugin/eventlistener/mysql/TestMysqlEventListener.java index e5a06e514d25..66934aa79997 100644 --- a/plugin/trino-mysql-event-listener/src/test/java/io/trino/plugin/eventlistener/mysql/TestMysqlEventListener.java +++ b/plugin/trino-mysql-event-listener/src/test/java/io/trino/plugin/eventlistener/mysql/TestMysqlEventListener.java @@ -117,6 +117,7 @@ final class TestMysqlEventListener 123L, 124L, 125L, + 0L, 126L, 127L, 1271L, @@ -280,6 +281,7 @@ final class TestMysqlEventListener 123L, 124L, 125L, + 0L, 126L, 127L, 1271L, diff --git a/plugin/trino-openlineage/src/test/java/io/trino/plugin/openlineage/TrinoEventData.java b/plugin/trino-openlineage/src/test/java/io/trino/plugin/openlineage/TrinoEventData.java index 5a6adad0f390..80b94b0ed126 100644 --- a/plugin/trino-openlineage/src/test/java/io/trino/plugin/openlineage/TrinoEventData.java +++ b/plugin/trino-openlineage/src/test/java/io/trino/plugin/openlineage/TrinoEventData.java @@ -128,6 +128,7 @@ private TrinoEventData() 0L, 0L, 0L, + 0L, 0.0f, Collections.emptyList(), 0, From 0f6ec97ac5349cf697d6820d467a9fa1d78a1291 Mon Sep 17 00:00:00 2001 From: Bahram Zaeri Date: Thu, 16 Jan 2025 00:20:03 +0000 Subject: [PATCH 2/5] add method `recordUpdatedPositions` to the OperatorContext class --- .../src/main/java/io/trino/operator/OperatorContext.java | 6 ++++++ .../main/java/io/trino/operator/TableMutationOperator.java | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/core/trino-main/src/main/java/io/trino/operator/OperatorContext.java b/core/trino-main/src/main/java/io/trino/operator/OperatorContext.java index 3e7106f3005a..85e5396fdfff 100644 --- a/core/trino-main/src/main/java/io/trino/operator/OperatorContext.java +++ b/core/trino-main/src/main/java/io/trino/operator/OperatorContext.java @@ -227,6 +227,12 @@ public void recordDynamicFilterSplitProcessed(long dynamicFilterSplits) dynamicFilterSplitsProcessed.getAndAdd(dynamicFilterSplits); } + public void recordUpdatedPositions(long updatedPositions) + { + checkArgument(updatedPositions >= 0, "updatedPositions is negative (%s)", updatedPositions); + this.updatedPositions.update(updatedPositions); + } + /** * Overwrites the metrics with the latest one. * diff --git a/core/trino-main/src/main/java/io/trino/operator/TableMutationOperator.java b/core/trino-main/src/main/java/io/trino/operator/TableMutationOperator.java index 8931e64b0765..00fb19166675 100644 --- a/core/trino-main/src/main/java/io/trino/operator/TableMutationOperator.java +++ b/core/trino-main/src/main/java/io/trino/operator/TableMutationOperator.java @@ -121,7 +121,7 @@ public Page getOutput() OptionalLong rowsUpdatedCount = operation.execute(); - operatorContext.getUpdatedPositions().update(rowsUpdatedCount.orElse(0L)); + operatorContext.recordUpdatedPositions(rowsUpdatedCount.orElse(0L)); return buildUpdatedCountPage(rowsUpdatedCount); } From b1e986a5838525990de33d4a7321e1980d8cfbbf Mon Sep 17 00:00:00 2001 From: Bahram Zaeri Date: Wed, 15 Jan 2025 16:14:23 +0000 Subject: [PATCH 3/5] add new field updatedRows to QueryStatistics.java where it's available in EventListener callbacks. It comes from TableMutationOperator.java where update/delete queries are issued. --- .../io/trino/dispatcher/FailedDispatchQuery.java | 1 + .../main/java/io/trino/event/QueryMonitor.java | 2 ++ .../io/trino/execution/DistributionSnapshot.java | 1 + .../io/trino/execution/QueryStateMachine.java | 4 ++++ .../main/java/io/trino/execution/QueryStats.java | 10 ++++++++++ .../io/trino/execution/StageStateMachine.java | 3 +++ .../main/java/io/trino/execution/StageStats.java | 11 +++++++++++ .../java/io/trino/operator/DriverContext.java | 5 +++++ .../main/java/io/trino/operator/DriverStats.java | 11 +++++++++++ .../java/io/trino/operator/OperatorContext.java | 7 +++++++ .../java/io/trino/operator/OperatorStats.java | 15 +++++++++++++++ .../java/io/trino/operator/PipelineContext.java | 5 +++++ .../java/io/trino/operator/PipelineStats.java | 12 ++++++++++++ .../io/trino/operator/TableMutationOperator.java | 2 ++ .../main/java/io/trino/operator/TaskContext.java | 3 +++ .../main/java/io/trino/operator/TaskStats.java | 14 ++++++++++++++ .../execution/MockManagedQueryExecution.java | 1 + .../java/io/trino/execution/TestQueryInfo.java | 1 + .../java/io/trino/execution/TestQueryStats.java | 4 ++++ .../io/trino/execution/TestStageStateMachine.java | 1 + .../java/io/trino/execution/TestStageStats.java | 1 + .../TestLeastWastedEffortTaskLowMemoryKiller.java | 1 + ...ervationOnBlockedNodesTaskLowMemoryKiller.java | 1 + .../java/io/trino/operator/TestDriverStats.java | 1 + .../java/io/trino/operator/TestOperatorStats.java | 2 ++ .../java/io/trino/operator/TestPipelineStats.java | 1 + .../java/io/trino/operator/TestTaskStats.java | 1 + .../java/io/trino/server/TestBasicQueryInfo.java | 1 + .../java/io/trino/server/TestQueryStateInfo.java | 1 + core/trino-spi/pom.xml | 14 ++++++++++++++ .../trino/spi/eventlistener/QueryStatistics.java | 11 +++++++++++ .../plugin/httpquery/TestHttpEventListener.java | 1 + .../httpquery/TestHttpServerEventListener.java | 1 + .../plugin/eventlistener/kafka/TestUtils.java | 1 + .../mysql/TestMysqlEventListener.java | 2 ++ .../trino/plugin/openlineage/TrinoEventData.java | 1 + 36 files changed, 154 insertions(+) diff --git a/core/trino-main/src/main/java/io/trino/dispatcher/FailedDispatchQuery.java b/core/trino-main/src/main/java/io/trino/dispatcher/FailedDispatchQuery.java index 1e1f00a645cb..a780aadac4a3 100644 --- a/core/trino-main/src/main/java/io/trino/dispatcher/FailedDispatchQuery.java +++ b/core/trino-main/src/main/java/io/trino/dispatcher/FailedDispatchQuery.java @@ -332,6 +332,7 @@ private static QueryStats immediateFailureQueryStats() DataSize.ofBytes(0), 0, 0, + 0, new Duration(0, MILLISECONDS), new Duration(0, MILLISECONDS), DataSize.ofBytes(0), diff --git a/core/trino-main/src/main/java/io/trino/event/QueryMonitor.java b/core/trino-main/src/main/java/io/trino/event/QueryMonitor.java index 9e0bc49c53bc..d3e9ee4fbc74 100644 --- a/core/trino-main/src/main/java/io/trino/event/QueryMonitor.java +++ b/core/trino-main/src/main/java/io/trino/event/QueryMonitor.java @@ -231,6 +231,7 @@ public void queryImmediateFailureEvent(BasicQueryInfo queryInfo, ExecutionFailur 0, 0, 0, + 0, ImmutableList.of(), 0, true, @@ -343,6 +344,7 @@ private QueryStatistics createQueryStatistics(QueryInfo queryInfo) queryStats.getRawInputPositions(), queryStats.getOutputDataSize().toBytes(), queryStats.getOutputPositions(), + queryStats.getUpdatedPositions(), queryStats.getLogicalWrittenDataSize().toBytes(), queryStats.getWrittenPositions(), queryStats.getSpilledDataSize().toBytes(), diff --git a/core/trino-main/src/main/java/io/trino/execution/DistributionSnapshot.java b/core/trino-main/src/main/java/io/trino/execution/DistributionSnapshot.java index 5323c5f08a54..98f6b35c0df8 100644 --- a/core/trino-main/src/main/java/io/trino/execution/DistributionSnapshot.java +++ b/core/trino-main/src/main/java/io/trino/execution/DistributionSnapshot.java @@ -71,6 +71,7 @@ public static OperatorStats pruneOperatorStats(OperatorStats operatorStats) operatorStats.getGetOutputCpu(), operatorStats.getOutputDataSize(), operatorStats.getOutputPositions(), + operatorStats.getUpdatedPositions(), operatorStats.getDynamicFilterSplitsProcessed(), pruneMetrics(operatorStats.getMetrics()), pruneMetrics(operatorStats.getConnectorMetrics()), diff --git a/core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java b/core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java index 7e84de77b97d..7c48e112a3cc 100644 --- a/core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java +++ b/core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java @@ -686,6 +686,7 @@ private QueryStats getQueryStats(Optional rootStage, List long outputDataSize = 0; long failedOutputDataSize = 0; long outputPositions = 0; + long updatedPositions = 0; long failedOutputPositions = 0; long outputBlockedTime = 0; @@ -772,6 +773,7 @@ private QueryStats getQueryStats(Optional rootStage, List outputDataSize += outputStageStats.getOutputDataSize().toBytes(); failedOutputDataSize += outputStageStats.getFailedOutputDataSize().toBytes(); outputPositions += outputStageStats.getOutputPositions(); + updatedPositions += outputStageStats.getUpdatedPositions(); failedOutputPositions += outputStageStats.getFailedOutputPositions(); } @@ -898,6 +900,7 @@ private QueryStats getQueryStats(Optional rootStage, List succinctBytes(outputDataSize), succinctBytes(failedOutputDataSize), outputPositions, + updatedPositions, failedOutputPositions, new Duration(outputBlockedTime, NANOSECONDS).convertToMostSuccinctTimeUnit(), @@ -1502,6 +1505,7 @@ private static QueryStats pruneQueryStats(QueryStats queryStats) queryStats.getOutputDataSize(), queryStats.getFailedOutputDataSize(), queryStats.getOutputPositions(), + queryStats.getUpdatedPositions(), queryStats.getFailedOutputPositions(), queryStats.getOutputBlockedTime(), queryStats.getFailedOutputBlockedTime(), diff --git a/core/trino-main/src/main/java/io/trino/execution/QueryStats.java b/core/trino-main/src/main/java/io/trino/execution/QueryStats.java index bdb0b08a5431..6c7aec2e1ed6 100644 --- a/core/trino-main/src/main/java/io/trino/execution/QueryStats.java +++ b/core/trino-main/src/main/java/io/trino/execution/QueryStats.java @@ -118,6 +118,7 @@ public class QueryStats private final DataSize outputDataSize; private final DataSize failedOutputDataSize; private final long outputPositions; + private final long updatedPositions; private final long failedOutputPositions; private final Duration outputBlockedTime; @@ -213,6 +214,7 @@ public QueryStats( @JsonProperty("outputDataSize") DataSize outputDataSize, @JsonProperty("failedOutputDataSize") DataSize failedOutputDataSize, @JsonProperty("outputPositions") long outputPositions, + @JsonProperty("updatedPositions") long updatedPositions, @JsonProperty("failedOutputPositions") long failedOutputPositions, @JsonProperty("outputBlockedTime") Duration outputBlockedTime, @@ -323,6 +325,8 @@ public QueryStats( this.failedOutputDataSize = requireNonNull(failedOutputDataSize, "failedOutputDataSize is null"); checkArgument(outputPositions >= 0, "outputPositions is negative"); this.outputPositions = outputPositions; + checkArgument(updatedPositions >= 0, "updatedPositions is negative"); + this.updatedPositions = updatedPositions; checkArgument(failedOutputPositions >= 0, "failedOutputPositions is negative"); this.failedOutputPositions = failedOutputPositions; @@ -743,6 +747,12 @@ public long getOutputPositions() return outputPositions; } + @JsonProperty + public long getUpdatedPositions() + { + return updatedPositions; + } + @JsonProperty public long getFailedOutputPositions() { diff --git a/core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java b/core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java index 63a6ca7f8020..c610a8a22c64 100644 --- a/core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java +++ b/core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java @@ -473,6 +473,7 @@ public StageInfo getStageInfo(Supplier> taskInfosSupplier) long outputDataSize = 0; long failedOutputDataSize = 0; long outputPositions = 0; + long updatedPositions = 0; long failedOutputPositions = 0; Metrics.Accumulator outputBufferMetrics = Metrics.accumulator(); @@ -553,6 +554,7 @@ public StageInfo getStageInfo(Supplier> taskInfosSupplier) taskInfo.outputBuffers().getUtilization().ifPresent(bufferUtilizationHistograms::add); outputDataSize += taskStats.getOutputDataSize().toBytes(); outputPositions += taskStats.getOutputPositions(); + updatedPositions += taskStats.getUpdatedPositions(); bufferMetrics.ifPresent(outputBufferMetrics::add); outputBlockedTime += taskStats.getOutputBlockedTime().roundTo(NANOSECONDS); @@ -660,6 +662,7 @@ public StageInfo getStageInfo(Supplier> taskInfosSupplier) succinctBytes(outputDataSize), succinctBytes(failedOutputDataSize), outputPositions, + updatedPositions, failedOutputPositions, outputBufferMetrics.get(), succinctDuration(outputBlockedTime, NANOSECONDS), diff --git a/core/trino-main/src/main/java/io/trino/execution/StageStats.java b/core/trino-main/src/main/java/io/trino/execution/StageStats.java index f2bff77da178..5c971cc91081 100644 --- a/core/trino-main/src/main/java/io/trino/execution/StageStats.java +++ b/core/trino-main/src/main/java/io/trino/execution/StageStats.java @@ -106,6 +106,7 @@ public class StageStats private final DataSize outputDataSize; private final DataSize failedOutputDataSize; private final long outputPositions; + private final long updatedPositions; private final long failedOutputPositions; private final Metrics outputBufferMetrics; @@ -182,6 +183,7 @@ public StageStats( @JsonProperty("outputDataSize") DataSize outputDataSize, @JsonProperty("failedOutputDataSize") DataSize failedOutputDataSize, @JsonProperty("outputPositions") long outputPositions, + @JsonProperty("updatedPositions") long updatedPositions, @JsonProperty("failedOutputPositions") long failedOutputPositions, @JsonProperty("outputBufferMetrics") Metrics outputBufferMetrics, @@ -273,6 +275,8 @@ public StageStats( this.failedOutputDataSize = requireNonNull(failedOutputDataSize, "failedOutputDataSize is null"); checkArgument(outputPositions >= 0, "outputPositions is negative"); this.outputPositions = outputPositions; + checkArgument(updatedPositions >= 0, "updatedPositions is negative"); + this.updatedPositions = updatedPositions; checkArgument(failedOutputPositions >= 0, "failedOutputPositions is negative"); this.failedOutputPositions = failedOutputPositions; this.outputBufferMetrics = requireNonNull(outputBufferMetrics, "outputBufferMetrics is null"); @@ -588,6 +592,12 @@ public long getOutputPositions() return outputPositions; } + @JsonProperty + public long getUpdatedPositions() + { + return updatedPositions; + } + @JsonProperty public long getFailedOutputPositions() { @@ -736,6 +746,7 @@ public static StageStats createInitial() zeroBytes, 0, 0, + 0, Metrics.EMPTY, zeroSeconds, zeroSeconds, diff --git a/core/trino-main/src/main/java/io/trino/operator/DriverContext.java b/core/trino-main/src/main/java/io/trino/operator/DriverContext.java index e087c609bb39..0f20557793d6 100644 --- a/core/trino-main/src/main/java/io/trino/operator/DriverContext.java +++ b/core/trino-main/src/main/java/io/trino/operator/DriverContext.java @@ -346,6 +346,7 @@ public DriverStats getDriverStats() Duration inputBlockedTime; DataSize outputDataSize; long outputPositions; + long updatedPositions; Duration outputBlockedTime; if (inputOperator != null) { physicalInputDataSize = inputOperator.getPhysicalInputDataSize(); @@ -364,6 +365,8 @@ public DriverStats getDriverStats() inputBlockedTime = inputOperator.getBlockedWall(); + updatedPositions = inputOperator.getUpdatedPositions(); + OperatorStats outputOperator = requireNonNull(getLast(operators, null)); outputDataSize = outputOperator.getOutputDataSize(); outputPositions = outputOperator.getOutputPositions(); @@ -389,6 +392,7 @@ public DriverStats getDriverStats() outputDataSize = DataSize.ofBytes(0); outputPositions = 0; + updatedPositions = 0; outputBlockedTime = new Duration(0, MILLISECONDS); } @@ -428,6 +432,7 @@ public DriverStats getDriverStats() inputBlockedTime, outputDataSize.succinct(), outputPositions, + updatedPositions, outputBlockedTime, succinctBytes(physicalWrittenDataSize), operators); diff --git a/core/trino-main/src/main/java/io/trino/operator/DriverStats.java b/core/trino-main/src/main/java/io/trino/operator/DriverStats.java index 3c985a00a355..345b77e0d9bf 100644 --- a/core/trino-main/src/main/java/io/trino/operator/DriverStats.java +++ b/core/trino-main/src/main/java/io/trino/operator/DriverStats.java @@ -67,6 +67,7 @@ public class DriverStats private final DataSize outputDataSize; private final long outputPositions; + private final long updatedPositions; private final Duration outputBlockedTime; @@ -109,6 +110,7 @@ public DriverStats() this.outputDataSize = DataSize.ofBytes(0); this.outputPositions = 0; + this.updatedPositions = 0; this.outputBlockedTime = new Duration(0, MILLISECONDS); @@ -152,6 +154,7 @@ public DriverStats( @JsonProperty("outputDataSize") DataSize outputDataSize, @JsonProperty("outputPositions") long outputPositions, + @JsonProperty("updatedPositions") long updatedPositions, @JsonProperty("outputBlockedTime") Duration outputBlockedTime, @@ -197,6 +200,8 @@ public DriverStats( this.outputDataSize = requireNonNull(outputDataSize, "outputDataSize is null"); checkArgument(outputPositions >= 0, "outputPositions is negative"); this.outputPositions = outputPositions; + checkArgument(updatedPositions >= 0, "updatedPositions is negative"); + this.updatedPositions = updatedPositions; this.outputBlockedTime = requireNonNull(outputBlockedTime, "outputBlockedTime is null"); @@ -357,6 +362,12 @@ public long getOutputPositions() return outputPositions; } + @JsonProperty + public long getUpdatedPositions() + { + return updatedPositions; + } + @JsonProperty public Duration getOutputBlockedTime() { diff --git a/core/trino-main/src/main/java/io/trino/operator/OperatorContext.java b/core/trino-main/src/main/java/io/trino/operator/OperatorContext.java index 6b268c5849a0..3e7106f3005a 100644 --- a/core/trino-main/src/main/java/io/trino/operator/OperatorContext.java +++ b/core/trino-main/src/main/java/io/trino/operator/OperatorContext.java @@ -82,6 +82,7 @@ public class OperatorContext private final OperationTiming getOutputTiming = new OperationTiming(); private final CounterStat outputDataSize = new CounterStat(); private final CounterStat outputPositions = new CounterStat(); + private final CounterStat updatedPositions = new CounterStat(); private final AtomicLong dynamicFilterSplitsProcessed = new AtomicLong(); private final AtomicReference metrics = new AtomicReference<>(Metrics.EMPTY); // this is not incremental, but gets overwritten by the latest value. @@ -490,6 +491,11 @@ public CounterStat getOutputPositions() return outputPositions; } + public CounterStat getUpdatedPositions() + { + return updatedPositions; + } + public long getWriterInputDataSize() { return writerInputDataSize.get(); @@ -554,6 +560,7 @@ public OperatorStats getOperatorStats() new Duration(getOutputTiming.getCpuNanos(), NANOSECONDS).convertToMostSuccinctTimeUnit(), DataSize.ofBytes(outputDataSize.getTotalCount()), outputPositions.getTotalCount(), + updatedPositions.getTotalCount(), dynamicFilterSplitsProcessed.get(), getOperatorMetrics( diff --git a/core/trino-main/src/main/java/io/trino/operator/OperatorStats.java b/core/trino-main/src/main/java/io/trino/operator/OperatorStats.java index a0ec50f4e85d..d112acc97eaf 100644 --- a/core/trino-main/src/main/java/io/trino/operator/OperatorStats.java +++ b/core/trino-main/src/main/java/io/trino/operator/OperatorStats.java @@ -62,6 +62,7 @@ public class OperatorStats private final Duration getOutputCpu; private final DataSize outputDataSize; private final long outputPositions; + private final long updatedPositions; private final long dynamicFilterSplitsProcessed; private final Metrics metrics; @@ -117,6 +118,7 @@ public OperatorStats( @JsonProperty("getOutputCpu") Duration getOutputCpu, @JsonProperty("outputDataSize") DataSize outputDataSize, @JsonProperty("outputPositions") long outputPositions, + @JsonProperty("updatedPositions") long updatedPositions, @JsonProperty("dynamicFilterSplitsProcessed") long dynamicFilterSplitsProcessed, @JsonProperty("metrics") Metrics metrics, @@ -174,6 +176,8 @@ public OperatorStats( this.outputDataSize = requireNonNull(outputDataSize, "outputDataSize is null"); checkArgument(outputPositions >= 0, "outputPositions is negative"); this.outputPositions = outputPositions; + checkArgument(updatedPositions >= 0, "updatedPositions is negative"); + this.updatedPositions = updatedPositions; this.dynamicFilterSplitsProcessed = dynamicFilterSplitsProcessed; this.metrics = requireNonNull(metrics, "metrics is null"); @@ -340,6 +344,12 @@ public long getOutputPositions() return outputPositions; } + @JsonProperty + public long getUpdatedPositions() + { + return updatedPositions; + } + @JsonProperty public long getDynamicFilterSplitsProcessed() { @@ -485,6 +495,7 @@ private OperatorStats add(Iterable operators, Optional p long getOutputCpu = this.getOutputCpu.roundTo(NANOSECONDS); long outputDataSize = this.outputDataSize.toBytes(); long outputPositions = this.outputPositions; + long updatedPositions = this.updatedPositions; long dynamicFilterSplitsProcessed = this.dynamicFilterSplitsProcessed; Metrics.Accumulator metricsAccumulator = Metrics.accumulator().add(this.getMetrics()); @@ -535,6 +546,7 @@ private OperatorStats add(Iterable operators, Optional p getOutputCpu += operator.getGetOutputCpu().roundTo(NANOSECONDS); outputDataSize += operator.getOutputDataSize().toBytes(); outputPositions += operator.getOutputPositions(); + updatedPositions += operator.getUpdatedPositions(); dynamicFilterSplitsProcessed += operator.getDynamicFilterSplitsProcessed(); metricsAccumulator.add(operator.getMetrics()); @@ -596,6 +608,7 @@ private OperatorStats add(Iterable operators, Optional p new Duration(getOutputCpu, NANOSECONDS).convertToMostSuccinctTimeUnit(), DataSize.ofBytes(outputDataSize), outputPositions, + updatedPositions, dynamicFilterSplitsProcessed, metricsAccumulator.get(), @@ -672,6 +685,7 @@ public OperatorStats summarize() getOutputCpu, outputDataSize, outputPositions, + updatedPositions, dynamicFilterSplitsProcessed, metrics, connectorMetrics, @@ -717,6 +731,7 @@ public OperatorStats withPipelineMetrics(Metrics pipelineMetrics) getOutputCpu, outputDataSize, outputPositions, + updatedPositions, dynamicFilterSplitsProcessed, metrics, connectorMetrics, diff --git a/core/trino-main/src/main/java/io/trino/operator/PipelineContext.java b/core/trino-main/src/main/java/io/trino/operator/PipelineContext.java index 9bc61bc88708..ed14ddbdd0bb 100644 --- a/core/trino-main/src/main/java/io/trino/operator/PipelineContext.java +++ b/core/trino-main/src/main/java/io/trino/operator/PipelineContext.java @@ -98,6 +98,7 @@ public class PipelineContext private final CounterStat outputDataSize = new CounterStat(); private final CounterStat outputPositions = new CounterStat(); + private final CounterStat updatedPositions = new CounterStat(); private final AtomicLong outputBlockedTime = new AtomicLong(); @@ -237,6 +238,7 @@ public void driverFinished(DriverContext driverContext) outputDataSize.update(driverStats.getOutputDataSize().toBytes()); outputPositions.update(driverStats.getOutputPositions()); + updatedPositions.update(driverStats.getUpdatedPositions()); outputBlockedTime.getAndAdd(driverStats.getOutputBlockedTime().roundTo(NANOSECONDS)); @@ -403,6 +405,7 @@ public PipelineStats getPipelineStats() long outputDataSize = this.outputDataSize.getTotalCount(); long outputPositions = this.outputPositions.getTotalCount(); + long updatedPositions = this.updatedPositions.getTotalCount(); long outputBlockedTime = this.outputBlockedTime.get(); @@ -455,6 +458,7 @@ public PipelineStats getPipelineStats() outputDataSize += driverStats.getOutputDataSize().toBytes(); outputPositions += driverStats.getOutputPositions(); + updatedPositions += driverStats.getUpdatedPositions(); outputBlockedTime += driverStats.getOutputBlockedTime().roundTo(NANOSECONDS); @@ -538,6 +542,7 @@ else if (pipelineLevelMetrics != Metrics.EMPTY) { succinctBytes(outputDataSize), outputPositions, + updatedPositions, new Duration(outputBlockedTime, NANOSECONDS).convertToMostSuccinctTimeUnit(), diff --git a/core/trino-main/src/main/java/io/trino/operator/PipelineStats.java b/core/trino-main/src/main/java/io/trino/operator/PipelineStats.java index 72a3f823fb72..ab01404a0f7e 100644 --- a/core/trino-main/src/main/java/io/trino/operator/PipelineStats.java +++ b/core/trino-main/src/main/java/io/trino/operator/PipelineStats.java @@ -82,6 +82,7 @@ public class PipelineStats private final DataSize outputDataSize; private final long outputPositions; + private final long updatedPositions; private final Duration outputBlockedTime; @@ -140,6 +141,7 @@ public PipelineStats( @JsonProperty("outputDataSize") DataSize outputDataSize, @JsonProperty("outputPositions") long outputPositions, + @JsonProperty("updatedPositions") long updatedPositions, @JsonProperty("outputBlockedTime") Duration outputBlockedTime, @@ -210,6 +212,8 @@ public PipelineStats( this.outputDataSize = requireNonNull(outputDataSize, "outputDataSize is null"); checkArgument(outputPositions >= 0, "outputPositions is negative"); this.outputPositions = outputPositions; + checkArgument(updatedPositions >= 0, "updatedPositions is negative"); + this.updatedPositions = updatedPositions; this.outputBlockedTime = requireNonNull(outputBlockedTime, "outputBlockedTime is null"); @@ -438,6 +442,12 @@ public long getOutputPositions() return outputPositions; } + @JsonProperty + public long getUpdatedPositions() + { + return updatedPositions; + } + @JsonProperty public Duration getOutputBlockedTime() { @@ -501,6 +511,7 @@ public PipelineStats summarize() inputBlockedTime, outputDataSize, outputPositions, + updatedPositions, outputBlockedTime, physicalWrittenDataSize, summarizeOperatorStats(operatorSummaries), @@ -546,6 +557,7 @@ public PipelineStats pruneDigests() inputBlockedTime, outputDataSize, outputPositions, + updatedPositions, outputBlockedTime, physicalWrittenDataSize, operatorSummaries.stream() diff --git a/core/trino-main/src/main/java/io/trino/operator/TableMutationOperator.java b/core/trino-main/src/main/java/io/trino/operator/TableMutationOperator.java index 7c663af6525f..8931e64b0765 100644 --- a/core/trino-main/src/main/java/io/trino/operator/TableMutationOperator.java +++ b/core/trino-main/src/main/java/io/trino/operator/TableMutationOperator.java @@ -121,6 +121,8 @@ public Page getOutput() OptionalLong rowsUpdatedCount = operation.execute(); + operatorContext.getUpdatedPositions().update(rowsUpdatedCount.orElse(0L)); + return buildUpdatedCountPage(rowsUpdatedCount); } diff --git a/core/trino-main/src/main/java/io/trino/operator/TaskContext.java b/core/trino-main/src/main/java/io/trino/operator/TaskContext.java index e72670b2f4c6..9e64ec9c1457 100644 --- a/core/trino-main/src/main/java/io/trino/operator/TaskContext.java +++ b/core/trino-main/src/main/java/io/trino/operator/TaskContext.java @@ -482,6 +482,7 @@ public TaskStats getTaskStats() long outputDataSize = 0; long outputPositions = 0; + long updatedPositions = 0; long outputBlockedTime = 0; @@ -536,6 +537,7 @@ public TaskStats getTaskStats() if (pipeline.isOutputPipeline()) { outputDataSize += pipeline.getOutputDataSize().toBytes(); outputPositions += pipeline.getOutputPositions(); + updatedPositions += pipeline.getUpdatedPositions(); outputBlockedTime += pipeline.getOutputBlockedTime().roundTo(NANOSECONDS); } @@ -617,6 +619,7 @@ public TaskStats getTaskStats() new Duration(inputBlockedTime, NANOSECONDS).convertToMostSuccinctTimeUnit(), succinctBytes(outputDataSize), outputPositions, + updatedPositions, new Duration(outputBlockedTime, NANOSECONDS).convertToMostSuccinctTimeUnit(), succinctBytes(getWriterInputDataSize()), succinctBytes(physicalWrittenDataSize), diff --git a/core/trino-main/src/main/java/io/trino/operator/TaskStats.java b/core/trino-main/src/main/java/io/trino/operator/TaskStats.java index b8719dd2f7ee..45626902a605 100644 --- a/core/trino-main/src/main/java/io/trino/operator/TaskStats.java +++ b/core/trino-main/src/main/java/io/trino/operator/TaskStats.java @@ -81,6 +81,7 @@ public class TaskStats private final DataSize outputDataSize; private final long outputPositions; + private final long updatedPositions; private final Duration outputBlockedTime; @@ -133,6 +134,7 @@ public TaskStats(DateTime createTime, DateTime endTime) new Duration(0, MILLISECONDS), DataSize.ofBytes(0), 0, + 0, new Duration(0, MILLISECONDS), DataSize.ofBytes(0), DataSize.ofBytes(0), @@ -191,6 +193,7 @@ public TaskStats( @JsonProperty("outputDataSize") DataSize outputDataSize, @JsonProperty("outputPositions") long outputPositions, + @JsonProperty("updatedPositions") long updatedPositions, @JsonProperty("outputBlockedTime") Duration outputBlockedTime, @@ -267,6 +270,8 @@ public TaskStats( this.outputDataSize = requireNonNull(outputDataSize, "outputDataSize is null"); checkArgument(outputPositions >= 0, "outputPositions is negative"); this.outputPositions = outputPositions; + checkArgument(updatedPositions >= 0, "updatedPositions is negative"); + this.updatedPositions = updatedPositions; this.outputBlockedTime = requireNonNull(outputBlockedTime, "outputBlockedTime is null"); @@ -490,6 +495,12 @@ public long getOutputPositions() return outputPositions; } + @JsonProperty + public long getUpdatedPositions() + { + return updatedPositions; + } + @JsonProperty public Duration getOutputBlockedTime() { @@ -597,6 +608,7 @@ public TaskStats summarize() inputBlockedTime, outputDataSize, outputPositions, + updatedPositions, outputBlockedTime, writerInputDataSize, physicalWrittenDataSize, @@ -647,6 +659,7 @@ public TaskStats summarizeFinal() inputBlockedTime, outputDataSize, outputPositions, + updatedPositions, outputBlockedTime, writerInputDataSize, physicalWrittenDataSize, @@ -697,6 +710,7 @@ public TaskStats pruneDigests() inputBlockedTime, outputDataSize, outputPositions, + updatedPositions, outputBlockedTime, writerInputDataSize, physicalWrittenDataSize, diff --git a/core/trino-main/src/test/java/io/trino/execution/MockManagedQueryExecution.java b/core/trino-main/src/test/java/io/trino/execution/MockManagedQueryExecution.java index 4029f63085bc..c527129a7652 100644 --- a/core/trino-main/src/test/java/io/trino/execution/MockManagedQueryExecution.java +++ b/core/trino-main/src/test/java/io/trino/execution/MockManagedQueryExecution.java @@ -252,6 +252,7 @@ public QueryInfo getFullQueryInfo() DataSize.ofBytes(0), 30, 0, + 0, new Duration(223, NANOSECONDS), new Duration(224, NANOSECONDS), diff --git a/core/trino-main/src/test/java/io/trino/execution/TestQueryInfo.java b/core/trino-main/src/test/java/io/trino/execution/TestQueryInfo.java index eb53c6e26a72..5e03b1a538ea 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestQueryInfo.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestQueryInfo.java @@ -318,6 +318,7 @@ private static StageStats createStageStats(int value) succinctBytes(value), value, value, + value, Metrics.EMPTY, Duration.succinctDuration(value, NANOSECONDS), Duration.succinctDuration(value, NANOSECONDS), diff --git a/core/trino-main/src/test/java/io/trino/execution/TestQueryStats.java b/core/trino-main/src/test/java/io/trino/execution/TestQueryStats.java index c389d31bc680..6c156b797ac3 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestQueryStats.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestQueryStats.java @@ -66,6 +66,7 @@ public class TestQueryStats new Duration(114, NANOSECONDS), succinctBytes(116L), 117L, + 0, 1833, Metrics.EMPTY, Metrics.EMPTY, @@ -107,6 +108,7 @@ public class TestQueryStats new Duration(214, NANOSECONDS), succinctBytes(216L), 217L, + 0, 2833, Metrics.EMPTY, Metrics.EMPTY, @@ -148,6 +150,7 @@ public class TestQueryStats new Duration(314, NANOSECONDS), succinctBytes(316L), 317L, + 0, 3833, Metrics.EMPTY, Metrics.EMPTY, @@ -257,6 +260,7 @@ public class TestQueryStats DataSize.ofBytes(43), DataSize.ofBytes(44), 45, + 0, 46, new Duration(103, SECONDS), diff --git a/core/trino-main/src/test/java/io/trino/execution/TestStageStateMachine.java b/core/trino-main/src/test/java/io/trino/execution/TestStageStateMachine.java index b0f6114c08e4..08a22b83d242 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestStageStateMachine.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestStageStateMachine.java @@ -312,6 +312,7 @@ private static TaskStats taskStats(List pipelineContexts, int b new Duration(baseValue, MILLISECONDS), DataSize.ofBytes(baseValue), baseValue, + 0, new Duration(baseValue, MILLISECONDS), DataSize.ofBytes(baseValue), DataSize.ofBytes(baseValue), diff --git a/core/trino-main/src/test/java/io/trino/execution/TestStageStats.java b/core/trino-main/src/test/java/io/trino/execution/TestStageStats.java index df8cde38f918..4f2948c828d7 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestStageStats.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestStageStats.java @@ -96,6 +96,7 @@ public class TestStageStats DataSize.ofBytes(35), DataSize.ofBytes(36), 37, + 0, 38, Metrics.EMPTY, diff --git a/core/trino-main/src/test/java/io/trino/memory/TestLeastWastedEffortTaskLowMemoryKiller.java b/core/trino-main/src/test/java/io/trino/memory/TestLeastWastedEffortTaskLowMemoryKiller.java index 4917316a2958..ab41632c795c 100644 --- a/core/trino-main/src/test/java/io/trino/memory/TestLeastWastedEffortTaskLowMemoryKiller.java +++ b/core/trino-main/src/test/java/io/trino/memory/TestLeastWastedEffortTaskLowMemoryKiller.java @@ -324,6 +324,7 @@ private static TaskInfo buildTaskInfo(TaskId taskId, TaskState state, Duration s new Duration(0, MILLISECONDS), DataSize.ofBytes(0), 0, + 0, new Duration(0, MILLISECONDS), DataSize.ofBytes(0), DataSize.ofBytes(0), diff --git a/core/trino-main/src/test/java/io/trino/memory/TestTotalReservationOnBlockedNodesTaskLowMemoryKiller.java b/core/trino-main/src/test/java/io/trino/memory/TestTotalReservationOnBlockedNodesTaskLowMemoryKiller.java index 60f2552432dd..a60e8559f363 100644 --- a/core/trino-main/src/test/java/io/trino/memory/TestTotalReservationOnBlockedNodesTaskLowMemoryKiller.java +++ b/core/trino-main/src/test/java/io/trino/memory/TestTotalReservationOnBlockedNodesTaskLowMemoryKiller.java @@ -299,6 +299,7 @@ private static TaskInfo buildTaskInfo(TaskId taskId, boolean speculative) new Duration(0, MILLISECONDS), DataSize.ofBytes(0), 0, + 0, new Duration(0, MILLISECONDS), DataSize.ofBytes(0), DataSize.ofBytes(0), diff --git a/core/trino-main/src/test/java/io/trino/operator/TestDriverStats.java b/core/trino-main/src/test/java/io/trino/operator/TestDriverStats.java index 58f671a05a1c..cda2a454bc0f 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestDriverStats.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestDriverStats.java @@ -63,6 +63,7 @@ public class TestDriverStats DataSize.ofBytes(18), 19, + 0, new Duration(102, NANOSECONDS), diff --git a/core/trino-main/src/test/java/io/trino/operator/TestOperatorStats.java b/core/trino-main/src/test/java/io/trino/operator/TestOperatorStats.java index 0662bc263cb8..69b5d432baf5 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestOperatorStats.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestOperatorStats.java @@ -63,6 +63,7 @@ public class TestOperatorStats new Duration(11, NANOSECONDS), DataSize.ofBytes(12), 13, + 0, 533, new Metrics(ImmutableMap.of("metrics", new LongCount(42))), new Metrics(ImmutableMap.of("connectorMetrics", new LongCount(43))), @@ -112,6 +113,7 @@ public class TestOperatorStats new Duration(11, NANOSECONDS), DataSize.ofBytes(12), 13, + 0, 533, new Metrics(ImmutableMap.of("metrics", new LongCount(42))), new Metrics(ImmutableMap.of("connectorMetrics", new LongCount(43))), diff --git a/core/trino-main/src/test/java/io/trino/operator/TestPipelineStats.java b/core/trino-main/src/test/java/io/trino/operator/TestPipelineStats.java index 7c622f61a95c..d7c10d851616 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestPipelineStats.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestPipelineStats.java @@ -80,6 +80,7 @@ public class TestPipelineStats DataSize.ofBytes(18), 19, + 0, new Duration(102, NANOSECONDS), diff --git a/core/trino-main/src/test/java/io/trino/operator/TestTaskStats.java b/core/trino-main/src/test/java/io/trino/operator/TestTaskStats.java index bffb372a199b..9e47e7835695 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestTaskStats.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestTaskStats.java @@ -77,6 +77,7 @@ public class TestTaskStats DataSize.ofBytes(23), 24, + 0, new Duration(272, NANOSECONDS), diff --git a/core/trino-main/src/test/java/io/trino/server/TestBasicQueryInfo.java b/core/trino-main/src/test/java/io/trino/server/TestBasicQueryInfo.java index f543cdaac64b..b9ba4aed33e5 100644 --- a/core/trino-main/src/test/java/io/trino/server/TestBasicQueryInfo.java +++ b/core/trino-main/src/test/java/io/trino/server/TestBasicQueryInfo.java @@ -123,6 +123,7 @@ public void testConstructor() DataSize.valueOf("47GB"), DataSize.valueOf("48GB"), 49, + 0, 50, new Duration(103, SECONDS), new Duration(104, SECONDS), diff --git a/core/trino-main/src/test/java/io/trino/server/TestQueryStateInfo.java b/core/trino-main/src/test/java/io/trino/server/TestQueryStateInfo.java index 9e7ec6e246d7..3667840c68cf 100644 --- a/core/trino-main/src/test/java/io/trino/server/TestQueryStateInfo.java +++ b/core/trino-main/src/test/java/io/trino/server/TestQueryStateInfo.java @@ -175,6 +175,7 @@ private QueryInfo createQueryInfo(String queryId, QueryState state, String query DataSize.valueOf("36GB"), DataSize.valueOf("37GB"), 38, + 0, 39, new Duration(103, SECONDS), new Duration(104, SECONDS), diff --git a/core/trino-spi/pom.xml b/core/trino-spi/pom.xml index e6358e816586..e6bdbfbc5509 100644 --- a/core/trino-spi/pom.xml +++ b/core/trino-spi/pom.xml @@ -184,6 +184,20 @@ true + + true + java.method.numberOfParametersChanged + A new field updatedRows was added that holds the number of affected rows by DELETE/UPDATE queries + io.trino.spi.eventlistener + io.trino.spi.eventlistener.QueryStatistics + QueryStatistics + <init> + constructor + io.trino:trino-spi:jar:468 + primary + io.trino:trino-spi:jar:469-SNAPSHOT + primary + java.annotation.added diff --git a/core/trino-spi/src/main/java/io/trino/spi/eventlistener/QueryStatistics.java b/core/trino-spi/src/main/java/io/trino/spi/eventlistener/QueryStatistics.java index 72525f07e406..a46c90dd9742 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/eventlistener/QueryStatistics.java +++ b/core/trino-spi/src/main/java/io/trino/spi/eventlistener/QueryStatistics.java @@ -60,6 +60,7 @@ public class QueryStatistics private final long totalRows; private final long outputBytes; private final long outputRows; + private final long updatedRows; private final long writtenBytes; private final long writtenRows; private final long spilledBytes; @@ -122,6 +123,7 @@ public QueryStatistics( long totalRows, long outputBytes, long outputRows, + long updatedRows, long writtenBytes, long writtenRows, long spilledBytes, @@ -169,6 +171,7 @@ public QueryStatistics( totalRows, outputBytes, outputRows, + updatedRows, writtenBytes, writtenRows, spilledBytes, @@ -217,6 +220,7 @@ public QueryStatistics( long totalRows, long outputBytes, long outputRows, + long updatedRows, long writtenBytes, long writtenRows, long spilledBytes, @@ -263,6 +267,7 @@ public QueryStatistics( this.totalRows = totalRows; this.outputBytes = outputBytes; this.outputRows = outputRows; + this.updatedRows = updatedRows; this.writtenBytes = writtenBytes; this.writtenRows = writtenRows; this.spilledBytes = spilledBytes; @@ -460,6 +465,12 @@ public long getOutputRows() return outputRows; } + @JsonProperty + public long getUpdatedRows() + { + return updatedRows; + } + @JsonProperty public long getWrittenBytes() { diff --git a/plugin/trino-http-event-listener/src/test/java/io/trino/plugin/httpquery/TestHttpEventListener.java b/plugin/trino-http-event-listener/src/test/java/io/trino/plugin/httpquery/TestHttpEventListener.java index 8d5fe476e40f..33b15a3f1290 100644 --- a/plugin/trino-http-event-listener/src/test/java/io/trino/plugin/httpquery/TestHttpEventListener.java +++ b/plugin/trino-http-event-listener/src/test/java/io/trino/plugin/httpquery/TestHttpEventListener.java @@ -186,6 +186,7 @@ final class TestHttpEventListener 0L, 0L, 0L, + 0L, 0.0f, Collections.emptyList(), 0, diff --git a/plugin/trino-http-server-event-listener/src/test/java/io/trino/plugin/httpquery/TestHttpServerEventListener.java b/plugin/trino-http-server-event-listener/src/test/java/io/trino/plugin/httpquery/TestHttpServerEventListener.java index 8f2defd9c343..38bbfc198204 100644 --- a/plugin/trino-http-server-event-listener/src/test/java/io/trino/plugin/httpquery/TestHttpServerEventListener.java +++ b/plugin/trino-http-server-event-listener/src/test/java/io/trino/plugin/httpquery/TestHttpServerEventListener.java @@ -157,6 +157,7 @@ final class TestHttpServerEventListener 0L, 0L, 0L, + 0L, 0.0f, Collections.emptyList(), 0, diff --git a/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestUtils.java b/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestUtils.java index e9bfa85475f4..55f02bed1da0 100644 --- a/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestUtils.java +++ b/plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestUtils.java @@ -148,6 +148,7 @@ private TestUtils() {} 0L, 0L, 0L, + 0L, 0.0f, Collections.emptyList(), 0, diff --git a/plugin/trino-mysql-event-listener/src/test/java/io/trino/plugin/eventlistener/mysql/TestMysqlEventListener.java b/plugin/trino-mysql-event-listener/src/test/java/io/trino/plugin/eventlistener/mysql/TestMysqlEventListener.java index e5a06e514d25..66934aa79997 100644 --- a/plugin/trino-mysql-event-listener/src/test/java/io/trino/plugin/eventlistener/mysql/TestMysqlEventListener.java +++ b/plugin/trino-mysql-event-listener/src/test/java/io/trino/plugin/eventlistener/mysql/TestMysqlEventListener.java @@ -117,6 +117,7 @@ final class TestMysqlEventListener 123L, 124L, 125L, + 0L, 126L, 127L, 1271L, @@ -280,6 +281,7 @@ final class TestMysqlEventListener 123L, 124L, 125L, + 0L, 126L, 127L, 1271L, diff --git a/plugin/trino-openlineage/src/test/java/io/trino/plugin/openlineage/TrinoEventData.java b/plugin/trino-openlineage/src/test/java/io/trino/plugin/openlineage/TrinoEventData.java index 5a6adad0f390..80b94b0ed126 100644 --- a/plugin/trino-openlineage/src/test/java/io/trino/plugin/openlineage/TrinoEventData.java +++ b/plugin/trino-openlineage/src/test/java/io/trino/plugin/openlineage/TrinoEventData.java @@ -128,6 +128,7 @@ private TrinoEventData() 0L, 0L, 0L, + 0L, 0.0f, Collections.emptyList(), 0, From 093bf868f63c53c056b7d6e333cd4aafcc9c52f0 Mon Sep 17 00:00:00 2001 From: Bahram Zaeri Date: Thu, 16 Jan 2025 00:20:03 +0000 Subject: [PATCH 4/5] add method `recordUpdatedPositions` to the OperatorContext class --- .../src/main/java/io/trino/operator/OperatorContext.java | 6 ++++++ .../main/java/io/trino/operator/TableMutationOperator.java | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/core/trino-main/src/main/java/io/trino/operator/OperatorContext.java b/core/trino-main/src/main/java/io/trino/operator/OperatorContext.java index 3e7106f3005a..85e5396fdfff 100644 --- a/core/trino-main/src/main/java/io/trino/operator/OperatorContext.java +++ b/core/trino-main/src/main/java/io/trino/operator/OperatorContext.java @@ -227,6 +227,12 @@ public void recordDynamicFilterSplitProcessed(long dynamicFilterSplits) dynamicFilterSplitsProcessed.getAndAdd(dynamicFilterSplits); } + public void recordUpdatedPositions(long updatedPositions) + { + checkArgument(updatedPositions >= 0, "updatedPositions is negative (%s)", updatedPositions); + this.updatedPositions.update(updatedPositions); + } + /** * Overwrites the metrics with the latest one. * diff --git a/core/trino-main/src/main/java/io/trino/operator/TableMutationOperator.java b/core/trino-main/src/main/java/io/trino/operator/TableMutationOperator.java index 8931e64b0765..00fb19166675 100644 --- a/core/trino-main/src/main/java/io/trino/operator/TableMutationOperator.java +++ b/core/trino-main/src/main/java/io/trino/operator/TableMutationOperator.java @@ -121,7 +121,7 @@ public Page getOutput() OptionalLong rowsUpdatedCount = operation.execute(); - operatorContext.getUpdatedPositions().update(rowsUpdatedCount.orElse(0L)); + operatorContext.recordUpdatedPositions(rowsUpdatedCount.orElse(0L)); return buildUpdatedCountPage(rowsUpdatedCount); } From e5a37c8b0f3ab0effea8912d4bcf4b82c43cae0a Mon Sep 17 00:00:00 2001 From: Bahram Zaeri Date: Fri, 24 Jan 2025 16:19:19 +0000 Subject: [PATCH 5/5] update the updatedPositions for the queries run through MergeWriterOperator.java --- .../java/io/trino/execution/QueryStateMachine.java | 2 +- .../main/java/io/trino/operator/DriverContext.java | 12 +++++++++--- .../java/io/trino/operator/MergeWriterOperator.java | 2 ++ 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java b/core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java index 7c48e112a3cc..b7324ebd9ada 100644 --- a/core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java +++ b/core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java @@ -703,6 +703,7 @@ private QueryStats getQueryStats(Optional rootStage, List ImmutableList.Builder operatorStatsSummary = ImmutableList.builder(); for (StageInfo stageInfo : allStages) { StageStats stageStats = stageInfo.getStageStats(); + updatedPositions += stageStats.getUpdatedPositions(); totalTasks += stageStats.getTotalTasks(); runningTasks += stageStats.getRunningTasks(); completedTasks += stageStats.getCompletedTasks(); @@ -773,7 +774,6 @@ private QueryStats getQueryStats(Optional rootStage, List outputDataSize += outputStageStats.getOutputDataSize().toBytes(); failedOutputDataSize += outputStageStats.getFailedOutputDataSize().toBytes(); outputPositions += outputStageStats.getOutputPositions(); - updatedPositions += outputStageStats.getUpdatedPositions(); failedOutputPositions += outputStageStats.getFailedOutputPositions(); } diff --git a/core/trino-main/src/main/java/io/trino/operator/DriverContext.java b/core/trino-main/src/main/java/io/trino/operator/DriverContext.java index 0f20557793d6..8065c1835778 100644 --- a/core/trino-main/src/main/java/io/trino/operator/DriverContext.java +++ b/core/trino-main/src/main/java/io/trino/operator/DriverContext.java @@ -308,6 +308,14 @@ public List getOperatorStats() .collect(toImmutableList()); } + public Long getUpdatedPositions() + { + return operatorContexts.stream() + .map(OperatorContext::getOperatorStats) + .mapToLong(OperatorStats::getUpdatedPositions) + .sum(); + } + public DriverStats getDriverStats() { long totalScheduledTime = overallTiming.getWallNanos(); @@ -346,7 +354,7 @@ public DriverStats getDriverStats() Duration inputBlockedTime; DataSize outputDataSize; long outputPositions; - long updatedPositions; + long updatedPositions = getUpdatedPositions(); Duration outputBlockedTime; if (inputOperator != null) { physicalInputDataSize = inputOperator.getPhysicalInputDataSize(); @@ -365,8 +373,6 @@ public DriverStats getDriverStats() inputBlockedTime = inputOperator.getBlockedWall(); - updatedPositions = inputOperator.getUpdatedPositions(); - OperatorStats outputOperator = requireNonNull(getLast(operators, null)); outputDataSize = outputOperator.getOutputDataSize(); outputPositions = outputOperator.getOutputPositions(); diff --git a/core/trino-main/src/main/java/io/trino/operator/MergeWriterOperator.java b/core/trino-main/src/main/java/io/trino/operator/MergeWriterOperator.java index 918f31bbe683..8b5ddb5f60c1 100644 --- a/core/trino-main/src/main/java/io/trino/operator/MergeWriterOperator.java +++ b/core/trino-main/src/main/java/io/trino/operator/MergeWriterOperator.java @@ -185,6 +185,8 @@ public Page getOutput() VARBINARY.writeSlice(fragmentBuilder, fragment); } + this.operatorContext.recordUpdatedPositions(rowCount); + return page.build(); }