Add H2O.ai Database-like Ops benchmark to dfbench (join support) #14902

Merged on Feb 28, 2025 (2 commits)


zhuqi-lucas
Contributor

Which issue does this PR close?

Rationale for this change

Group by is already supported, see #7209 (PR: #13996).

This PR adds join support for the H2O.ai Database-like Ops benchmark to dfbench.

Upstream just merged the join data generation:
mrpowers-io/falsa#24

What changes are included in this PR?

Add join support

Are these changes tested?

yes

Are there any user-facing changes?

Yes, this adds join support to the benchmark. A follow-up is needed to update the docs as well.

@zhuqi-lucas
Contributor Author

zhuqi-lucas commented Feb 26, 2025

I tried to reproduce the OOM issue for joins with DataFusion:
#13765

But on the current main branch, our join passed! It takes about 50s when I set --mem-pool-type fair --memory-limit 16G, which is a good result. cc @alamb @2010YOUY01

The data generation will take a long time for big data.

./bench.sh data h2o_big_join
cargo run --release --bin dfbench -- h2o --mem-pool-type fair --memory-limit 16G  --iterations 3 --join-paths /Users/zhuqi/arrow-datafusion/benchmarks/data/h2o/J1_1e9_NA_0.csv,/Users/zhuqi/arrow-datafusion/benchmarks/data/h2o/J1_1e9_1e3_0.csv,/Users/zhuqi/arrow-datafusion/benchmarks/data/h2o/J1_1e9_1e6_0.csv,/Users/zhuqi/arrow-datafusion/benchmarks/data/h2o/J1_1e9_1e9_NA.csv --queries-path /Users/zhuqi/arrow-datafusion/benchmarks/queries/h2o/join.sql -o /Users/zhuqi/arrow-datafusion/benchmarks/results/issue_14867/h2o_join.json
    Finished `release` profile [optimized] target(s) in 0.19s
     Running `/Users/zhuqi/arrow-datafusion/target/release/dfbench h2o --mem-pool-type fair --memory-limit 16G --iterations 3 --join-paths /Users/zhuqi/arrow-datafusion/benchmarks/data/h2o/J1_1e9_NA_0.csv,/Users/zhuqi/arrow-datafusion/benchmarks/data/h2o/J1_1e9_1e3_0.csv,/Users/zhuqi/arrow-datafusion/benchmarks/data/h2o/J1_1e9_1e6_0.csv,/Users/zhuqi/arrow-datafusion/benchmarks/data/h2o/J1_1e9_1e9_NA.csv --queries-path /Users/zhuqi/arrow-datafusion/benchmarks/queries/h2o/join.sql -o /Users/zhuqi/arrow-datafusion/benchmarks/results/issue_14867/h2o_join.json`
Running benchmarks with the following options: RunOpt { query: None, common: CommonOpt { iterations: 3, partitions: None, batch_size: 8192, mem_pool_type: "fair", memory_limit: Some(17179869184), sort_spill_reservation_bytes: None, debug: false }, queries_path: "/Users/zhuqi/arrow-datafusion/benchmarks/queries/h2o/join.sql", path: "benchmarks/data/h2o/G1_1e7_1e7_100_0.csv", join_paths: "/Users/zhuqi/arrow-datafusion/benchmarks/data/h2o/J1_1e9_NA_0.csv,/Users/zhuqi/arrow-datafusion/benchmarks/data/h2o/J1_1e9_1e3_0.csv,/Users/zhuqi/arrow-datafusion/benchmarks/data/h2o/J1_1e9_1e6_0.csv,/Users/zhuqi/arrow-datafusion/benchmarks/data/h2o/J1_1e9_1e9_NA.csv", output_path: Some("/Users/zhuqi/arrow-datafusion/benchmarks/results/issue_14867/h2o_join.json") }
Q1: SELECT x.id1, x.id2, x.id3, x.id4 as xid4, small.id4 as smallid4, x.id5, x.id6, x.v1, small.v2 FROM x INNER JOIN small ON x.id1 = small.id1;
Query 1 iteration 1 took 38.1 ms and returned 900 rows
Query 1 iteration 2 took 3.3 ms and returned 900 rows
Query 1 iteration 3 took 2.1 ms and returned 900 rows
Q2: SELECT x.id1 as xid1, medium.id1 as mediumid1, x.id2, x.id3, x.id4 as xid4, medium.id4 as mediumid4, x.id5 as xid5, medium.id5 as mediumid5, x.id6, x.v1, medium.v2 FROM x INNER JOIN medium ON x.id2 = medium.id2;
Query 2 iteration 1 took 46.1 ms and returned 912 rows
Query 2 iteration 2 took 18.4 ms and returned 912 rows
Query 2 iteration 3 took 18.6 ms and returned 912 rows
Q3: SELECT x.id1 as xid1, medium.id1 as mediumid1, x.id2, x.id3, x.id4 as xid4, medium.id4 as mediumid4, x.id5 as xid5, medium.id5 as mediumid5, x.id6, x.v1, medium.v2 FROM x LEFT JOIN medium ON x.id2 = medium.id2;
Query 3 iteration 1 took 18.2 ms and returned 1000 rows
Query 3 iteration 2 took 18.5 ms and returned 1000 rows
Query 3 iteration 3 took 17.7 ms and returned 1000 rows
Q4: SELECT x.id1 as xid1, medium.id1 as mediumid1, x.id2, x.id3, x.id4 as xid4, medium.id4 as mediumid4, x.id5 as xid5, medium.id5 as mediumid5, x.id6, x.v1, medium.v2 FROM x JOIN medium ON x.id5 = medium.id5;
Query 4 iteration 1 took 17.8 ms and returned 912 rows
Query 4 iteration 2 took 18.1 ms and returned 912 rows
Query 4 iteration 3 took 17.6 ms and returned 912 rows
Q5: SELECT x.id1 as xid1, large.id1 as largeid1, x.id2 as xid2, large.id2 as largeid2, x.id3, x.id4 as xid4, large.id4 as largeid4, x.id5 as xid5, large.id5 as largeid5, x.id6 as xid6, large.id6 as largeid6, x.v1, large.v2 FROM x JOIN large ON x.id3 = large.id3;
Query 5 iteration 1 took 49496.6 ms and returned 906 rows
Query 5 iteration 2 took 49838.1 ms and returned 906 rows
Query 5 iteration 3 took 49552.0 ms and returned 906 rows
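
The `memory_limit: Some(17179869184)` in the options printed above is what `--memory-limit 16G` comes out to when `G` is read as 1024^3 bytes. A minimal sketch of that conversion follows; `parse_memory_limit` is a hypothetical helper written for illustration and is not dfbench's actual parser.

```rust
/// Hypothetical helper (not dfbench's real parser): converts a size string
/// such as "16G" into bytes, treating K/M/G as powers of 1024.
fn parse_memory_limit(limit: &str) -> Result<u64, String> {
    let limit = limit.trim();
    // Byte index of the last character, which is expected to be the unit.
    let split = limit
        .char_indices()
        .last()
        .map(|(i, _)| i)
        .ok_or_else(|| "empty memory limit".to_string())?;
    let (number, unit) = limit.split_at(split);
    let multiplier: u64 = match unit {
        "K" => 1024,
        "M" => 1024 * 1024,
        "G" => 1024 * 1024 * 1024,
        other => return Err(format!("unsupported unit: {other}")),
    };
    let value: u64 = number
        .parse()
        .map_err(|e| format!("invalid number '{number}': {e}"))?;
    value
        .checked_mul(multiplier)
        .ok_or_else(|| "memory limit overflows u64".to_string())
}

fn main() {
    // 16 * 1024^3 bytes, matching the value in the printed RunOpt.
    println!("{:?}", parse_memory_limit("16G"));
}
```

Returning a `Result` keeps malformed input such as `16X` or an empty string from silently becoming a zero limit.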

@SemyonSinchenko
Member

The data generation will take a long time for big data.

How bad is it? I can try to dig into the problem and try to improve it on the side of falsa (generation library).

@zhuqi-lucas
Contributor Author

Thanks @SemyonSinchenko for the review. It's not too bad in my case; generating huge files takes more time, and that's expected. But my computer has 48GB of memory, so I assume a machine with less memory may take a long time.

@2010YOUY01
Contributor

The data generation will take a long time for big data.

How bad is it? I can try to dig into the problem and try to improve it on the side of falsa (generation library).

I tried generating the 4 join tables for the largest dataset, and it took around 8 minutes on my MacBook with an M4 Pro chip. I think this is definitely not a problem for our benchmarking use case.

@SemyonSinchenko
Member

@zhuqi-lucas Did you try increasing the batch_size argument? It is designed to avoid OOMs, but a small batch size can also reduce the generation speed. If your computer has enough memory, you can try increasing this value. The default value (5_000_000 rows) is quite conservative.
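
To make the batch_size trade-off concrete, here is a rough sketch of the arithmetic. It is not falsa's code (falsa is a separate generation library); the 1e9 row count is an assumption based on the dataset file names, and `num_batches` is a hypothetical helper. Fewer, larger batches mean fewer generate-and-write rounds, at the cost of holding more rows in memory at once.

```rust
/// Number of batches needed to produce `total_rows` rows when at most
/// `batch_size` rows are materialized at a time (ceiling division).
fn num_batches(total_rows: u64, batch_size: u64) -> u64 {
    assert!(batch_size > 0, "batch_size must be positive");
    total_rows / batch_size + u64::from(total_rows % batch_size != 0)
}

fn main() {
    // Assumed row count for a 1e9 table; not taken from falsa itself.
    let total_rows: u64 = 1_000_000_000;
    // The default mentioned above, and a 10x larger value for comparison.
    for batch_size in [5_000_000u64, 50_000_000] {
        println!(
            "batch_size={batch_size}: {} batches",
            num_batches(total_rows, batch_size)
        );
    }
}
```

Under these assumptions the default of 5_000_000 rows gives 200 batches for a 1e9-row table, and a 10x larger batch gives 20.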

@2010YOUY01
Contributor

Thank you for the benchmark. I've tested it locally and it's working well. I have a couple of small suggestions:

  1. Add documentation for this new join benchmark: https://github.com/apache/datafusion/tree/main/benchmarks
  2. I remember other benchmarks like TPCH display the average time; it would be great to include it.

Q1: SELECT x.id1, x.id2, x.id3, x.id4 as xid4, small.id4 as smallid4, x.id5, x.id6, x.v1, small.v2 FROM x INNER JOIN small ON x.id1 = small.id1;
Query 1 iteration 1 took 38.1 ms and returned 900 rows
Query 1 iteration 2 took 3.3 ms and returned 900 rows
Query 1 iteration 3 took 2.1 ms and returned 900 rows

let avg = millis.iter().sum::<f64>() / millis.len() as f64;
println!("Query {query_id} avg time: {avg:.2} ms");
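
For reference, the two lines above can be wrapped into a small standalone program. This is only a sketch: `average_millis` is a hypothetical helper name, and the timings are the Q1 values quoted above rather than anything read from dfbench.

```rust
/// Mean of the per-iteration timings in milliseconds, or None if empty.
fn average_millis(millis: &[f64]) -> Option<f64> {
    if millis.is_empty() {
        return None;
    }
    Some(millis.iter().sum::<f64>() / millis.len() as f64)
}

fn main() {
    let query_id = 1;
    // Q1 iteration times quoted above, in milliseconds.
    let millis = [38.1, 3.3, 2.1];
    if let Some(avg) = average_millis(&millis) {
        println!("Query {query_id} avg time: {avg:.2} ms");
    }
}
```

With these inputs the program prints an average of 14.50 ms. Note that the first iteration dominates a plain mean like this one; the `Option` return guards against dividing by zero when no iterations ran.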

Regarding the previous Q5 OOM issue, I've tried it and it seems to consume very little memory now
(the following command is the one generated by ./bench.sh run h2o_big_join, with --query 5 appended)

/usr/bin/time -l cargo run --release --bin dfbench -- h2o --iterations 1 --join-paths /Users/yongting/Code/datafusion/benchmarks/data/h2o/J1_1e9_NA_0.csv,/Users/yongting/Code/datafusion/benchmarks/data/h2o/J1_1e9_1e3_0.csv,/Users/yongting/Code/datafusion/benchmarks/data/h2o/J1_1e9_1e6_0.csv,/Users/yongting/Code/datafusion/benchmarks/data/h2o/J1_1e9_1e9_NA.csv --queries-path /Users/yongting/Code/datafusion/benchmarks/queries/h2o/join.sql -o /Users/yongting/Code/datafusion/benchmarks/results/pr-14902/h2o_join.json --query 5
    Finished `release` profile [optimized] target(s) in 0.10s
     Running `/Users/yongting/Code/datafusion/target/release/dfbench h2o --iterations 1 --join-paths /Users/yongting/Code/datafusion/benchmarks/data/h2o/J1_1e9_NA_0.csv,/Users/yongting/Code/datafusion/benchmarks/data/h2o/J1_1e9_1e3_0.csv,/Users/yongting/Code/datafusion/benchmarks/data/h2o/J1_1e9_1e6_0.csv,/Users/yongting/Code/datafusion/benchmarks/data/h2o/J1_1e9_1e9_NA.csv --queries-path /Users/yongting/Code/datafusion/benchmarks/queries/h2o/join.sql -o /Users/yongting/Code/datafusion/benchmarks/results/pr-14902/h2o_join.json --query 5`
Running benchmarks with the following options: RunOpt { query: Some(5), common: CommonOpt { iterations: 1, partitions: None, batch_size: 8192, mem_pool_type: "fair", memory_limit: None, sort_spill_reservation_bytes: None, debug: false }, queries_path: "/Users/yongting/Code/datafusion/benchmarks/queries/h2o/join.sql", path: "benchmarks/data/h2o/G1_1e7_1e7_100_0.csv", join_paths: "/Users/yongting/Code/datafusion/benchmarks/data/h2o/J1_1e9_NA_0.csv,/Users/yongting/Code/datafusion/benchmarks/data/h2o/J1_1e9_1e3_0.csv,/Users/yongting/Code/datafusion/benchmarks/data/h2o/J1_1e9_1e6_0.csv,/Users/yongting/Code/datafusion/benchmarks/data/h2o/J1_1e9_1e9_NA.csv", output_path: Some("/Users/yongting/Code/datafusion/benchmarks/results/pr-14902/h2o_join.json") }
Q5: SELECT x.id1 as xid1, large.id1 as largeid1, x.id2 as xid2, large.id2 as largeid2, x.id3, x.id4 as xid4, large.id4 as largeid4, x.id5 as xid5, large.id5 as largeid5, x.id6 as xid6, large.id6 as largeid6, x.v1, large.v2 FROM x JOIN large ON x.id3 = large.id3;
Query 5 iteration 1 took 47010.3 ms and returned 906 rows
       47.21 real       152.57 user        46.83 sys
           153337856  maximum resident set size
                   0  average shared memory size
                   0  average unshared data size
                   0  average unshared stack size
               16753  page reclaims
                 917  page faults
                   0  swaps
                   0  block input operations
                   0  block output operations
                   0  messages sent
                   0  messages received
                   0  signals received
             3279460  voluntary context switches
             1401644  involuntary context switches
       3271486751853  instructions retired
        770247407285  cycles elapsed
           140297632  peak memory footprint
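
The raw counters above are easier to read after conversion. The sketch below assumes that `/usr/bin/time -l` on macOS reports "maximum resident set size" and "peak memory footprint" in bytes; the helper names are hypothetical and the numbers are copied from the output above.

```rust
/// Converts a byte count to MiB (1 MiB = 1024 * 1024 bytes).
fn bytes_to_mib(bytes: u64) -> f64 {
    bytes as f64 / (1024.0 * 1024.0)
}

/// Average number of busy cores: CPU time (user + sys) divided by wall time.
fn avg_busy_cores(real_s: f64, user_s: f64, sys_s: f64) -> f64 {
    (user_s + sys_s) / real_s
}

fn main() {
    // Values copied from the `/usr/bin/time -l` output above
    // (assumed to be in bytes, as on macOS).
    let max_rss = 153_337_856u64;
    let peak_footprint = 140_297_632u64;
    println!("max RSS: {:.1} MiB", bytes_to_mib(max_rss));
    println!("peak footprint: {:.1} MiB", bytes_to_mib(peak_footprint));
    println!("avg busy cores: {:.2}", avg_busy_cores(47.21, 152.57, 46.83));
}
```

Under that assumption the run peaked at roughly 146 MiB resident and kept a little over four cores busy on average.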

@zhuqi-lucas
Contributor Author

Thanks @2010YOUY01 @SemyonSinchenko for the review. I tried again and it's not a problem for me now. Previously it may have been because I didn't have enough disk space; I have since cleaned up some disk usage.

This is a good suggestion.

@zhuqi-lucas
Contributor Author

Thank you @2010YOUY01 for the good suggestions. They are addressed in the latest changes to this PR, and tested:

cargo run --release --bin dfbench -- h2o --join-paths ./benchmarks/data/h2o/J1_1e7_NA_0.csv,./benchmarks/data/h2o/J1_1e7_1e1_0.csv,./benchmarks/data/h2o/J1_1e7_1e4_0.csv,./benchmarks/data/h2o/J1_1e7_1e7_NA.csv --queries-path ./benchmarks/queries/h2o/join.sql --query 1

    Finished `release` profile [optimized] target(s) in 0.39s
     Running `target/release/dfbench h2o --join-paths ./benchmarks/data/h2o/J1_1e7_NA_0.csv,./benchmarks/data/h2o/J1_1e7_1e1_0.csv,./benchmarks/data/h2o/J1_1e7_1e4_0.csv,./benchmarks/data/h2o/J1_1e7_1e7_NA.csv --queries-path ./benchmarks/queries/h2o/join.sql --query 1`
Running benchmarks with the following options: RunOpt { query: Some(1), common: CommonOpt { iterations: 3, partitions: None, batch_size: 8192, mem_pool_type: "fair", memory_limit: None, sort_spill_reservation_bytes: None, debug: false }, queries_path: "./benchmarks/queries/h2o/join.sql", path: "benchmarks/data/h2o/G1_1e7_1e7_100_0.csv", join_paths: "./benchmarks/data/h2o/J1_1e7_NA_0.csv,./benchmarks/data/h2o/J1_1e7_1e1_0.csv,./benchmarks/data/h2o/J1_1e7_1e4_0.csv,./benchmarks/data/h2o/J1_1e7_1e7_NA.csv", output_path: None }
Q1: SELECT x.id1, x.id2, x.id3, x.id4 as xid4, small.id4 as smallid4, x.id5, x.id6, x.v1, small.v2 FROM x INNER JOIN small ON x.id1 = small.id1;
Query 1 iteration 1 took 39.6 ms and returned 9 rows
Query 1 iteration 2 took 1.8 ms and returned 9 rows
Query 1 iteration 3 took 1.5 ms and returned 9 rows
Query 1 avg time: 14.27 ms

Dandandan merged commit aa1c7c4 into apache:main on Feb 28, 2025
24 checks passed
@Dandandan
Contributor

Nice, hopefully we can find ways to improve joins further 💪

4 participants