Skip to content

Commit 9b570e2

Browse files
authored
feat: Change interface to shutdown/close same as python (#79)
1 parent 0e99cdd commit 9b570e2

File tree

3 files changed

+55
-38
lines changed

3 files changed

+55
-38
lines changed

src/client.rs

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use std::env;
44
use std::ffi::{OsStr, OsString};
55
use std::fmt;
66
use std::sync::Arc;
7+
use std::sync::RwLock;
78
use std::time::Duration;
89

910
use rand::random;
@@ -21,10 +22,9 @@ use transport::{DefaultTransportFactory, Transport, TransportFactory};
2122
use utils::{debug_images, server_name};
2223

2324
/// The Sentry client object.
24-
#[derive(Clone)]
2525
pub struct Client {
2626
options: ClientOptions,
27-
transport: Option<Arc<Box<Transport>>>,
27+
transport: RwLock<Option<Arc<Box<Transport>>>>,
2828
}
2929

3030
impl fmt::Debug for Client {
@@ -36,6 +36,15 @@ impl fmt::Debug for Client {
3636
}
3737
}
3838

39+
impl Clone for Client {
40+
fn clone(&self) -> Client {
41+
Client {
42+
options: self.options.clone(),
43+
transport: RwLock::new(self.transport.read().unwrap().clone()),
44+
}
45+
}
46+
}
47+
3948
/// Type alias for before event/breadcrumb handlers.
4049
pub type BeforeCallback<T> = Arc<Box<Fn(T) -> Option<T> + Send + Sync>>;
4150

@@ -81,8 +90,8 @@ pub struct ClientOptions {
8190
/// This will default to the `HTTPS_PROXY` environment variable
8291
/// or `http_proxy` if that one exists.
8392
pub https_proxy: Option<Cow<'static, str>>,
84-
/// The timeout on client drop for draining events.
85-
pub shutdown_timeout: Option<Duration>,
93+
/// The timeout on client drop for draining events on shutdown.
94+
pub shutdown_timeout: Duration,
8695
/// Attaches stacktraces to messages.
8796
pub attach_stacktrace: bool,
8897
/// If turned on some default PII informat is attached.
@@ -185,7 +194,7 @@ impl Default for ClientOptions {
185194
.map(Cow::Owned)
186195
.or_else(|| env::var("HTTPS_PROXY").ok().map(Cow::Owned))
187196
.or_else(|| env::var("http_proxy").ok().map(Cow::Owned)),
188-
shutdown_timeout: Some(Duration::from_secs(2)),
197+
shutdown_timeout: Duration::from_secs(2),
189198
attach_stacktrace: false,
190199
send_default_pii: false,
191200
before_send: None,
@@ -338,11 +347,11 @@ impl Client {
338347
/// disabled.
339348
pub fn with_options(options: ClientOptions) -> Client {
340349
#[cfg_attr(feature = "cargo-clippy", allow(question_mark))]
341-
let transport = if options.dsn.is_none() {
350+
let transport = RwLock::new(if options.dsn.is_none() {
342351
None
343352
} else {
344353
Some(Arc::new(options.transport.create_transport(&options)))
345-
};
354+
});
346355
Client { options, transport }
347356
}
348357

@@ -364,7 +373,7 @@ impl Client {
364373
pub fn disabled_with_options(options: ClientOptions) -> Client {
365374
Client {
366375
options,
367-
transport: None,
376+
transport: RwLock::new(None),
368377
}
369378
}
370379

@@ -517,7 +526,7 @@ impl Client {
517526

518527
/// Captures an event and sends it to sentry.
519528
pub fn capture_event(&self, event: Event<'static>, scope: Option<&Scope>) -> Uuid {
520-
if let Some(ref transport) = self.transport {
529+
if let Some(ref transport) = *self.transport.read().unwrap() {
521530
if self.sample_should_send() {
522531
if let Some(event) = self.prepare_event(event, scope) {
523532
let event_id = event.id.unwrap();
@@ -529,14 +538,16 @@ impl Client {
529538
Default::default()
530539
}
531540

532-
/// Drains all pending events up to the current time.
541+
/// Drains all pending events and shuts down the transport behind the
542+
/// client. After shutting down the transport is removed.
533543
///
534544
/// This returns `true` if the queue was successfully drained in the
535545
/// given time or `false` if not (for instance because of a timeout).
536-
/// If no timeout is provided the client will wait forever.
537-
pub fn drain_events(&self, timeout: Option<Duration>) -> bool {
538-
if let Some(ref transport) = self.transport {
539-
transport.drain(timeout)
546+
/// If no timeout is provided the client will wait for as long a
547+
/// `shutdown_timeout` in the client options.
548+
pub fn close(&self, timeout: Option<Duration>) -> bool {
549+
if let Some(transport) = self.transport.write().unwrap().take() {
550+
transport.shutdown(timeout.unwrap_or(self.options.shutdown_timeout))
540551
} else {
541552
true
542553
}
@@ -559,7 +570,7 @@ pub struct ClientInitGuard(Arc<Client>);
559570

560571
impl Drop for ClientInitGuard {
561572
fn drop(&mut self) {
562-
self.0.drain_events(self.0.options.shutdown_timeout);
573+
self.0.close(None);
563574
}
564575
}
565576

src/hub.rs

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -311,15 +311,6 @@ impl Hub {
311311
}}
312312
}
313313

314-
/// Drains the currently pending events.
315-
pub fn drain_events(&self, timeout: Option<Duration>) {
316-
with_client_impl! {{
317-
if let Some(ref client) = self.client() {
318-
client.drain_events(timeout);
319-
}
320-
}}
321-
}
322-
323314
/// Returns the currently bound client.
324315
#[cfg(feature = "with_client_implementation")]
325316
pub fn client(&self) -> Option<Arc<Client>> {

src/transport.rs

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::sync::atomic::{AtomicBool, Ordering};
12
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
23
use std::sync::{Arc, Condvar, Mutex};
34
use std::thread::{self, JoinHandle};
@@ -22,9 +23,9 @@ pub trait Transport: Send + Sync + 'static {
2223
/// Drains the queue if there is one.
2324
///
2425
/// The default implementation does nothing. If the queue was successfully
25-
/// drained the return value should be `true` or `false` if events were
26+
/// shutdowned the return value should be `true` or `false` if events were
2627
/// left in it.
27-
fn drain(&self, timeout: Option<Duration>) -> bool {
28+
fn shutdown(&self, timeout: Duration) -> bool {
2829
let _timeout = timeout;
2930
true
3031
}
@@ -72,8 +73,8 @@ impl<T: Transport> Transport for Arc<T> {
7273
(**self).send_event(event)
7374
}
7475

75-
fn drain(&self, timeout: Option<Duration>) -> bool {
76-
(**self).drain(timeout)
76+
fn shutdown(&self, timeout: Duration) -> bool {
77+
(**self).shutdown(timeout)
7778
}
7879
}
7980

@@ -102,7 +103,8 @@ impl TransportFactory for DefaultTransportFactory {
102103
pub struct HttpTransport {
103104
dsn: Dsn,
104105
sender: Mutex<SyncSender<Option<Event<'static>>>>,
105-
drain_signal: Arc<Condvar>,
106+
shutdown_signal: Arc<Condvar>,
107+
shutdown_immediately: Arc<AtomicBool>,
106108
queue_size: Arc<Mutex<usize>>,
107109
_handle: Option<JoinHandle<()>>,
108110
}
@@ -112,6 +114,7 @@ fn spawn_http_sender(
112114
receiver: Receiver<Option<Event<'static>>>,
113115
dsn: Dsn,
114116
signal: Arc<Condvar>,
117+
shutdown_immediately: Arc<AtomicBool>,
115118
queue_size: Arc<Mutex<usize>>,
116119
user_agent: String,
117120
) -> JoinHandle<()> {
@@ -121,6 +124,14 @@ fn spawn_http_sender(
121124
let url = dsn.store_api_url().to_string();
122125

123126
while let Some(event) = receiver.recv().unwrap_or(None) {
127+
// on drop we want to not continue processing the queue.
128+
if shutdown_immediately.load(Ordering::SeqCst) {
129+
let mut size = queue_size.lock().unwrap();
130+
*size = 0;
131+
signal.notify_all();
132+
break;
133+
}
134+
124135
// while we are disabled due to rate limits, skip
125136
match disabled {
126137
Some((disabled_at, RetryAfter::Delay(disabled_for))) => {
@@ -176,7 +187,8 @@ impl HttpTransport {
176187
let https_proxy = options.https_proxy.as_ref().map(|x| x.to_string());
177188

178189
let (sender, receiver) = sync_channel(30);
179-
let drain_signal = Arc::new(Condvar::new());
190+
let shutdown_signal = Arc::new(Condvar::new());
191+
let shutdown_immediately = Arc::new(AtomicBool::new(false));
180192
#[cfg_attr(feature = "cargo-clippy", allow(mutex_atomic))]
181193
let queue_size = Arc::new(Mutex::new(0));
182194
let mut client = Client::builder();
@@ -190,14 +202,16 @@ impl HttpTransport {
190202
client.build().unwrap(),
191203
receiver,
192204
dsn.clone(),
193-
drain_signal.clone(),
205+
shutdown_signal.clone(),
206+
shutdown_immediately.clone(),
194207
queue_size.clone(),
195208
user_agent,
196209
));
197210
HttpTransport {
198211
dsn,
199212
sender: Mutex::new(sender),
200-
drain_signal,
213+
shutdown_signal,
214+
shutdown_immediately: shutdown_immediately,
201215
queue_size,
202216
_handle,
203217
}
@@ -215,21 +229,22 @@ impl Transport for HttpTransport {
215229
}
216230
}
217231

218-
fn drain(&self, timeout: Option<Duration>) -> bool {
232+
fn shutdown(&self, timeout: Duration) -> bool {
219233
let guard = self.queue_size.lock().unwrap();
220234
if *guard == 0 {
221-
return true;
222-
}
223-
if let Some(timeout) = timeout {
224-
self.drain_signal.wait_timeout(guard, timeout).is_ok()
235+
true
225236
} else {
226-
self.drain_signal.wait(guard).is_ok()
237+
if let Ok(sender) = self.sender.lock() {
238+
sender.send(None).ok();
239+
}
240+
self.shutdown_signal.wait_timeout(guard, timeout).is_ok()
227241
}
228242
}
229243
}
230244

231245
impl Drop for HttpTransport {
232246
fn drop(&mut self) {
247+
self.shutdown_immediately.store(true, Ordering::SeqCst);
233248
if let Ok(sender) = self.sender.lock() {
234249
sender.send(None).ok();
235250
}

0 commit comments

Comments
 (0)