Skip to content

feat: object store #4

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
548 changes: 540 additions & 8 deletions Cargo.lock

Large diffs are not rendered by default.

13 changes: 11 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ edition = "2024"

[dependencies]
tokio = { version = "1.43", features = ["full"] }
foyer = {version ="0.17.0",features=["serde"]}
object_store = { version = "0.11.2", features = ["aws", "azure", "gcp"] }
datafusion = "46.0.0"
arrow = "54.2.0"
uuid = { version = "1.13", features = ["v4", "serde"] }
Expand All @@ -18,7 +20,7 @@ log = "0.4.25"
color-eyre = "0.6.3"
arrow-schema = "54.1.0"
regex = "1.11.1"
deltalake = { version = "0.25.0", features = ["datafusion", "s3"] }
deltalake = { version = "0.25", features = ["datafusion", "s3","azure", "gcs",] }
delta_kernel = { version = "0.8.0", features = [
"arrow-conversion",
"default-engine",
Expand Down Expand Up @@ -67,6 +69,9 @@ aws-sdk-s3 = "1.3.0"
url = "2.5.4"
datafusion-common = "46.0.0"
tokio-cron-scheduler = "0.10"
metrics = "0.24.2"
flate2 = "1.1.1"
async-stream = "0.3"

[dev-dependencies]
serial_test = "3.2.0"
Expand All @@ -75,5 +80,9 @@ scopeguard = "1.2.0"
rand = "0.8.5"

[features]
default = []
default = ["s3", "azure", "gcs"]
s3 = ["deltalake/s3", "object_store/aws"]
azure = ["deltalake/azure", "object_store/azure"]
gcs = ["deltalake/gcs", "object_store/gcp"]
test = []

176 changes: 88 additions & 88 deletions benches/benchmarks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,51 +34,51 @@ fn bench_batch_ingestion(c: &mut Criterion) {
let mut records = Vec::with_capacity(batch_size);
for _ in 0..batch_size {
records.push(IngestRecord {
table_name: "bench_table".to_string(),
project_id: "bench_project".to_string(),
id: Uuid::new_v4().to_string(),
version: 1,
event_type: "bench_event".to_string(),
timestamp: "2025-03-11T12:00:00Z".to_string(),
trace_id: "trace".to_string(),
span_id: "span".to_string(),
parent_span_id: None,
trace_state: None,
start_time: "2025-03-11T12:00:00Z".to_string(),
end_time: Some("2025-03-11T12:00:01Z".to_string()),
duration_ns: 1_000_000_000,
span_name: "span_name".to_string(),
span_kind: "client".to_string(),
span_type: "bench".to_string(),
status: None,
status_code: 0,
status_message: "OK".to_string(),
severity_text: None,
severity_number: 0,
host: "localhost".to_string(),
url_path: "/".to_string(),
raw_url: "/".to_string(),
method: "GET".to_string(),
referer: "".to_string(),
path_params: None,
query_params: None,
request_headers: None,
response_headers: None,
request_body: None,
response_body: None,
endpoint_hash: "hash".to_string(),
shape_hash: "shape".to_string(),
format_hashes: vec!["fmt".to_string()],
field_hashes: vec!["field".to_string()],
sdk_type: "rust".to_string(),
service_version: None,
attributes: None,
events: None,
links: None,
resource: None,
table_name: "bench_table".to_string(),
project_id: "bench_project".to_string(),
id: Uuid::new_v4().to_string(),
version: 1,
event_type: "bench_event".to_string(),
timestamp: "2025-03-11T12:00:00Z".to_string(),
trace_id: "trace".to_string(),
span_id: "span".to_string(),
parent_span_id: None,
trace_state: None,
start_time: "2025-03-11T12:00:00Z".to_string(),
end_time: Some("2025-03-11T12:00:01Z".to_string()),
duration_ns: 1_000_000_000,
span_name: "span_name".to_string(),
span_kind: "client".to_string(),
span_type: "bench".to_string(),
status: None,
status_code: 0,
status_message: "OK".to_string(),
severity_text: None,
severity_number: 0,
host: "localhost".to_string(),
url_path: "/".to_string(),
raw_url: "/".to_string(),
method: "GET".to_string(),
referer: "".to_string(),
path_params: None,
query_params: None,
request_headers: None,
response_headers: None,
request_body: None,
response_body: None,
endpoint_hash: "hash".to_string(),
shape_hash: "shape".to_string(),
format_hashes: vec!["fmt".to_string()],
field_hashes: vec!["field".to_string()],
sdk_type: "rust".to_string(),
service_version: None,
attributes: None,
events: None,
links: None,
resource: None,
instrumentation_scope: None,
errors: None,
tags: vec!["tag".to_string()],
errors: None,
tags: vec!["tag".to_string()],
});
}

Expand Down Expand Up @@ -110,51 +110,51 @@ fn bench_insertion_range(c: &mut Criterion) {
let mut records = Vec::with_capacity(size);
for _ in 0..size {
records.push(IngestRecord {
table_name: "bench_table".to_string(),
project_id: "bench_project".to_string(),
id: Uuid::new_v4().to_string(),
version: 1,
event_type: "bench_event".to_string(),
timestamp: "2025-03-11T12:00:00Z".to_string(),
trace_id: "trace".to_string(),
span_id: "span".to_string(),
parent_span_id: None,
trace_state: None,
start_time: "2025-03-11T12:00:00Z".to_string(),
end_time: Some("2025-03-11T12:00:01Z".to_string()),
duration_ns: 1_000_000_000,
span_name: "span_name".to_string(),
span_kind: "client".to_string(),
span_type: "bench".to_string(),
status: None,
status_code: 0,
status_message: "OK".to_string(),
severity_text: None,
severity_number: 0,
host: "localhost".to_string(),
url_path: "/".to_string(),
raw_url: "/".to_string(),
method: "GET".to_string(),
referer: "".to_string(),
path_params: None,
query_params: None,
request_headers: None,
response_headers: None,
request_body: None,
response_body: None,
endpoint_hash: "hash".to_string(),
shape_hash: "shape".to_string(),
format_hashes: vec!["fmt".to_string()],
field_hashes: vec!["field".to_string()],
sdk_type: "rust".to_string(),
service_version: None,
attributes: None,
events: None,
links: None,
resource: None,
table_name: "bench_table".to_string(),
project_id: "bench_project".to_string(),
id: Uuid::new_v4().to_string(),
version: 1,
event_type: "bench_event".to_string(),
timestamp: "2025-03-11T12:00:00Z".to_string(),
trace_id: "trace".to_string(),
span_id: "span".to_string(),
parent_span_id: None,
trace_state: None,
start_time: "2025-03-11T12:00:00Z".to_string(),
end_time: Some("2025-03-11T12:00:01Z".to_string()),
duration_ns: 1_000_000_000,
span_name: "span_name".to_string(),
span_kind: "client".to_string(),
span_type: "bench".to_string(),
status: None,
status_code: 0,
status_message: "OK".to_string(),
severity_text: None,
severity_number: 0,
host: "localhost".to_string(),
url_path: "/".to_string(),
raw_url: "/".to_string(),
method: "GET".to_string(),
referer: "".to_string(),
path_params: None,
query_params: None,
request_headers: None,
response_headers: None,
request_body: None,
response_body: None,
endpoint_hash: "hash".to_string(),
shape_hash: "shape".to_string(),
format_hashes: vec!["fmt".to_string()],
field_hashes: vec!["field".to_string()],
sdk_type: "rust".to_string(),
service_version: None,
attributes: None,
events: None,
links: None,
resource: None,
instrumentation_scope: None,
errors: None,
tags: vec!["tag".to_string()],
errors: None,
tags: vec!["tag".to_string()],
});
}

Expand Down
21 changes: 11 additions & 10 deletions src/batch_queue.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::{
sync::Arc,
time::{Duration, Instant},
};

use anyhow::Result;
use crossbeam::queue::SegQueue;
use delta_kernel::arrow::record_batch::RecordBatch;
use tokio::sync::RwLock;
use tokio::time::interval;
use tokio::{sync::RwLock, time::interval};
use tracing::{error, info};

/// BatchQueue collects RecordBatches and processes them at intervals
#[derive(Debug)]
pub struct BatchQueue {
queue: Arc<SegQueue<RecordBatch>>,
queue: Arc<SegQueue<RecordBatch>>,
is_shutting_down: Arc<RwLock<bool>>,
}

Expand Down Expand Up @@ -105,14 +106,14 @@ async fn process_batches(db: &Arc<crate::database::Database>, queue: &Arc<SegQue

#[cfg(test)]
mod tests {
use super::*;
use crate::database::Database;
use crate::persistent_queue::OtelLogsAndSpans;
use chrono::Utc;
use serde_arrow::schema::SchemaLike;
use std::sync::Arc;

use chrono::Utc;
use tokio::time::sleep;

use super::*;
use crate::{database::Database, persistent_queue::OtelLogsAndSpans};

#[tokio::test]
async fn test_batch_queue() -> Result<()> {
dotenv::dotenv().ok();
Expand Down
Loading
Loading