Skip to content

Commit a42c542

Browse files
committed
more tests
1 parent 9388ae7 commit a42c542

File tree

7 files changed

+64
-9
lines changed

7 files changed

+64
-9
lines changed

src/hstreamdb/src/common.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@ use tonic::transport;
99

1010
use crate::producer;
1111

12+
pub type SubscriptionId = String;
13+
1214
#[derive(Debug)]
1315
pub struct Subscription {
14-
pub subscription_id: String,
16+
pub subscription_id: SubscriptionId,
1517
pub stream_name: String,
1618
pub ack_timeout_seconds: i32,
1719
pub max_unacked_records: i32,

src/hstreamdb/src/consumer.rs

+11-2
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,27 @@ use prost_types::Struct;
44
use tokio::sync::mpsc::error::SendError;
55
use tokio::sync::mpsc::UnboundedSender;
66
use tokio_stream::wrappers::UnboundedReceiverStream;
7+
use tokio_stream::StreamExt;
78
use tonic::{Request, Streaming};
89

910
use crate::client::Client;
1011
use crate::common::{self, Payload};
1112
use crate::utils::decode_received_records;
1213

14+
pub struct ConsumerStream(UnboundedReceiverStream<(Payload, Responder)>);
15+
16+
impl ConsumerStream {
17+
pub async fn next(&mut self) -> Option<(Payload, Responder)> {
18+
self.0.next().await
19+
}
20+
}
21+
1322
impl Client {
1423
pub async fn streaming_fetch(
1524
&self,
1625
consumer_name: String,
1726
subscription_id: String,
18-
) -> common::Result<UnboundedReceiverStream<(Payload, Responder)>> {
27+
) -> common::Result<ConsumerStream> {
1928
let url = self.lookup_subscription(subscription_id.clone()).await?;
2029
log::debug!("lookup subscription for {subscription_id}, url = {url}");
2130
let mut channel = self.channels.channel_at(url).await?;
@@ -47,7 +56,7 @@ impl Client {
4756
sender,
4857
));
4958

50-
Ok(UnboundedReceiverStream::new(receiver))
59+
Ok(ConsumerStream(UnboundedReceiverStream::new(receiver)))
5160
}
5261
}
5362

src/hstreamdb/src/lib.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,6 @@ pub use channel_provider::ChannelProviderSettings;
1616
pub use client::Client;
1717
pub use common::{
1818
CompressionType, Consumer, Error, ListValue, Payload, Record, RecordId, Result, ShardId,
19-
SpecialOffset, Stream, StreamShardOffset, Struct, Subscription, Timestamp,
19+
SpecialOffset, Stream, StreamShardOffset, Struct, Subscription, SubscriptionId, Timestamp,
2020
};
21+
pub use consumer::ConsumerStream;

src/hstreamdb/tests/common.rs

+29-2
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::env;
33
use hstreamdb::appender::Appender;
44
use hstreamdb::common::{CompressionType, SpecialOffset, Stream};
55
use hstreamdb::producer::{FlushSettings, Producer};
6-
use hstreamdb::{ChannelProviderSettings, Record, Subscription};
6+
use hstreamdb::{ChannelProviderSettings, ConsumerStream, Record, Subscription, SubscriptionId};
77
use hstreamdb_test_utils::rand_alphanumeric;
88

99
pub async fn init_client() -> anyhow::Result<Client> {
@@ -15,7 +15,15 @@ pub async fn init_client() -> anyhow::Result<Client> {
1515

1616
#[tokio::test(flavor = "multi_thread")]
1717
async fn make_ci_happy() {
18-
init_client().await.unwrap().0.list_streams().await.unwrap();
18+
let client = init_client().await.unwrap();
19+
let (stream, sub) = client.new_stream_subscription().await.unwrap();
20+
let mut consumer = client.new_consumer(sub.subscription_id).await.unwrap();
21+
let (appender, producer) = client.new_sync_producer(stream.stream_name).await.unwrap();
22+
appender.append(rand_raw_record(4500)).await.unwrap();
23+
producer.start().await;
24+
while let Some(x) = consumer.next().await {
25+
x.1.ack().unwrap();
26+
}
1927
}
2028

2129
pub struct Client(pub hstreamdb::Client);
@@ -110,4 +118,23 @@ impl Client {
110118
let subscription = self.new_subscription(stream_name).await?;
111119
Ok((stream, subscription))
112120
}
121+
122+
pub async fn new_consumer(
123+
&self,
124+
subscription_id: SubscriptionId,
125+
) -> anyhow::Result<ConsumerStream> {
126+
let fetching_stream = self
127+
.0
128+
.streaming_fetch(rand_alphanumeric(20), subscription_id)
129+
.await
130+
.unwrap();
131+
Ok(fetching_stream)
132+
}
133+
}
134+
135+
pub fn rand_raw_record(len: usize) -> Record {
136+
Record {
137+
partition_key: rand_alphanumeric(10),
138+
payload: hstreamdb::Payload::RawRecord(rand_alphanumeric(len).into_bytes()),
139+
}
113140
}

src/hstreamdb/tests/consumer_test.rs

-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use hstreamdb::producer::FlushSettings;
66
use hstreamdb::{ChannelProviderSettings, Subscription};
77
use hstreamdb_pb::{SpecialOffset, Stream};
88
use hstreamdb_test_utils::rand_alphanumeric;
9-
use tokio_stream::StreamExt;
109

1110
#[tokio::test(flavor = "multi_thread")]
1211
async fn test_consumer() {

src/hstreamdb/tests/integration_test.rs

+19-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
mod common;
22

3-
use common::init_client;
3+
use common::{init_client, rand_raw_record};
44

55
#[tokio::test(flavor = "multi_thread")]
66
async fn utils_base_test() {
@@ -21,3 +21,21 @@ async fn utils_base_test() {
2121

2222
client.new_stream_subscription().await.unwrap();
2323
}
24+
25+
#[tokio::test(flavor = "multi_thread")]
26+
async fn sync_producer_should_be_sync() {
27+
let client = init_client().await.unwrap();
28+
29+
let (stream, sub) = client.new_stream_subscription().await.unwrap();
30+
let stream_name = &stream.stream_name;
31+
let (appender, producer) = client.new_sync_producer(stream_name).await.unwrap();
32+
let mut fetching_stream = client.new_consumer(sub.subscription_id).await.unwrap();
33+
34+
tokio::spawn(producer.start());
35+
36+
for _ in 0..50 {
37+
appender.append(rand_raw_record(200)).await.unwrap();
38+
let (_, responder) = fetching_stream.next().await.unwrap();
39+
responder.ack().unwrap()
40+
}
41+
}

src/x/hstreamdb-erl-nifs/src/lib.rs

-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ use rustler::{
2020
ResourceArc, Term,
2121
};
2222
use tokio::sync::{oneshot, Mutex, MutexGuard};
23-
use tokio_stream::StreamExt;
2423
use tonic::transport::{Certificate, ClientTlsConfig, Identity};
2524

2625
mod runtime;

0 commit comments

Comments
 (0)