Commit 649c000

Devdutt Shenoi and Nikhil Sinha authored
feat: merge finish .arrows and convert to .parquet (#1200)
Signed-off-by: Devdutt Shenoi <[email protected]>
Co-authored-by: Nikhil Sinha <[email protected]>
1 parent 505c314 commit 649c000

9 files changed: +78 -143 lines changed


src/handlers/http/health_check.rs

+24 -2

@@ -27,7 +27,8 @@ use actix_web::{
     HttpResponse,
 };
 use http::StatusCode;
-use tokio::sync::Mutex;
+use tokio::{sync::Mutex, task::JoinSet};
+use tracing::{error, info, warn};
 
 use crate::parseable::PARSEABLE;
 
@@ -60,8 +61,29 @@ pub async fn shutdown() {
     let mut shutdown_flag = SIGNAL_RECEIVED.lock().await;
     *shutdown_flag = true;
 
+    let mut joinset = JoinSet::new();
+
     // Sync staging
-    PARSEABLE.flush_all_streams();
+    PARSEABLE.streams.flush_and_convert(&mut joinset, true);
+
+    while let Some(res) = joinset.join_next().await {
+        match res {
+            Ok(Ok(_)) => info!("Successfully converted arrow files to parquet."),
+            Ok(Err(err)) => warn!("Failed to convert arrow files to parquet. {err:?}"),
+            Err(err) => error!("Failed to join async task: {err}"),
+        }
+    }
+
+    if let Err(e) = PARSEABLE
+        .storage
+        .get_object_store()
+        .upload_files_from_staging()
+        .await
+    {
+        warn!("Failed to sync local data with object store. {:?}", e);
+    } else {
+        info!("Successfully synced all data to S3.");
+    }
 }
 
 pub async fn readiness() -> HttpResponse {
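[Note] `shutdown()` now fans the per-stream conversion out across a tokio `JoinSet` and drains it before uploading staged files, so no arrow data is left unconverted when the process exits. A minimal standalone sketch of that drain pattern (`convert` is a hypothetical stand-in for `Stream::flush_and_convert`, not part of this commit):

use tokio::task::JoinSet;

// Hypothetical stand-in for the per-stream flush+convert work.
async fn convert(stream_id: usize) -> Result<(), String> {
    if stream_id % 2 == 0 {
        Ok(())
    } else {
        Err(format!("stream {stream_id} failed"))
    }
}

#[tokio::main]
async fn main() {
    let mut joinset = JoinSet::new();
    for stream_id in 0..4 {
        joinset.spawn(convert(stream_id));
    }
    // join_next() yields each task's result as it finishes and returns None
    // once the set is empty, so shutdown blocks until every task is done.
    while let Some(res) = joinset.join_next().await {
        match res {
            Ok(Ok(())) => println!("converted"),
            Ok(Err(err)) => println!("conversion failed: {err}"),
            Err(err) => println!("task panicked or was aborted: {err}"),
        }
    }
}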

src/handlers/http/modal/mod.rs

-20

@@ -136,26 +136,6 @@ pub trait ParseableServer {
 
         health_check::shutdown().await;
 
-        // Perform S3 sync and wait for completion
-        info!("Starting data sync to S3...");
-
-        if let Err(e) = PARSEABLE.streams.prepare_parquet(true) {
-            warn!("Failed to convert arrow files to parquet. {:?}", e);
-        } else {
-            info!("Successfully converted arrow files to parquet.");
-        }
-
-        if let Err(e) = PARSEABLE
-            .storage
-            .get_object_store()
-            .upload_files_from_staging()
-            .await
-        {
-            warn!("Failed to sync local data with object store. {:?}", e);
-        } else {
-            info!("Successfully synced all data to S3.");
-        }
-
         // Initiate graceful shutdown
         info!("Graceful shutdown of HTTP server triggered");
         srv_handle.stop(true).await;

src/lib.rs

+9 -2

@@ -59,8 +59,15 @@ use reqwest::{Client, ClientBuilder};
 // It is very unlikely that panic will occur when dealing with locks.
 pub const LOCK_EXPECT: &str = "Thread shouldn't panic while holding a lock";
 
-pub const STORAGE_CONVERSION_INTERVAL: u64 = 60;
-pub const STORAGE_UPLOAD_INTERVAL: u64 = 30;
+/// Describes the duration at the end of which in-memory buffers are flushed,
+/// arrows files are "finished" and compacted into parquet files.
+pub const LOCAL_SYNC_INTERVAL: Duration = Duration::from_secs(60);
+
+/// Duration used to configure prefix generation.
+pub const OBJECT_STORE_DATA_GRANULARITY: u32 = LOCAL_SYNC_INTERVAL.as_secs() as u32 / 60;
+
+/// Describes the duration at the end of which parquets are pushed into objectstore.
+pub const STORAGE_UPLOAD_INTERVAL: Duration = Duration::from_secs(30);
 
 // A single HTTP client for all outgoing HTTP requests from the parseable server
 static HTTP_CLIENT: Lazy<Client> = Lazy::new(|| {
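[Note] The interval constants move here as typed `Duration`s instead of bare `u64` seconds, so call sites can hand them straight to tokio timers without re-wrapping. Both `Duration::from_secs` and `Duration::as_secs` are `const fn`, which is what lets the granularity still be computed at compile time. A minimal sketch of the same idea:

use std::time::Duration;

const LOCAL_SYNC_INTERVAL: Duration = Duration::from_secs(60);

// Evaluates to 1 at compile time: one object-store prefix per minute of sync.
const OBJECT_STORE_DATA_GRANULARITY: u32 = LOCAL_SYNC_INTERVAL.as_secs() as u32 / 60;

fn main() {
    assert_eq!(OBJECT_STORE_DATA_GRANULARITY, 1);
    println!("flushing every {:?}", LOCAL_SYNC_INTERVAL);
}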

src/parseable/mod.rs

-10

@@ -179,16 +179,6 @@ impl Parseable {
             .unwrap_or_default())
     }
 
-    /// Writes all streams in staging onto disk, awaiting conversion into parquet.
-    /// Deletes all in memory recordbatches, freeing up rows in mem-writer.
-    pub fn flush_all_streams(&self) {
-        let streams = self.streams.read().unwrap();
-
-        for staging in streams.values() {
-            staging.flush()
-        }
-    }
-
     // validate the storage, if the proper path for staging directory is provided
     // if the proper data directory is provided, or s3 bucket is provided etc
     pub async fn validate_storage(&self) -> Result<Option<Bytes>, ObjectStorageError> {

src/parseable/streams.rs

+18 -11

@@ -41,6 +41,7 @@ use parquet::{
 };
 use rand::distributions::DistString;
 use relative_path::RelativePathBuf;
+use tokio::task::JoinSet;
 use tracing::{error, info, trace, warn};
 
 use crate::{

@@ -49,11 +50,9 @@ use crate::{
     metadata::{LogStreamMetadata, SchemaVersion},
     metrics,
     option::Mode,
-    storage::{
-        object_storage::to_bytes, retention::Retention, StreamType, OBJECT_STORE_DATA_GRANULARITY,
-    },
+    storage::{object_storage::to_bytes, retention::Retention, StreamType},
     utils::minute_to_slot,
-    LOCK_EXPECT,
+    LOCK_EXPECT, OBJECT_STORE_DATA_GRANULARITY,
 };
 
 use super::{

@@ -660,6 +659,13 @@ impl Stream {
     pub fn get_stream_type(&self) -> StreamType {
         self.metadata.read().expect(LOCK_EXPECT).stream_type
     }
+
+    /// First flushes arrows onto disk and then converts the arrow into parquet
+    pub fn flush_and_convert(&self, shutdown_signal: bool) -> Result<(), StagingError> {
+        self.flush();
+
+        self.prepare_parquet(shutdown_signal)
+    }
 }
 
 #[derive(Deref, DerefMut, Default)]

@@ -726,21 +732,22 @@ impl Streams {
             .collect()
     }
 
-    /// Convert arrow files into parquet, preparing it for upload
-    pub fn prepare_parquet(&self, shutdown_signal: bool) -> Result<(), StagingError> {
+    /// Asynchronously flushes arrows and compacts into parquet data on all streams in staging,
+    /// so that it is ready to be pushed onto objectstore.
+    pub fn flush_and_convert(
+        &self,
+        joinset: &mut JoinSet<Result<(), StagingError>>,
+        shutdown_signal: bool,
+    ) {
         let streams: Vec<Arc<Stream>> = self
             .read()
             .expect(LOCK_EXPECT)
             .values()
             .map(Arc::clone)
             .collect();
         for stream in streams {
-            stream
-                .prepare_parquet(shutdown_signal)
-                .inspect_err(|err| error!("Failed to run conversion task {err:?}"))?;
+            joinset.spawn(async move { stream.flush_and_convert(shutdown_signal) });
         }
-
-        Ok(())
     }
 }
 
src/query/listing_table_builder.rs

+2 -3

@@ -32,9 +32,8 @@ use itertools::Itertools;
 use object_store::{path::Path, ObjectMeta, ObjectStore};
 
 use crate::{
-    event::DEFAULT_TIMESTAMP_KEY,
-    storage::{ObjectStorage, OBJECT_STORE_DATA_GRANULARITY},
-    utils::time::TimeRange,
+    event::DEFAULT_TIMESTAMP_KEY, storage::ObjectStorage, utils::time::TimeRange,
+    OBJECT_STORE_DATA_GRANULARITY,
 };
 
 use super::PartialTimeFilter;

src/storage/mod.rs

-8

@@ -57,14 +57,6 @@ pub const SCHEMA_FILE_NAME: &str = ".schema";
 pub const ALERTS_ROOT_DIRECTORY: &str = ".alerts";
 pub const MANIFEST_FILE: &str = "manifest.json";
 
-/// local sync interval to move data.records to /tmp dir of that stream.
-/// 60 sec is a reasonable value.
-pub const LOCAL_SYNC_INTERVAL: u64 = 60;
-
-/// duration used to configure prefix in objectstore and local disk structure
-/// used for storage. Defaults to 1 min.
-pub const OBJECT_STORE_DATA_GRANULARITY: u32 = (LOCAL_SYNC_INTERVAL as u32) / 60;
-
 // max concurrent request allowed for datafusion object store
 const MAX_OBJECT_STORE_REQUESTS: usize = 1000;
 

src/storage/object_storage.rs

+1 -1

@@ -713,7 +713,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
     }
 
     async fn upload_files_from_staging(&self) -> Result<(), ObjectStorageError> {
-        if !Path::new(&PARSEABLE.options.staging_dir()).exists() {
+        if !PARSEABLE.options.staging_dir().exists() {
             return Ok(());
         }
 
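[Note] Assuming `staging_dir()` returns a `&Path` or `&PathBuf` (as the call site suggests), the old `Path::new(&...)` wrapper re-wrapped a value that was already a path, so calling `exists()` directly is equivalent. A toy illustration with a hypothetical path:

use std::path::{Path, PathBuf};

fn main() {
    // Hypothetical staging directory.
    let staging: PathBuf = PathBuf::from("/tmp/parseable-staging");
    // PathBuf derefs to Path, so exists() works directly;
    // Path::new(&staging) just re-wraps the same underlying bytes.
    assert_eq!(staging.exists(), Path::new(&staging).exists());
}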

src/sync.rs

+24 -86

@@ -20,14 +20,14 @@ use chrono::{TimeDelta, Timelike};
 use std::future::Future;
 use std::panic::AssertUnwindSafe;
 use tokio::sync::oneshot;
+use tokio::task::JoinSet;
 use tokio::time::{interval_at, sleep, Duration, Instant};
 use tokio::{select, task};
 use tracing::{error, info, trace, warn};
 
 use crate::alerts::{alerts_utils, AlertConfig, AlertError};
 use crate::parseable::PARSEABLE;
-use crate::storage::LOCAL_SYNC_INTERVAL;
-use crate::{STORAGE_CONVERSION_INTERVAL, STORAGE_UPLOAD_INTERVAL};
+use crate::{LOCAL_SYNC_INTERVAL, STORAGE_UPLOAD_INTERVAL};
 
 // Calculates the instant that is the start of the next minute
 fn next_minute() -> Instant {

@@ -63,7 +63,7 @@
         if warned_once {
             warn!(
                 "Task '{task_name}' took longer than expected: {:?} (threshold: {threshold:?})",
-                start_time.elapsed() - threshold
+                start_time.elapsed()
             );
         }
         break res.expect("Task handle shouldn't error");

@@ -74,28 +74,22 @@
 
 /// Flushes arrows onto disk every `LOCAL_SYNC_INTERVAL` seconds, packs arrows into parquet every
 /// `STORAGE_CONVERSION_INTERVAL` secondsand uploads them every `STORAGE_UPLOAD_INTERVAL` seconds.
-#[tokio::main(flavor = "current_thread")]
+#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
 pub async fn handler(mut cancel_rx: oneshot::Receiver<()>) -> anyhow::Result<()> {
-    let (localsync_handler, mut localsync_outbox, localsync_inbox) = run_local_sync();
+    let (localsync_handler, mut localsync_outbox, localsync_inbox) = local_sync();
     let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
         object_store_sync();
-    let (mut remote_conversion_handler, mut remote_conversion_outbox, mut remote_conversion_inbox) =
-        arrow_conversion();
     loop {
         select! {
             _ = &mut cancel_rx => {
                 // actix server finished .. stop other threads and stop the server
                 remote_sync_inbox.send(()).unwrap_or(());
                 localsync_inbox.send(()).unwrap_or(());
-                remote_conversion_inbox.send(()).unwrap_or(());
                 if let Err(e) = localsync_handler.await {
-                    error!("Error joining remote_sync_handler: {:?}", e);
+                    error!("Error joining localsync_handler: {e:?}");
                 }
                 if let Err(e) = remote_sync_handler.await {
-                    error!("Error joining remote_sync_handler: {:?}", e);
-                }
-                if let Err(e) = remote_conversion_handler.await {
-                    error!("Error joining remote_conversion_handler: {:?}", e);
+                    error!("Error joining remote_sync_handler: {e:?}");
                 }
                 return Ok(());
             },

@@ -107,17 +101,10 @@ pub async fn handler(mut cancel_rx: oneshot::Receiver<()>) -> anyhow::Result<()>
             _ = &mut remote_sync_outbox => {
                 // remote_sync failed, this is recoverable by just starting remote_sync thread again
                 if let Err(e) = remote_sync_handler.await {
-                    error!("Error joining remote_sync_handler: {:?}", e);
+                    error!("Error joining remote_sync_handler: {e:?}");
                 }
                 (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = object_store_sync();
             },
-            _ = &mut remote_conversion_outbox => {
-                // remote_conversion failed, this is recoverable by just starting remote_conversion thread again
-                if let Err(e) = remote_conversion_handler.await {
-                    error!("Error joining remote_conversion_handler: {:?}", e);
-                }
-                (remote_conversion_handler, remote_conversion_outbox, remote_conversion_inbox) = arrow_conversion();
-            },
         }
     }
 }

@@ -132,8 +119,7 @@ pub fn object_store_sync() -> (
 
     let handle = task::spawn(async move {
         let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move {
-            let mut sync_interval =
-                interval_at(next_minute(), Duration::from_secs(STORAGE_UPLOAD_INTERVAL));
+            let mut sync_interval = interval_at(next_minute(), STORAGE_UPLOAD_INTERVAL);
 
             let mut inbox_rx = AssertUnwindSafe(inbox_rx);
 

@@ -183,64 +169,8 @@ pub fn object_store_sync() -> (
     (handle, outbox_rx, inbox_tx)
 }
 
-pub fn arrow_conversion() -> (
-    task::JoinHandle<()>,
-    oneshot::Receiver<()>,
-    oneshot::Sender<()>,
-) {
-    let (outbox_tx, outbox_rx) = oneshot::channel::<()>();
-    let (inbox_tx, inbox_rx) = oneshot::channel::<()>();
-
-    let handle = task::spawn(async move {
-        let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move {
-            let mut sync_interval = interval_at(
-                next_minute() + Duration::from_secs(5), // 5 second delay
-                Duration::from_secs(STORAGE_CONVERSION_INTERVAL),
-            );
-
-            let mut inbox_rx = AssertUnwindSafe(inbox_rx);
-
-            loop {
-                select! {
-                    _ = sync_interval.tick() => {
-                        trace!("Converting Arrow to Parquet... ");
-                        if let Err(e) = monitor_task_duration(
-                            "arrow_conversion",
-                            Duration::from_secs(30),
-                            || async { PARSEABLE.streams.prepare_parquet(false) },
-                        ).await
-                        {
-                            warn!("failed to convert local arrow data to parquet. {e:?}");
-                        }
-                    },
-                    res = &mut inbox_rx => {match res{
-                        Ok(_) => break,
-                        Err(_) => {
-                            warn!("Inbox channel closed unexpectedly");
-                            break;
-                        }}
-                    }
-                }
-            }
-        }));
-
-        match result {
-            Ok(future) => {
-                future.await;
-            }
-            Err(panic_error) => {
-                error!("Panic in object store sync task: {panic_error:?}");
-                let _ = outbox_tx.send(());
-            }
-        }
-
-        info!("Object store sync task ended");
-    });
-
-    (handle, outbox_rx, inbox_tx)
-}
-
-pub fn run_local_sync() -> (
+/// Flush arrows onto disk and convert them into parquet files
+pub fn local_sync() -> (
     task::JoinHandle<()>,
     oneshot::Receiver<()>,
     oneshot::Sender<()>,

@@ -253,15 +183,23 @@ pub fn run_local_sync() -> (
         let mut inbox_rx = inbox_rx;
 
         let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move {
-            let mut sync_interval =
-                interval_at(next_minute(), Duration::from_secs(LOCAL_SYNC_INTERVAL));
+            let mut sync_interval = interval_at(next_minute(), LOCAL_SYNC_INTERVAL);
+            let mut joinset = JoinSet::new();
 
             loop {
                 select! {
+                    // Spawns a flush+conversion task every `LOCAL_SYNC_INTERVAL` seconds
                    _ = sync_interval.tick() => {
-                        trace!("Flushing Arrows to disk...");
-                        PARSEABLE.flush_all_streams();
+                        PARSEABLE.streams.flush_and_convert(&mut joinset, false)
                    },
+                    // Joins and logs errors in spawned tasks
+                    Some(res) = joinset.join_next(), if !joinset.is_empty() => {
+                        match res {
+                            Ok(Ok(_)) => info!("Successfully converted arrow files to parquet."),
+                            Ok(Err(err)) => warn!("Failed to convert arrow files to parquet. {err:?}"),
+                            Err(err) => error!("Issue joining flush+conversion task: {err}"),
+                        }
+                    }
                    res = &mut inbox_rx => {match res{
                        Ok(_) => break,
                        Err(_) => {

@@ -278,7 +216,7 @@ pub fn run_local_sync() -> (
                 future.await;
             }
             Err(panic_error) => {
-                error!("Panic in local sync task: {:?}", panic_error);
+                error!("Panic in local sync task: {panic_error:?}");
             }
         }
 
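[Note] `local_sync` now folds spawning and joining into one `select!` loop. The `if !joinset.is_empty()` precondition matters because `join_next()` on an empty `JoinSet` resolves to `None` immediately; the guard keeps that branch idle until there is actually a task to join. A minimal sketch of the tick/spawn/join shape (toy interval and workload, not the real constants):

use tokio::task::JoinSet;
use tokio::time::{interval, sleep, Duration};

#[tokio::main]
async fn main() {
    let mut tick = interval(Duration::from_millis(100)); // toy LOCAL_SYNC_INTERVAL
    let mut joinset: JoinSet<u32> = JoinSet::new();
    let mut spawned = 0u32;

    loop {
        tokio::select! {
            // Spawn a job on each tick, like flush_and_convert per stream.
            _ = tick.tick(), if spawned < 5 => {
                spawned += 1;
                let id = spawned;
                joinset.spawn(async move {
                    sleep(Duration::from_millis(50)).await; // toy work
                    id
                });
            },
            // Guarded so select! only polls join_next() when a task exists.
            Some(res) = joinset.join_next(), if !joinset.is_empty() => {
                match res {
                    Ok(id) => println!("joined job {id}"),
                    Err(err) => eprintln!("job failed: {err}"),
                }
            },
            // Both branches disabled: nothing left to spawn or join.
            else => break,
        }
    }
}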
