From 0fbdf33a3273d8a5573a9d7860cb1d70eef8dc4c Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Thu, 24 Apr 2025 15:18:24 +0800 Subject: [PATCH 1/2] Introduce scheduler for sqllogictests Signed-off-by: liurenjie1024 --- Cargo.lock | 2 + crates/sqllogictest/Cargo.toml | 2 + crates/sqllogictest/src/engine/datafusion.rs | 24 ++--- crates/sqllogictest/src/engine/mod.rs | 35 ++++++- crates/sqllogictest/src/lib.rs | 2 + crates/sqllogictest/src/schedule.rs | 105 +++++++++++++++++++ 6 files changed, 156 insertions(+), 14 deletions(-) create mode 100644 crates/sqllogictest/src/schedule.rs diff --git a/Cargo.lock b/Cargo.lock index 4411365b0..fed28f15a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3458,8 +3458,10 @@ dependencies = [ "datafusion-sqllogictest", "enum-ordinalize", "indicatif", + "serde", "sqllogictest", "toml", + "tracing", ] [[package]] diff --git a/crates/sqllogictest/Cargo.toml b/crates/sqllogictest/Cargo.toml index d11a1a87f..9aba00710 100644 --- a/crates/sqllogictest/Cargo.toml +++ b/crates/sqllogictest/Cargo.toml @@ -33,6 +33,8 @@ datafusion-sqllogictest = {workspace = true} enum-ordinalize = {workspace = true} sqllogictest = {workspace = true} indicatif = {workspace = true} +serde = {workspace = true} +tracing = {workspace = true} [package.metadata.cargo-machete] # These dependencies are added to ensure minimal dependency version diff --git a/crates/sqllogictest/src/engine/datafusion.rs b/crates/sqllogictest/src/engine/datafusion.rs index bddc59d90..0b4799006 100644 --- a/crates/sqllogictest/src/engine/datafusion.rs +++ b/crates/sqllogictest/src/engine/datafusion.rs @@ -26,7 +26,7 @@ use indicatif::ProgressBar; use sqllogictest::runner::AsyncDB; use toml::Table as TomlTable; -use crate::engine::Engine; +use crate::engine::EngineRunner; use crate::error::Result; pub struct DataFusionEngine { @@ -34,17 +34,7 @@ pub struct DataFusionEngine { } #[async_trait::async_trait] -impl Engine for DataFusionEngine { - async fn new(config: TomlTable) -> Result { - let session_config = SessionConfig::new().with_target_partitions(4); - let ctx = SessionContext::new_with_config(session_config); - ctx.register_catalog("default", Self::create_catalog(&config).await?); - - Ok(Self { - datafusion: DataFusion::new(ctx, PathBuf::from("testdata"), ProgressBar::new(100)), - }) - } - +impl EngineRunner for DataFusionEngine { async fn run_slt_file(&mut self, path: &Path) -> Result<()> { let content = std::fs::read_to_string(path) .with_context(|| format!("Failed to read slt file {:?}", path)) @@ -61,6 +51,16 @@ impl Engine for DataFusionEngine { } impl DataFusionEngine { + pub async fn new(config: TomlTable) -> Result { + let session_config = SessionConfig::new().with_target_partitions(4); + let ctx = SessionContext::new_with_config(session_config); + ctx.register_catalog("default", Self::create_catalog(&config).await?); + + Ok(Self { + datafusion: DataFusion::new(ctx, PathBuf::from("testdata"), ProgressBar::new(100)), + }) + } + async fn create_catalog(_: &TomlTable) -> anyhow::Result> { todo!() } diff --git a/crates/sqllogictest/src/engine/mod.rs b/crates/sqllogictest/src/engine/mod.rs index 61722f663..0d576223e 100644 --- a/crates/sqllogictest/src/engine/mod.rs +++ b/crates/sqllogictest/src/engine/mod.rs @@ -21,10 +21,41 @@ use std::path::Path; use toml::Table as TomlTable; +use crate::engine::datafusion::DataFusionEngine; use crate::error::Result; +const KEY_TYPE: &str = "type"; +const TYPE_DATAFUSION: &str = "datafusion"; + #[async_trait::async_trait] -pub trait Engine: Sized { - async fn new(config: TomlTable) -> Result; +pub trait EngineRunner: Sized { async fn run_slt_file(&mut self, path: &Path) -> Result<()>; } + +pub enum Engine { + DataFusion(DataFusionEngine), +} + +impl Engine { + pub async fn new(config: TomlTable) -> Result { + let engine_type = config + .get(KEY_TYPE) + .ok_or_else(|| anyhow::anyhow!("Missing required key: {KEY_TYPE}"))? + .as_str() + .ok_or_else(|| anyhow::anyhow!("Config value for {KEY_TYPE} must be a string"))?; + + match engine_type { + TYPE_DATAFUSION => { + let engine = DataFusionEngine::new(config).await?; + Ok(Engine::DataFusion(engine)) + } + _ => Err(anyhow::anyhow!("Unsupported engine type: {engine_type}").into()), + } + } + + pub async fn run_slt_file(&mut self, path: &Path) -> Result<()> { + match self { + Engine::DataFusion(engine) => engine.run_slt_file(path).await, + } + } +} diff --git a/crates/sqllogictest/src/lib.rs b/crates/sqllogictest/src/lib.rs index c72d50c42..7b17727d1 100644 --- a/crates/sqllogictest/src/lib.rs +++ b/crates/sqllogictest/src/lib.rs @@ -22,3 +22,5 @@ mod engine; #[allow(dead_code)] mod error; +#[allow(dead_code)] +mod schedule; diff --git a/crates/sqllogictest/src/schedule.rs b/crates/sqllogictest/src/schedule.rs new file mode 100644 index 000000000..f1dedf9a8 --- /dev/null +++ b/crates/sqllogictest/src/schedule.rs @@ -0,0 +1,105 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::path::PathBuf; + +use serde::{Deserialize, Serialize}; + +use crate::engine::Engine; + +pub struct Schedule { + engines: HashMap, + steps: Vec, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct Step { + /// Engine name + engine: String, + /// Stl file path + slt: String, +} + +impl Schedule { + pub fn new(engines: HashMap, steps: Vec) -> Self { + Self { engines, steps } + } + + pub async fn run(mut self) -> anyhow::Result<()> { + for (idx, step) in self.steps.iter().enumerate() { + tracing::info!( + "Running step {}/{}, using engine {}, slt file path: {}", + idx + 1, + self.steps.len(), + &step.engine, + &step.slt + ); + + let engine = self + .engines + .get_mut(&step.engine) + .ok_or_else(|| anyhow::anyhow!("Engine {} not found", step.engine))?; + + engine + .run_slt_file(&PathBuf::from(step.slt.clone())) + .await?; + tracing::info!( + "Step {}/{}, engine {}, slt file path: {} finished", + idx + 1, + self.steps.len(), + &step.engine, + &step.slt + ); + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use toml::Table as TomlTable; + + use crate::schedule::Step; + + #[test] + fn test_parse_steps() { + let steps = r#" + [[steps]] + engine = "datafusion" + slt = "test.slt" + + [[steps]] + engine = "spark" + slt = "test2.slt" + "#; + + let steps: Vec = toml::from_str::(steps) + .unwrap() + .get("steps") + .unwrap() + .clone() + .try_into() + .unwrap(); + + assert_eq!(steps.len(), 2); + assert_eq!(steps[0].engine, "datafusion"); + assert_eq!(steps[0].slt, "test.slt"); + assert_eq!(steps[1].engine, "spark"); + assert_eq!(steps[1].slt, "test2.slt"); + } +} From 12c45696f0c8718e68d0c54db28b50ac06618425 Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Fri, 25 Apr 2025 11:33:53 +0800 Subject: [PATCH 2/2] Fix comments --- crates/sqllogictest/src/schedule.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/crates/sqllogictest/src/schedule.rs b/crates/sqllogictest/src/schedule.rs index f1dedf9a8..e448e8164 100644 --- a/crates/sqllogictest/src/schedule.rs +++ b/crates/sqllogictest/src/schedule.rs @@ -31,7 +31,7 @@ pub struct Schedule { pub struct Step { /// Engine name engine: String, - /// Stl file path + /// Slt file path slt: String, } @@ -55,9 +55,7 @@ impl Schedule { .get_mut(&step.engine) .ok_or_else(|| anyhow::anyhow!("Engine {} not found", step.engine))?; - engine - .run_slt_file(&PathBuf::from(step.slt.clone())) - .await?; + engine.run_slt_file(&PathBuf::from(&step.slt)).await?; tracing::info!( "Step {}/{}, engine {}, slt file path: {} finished", idx + 1,