Skip to content

Commit 27288a0

Browse files
authored
fix: Fix range out of index error with a temporary workaround (#584)
* fix: Fix range out of index error by using custom arrow-rs repo * Add custom Java Arrow classes * Add a hack * Update * Update * Update to use apache/arrow-rs#5958 * Use tustvold's branch * Use official arrow-rs repo
1 parent 0d994d0 commit 27288a0

File tree

5 files changed

+592
-68
lines changed

5 files changed

+592
-68
lines changed

common/src/main/java/org/apache/arrow/c/ArrowImporter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ public FieldVector importVector(
5555
ArrowArray array, ArrowSchema schema, CDataDictionaryProvider provider) {
5656
Field field = importField(schema, provider);
5757
FieldVector vector = field.createVector(allocator);
58-
Data.importIntoVector(allocator, array, vector, provider);
58+
CometArrayImporter importer = new CometArrayImporter(allocator, vector, provider);
59+
importer.importArray(array);
5960
return vector;
6061
}
6162
}
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.arrow.c;
21+
22+
import java.util.Collections;
23+
import java.util.List;
24+
25+
import org.apache.arrow.memory.ArrowBuf;
26+
import org.apache.arrow.memory.BufferAllocator;
27+
import org.apache.arrow.util.Preconditions;
28+
import org.apache.arrow.vector.FieldVector;
29+
import org.apache.arrow.vector.dictionary.Dictionary;
30+
import org.apache.arrow.vector.dictionary.DictionaryProvider;
31+
import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
32+
import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
33+
34+
import static org.apache.arrow.c.NativeUtil.NULL;
35+
import static org.apache.arrow.memory.util.LargeMemoryUtil.checkedCastToInt;
36+
import static org.apache.arrow.util.Preconditions.checkNotNull;
37+
import static org.apache.arrow.util.Preconditions.checkState;
38+
39+
/**
40+
* Importer for {@link ArrowArray}. We copy it from Arrow `ArrayImporter` because we need to use
41+
* `CometBufferImportTypeVisitor` instead of Arrow `BufferImportTypeVisitor`.
42+
*/
43+
final class CometArrayImporter {
44+
private static final int MAX_IMPORT_RECURSION_LEVEL = 64;
45+
46+
private final BufferAllocator allocator;
47+
private final FieldVector vector;
48+
private final DictionaryProvider dictionaryProvider;
49+
50+
private ReferenceCountedArrowArray underlyingAllocation;
51+
private int recursionLevel;
52+
53+
CometArrayImporter(
54+
BufferAllocator allocator, FieldVector vector, DictionaryProvider dictionaryProvider) {
55+
this.allocator = Preconditions.checkNotNull(allocator);
56+
this.vector = Preconditions.checkNotNull(vector);
57+
this.dictionaryProvider = dictionaryProvider;
58+
}
59+
60+
void importArray(ArrowArray src) {
61+
ArrowArray.Snapshot snapshot = src.snapshot();
62+
checkState(snapshot.release != NULL, "Cannot import released ArrowArray");
63+
64+
// Move imported array
65+
ArrowArray ownedArray = ArrowArray.allocateNew(allocator);
66+
ownedArray.save(snapshot);
67+
src.markReleased();
68+
src.close();
69+
70+
recursionLevel = 0;
71+
72+
// This keeps the array alive as long as there are any buffers that need it
73+
underlyingAllocation = new ReferenceCountedArrowArray(ownedArray);
74+
try {
75+
doImport(snapshot);
76+
} finally {
77+
underlyingAllocation.release();
78+
}
79+
}
80+
81+
private void importChild(CometArrayImporter parent, ArrowArray src) {
82+
ArrowArray.Snapshot snapshot = src.snapshot();
83+
checkState(snapshot.release != NULL, "Cannot import released ArrowArray");
84+
recursionLevel = parent.recursionLevel + 1;
85+
checkState(
86+
recursionLevel <= MAX_IMPORT_RECURSION_LEVEL,
87+
"Recursion level in ArrowArray struct exceeded");
88+
// Child buffers will keep the entire parent import alive.
89+
underlyingAllocation = parent.underlyingAllocation;
90+
doImport(snapshot);
91+
}
92+
93+
private void doImport(ArrowArray.Snapshot snapshot) {
94+
// First import children (required for reconstituting parent array data)
95+
long[] children =
96+
NativeUtil.toJavaArray(snapshot.children, checkedCastToInt(snapshot.n_children));
97+
if (children != null && children.length > 0) {
98+
List<FieldVector> childVectors = vector.getChildrenFromFields();
99+
checkState(
100+
children.length == childVectors.size(),
101+
"ArrowArray struct has %s children (expected %s)",
102+
children.length,
103+
childVectors.size());
104+
for (int i = 0; i < children.length; i++) {
105+
checkState(children[i] != NULL, "ArrowArray struct has NULL child at position %s", i);
106+
CometArrayImporter childImporter =
107+
new CometArrayImporter(allocator, childVectors.get(i), dictionaryProvider);
108+
childImporter.importChild(this, ArrowArray.wrap(children[i]));
109+
}
110+
}
111+
112+
// Handle import of a dictionary encoded vector
113+
if (snapshot.dictionary != NULL) {
114+
DictionaryEncoding encoding = vector.getField().getDictionary();
115+
checkNotNull(encoding, "Missing encoding on import of ArrowArray with dictionary");
116+
117+
Dictionary dictionary = dictionaryProvider.lookup(encoding.getId());
118+
checkNotNull(dictionary, "Dictionary lookup failed on import of ArrowArray with dictionary");
119+
120+
// reset the dictionary vector to the initial state
121+
dictionary.getVector().clear();
122+
123+
CometArrayImporter dictionaryImporter =
124+
new CometArrayImporter(allocator, dictionary.getVector(), dictionaryProvider);
125+
dictionaryImporter.importChild(this, ArrowArray.wrap(snapshot.dictionary));
126+
}
127+
128+
// Import main data
129+
ArrowFieldNode fieldNode = new ArrowFieldNode(snapshot.length, snapshot.null_count);
130+
long[] bufferPointers =
131+
NativeUtil.toJavaArray(snapshot.buffers, checkedCastToInt(snapshot.n_buffers));
132+
133+
try (final CometBufferImportTypeVisitor visitor =
134+
new CometBufferImportTypeVisitor(
135+
allocator, underlyingAllocation, fieldNode, snapshot, bufferPointers)) {
136+
final List<ArrowBuf> buffers;
137+
if (bufferPointers == null || bufferPointers.length == 0) {
138+
buffers = Collections.emptyList();
139+
} else {
140+
buffers = vector.getField().getType().accept(visitor);
141+
}
142+
vector.loadFieldBuffers(fieldNode, buffers);
143+
} catch (Exception e) {
144+
throw new IllegalArgumentException(
145+
"Could not load buffers for field "
146+
+ vector.getField()
147+
+ ". error message: "
148+
+ e.getMessage(),
149+
e);
150+
}
151+
}
152+
}

0 commit comments

Comments
 (0)