diff --git a/docs/src/main/sphinx/connector/iceberg.md b/docs/src/main/sphinx/connector/iceberg.md
index 37b42efb6534..214198cc9297 100644
--- a/docs/src/main/sphinx/connector/iceberg.md
+++ b/docs/src/main/sphinx/connector/iceberg.md
@@ -945,7 +945,7 @@ connector using a {doc}`WITH ` clause.
   - Optionally specifies the file system location URI for the table.
 * - `format_version`
   - Optionally specifies the format version of the Iceberg specification to use
-    for new tables; either `1`, `2` or `3`. Defaults to `2`. Version `2` is required
+    for new tables; either `1` or `2`. Defaults to `2`. Version `2` is required
     for row level deletes.
 * - `max_commit_retry`
   - Number of times to retry a commit before failing. Defaults to the value of
diff --git a/plugin/trino-iceberg/pom.xml b/plugin/trino-iceberg/pom.xml
index 5da5c912256a..b130610ad578 100644
--- a/plugin/trino-iceberg/pom.xml
+++ b/plugin/trino-iceberg/pom.xml
@@ -223,12 +223,6 @@
             <artifactId>iceberg-core</artifactId>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.iceberg</groupId>
-            <artifactId>iceberg-data</artifactId>
-            <version>${dep.iceberg.version}</version>
-        </dependency>
-
         <dependency>
             <groupId>org.apache.iceberg</groupId>
             <artifactId>iceberg-nessie</artifactId>
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CommitTaskData.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CommitTaskData.java
index a0ba6d319c84..73140ce60f0a 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CommitTaskData.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CommitTaskData.java
@@ -13,28 +13,22 @@
  */
 package io.trino.plugin.iceberg;
 
-import com.google.common.collect.ImmutableList;
 import org.apache.iceberg.FileContent;
-import org.apache.iceberg.FileFormat;
 
 import java.util.List;
 import java.util.Optional;
-import java.util.OptionalLong;
 
 import static java.util.Objects.requireNonNull;
 
 public record CommitTaskData(
         String path,
-        FileFormat fileFormat,
+        IcebergFileFormat fileFormat,
         long fileSizeInBytes,
         MetricsWrapper metrics,
         String partitionSpecJson,
         Optional<String> partitionDataJson,
        FileContent content,
         Optional<String> referencedDataFile,
-        List<String> deletionVectorFiles,
-        OptionalLong deletionVectorContentOffset,
-        OptionalLong deletionVectorContentSize,
         Optional<List<Long>> fileSplitOffsets)
 {
     public CommitTaskData
@@ -46,9 +40,6 @@ public record CommitTaskData(
         requireNonNull(partitionDataJson, "partitionDataJson is null");
         requireNonNull(content, "content is null");
         requireNonNull(referencedDataFile, "referencedDataFile is null");
-        deletionVectorFiles = ImmutableList.copyOf(deletionVectorFiles);
         requireNonNull(fileSplitOffsets, "fileSplitOffsets is null");
-        requireNonNull(deletionVectorContentOffset, "deletionVectorContentOffset is null");
-        requireNonNull(deletionVectorContentSize, "deletionVectorContentSize is null");
     }
 }
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergAvroFileWriter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergAvroFileWriter.java
index 0f3393c89a43..854ce8e7324b 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergAvroFileWriter.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergAvroFileWriter.java
@@ -18,7 +18,6 @@
 import io.trino.spi.Page;
 import io.trino.spi.TrinoException;
 import io.trino.spi.type.Type;
-import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.data.Record;
@@ -37,7 +36,6 @@
 import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_WRITER_OPEN_ERROR;
 import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
 import static java.util.Objects.requireNonNull;
-import static org.apache.iceberg.FileFormat.AVRO;
 import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;
 
 public final class IcebergAvroFileWriter
@@ -48,7 +46,6 @@ public final class IcebergAvroFileWriter
     // Use static table name instead of the actual name because it becomes outdated once the table is renamed
     public static final String AVRO_TABLE_NAME = "table";
 
-    private final String location;
     private final Schema icebergSchema;
     private final List<Type> types;
     private final FileAppender<Record> avroWriter;
@@ -61,7 +58,6 @@ public IcebergAvroFileWriter(
             List<Type> types,
             HiveCompressionCodec hiveCompressionCodec)
     {
-        this.location = requireNonNull(file.location(), "location is null");
         this.rollbackAction = requireNonNull(rollbackAction, "rollbackAction null");
         this.icebergSchema = requireNonNull(icebergSchema, "icebergSchema is null");
         this.types = ImmutableList.copyOf(requireNonNull(types, "types is null"));
@@ -75,22 +71,10 @@ public IcebergAvroFileWriter(
                     .build();
         }
         catch (IOException e) {
-            throw new TrinoException(ICEBERG_WRITER_OPEN_ERROR, "Error creating Avro file: " + location, e);
+            throw new TrinoException(ICEBERG_WRITER_OPEN_ERROR, "Error creating Avro file: " + file.location(), e);
         }
     }
 
-    @Override
-    public FileFormat fileFormat()
-    {
-        return AVRO;
-    }
-
-    @Override
-    public String location()
-    {
-        return location;
-    }
-
     @Override
     public long getWrittenBytes()
     {
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java
index 965436fe548e..77de8d534b51 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java
@@ -52,8 +52,7 @@ public class IcebergConfig
 {
     public static final int FORMAT_VERSION_SUPPORT_MIN = 1;
-    private static final int FORMAT_VERSION_DEFAULT = 2;
-    public static final int FORMAT_VERSION_SUPPORT_MAX = 3;
+    public static final int FORMAT_VERSION_SUPPORT_MAX = 2;
     public static final String EXTENDED_STATISTICS_CONFIG = "iceberg.extended-statistics.enabled";
     public static final String EXTENDED_STATISTICS_DESCRIPTION = "Enable collection (ANALYZE) and use of extended statistics.";
     public static final String COLLECT_EXTENDED_STATISTICS_ON_WRITE_DESCRIPTION = "Collect extended statistics during writes";
@@ -75,7 +74,7 @@ public class IcebergConfig
     private boolean registerTableProcedureEnabled;
     private boolean addFilesProcedureEnabled;
     private Optional<String> hiveCatalogName = Optional.empty();
-    private int formatVersion = FORMAT_VERSION_DEFAULT;
+    private int formatVersion = FORMAT_VERSION_SUPPORT_MAX;
     private Duration expireSnapshotsMinRetention = new Duration(7, DAYS);
     private Duration removeOrphanFilesMinRetention = new Duration(7, DAYS);
     private DataSize targetMaxFileSize = DataSize.of(1, GIGABYTE);
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriter.java
index 72b4f0c5cf79..f6616bab1c9e 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriter.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriter.java
@@ -13,9 +13,7 @@
  */
 package io.trino.plugin.iceberg;
 
-import com.google.common.collect.ImmutableList;
 import io.trino.plugin.hive.FileWriter;
-import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Metrics;
 
 import java.util.List;
@@ -24,15 +22,6 @@
 public interface IcebergFileWriter
         extends FileWriter
 {
-    FileFormat fileFormat();
-
-    String location();
-
-    default List<String> rewrittenDeleteFiles()
-    {
-        return ImmutableList.of();
-    }
-
     FileMetrics getFileMetrics();
 
     record FileMetrics(Metrics metrics, Optional<List<Long>> splitOffsets) {}
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java
index 23cb98fe2ddc..0d4a7049fff4 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java
@@ -33,19 +33,14 @@
 import io.trino.plugin.hive.HiveCompressionCodec;
 import io.trino.plugin.hive.NodeVersion;
 import io.trino.plugin.hive.orc.OrcWriterConfig;
-import io.trino.plugin.iceberg.delete.DeletionVectorWriter;
 import io.trino.plugin.iceberg.fileio.ForwardingOutputFile;
 import io.trino.spi.TrinoException;
 import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.type.Type;
 import io.trino.spi.type.TypeManager;
-import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.MetricsConfig;
-import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.deletes.PositionDeleteIndex;
 import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.DeleteFileSet;
 import org.weakref.jmx.Managed;
 
 import java.io.Closeable;
@@ -53,12 +48,10 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.IntStream;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static io.trino.plugin.hive.HiveCompressionCodecs.toCompressionCodec;
 import static io.trino.plugin.hive.HiveMetadata.TRINO_QUERY_ID_NAME;
@@ -89,7 +82,6 @@
 import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
 import static java.lang.String.format;
 import static java.util.Objects.requireNonNull;
-import static org.apache.iceberg.MetadataColumns.DELETE_FILE_POS;
 import static org.apache.iceberg.TableProperties.DEFAULT_WRITE_METRICS_MODE;
 import static org.apache.iceberg.io.DeleteSchemaUtil.pathPosSchema;
 import static org.apache.iceberg.parquet.ParquetSchemaUtil.convert;
@@ -147,37 +139,16 @@ public IcebergFileWriter createPositionDeleteWriter(
             TrinoFileSystem fileSystem,
             Location outputPath,
             ConnectorSession session,
-            String dataFilePath,
-            FileFormat fileFormat,
-            PartitionSpec partitionSpec,
-            Optional<PartitionData> partition,
-            Map<String, String> storageProperties,
-            Map<String, DeleteFileSet> previousDeleteFiles)
+            IcebergFileFormat fileFormat,
+            Map<String, String> storageProperties)
     {
         return switch (fileFormat) {
-            case PUFFIN -> createDeletionVectorWriter(nodeVersion, fileSystem, outputPath, dataFilePath, partitionSpec, partition, previousDeleteFiles);
             case PARQUET -> createParquetWriter(FULL_METRICS_CONFIG, fileSystem, outputPath, POSITION_DELETE_SCHEMA, session, storageProperties);
             case ORC -> createOrcWriter(FULL_METRICS_CONFIG, fileSystem, outputPath, POSITION_DELETE_SCHEMA, session, storageProperties, DataSize.ofBytes(Integer.MAX_VALUE));
             case AVRO -> createAvroWriter(fileSystem, outputPath, POSITION_DELETE_SCHEMA, session);
-            case METADATA -> throw new IllegalArgumentException("Unexpected METADATA file format");
         };
     }
 
-    private static DeletionVectorWriter createDeletionVectorWriter(
-            NodeVersion nodeVersion,
-            TrinoFileSystem fileSystem,
-            Location outputPath,
-            String dataFilePath,
-            PartitionSpec partitionSpec,
-            Optional<PartitionData> partition,
-            Map<String, DeleteFileSet> previousDeleteFiles)
-    {
-        Function<CharSequence, PositionDeleteIndex> previousDeleteLoader = DeletionVectorWriter.create(fileSystem, previousDeleteFiles);
-        int positionChannel = POSITION_DELETE_SCHEMA.columns().indexOf(DELETE_FILE_POS);
-        checkState(positionChannel != -1, "positionChannel not found");
-        return new DeletionVectorWriter(nodeVersion, fileSystem, outputPath, dataFilePath, partitionSpec, partition, previousDeleteLoader::apply, positionChannel);
-    }
-
     private IcebergFileWriter createParquetWriter(
             MetricsConfig metricsConfig,
             TrinoFileSystem fileSystem,
@@ -263,7 +234,6 @@ private IcebergFileWriter createOrcWriter(
         }
 
         return new IcebergOrcFileWriter(
-                outputPath,
                 metricsConfig,
                 icebergSchema,
                 orcDataSink,
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMergeSink.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMergeSink.java
index 747e126ff9c6..f6095c9d4bd9 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMergeSink.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMergeSink.java
@@ -30,7 +30,6 @@
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.io.LocationProvider;
 import org.apache.iceberg.types.Type;
-import org.apache.iceberg.util.DeleteFileSet;
 import org.roaringbitmap.longlong.ImmutableLongBitmapDataProvider;
 import org.roaringbitmap.longlong.LongBitmapDataProvider;
 import org.roaringbitmap.longlong.Roaring64Bitmap;
@@ -56,10 +55,8 @@ public class IcebergMergeSink
     private final LocationProvider locationProvider;
     private final IcebergFileWriterFactory fileWriterFactory;
     private final TrinoFileSystem fileSystem;
-    private final Map<String, DeleteFileSet> previousDeleteFiles;
     private final JsonCodec<CommitTaskData> jsonCodec;
     private final ConnectorSession session;
-    private final int formatVersion;
     private final IcebergFileFormat fileFormat;
     private final Map<String, String> storageProperties;
     private final Schema schema;
@@ -72,10 +69,8 @@ public IcebergMergeSink(
             LocationProvider locationProvider,
             IcebergFileWriterFactory fileWriterFactory,
             TrinoFileSystem fileSystem,
-            Map<String, DeleteFileSet> previousDeleteFiles,
             JsonCodec<CommitTaskData> jsonCodec,
             ConnectorSession session,
-            int formatVersion,
             IcebergFileFormat fileFormat,
             Map<String, String> storageProperties,
             Schema schema,
@@ -86,10 +81,8 @@ public IcebergMergeSink(
         this.locationProvider = requireNonNull(locationProvider, "locationProvider is null");
         this.fileWriterFactory = requireNonNull(fileWriterFactory, "fileWriterFactory is null");
         this.fileSystem = requireNonNull(fileSystem, "fileSystem is null");
-        this.previousDeleteFiles = ImmutableMap.copyOf(previousDeleteFiles);
         this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null");
         this.session = requireNonNull(session, "session is null");
-        this.formatVersion = formatVersion;
         this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
         this.storageProperties = ImmutableMap.copyOf(requireNonNull(storageProperties, "storageProperties is null"));
         this.schema = requireNonNull(schema, "schema is null");
@@ -169,10 +162,8 @@ private PositionDeleteWriter createPositionDeleteWriter(String dataFilePath, Par
                 fileSystem,
                 jsonCodec,
                 session,
-                formatVersion,
                 fileFormat,
-                storageProperties,
-                previousDeleteFiles);
+                storageProperties);
     }
 
     private static Collection<Slice> writePositionDeletes(PositionDeleteWriter writer, ImmutableLongBitmapDataProvider rowsToDelete)
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
index 3b3d725cbcc3..c94b4c39c258 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
@@ -47,7 +47,6 @@
 import io.trino.plugin.iceberg.aggregation.DataSketchStateSerializer;
 import io.trino.plugin.iceberg.aggregation.IcebergThetaSketchForStats;
 import io.trino.plugin.iceberg.catalog.TrinoCatalog;
-import io.trino.plugin.iceberg.delete.PositionDeleteFiles;
 import io.trino.plugin.iceberg.functions.IcebergFunctionProvider;
 import io.trino.plugin.iceberg.procedure.IcebergAddFilesFromTableHandle;
 import io.trino.plugin.iceberg.procedure.IcebergAddFilesHandle;
@@ -139,7 +138,6 @@
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.ContentFile;
-import org.apache.iceberg.ContentFileParsers;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.DeleteFile;
@@ -306,7 +304,6 @@
 import static io.trino.plugin.iceberg.IcebergTableProperties.SORTED_BY_PROPERTY;
 import static io.trino.plugin.iceberg.IcebergTableProperties.getPartitioning;
 import static io.trino.plugin.iceberg.IcebergTableProperties.getTableLocation;
-import static io.trino.plugin.iceberg.IcebergTableProperties.validateFormatVersion;
 import static io.trino.plugin.iceberg.IcebergUtil.buildPath;
 import static io.trino.plugin.iceberg.IcebergUtil.canEnforceColumnConstraintInSpecs;
 import static io.trino.plugin.iceberg.IcebergUtil.checkFormatForProperty;
@@ -405,7 +402,6 @@
 import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX;
 import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION;
 import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL;
-import static org.apache.iceberg.TableUtil.formatVersion;
 import static org.apache.iceberg.expressions.Expressions.alwaysTrue;
 import static org.apache.iceberg.types.TypeUtil.indexParents;
 import static org.apache.iceberg.util.LocationUtil.stripTrailingSlash;
@@ -416,8 +412,8 @@ public class IcebergMetadata
 {
     private static final Logger log = Logger.get(IcebergMetadata.class);
     private static final Pattern PATH_PATTERN = Pattern.compile("(.*)/[^/]+");
-    private static final int OPTIMIZE_MAX_SUPPORTED_TABLE_VERSION = 3;
-    private static final int CLEANING_UP_PROCEDURES_MAX_SUPPORTED_TABLE_VERSION = 3;
+    private static final int OPTIMIZE_MAX_SUPPORTED_TABLE_VERSION = 2;
+    private static final int CLEANING_UP_PROCEDURES_MAX_SUPPORTED_TABLE_VERSION = 2;
     private static final String RETENTION_THRESHOLD = "retention_threshold";
     private static final String UNKNOWN_SNAPSHOT_TOKEN = "UNKNOWN";
     public static final Set<String> UPDATABLE_TABLE_PROPERTIES = ImmutableSet.<String>builder()
@@ -1287,7 +1283,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
                             "to use unique table locations for every table.", location));
                 }
             }
-            return newWritableTableHandle(tableMetadata.getTable(), transaction.table(), ImmutableList.of(), retryMode);
+            return newWritableTableHandle(tableMetadata.getTable(), transaction.table(), retryMode);
         }
         catch (IOException e) {
             throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed checking new table's location: " + location, e);
@@ -1394,7 +1390,7 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto
         beginTransaction(icebergTable);
 
-        return newWritableTableHandle(table.getSchemaTableName(), icebergTable, ImmutableList.of(), retryMode);
+        return newWritableTableHandle(table.getSchemaTableName(), icebergTable, retryMode);
     }
 
     private List<String> getChildNamespaces(ConnectorSession session, String parentNamespace)
@@ -1410,21 +1406,15 @@ private List<String> getChildNamespaces(ConnectorSession session, String parentN
                 .collect(toImmutableList());
     }
 
-    private IcebergWritableTableHandle newWritableTableHandle(
-            SchemaTableName name,
-            Table table,
-            List<PositionDeleteFiles> previousDeleteFiles,
-            RetryMode retryMode)
+    private IcebergWritableTableHandle newWritableTableHandle(SchemaTableName name, Table table, RetryMode retryMode)
     {
         return new IcebergWritableTableHandle(
                 name,
-                formatVersion(table),
                 SchemaParser.toJson(table.schema()),
                 transformValues(table.specs(), PartitionSpecParser::toJson),
                 table.spec().specId(),
                 getSupportedSortFields(table.schema(), table.sortOrder()),
                 getProjectedColumns(table.schema(), typeManager),
-                previousDeleteFiles,
                 table.location(),
                 getFileFormat(table),
                 table.properties(),
@@ -3042,11 +3032,7 @@ public ColumnHandle getMergeRowIdColumnHandle(ConnectorSession session, Connecto
     @Override
     public Optional<ConnectorPartitioningHandle> getUpdateLayout(ConnectorSession session, ConnectorTableHandle tableHandle)
     {
-        Optional<ConnectorTableLayout> insertLayout = getInsertLayout(session, tableHandle);
-        if (insertLayout.isEmpty() || insertLayout.get().getPartitioning().isEmpty()) {
-            return Optional.of(new IcebergPartitioningHandle(true, List.of()));
-        }
-        return insertLayout
+        return getInsertLayout(session, tableHandle)
                 .flatMap(ConnectorTableLayout::getPartitioning)
                 .map(IcebergPartitioningHandle.class::cast)
                 .map(IcebergPartitioningHandle::forUpdate);
@@ -3063,36 +3049,10 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT
         beginTransaction(icebergTable);
 
-        List<PositionDeleteFiles> previousDeleteFiles = loadPreviousDeleteFiles(icebergTable);
-        IcebergWritableTableHandle insertHandle = newWritableTableHandle(table.getSchemaTableName(), icebergTable, previousDeleteFiles, retryMode);
+        IcebergWritableTableHandle insertHandle = newWritableTableHandle(table.getSchemaTableName(), icebergTable, retryMode);
         return new IcebergMergeTableHandle(table, insertHandle);
     }
 
-    private static List<PositionDeleteFiles> loadPreviousDeleteFiles(Table icebergTable)
-    {
-        int formatVersion = formatVersion(icebergTable);
-        validateFormatVersion(formatVersion);
-        if (formatVersion < 3) {
-            return ImmutableList.of();
-        }
-
-        ImmutableList.Builder<PositionDeleteFiles> rewritableDeletes = ImmutableList.builder();
-        try (CloseableIterable<FileScanTask> iterator = icebergTable.newScan().planFiles()) {
-            for (FileScanTask task : iterator) {
-                rewritableDeletes.add(new PositionDeleteFiles(
-                        task.file().location(),
-                        task.spec().specId(),
-                        task.deletes().stream()
-                                .map(deleteFile -> ContentFileParsers.toJson(deleteFile, task.spec()))
-                                .collect(toImmutableList())));
-            }
-        }
-        catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
-        return rewritableDeletes.build();
-    }
-
     @Override
     public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle mergeTableHandle, List<ConnectorTableHandle> sourceTableHandles, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
     {
@@ -3166,21 +3126,10 @@ private void finishWrite(ConnectorSession session, IcebergTableHandle table, Col
                 case POSITION_DELETES -> {
                     FileMetadata.Builder deleteBuilder = FileMetadata.deleteFileBuilder(partitionSpec)
                             .withPath(task.path())
-                            .withFormat(task.fileFormat())
+                            .withFormat(task.fileFormat().toIceberg())
                             .ofPositionDeletes()
                             .withFileSizeInBytes(task.fileSizeInBytes())
                             .withMetrics(task.metrics().metrics());
-
-                    if (task.fileFormat() == FileFormat.PUFFIN) {
-                        deleteBuilder.withRecordCount(task.metrics().recordCount());
-                        deleteBuilder.withContentOffset(task.deletionVectorContentOffset().orElseThrow(() -> new IllegalStateException("deletionVectorContentOffset is missing while constructing deletion vector")));
-                        deleteBuilder.withContentSizeInBytes(task.deletionVectorContentSize().orElseThrow(() -> new IllegalStateException("deletionVectorContentSize is missing while constructing deletion vector")));
-                        deleteBuilder.withReferencedDataFile(task.referencedDataFile().orElseThrow(() -> new IllegalStateException("referencedDataFile is missing while constructing deletion vector")));
-                        for (String rewrittenDeleteFile : task.deletionVectorFiles()) {
-                            rowDelta.removeDeletes((DeleteFile) ContentFileParsers.fromJson(rewrittenDeleteFile, partitionSpec));
-                        }
-                    }
-
                     task.fileSplitOffsets().ifPresent(deleteBuilder::withSplitOffsets);
                     if (!partitionSpec.fields().isEmpty()) {
                         String partitionDataJson = task.partitionDataJson()
@@ -3194,7 +3143,7 @@ private void finishWrite(ConnectorSession session, IcebergTableHandle table, Col
                 case DATA -> {
                     DataFiles.Builder builder = DataFiles.builder(partitionSpec)
                             .withPath(task.path())
-                            .withFormat(task.fileFormat())
+                            .withFormat(task.fileFormat().toIceberg())
                             .withFileSizeInBytes(task.fileSizeInBytes())
                             .withMetrics(task.metrics().metrics());
                     if (!icebergTable.spec().fields().isEmpty()) {
@@ -3731,8 +3680,7 @@ && getOnlyElement(sourceTableHandles) instanceof IcebergTableHandle handle
             fromSnapshotForRefresh = Optional.of(Long.parseLong(sourceTable.getValue()));
         }
 
-        List<PositionDeleteFiles> rewritableDeletes = loadPreviousDeleteFiles(icebergTable);
-        return newWritableTableHandle(table.getSchemaTableName(), icebergTable, rewritableDeletes, retryMode);
+        return newWritableTableHandle(table.getSchemaTableName(), icebergTable, retryMode);
     }
 
     @Override
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergOrcFileWriter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergOrcFileWriter.java
index ddb8656757e3..3ec48539315e 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergOrcFileWriter.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergOrcFileWriter.java
@@ -14,7 +14,6 @@
 package io.trino.plugin.iceberg;
 
 import io.airlift.log.Logger;
-import io.trino.filesystem.Location;
 import io.trino.orc.OrcDataSink;
 import io.trino.orc.OrcDataSource;
 import io.trino.orc.OrcWriteValidation.OrcWriteValidationMode;
@@ -29,7 +28,6 @@
 import io.trino.spi.block.Block;
 import io.trino.spi.block.RunLengthEncodedBlock;
 import io.trino.spi.type.Type;
-import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.MetricsConfig;
 import org.apache.iceberg.Schema;
 
@@ -51,7 +49,6 @@
 import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_WRITE_VALIDATION_FAILED;
 import static io.trino.plugin.iceberg.util.OrcMetrics.computeMetrics;
 import static java.util.Objects.requireNonNull;
-import static org.apache.iceberg.FileFormat.ORC;
 
 public final class IcebergOrcFileWriter
         implements IcebergFileWriter
@@ -61,7 +58,6 @@ public final class IcebergOrcFileWriter
     private static final ThreadMXBean THREAD_MX_BEAN = ManagementFactory.getThreadMXBean();
 
     private final OrcWriter orcWriter;
-    private final Location outputPath;
     private final Schema icebergSchema;
     private final ColumnMetadata<OrcType> orcColumns;
     private final MetricsConfig metricsConfig;
@@ -72,7 +68,6 @@ public final class IcebergOrcFileWriter
     private long validationCpuNanos;
 
     public IcebergOrcFileWriter(
-            Location outputPath,
             MetricsConfig metricsConfig,
             Schema icebergSchema,
             OrcDataSink orcDataSink,
@@ -89,7 +84,6 @@ public IcebergOrcFileWriter(
             OrcWriterStats stats)
     {
         requireNonNull(orcDataSink, "orcDataSink is null");
-        this.outputPath = requireNonNull(outputPath, "outputPath is null");
         this.rollbackAction = requireNonNull(rollbackAction, "rollbackAction is null");
 
         this.fileInputColumnIndexes = requireNonNull(fileInputColumnIndexes, "fileInputColumnIndexes is null");
@@ -114,18 +108,6 @@ public IcebergOrcFileWriter(
         orcColumns = fileColumnOrcTypes;
     }
 
-    @Override
-    public FileFormat fileFormat()
-    {
-        return ORC;
-    }
-
-    @Override
-    public String location()
-    {
-        return outputPath.toString();
-    }
-
     @Override
     public FileMetrics getFileMetrics()
     {
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java
index 7826d1c71a6a..3432159270f5 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java
@@ -54,7 +54,6 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.OptionalLong;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 
@@ -425,16 +424,13 @@ private void closeWriter(int writerIndex)
 
         CommitTaskData task = new CommitTaskData(
                 writeContext.getPath(),
-                writer.fileFormat(),
+                fileFormat,
                 writer.getWrittenBytes(),
                 new MetricsWrapper(writer.getFileMetrics().metrics()),
                 PartitionSpecParser.toJson(partitionSpec),
                 writeContext.getPartitionData().map(PartitionData::toJson),
                 DATA,
                 Optional.empty(),
-                writer.rewrittenDeleteFiles(),
-                OptionalLong.empty(),
-                OptionalLong.empty(),
                 writer.getFileMetrics().splitOffsets());
 
         commitTasks.add(wrappedBuffer(jsonCodec.toJsonBytes(task)));
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java
index 298230f5f093..bc4d694483f7 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java
@@ -17,7 +17,6 @@
 import io.airlift.json.JsonCodec;
 import io.airlift.units.DataSize;
 import io.trino.plugin.hive.SortingFileWriterConfig;
-import io.trino.plugin.iceberg.delete.PositionDeleteFiles;
 import io.trino.plugin.iceberg.procedure.IcebergOptimizeHandle;
 import io.trino.plugin.iceberg.procedure.IcebergTableExecuteHandle;
 import io.trino.spi.PageIndexerFactory;
@@ -33,19 +32,14 @@
 import io.trino.spi.connector.ConnectorTableExecuteHandle;
 import io.trino.spi.connector.ConnectorTransactionHandle;
 import io.trino.spi.type.TypeManager;
-import org.apache.iceberg.ContentFileParsers;
-import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.PartitionSpecParser;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.io.LocationProvider;
-import org.apache.iceberg.util.DeleteFileSet;
 
 import java.util.Map;
 
-import static com.google.common.collect.ImmutableList.toImmutableList;
-import static com.google.common.collect.ImmutableMap.toImmutableMap;
 import static com.google.common.collect.Maps.transformValues;
 import static io.trino.plugin.iceberg.IcebergUtil.getLocationProvider;
 import static java.util.Objects.requireNonNull;
@@ -173,19 +167,13 @@ public ConnectorMergeSink createMergeSink(ConnectorTransactionHandle transaction
         Schema schema = SchemaParser.fromJson(tableHandle.schemaAsJson());
         Map<Integer, PartitionSpec> partitionsSpecs = transformValues(tableHandle.partitionsSpecsAsJson(), json -> PartitionSpecParser.fromJson(schema, json));
         ConnectorPageSink pageSink = createPageSink(session, tableHandle);
-        Map<String, DeleteFileSet> previousDeleteFiles = tableHandle.previousDeleteFiles().stream()
-                .collect(toImmutableMap(PositionDeleteFiles::dataFileLocation, file -> DeleteFileSet.of(file.deletes().stream()
-                        .map(delete -> (DeleteFile) ContentFileParsers.fromJson(delete, partitionsSpecs.get(file.partitionSpecId())))
-                        .collect(toImmutableList()))));
 
         return new IcebergMergeSink(
                 locationProvider,
                 fileWriterFactory,
                 fileSystemFactory.create(session.getIdentity(), tableHandle.fileIoProperties()),
-                previousDeleteFiles,
                 jsonCodec,
                 session,
-                tableHandle.formatVersion(),
                 tableHandle.fileFormat(),
                 tableHandle.storageProperties(),
                 schema,
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
index 0f02aada2787..bf2e9bf21af6 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
@@ -349,7 +349,6 @@ public ConnectorPageSource createPageSource(
         if (!deletes.isEmpty()) {
             Supplier<Optional<RowPredicate>> deletePredicate = memoize(() -> getDeleteManager(partitionSpec, partitionData)
                     .getDeletePredicate(
-                            fileSystem,
                             path,
                             dataSequenceNumber,
                             deletes,
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java
index 666fc21f6192..06cccccebe3c 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java
@@ -22,7 +22,6 @@
 import io.trino.spi.Page;
 import io.trino.spi.TrinoException;
 import io.trino.spi.type.Type;
-import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.MetricsConfig;
 import org.apache.parquet.format.CompressionCodec;
 import org.apache.parquet.schema.MessageType;
@@ -39,7 +38,6 @@
 import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
 import static java.lang.String.format;
 import static java.util.Objects.requireNonNull;
-import static org.apache.iceberg.FileFormat.PARQUET;
 
 public final class IcebergParquetFileWriter
         implements IcebergFileWriter
@@ -79,18 +77,6 @@ public IcebergParquetFileWriter(
         this.metricsConfig = requireNonNull(metricsConfig, "metricsConfig is null");
     }
 
-    @Override
-    public FileFormat fileFormat()
-    {
-        return PARQUET;
-    }
-
-    @Override
-    public String location()
-    {
-        return location.toString();
-    }
-
     @Override
     public FileMetrics getFileMetrics()
     {
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSortingFileWriter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSortingFileWriter.java
index c34712dbb35e..6dc1501affa8 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSortingFileWriter.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSortingFileWriter.java
@@ -23,7 +23,6 @@
 import io.trino.spi.connector.SortOrder;
 import io.trino.spi.type.Type;
 import io.trino.spi.type.TypeOperators;
-import org.apache.iceberg.FileFormat;
 
 import java.io.Closeable;
 import java.util.List;
@@ -63,18 +62,6 @@ public IcebergSortingFileWriter(
                 OrcFileWriterFactory::createOrcDataSink);
     }
 
-    @Override
-    public FileFormat fileFormat()
-    {
-        return outputWriter.fileFormat();
-    }
-
-    @Override
-    public String location()
-    {
-        return outputWriter.location();
-    }
-
     @Override
     public FileMetrics getFileMetrics()
     {
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableProperties.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableProperties.java
index 74a2f68304aa..42274551b5b5 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableProperties.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableProperties.java
@@ -244,7 +244,7 @@ public static int getFormatVersion(Map<String, Object> tableProperties)
         return (int) tableProperties.get(FORMAT_VERSION_PROPERTY);
     }
 
-    public static void validateFormatVersion(int version)
+    private static void validateFormatVersion(int version)
     {
         if (version < FORMAT_VERSION_SUPPORT_MIN || version > FORMAT_VERSION_SUPPORT_MAX) {
             throw new TrinoException(INVALID_TABLE_PROPERTY,
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergWritableTableHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergWritableTableHandle.java
index 89f10aa0e68c..7f347564c3b7 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergWritableTableHandle.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergWritableTableHandle.java
@@ -15,7 +15,6 @@
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import io.trino.plugin.iceberg.delete.PositionDeleteFiles;
 import io.trino.spi.connector.ConnectorInsertTableHandle;
 import io.trino.spi.connector.ConnectorOutputTableHandle;
 import io.trino.spi.connector.RetryMode;
@@ -29,13 +28,11 @@
 public record IcebergWritableTableHandle(
         SchemaTableName name,
-        int formatVersion,
         String schemaAsJson,
         Map<Integer, String> partitionsSpecsAsJson,
         int partitionSpecId,
         List<TrinoSortField> sortOrder,
         List<IcebergColumnHandle> inputColumns,
-        List<PositionDeleteFiles> previousDeleteFiles,
         String outputPath,
         IcebergFileFormat fileFormat,
         Map<String, String> storageProperties,
@@ -50,7 +47,6 @@ public record IcebergWritableTableHandle(
         partitionsSpecsAsJson = ImmutableMap.copyOf(requireNonNull(partitionsSpecsAsJson, "partitionsSpecsAsJson is null"));
         sortOrder = ImmutableList.copyOf(requireNonNull(sortOrder, "sortOrder is null"));
         inputColumns = ImmutableList.copyOf(requireNonNull(inputColumns, "inputColumns is null"));
-        previousDeleteFiles = ImmutableList.copyOf(previousDeleteFiles);
         requireNonNull(outputPath, "outputPath is null");
         requireNonNull(fileFormat, "fileFormat is null");
         storageProperties = ImmutableMap.copyOf(requireNonNull(storageProperties, "storageProperties is null"));
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteFile.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteFile.java
index bb4600ad398e..94e60363d4bb 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteFile.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteFile.java
@@ -34,8 +34,6 @@ public record DeleteFile(
         FileFormat format,
         long recordCount,
         long fileSizeInBytes,
-        Long contentOffset,
-        Long contentSizeInBytes,
         List<Integer> equalityFieldIds,
         Optional<Long> rowPositionLowerBound,
         Optional<Long> rowPositionUpperBound,
@@ -58,8 +56,6 @@ public static DeleteFile fromIceberg(org.apache.iceberg.DeleteFile deleteFile)
                 deleteFile.format(),
                 deleteFile.recordCount(),
                 deleteFile.fileSizeInBytes(),
-                deleteFile.contentOffset(),
-                deleteFile.contentSizeInBytes(),
                 Optional.ofNullable(deleteFile.equalityFieldIds()).orElseGet(ImmutableList::of),
                 rowPositionLowerBound,
                 rowPositionUpperBound,
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteManager.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteManager.java
index 8d4b039aa7a9..dd64f3cf8b87 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteManager.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteManager.java
@@ -19,9 +19,6 @@
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import io.airlift.slice.Slice;
-import io.trino.filesystem.Location;
-import io.trino.filesystem.TrinoFileSystem;
-import io.trino.filesystem.TrinoInput;
 import io.trino.plugin.iceberg.IcebergColumnHandle;
 import io.trino.plugin.iceberg.IcebergPageSourceProvider.ReaderPageSourceWithRowPositions;
 import io.trino.plugin.iceberg.delete.EqualityDeleteFilter.EqualityDeleteFilterBuilder;
@@ -54,12 +51,10 @@
 import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA;
 import static io.trino.plugin.iceberg.IcebergUtil.getColumnHandle;
 import static io.trino.plugin.iceberg.IcebergUtil.schemaFromHandles;
-import static io.trino.plugin.iceberg.delete.DeletionVectors.readDeletionVector;
 import static io.trino.plugin.iceberg.delete.PositionDeleteFilter.readPositionDeletes;
 import static io.trino.spi.type.VarcharType.VARCHAR;
 import static java.util.Objects.requireNonNull;
 import static java.util.concurrent.Future.State.SUCCESS;
-import static org.apache.iceberg.FileFormat.PUFFIN;
 import static org.apache.iceberg.MetadataColumns.DELETE_FILE_PATH;
 import static org.apache.iceberg.MetadataColumns.DELETE_FILE_POS;
 
@@ -74,7 +69,6 @@ public DeleteManager(TypeManager typeManager)
     }
 
     public Optional<RowPredicate> getDeletePredicate(
-            TrinoFileSystem fileSystem,
             String dataFilePath,
             long dataSequenceNumber,
             List<DeleteFile> deleteFiles,
@@ -97,7 +91,7 @@ public Optional<RowPredicate> getDeletePredicate(
             }
         }
 
-        Optional<RowPredicate> positionDeletes = createPositionDeleteFilter(fileSystem, dataFilePath, positionDeleteFiles, readerPageSourceWithRowPositions, deletePageSourceProvider)
+        Optional<RowPredicate> positionDeletes = createPositionDeleteFilter(dataFilePath, positionDeleteFiles, readerPageSourceWithRowPositions, deletePageSourceProvider)
                 .map(filter -> filter.createPredicate(readColumns, dataSequenceNumber));
         Optional<RowPredicate> equalityDeletes = createEqualityDeleteFilter(equalityDeleteFiles, tableSchema, deletePageSourceProvider).stream()
                 .map(filter -> filter.createPredicate(readColumns, dataSequenceNumber))
@@ -120,7 +114,6 @@ ConnectorPageSource openDeletes(
     }
 
     private Optional<PositionDeleteFilter> createPositionDeleteFilter(
-            TrinoFileSystem fileSystem,
             String dataFilePath,
             List<DeleteFile> positionDeleteFiles,
             ReaderPageSourceWithRowPositions readerPageSourceWithRowPositions,
@@ -148,21 +141,11 @@ private Optional<PositionDeleteFilter> createPositionDeleteFilter(
         LongBitmapDataProvider deletedRows = new Roaring64Bitmap();
         for (DeleteFile deleteFile : positionDeleteFiles) {
             if (shouldLoadPositionDeleteFile(deleteFile, startRowPosition, endRowPosition)) {
-                if (deleteFile.format() == PUFFIN) {
-                    try (TrinoInput input = fileSystem.newInputFile(Location.of(deleteFile.path())).newInput()) {
-                        readDeletionVector(input, deleteFile.recordCount(), deleteFile.contentOffset(), deleteFile.contentSizeInBytes(), deletedRows);
-                    }
-                    catch (IOException e) {
-                        throw new UncheckedIOException(e);
-                    }
+                try (ConnectorPageSource pageSource = deletePageSourceProvider.openDeletes(deleteFile, deleteColumns, deleteDomain)) {
+                    readPositionDeletes(pageSource, targetPath, deletedRows);
                 }
-                else {
-                    try (ConnectorPageSource pageSource = deletePageSourceProvider.openDeletes(deleteFile, deleteColumns, deleteDomain)) {
-                        readPositionDeletes(pageSource, targetPath, deletedRows);
-                    }
-                    catch (IOException e) {
-                        throw new UncheckedIOException(e);
-                    }
+                catch (IOException e) {
+                    throw new UncheckedIOException(e);
                 }
             }
         }
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeletionVectorFileWriter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeletionVectorFileWriter.java
deleted file mode 100644
index 62caa59043dc..000000000000
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeletionVectorFileWriter.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.iceberg.delete;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import io.trino.filesystem.Location;
-import io.trino.filesystem.TrinoFileSystem;
-import io.trino.plugin.hive.NodeVersion;
-import io.trino.plugin.iceberg.fileio.ForwardingOutputFile;
-import jakarta.annotation.Nullable;
-import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.FileMetadata;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.StructLike;
-import org.apache.iceberg.deletes.DVFileWriter;
-import org.apache.iceberg.deletes.PositionDeleteIndex;
-import org.apache.iceberg.deletes.TrinoBitmapPositionDeleteIndex;
-import org.apache.iceberg.io.DeleteWriteResult;
-import org.apache.iceberg.puffin.Blob;
-import org.apache.iceberg.puffin.BlobMetadata;
-import org.apache.iceberg.puffin.Puffin;
-import org.apache.iceberg.puffin.PuffinWriter;
-import org.apache.iceberg.util.CharSequenceSet;
-import org.apache.iceberg.util.ContentFileUtil;
-import org.apache.iceberg.util.StructLikeUtil;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-
-import static com.google.common.base.Preconditions.checkState;
-import static com.google.common.collect.ImmutableList.toImmutableList;
-import static java.util.Objects.requireNonNull;
-import static org.apache.iceberg.FileFormat.PUFFIN;
-import static org.apache.iceberg.MetadataColumns.ROW_POSITION;
-import static org.apache.iceberg.puffin.StandardBlobTypes.DV_V1;
-
-/**
- * Copy {@link org.apache.iceberg.deletes.BaseDVFileWriter} and replace its file system with TrinoFileSystem
- */
-public class DeletionVectorFileWriter
-        implements DVFileWriter
-{
-    private static final String REFERENCED_DATA_FILE_KEY = "referenced-data-file";
-    private static final String CARDINALITY_KEY = "cardinality";
-
-    private final NodeVersion nodeVersion;
-    private final TrinoFileSystem fileSystem;
-    private final Location location;
-    private final Function<CharSequence, PositionDeleteIndex> loadPreviousDeletes;
-    private final Map<String, Deletes> deletesByPath = new HashMap<>();
-    private final Map<String, BlobMetadata> blobsByPath = new HashMap<>();
-
-    private DeleteWriteResult result;
-
-    public DeletionVectorFileWriter(
-            NodeVersion nodeVersion,
-            TrinoFileSystem fileSystem,
-            Location location,
-            Function<CharSequence, PositionDeleteIndex> loadPreviousDeletes)
-    {
-        this.nodeVersion = requireNonNull(nodeVersion, "nodeVersion is null");
-        this.fileSystem = requireNonNull(fileSystem, "fileSystem is null");
-        this.location = requireNonNull(location, "location is null");
-        this.loadPreviousDeletes = requireNonNull(loadPreviousDeletes, "loadPreviousDeletes is null");
-    }
-
-    @Override
-    public void delete(String path, long pos, PartitionSpec spec, StructLike partition)
-    {
-        Deletes deletes = deletesByPath.computeIfAbsent(path, _ -> new Deletes(path, spec, partition));
-        PositionDeleteIndex positions = deletes.positions();
-        positions.delete(pos);
-    }
-
-    @Override
-    public DeleteWriteResult result()
-    {
-        checkState(result != null, "Cannot get result from unclosed writer");
-        return result;
-    }
-
-    @Override
-    public void close()
-            throws IOException
-    {
-        if (result == null) {
-            CharSequenceSet referencedDataFiles = CharSequenceSet.empty();
-            List<DeleteFile> rewrittenDeleteFiles = new ArrayList<>();
-
-            PuffinWriter writer = newWriter();
-
-            try (PuffinWriter closeableWriter = writer) {
-                for (Deletes deletes : deletesByPath.values()) {
-                    String path = deletes.path();
-                    PositionDeleteIndex positions = deletes.positions();
-                    PositionDeleteIndex previousPositions = loadPreviousDeletes.apply(path);
-                    if (previousPositions != null) {
-                        positions.merge(previousPositions);
-                        for (DeleteFile previousDeleteFile : previousPositions.deleteFiles()) {
-                            // only DVs and file-scoped deletes can be discarded from the table state
-                            if (ContentFileUtil.isFileScoped(previousDeleteFile)) {
-                                rewrittenDeleteFiles.add(previousDeleteFile);
-                            }
-                        }
-                    }
-                    write(closeableWriter, deletes);
-                    referencedDataFiles.add(path);
-                }
-            }
-
-            // DVs share the Puffin path and file size but have different offsets
-            String puffinPath = writer.location();
-            long puffinFileSize = writer.fileSize();
-
-            List<DeleteFile> dvs = deletesByPath.keySet().stream()
-                    .map(path -> createDV(puffinPath, puffinFileSize, path))
-                    .collect(toImmutableList());
-
-            this.result = new DeleteWriteResult(dvs, referencedDataFiles, rewrittenDeleteFiles);
-        }
-    }
-
-    private DeleteFile createDV(String path, long size, String referencedDataFile)
-    {
-        Deletes deletes = deletesByPath.get(referencedDataFile);
-        BlobMetadata blobMetadata = blobsByPath.get(referencedDataFile);
-        return FileMetadata.deleteFileBuilder(deletes.spec())
-                .ofPositionDeletes()
-                .withFormat(PUFFIN)
-                .withPath(path)
-                .withPartition(deletes.partition())
-                .withFileSizeInBytes(size)
-                .withReferencedDataFile(referencedDataFile)
-                .withContentOffset(blobMetadata.offset())
-                .withContentSizeInBytes(blobMetadata.length())
-                .withRecordCount(deletes.positions().cardinality())
-                .build();
-    }
-
-    private void write(PuffinWriter writer, Deletes deletes)
-    {
-        String path = deletes.path();
-        PositionDeleteIndex positions = deletes.positions();
-        BlobMetadata blobMetadata = writer.write(toBlob(positions, path));
-        blobsByPath.put(path, blobMetadata);
-    }
-
-    private PuffinWriter newWriter()
-    {
-        return Puffin.write(new ForwardingOutputFile(fileSystem, location))
-                .createdBy("Trino version " + nodeVersion.toString())
-                .build();
-    }
-
-    private static Blob toBlob(PositionDeleteIndex positions, String path)
-    {
-        return new Blob(
-                DV_V1,
-                ImmutableList.of(ROW_POSITION.fieldId()),
-                -1 /* snapshot ID is inherited */,
-                -1 /* sequence number is inherited */,
-                positions.serialize(),
-                null /* uncompressed */,
-                ImmutableMap.builder()
-                        .put(REFERENCED_DATA_FILE_KEY, path)
-                        .put(CARDINALITY_KEY, String.valueOf(positions.cardinality()))
-                        .buildOrThrow());
-    }
-
-    private static class Deletes
-    {
-        private final String path;
-        private final PartitionSpec spec;
-        private final StructLike partition;
-        private final PositionDeleteIndex positions;
-
-        private Deletes(String path, PartitionSpec spec, @Nullable StructLike partition)
-        {
-            this.path = requireNonNull(path, "path is null");
-            this.spec = requireNonNull(spec, "spec is null");
-            this.partition = StructLikeUtil.copy(partition);
-            this.positions = new TrinoBitmapPositionDeleteIndex();
-        }
-
-        public String path()
-        {
-            return path;
-        }
-
-        public PartitionSpec spec()
-        {
-            return spec;
-        }
-
-        @Nullable
-        public StructLike partition()
-        {
-            return partition;
-        }
-
-        public PositionDeleteIndex positions()
-        {
-            return positions;
-        }
-    }
-}
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeletionVectorWriter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeletionVectorWriter.java
deleted file mode 100644
index 26aa74cc6081..000000000000
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeletionVectorWriter.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.iceberg.delete;
-
-import com.google.common.collect.ImmutableMap;
-import io.trino.filesystem.Location;
-import io.trino.filesystem.TrinoFileSystem;
-import io.trino.plugin.hive.NodeVersion;
-import io.trino.plugin.iceberg.IcebergFileWriter;
-import io.trino.plugin.iceberg.PartitionData;
-import io.trino.plugin.iceberg.fileio.ForwardingInputFile;
-import io.trino.spi.Page;
-import io.trino.spi.TrinoException;
-import io.trino.spi.block.LongArrayBlock;
-import org.apache.iceberg.ContentFileParsers;
-import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Metrics;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.data.BaseDeleteLoader;
-import org.apache.iceberg.data.DeleteLoader;
-import org.apache.iceberg.deletes.PositionDeleteIndex;
-import org.apache.iceberg.io.DeleteWriteResult;
-import org.apache.iceberg.util.DeleteFileSet;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Optional;
-import java.util.function.Function;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.collect.ImmutableList.toImmutableList;
-import static io.airlift.slice.SizeOf.instanceSize;
-import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_WRITER_CLOSE_ERROR;
-import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_WRITER_OPEN_ERROR;
-import static java.util.Objects.requireNonNull;
-import static org.apache.iceberg.FileFormat.PUFFIN;
-
-public class DeletionVectorWriter
-        implements IcebergFileWriter
-{
-    private static final int INSTANCE_SIZE = instanceSize(DeletionVectorWriter.class);
-
-    private final DeletionVectorFileWriter writer;
-    private final String dataFilePath;
-    private final PartitionSpec partitionSpec;
-    private final PartitionData partition;
-    private final int positionChannel;
-    private final Closeable rollbackAction;
-    private DeleteWriteResult result;
-
-    public DeletionVectorWriter(
-            NodeVersion nodeVersion,
-            TrinoFileSystem fileSystem,
-            Location outputPath,
-            String dataFilePath,
-            PartitionSpec partitionSpec,
-            Optional<PartitionData> partition,
-            Function<CharSequence, PositionDeleteIndex> loadPreviousDeletes,
-            int positionChannel)
-    {
-        writer = new DeletionVectorFileWriter(nodeVersion, fileSystem, outputPath, loadPreviousDeletes);
-        this.dataFilePath = requireNonNull(dataFilePath, "dataFilePath is null");
-        this.partitionSpec = requireNonNull(partitionSpec, "partitionSpec is null");
-        this.partition = requireNonNull(partition, "partition is null").orElse(null);
-        checkArgument(positionChannel >= 0, "positionChannel is negative");
-        this.positionChannel = positionChannel;
-        rollbackAction = () -> fileSystem.deleteFile(outputPath);
-    }
-
-    public static Function<CharSequence, PositionDeleteIndex> create(TrinoFileSystem fileSystem, Map<String, DeleteFileSet> deleteFiles)
-    {
-        if (deleteFiles == null) {
-            return _ -> null;
-        }
-        return new PreviousDeleteLoader(fileSystem, deleteFiles);
-    }
-
-    private static class PreviousDeleteLoader
-            implements Function<CharSequence, PositionDeleteIndex>
-    {
-        private final Map<String, DeleteFileSet> deleteFiles;
-        private final DeleteLoader deleteLoader;
-
-        private PreviousDeleteLoader(TrinoFileSystem fileSystem, Map<String, DeleteFileSet> deleteFiles)
-        {
-            requireNonNull(fileSystem, "fileSystem is null");
-            this.deleteFiles = ImmutableMap.copyOf(deleteFiles);
-            this.deleteLoader = new BaseDeleteLoader(deleteFile -> new ForwardingInputFile(fileSystem.newInputFile(Location.of(deleteFile.location()))));
-        }
-
-        @Override
-        public PositionDeleteIndex apply(CharSequence path)
-        {
-            DeleteFileSet deleteFileSet = deleteFiles.get(path.toString());
-            if (deleteFileSet == null) {
-                return null;
-            }
-
-            return deleteLoader.loadPositionDeletes(deleteFileSet, path);
-        }
-    }
-
-    @Override
-    public FileFormat fileFormat()
-    {
-        return PUFFIN;
-    }
-
-    @Override
-    public String location()
-    {
-        return deleteFile().location();
-    }
-
-    @Override
-    public List<String> rewrittenDeleteFiles()
-    {
-        return result.rewrittenDeleteFiles().stream()
-                .map(file -> ContentFileParsers.toJson(file, partitionSpec))
-                .collect(toImmutableList());
-    }
-
-    @Override
-    public FileMetrics getFileMetrics()
-    {
-        DeleteFile deleteFile = deleteFile();
-        Metrics metrics = new Metrics(
-                deleteFile.recordCount(),
-                deleteFile.columnSizes(),
-                deleteFile.valueCounts(),
-                deleteFile.nullValueCounts(),
-                deleteFile.nanValueCounts(),
-                deleteFile.lowerBounds(),
-                deleteFile.upperBounds());
-        return new FileMetrics(metrics, Optional.ofNullable(deleteFile.splitOffsets()));
-    }
-
-    @Override
-    public long getWrittenBytes()
-    {
-        return deleteFile().fileSizeInBytes();
-    }
-
-    @Override
-    public long getMemoryUsage()
-    {
-        return INSTANCE_SIZE;
-    }
-
-    @Override
-    public void appendRows(Page dataPage)
-    {
-        LongArrayBlock block = (LongArrayBlock) dataPage.getBlock(positionChannel);
-        for (int i = 0; i < block.getPositionCount(); i++) {
-            writer.delete(dataFilePath, block.getLong(i), partitionSpec, partition);
-        }
-    }
-
-    private DeleteFile deleteFile()
-    {
-        try {
-            return result.deleteFiles().getLast();
-        }
-        catch (NoSuchElementException e) {
-            throw new TrinoException(ICEBERG_WRITER_OPEN_ERROR, "Delete file must exist", e);
-        }
-    }
-
-    public DeleteWriteResult result()
-    {
-        return writer.result();
-    }
-
-    @Override
-    public Closeable commit()
-    {
-        try {
-            writer.close();
-            result = writer.result();
-        }
-        catch (IOException e) {
-            try {
-                rollbackAction.close();
-            }
-            catch (Exception ex) {
-                if (!e.equals(ex)) {
-                    e.addSuppressed(ex);
-                }
-            }
-            throw new TrinoException(ICEBERG_WRITER_OPEN_ERROR, "Error closing Deletion Vector file", e);
-        }
-        return rollbackAction;
-    }
-
-    @Override
-    public void rollback()
-    {
-        try (rollbackAction) {
-            writer.close();
-            result = writer.result();
-        }
-        catch (Exception e) {
-            throw new TrinoException(ICEBERG_WRITER_CLOSE_ERROR, "Error rolling back write to Deletion Vector file", e);
-        }
-    }
-
-    @Override
-    public long getValidationCpuNanos()
-    {
-        return 0;
-    }
-}
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeletionVectors.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeletionVectors.java
deleted file mode 100644
index 9e6dbf13869d..000000000000
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeletionVectors.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.iceberg.delete;
-
-import io.trino.filesystem.TrinoInput;
-import org.roaringbitmap.longlong.LongBitmapDataProvider;
-
-import java.io.IOException;
-
-import static java.lang.Math.toIntExact;
-import static org.apache.iceberg.deletes.TrinoBitmapPositionDeleteIndex.deserialize;
-
-public final class DeletionVectors
-{
-    public static final int LENGTH_SIZE_BYTES = 4;
-    public static final int CRC_SIZE_BYTES = 4;
-
-    private DeletionVectors() {}
-
-    public static void readDeletionVector(TrinoInput input, long recordCount, Long contentOffset, Long contentSizeInBytes, LongBitmapDataProvider deletedRows)
-            throws IOException
-    {
-        byte[] bytes = input.readFully(contentOffset, LENGTH_SIZE_BYTES + toIntExact(contentSizeInBytes) + CRC_SIZE_BYTES).getBytes();
-        deserialize(bytes, recordCount, contentSizeInBytes).forEach(deletedRows::addLong);
-    }
-}
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/PositionDeleteFiles.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/PositionDeleteFiles.java
deleted file mode 100644
index eabbb9911a40..000000000000
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/PositionDeleteFiles.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.iceberg.delete;
-
-import com.google.common.collect.ImmutableList;
-
-import java.util.List;
-
-import static java.util.Objects.requireNonNull;
-
-public record PositionDeleteFiles(String dataFileLocation, int partitionSpecId, List<String> deletes)
-{
-    public PositionDeleteFiles
-    {
-        requireNonNull(dataFileLocation, "dataFileLocation is null");
-        deletes = ImmutableList.copyOf(deletes);
-    }
-}
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/PositionDeleteWriter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/PositionDeleteWriter.java
index be627a86e7a8..1f6bc8f71543 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/PositionDeleteWriter.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/PositionDeleteWriter.java
@@ -28,23 +28,17 @@
 import io.trino.spi.block.Block;
 import io.trino.spi.block.RunLengthEncodedBlock;
 import io.trino.spi.connector.ConnectorSession;
-import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileContent;
-import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.PartitionSpecParser;
-import org.apache.iceberg.io.DeleteWriteResult;
 import org.apache.iceberg.io.LocationProvider;
-import org.apache.iceberg.util.DeleteFileSet;
 import org.roaringbitmap.longlong.ImmutableLongBitmapDataProvider;
 
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.OptionalLong;
 
-import static com.google.common.base.Preconditions.checkState;
 import static io.airlift.slice.Slices.utf8Slice;
 import static io.airlift.slice.Slices.wrappedBuffer;
 import static io.trino.spi.predicate.Utils.nativeValueToBlock;
@@ -52,7 +46,6 @@
 import static io.trino.spi.type.VarcharType.VARCHAR;
 import static java.util.Objects.requireNonNull;
 import static java.util.UUID.randomUUID;
-import static org.apache.iceberg.FileFormat.PUFFIN;
 
 public class PositionDeleteWriter
 {
@@ -60,8 +53,10 @@ public class PositionDeleteWriter
     private final Block dataFilePathBlock;
     private final PartitionSpec partitionSpec;
     private final Optional<PartitionData> partition;
+    private final String outputPath;
     private final JsonCodec<CommitTaskData> jsonCodec;
     private final IcebergFileWriter writer;
+    private final IcebergFileFormat fileFormat;
 
     public PositionDeleteWriter(
             String dataFilePath,
@@ -72,35 +67,23 @@ public PositionDeleteWriter(
             TrinoFileSystem fileSystem,
             JsonCodec<CommitTaskData> jsonCodec,
             ConnectorSession session,
-            int formatVersion,
             IcebergFileFormat fileFormat,
-            Map<String, String> storageProperties,
-            Map<String, DeleteFileSet> previousDeleteFiles)
+            Map<String, String> storageProperties)
     {
         this.dataFilePath = requireNonNull(dataFilePath, "dataFilePath is null");
         this.dataFilePathBlock = nativeValueToBlock(VARCHAR, utf8Slice(dataFilePath));
         this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null");
         this.partitionSpec = requireNonNull(partitionSpec, "partitionSpec is null");
         this.partition = requireNonNull(partition, "partition is null");
-        requireNonNull(fileFormat, "fileFormat is null");
+        this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
         // Prepend query ID to the file name, allowing us to determine the files written by a query.
         // This is necessary for opportunistic cleanup of extra files, which may be present for
         // successfully completed queries in the presence of failure recovery mechanisms.
-        FileFormat icebergFileFormat = formatVersion >= 3 ? PUFFIN : fileFormat.toIceberg();
-        String fileName = icebergFileFormat.addExtension(session.getQueryId() + "-" + randomUUID());
-        String outputPath = partition
+        String fileName = fileFormat.toIceberg().addExtension(session.getQueryId() + "-" + randomUUID());
+        this.outputPath = partition
                 .map(partitionData -> locationProvider.newDataLocation(partitionSpec, partitionData, fileName))
                 .orElseGet(() -> locationProvider.newDataLocation(fileName));
-        this.writer = fileWriterFactory.createPositionDeleteWriter(
-                fileSystem,
-                Location.of(outputPath),
-                session,
-                dataFilePath,
-                icebergFileFormat,
-                partitionSpec,
-                partition,
-                storageProperties,
-                previousDeleteFiles);
+        this.writer = fileWriterFactory.createPositionDeleteWriter(fileSystem, Location.of(outputPath), session, fileFormat, storageProperties);
     }
 
     public Collection<Slice> write(ImmutableLongBitmapDataProvider rowsToDelete)
@@ -108,28 +91,15 @@ public Collection<Slice> write(ImmutableLongBitmapDataProvider rowsToDelete)
         writeDeletes(rowsToDelete);
         writer.commit();
 
-        OptionalLong contentOffset = OptionalLong.empty();
-        OptionalLong contentSize = OptionalLong.empty();
-        if (writer instanceof DeletionVectorWriter deletionVectorWriter) {
-            checkState(writer.fileFormat() == PUFFIN, "File format must be PUFFIN for deletion vector");
-            DeleteWriteResult result = deletionVectorWriter.result();
-            DeleteFile deleteFile = result.deleteFiles().getLast();
-            contentOffset = OptionalLong.of(deleteFile.contentOffset());
-            contentSize = OptionalLong.of(deleteFile.contentSizeInBytes());
-        }
-
         CommitTaskData task = new CommitTaskData(
-                writer.location(),
-                writer.fileFormat(),
+                outputPath,
+                fileFormat,
                 writer.getWrittenBytes(),
                 new MetricsWrapper(writer.getFileMetrics().metrics()),
                 PartitionSpecParser.toJson(partitionSpec),
                 partition.map(PartitionData::toJson),
                 FileContent.POSITION_DELETES,
                 Optional.of(dataFilePath),
-                writer.rewrittenDeleteFiles(),
-                contentOffset,
-                contentSize,
                 writer.getFileMetrics().splitOffsets());
 
         return List.of(wrappedBuffer(jsonCodec.toJsonBytes(task)));
diff --git a/plugin/trino-iceberg/src/main/java/org/apache/iceberg/ContentFileParsers.java b/plugin/trino-iceberg/src/main/java/org/apache/iceberg/ContentFileParsers.java
deleted file mode 100644
index 0cee4f358126..000000000000
--- a/plugin/trino-iceberg/src/main/java/org/apache/iceberg/ContentFileParsers.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
PUFFIN : fileFormat.toIceberg(); - String fileName = icebergFileFormat.addExtension(session.getQueryId() + "-" + randomUUID()); - String outputPath = partition + String fileName = fileFormat.toIceberg().addExtension(session.getQueryId() + "-" + randomUUID()); + this.outputPath = partition .map(partitionData -> locationProvider.newDataLocation(partitionSpec, partitionData, fileName)) .orElseGet(() -> locationProvider.newDataLocation(fileName)); - this.writer = fileWriterFactory.createPositionDeleteWriter( - fileSystem, - Location.of(outputPath), - session, - dataFilePath, - icebergFileFormat, - partitionSpec, - partition, - storageProperties, - previousDeleteFiles); + this.writer = fileWriterFactory.createPositionDeleteWriter(fileSystem, Location.of(outputPath), session, fileFormat, storageProperties); } public Collection write(ImmutableLongBitmapDataProvider rowsToDelete) @@ -108,28 +91,15 @@ public Collection write(ImmutableLongBitmapDataProvider rowsToDelete) writeDeletes(rowsToDelete); writer.commit(); - OptionalLong contentOffset = OptionalLong.empty(); - OptionalLong contentSize = OptionalLong.empty(); - if (writer instanceof DeletionVectorWriter deletionVectorWriter) { - checkState(writer.fileFormat() == PUFFIN, "File format must be PUFFIN for deletion vector"); - DeleteWriteResult result = deletionVectorWriter.result(); - DeleteFile deleteFile = result.deleteFiles().getLast(); - contentOffset = OptionalLong.of(deleteFile.contentOffset()); - contentSize = OptionalLong.of(deleteFile.contentSizeInBytes()); - } - CommitTaskData task = new CommitTaskData( - writer.location(), - writer.fileFormat(), + outputPath, + fileFormat, writer.getWrittenBytes(), new MetricsWrapper(writer.getFileMetrics().metrics()), PartitionSpecParser.toJson(partitionSpec), partition.map(PartitionData::toJson), FileContent.POSITION_DELETES, Optional.of(dataFilePath), - writer.rewrittenDeleteFiles(), - contentOffset, - contentSize, writer.getFileMetrics().splitOffsets()); return List.of(wrappedBuffer(jsonCodec.toJsonBytes(task))); diff --git a/plugin/trino-iceberg/src/main/java/org/apache/iceberg/ContentFileParsers.java b/plugin/trino-iceberg/src/main/java/org/apache/iceberg/ContentFileParsers.java deleted file mode 100644 index 0cee4f358126..000000000000 --- a/plugin/trino-iceberg/src/main/java/org/apache/iceberg/ContentFileParsers.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
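PositionDeleteWriter keeps the naming scheme spelled out in the comment above: the query ID is prepended to each delete-file name so that a query's files can be identified during opportunistic cleanup after failures. A small sketch of that scheme, assuming Iceberg's FileFormat.addExtension; the class name is illustrative:

    // Sketch of the delete-file naming scheme PositionDeleteWriter keeps:
    // the query ID prefix makes a query's files recognizable for cleanup.
    // Assumes Iceberg's FileFormat.addExtension; the class is illustrative.
    import org.apache.iceberg.FileFormat;

    import java.util.UUID;

    final class DeleteFileNameSketch
    {
        private DeleteFileNameSketch() {}

        static String newFileName(FileFormat format, String queryId)
        {
            // e.g. "20240101_000000_00000_aaaaa-<random uuid>.parquet"
            return format.addExtension(queryId + "-" + UUID.randomUUID());
        }
    }
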
- */ -package org.apache.iceberg; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import io.airlift.json.ObjectMapperProvider; - -public final class ContentFileParsers -{ - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapperProvider().get(); - - private ContentFileParsers() {} - - public static String toJson(ContentFile contentFile, PartitionSpec spec) - { - return ContentFileParser.toJson(contentFile, spec); - } - - public static ContentFile fromJson(String jsonNode, PartitionSpec spec) - { - try { - return ContentFileParser.fromJson(OBJECT_MAPPER.readTree(jsonNode), spec); - } - catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } -} diff --git a/plugin/trino-iceberg/src/main/java/org/apache/iceberg/deletes/TrinoBitmapPositionDeleteIndex.java b/plugin/trino-iceberg/src/main/java/org/apache/iceberg/deletes/TrinoBitmapPositionDeleteIndex.java deleted file mode 100644 index e7ad87100691..000000000000 --- a/plugin/trino-iceberg/src/main/java/org/apache/iceberg/deletes/TrinoBitmapPositionDeleteIndex.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.iceberg.deletes; - -import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.util.zip.CRC32; - -import static com.google.common.base.Preconditions.checkArgument; -import static io.trino.plugin.iceberg.delete.DeletionVectors.CRC_SIZE_BYTES; -import static io.trino.plugin.iceberg.delete.DeletionVectors.LENGTH_SIZE_BYTES; - -// Exposes package-private BitmapPositionDeleteIndex -public class TrinoBitmapPositionDeleteIndex - extends BitmapPositionDeleteIndex -{ - private static final int BITMAP_DATA_OFFSET = 4; - private static final int MAGIC_NUMBER = 1681511377; - - public static TrinoRoaringPositionBitmap deserialize(byte[] bytes, long recordCount, Long contentSizeInBytes) - { - ByteBuffer buffer = ByteBuffer.wrap(bytes); - int bitmapDataLength = readBitmapDataLength(buffer, contentSizeInBytes); - TrinoRoaringPositionBitmap bitmap = deserializeBitmap(bytes, bitmapDataLength, recordCount); - int crc = computeChecksum(bytes, bitmapDataLength); - int crcOffset = LENGTH_SIZE_BYTES + bitmapDataLength; - int expectedCrc = buffer.getInt(crcOffset); - checkArgument(crc == expectedCrc, "Invalid CRC"); - return bitmap; - } - - private static int readBitmapDataLength(ByteBuffer buffer, Long contentSizeInBytes) - { - int length = buffer.getInt(); - long expectedLength = contentSizeInBytes - LENGTH_SIZE_BYTES - CRC_SIZE_BYTES; - checkArgument(length == expectedLength, "Invalid bitmap data length: %s, expected %s", length, expectedLength); - return length; - } - - private static TrinoRoaringPositionBitmap deserializeBitmap(byte[] bytes, int bitmapDataLength, long recordCount) - { - ByteBuffer bitmapData = pointToBitmapData(bytes, bitmapDataLength); - int magicNumber = bitmapData.getInt(); - checkArgument(magicNumber == MAGIC_NUMBER, "Invalid magic number: %s, expected %s", 
magicNumber, MAGIC_NUMBER); - TrinoRoaringPositionBitmap bitmap = TrinoRoaringPositionBitmap.deserialize(bitmapData); - long cardinality = bitmap.cardinality(); - checkArgument(cardinality == recordCount, "Invalid cardinality: %s, expected %s", cardinality, recordCount); - return bitmap; - } - - private static int computeChecksum(byte[] bytes, int bitmapDataLength) - { - CRC32 crc = new CRC32(); - crc.update(bytes, BITMAP_DATA_OFFSET, bitmapDataLength); - return (int) crc.getValue(); - } - - private static ByteBuffer pointToBitmapData(byte[] bytes, int bitmapDataLength) - { - ByteBuffer bitmapData = ByteBuffer.wrap(bytes, BITMAP_DATA_OFFSET, bitmapDataLength); - bitmapData.order(ByteOrder.LITTLE_ENDIAN); - return bitmapData; - } -} diff --git a/plugin/trino-iceberg/src/main/java/org/apache/iceberg/deletes/TrinoRoaringPositionBitmap.java b/plugin/trino-iceberg/src/main/java/org/apache/iceberg/deletes/TrinoRoaringPositionBitmap.java deleted file mode 100644 index 0acb8da6844c..000000000000 --- a/plugin/trino-iceberg/src/main/java/org/apache/iceberg/deletes/TrinoRoaringPositionBitmap.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.iceberg.deletes; - -import java.nio.ByteBuffer; - -public class TrinoRoaringPositionBitmap - extends RoaringPositionBitmap -{ - public static TrinoRoaringPositionBitmap deserialize(ByteBuffer buffer) - { - TrinoRoaringPositionBitmap bitmap = new TrinoRoaringPositionBitmap(); - bitmap.setAll(RoaringPositionBitmap.deserialize(buffer)); - return bitmap; - } -} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java index b7539079c4fa..2e3e8843d8a2 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java @@ -133,16 +133,6 @@ public void testHiddenPathColumn() } } - @Test - public void testDeleteWithV3Format() - { - try (TestTable table = newTrinoTable("test_delete_with_v3", "WITH (format_version = 3) AS SELECT * FROM region")) { - assertUpdate("DELETE FROM " + table.getName() + " WHERE regionkey = 1", 1); - assertThat(query("SELECT * FROM " + table.getName())) - .matches("SELECT * FROM region WHERE regionkey <> 1"); - } - } - // Repeat test with invocationCount for better test coverage, since the tested aspect is inherently non-deterministic. 
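The removed TrinoBitmapPositionDeleteIndex validated the bitmap section of a deletion-vector blob before decoding it: the section is little-endian, starts with the magic number 1681511377, and the decoded bitmap's cardinality must match the delete file's record count. A sketch of the magic-number step, deliberately leaving out the 64-bit Roaring "portable" decode itself:

    // Sketch of the magic-number validation the removed
    // TrinoBitmapPositionDeleteIndex performed on the bitmap section; the
    // constant matches the removed code. Decoding the remainder with a
    // 64-bit Roaring "portable" reader is intentionally left out.
    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;

    final class BitmapSectionSketch
    {
        private static final int MAGIC_NUMBER = 1681511377;

        private BitmapSectionSketch() {}

        static ByteBuffer bitmapBytes(byte[] blob, int offset, int length)
        {
            ByteBuffer bitmapData = ByteBuffer.wrap(blob, offset, length);
            bitmapData.order(ByteOrder.LITTLE_ENDIAN); // bitmap section is little-endian
            int magicNumber = bitmapData.getInt();
            if (magicNumber != MAGIC_NUMBER) {
                throw new IllegalArgumentException("Invalid magic number: " + magicNumber);
            }
            return bitmapData; // now positioned at the serialized bitmap
        }
    }
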
@RepeatedTest(4) @Timeout(120) diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index 6ebb2800dc2c..12b392458f51 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -184,15 +184,13 @@ public abstract class BaseIcebergConnectorTest private static final Pattern WITH_CLAUSE_EXTRACTOR = Pattern.compile(".*(WITH\\s*\\([^)]*\\))\\s*$", Pattern.DOTALL); protected final IcebergFileFormat format; - private final int formatVersion; protected TrinoFileSystem fileSystem; protected TimeUnit storageTimePrecision; - protected BaseIcebergConnectorTest(IcebergFileFormat format, int formatVersion) + protected BaseIcebergConnectorTest(IcebergFileFormat format) { this.format = requireNonNull(format, "format is null"); - this.formatVersion = formatVersion; } @Override @@ -208,7 +206,6 @@ protected IcebergQueryRunner.Builder createQueryRunnerBuilder() return IcebergQueryRunner.builder() .setIcebergProperties(ImmutableMap.builder() .put("iceberg.file-format", format.name()) - .put("iceberg.format-version", Integer.toString(formatVersion)) // Only allow some extra properties. Add "sorted_by" so that we can test that the property is disallowed by the connector explicitly. .put("iceberg.allowed-extra-properties", "extra.property.one,extra.property.two,extra.property.three,sorted_by") // Allows testing the sorting writer flushing to the file system with smaller tables @@ -380,7 +377,7 @@ public void testShowCreateTable() ")\n" + "WITH (\n" + " format = '" + format.name() + "',\n" + - " format_version = " + formatVersion + ",\n" + + " format_version = 2,\n" + " location = '\\E.*/tpch/orders-.*\\Q',\n" + " max_commit_retry = 4\n" + ")\\E"); @@ -1954,13 +1951,12 @@ private void testCreateTableLikeForFormat(IcebergFileFormat otherFormat) """ WITH ( format = '%s', - format_version = %s, + format_version = 2, location = '%s', max_commit_retry = 4, partitioning = ARRAY['adate'] )""", format, - formatVersion, tempDirPath)); assertUpdate("CREATE TABLE test_create_table_like_copy0 (LIKE test_create_table_like_original, col2 INTEGER)"); @@ -1972,12 +1968,11 @@ private void testCreateTableLikeForFormat(IcebergFileFormat otherFormat) """ WITH ( format = '%s', - format_version = %s, + format_version = 2, location = '%s', max_commit_retry = 4 )""", format, - formatVersion, getTableLocation("test_create_table_like_copy1"))); assertUpdate("CREATE TABLE test_create_table_like_copy2 (LIKE test_create_table_like_original EXCLUDING PROPERTIES)"); @@ -1985,12 +1980,11 @@ private void testCreateTableLikeForFormat(IcebergFileFormat otherFormat) """ WITH ( format = '%s', - format_version = %s, + format_version = 2, location = '%s', max_commit_retry = 4 )""", format, - formatVersion, getTableLocation("test_create_table_like_copy2"))); assertUpdate("DROP TABLE test_create_table_like_copy2"); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergTestUtils.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergTestUtils.java index 411269bbba48..ff0be14d7d03 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergTestUtils.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergTestUtils.java @@ -13,7 +13,6 @@ */ package io.trino.plugin.iceberg; -import 
com.google.common.collect.ImmutableList; import io.trino.Session; import io.trino.filesystem.FileEntry; import io.trino.filesystem.FileIterator; @@ -248,16 +247,4 @@ public static ParquetMetadata getParquetFileMetadata(TrinoInputFile inputFile) throw new UncheckedIOException(e); } } - - public static List listFiles(TrinoFileSystem trinoFileSystem, String location) - throws IOException - { - ImmutableList.Builder files = ImmutableList.builder(); - FileIterator fileIterator = trinoFileSystem.listFiles(Location.of(location)); - while (fileIterator.hasNext()) { - FileEntry entry = fileIterator.next(); - files.add(entry.location().fileName()); - } - return files.build(); - } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestFileBasedConflictDetection.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestFileBasedConflictDetection.java index 7e6bd051522a..bbfb093e5e68 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestFileBasedConflictDetection.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestFileBasedConflictDetection.java @@ -21,7 +21,6 @@ import io.trino.spi.predicate.TupleDomain; import io.trino.spi.type.RowType; import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.FileFormat; import org.apache.iceberg.Metrics; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.PartitionSpecParser; @@ -208,11 +207,11 @@ void testConflictDetectionOnEvolvedTable() """ {"partitionValues":[40]} """; - CommitTaskData commitTaskData1 = new CommitTaskData("test_location/data/new.parquet", FileFormat.PARQUET, 0, new MetricsWrapper(new Metrics()), PartitionSpecParser.toJson(currentPartitionSpec), - Optional.of(partitionDataJson), DATA, Optional.empty(), List.of(), OptionalLong.empty(), OptionalLong.empty(), Optional.empty()); + CommitTaskData commitTaskData1 = new CommitTaskData("test_location/data/new.parquet", IcebergFileFormat.PARQUET, 0, new MetricsWrapper(new Metrics()), PartitionSpecParser.toJson(currentPartitionSpec), + Optional.of(partitionDataJson), DATA, Optional.empty(), Optional.empty()); // Remove file from version with previous partition specification - CommitTaskData commitTaskData2 = new CommitTaskData("test_location/data/old.parquet", FileFormat.PARQUET, 0, new MetricsWrapper(new Metrics()), PartitionSpecParser.toJson(previousPartitionSpec), - Optional.of(partitionDataJson), POSITION_DELETES, Optional.empty(), List.of(), OptionalLong.empty(), OptionalLong.empty(), Optional.empty()); + CommitTaskData commitTaskData2 = new CommitTaskData("test_location/data/old.parquet", IcebergFileFormat.PARQUET, 0, new MetricsWrapper(new Metrics()), PartitionSpecParser.toJson(previousPartitionSpec), + Optional.of(partitionDataJson), POSITION_DELETES, Optional.empty(), Optional.empty()); TupleDomain icebergColumnHandleTupleDomain = extractTupleDomainsFromCommitTasks(getIcebergTableHandle(currentPartitionSpec), icebergTable, List.of(commitTaskData1, commitTaskData2), null); assertThat(icebergColumnHandleTupleDomain.getDomains().orElseThrow()).isEmpty(); @@ -224,29 +223,23 @@ private static List getCommitTaskDataForUpdate(PartitionSpec par // Update operation contains two commit tasks CommitTaskData commitTaskData1 = new CommitTaskData( "test_location/data/new.parquet", - FileFormat.PARQUET, + IcebergFileFormat.PARQUET, 0, new MetricsWrapper(new Metrics()), PartitionSpecParser.toJson(partitionSpec), partitionDataJson, DATA, Optional.empty(), - List.of(), - OptionalLong.empty(), - 
OptionalLong.empty(), Optional.empty()); CommitTaskData commitTaskData2 = new CommitTaskData( "test_location/data/old.parquet", - FileFormat.PARQUET, + IcebergFileFormat.PARQUET, 0, new MetricsWrapper(new Metrics()), PartitionSpecParser.toJson(partitionSpec), partitionDataJson, POSITION_DELETES, Optional.empty(), - List.of(), - OptionalLong.empty(), - OptionalLong.empty(), Optional.empty()); return List.of(commitTaskData1, commitTaskData2); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergAvroConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergAvroConnectorTest.java index 9387b8e97b19..d85a8fe78043 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergAvroConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergAvroConnectorTest.java @@ -15,7 +15,6 @@ import org.junit.jupiter.api.Test; -import static io.trino.plugin.iceberg.IcebergConfig.FORMAT_VERSION_SUPPORT_MAX; import static io.trino.plugin.iceberg.IcebergFileFormat.AVRO; import static org.junit.jupiter.api.Assumptions.abort; @@ -24,7 +23,7 @@ public class TestIcebergAvroConnectorTest { public TestIcebergAvroConnectorTest() { - super(AVRO, FORMAT_VERSION_SUPPORT_MAX); + super(AVRO); } @Override diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergFileOperations.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergFileOperations.java index f2dc81543c52..c87fb97bb09d 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergFileOperations.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergFileOperations.java @@ -21,7 +21,6 @@ import io.trino.SystemSessionProperties; import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.metastore.HiveMetastore; -import io.trino.plugin.iceberg.util.FileOperationUtils; import io.trino.plugin.iceberg.util.FileOperationUtils.Scope; import io.trino.plugin.tpch.TpchPlugin; import io.trino.sql.planner.plan.FilterNode; @@ -53,7 +52,6 @@ import static io.trino.plugin.iceberg.util.FileOperationUtils.FileType.DELETE; import static io.trino.plugin.iceberg.util.FileOperationUtils.FileType.MANIFEST; import static io.trino.plugin.iceberg.util.FileOperationUtils.FileType.METADATA_JSON; -import static io.trino.plugin.iceberg.util.FileOperationUtils.FileType.PUFFIN; import static io.trino.plugin.iceberg.util.FileOperationUtils.FileType.SNAPSHOT; import static io.trino.plugin.iceberg.util.FileOperationUtils.FileType.STATS; import static io.trino.plugin.iceberg.util.FileOperationUtils.Scope.ALL_FILES; @@ -914,45 +912,6 @@ public void testSystemMetadataMaterializedViews() assertUpdate("DROP SCHEMA " + schemaName + " CASCADE"); } - @Test - public void testV3MaterializedViews() - { - String schemaName = "test_v3_materialized_views_" + randomNameSuffix(); - assertUpdate("CREATE SCHEMA " + schemaName); - Session session = Session.builder(getSession()) - .setSchema(schemaName) - .build(); - - assertUpdate(session, "CREATE TABLE test_table WITH (format_version = 3) AS SELECT * FROM (VALUES 1, 2) AS t(a)", 2); - - assertUpdate(session, "CREATE MATERIALIZED VIEW mv AS SELECT * FROM test_table"); - assertFileSystemAccesses(session, "REFRESH MATERIALIZED VIEW mv", - ImmutableMultiset.builder() - .add(new FileOperationUtils.FileOperation(METADATA_JSON, "OutputFile.create")) - .addCopies(new FileOperationUtils.FileOperation(METADATA_JSON, "InputFile.newStream"), 2) - .add(new 
FileOperationUtils.FileOperation(MANIFEST, "OutputFile.create")) - .add(new FileOperationUtils.FileOperation(MANIFEST, "InputFile.newStream")) - .addCopies(new FileOperationUtils.FileOperation(SNAPSHOT, "OutputFile.create"), 2) - .addCopies(new FileOperationUtils.FileOperation(SNAPSHOT, "InputFile.newStream"), 4) - .addCopies(new FileOperationUtils.FileOperation(SNAPSHOT, "InputFile.length"), 4) - .build()); - - assertUpdate(session, "DELETE FROM test_table WHERE a = 1", 1); - assertFileSystemAccesses(session, "REFRESH MATERIALIZED VIEW mv", - ImmutableMultiset.builder() - .add(new FileOperationUtils.FileOperation(METADATA_JSON, "OutputFile.create")) - .addCopies(new FileOperationUtils.FileOperation(METADATA_JSON, "InputFile.newStream"), 2) - .addCopies(new FileOperationUtils.FileOperation(MANIFEST, "OutputFile.create"), 2) - .addCopies(new FileOperationUtils.FileOperation(MANIFEST, "InputFile.newStream"), 6) - .addCopies(new FileOperationUtils.FileOperation(SNAPSHOT, "OutputFile.create"), 2) - .addCopies(new FileOperationUtils.FileOperation(SNAPSHOT, "InputFile.newStream"), 10) - .addCopies(new FileOperationUtils.FileOperation(SNAPSHOT, "InputFile.length"), 6) - .add(new FileOperationUtils.FileOperation(PUFFIN, "InputFile.newInput")) - .build()); - - assertUpdate("DROP SCHEMA " + schemaName + " CASCADE"); - } - @Test public void testV2TableEnsureEqualityDeleteFilesAreReadOnce() throws Exception @@ -990,89 +949,6 @@ public void testV2TableEnsureEqualityDeleteFilesAreReadOnce() assertUpdate("DROP TABLE " + tableName); } - @Test - public void testDeletionVector() - { - String tableName = "test_deletion_vector" + randomNameSuffix(); - assertUpdate("CREATE TABLE " + tableName + "(id INT, age INT) WITH (format_version = 3)"); - assertUpdate("INSERT INTO " + tableName + " VALUES (1, 10), (2, 20), (3, 30)", 3); - - assertFileSystemAccesses( - "DELETE FROM " + tableName + " WHERE id = 1", - ImmutableMultiset.builder() - .add(new FileOperationUtils.FileOperation(METADATA_JSON, "OutputFile.create")) - .addCopies(new FileOperationUtils.FileOperation(METADATA_JSON, "InputFile.newStream"), 2) - .add(new FileOperationUtils.FileOperation(MANIFEST, "OutputFile.create")) - .addCopies(new FileOperationUtils.FileOperation(MANIFEST, "InputFile.newStream"), 2) - .add(new FileOperationUtils.FileOperation(SNAPSHOT, "OutputFile.create")) - .addCopies(new FileOperationUtils.FileOperation(SNAPSHOT, "InputFile.newStream"), 3) - .addCopies(new FileOperationUtils.FileOperation(SNAPSHOT, "InputFile.length"), 3) - .add(new FileOperationUtils.FileOperation(PUFFIN, "OutputFile.create")) - .build()); - - assertFileSystemAccesses( - "SELECT * FROM " + tableName, - ImmutableMultiset.builder() - .add(new FileOperationUtils.FileOperation(METADATA_JSON, "InputFile.newStream")) - .addCopies(new FileOperationUtils.FileOperation(MANIFEST, "InputFile.newStream"), 2) - .add(new FileOperationUtils.FileOperation(SNAPSHOT, "InputFile.newStream")) - .add(new FileOperationUtils.FileOperation(SNAPSHOT, "InputFile.length")) - .add(new FileOperationUtils.FileOperation(PUFFIN, "InputFile.newInput")) - .build()); - - assertFileSystemAccesses( - "DELETE FROM " + tableName + " WHERE id = 2", - ImmutableMultiset.builder() - .add(new FileOperationUtils.FileOperation(METADATA_JSON, "OutputFile.create")) - .addCopies(new FileOperationUtils.FileOperation(METADATA_JSON, "InputFile.newStream"), 2) - .addCopies(new FileOperationUtils.FileOperation(MANIFEST, "OutputFile.create"), 2) - .addCopies(new FileOperationUtils.FileOperation(MANIFEST, 
"InputFile.newStream"), 6) - .add(new FileOperationUtils.FileOperation(SNAPSHOT, "OutputFile.create")) - .addCopies(new FileOperationUtils.FileOperation(SNAPSHOT, "InputFile.newStream"), 3) - .addCopies(new FileOperationUtils.FileOperation(SNAPSHOT, "InputFile.length"), 3) - .add(new FileOperationUtils.FileOperation(PUFFIN, "InputFile.newInput")) - .add(new FileOperationUtils.FileOperation(PUFFIN, "InputFile.newStream")) - .add(new FileOperationUtils.FileOperation(PUFFIN, "OutputFile.create")) - .build()); - - assertFileSystemAccesses( - "SELECT * FROM " + tableName, - ImmutableMultiset.builder() - .add(new FileOperationUtils.FileOperation(METADATA_JSON, "InputFile.newStream")) - .addCopies(new FileOperationUtils.FileOperation(MANIFEST, "InputFile.newStream"), 2) - .add(new FileOperationUtils.FileOperation(SNAPSHOT, "InputFile.newStream")) - .add(new FileOperationUtils.FileOperation(SNAPSHOT, "InputFile.length")) - .add(new FileOperationUtils.FileOperation(PUFFIN, "InputFile.newInput")) - .build()); - - - assertFileSystemAccesses( - "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE", - ImmutableMultiset.builder() - .addCopies(new FileOperationUtils.FileOperation(SNAPSHOT, "InputFile.length"), 3) - .add(new FileOperation(STATS, "InputFile.newStream")) - .addCopies(new FileOperation(MANIFEST, "InputFile.newStream"),4) - .addCopies(new FileOperationUtils.FileOperation(METADATA_JSON, "InputFile.newStream"), 3) - .add(new FileOperationUtils.FileOperation(SNAPSHOT, "OutputFile.create")) - .add(new FileOperationUtils.FileOperation(PUFFIN, "InputFile.newInput")) - .addCopies(new FileOperationUtils.FileOperation(SNAPSHOT, "InputFile.newStream"),3) - .addCopies(new FileOperationUtils.FileOperation(METADATA_JSON, "OutputFile.create"), 2) - .add(new FileOperation(STATS, "OutputFile.create")) - .addCopies(new FileOperation(MANIFEST, "OutputFile.create"), 3) - .build()); - - assertFileSystemAccesses( - "SELECT * FROM " + tableName, - ImmutableMultiset.builder() - .add(new FileOperationUtils.FileOperation(METADATA_JSON, "InputFile.newStream")) - .add(new FileOperationUtils.FileOperation(MANIFEST, "InputFile.newStream")) - .add(new FileOperationUtils.FileOperation(SNAPSHOT, "InputFile.newStream")) - .add(new FileOperationUtils.FileOperation(SNAPSHOT, "InputFile.length")) - .build()); - - assertUpdate("DROP TABLE " + tableName); - } - @Test public void testShowTables() { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMinioOrcConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMinioOrcConnectorTest.java index cf90fb7f2ff8..7f6a39290266 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMinioOrcConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMinioOrcConnectorTest.java @@ -29,7 +29,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.io.Resources.getResource; -import static io.trino.plugin.iceberg.IcebergConfig.FORMAT_VERSION_SUPPORT_MAX; import static io.trino.plugin.iceberg.IcebergFileFormat.ORC; import static io.trino.plugin.iceberg.IcebergTestUtils.checkOrcFileSorting; import static io.trino.testing.TestingNames.randomNameSuffix; @@ -51,7 +50,7 @@ public class TestIcebergMinioOrcConnectorTest public TestIcebergMinioOrcConnectorTest() { - super(ORC, FORMAT_VERSION_SUPPORT_MAX); + super(ORC); } @Override @@ -66,7 +65,6 @@ protected QueryRunner createQueryRunner() .setIcebergProperties( ImmutableMap.builder() 
.put("iceberg.file-format", format.name()) - .put("iceberg.format-version", "3") .put("fs.hadoop.enabled", "true") .put("fs.native-s3.enabled", "true") .put("s3.aws-access-key", MINIO_ACCESS_KEY) diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergParquetConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergParquetConnectorTest.java similarity index 98% rename from plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergParquetConnectorTest.java rename to plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergParquetConnectorTest.java index 013f7bb3bfcf..28481bdc8ebe 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergParquetConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergParquetConnectorTest.java @@ -41,12 +41,12 @@ import static java.time.ZoneOffset.UTC; import static org.assertj.core.api.Assertions.assertThat; -public abstract class BaseIcebergParquetConnectorTest +public class TestIcebergParquetConnectorTest extends BaseIcebergConnectorTest { - public BaseIcebergParquetConnectorTest(int formatVersion) + public TestIcebergParquetConnectorTest() { - super(PARQUET, formatVersion); + super(PARQUET); } @Override diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergParquetV2ConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergParquetV2ConnectorTest.java deleted file mode 100644 index 10b0d5fe8118..000000000000 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergParquetV2ConnectorTest.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.plugin.iceberg; - -public class TestIcebergParquetV2ConnectorTest - extends BaseIcebergParquetConnectorTest -{ - public TestIcebergParquetV2ConnectorTest() - { - super(2); - } -} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergParquetV3ConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergParquetV3ConnectorTest.java deleted file mode 100644 index 2a16a85e98a6..000000000000 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergParquetV3ConnectorTest.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.trino.plugin.iceberg; - -public class TestIcebergParquetV3ConnectorTest - extends BaseIcebergParquetConnectorTest -{ - public TestIcebergParquetV3ConnectorTest() - { - super(3); - } -} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java index 197c8c8430b8..d11334fbc4a8 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java @@ -95,7 +95,6 @@ import static io.trino.plugin.iceberg.IcebergTestUtils.getHiveMetastore; import static io.trino.plugin.iceberg.IcebergTestUtils.getMetadataFileAndUpdatedMillis; import static io.trino.plugin.iceberg.IcebergTestUtils.getTrinoCatalog; -import static io.trino.plugin.iceberg.IcebergTestUtils.listFiles; import static io.trino.plugin.iceberg.util.EqualityDeleteUtils.writeEqualityDeleteForTable; import static io.trino.plugin.iceberg.util.EqualityDeleteUtils.writeEqualityDeleteForTableWithSchema; import static io.trino.spi.type.BigintType.BIGINT; @@ -682,69 +681,18 @@ public void testOptimizingWholeTableRemovesDeleteFiles() } @Test - void testRemoveOrphanDeletionVectors() - throws Exception - { - Session singleWriterPerTask = Session.builder(getSession()) - .setSystemProperty("task_min_writer_count", "1") - .build(); - - Session shortRetentionUnlocked = Session.builder(getSession()) - .setCatalogSessionProperty("iceberg", "expire_snapshots_min_retention", "0s") - .setCatalogSessionProperty("iceberg", "remove_orphan_files_min_retention", "0s") - .build(); - - try (TestTable table = newTrinoTable("expire_snapshots_dv", "(x int) WITH (format_version = 3)", List.of("1", "2"))) { - Table icebergTable = loadTable(table.getName()); - String dataLocation = icebergTable.location() + "/data"; - - assertUpdate("DELETE FROM " + table.getName() + " WHERE x = 1", 1); - assertThat(listFiles(fileSystemFactory.create(SESSION), dataLocation)) - .anyMatch(file -> file.endsWith(".puffin")); - - assertUpdate(singleWriterPerTask, "ALTER TABLE " + table.getName() + " EXECUTE optimize"); - computeActual(shortRetentionUnlocked, "ALTER TABLE " + table.getName() + " EXECUTE expire_snapshots(retention_threshold => '0s')"); - computeActual(shortRetentionUnlocked, "ALTER TABLE " + table.getName() + " EXECUTE remove_orphan_files(retention_threshold => '0s')"); - assertThat(query("SELECT * FROM " + table.getName())).matches("VALUES 2"); - - assertThat(listFiles(fileSystemFactory.create(SESSION), dataLocation)) - .noneMatch(file -> file.endsWith(".puffin")); - } - } - - @Test - public void testUpgradeTableToV3FromTrino() + public void testUpgradeTableToV2FromTrino() { String tableName = "test_upgrade_table_to_v2_from_trino_" + randomNameSuffix(); assertUpdate("CREATE TABLE " + tableName + " WITH (format_version = 1) AS SELECT * FROM tpch.tiny.nation", 25); assertThat(loadTable(tableName).operations().current().formatVersion()).isEqualTo(1); - - // v1 -> v2 assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES format_version = 2"); assertThat(loadTable(tableName).operations().current().formatVersion()).isEqualTo(2); assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation"); - - // v2 -> v3 - assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES format_version = 3"); - assertThat(loadTable(tableName).operations().current().formatVersion()).isEqualTo(3); - assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation"); } 
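testUpgradingToInvalidVersionFails (below) pins the error text "format_version must be between 1 and 2", matching the reverted FORMAT_VERSION_SUPPORT_MAX. An illustrative validator with the same bounds; this is a sketch, not the connector's actual property-metadata code:

    // Illustrative validator; a sketch, not the connector's actual
    // property-metadata code. The constants mirror
    // IcebergConfig.FORMAT_VERSION_SUPPORT_MIN/MAX after this change.
    final class FormatVersionBounds
    {
        static final int FORMAT_VERSION_SUPPORT_MIN = 1;
        static final int FORMAT_VERSION_SUPPORT_MAX = 2;

        private FormatVersionBounds() {}

        static int validateFormatVersion(int version)
        {
            if (version < FORMAT_VERSION_SUPPORT_MIN || version > FORMAT_VERSION_SUPPORT_MAX) {
                throw new IllegalArgumentException(
                        "format_version must be between " + FORMAT_VERSION_SUPPORT_MIN + " and " + FORMAT_VERSION_SUPPORT_MAX);
            }
            return version;
        }
    }
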
@Test - public void testUpgradeTableFromV1ToV3() - { - String tableName = "test_upgrade_table_from_v1_to_v3_from_trino_" + randomNameSuffix(); - assertUpdate("CREATE TABLE " + tableName + " WITH (format_version = 1) AS SELECT * FROM tpch.tiny.nation", 25); - assertThat(loadTable(tableName).operations().current().formatVersion()).isEqualTo(1); - - // v1 -> v3 - assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES format_version = 3"); - assertThat(loadTable(tableName).operations().current().formatVersion()).isEqualTo(3); - assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation"); - } - - @Test - public void testDowngradingFromV2Fails() + public void testDowngradingV2TableToV1Fails() { String tableName = "test_downgrading_v2_table_to_v1_fails_" + randomNameSuffix(); assertUpdate("CREATE TABLE " + tableName + " WITH (format_version = 2) AS SELECT * FROM tpch.tiny.nation", 25); @@ -756,25 +704,6 @@ public void testDowngradingFromV2Fails() .hasMessage("Cannot downgrade v2 table to v1"); } - @Test - public void testDowngradingFromV3Fails() - { - String tableName = "test_downgrading_from_v3_fails_" + randomNameSuffix(); - assertUpdate("CREATE TABLE " + tableName + " WITH (format_version = 3) AS SELECT * FROM tpch.tiny.nation", 25); - assertThat(loadTable(tableName).operations().current().formatVersion()).isEqualTo(3); - - assertThat(query("ALTER TABLE " + tableName + " SET PROPERTIES format_version = 2")) - .failure() - .hasMessage("Failed to set new property values") - .rootCause() - .hasMessage("Cannot downgrade v3 table to v2"); - assertThat(query("ALTER TABLE " + tableName + " SET PROPERTIES format_version = 1")) - .failure() - .hasMessage("Failed to set new property values") - .rootCause() - .hasMessage("Cannot downgrade v3 table to v1"); - } - @Test public void testUpgradingToInvalidVersionFails() { @@ -782,7 +711,7 @@ public void testUpgradingToInvalidVersionFails() assertUpdate("CREATE TABLE " + tableName + " WITH (format_version = 2) AS SELECT * FROM tpch.tiny.nation", 25); assertThat(loadTable(tableName).operations().current().formatVersion()).isEqualTo(2); assertThat(query("ALTER TABLE " + tableName + " SET PROPERTIES format_version = 42")) - .failure().hasMessage("line 1:79: Unable to set catalog 'iceberg' table property 'format_version' to [42]: format_version must be between 1 and 3"); + .failure().hasMessage("line 1:79: Unable to set catalog 'iceberg' table property 'format_version' to [42]: format_version must be between 1 and 2"); } @Test @@ -1486,21 +1415,6 @@ private void testMapValueSchemaChange(String format, String expectedValue) } } - @Test - void testPositionDeleteAndDeletionVector() - { - try (TestTable table = newTrinoTable("test_delete_v2_v3", "(x int) WITH (format_version = 2)", List.of("1", "2", "3", "4"))) { - assertUpdate("DELETE FROM " + table.getName() + " WHERE x = 1", 1); - assertUpdate("ALTER TABLE " + table.getName() + " SET PROPERTIES format_version = 3"); - - assertUpdate("DELETE FROM " + table.getName() + " WHERE x = 2", 1); - assertThat(query("SELECT * FROM " + table.getName())).matches("VALUES 3, 4"); - - assertUpdate("DELETE FROM " + table.getName() + " WHERE x = 3", 1); - assertThat(query("SELECT * FROM " + table.getName())).matches("VALUES 4"); - } - } - @Test public void testUpdateAfterEqualityDelete() throws Exception diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergS3TablesConnectorSmokeTest.java 
b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergS3TablesConnectorSmokeTest.java index b2e153d97426..8b33492cbb80 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergS3TablesConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergS3TablesConnectorSmokeTest.java @@ -140,15 +140,6 @@ public void testShowCreateTable() "\\)"); } - @Test - @Override - public void testDeleteWithV3Format() - { - assertThatThrownBy(super::testDeleteWithV3Format) - .hasMessageContaining("Failed to create transaction") - .hasStackTraceContaining("Unsupported format version: v3"); - } - @Test @Override public void testRenameSchema() diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergUnityRestCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergUnityRestCatalogConnectorSmokeTest.java index 65322b4fd9e3..f1052173d5d3 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergUnityRestCatalogConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergUnityRestCatalogConnectorSmokeTest.java @@ -261,14 +261,6 @@ public void testDeleteRowsConcurrently() .hasMessageContaining("Access Denied"); } - @Test - @Override - public void testDeleteWithV3Format() - { - assertThatThrownBy(super::testDeleteWithV3Format) - .hasMessageContaining("Access Denied"); - } - @Test @Override public void testCreateOrReplaceTable() diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergVendingRestCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergVendingRestCatalogConnectorSmokeTest.java index d9dee8e88c76..48c73bb6fb8a 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergVendingRestCatalogConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergVendingRestCatalogConnectorSmokeTest.java @@ -312,14 +312,6 @@ public void testDropTableWithNonExistentTableLocation() .hasMessageMatching("Failed to load table: (.*)"); } - @Test - @Override - public void testDeleteWithV3Format() - { - assertThatThrownBy(super::testDeleteWithV3Format) - .hasMessageMatching("Unsupported format version: v3.*"); - } - @Override protected boolean isFileSorted(Location path, String sortColumnName) { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/snowflake/TestIcebergSnowflakeCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/snowflake/TestIcebergSnowflakeCatalogConnectorSmokeTest.java index 0e08f477c09e..40a14700347d 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/snowflake/TestIcebergSnowflakeCatalogConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/snowflake/TestIcebergSnowflakeCatalogConnectorSmokeTest.java @@ -257,14 +257,6 @@ public void testHiddenPathColumn() .hasMessageContaining("Snowflake managed Iceberg tables do not support modifications"); } - @Test - @Override - public void testDeleteWithV3Format() - { - assertThatThrownBy(super::testDeleteWithV3Format) - .hasMessageContaining("Snowflake managed Iceberg tables do not support modifications"); - } - @Test @Override public void 
testDeleteRowsConcurrently() diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/delete/TestDeletionVectors.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/delete/TestDeletionVectors.java deleted file mode 100644 index fa76188a03dc..000000000000 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/delete/TestDeletionVectors.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.plugin.iceberg.delete; - -import io.trino.filesystem.Location; -import io.trino.filesystem.TrinoFileSystemFactory; -import io.trino.filesystem.TrinoInput; -import io.trino.metastore.HiveMetastore; -import io.trino.plugin.iceberg.IcebergQueryRunner; -import io.trino.plugin.iceberg.IcebergTestUtils; -import io.trino.testing.AbstractTestQueryFramework; -import io.trino.testing.QueryRunner; -import io.trino.testing.sql.TestTable; -import org.apache.iceberg.BaseTable; -import org.apache.iceberg.DeleteFile; -import org.apache.iceberg.deletes.BaseDVFileWriter; -import org.apache.iceberg.io.DeleteWriteResult; -import org.apache.iceberg.io.OutputFileFactory; -import org.junit.jupiter.api.Test; -import org.roaringbitmap.longlong.LongBitmapDataProvider; -import org.roaringbitmap.longlong.Roaring64Bitmap; - -import java.util.List; - -import static io.trino.plugin.iceberg.IcebergTestUtils.getFileSystemFactory; -import static io.trino.plugin.iceberg.IcebergTestUtils.getHiveMetastore; -import static io.trino.plugin.iceberg.delete.DeletionVectors.readDeletionVector; -import static io.trino.testing.TestingConnectorSession.SESSION; -import static org.apache.iceberg.FileFormat.PUFFIN; -import static org.assertj.core.api.Assertions.assertThat; - -final class TestDeletionVectors - extends AbstractTestQueryFramework -{ - private HiveMetastore metastore; - private TrinoFileSystemFactory fileSystemFactory; - - @Override - protected QueryRunner createQueryRunner() - throws Exception - { - QueryRunner queryRunner = IcebergQueryRunner.builder() - .addIcebergProperty("iceberg.format-version", "3") - .build(); - metastore = getHiveMetastore(queryRunner); - fileSystemFactory = getFileSystemFactory(queryRunner); - return queryRunner; - } - - @Test - void testReadDeletionVector() - throws Exception - { - try (TestTable table = newTrinoTable("test_dv", "(x int)", List.of("1", "2", "3", "4", "5"))) { - BaseTable icebergTable = loadTable(table.getName()); - String filePath = (String) computeScalar("SELECT file_path FROM \"" + table.getName() + "$files\""); - - // Write deletion vectors in the Puffin format - OutputFileFactory fileFactory = OutputFileFactory.builderFor(icebergTable, 1, 1).format(PUFFIN).build(); - BaseDVFileWriter writer = new BaseDVFileWriter(fileFactory, _ -> null); - writer.delete(filePath, 0, icebergTable.spec(), null); - writer.delete(filePath, 2, icebergTable.spec(), null); - writer.delete(filePath, 4, icebergTable.spec(), null); - writer.close(); - - DeleteWriteResult result = writer.result(); - List deleteFiles = 
result.deleteFiles(); - assertThat(deleteFiles).hasSize(1); - DeleteFile deleteFile = deleteFiles.getFirst(); - - // Verify deletion vectors - LongBitmapDataProvider deletedRows = new Roaring64Bitmap(); - try (TrinoInput input = fileSystemFactory.create(SESSION).newInputFile(Location.of(deleteFile.location())).newInput()) { - readDeletionVector(input, deleteFile.recordCount(), deleteFile.contentOffset(), deleteFile.contentSizeInBytes(), deletedRows); - } - assertThat(deletedRows.getLongCardinality()).isEqualTo(3); - assertThat(deletedRows.contains(0)).isTrue(); - assertThat(deletedRows.contains(1)).isFalse(); - assertThat(deletedRows.contains(2)).isTrue(); - assertThat(deletedRows.contains(3)).isFalse(); - assertThat(deletedRows.contains(4)).isTrue(); - } - } - - private BaseTable loadTable(String tableName) - { - return IcebergTestUtils.loadTable(tableName, metastore, fileSystemFactory, "iceberg", "tpch"); - } -} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/util/FileOperationUtils.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/util/FileOperationUtils.java index 4e2465314ac5..d93e19527807 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/util/FileOperationUtils.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/util/FileOperationUtils.java @@ -19,7 +19,6 @@ import java.util.List; import java.util.function.Predicate; -import java.util.regex.Pattern; import static io.trino.filesystem.tracing.FileSystemAttributes.FILE_LOCATION; import static io.trino.plugin.iceberg.util.FileOperationUtils.FileType.DATA; @@ -30,8 +29,6 @@ public final class FileOperationUtils { - private static final Pattern MANIFEST_PATTERN = Pattern.compile(".*-m[0-9]*.avro"); - private FileOperationUtils() {} public static Multiset getOperations(List spans) @@ -76,7 +73,6 @@ public enum FileType SNAPSHOT, MANIFEST, STATS, - PUFFIN, DATA, DELETE, METASTORE, @@ -90,15 +86,12 @@ public static FileType fromFilePath(String path) if (path.contains("/snap-")) { return SNAPSHOT; } - if (MANIFEST_PATTERN.matcher(path).matches()) { + if (path.endsWith("-m0.avro")) { return MANIFEST; } if (path.endsWith(".stats")) { return STATS; } - if (path.endsWith(".puffin")) { - return PUFFIN; - } if (path.contains("/data/") && (path.endsWith(".orc") || path.endsWith(".parquet"))) { return DATA; } diff --git a/pom.xml b/pom.xml index df0cc5ee2e3d..3ec73cf8ed41 100644 --- a/pom.xml +++ b/pom.xml @@ -2713,21 +2713,6 @@ shaded.parquet.it.unimi.dsi.fastutil - - - - org.apache.orc - orc-core - - - org.apache.orc - orc-format - - - - org.apache.orc - - diff --git a/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/iceberg/TestIcebergParquetFaultTolerantExecutionConnectorTest.java b/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/iceberg/TestIcebergParquetFaultTolerantExecutionConnectorTest.java index a3a2bc975d24..c7b1a17de184 100644 --- a/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/iceberg/TestIcebergParquetFaultTolerantExecutionConnectorTest.java +++ b/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/iceberg/TestIcebergParquetFaultTolerantExecutionConnectorTest.java @@ -16,15 +16,14 @@ import io.trino.filesystem.Location; import io.trino.plugin.exchange.filesystem.FileSystemExchangePlugin; import io.trino.plugin.exchange.filesystem.containers.MinioStorage; -import io.trino.plugin.iceberg.BaseIcebergParquetConnectorTest; import io.trino.plugin.iceberg.IcebergQueryRunner; 
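With the PUFFIN file type gone, FileOperationUtils.fromFilePath classifies files purely by naming convention, and the manifest check reverts from the regex ".*-m[0-9]*.avro" to a plain "-m0.avro" suffix match. A condensed sketch of the branches visible in this diff; the remaining file types are elided and string constants stand in for the FileType enum:

    // Condensed sketch of FileOperationUtils.fromFilePath after this change;
    // only branches visible in the diff are shown, and string constants
    // stand in for the FileType enum.
    final class FileTypeSketch
    {
        private FileTypeSketch() {}

        static String classify(String path)
        {
            if (path.contains("/snap-")) {
                return "SNAPSHOT";
            }
            if (path.endsWith("-m0.avro")) { // reverted from regex ".*-m[0-9]*.avro"
                return "MANIFEST";
            }
            if (path.endsWith(".stats")) {
                return "STATS";
            }
            if (path.contains("/data/") && (path.endsWith(".orc") || path.endsWith(".parquet"))) {
                return "DATA";
            }
            return "OTHER"; // metadata JSON, metastore, and other types elided
        }
    }
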
+import io.trino.plugin.iceberg.TestIcebergParquetConnectorTest; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.parallel.Isolated; import static io.trino.plugin.exchange.filesystem.containers.MinioStorage.getExchangeManagerProperties; -import static io.trino.plugin.iceberg.IcebergConfig.FORMAT_VERSION_SUPPORT_MAX; import static io.trino.plugin.iceberg.IcebergTestUtils.checkParquetFileSorting; import static io.trino.testing.FaultTolerantExecutionConnectorTestHelper.getExtraProperties; import static io.trino.testing.TestingNames.randomNameSuffix; @@ -35,15 +34,10 @@ @Isolated @TestInstance(PER_CLASS) public class TestIcebergParquetFaultTolerantExecutionConnectorTest - extends BaseIcebergParquetConnectorTest + extends TestIcebergParquetConnectorTest { private MinioStorage minioStorage; - public TestIcebergParquetFaultTolerantExecutionConnectorTest() - { - super(FORMAT_VERSION_SUPPORT_MAX); - } - @Override protected IcebergQueryRunner.Builder createQueryRunnerBuilder() { diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java index e2bdc493b1d0..f80c01e50989 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java @@ -1963,19 +1963,12 @@ public void testTrinoReadsSparkRowLevelDeletesWithRowTypes(StorageFormat tableSt @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") public void testSparkReadsTrinoRowLevelDeletes(StorageFormat storageFormat) - { - // V1 doesn't support row level deletes - testSparkReadsTrinoRowLevelDeletes(storageFormat, 2); - testSparkReadsTrinoRowLevelDeletes(storageFormat, 3); - } - - private static void testSparkReadsTrinoRowLevelDeletes(StorageFormat storageFormat, int formatVersion) { String tableName = toLowerCase(format("test_spark_reads_trino_row_level_deletes_%s_%s", storageFormat.name(), randomNameSuffix())); String sparkTableName = sparkTableName(tableName); String trinoTableName = trinoTableName(tableName); - onTrino().executeQuery("CREATE TABLE " + trinoTableName + "(a INT, b INT) WITH(partitioning = ARRAY['b'], format_version = " + formatVersion + ", format = '" + storageFormat.name() + "')"); + onTrino().executeQuery("CREATE TABLE " + trinoTableName + "(a INT, b INT) WITH(partitioning = ARRAY['b'], format_version = 2, format = '" + storageFormat.name() + "')"); onTrino().executeQuery("INSERT INTO " + trinoTableName + " VALUES (1, 2), (2, 2), (3, 2), (11, 12), (12, 12), (13, 12)"); // Delete one row in a file onTrino().executeQuery("DELETE FROM " + trinoTableName + " WHERE a = 13"); @@ -1995,49 +1988,15 @@ private static void testSparkReadsTrinoRowLevelDeletes(StorageFormat storageForm onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") - public void testSparkReadsTrinoMultipleDeleteFiles(StorageFormat storageFormat) - { - // V1 doesn't support row level deletes - testSparkReadsTrinoMultipleDeleteFiles(storageFormat, 2); - testSparkReadsTrinoMultipleDeleteFiles(storageFormat, 3); - } - - private static void testSparkReadsTrinoMultipleDeleteFiles(StorageFormat storageFormat, int formatVersion) - { - 
String tableName = toLowerCase(format("test_spark_reads_trino_multiple_delete_files_%s_%s", storageFormat.name(), randomNameSuffix())); - String sparkTableName = sparkTableName(tableName); - String trinoTableName = trinoTableName(tableName); - - onTrino().executeQuery("CREATE TABLE " + trinoTableName + "(a INT, b INT) WITH(partitioning = ARRAY['b'], format_version = " + formatVersion + ", format = '" + storageFormat.name() + "')"); - onTrino().executeQuery("INSERT INTO " + trinoTableName + " VALUES (1, 2), (2, 2), (3, 2), (11, 12), (12, 12), (13, 12)"); - - // Delete one row from each data file - onTrino().executeQuery("DELETE FROM " + trinoTableName + " WHERE a IN (1, 11)"); - - List expected = ImmutableList.of(row(2, 2), row(3, 2), row(12, 12), row(13, 12)); - assertThat(onTrino().executeQuery("SELECT * FROM " + trinoTableName)).containsOnly(expected); - assertThat(onSpark().executeQuery("SELECT * FROM " + sparkTableName)).containsOnly(expected); - - onSpark().executeQuery("DROP TABLE " + sparkTableName); - } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") public void testSparkReadsTrinoRowLevelDeletesWithRowTypes(StorageFormat storageFormat) - { - // V1 doesn't support row level deletes - testSparkReadsTrinoRowLevelDeletesWithRowTypes(storageFormat, 2); - testSparkReadsTrinoRowLevelDeletesWithRowTypes(storageFormat, 3); - } - - private static void testSparkReadsTrinoRowLevelDeletesWithRowTypes(StorageFormat storageFormat, int formatVersion) { String tableName = toLowerCase(format("test_spark_reads_trino_row_level_deletes_row_types_%s_%s", storageFormat.name(), randomNameSuffix())); String sparkTableName = sparkTableName(tableName); String trinoTableName = trinoTableName(tableName); onTrino().executeQuery("CREATE TABLE " + trinoTableName + "(part_key INT, int_t INT, row_t ROW(a INT, b INT)) " + - "WITH(partitioning = ARRAY['part_key'], format_version = " + formatVersion + ", format = '" + storageFormat.name() + "') "); + "WITH(partitioning = ARRAY['part_key'], format_version = 2, format = '" + storageFormat.name() + "') "); onTrino().executeQuery("INSERT INTO " + trinoTableName + " VALUES (1, 1, row(1, 2)), (1, 2, row(3, 4)), (1, 3, row(5, 6)), (2, 4, row(1, 2))"); onTrino().executeQuery("DELETE FROM " + trinoTableName + " WHERE int_t = 2"); @@ -2116,20 +2075,14 @@ public void testMissingMetrics() } @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) - public void testOptimizeIcebergTable() - { - testOptimizeIcebergTable(2); - testOptimizeIcebergTable(3); - } - - private static void testOptimizeIcebergTable(int formatVersion) + public void testOptimizeOnV2IcebergTable() { String tableName = format("test_optimize_on_v2_iceberg_table_%s", randomNameSuffix()); String sparkTableName = sparkTableName(tableName); String trinoTableName = trinoTableName(tableName); onSpark().executeQuery("CREATE TABLE " + sparkTableName + "(a INT, b INT) " + "USING ICEBERG PARTITIONED BY (b) " + - "TBLPROPERTIES ('format-version'='" + formatVersion + "', 'write.delete.mode'='merge-on-read')"); + "TBLPROPERTIES ('format-version'='2', 'write.delete.mode'='merge-on-read')"); onSpark().executeQuery("INSERT INTO " + sparkTableName + " VALUES (1, 2), (2, 2), (3, 2), (11, 12), (12, 12), (13, 12)"); onTrino().executeQuery(format("ALTER TABLE %s EXECUTE OPTIMIZE", trinoTableName));
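The Spark compatibility tests above all follow one round-trip shape: create a merge-on-read v2 table on one engine, mutate or OPTIMIZE it from Trino, then assert that both engines read identical rows. A fragment illustrating that shape; onSpark, onTrino, row, assertThat, and the table-name variables are the product-test helpers and fixtures used throughout these hunks, not new API:

    // Fragment, not a runnable class: onSpark, onTrino, row, assertThat and
    // the table-name variables are the product-test helpers and fixtures
    // used throughout these hunks.
    onSpark().executeQuery("CREATE TABLE " + sparkTableName + "(a INT, b INT) "
            + "USING ICEBERG PARTITIONED BY (b) "
            + "TBLPROPERTIES ('format-version'='2', 'write.delete.mode'='merge-on-read')");
    onSpark().executeQuery("INSERT INTO " + sparkTableName + " VALUES (1, 2), (11, 12)");
    onTrino().executeQuery(format("ALTER TABLE %s EXECUTE OPTIMIZE", trinoTableName));
    assertThat(onTrino().executeQuery("SELECT * FROM " + trinoTableName)).containsOnly(row(1, 2), row(11, 12));
    assertThat(onSpark().executeQuery("SELECT * FROM " + sparkTableName)).containsOnly(row(1, 2), row(11, 12));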