Skip to content

Remove support for deletion vectors in Iceberg #25550

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/src/main/sphinx/connector/iceberg.md
Original file line number Diff line number Diff line change
Expand Up @@ -945,7 +945,7 @@ connector using a {doc}`WITH </sql/create-table-as>` clause.
- Optionally specifies the file system location URI for the table.
* - `format_version`
- Optionally specifies the format version of the Iceberg specification to use
for new tables; either `1`, `2` or `3`. Defaults to `2`. Version `2` is required
for new tables; either `1` or `2`. Defaults to `2`. Version `2` is required
for row level deletes.
* - `max_commit_retry`
- Number of times to retry a commit before failing. Defaults to the value of
Expand Down
6 changes: 0 additions & 6 deletions plugin/trino-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -223,12 +223,6 @@
<artifactId>iceberg-core</artifactId>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-data</artifactId>
<version>${dep.iceberg.version}</version>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-nessie</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,22 @@
*/
package io.trino.plugin.iceberg;

import com.google.common.collect.ImmutableList;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileFormat;

import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;

import static java.util.Objects.requireNonNull;

public record CommitTaskData(
String path,
FileFormat fileFormat,
IcebergFileFormat fileFormat,
long fileSizeInBytes,
MetricsWrapper metrics,
String partitionSpecJson,
Optional<String> partitionDataJson,
FileContent content,
Optional<String> referencedDataFile,
List<String> deletionVectorFiles,
OptionalLong deletionVectorContentOffset,
OptionalLong deletionVectorContentSize,
Optional<List<Long>> fileSplitOffsets)
{
public CommitTaskData
Expand All @@ -46,9 +40,6 @@ public record CommitTaskData(
requireNonNull(partitionDataJson, "partitionDataJson is null");
requireNonNull(content, "content is null");
requireNonNull(referencedDataFile, "referencedDataFile is null");
deletionVectorFiles = ImmutableList.copyOf(deletionVectorFiles);
requireNonNull(fileSplitOffsets, "fileSplitOffsets is null");
requireNonNull(deletionVectorContentOffset, "deletionVectorContentOffset is null");
requireNonNull(deletionVectorContentSize, "deletionVectorContentSize is null");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import io.trino.spi.Page;
import io.trino.spi.TrinoException;
import io.trino.spi.type.Type;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.Record;
Expand All @@ -37,7 +36,6 @@
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_WRITER_OPEN_ERROR;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.util.Objects.requireNonNull;
import static org.apache.iceberg.FileFormat.AVRO;
import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;

public final class IcebergAvroFileWriter
Expand All @@ -48,7 +46,6 @@ public final class IcebergAvroFileWriter
// Use static table name instead of the actual name because it becomes outdated once the table is renamed
public static final String AVRO_TABLE_NAME = "table";

private final String location;
private final Schema icebergSchema;
private final List<Type> types;
private final FileAppender<Record> avroWriter;
Expand All @@ -61,7 +58,6 @@ public IcebergAvroFileWriter(
List<Type> types,
HiveCompressionCodec hiveCompressionCodec)
{
this.location = requireNonNull(file.location(), "location is null");
this.rollbackAction = requireNonNull(rollbackAction, "rollbackAction null");
this.icebergSchema = requireNonNull(icebergSchema, "icebergSchema is null");
this.types = ImmutableList.copyOf(requireNonNull(types, "types is null"));
Expand All @@ -75,22 +71,10 @@ public IcebergAvroFileWriter(
.build();
}
catch (IOException e) {
throw new TrinoException(ICEBERG_WRITER_OPEN_ERROR, "Error creating Avro file: " + location, e);
throw new TrinoException(ICEBERG_WRITER_OPEN_ERROR, "Error creating Avro file: " + file.location(), e);
}
}

@Override
public FileFormat fileFormat()
{
return AVRO;
}

@Override
public String location()
{
return location;
}

@Override
public long getWrittenBytes()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@
public class IcebergConfig
{
public static final int FORMAT_VERSION_SUPPORT_MIN = 1;
private static final int FORMAT_VERSION_DEFAULT = 2;
public static final int FORMAT_VERSION_SUPPORT_MAX = 3;
public static final int FORMAT_VERSION_SUPPORT_MAX = 2;
public static final String EXTENDED_STATISTICS_CONFIG = "iceberg.extended-statistics.enabled";
public static final String EXTENDED_STATISTICS_DESCRIPTION = "Enable collection (ANALYZE) and use of extended statistics.";
public static final String COLLECT_EXTENDED_STATISTICS_ON_WRITE_DESCRIPTION = "Collect extended statistics during writes";
Expand All @@ -75,7 +74,7 @@ public class IcebergConfig
private boolean registerTableProcedureEnabled;
private boolean addFilesProcedureEnabled;
private Optional<String> hiveCatalogName = Optional.empty();
private int formatVersion = FORMAT_VERSION_DEFAULT;
private int formatVersion = FORMAT_VERSION_SUPPORT_MAX;
private Duration expireSnapshotsMinRetention = new Duration(7, DAYS);
private Duration removeOrphanFilesMinRetention = new Duration(7, DAYS);
private DataSize targetMaxFileSize = DataSize.of(1, GIGABYTE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@
*/
package io.trino.plugin.iceberg;

import com.google.common.collect.ImmutableList;
import io.trino.plugin.hive.FileWriter;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Metrics;

import java.util.List;
Expand All @@ -24,15 +22,6 @@
public interface IcebergFileWriter
extends FileWriter
{
FileFormat fileFormat();

String location();

default List<String> rewrittenDeleteFiles()
{
return ImmutableList.of();
}

FileMetrics getFileMetrics();

record FileMetrics(Metrics metrics, Optional<List<Long>> splitOffsets) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,32 +33,25 @@
import io.trino.plugin.hive.HiveCompressionCodec;
import io.trino.plugin.hive.NodeVersion;
import io.trino.plugin.hive.orc.OrcWriterConfig;
import io.trino.plugin.iceberg.delete.DeletionVectorWriter;
import io.trino.plugin.iceberg.fileio.ForwardingOutputFile;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DeleteFileSet;
import org.weakref.jmx.Managed;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.IntStream;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.hive.HiveCompressionCodecs.toCompressionCodec;
import static io.trino.plugin.hive.HiveMetadata.TRINO_QUERY_ID_NAME;
Expand Down Expand Up @@ -89,7 +82,6 @@
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static org.apache.iceberg.MetadataColumns.DELETE_FILE_POS;
import static org.apache.iceberg.TableProperties.DEFAULT_WRITE_METRICS_MODE;
import static org.apache.iceberg.io.DeleteSchemaUtil.pathPosSchema;
import static org.apache.iceberg.parquet.ParquetSchemaUtil.convert;
Expand Down Expand Up @@ -147,37 +139,16 @@ public IcebergFileWriter createPositionDeleteWriter(
TrinoFileSystem fileSystem,
Location outputPath,
ConnectorSession session,
String dataFilePath,
FileFormat fileFormat,
PartitionSpec partitionSpec,
Optional<PartitionData> partition,
Map<String, String> storageProperties,
Map<String, DeleteFileSet> previousDeleteFiles)
IcebergFileFormat fileFormat,
Map<String, String> storageProperties)
{
return switch (fileFormat) {
case PUFFIN -> createDeletionVectorWriter(nodeVersion, fileSystem, outputPath, dataFilePath, partitionSpec, partition, previousDeleteFiles);
case PARQUET -> createParquetWriter(FULL_METRICS_CONFIG, fileSystem, outputPath, POSITION_DELETE_SCHEMA, session, storageProperties);
case ORC -> createOrcWriter(FULL_METRICS_CONFIG, fileSystem, outputPath, POSITION_DELETE_SCHEMA, session, storageProperties, DataSize.ofBytes(Integer.MAX_VALUE));
case AVRO -> createAvroWriter(fileSystem, outputPath, POSITION_DELETE_SCHEMA, session);
case METADATA -> throw new IllegalArgumentException("Unexpected METADATA file format");
};
}

private static DeletionVectorWriter createDeletionVectorWriter(
NodeVersion nodeVersion,
TrinoFileSystem fileSystem,
Location outputPath,
String dataFilePath,
PartitionSpec partitionSpec,
Optional<PartitionData> partition,
Map<String, DeleteFileSet> previousDeleteFiles)
{
Function<CharSequence, PositionDeleteIndex> previousDeleteLoader = DeletionVectorWriter.create(fileSystem, previousDeleteFiles);
int positionChannel = POSITION_DELETE_SCHEMA.columns().indexOf(DELETE_FILE_POS);
checkState(positionChannel != -1, "positionChannel not found");
return new DeletionVectorWriter(nodeVersion, fileSystem, outputPath, dataFilePath, partitionSpec, partition, previousDeleteLoader::apply, positionChannel);
}

private IcebergFileWriter createParquetWriter(
MetricsConfig metricsConfig,
TrinoFileSystem fileSystem,
Expand Down Expand Up @@ -263,7 +234,6 @@ private IcebergFileWriter createOrcWriter(
}

return new IcebergOrcFileWriter(
outputPath,
metricsConfig,
icebergSchema,
orcDataSink,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.util.DeleteFileSet;
import org.roaringbitmap.longlong.ImmutableLongBitmapDataProvider;
import org.roaringbitmap.longlong.LongBitmapDataProvider;
import org.roaringbitmap.longlong.Roaring64Bitmap;
Expand All @@ -56,10 +55,8 @@ public class IcebergMergeSink
private final LocationProvider locationProvider;
private final IcebergFileWriterFactory fileWriterFactory;
private final TrinoFileSystem fileSystem;
private final Map<String, DeleteFileSet> previousDeleteFiles;
private final JsonCodec<CommitTaskData> jsonCodec;
private final ConnectorSession session;
private final int formatVersion;
private final IcebergFileFormat fileFormat;
private final Map<String, String> storageProperties;
private final Schema schema;
Expand All @@ -72,10 +69,8 @@ public IcebergMergeSink(
LocationProvider locationProvider,
IcebergFileWriterFactory fileWriterFactory,
TrinoFileSystem fileSystem,
Map<String, DeleteFileSet> previousDeleteFiles,
JsonCodec<CommitTaskData> jsonCodec,
ConnectorSession session,
int formatVersion,
IcebergFileFormat fileFormat,
Map<String, String> storageProperties,
Schema schema,
Expand All @@ -86,10 +81,8 @@ public IcebergMergeSink(
this.locationProvider = requireNonNull(locationProvider, "locationProvider is null");
this.fileWriterFactory = requireNonNull(fileWriterFactory, "fileWriterFactory is null");
this.fileSystem = requireNonNull(fileSystem, "fileSystem is null");
this.previousDeleteFiles = ImmutableMap.copyOf(previousDeleteFiles);
this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null");
this.session = requireNonNull(session, "session is null");
this.formatVersion = formatVersion;
this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
this.storageProperties = ImmutableMap.copyOf(requireNonNull(storageProperties, "storageProperties is null"));
this.schema = requireNonNull(schema, "schema is null");
Expand Down Expand Up @@ -169,10 +162,8 @@ private PositionDeleteWriter createPositionDeleteWriter(String dataFilePath, Par
fileSystem,
jsonCodec,
session,
formatVersion,
fileFormat,
storageProperties,
previousDeleteFiles);
storageProperties);
}

private static Collection<Slice> writePositionDeletes(PositionDeleteWriter writer, ImmutableLongBitmapDataProvider rowsToDelete)
Expand Down
Loading