Skip to content

Commit cfd1348

Browse files
fix: multiple fixes around system stability under load (#1346)
1. Perform object store sync for all streams in parallel. 2. Remove the restriction on multi-threading so that all available cores are utilised. 3. Add atomicity to conversion: i. each conversion task processes one minute of arrow files; ii. arrow files are moved to an inprocess folder to maintain atomicity; iii. an init sync task is added to process all pending files; iv. a tokio sleep of 5 secs is added to the shutdown task to let ongoing jobs complete; v. unwraps of write locks are removed to avoid thread poisoning.
1 parent 3cdd854 commit cfd1348

File tree

9 files changed

+751
-323
lines changed

9 files changed

+751
-323
lines changed

src/handlers/http/health_check.rs

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,12 @@ use actix_web::{
2929
use http::StatusCode;
3030
use once_cell::sync::Lazy;
3131
use tokio::{sync::Mutex, task::JoinSet};
32-
use tracing::{error, info, warn};
32+
use tracing::{error, info};
3333

34-
use crate::parseable::PARSEABLE;
34+
use crate::{parseable::PARSEABLE, storage::object_storage::sync_all_streams};
3535

3636
// Create a global variable to store signal status
37-
static SIGNAL_RECEIVED: Lazy<Arc<Mutex<bool>>> = Lazy::new(|| Arc::new(Mutex::new(false)));
37+
pub static SIGNAL_RECEIVED: Lazy<Arc<Mutex<bool>>> = Lazy::new(|| Arc::new(Mutex::new(false)));
3838

3939
pub async fn liveness() -> HttpResponse {
4040
HttpResponse::new(StatusCode::OK)
@@ -60,28 +60,33 @@ pub async fn shutdown() {
6060
let mut shutdown_flag = SIGNAL_RECEIVED.lock().await;
6161
*shutdown_flag = true;
6262

63-
let mut joinset = JoinSet::new();
63+
//sleep for 5 secs to allow any ongoing requests to finish
64+
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
65+
let mut local_sync_joinset = JoinSet::new();
6466

6567
// Sync staging
66-
PARSEABLE.streams.flush_and_convert(&mut joinset, true);
68+
PARSEABLE
69+
.streams
70+
.flush_and_convert(&mut local_sync_joinset, false, true);
6771

68-
while let Some(res) = joinset.join_next().await {
72+
while let Some(res) = local_sync_joinset.join_next().await {
6973
match res {
7074
Ok(Ok(_)) => info!("Successfully converted arrow files to parquet."),
71-
Ok(Err(err)) => warn!("Failed to convert arrow files to parquet. {err:?}"),
75+
Ok(Err(err)) => error!("Failed to convert arrow files to parquet. {err:?}"),
7276
Err(err) => error!("Failed to join async task: {err}"),
7377
}
7478
}
7579

76-
if let Err(e) = PARSEABLE
77-
.storage
78-
.get_object_store()
79-
.upload_files_from_staging()
80-
.await
81-
{
82-
warn!("Failed to sync local data with object store. {:?}", e);
83-
} else {
84-
info!("Successfully synced all data to S3.");
80+
// Sync object store
81+
let mut object_store_joinset = JoinSet::new();
82+
sync_all_streams(&mut object_store_joinset);
83+
84+
while let Some(res) = object_store_joinset.join_next().await {
85+
match res {
86+
Ok(Ok(_)) => info!("Successfully synced all data to S3."),
87+
Ok(Err(err)) => error!("Failed to sync local data with object store. {err:?}"),
88+
Err(err) => error!("Failed to join async task: {err}"),
89+
}
8590
}
8691
}
8792

src/handlers/http/modal/ingest_server.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use tokio::sync::oneshot;
3131
use tokio::sync::OnceCell;
3232

3333
use crate::handlers::http::modal::NodeType;
34+
use crate::sync::sync_start;
3435
use crate::{
3536
analytics,
3637
handlers::{
@@ -114,6 +115,13 @@ impl ParseableServer for IngestServer {
114115

115116
migration::run_migration(&PARSEABLE).await?;
116117

118+
// local sync on init
119+
let startup_sync_handle = tokio::spawn(async {
120+
if let Err(e) = sync_start().await {
121+
tracing::warn!("local sync on server start failed: {e}");
122+
}
123+
});
124+
117125
// Run sync on a background thread
118126
let (cancel_tx, cancel_rx) = oneshot::channel();
119127
thread::spawn(|| sync::handler(cancel_rx));
@@ -124,7 +132,9 @@ impl ParseableServer for IngestServer {
124132
let result = self.start(shutdown_rx, prometheus.clone(), None).await;
125133
// Cancel sync jobs
126134
cancel_tx.send(()).expect("Cancellation should not fail");
127-
135+
if let Err(join_err) = startup_sync_handle.await {
136+
tracing::warn!("startup sync task panicked: {join_err}");
137+
}
128138
result
129139
}
130140
}

src/handlers/http/modal/query_server.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use crate::handlers::http::{logstream, MAX_EVENT_PAYLOAD_SIZE};
2727
use crate::handlers::http::{rbac, role};
2828
use crate::hottier::HotTierManager;
2929
use crate::rbac::role::Action;
30+
use crate::sync::sync_start;
3031
use crate::{analytics, migration, storage, sync};
3132
use actix_web::web::{resource, ServiceConfig};
3233
use actix_web::{web, Scope};
@@ -126,6 +127,13 @@ impl ParseableServer for QueryServer {
126127
if init_cluster_metrics_schedular().is_ok() {
127128
info!("Cluster metrics scheduler started successfully");
128129
}
130+
131+
// local sync on init
132+
let startup_sync_handle = tokio::spawn(async {
133+
if let Err(e) = sync_start().await {
134+
tracing::warn!("local sync on server start failed: {e}");
135+
}
136+
});
129137
if let Some(hot_tier_manager) = HotTierManager::global() {
130138
hot_tier_manager.put_internal_stream_hot_tier().await?;
131139
hot_tier_manager.download_from_s3()?;
@@ -142,7 +150,9 @@ impl ParseableServer for QueryServer {
142150
.await?;
143151
// Cancel sync jobs
144152
cancel_tx.send(()).expect("Cancellation should not fail");
145-
153+
if let Err(join_err) = startup_sync_handle.await {
154+
tracing::warn!("startup sync task panicked: {join_err}");
155+
}
146156
Ok(result)
147157
}
148158
}

src/handlers/http/modal/server.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use crate::metrics;
3333
use crate::migration;
3434
use crate::storage;
3535
use crate::sync;
36+
use crate::sync::sync_start;
3637

3738
use actix_web::web;
3839
use actix_web::web::resource;
@@ -122,6 +123,13 @@ impl ParseableServer for Server {
122123

123124
storage::retention::load_retention_from_global();
124125

126+
// local sync on init
127+
let startup_sync_handle = tokio::spawn(async {
128+
if let Err(e) = sync_start().await {
129+
tracing::warn!("local sync on server start failed: {e}");
130+
}
131+
});
132+
125133
if let Some(hot_tier_manager) = HotTierManager::global() {
126134
hot_tier_manager.download_from_s3()?;
127135
};
@@ -142,7 +150,9 @@ impl ParseableServer for Server {
142150
.await;
143151
// Cancel sync jobs
144152
cancel_tx.send(()).expect("Cancellation should not fail");
145-
153+
if let Err(join_err) = startup_sync_handle.await {
154+
tracing::warn!("startup sync task panicked: {join_err}");
155+
}
146156
return result;
147157
}
148158
}

0 commit comments

Comments
 (0)