Skip to content

Commit 03b6579

Browse files
authored
Merge pull request rust-lang#7494 from Turbo87/chaosproxy
Simplify `ChaosProxy` code
2 parents 7ab93b6 + 0fb65ef commit 03b6579

File tree

3 files changed

+66
-56
lines changed

3 files changed

+66
-56
lines changed

src/tests/server_binary.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ fn startup_without_database() {
4444
// Break the networking *before* starting the binary, to ensure the binary can fully startup
4545
// without a database connection. Most of crates.io should not work when started without a
4646
// database, but unconditional redirects will work.
47-
server_bin.chaosproxy.break_networking();
47+
server_bin.chaosproxy.break_networking().unwrap();
4848

4949
let running_server = server_bin.start().unwrap();
5050

src/tests/unhealthy_database.rs

+13-13
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,13 @@ fn download_crate_with_broken_networking_primary_database() {
2525
// do an unconditional redirect to the CDN, without checking whether the crate exists or what
2626
// the exact capitalization of crate name is.
2727

28-
app.primary_db_chaosproxy().break_networking();
28+
app.primary_db_chaosproxy().break_networking().unwrap();
2929
assert_unconditional_redirects(&anon);
3030

3131
// After restoring the network and waiting for the database pool to get healthy again redirects
3232
// should be checked again.
3333

34-
app.primary_db_chaosproxy().restore_networking();
34+
app.primary_db_chaosproxy().restore_networking().unwrap();
3535
app.as_inner()
3636
.primary_database
3737
.wait_until_healthy(DB_HEALTHY_TIMEOUT)
@@ -75,12 +75,12 @@ fn http_error_with_unhealthy_database() {
7575
let response = anon.get::<()>("/api/v1/summary");
7676
assert_eq!(response.status(), StatusCode::OK);
7777

78-
app.primary_db_chaosproxy().break_networking();
78+
app.primary_db_chaosproxy().break_networking().unwrap();
7979

8080
let response = anon.get::<()>("/api/v1/summary");
8181
assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);
8282

83-
app.primary_db_chaosproxy().restore_networking();
83+
app.primary_db_chaosproxy().restore_networking().unwrap();
8484
app.as_inner()
8585
.primary_database
8686
.wait_until_healthy(DB_HEALTHY_TIMEOUT)
@@ -99,14 +99,14 @@ fn fallback_to_replica_returns_user_info() {
9999
.with_chaos_proxy()
100100
.with_user();
101101
app.db_new_user("foo");
102-
app.primary_db_chaosproxy().break_networking();
102+
app.primary_db_chaosproxy().break_networking().unwrap();
103103

104104
// When the primary database is down, requests are forwarded to the replica database
105105
let response = owner.get::<()>(URL);
106106
assert_eq!(response.status(), 200);
107107

108108
// restore primary database connection
109-
app.primary_db_chaosproxy().restore_networking();
109+
app.primary_db_chaosproxy().restore_networking().unwrap();
110110
app.as_inner()
111111
.primary_database
112112
.wait_until_healthy(DB_HEALTHY_TIMEOUT)
@@ -122,15 +122,15 @@ fn restored_replica_returns_user_info() {
122122
.with_chaos_proxy()
123123
.with_user();
124124
app.db_new_user("foo");
125-
app.primary_db_chaosproxy().break_networking();
126-
app.replica_db_chaosproxy().break_networking();
125+
app.primary_db_chaosproxy().break_networking().unwrap();
126+
app.replica_db_chaosproxy().break_networking().unwrap();
127127

128128
// When both primary and replica database are down, the request returns an error
129129
let response = owner.get::<()>(URL);
130130
assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);
131131

132132
// Once the replica database is restored, it should serve as a fallback again
133-
app.replica_db_chaosproxy().restore_networking();
133+
app.replica_db_chaosproxy().restore_networking().unwrap();
134134
app.as_inner()
135135
.read_only_replica_database
136136
.as_ref()
@@ -142,7 +142,7 @@ fn restored_replica_returns_user_info() {
142142
assert_eq!(response.status(), StatusCode::OK);
143143

144144
// restore connection
145-
app.primary_db_chaosproxy().restore_networking();
145+
app.primary_db_chaosproxy().restore_networking().unwrap();
146146
app.as_inner()
147147
.primary_database
148148
.wait_until_healthy(DB_HEALTHY_TIMEOUT)
@@ -158,15 +158,15 @@ fn restored_primary_returns_user_info() {
158158
.with_chaos_proxy()
159159
.with_user();
160160
app.db_new_user("foo");
161-
app.primary_db_chaosproxy().break_networking();
162-
app.replica_db_chaosproxy().break_networking();
161+
app.primary_db_chaosproxy().break_networking().unwrap();
162+
app.replica_db_chaosproxy().break_networking().unwrap();
163163

164164
// When both primary and replica database are down, the request returns an error
165165
let response = owner.get::<()>(URL);
166166
assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);
167167

168168
// Once the replica database is restored, it should serve as a fallback again
169-
app.primary_db_chaosproxy().restore_networking();
169+
app.primary_db_chaosproxy().restore_networking().unwrap();
170170
app.as_inner()
171171
.primary_database
172172
.wait_until_healthy(DB_HEALTHY_TIMEOUT)

src/tests/util/chaosproxy.rs

+52-42
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use anyhow::{Context, Error};
1+
use anyhow::{anyhow, Context};
22
use std::net::SocketAddr;
33
use std::sync::Arc;
44
use tokio::{
@@ -10,6 +10,7 @@ use tokio::{
1010
runtime::Runtime,
1111
sync::broadcast::Sender,
1212
};
13+
use tracing::error;
1314
use url::Url;
1415

1516
pub(crate) struct ChaosProxy {
@@ -23,7 +24,7 @@ pub(crate) struct ChaosProxy {
2324
}
2425

2526
impl ChaosProxy {
26-
pub(crate) fn new(backend_address: SocketAddr) -> Result<Arc<Self>, Error> {
27+
pub(crate) fn new(backend_address: SocketAddr) -> anyhow::Result<Arc<Self>> {
2728
let runtime = Runtime::new().expect("failed to create Tokio runtime");
2829
let listener = runtime.block_on(TcpListener::bind("127.0.0.1:0"))?;
2930

@@ -42,42 +43,49 @@ impl ChaosProxy {
4243

4344
let instance_clone = instance.clone();
4445
instance.runtime.spawn(async move {
45-
if let Err(err) = instance_clone.server_loop(listener).await {
46-
eprintln!("ChaosProxy server error: {err}");
46+
if let Err(error) = instance_clone.server_loop(listener).await {
47+
error!(%error, "ChaosProxy server error");
4748
}
4849
});
4950

5051
Ok(instance)
5152
}
5253

53-
pub(crate) fn proxy_database_url(url: &str) -> Result<(Arc<Self>, String), Error> {
54+
pub(crate) fn proxy_database_url(url: &str) -> anyhow::Result<(Arc<Self>, String)> {
5455
let mut db_url = Url::parse(url).context("failed to parse database url")?;
5556
let backend_addr = db_url
5657
.socket_addrs(|| Some(5432))
5758
.context("could not resolve database url")?
5859
.first()
5960
.copied()
60-
.ok_or_else(|| anyhow::anyhow!("the database url does not point to any IP"))?;
61+
.ok_or_else(|| anyhow!("the database url does not point to any IP"))?;
62+
63+
let instance = ChaosProxy::new(backend_addr)?;
64+
65+
db_url
66+
.set_ip_host(instance.address.ip())
67+
.map_err(|_| anyhow!("Failed to set IP host on the URL"))?;
68+
69+
db_url
70+
.set_port(Some(instance.address.port()))
71+
.map_err(|_| anyhow!("Failed to set post on the URL"))?;
6172

62-
let instance = ChaosProxy::new(backend_addr).unwrap();
63-
db_url.set_ip_host(instance.address.ip()).unwrap();
64-
db_url.set_port(Some(instance.address.port())).unwrap();
6573
Ok((instance, db_url.into()))
6674
}
6775

68-
pub(crate) fn break_networking(&self) {
76+
pub(crate) fn break_networking(&self) -> anyhow::Result<usize> {
6977
self.break_networking_send
7078
.send(())
71-
.expect("failed to send the break_networking message");
79+
.context("Failed to send the break_networking message")
7280
}
7381

74-
pub(crate) fn restore_networking(&self) {
82+
pub(crate) fn restore_networking(&self) -> anyhow::Result<usize> {
7583
self.restore_networking_send
7684
.send(())
77-
.expect("failed to send the restore_networking message");
85+
.context("Failed to send the restore_networking message")
7886
}
7987

80-
async fn server_loop(self: Arc<Self>, initial_listener: TcpListener) -> Result<(), Error> {
88+
async fn server_loop(&self, initial_listener: TcpListener) -> anyhow::Result<()> {
8189
let mut listener = Some(initial_listener);
8290

8391
let mut break_networking_recv = self.break_networking_send.subscribe();
@@ -87,7 +95,7 @@ impl ChaosProxy {
8795
if let Some(l) = &listener {
8896
tokio::select! {
8997
accepted = l.accept() => {
90-
self.clone().accept_connection(accepted?.0).await?;
98+
self.accept_connection(accepted?.0).await?;
9199
},
92100

93101
_ = break_networking_recv.recv() => {
@@ -104,51 +112,53 @@ impl ChaosProxy {
104112
}
105113
}
106114

107-
async fn accept_connection(self: Arc<Self>, accepted: TcpStream) -> Result<(), Error> {
115+
async fn accept_connection(&self, accepted: TcpStream) -> anyhow::Result<()> {
108116
let (client_read, client_write) = accepted.into_split();
109117
let (backend_read, backend_write) = TcpStream::connect(&self.backend_address)
110118
.await?
111119
.into_split();
112120

113-
let self_clone = self.clone();
121+
let break_networking_send = self.break_networking_send.clone();
114122
tokio::spawn(async move {
115-
if let Err(err) = self_clone.proxy_data(client_read, backend_write).await {
116-
eprintln!("ChaosProxy connection error: {err}");
123+
if let Err(error) = proxy_data(break_networking_send, client_read, backend_write).await
124+
{
125+
error!(%error, "ChaosProxy connection error");
117126
}
118127
});
119128

120-
let self_clone = self.clone();
129+
let break_networking_send = self.break_networking_send.clone();
121130
tokio::spawn(async move {
122-
if let Err(err) = self_clone.proxy_data(backend_read, client_write).await {
123-
eprintln!("ChaosProxy connection error: {err}");
131+
if let Err(error) = proxy_data(break_networking_send, backend_read, client_write).await
132+
{
133+
error!(%error, "ChaosProxy connection error");
124134
}
125135
});
126136

127137
Ok(())
128138
}
139+
}
129140

130-
async fn proxy_data(
131-
&self,
132-
mut from: OwnedReadHalf,
133-
mut to: OwnedWriteHalf,
134-
) -> Result<(), Error> {
135-
let mut break_connections_recv = self.break_networking_send.subscribe();
136-
let mut buf = [0; 1024];
137-
138-
loop {
139-
tokio::select! {
140-
len = from.read(&mut buf) => {
141-
let len = len?;
142-
if len == 0 {
143-
// EOF, the socket was closed
144-
return Ok(());
145-
}
146-
to.write_all(&buf[0..len]).await?;
147-
}
148-
_ = break_connections_recv.recv() => {
149-
to.shutdown().await?;
141+
async fn proxy_data(
142+
break_networking_send: Sender<()>,
143+
mut from: OwnedReadHalf,
144+
mut to: OwnedWriteHalf,
145+
) -> anyhow::Result<()> {
146+
let mut break_connections_recv = break_networking_send.subscribe();
147+
let mut buf = [0; 1024];
148+
149+
loop {
150+
tokio::select! {
151+
len = from.read(&mut buf) => {
152+
let len = len?;
153+
if len == 0 {
154+
// EOF, the socket was closed
150155
return Ok(());
151156
}
157+
to.write_all(&buf[0..len]).await?;
158+
}
159+
_ = break_connections_recv.recv() => {
160+
to.shutdown().await?;
161+
return Ok(());
152162
}
153163
}
154164
}

0 commit comments

Comments
 (0)