Skip to content

Commit 40e65e5

Browse files
authored
shard reader: refactor (#94)
* shard reader: refactor * erl: export `JoinHandle` for timeout abort
1 parent 1eb19f1 commit 40e65e5

File tree

4 files changed

+109
-53
lines changed

4 files changed

+109
-53
lines changed

src/hstreamdb/src/client.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ fn set_scheme(url: &str) -> Option<String> {
7575
}
7676

7777
impl Client {
78-
async fn new_channel_provider(
78+
pub(crate) async fn new_channel_provider(
7979
&self,
8080
mut channel_provider_settings: ChannelProviderSettings,
8181
) -> common::Result<Channels> {

src/hstreamdb/src/producer.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ impl Producer {
198198
}
199199
};
200200
}
201+
log::debug!("producer channels closed, awaiting for all tasks to be finished");
201202

202203
self.shard_buffer_timer
203204
.iter()
@@ -213,9 +214,11 @@ impl Producer {
213214
let tasks = std::mem::take(&mut self.tasks);
214215
for task in tasks {
215216
task.await.unwrap_or_else(|err| {
216-
log::error!("await for task in stopping producer failed: {err}")
217+
log::error!("failed to await for task when stopping producer: {err}")
217218
})
218219
}
220+
221+
log::info!("producer: graceful shutdown")
219222
}
220223

221224
async fn handle_flush_request(&mut self, request: Option<ShardId>) {

src/hstreamdb/src/reader.rs

+18-16
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,17 @@ use hstreamdb_pb::{
44
};
55
use prost::DecodeError;
66

7+
use crate::channel_provider::Channels;
78
use crate::client::Client;
89
use crate::common::{self, ShardId};
910
use crate::utils::decode_received_records;
10-
use crate::{format_url, Payload};
11+
use crate::{format_url, ChannelProviderSettings, Payload};
1112

12-
pub struct ShardReaderId {
13+
pub struct ShardReader {
1314
reader_id: String,
1415
server_url: String,
16+
17+
channels: Channels,
1518
}
1619

1720
impl Client {
@@ -22,7 +25,8 @@ impl Client {
2225
shard_id: ShardId,
2326
shard_offset: crate::common::StreamShardOffset,
2427
timeout_ms: u32,
25-
) -> common::Result<ShardReaderId> {
28+
channel_provider_settings: ChannelProviderSettings,
29+
) -> common::Result<ShardReader> {
2630
let request = CreateShardReaderRequest {
2731
stream_name,
2832
shard_id,
@@ -49,24 +53,25 @@ impl Client {
4953
.ok_or_else(|| common::Error::PBUnwrapError("server_node".to_string()))?;
5054
let server_url = format_url!(&self.url_scheme, server_node);
5155

52-
Ok(ShardReaderId {
56+
let channels = self.new_channel_provider(channel_provider_settings).await?;
57+
58+
Ok(ShardReader {
5359
reader_id,
5460
server_url,
61+
channels,
5562
})
5663
}
64+
}
5765

66+
impl ShardReader {
5867
pub async fn read_shard(
5968
&self,
60-
shard_reader_id: &ShardReaderId,
6169
max_records: u32,
6270
) -> common::Result<Vec<(RecordId, Result<Payload, DecodeError>)>> {
63-
let mut channel = self
64-
.channels
65-
.channel_at(shard_reader_id.server_url.clone())
66-
.await?;
71+
let mut channel = self.channels.channel_at(self.server_url.clone()).await?;
6772
let records = channel
6873
.read_shard(ReadShardRequest {
69-
reader_id: shard_reader_id.reader_id.clone(),
74+
reader_id: self.reader_id.clone(),
7075
max_records,
7176
})
7277
.await?
@@ -84,14 +89,11 @@ impl Client {
8489
Ok(records)
8590
}
8691

87-
pub async fn delete_shard_reader(&self, shard_reader_id: &ShardReaderId) -> common::Result<()> {
88-
let mut channel = self
89-
.channels
90-
.channel_at(shard_reader_id.server_url.clone())
91-
.await?;
92+
pub async fn delete_shard_reader(self) -> common::Result<()> {
93+
let mut channel = self.channels.channel_at(self.server_url).await?;
9294
channel
9395
.delete_shard_reader(DeleteShardReaderRequest {
94-
reader_id: shard_reader_id.reader_id.clone(),
96+
reader_id: self.reader_id,
9597
})
9698
.await?;
9799
Ok(())

0 commit comments

Comments
 (0)