Skip to content

Commit 8e4023f

Browse files
committed
Improve handling of closed channels.
1 parent 0f3b770 commit 8e4023f

File tree

4 files changed

+25
-12
lines changed

4 files changed

+25
-12
lines changed

crates/database/src/audit_provider.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -100,23 +100,25 @@ where
100100
let mut rows = stmt.query([])?;
101101

102102
while let Some(row) = rows.next()? {
103+
if (tx.is_closed()) {
104+
break;
105+
}
103106
let row: AuditRow = row.try_into()?;
104107
let record: AuditRecord = row.try_into()?;
105108
let inner_tx = tx.clone();
106-
futures::executor::block_on(async move {
107-
if let Err(e) =
108-
inner_tx.send(Ok(record.event)).await
109-
{
110-
tracing::error!(error = %e);
111-
}
109+
let res = futures::executor::block_on(async move {
110+
inner_tx.send(Ok(record.event)).await
112111
});
112+
if let Err(e) = res {
113+
tracing::error!(error = %e);
114+
break;
115+
}
113116
}
114117

115118
Ok::<_, Error>(())
116119
})
117120
.await
118121
.map_err(Error::from)?;
119-
120122
Ok::<_, Self::Error>(())
121123
});
122124

crates/database/src/event_log.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -393,14 +393,19 @@ where
393393
})?;
394394

395395
for row in rows {
396+
if tx.is_closed() {
397+
break;
398+
}
396399
let row = row?;
397400
let record: EventRecord = row.try_into()?;
398-
let sender = tx.clone();
399-
futures::executor::block_on(async move {
400-
if let Err(err) = sender.send(Ok(record)).await {
401-
tracing::error!(error = %err);
402-
}
401+
let inner_tx = tx.clone();
402+
let res = futures::executor::block_on(async move {
403+
inner_tx.send(Ok(record)).await
403404
});
405+
if let Err(e) = res {
406+
tracing::error!(error = %e);
407+
break;
408+
}
404409
}
405410

406411
Ok::<_, Error>(())

crates/filesystem/src/audit_provider.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,9 @@ where
222222
let it_file = self.file.clone();
223223
tokio::task::spawn(async move {
224224
while let Some(record) = it.next().await? {
225+
if tx.is_closed() {
226+
break;
227+
}
225228
let mut inner = it_file.lock().await;
226229
let event = inner.read_event(&record).await?;
227230
if let Err(e) = tx.send(Ok(event)).await {

crates/filesystem/src/event_log.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,9 @@ where
142142
let file_path = self.data.clone();
143143
tokio::task::spawn(async move {
144144
while let Some(record) = it.next().await? {
145+
if tx.is_closed() {
146+
break;
147+
}
145148
let event_buffer =
146149
read_event_buffer(file_path.clone(), &record).await?;
147150
let event_record = record.into_event_record(event_buffer);

0 commit comments

Comments
 (0)