Fix splits generation from iceberg TableChangesSplitSource
jinyangli34 authored and raunaqmorarka committed Jan 23, 2025
1 parent 30def9d commit 6f64088
Showing 3 changed files with 72 additions and 5 deletions.
@@ -96,7 +96,7 @@ public CompletableFuture<ConnectorSplitBatch> getNextBatch(int maxSize)
     @Override
     public boolean isFinished()
     {
-        return changelogScanIterator != null && !changelogScanIterator.hasNext();
+        return changelogScanIterator != null && !changelogScanIterator.hasNext() && !fileTasksIterator.hasNext();
     }

     @Override
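The first file in the diff is the table_changes split source named in the commit title. Its isFinished() now also requires the inner file-task iterator to be drained, not just the outer changelog scan iterator. Below is a minimal, self-contained sketch of the two-level iteration this implies; every name in it (SketchSplitSource, ScanTask, FileTask, Split) is a placeholder, and the batching logic is an assumption about the general pattern, not the connector's actual code.

// Minimal sketch of a split source that expands scan tasks into splits in batches.
// Placeholder types only; this is not the TableChangesSplitSource implementation.
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

class SketchSplitSource
{
    record FileTask(String path) {}

    record ScanTask(List<FileTask> fileTasks) {}

    record Split(String path) {}

    private final Iterator<ScanTask> scanTaskIterator; // outer iterator: one entry per scan task
    private Iterator<FileTask> fileTasksIterator = Collections.emptyIterator(); // inner iterator: file tasks of the current scan task

    SketchSplitSource(Iterator<ScanTask> scanTaskIterator)
    {
        this.scanTaskIterator = scanTaskIterator;
    }

    List<Split> getNextBatch(int maxSize)
    {
        List<Split> batch = new ArrayList<>();
        while (batch.size() < maxSize && (fileTasksIterator.hasNext() || scanTaskIterator.hasNext())) {
            if (!fileTasksIterator.hasNext()) {
                // advance to the next scan task and expand its file tasks
                fileTasksIterator = scanTaskIterator.next().fileTasks().iterator();
                continue;
            }
            batch.add(new Split(fileTasksIterator.next().path()));
        }
        return batch;
    }

    boolean isFinished()
    {
        // Finished only when BOTH iterators are drained. Checking the outer iterator alone
        // can report completion while file tasks of the last scan task are still pending,
        // which is the behavior the commit fixes.
        return !scanTaskIterator.hasNext() && !fileTasksIterator.hasNext();
    }
}

With only the outer check, a batch boundary that lands in the middle of a scan task's file tasks lets the scheduler observe isFinished() == true and stop asking for batches, so the remaining splits are never generated; requiring both iterators to be empty closes that gap.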
@@ -138,12 +138,9 @@ private static boolean checkOrcFileSorting(Supplier<OrcDataSource> dataSourceSup
     @SuppressWarnings({"unchecked", "rawtypes"})
     public static boolean checkParquetFileSorting(TrinoInputFile inputFile, String sortColumnName)
     {
-        ParquetMetadata parquetMetadata;
+        ParquetMetadata parquetMetadata = getParquetFileMetadata(inputFile);
         List<BlockMetadata> blocks;
         try {
-            parquetMetadata = MetadataReader.readFooter(
-                    new TrinoParquetDataSource(inputFile, new ParquetReaderOptions(), new FileFormatDataSourceStats()),
-                    Optional.empty());
             blocks = parquetMetadata.getBlocks();
         }
         catch (IOException e) {
@@ -216,4 +213,16 @@ public static Map<String, Long> getMetadataFileAndUpdatedMillis(TrinoFileSystem
         }
         return metadataFiles;
     }

+    public static ParquetMetadata getParquetFileMetadata(TrinoInputFile inputFile)
+    {
+        try {
+            return MetadataReader.readFooter(
+                    new TrinoParquetDataSource(inputFile, new ParquetReaderOptions(), new FileFormatDataSourceStats()),
+                    Optional.empty());
+        }
+        catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
 }
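The second file is IcebergTestUtils (per the static imports in the test below): the footer-reading code is extracted from checkParquetFileSorting into a public getParquetFileMetadata helper so tests can read a Parquet footer without repeating the boilerplate. A hypothetical usage sketch, assuming the same TrinoFileSystem and Location types the new test uses (countRowGroups itself is not part of the commit):

// Hypothetical helper, not part of the commit: count the Parquet row groups of a data file,
// given a test's TrinoFileSystem and the file's location string.
private static int countRowGroups(TrinoFileSystem fileSystem, String filePath)
{
    ParquetMetadata parquetMetadata = getParquetFileMetadata(fileSystem.newInputFile(Location.of(filePath)));
    return parquetMetadata.getBlocks().size();
}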
@@ -13,24 +13,32 @@
*/
package io.trino.plugin.iceberg;

import com.google.common.collect.Iterables;
import io.trino.Session;
import io.trino.execution.QueryManagerConfig;
import io.trino.filesystem.Location;
import io.trino.operator.OperatorStats;
import io.trino.parquet.metadata.ParquetMetadata;
import io.trino.testing.MaterializedResult;
import io.trino.testing.QueryRunner;
import io.trino.testing.QueryRunner.MaterializedResultWithPlan;
import io.trino.testing.sql.TestTable;
import org.intellij.lang.annotations.Language;
import org.junit.jupiter.api.Test;

import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static io.trino.plugin.iceberg.IcebergFileFormat.PARQUET;
import static io.trino.plugin.iceberg.IcebergTestUtils.checkParquetFileSorting;
import static io.trino.plugin.iceberg.IcebergTestUtils.getParquetFileMetadata;
import static io.trino.plugin.iceberg.IcebergTestUtils.withSmallRowGroups;
import static io.trino.testing.QueryAssertions.assertEqualsIgnoreOrder;
import static java.lang.String.format;
import static java.time.ZoneOffset.UTC;
import static org.assertj.core.api.Assertions.assertThat;

public class TestIcebergParquetConnectorTest
@@ -146,6 +154,56 @@ public void testPushdownPredicateToParquetAfterColumnRename()
         }
     }

+    @Test
+    void testTableChangesOnMultiRowGroups()
+            throws Exception
+    {
+        try (TestTable table = newTrinoTable(
+                "test_table_changes_function_multi_row_groups_",
+                "AS SELECT orderkey, partkey, suppkey FROM tpch.tiny.lineitem WITH NO DATA")) {
+            long initialSnapshot = getMostRecentSnapshotId(table.getName());
+            assertUpdate(
+                    withSmallRowGroups(getSession()),
+                    "INSERT INTO %s SELECT orderkey, partkey, suppkey FROM tpch.tiny.lineitem".formatted(table.getName()),
+                    60175L);
+            long snapshotAfterInsert = getMostRecentSnapshotId(table.getName());
+            DateTimeFormatter instantMillisFormatter = DateTimeFormatter.ofPattern("uuuu-MM-dd'T'HH:mm:ss.SSSVV").withZone(UTC);
+            String snapshotAfterInsertTime = getSnapshotTime(table.getName(), snapshotAfterInsert).format(instantMillisFormatter);
+
+            // make sure splits are processed in more than one batch
+            // Decrease parquet row groups size or add more columns if this test fails
+            String filePath = getOnlyTableFilePath(table.getName());
+            ParquetMetadata parquetMetadata = getParquetFileMetadata(fileSystem.newInputFile(Location.of(filePath)));
+            int blocksSize = parquetMetadata.getBlocks().size();
+            int splitBatchSize = new QueryManagerConfig().getScheduleSplitBatchSize();
+            assertThat(blocksSize > splitBatchSize && blocksSize % splitBatchSize != 0).isTrue();
+
+            assertQuery(
+                    """
+                    SELECT orderkey, partkey, suppkey, _change_type, _change_version_id, to_iso8601(_change_timestamp), _change_ordinal
+                    FROM TABLE(system.table_changes(CURRENT_SCHEMA, '%s', %s, %s))
+                    """.formatted(table.getName(), initialSnapshot, snapshotAfterInsert),
+                    "SELECT orderkey, partkey, suppkey, 'insert', %s, '%s', 0 FROM lineitem".formatted(snapshotAfterInsert, snapshotAfterInsertTime));
+        }
+    }
+
+    private String getOnlyTableFilePath(String tableName)
+    {
+        return (String) Iterables.getOnlyElement(getQueryRunner().execute(format("SELECT file_path FROM \"%s$files\"", tableName)).getOnlyColumnAsSet());
+    }
+
+    private long getMostRecentSnapshotId(String tableName)
+    {
+        return (long) Iterables.getOnlyElement(getQueryRunner().execute(format("SELECT snapshot_id FROM \"%s$snapshots\" ORDER BY committed_at DESC LIMIT 1", tableName))
+                .getOnlyColumnAsSet());
+    }
+
+    private ZonedDateTime getSnapshotTime(String tableName, long snapshotId)
+    {
+        return (ZonedDateTime) Iterables.getOnlyElement(getQueryRunner().execute(format("SELECT committed_at FROM \"%s$snapshots\" WHERE snapshot_id = %s", tableName, snapshotId))
+                .getOnlyColumnAsSet());
+    }
+
     @Override
     protected boolean isFileSorted(String path, String sortColumnName)
     {
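The two-part assertion on blocksSize and splitBatchSize is what makes the regression observable: the data file must yield more file scan tasks (one per Parquet row group) than the scheduler requests per batch, and the last batch must be only partially filled, so split generation has to continue after the outer changelog scan iterator is exhausted. Illustrative numbers only (the test reads both values at runtime, so these are assumptions, not the actual defaults):

int splitBatchSize = 1000; // hypothetical scheduler batch size
int blocksSize = 1180;     // hypothetical number of row groups written with small row groups
// blocksSize > splitBatchSize      -> splits arrive in more than one batch (1000, then 180)
// blocksSize % splitBatchSize != 0 -> the final batch is only partially filled; a split source that
//                                     reported isFinished() too early would never emit those remaining
//                                     splits, and the table_changes query would be missing those rows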
