@@ -29,7 +29,7 @@ use arrow::{
29
29
} ;
30
30
use arrow_flight:: {
31
31
flight_descriptor:: DescriptorType , flight_service_client:: FlightServiceClient ,
32
- utils:: flight_data_to_arrow_batch, FlightData , FlightDescriptor , Location , SchemaAsIpc , Ticket ,
32
+ utils:: flight_data_to_arrow_batch, FlightData , FlightDescriptor , IpcMessage , Location , Ticket ,
33
33
} ;
34
34
use futures:: { channel:: mpsc, sink:: SinkExt , stream, StreamExt } ;
35
35
use tonic:: { Request , Streaming } ;
@@ -72,7 +72,19 @@ async fn upload_data(
72
72
let ( mut upload_tx, upload_rx) = mpsc:: channel ( 10 ) ;
73
73
74
74
let options = arrow:: ipc:: writer:: IpcWriteOptions :: default ( ) ;
75
- let mut schema_flight_data: FlightData = SchemaAsIpc :: new ( & schema, & options) . into ( ) ;
75
+ let mut dict_tracker =
76
+ writer:: DictionaryTracker :: new_with_preserve_dict_id ( false , options. preserve_dict_id ( ) ) ;
77
+ let data_gen = writer:: IpcDataGenerator :: default ( ) ;
78
+ let data = IpcMessage (
79
+ data_gen
80
+ . schema_to_bytes_with_dictionary_tracker ( & schema, & mut dict_tracker, & options)
81
+ . ipc_message
82
+ . into ( ) ,
83
+ ) ;
84
+ let mut schema_flight_data = FlightData {
85
+ data_header : data. 0 ,
86
+ ..Default :: default ( )
87
+ } ;
76
88
// arrow_flight::utils::flight_data_from_arrow_schema(&schema, &options);
77
89
schema_flight_data. flight_descriptor = Some ( descriptor. clone ( ) ) ;
78
90
upload_tx. send ( schema_flight_data) . await ?;
@@ -82,7 +94,14 @@ async fn upload_data(
82
94
if let Some ( ( counter, first_batch) ) = original_data_iter. next ( ) {
83
95
let metadata = counter. to_string ( ) . into_bytes ( ) ;
84
96
// Preload the first batch into the channel before starting the request
85
- send_batch ( & mut upload_tx, & metadata, first_batch, & options) . await ?;
97
+ send_batch (
98
+ & mut upload_tx,
99
+ & metadata,
100
+ first_batch,
101
+ & options,
102
+ & mut dict_tracker,
103
+ )
104
+ . await ?;
86
105
87
106
let outer = client. do_put ( Request :: new ( upload_rx) ) . await ?;
88
107
let mut inner = outer. into_inner ( ) ;
@@ -97,7 +116,14 @@ async fn upload_data(
97
116
// Stream the rest of the batches
98
117
for ( counter, batch) in original_data_iter {
99
118
let metadata = counter. to_string ( ) . into_bytes ( ) ;
100
- send_batch ( & mut upload_tx, & metadata, batch, & options) . await ?;
119
+ send_batch (
120
+ & mut upload_tx,
121
+ & metadata,
122
+ batch,
123
+ & options,
124
+ & mut dict_tracker,
125
+ )
126
+ . await ?;
101
127
102
128
let r = inner
103
129
. next ( )
@@ -124,12 +150,12 @@ async fn send_batch(
124
150
metadata : & [ u8 ] ,
125
151
batch : & RecordBatch ,
126
152
options : & writer:: IpcWriteOptions ,
153
+ dictionary_tracker : & mut writer:: DictionaryTracker ,
127
154
) -> Result {
128
155
let data_gen = writer:: IpcDataGenerator :: default ( ) ;
129
- let mut dictionary_tracker = writer:: DictionaryTracker :: new_with_preserve_dict_id ( false , true ) ;
130
156
131
157
let ( encoded_dictionaries, encoded_batch) = data_gen
132
- . encoded_batch ( batch, & mut dictionary_tracker, options)
158
+ . encoded_batch ( batch, dictionary_tracker, options)
133
159
. expect ( "DictionaryTracker configured above to not error on replacement" ) ;
134
160
135
161
let dictionary_flight_data: Vec < FlightData > =
0 commit comments