Skip to content

Commit 505c314

Browse files
fix: staging files count in metrics (#1207)
current - we find group of arrow files and a parquet file in order to merge all arrow files in a group and write to parquet in disk in metrics, we set count of arrow files for 1 group not all change - we get sum of total count of arrow files from all the groups then set this sum in metrics fixes: #1149
1 parent 245ec54 commit 505c314

File tree

2 files changed

+23
-15
lines changed

2 files changed

+23
-15
lines changed

src/parseable/streams.rs

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -444,21 +444,27 @@ impl Stream {
444444
.set(0);
445445
}
446446

447+
//find sum of arrow files in staging directory for a stream
448+
let total_arrow_files = staging_files.values().map(|v| v.len()).sum::<usize>();
449+
metrics::STAGING_FILES
450+
.with_label_values(&[&self.stream_name])
451+
.set(total_arrow_files as i64);
452+
453+
//find sum of file sizes of all arrow files in staging_files
454+
let total_arrow_files_size = staging_files
455+
.values()
456+
.map(|v| {
457+
v.iter()
458+
.map(|file| file.metadata().unwrap().len())
459+
.sum::<u64>()
460+
})
461+
.sum::<u64>();
462+
metrics::STORAGE_SIZE
463+
.with_label_values(&["staging", &self.stream_name, "arrows"])
464+
.set(total_arrow_files_size as i64);
465+
447466
// warn!("staging files-\n{staging_files:?}\n");
448467
for (parquet_path, arrow_files) in staging_files {
449-
metrics::STAGING_FILES
450-
.with_label_values(&[&self.stream_name])
451-
.set(arrow_files.len() as i64);
452-
453-
for file in &arrow_files {
454-
let file_size = file.metadata().unwrap().len();
455-
let file_type = file.extension().unwrap().to_str().unwrap();
456-
457-
metrics::STORAGE_SIZE
458-
.with_label_values(&["staging", &self.stream_name, file_type])
459-
.add(file_size as i64);
460-
}
461-
462468
let record_reader = MergedReverseRecordReader::try_new(&arrow_files);
463469
if record_reader.readers.is_empty() {
464470
continue;
@@ -494,6 +500,7 @@ impl Stream {
494500
"Couldn't rename part file: {part_path:?} -> {parquet_path:?}, error = {e}"
495501
);
496502
}
503+
497504
for file in arrow_files {
498505
// warn!("file-\n{file:?}\n");
499506
let file_size = file.metadata().unwrap().len();

src/storage/object_storage.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::R
3636
use once_cell::sync::OnceCell;
3737
use relative_path::RelativePath;
3838
use relative_path::RelativePathBuf;
39-
use tracing::{debug, error, warn};
39+
use tracing::info;
40+
use tracing::{error, warn};
4041
use ulid::Ulid;
4142

4243
use crate::alerts::AlertConfig;
@@ -718,7 +719,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
718719

719720
// get all streams
720721
for stream_name in PARSEABLE.streams.list() {
721-
debug!("Starting object_store_sync for stream- {stream_name}");
722+
info!("Starting object_store_sync for stream- {stream_name}");
722723

723724
let stream = PARSEABLE.get_or_create_stream(&stream_name);
724725
let custom_partition = stream.get_custom_partition();

0 commit comments

Comments
 (0)