Skip to content

feat: upgrade arrow to version 50 #14784

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 26 commits into from
Mar 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4,917 changes: 2,537 additions & 2,380 deletions Cargo.lock

Large diffs are not rendered by default.

29 changes: 15 additions & 14 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ feature-set = { version = "0.1.1" }
geo = { version = "0.27.0", features = ["use-serde"] }
geozero = { version = "0.11.0", features = ["default", "with-wkb"] }
itertools = "0.10.5"
log = { version = "0.4.19", features = ["serde", "kv_unstable_std"] }
log = { version = "0.4.21", features = ["serde", "kv_unstable_std"] }
logcall = "0.1.5"
match-template = "0.0.1"
metrics = "0.20.1"
Expand Down Expand Up @@ -185,19 +185,19 @@ anyhow = { version = "1.0.65" }
thiserror = { version = "1" }

# Crates from arrow-rs
arrow = { version = "47.0.0" }
arrow-array = { version = "47.0.0" }
arrow-buffer = { version = "47.0.0" }
arrow-cast = { version = "47.0.0", features = ["prettyprint"] }
arrow-data = { version = "47.0.0" }
arrow-flight = { version = "47.0.0", features = ["flight-sql-experimental", "tls"] }
arrow = { version = "50" }
arrow-array = { version = "50" }
arrow-buffer = { version = "50" }
arrow-cast = { version = "50", features = ["prettyprint"] }
arrow-data = { version = "50" }
arrow-flight = { version = "50", features = ["flight-sql-experimental", "tls"] }
arrow-format = { version = "0.8.1", features = ["flight-data", "flight-service", "ipc"] }
arrow-ipc = { version = "47.0.0" }
arrow-ord = { version = "47.0.0" }
arrow-schema = { version = "47.0.0", features = ["serde"] }
arrow-select = { version = "47.0.0" }
parquet = { version = "47.0.0", features = ["async"] }
parquet_rs = { package = "parquet", version = "47.0.0" }
arrow-ipc = { version = "50" }
arrow-ord = { version = "50" }
arrow-schema = { version = "50", features = ["serde"] }
arrow-select = { version = "50" }
parquet = { version = "50", features = ["async"] }
parquet_rs = { package = "parquet", version = "50" }

# Serialization
prost = { version = "0.12.1" }
Expand Down Expand Up @@ -256,7 +256,8 @@ rpath = false
arrow-format = { git = "https://github.com/everpcpc/arrow-format", rev = "ad8f2dd" }
parquet2 = { git = "https://github.com/jorgecarleitao/parquet2", rev = "b0e6545" }
metrics = { git = "https://github.com/datafuse-extras/metrics.git", rev = "fc2ecd1" }
icelake = { git = "https://github.com/icelake-io/icelake", rev = "f06cdf3" }
icelake = { git = "https://github.com/icelake-io/icelake", rev = "54fd72f" }
sentry = { git = "https://github.com/getsentry/sentry-rust", rev = "6ef6d97" }
micromarshal = { git = "https://github.com/ariesdevil/opensrv", rev = "6c96813" }
async-backtrace = { git = "https://github.com/zhang2014/async-backtrace.git", rev = "dea4553" }
z3 = { git = "https://github.com/prove-rs/z3.rs", rev = "247d308" }
Expand Down
2 changes: 1 addition & 1 deletion src/bendpy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ databend-query = { path = "../query/service", features = [

# # Crates.io dependencies
ctor = "0.2.5"
pyo3 = { version = "0.19.1", features = ["extension-module", "abi3", "abi3-py37"] }
pyo3 = { version = "0.20", features = ["extension-module", "abi3", "abi3-py37"] }
tokio = { workspace = true, features = ["macros", "rt", "rt-multi-thread", "sync"] }
tokio-stream = { workspace = true }
uuid = { workspace = true }
2 changes: 1 addition & 1 deletion src/bendpy/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl PyDataFrame {
.into_iter()
.map(|block| {
block
.to_record_batch(self.df.schema().as_ref())
.to_record_batch_with_dataschema(self.df.schema().as_ref())
.unwrap()
.to_pyarrow(py)
})
Expand Down
2 changes: 1 addition & 1 deletion src/common/arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ ethnum = { workspace = true }
simdutf8 = "0.1.4"

# A Rust port of SwissTable
hashbrown = { version = "0.14", default-features = false, features = ["ahash"] }
hashbrown = { version = "0.14.3", default-features = false, features = ["ahash"] }

# for timezone support
chrono-tz = { workspace = true, optional = true }
Expand Down
2 changes: 1 addition & 1 deletion src/common/cache/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ heapsize = ["heapsize_"]

# Crates.io dependencies
bytes = { workspace = true }
hashbrown = "0.14"
hashbrown = "0.14.3"
hashlink = "0.8"

[target.'cfg(not(target_os = "macos"))'.dependencies]
Expand Down
25 changes: 12 additions & 13 deletions src/meta/api/src/background_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ use databend_common_meta_types::MetaError;
use databend_common_meta_types::MetaSpec;
use databend_common_meta_types::Operation;
use databend_common_meta_types::TxnRequest;
use log::as_debug;
use log::debug;
use minitrace::func_name;

Expand Down Expand Up @@ -77,7 +76,7 @@ impl<KV: kvapi::KVApi<Error = MetaError>> BackgroundApi for KV {
&self,
req: CreateBackgroundJobReq,
) -> Result<CreateBackgroundJobReply, KVAppError> {
debug!(req = as_debug!(&req); "BackgroundApi: {}", func_name!());
debug!(req :? =(&req); "BackgroundApi: {}", func_name!());

let name_key = &req.job_name;

Expand All @@ -87,7 +86,7 @@ impl<KV: kvapi::KVApi<Error = MetaError>> BackgroundApi for KV {

// Get db mask by name to ensure absence
let (seq, id) = get_u64_value(self, name_key).await?;
debug!(seq = seq, id = id, name_key = as_debug!(name_key); "create_background_job");
debug!(seq = seq, id = id, name_key :? =(name_key); "create_background_job");

if seq > 0 {
return if req.if_not_exists {
Expand All @@ -106,8 +105,8 @@ impl<KV: kvapi::KVApi<Error = MetaError>> BackgroundApi for KV {
let id_key = BackgroundJobId { id };

debug!(
id = as_debug!(&id_key),
name_key = as_debug!(name_key);
id :? =(&id_key),
name_key :? =(name_key);
"new backgroundjob id"
);

Expand All @@ -128,8 +127,8 @@ impl<KV: kvapi::KVApi<Error = MetaError>> BackgroundApi for KV {
let (succ, _responses) = send_txn(self, txn_req).await?;

debug!(
name = as_debug!(name_key),
id = as_debug!(&id_key),
name :? =(name_key),
id :? =(&id_key),
succ = succ;
"create_background_job"
);
Expand Down Expand Up @@ -192,7 +191,7 @@ impl<KV: kvapi::KVApi<Error = MetaError>> BackgroundApi for KV {
&self,
req: GetBackgroundJobReq,
) -> Result<GetBackgroundJobReply, KVAppError> {
debug!(req = as_debug!(&req); "BackgroundApi: {}", func_name!());
debug!(req :? =(&req); "BackgroundApi: {}", func_name!());

let name_key = &req.name;

Expand Down Expand Up @@ -232,9 +231,9 @@ impl<KV: kvapi::KVApi<Error = MetaError>> BackgroundApi for KV {
&self,
req: UpdateBackgroundTaskReq,
) -> Result<UpdateBackgroundTaskReply, KVAppError> {
debug!(req = as_debug!(&req); "BackgroundApi: {}", func_name!());
debug!(req :? =(&req); "BackgroundApi: {}", func_name!());
let name_key = &req.task_name;
debug!(name_key = as_debug!(name_key); "update_background_task");
debug!(name_key :? =(name_key); "update_background_task");

let meta = req.task_info.clone();

Expand Down Expand Up @@ -281,7 +280,7 @@ impl<KV: kvapi::KVApi<Error = MetaError>> BackgroundApi for KV {
req: GetBackgroundTaskReq,
) -> Result<GetBackgroundTaskReply, KVAppError> {
debug!(
req = as_debug!(&req);
req :? =(&req);
"BackgroundTaskApi: {}",
func_name!()
);
Expand Down Expand Up @@ -325,7 +324,7 @@ pub fn background_job_has_to_exist(
name_ident: &BackgroundJobIdent,
) -> Result<(), KVAppError> {
if seq == 0 {
debug!(seq = seq, name_ident = as_debug!(name_ident); "background job does not exist");
debug!(seq = seq, name_ident :? =(name_ident); "background job does not exist");
Err(KVAppError::AppError(AppError::UnknownBackgroundJob(
UnknownBackgroundJob::new(&name_ident.name, format!("{:?}", name_ident)),
)))
Expand Down Expand Up @@ -355,7 +354,7 @@ async fn update_background_job<F: FnOnce(&mut BackgroundJobInfo) -> bool>(
name: &BackgroundJobIdent,
mutation: F,
) -> Result<UpdateBackgroundJobReply, KVAppError> {
debug!(req = as_debug!(name); "BackgroundApi: {}", func_name!());
debug!(req :? =(name); "BackgroundApi: {}", func_name!());
let (id, id_val_seq, mut info) =
get_background_job_or_error(kv_api, name, "update_background_job").await?;
let should_update = mutation(&mut info);
Expand Down
23 changes: 11 additions & 12 deletions src/meta/api/src/data_mask_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ use databend_common_meta_types::MetaError;
use databend_common_meta_types::TxnCondition;
use databend_common_meta_types::TxnOp;
use databend_common_meta_types::TxnRequest;
use log::as_debug;
use log::debug;
use minitrace::func_name;

Expand All @@ -63,7 +62,7 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
&self,
req: CreateDatamaskReq,
) -> Result<CreateDatamaskReply, KVAppError> {
debug!(req = as_debug!(&req); "DatamaskApi: {}", func_name!());
debug!(req :? =(&req); "DatamaskApi: {}", func_name!());

let name_key = &req.name;

Expand All @@ -73,7 +72,7 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {

// Get db mask by name to ensure absence
let (seq, id) = get_u64_value(self, name_key).await?;
debug!(seq = seq, id = id, name_key = as_debug!(name_key); "create_data_mask");
debug!(seq = seq, id = id, name_key :? =(name_key); "create_data_mask");

let mut condition = vec![];
let mut if_then = vec![];
Expand Down Expand Up @@ -117,8 +116,8 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
};

debug!(
id = as_debug!(&id_key),
name_key = as_debug!(name_key);
id :? =(&id_key),
name_key :? =(name_key);
"new datamask id"
);

Expand All @@ -141,8 +140,8 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
let (succ, _responses) = send_txn(self, txn_req).await?;

debug!(
name = as_debug!(name_key),
id = as_debug!(&id_key),
name :? =(name_key),
id :? =(&id_key),
succ = succ;
"create_data_mask"
);
Expand All @@ -157,7 +156,7 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
}

async fn drop_data_mask(&self, req: DropDatamaskReq) -> Result<DropDatamaskReply, KVAppError> {
debug!(req = as_debug!(&req); "DatamaskApi: {}", func_name!());
debug!(req :? =(&req); "DatamaskApi: {}", func_name!());

let name_key = &req.name;

Expand Down Expand Up @@ -199,7 +198,7 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
}

async fn get_data_mask(&self, req: GetDatamaskReq) -> Result<GetDatamaskReply, KVAppError> {
debug!(req = as_debug!(&req); "DatamaskApi: {}", func_name!());
debug!(req :? =(&req); "DatamaskApi: {}", func_name!());

let name_key = &req.name;

Expand Down Expand Up @@ -242,7 +241,7 @@ pub fn data_mask_has_to_exist(
msg: impl Display,
) -> Result<(), KVAppError> {
if seq == 0 {
debug!(seq = seq, name_ident = as_debug!(name_ident); "data mask does not exist");
debug!(seq = seq, name_ident :? =(name_ident); "data mask does not exist");

Err(KVAppError::AppError(AppError::UnknownDatamask(
UnknownDatamask::new(&name_ident.name, format!("{}: {}", msg, name_ident)),
Expand Down Expand Up @@ -329,8 +328,8 @@ async fn construct_drop_mask_policy_operations(
}

debug!(
name = as_debug!(name_key),
id = as_debug!(&DatamaskId { id }),
name :? =(name_key),
id :? =(&DatamaskId { id }),
ctx = ctx;
"construct_drop_mask_policy_operations"
);
Expand Down
Loading