Skip to content

Commit 43170f2

Browse files
committed
tests: init utils
1 parent 31ee6a3 commit 43170f2

File tree

4 files changed

+172
-26
lines changed

4 files changed

+172
-26
lines changed

src/hstreamdb/src/producer.rs

+8
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,14 @@ impl FlushSettings {
7171
pub fn builder() -> FlushSettingsBuilder {
7272
default()
7373
}
74+
75+
pub fn sync() -> Self {
76+
Self {
77+
len: 0,
78+
size: 0,
79+
deadline: None,
80+
}
81+
}
7482
}
7583

7684
#[derive(Default)]

src/hstreamdb/tests/common.rs

+108
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
use std::env;
2+
3+
use hstreamdb::appender::Appender;
4+
use hstreamdb::common::{CompressionType, SpecialOffset, Stream};
5+
use hstreamdb::producer::{FlushSettings, Producer};
6+
use hstreamdb::{ChannelProviderSettings, Record, Subscription};
7+
use hstreamdb_test_utils::rand_alphanumeric;
8+
9+
pub async fn init_client() -> anyhow::Result<Client> {
10+
let server_url = env::var("TEST_SERVER_ADDR").unwrap();
11+
let channel_provider_settings = ChannelProviderSettings::default();
12+
let client = hstreamdb::Client::new(server_url, channel_provider_settings).await?;
13+
Ok(Client(client))
14+
}
15+
16+
pub struct Client(hstreamdb::Client);
17+
18+
impl Client {
19+
pub async fn new_stream(&self) -> anyhow::Result<Stream> {
20+
let stream_name = rand_alphanumeric(20);
21+
let stream = self
22+
.0
23+
.create_stream(Stream {
24+
stream_name,
25+
replication_factor: 3,
26+
backlog_duration: 60 * 60 * 24,
27+
shard_count: 20,
28+
creation_time: None,
29+
})
30+
.await?;
31+
Ok(stream)
32+
}
33+
34+
pub async fn new_subscription<T: Into<String>>(
35+
&self,
36+
stream_name: T,
37+
) -> anyhow::Result<Subscription> {
38+
let subscription = self
39+
.0
40+
.create_subscription(Subscription {
41+
subscription_id: rand_alphanumeric(20),
42+
stream_name: stream_name.into(),
43+
ack_timeout_seconds: 60 * 15,
44+
max_unacked_records: 1000,
45+
offset: SpecialOffset::Earliest,
46+
creation_time: None,
47+
})
48+
.await?;
49+
Ok(subscription)
50+
}
51+
52+
pub async fn new_sync_producer<T: Into<String>>(
53+
&self,
54+
stream_name: T,
55+
) -> anyhow::Result<(Appender, Producer)> {
56+
let producer = self
57+
.0
58+
.new_producer(
59+
stream_name.into(),
60+
CompressionType::None,
61+
None,
62+
FlushSettings::sync(),
63+
ChannelProviderSettings::default(),
64+
None,
65+
)
66+
.await?;
67+
Ok(producer)
68+
}
69+
70+
pub async fn write_rand<T: Into<String>>(
71+
&self,
72+
stream_name: T,
73+
appender_num: usize,
74+
record_num: usize,
75+
payload_size: usize,
76+
) -> anyhow::Result<()> {
77+
let (appender, producer) = self.new_sync_producer(stream_name).await?;
78+
79+
for _ in 0..appender_num {
80+
let appender = appender.clone();
81+
tokio::spawn(async move {
82+
for _ in 0..record_num {
83+
let payload = rand_alphanumeric(payload_size);
84+
match appender
85+
.append(Record {
86+
partition_key: "".to_string(),
87+
payload: hstreamdb::Payload::RawRecord(payload.into_bytes()),
88+
})
89+
.await
90+
{
91+
Ok(_) => (),
92+
Err(err) => log::error!("{}", err),
93+
};
94+
}
95+
});
96+
}
97+
drop(appender);
98+
producer.start().await;
99+
Ok(())
100+
}
101+
102+
pub async fn new_stream_subscription(&self) -> anyhow::Result<(Stream, Subscription)> {
103+
let stream = self.new_stream().await?;
104+
let stream_name = stream.stream_name.clone();
105+
let subscription = self.new_subscription(stream_name).await?;
106+
Ok((stream, subscription))
107+
}
108+
}
+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
mod common;
2+
3+
use common::init_client;
4+
5+
#[tokio::test(flavor = "multi_thread")]
6+
async fn utils_base_test() {
7+
let client = init_client().await.unwrap();
8+
9+
let stream = client.new_stream().await.unwrap();
10+
let stream_name = &stream.stream_name;
11+
let _subscription = client.new_subscription(stream_name).await.unwrap();
12+
let _producer = client.new_sync_producer(stream_name).await.unwrap();
13+
14+
let appender_num = 5;
15+
let record_num = 50;
16+
let payload_size = 2000;
17+
client
18+
.write_rand(stream_name, appender_num, record_num, payload_size)
19+
.await
20+
.unwrap();
21+
22+
client.new_stream_subscription().await.unwrap();
23+
}

src/hstreamdb/tests/tls_test.rs

+33-26
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,39 @@
1-
// use std::{env, fs};
1+
use std::env;
22

3-
// use hstreamdb::tls::{Certificate, ClientTlsConfig, Identity};
4-
// use hstreamdb::{ChannelProviderSettings, Client};
3+
use hstreamdb::tls::{Certificate, ClientTlsConfig, Identity};
4+
use hstreamdb::{ChannelProviderSettings, Client};
55

6-
// async fn _test_tls_impl() {
7-
// env::set_var("RUST_LOG", "DEBUG");
8-
// env_logger::init();
6+
#[tokio::test(flavor = "multi_thread")]
7+
async fn test_tls() {
8+
if let Ok(_) = env::var("ENDPOINT") {
9+
test_tls_impl().await
10+
} else {
11+
log::warn!("cloud endpoint is not presented");
12+
log::warn!("ignore tls tests");
13+
}
14+
}
915

10-
// let server_url: &str = todo!();
11-
// let tls_dir: &str = todo!();
16+
async fn test_tls_impl() {
17+
env::set_var("RUST_LOG", "DEBUG");
18+
env_logger::init();
1219

13-
// let ca_certificate =
14-
// Certificate::from_pem(fs::read(format!("{tls_dir}/root_ca.crt")).unwrap());
15-
// let cert = fs::read(format!("{tls_dir}/client.crt")).unwrap();
16-
// let key = fs::read(format!("{tls_dir}/client.key")).unwrap();
20+
let server_url: String = env::var("ENDPOINT").unwrap();
21+
let ca_certificate: String = env::var("ROOT_CA").unwrap();
22+
let cert = env::var("CLIENT_CRT").unwrap();
23+
let key = env::var("CLIENT_KEY").unwrap();
1724

18-
// let client = Client::new(
19-
// server_url,
20-
// ChannelProviderSettings::builder()
21-
// .set_tls_config(
22-
// ClientTlsConfig::new()
23-
// .ca_certificate(ca_certificate)
24-
// .identity(Identity::from_pem(cert, key)),
25-
// )
26-
// .build(),
27-
// )
28-
// .await
29-
// .unwrap();
25+
let client = Client::new(
26+
server_url,
27+
ChannelProviderSettings::builder()
28+
.set_tls_config(
29+
ClientTlsConfig::new()
30+
.ca_certificate(Certificate::from_pem(ca_certificate))
31+
.identity(Identity::from_pem(cert, key)),
32+
)
33+
.build(),
34+
)
35+
.await
36+
.unwrap();
3037

31-
// log::info!("{:?}", client.list_streams().await.unwrap());
32-
// }
38+
log::info!("{:?}", client.list_streams().await.unwrap());
39+
}

0 commit comments

Comments
 (0)