Commit 7217417

Core, Parquet, ORC: Fix missing data when writing unknown (#12581)
1 parent: 21497fd

31 files changed, +226 -73 lines changed

arrow/src/test/java/org/apache/iceberg/arrow/vectorized/ArrowReaderTest.java

Lines changed: 1 addition & 1 deletion
@@ -874,7 +874,7 @@ private DataFile writeParquetFile(Table table, List<GenericRecord> records) thro
     FileAppender<GenericRecord> appender =
         Parquet.write(Files.localOutput(parquetFile))
             .schema(table.schema())
-            .createWriterFunc(GenericParquetWriter::buildWriter)
+            .createWriterFunc(GenericParquetWriter::create)
             .build();
     try {
       appender.addAll(records);
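
Note: most Parquet call sites in this commit are the same mechanical rename — the writer factory passed to createWriterFunc changes from GenericParquetWriter::buildWriter to GenericParquetWriter::create. A minimal sketch of the updated write path (the helper class, method, and file argument are illustrative, not part of the commit):

    import java.io.File;
    import java.io.IOException;
    import java.util.List;
    import org.apache.iceberg.Files;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.data.parquet.GenericParquetWriter;
    import org.apache.iceberg.io.FileAppender;
    import org.apache.iceberg.parquet.Parquet;

    class WriteExample {
      // Write generic records to a local Parquet file using the renamed factory.
      static void writeRecords(File file, Schema schema, List<Record> records)
          throws IOException {
        try (FileAppender<Record> appender =
            Parquet.write(Files.localOutput(file))
                .schema(schema)
                .createWriterFunc(GenericParquetWriter::create) // was ::buildWriter
                .build()) {
          appender.addAll(records);
        }
      }
    }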

core/src/test/java/org/apache/iceberg/data/DataTest.java

Lines changed: 5 additions & 1 deletion
@@ -148,7 +148,11 @@ public void testTypeSchema(Type type) throws IOException {
         .as("variant is not yet implemented")
         .isTrue();

-    writeAndValidate(new Schema(required(1, "id", LongType.get()), optional(2, "test_type", type)));
+    writeAndValidate(
+        new Schema(
+            required(1, "id", LongType.get()),
+            optional(2, "test_type", type),
+            required(3, "trailing_data", Types.StringType.get())));
   }

   @Test
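
The extra required column is the point of this test change: an unknown-typed column carries no data, so a writer that drops it but keeps positional indexing would source "trailing_data" from the wrong field slot, and the round-trip validation now catches that. A sketch of the schema the parameterized test builds when `type` is the v3 unknown type (Types.UnknownType.get() is assumed here for illustration):

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.types.Types;

    import static org.apache.iceberg.types.Types.NestedField.optional;
    import static org.apache.iceberg.types.Types.NestedField.required;

    class UnknownSchemaExample {
      // The unknown column sits between two populated columns, so any
      // positional shift in the writer shows up in the trailing column.
      static final Schema SCHEMA =
          new Schema(
              required(1, "id", Types.LongType.get()),
              optional(2, "test_type", Types.UnknownType.get()),
              required(3, "trailing_data", Types.StringType.get()));
    }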

data/src/jmh/java/org/apache/iceberg/GenericParquetReaderBenchmark.java

Lines changed: 1 addition & 1 deletion
@@ -40,7 +40,7 @@ protected CloseableIterable<Record> reader(File file, Schema schema) {
   protected FileAppender<Record> writer(File file, Schema schema) throws IOException {
     return Parquet.write(Files.localOutput(file))
         .schema(schema)
-        .createWriterFunc(GenericParquetWriter::buildWriter)
+        .createWriterFunc(GenericParquetWriter::create)
         .build();
   }
 }

data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java

Lines changed: 3 additions & 3 deletions
@@ -145,7 +145,7 @@ public FileAppender<Record> newAppender(
       case PARQUET:
         return Parquet.write(encryptedOutputFile)
             .schema(schema)
-            .createWriterFunc(GenericParquetWriter::buildWriter)
+            .createWriterFunc(GenericParquetWriter::create)
             .setAll(config)
             .metricsConfig(metricsConfig)
             .overwrite()
@@ -222,7 +222,7 @@ public EqualityDeleteWriter<Record> newEqDeleteWriter(

       case PARQUET:
         return Parquet.writeDeletes(file)
-            .createWriterFunc(GenericParquetWriter::buildWriter)
+            .createWriterFunc(GenericParquetWriter::create)
             .withPartition(partition)
             .overwrite()
             .setAll(config)
@@ -276,7 +276,7 @@ public PositionDeleteWriter<Record> newPosDeleteWriter(

       case PARQUET:
         return Parquet.writeDeletes(file)
-            .createWriterFunc(GenericParquetWriter::buildWriter)
+            .createWriterFunc(GenericParquetWriter::create)
             .withPartition(partition)
             .overwrite()
             .setAll(config)

data/src/main/java/org/apache/iceberg/data/GenericFileWriterFactory.java

Lines changed: 3 additions & 3 deletions
@@ -80,17 +80,17 @@ protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {

   @Override
   protected void configureDataWrite(Parquet.DataWriteBuilder builder) {
-    builder.createWriterFunc(GenericParquetWriter::buildWriter);
+    builder.createWriterFunc(GenericParquetWriter::create);
   }

   @Override
   protected void configureEqualityDelete(Parquet.DeleteWriteBuilder builder) {
-    builder.createWriterFunc(GenericParquetWriter::buildWriter);
+    builder.createWriterFunc(GenericParquetWriter::create);
   }

   @Override
   protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) {
-    builder.createWriterFunc(GenericParquetWriter::buildWriter);
+    builder.createWriterFunc(GenericParquetWriter::create);
   }

   @Override

data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java

Lines changed: 1 addition & 1 deletion
@@ -206,7 +206,7 @@ private static DataWriter<StructLike> dataWriter(
       case PARQUET:
         return Parquet.writeData(outputFile)
             .schema(dataSchema)
-            .createWriterFunc(InternalWriter::create)
+            .createWriterFunc(InternalWriter::createWriter)
             .withSpec(PartitionSpec.unpartitioned())
             .build();
       case AVRO:

data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilterTypes.java

Lines changed: 1 addition & 1 deletion
@@ -210,7 +210,7 @@ public void createParquetInputFile(List<Record> records) throws IOException {
     try (FileAppender<Record> appender =
         Parquet.write(outFile)
             .schema(FILE_SCHEMA)
-            .createWriterFunc(GenericParquetWriter::buildWriter)
+            .createWriterFunc(GenericParquetWriter::create)
             .build()) {
       appender.addAll(records);
     }

data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java

Lines changed: 1 addition & 1 deletion
@@ -86,7 +86,7 @@ private void writeAndValidate(Schema writeSchema, Schema expectedSchema, List<Re
     try (FileAppender<Record> appender =
         Parquet.write(Files.localOutput(testFile))
             .schema(writeSchema)
-            .createWriterFunc(GenericParquetWriter::buildWriter)
+            .createWriterFunc(GenericParquetWriter::create)
             .build()) {
       appender.addAll(expected);
     }

data/src/test/java/org/apache/iceberg/data/parquet/TestGenericReadProjection.java

Lines changed: 1 addition & 1 deletion
@@ -40,7 +40,7 @@ protected Record writeAndRead(String desc, Schema writeSchema, Schema readSchema
     try (FileAppender<Record> appender =
         Parquet.write(Files.localOutput(file))
             .schema(writeSchema)
-            .createWriterFunc(GenericParquetWriter::buildWriter)
+            .createWriterFunc(GenericParquetWriter::create)
             .build()) {
       appender.add(record);
     }

data/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java

Lines changed: 1 addition & 1 deletion
@@ -80,7 +80,7 @@ private Metrics getMetrics(
         Parquet.write(file)
             .schema(schema)
             .setAll(properties)
-            .createWriterFunc(GenericParquetWriter::buildWriter)
+            .createWriterFunc(GenericParquetWriter::create)
             .metricsConfig(metricsConfig)
             .build();
     try (FileAppender<Record> appender = writer) {

flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java

Lines changed: 15 additions & 7 deletions
@@ -31,6 +31,7 @@
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.RowType.RowField;
@@ -87,16 +88,22 @@ public ParquetValueWriter<?> message(
   @Override
   public ParquetValueWriter<?> struct(
       RowType sStruct, GroupType struct, List<ParquetValueWriter<?>> fieldWriters) {
-    List<Type> fields = struct.getFields();
     List<RowField> flinkFields = sStruct.getFields();
     List<ParquetValueWriter<?>> writers = Lists.newArrayListWithExpectedSize(fieldWriters.size());
     List<LogicalType> flinkTypes = Lists.newArrayList();
-    for (int i = 0; i < fields.size(); i += 1) {
-      writers.add(newOption(struct.getType(i), fieldWriters.get(i)));
-      flinkTypes.add(flinkFields.get(i).getType());
+    int[] fieldIndexes = new int[fieldWriters.size()];
+    int fieldIndex = 0;
+    for (int i = 0; i < flinkFields.size(); i += 1) {
+      LogicalType flinkType = flinkFields.get(i).getType();
+      if (!flinkType.is(LogicalTypeRoot.NULL)) {
+        writers.add(newOption(struct.getType(fieldIndex), fieldWriters.get(fieldIndex)));
+        flinkTypes.add(flinkType);
+        fieldIndexes[fieldIndex] = i;
+        fieldIndex += 1;
+      }
     }

-    return new RowDataWriter(writers, flinkTypes);
+    return new RowDataWriter(fieldIndexes, writers, flinkTypes);
   }

   @Override
@@ -584,11 +591,12 @@ public Map.Entry<K, V> next() {
   private static class RowDataWriter extends ParquetValueWriters.StructWriter<RowData> {
     private final RowData.FieldGetter[] fieldGetter;

-    RowDataWriter(List<ParquetValueWriter<?>> writers, List<LogicalType> types) {
+    RowDataWriter(
+        int[] fieldIndexes, List<ParquetValueWriter<?>> writers, List<LogicalType> types) {
       super(writers);
       fieldGetter = new RowData.FieldGetter[types.size()];
       for (int i = 0; i < types.size(); i += 1) {
-        fieldGetter[i] = FlinkRowData.createFieldGetter(types.get(i), i);
+        fieldGetter[i] = FlinkRowData.createFieldGetter(types.get(i), fieldIndexes[i]);
       }
     }
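
Flink models Iceberg's unknown type as the NULL logical type, and the Parquet schema has no column for it. Before this change, writers were created per Parquet field while field getters were created per row position, so every field after an unknown column was read from the wrong position — the "missing data" of the commit title. The fix records, for each writer, the row position it should read (fieldIndexes) and builds the getters from that. A standalone sketch of the mapping in plain Java (illustrative, not the Flink/Iceberg API):

    class IndexMappingSketch {
      // Writers exist only for non-NULL-typed fields, so a writer's position in
      // the writer list no longer matches its position in the row. This returns,
      // for each writer, the row position it should read from.
      static int[] fieldIndexes(boolean[] isNullType, int numWriters) {
        int[] indexes = new int[numWriters];
        int writerIndex = 0;
        for (int pos = 0; pos < isNullType.length; pos += 1) {
          if (!isNullType[pos]) {
            indexes[writerIndex] = pos;
            writerIndex += 1;
          }
        }
        return indexes;
      }
    }

For a row (id BIGINT, u NULL, name STRING) this yields [0, 2]: the second writer now reads "name" from row position 2 instead of position 1, the NULL column. The identical change is applied to the v1.19 and v1.20 copies of FlinkParquetWriters below.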

flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java

Lines changed: 1 addition & 1 deletion
@@ -222,7 +222,7 @@ private void writeAndValidate(
     try (FileAppender<Record> writer =
         Parquet.write(Files.localOutput(testFile))
             .schema(writeSchema)
-            .createWriterFunc(GenericParquetWriter::buildWriter)
+            .createWriterFunc(GenericParquetWriter::create)
             .build()) {
       writer.addAll(iterable);
     }

flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java

Lines changed: 15 additions & 7 deletions
@@ -31,6 +31,7 @@
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.RowType.RowField;
@@ -87,16 +88,22 @@ public ParquetValueWriter<?> message(
   @Override
   public ParquetValueWriter<?> struct(
       RowType sStruct, GroupType struct, List<ParquetValueWriter<?>> fieldWriters) {
-    List<Type> fields = struct.getFields();
     List<RowField> flinkFields = sStruct.getFields();
     List<ParquetValueWriter<?>> writers = Lists.newArrayListWithExpectedSize(fieldWriters.size());
     List<LogicalType> flinkTypes = Lists.newArrayList();
-    for (int i = 0; i < fields.size(); i += 1) {
-      writers.add(newOption(struct.getType(i), fieldWriters.get(i)));
-      flinkTypes.add(flinkFields.get(i).getType());
+    int[] fieldIndexes = new int[fieldWriters.size()];
+    int fieldIndex = 0;
+    for (int i = 0; i < flinkFields.size(); i += 1) {
+      LogicalType flinkType = flinkFields.get(i).getType();
+      if (!flinkType.is(LogicalTypeRoot.NULL)) {
+        writers.add(newOption(struct.getType(fieldIndex), fieldWriters.get(fieldIndex)));
+        flinkTypes.add(flinkType);
+        fieldIndexes[fieldIndex] = i;
+        fieldIndex += 1;
+      }
     }

-    return new RowDataWriter(writers, flinkTypes);
+    return new RowDataWriter(fieldIndexes, writers, flinkTypes);
   }

   @Override
@@ -584,11 +591,12 @@ public Map.Entry<K, V> next() {
   private static class RowDataWriter extends ParquetValueWriters.StructWriter<RowData> {
     private final RowData.FieldGetter[] fieldGetter;

-    RowDataWriter(List<ParquetValueWriter<?>> writers, List<LogicalType> types) {
+    RowDataWriter(
+        int[] fieldIndexes, List<ParquetValueWriter<?>> writers, List<LogicalType> types) {
       super(writers);
       fieldGetter = new RowData.FieldGetter[types.size()];
       for (int i = 0; i < types.size(); i += 1) {
-        fieldGetter[i] = FlinkRowData.createFieldGetter(types.get(i), i);
+        fieldGetter[i] = FlinkRowData.createFieldGetter(types.get(i), fieldIndexes[i]);
       }
     }

flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java

Lines changed: 1 addition & 1 deletion
@@ -222,7 +222,7 @@ private void writeAndValidate(
     try (FileAppender<Record> writer =
         Parquet.write(Files.localOutput(testFile))
             .schema(writeSchema)
-            .createWriterFunc(GenericParquetWriter::buildWriter)
+            .createWriterFunc(GenericParquetWriter::create)
             .build()) {
       writer.addAll(iterable);
     }

flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java

Lines changed: 15 additions & 7 deletions
@@ -31,6 +31,7 @@
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.RowType.RowField;
@@ -87,16 +88,22 @@ public ParquetValueWriter<?> message(
   @Override
   public ParquetValueWriter<?> struct(
       RowType sStruct, GroupType struct, List<ParquetValueWriter<?>> fieldWriters) {
-    List<Type> fields = struct.getFields();
     List<RowField> flinkFields = sStruct.getFields();
     List<ParquetValueWriter<?>> writers = Lists.newArrayListWithExpectedSize(fieldWriters.size());
     List<LogicalType> flinkTypes = Lists.newArrayList();
-    for (int i = 0; i < fields.size(); i += 1) {
-      writers.add(newOption(struct.getType(i), fieldWriters.get(i)));
-      flinkTypes.add(flinkFields.get(i).getType());
+    int[] fieldIndexes = new int[fieldWriters.size()];
+    int fieldIndex = 0;
+    for (int i = 0; i < flinkFields.size(); i += 1) {
+      LogicalType flinkType = flinkFields.get(i).getType();
+      if (!flinkType.is(LogicalTypeRoot.NULL)) {
+        writers.add(newOption(struct.getType(fieldIndex), fieldWriters.get(fieldIndex)));
+        flinkTypes.add(flinkType);
+        fieldIndexes[fieldIndex] = i;
+        fieldIndex += 1;
+      }
     }

-    return new RowDataWriter(writers, flinkTypes);
+    return new RowDataWriter(fieldIndexes, writers, flinkTypes);
   }

   @Override
@@ -584,11 +591,12 @@ public Map.Entry<K, V> next() {
   private static class RowDataWriter extends ParquetValueWriters.StructWriter<RowData> {
     private final RowData.FieldGetter[] fieldGetter;

-    RowDataWriter(List<ParquetValueWriter<?>> writers, List<LogicalType> types) {
+    RowDataWriter(
+        int[] fieldIndexes, List<ParquetValueWriter<?>> writers, List<LogicalType> types) {
       super(writers);
       fieldGetter = new RowData.FieldGetter[types.size()];
       for (int i = 0; i < types.size(); i += 1) {
-        fieldGetter[i] = FlinkRowData.createFieldGetter(types.get(i), i);
+        fieldGetter[i] = FlinkRowData.createFieldGetter(types.get(i), fieldIndexes[i]);
       }
     }

flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java

Lines changed: 1 addition & 1 deletion
@@ -222,7 +222,7 @@ private void writeAndValidate(
     try (FileAppender<Record> writer =
         Parquet.write(Files.localOutput(testFile))
             .schema(writeSchema)
-            .createWriterFunc(GenericParquetWriter::buildWriter)
+            .createWriterFunc(GenericParquetWriter::create)
             .build()) {
       writer.addAll(iterable);
     }

orc/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java

Lines changed: 3 additions & 3 deletions
@@ -60,7 +60,7 @@ public OrcValueWriter<Record> record(
         TypeDescription record,
         List<String> names,
         List<OrcValueWriter<?>> fields) {
-      return new RecordWriter(fields);
+      return new RecordWriter(iStruct, fields);
     }

     @Override
@@ -156,8 +156,8 @@ public Stream<FieldMetrics<?>> metrics() {

   private static class RecordWriter extends GenericOrcWriters.StructWriter<Record> {

-    RecordWriter(List<OrcValueWriter<?>> writers) {
-      super(writers);
+    RecordWriter(Types.StructType struct, List<OrcValueWriter<?>> writers) {
+      super(struct, writers);
     }

     @Override

orc/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java

Lines changed: 31 additions & 1 deletion
@@ -34,6 +34,7 @@
 import java.util.Map;
 import java.util.UUID;
 import java.util.function.Function;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import org.apache.iceberg.DoubleFieldMetrics;
 import org.apache.iceberg.FieldMetrics;
@@ -43,6 +44,8 @@
 import org.apache.iceberg.orc.OrcValueWriter;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.ByteBuffers;
 import org.apache.iceberg.variants.Serialized;
 import org.apache.iceberg.variants.Variant;
@@ -579,9 +582,15 @@ public Stream<FieldMetrics<?>> metrics() {
   }

   public abstract static class StructWriter<S> implements OrcValueWriter<S> {
+    private final int[] fieldIndexes;
     private final List<OrcValueWriter<?>> writers;

     protected StructWriter(List<OrcValueWriter<?>> writers) {
+      this(null, writers);
+    }
+
+    protected StructWriter(Types.StructType struct, List<OrcValueWriter<?>> writers) {
+      this.fieldIndexes = writerToFieldIndex(struct, writers.size());
       this.writers = writers;
     }
@@ -611,7 +620,7 @@ public void writeRow(S value, VectorizedRowBatch output) {
     private void write(int rowId, S value, Function<Integer, ColumnVector> colVectorAtFunc) {
       for (int c = 0; c < writers.size(); ++c) {
         OrcValueWriter writer = writers.get(c);
-        writer.write(rowId, get(value, c), colVectorAtFunc.apply(c));
+        writer.write(rowId, get(value, fieldIndexes[c]), colVectorAtFunc.apply(c));
       }
     }
@@ -651,6 +660,27 @@ public void write(PositionDelete<T> row, VectorizedRowBatch output) throws IOExc
     }
   }

+  /** Returns a mapping from writer index to field index, skipping Unknown columns. */
+  static int[] writerToFieldIndex(Types.StructType struct, int numWriters) {
+    if (null == struct) {
+      return IntStream.rangeClosed(0, numWriters).toArray();
+    }
+
+    List<Types.NestedField> recordFields = struct.fields();
+
+    // value writer index to record field index
+    int[] indexes = new int[numWriters];
+    int writerIndex = 0;
+    for (int pos = 0; pos < recordFields.size(); pos += 1) {
+      if (recordFields.get(pos).type().typeId() != Type.TypeID.UNKNOWN) {
+        indexes[writerIndex] = pos;
+        writerIndex += 1;
+      }
+    }
+
+    return indexes;
+  }
+
   private static void growColumnVector(ColumnVector cv, int requestedSize) {
     if (cv.isNull.length < requestedSize) {
       // Use growth factor of 3 to avoid frequent array allocations
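
The ORC side applies the same idea at the struct level: writerToFieldIndex maps each value writer (unknown columns get none) to its record field position, and StructWriter.write reads the record through that mapping. With a null struct, the old positional behavior is kept via an identity mapping (rangeClosed allocates one extra trailing element, which is harmless since only writers.size() entries are ever read). An illustration of the mapping, assuming Types.UnknownType.get() for the v3 unknown type (writerToFieldIndex itself is package-private, so the call shown in the comment is conceptual):

    import org.apache.iceberg.types.Types;

    class OrcMappingExample {
      static final Types.StructType STRUCT =
          Types.StructType.of(
              Types.NestedField.required(1, "id", Types.LongType.get()),
              Types.NestedField.optional(2, "u", Types.UnknownType.get()),
              Types.NestedField.required(3, "name", Types.StringType.get()));

      // Two writers (id, name) for three fields: writerToFieldIndex(STRUCT, 2)
      // returns [0, 2], so the second writer reads record position 2 ("name")
      // rather than position 1, the unknown column that holds no data.
    }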
