Skip to content

Commit aacf87d

Browse files
committed
fix: Change thread local context to allow overlapped scopes
1 parent 2e081cf commit aacf87d

File tree

2 files changed

+184
-37
lines changed

2 files changed

+184
-37
lines changed

opentelemetry/CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
- *Breaking* Changed value type of `Baggage` from `Value` to `StringValue`
1111
- Updated `Baggage` constants to reflect latest standard (`MAX_KEY_VALUE_PAIRS` - 180 -> 64, `MAX_BYTES_FOR_ONE_PAIR` - removed) and increased insert performance see #[2284](https://github.com/open-telemetry/opentelemetry-rust/pull/2284).
1212
- *Breaking* Align `Baggage.remove()` signature with `.get()` to take the key as a reference
13+
- Changed `Context` to use a stack to properly handle out of order dropping of `ContextGuard`. This imposes a limit of `65535` nested contexts on a single thread. See #[2378](https://github.com/open-telemetry/opentelemetry-rust/pull/2378) and #[1887](https://github.com/open-telemetry/opentelemetry-rust/issues/1887).
1314

1415
## 0.28.0
1516

opentelemetry/src/context.rs

+183-37
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::otel_warn;
12
#[cfg(feature = "trace")]
23
use crate::trace::context::SynchronizedSpan;
34
use std::any::{Any, TypeId};
@@ -9,7 +10,7 @@ use std::marker::PhantomData;
910
use std::sync::Arc;
1011

1112
thread_local! {
12-
static CURRENT_CONTEXT: RefCell<Context> = RefCell::new(Context::default());
13+
static CURRENT_CONTEXT: RefCell<ContextStack> = RefCell::new(ContextStack::default());
1314
}
1415

1516
/// An execution-scoped collection of values.
@@ -122,7 +123,7 @@ impl Context {
122123
/// Note: This function will panic if you attempt to attach another context
123124
/// while the current one is still borrowed.
124125
pub fn map_current<T>(f: impl FnOnce(&Context) -> T) -> T {
125-
CURRENT_CONTEXT.with(|cx| f(&cx.borrow()))
126+
CURRENT_CONTEXT.with(|cx| cx.borrow().map_current_cx(f))
126127
}
127128

128129
/// Returns a clone of the current thread's context with the given value.
@@ -298,12 +299,10 @@ impl Context {
298299
/// assert_eq!(Context::current().get::<ValueA>(), None);
299300
/// ```
300301
pub fn attach(self) -> ContextGuard {
301-
let previous_cx = CURRENT_CONTEXT
302-
.try_with(|current| current.replace(self))
303-
.ok();
302+
let cx_id = CURRENT_CONTEXT.with(|cx| cx.borrow_mut().push(self));
304303

305304
ContextGuard {
306-
previous_cx,
305+
cx_pos: cx_id,
307306
_marker: PhantomData,
308307
}
309308
}
@@ -344,17 +343,19 @@ impl fmt::Debug for Context {
344343
}
345344

346345
/// A guard that resets the current context to the prior context when dropped.
347-
#[allow(missing_debug_implementations)]
346+
#[derive(Debug)]
348347
pub struct ContextGuard {
349-
previous_cx: Option<Context>,
350-
// ensure this type is !Send as it relies on thread locals
348+
// The position of the context in the stack. This is used to pop the context.
349+
cx_pos: u16,
350+
// Ensure this type is !Send as it relies on thread locals
351351
_marker: PhantomData<*const ()>,
352352
}
353353

354354
impl Drop for ContextGuard {
355355
fn drop(&mut self) {
356-
if let Some(previous_cx) = self.previous_cx.take() {
357-
let _ = CURRENT_CONTEXT.try_with(|current| current.replace(previous_cx));
356+
let id = self.cx_pos;
357+
if id > ContextStack::BASE_POS && id < ContextStack::MAX_POS {
358+
CURRENT_CONTEXT.with(|context_stack| context_stack.borrow_mut().pop_id(id));
358359
}
359360
}
360361
}
@@ -381,17 +382,116 @@ impl Hasher for IdHasher {
381382
}
382383
}
383384

385+
/// A stack for keeping track of the [`Context`] instances that have been attached
386+
/// to a thread.
387+
///
388+
/// The stack allows for popping of contexts by position, which is used to do out
389+
/// of order dropping of [`ContextGuard`] instances. Only when the top of the
390+
/// stack is popped is the topmost remaining [`Context`] actually restored.
391+
///
392+
/// The stack relies on the fact that it is thread local and that the
393+
/// [`ContextGuard`] instances that are constructed using ids from it can't be
394+
/// moved to other threads. That means that the ids are always valid and that
395+
/// they are always within the bounds of the stack.
396+
struct ContextStack {
397+
/// This is the current [`Context`] that is active on this thread, and the top
398+
/// of the [`ContextStack`]. It is always present, and if the `stack` is empty
399+
/// it's an empty [`Context`].
400+
///
401+
/// Having this here allows for fast access to the current [`Context`].
402+
current_cx: Context,
403+
/// A `stack` of the other contexts that have been attached to the thread.
404+
stack: Vec<Option<Context>>,
405+
/// Ensure this type is !Send as it relies on thread locals
406+
_marker: PhantomData<*const ()>,
407+
}
408+
409+
impl ContextStack {
410+
const BASE_POS: u16 = 0;
411+
const MAX_POS: u16 = u16::MAX;
412+
const INITIAL_CAPACITY: usize = 8;
413+
414+
#[inline(always)]
415+
fn push(&mut self, cx: Context) -> u16 {
416+
// The next id is the length of the `stack`, plus one since we have the
417+
// top of the [`ContextStack`] as the `current_cx`.
418+
let next_id = self.stack.len() + 1;
419+
if next_id < ContextStack::MAX_POS.into() {
420+
let current_cx = std::mem::replace(&mut self.current_cx, cx);
421+
self.stack.push(Some(current_cx));
422+
next_id as u16
423+
} else {
424+
// This is an overflow, log it and ignore it.
425+
otel_warn!(
426+
name: "Context.AttachFailed",
427+
message = format!("Too many contexts. Max limit is {}. \
428+
Context::current() remains unchanged as this attach failed. \
429+
Dropping the returned ContextGuard will have no impact on Context::current().",
430+
ContextStack::MAX_POS)
431+
);
432+
ContextStack::MAX_POS
433+
}
434+
}
435+
436+
#[inline(always)]
437+
fn pop_id(&mut self, pos: u16) {
438+
if pos == ContextStack::BASE_POS || pos == ContextStack::MAX_POS {
439+
// The empty context is always at the bottom of the [`ContextStack`]
440+
// and cannot be popped, and the overflow position is invalid, so do
441+
// nothing.
442+
return;
443+
}
444+
let len: u16 = self.stack.len() as u16;
445+
// Are we at the top of the [`ContextStack`]?
446+
if pos == len {
447+
// Shrink the stack if possible to clear out any out of order pops.
448+
while let Some(None) = self.stack.last() {
449+
_ = self.stack.pop();
450+
}
451+
// Restore the previous context. This will always happen since the
452+
// empty context is always at the bottom of the stack if the
453+
// [`ContextStack`] is not empty.
454+
if let Some(Some(next_cx)) = self.stack.pop() {
455+
self.current_cx = next_cx;
456+
}
457+
} else {
458+
// This is an out of order pop.
459+
if pos >= len {
460+
// This is an invalid id, ignore it.
461+
return;
462+
}
463+
// Clear out the entry at the given id.
464+
_ = self.stack[pos as usize].take();
465+
}
466+
}
467+
468+
#[inline(always)]
469+
fn map_current_cx<T>(&self, f: impl FnOnce(&Context) -> T) -> T {
470+
f(&self.current_cx)
471+
}
472+
}
473+
474+
impl Default for ContextStack {
475+
fn default() -> Self {
476+
ContextStack {
477+
current_cx: Context::default(),
478+
stack: Vec::with_capacity(ContextStack::INITIAL_CAPACITY),
479+
_marker: PhantomData,
480+
}
481+
}
482+
}
483+
384484
#[cfg(test)]
385485
mod tests {
386486
use super::*;
387487

488+
#[derive(Debug, PartialEq)]
489+
struct ValueA(u64);
490+
#[derive(Debug, PartialEq)]
491+
struct ValueB(u64);
492+
388493
#[test]
389494
fn context_immutable() {
390-
#[derive(Debug, PartialEq)]
391-
struct ValueA(u64);
392-
#[derive(Debug, PartialEq)]
393-
struct ValueB(u64);
394-
395495
// start with Current, which should be an empty context
396496
let cx = Context::current();
397497
assert_eq!(cx.get::<ValueA>(), None);
@@ -424,66 +524,56 @@ mod tests {
424524

425525
#[test]
426526
fn nested_contexts() {
427-
#[derive(Debug, PartialEq)]
428-
struct ValueA(&'static str);
429-
#[derive(Debug, PartialEq)]
430-
struct ValueB(u64);
431-
let _outer_guard = Context::new().with_value(ValueA("a")).attach();
527+
let _outer_guard = Context::new().with_value(ValueA(1)).attach();
432528

433529
// Only `ValueA` is set
434530
let current = Context::current();
435-
assert_eq!(current.get(), Some(&ValueA("a")));
531+
assert_eq!(current.get(), Some(&ValueA(1)));
436532
assert_eq!(current.get::<ValueB>(), None);
437533

438534
{
439535
let _inner_guard = Context::current_with_value(ValueB(42)).attach();
440536
// Both values are set in inner context
441537
let current = Context::current();
442-
assert_eq!(current.get(), Some(&ValueA("a")));
538+
assert_eq!(current.get(), Some(&ValueA(1)));
443539
assert_eq!(current.get(), Some(&ValueB(42)));
444540

445541
assert!(Context::map_current(|cx| {
446-
assert_eq!(cx.get(), Some(&ValueA("a")));
542+
assert_eq!(cx.get(), Some(&ValueA(1)));
447543
assert_eq!(cx.get(), Some(&ValueB(42)));
448544
true
449545
}));
450546
}
451547

452548
// Resets to only `ValueA` when the inner guard is dropped
453549
let current = Context::current();
454-
assert_eq!(current.get(), Some(&ValueA("a")));
550+
assert_eq!(current.get(), Some(&ValueA(1)));
455551
assert_eq!(current.get::<ValueB>(), None);
456552

457553
assert!(Context::map_current(|cx| {
458-
assert_eq!(cx.get(), Some(&ValueA("a")));
554+
assert_eq!(cx.get(), Some(&ValueA(1)));
459555
assert_eq!(cx.get::<ValueB>(), None);
460556
true
461557
}));
462558
}
463559

464560
#[test]
465-
#[ignore = "overlapping contexts are not supported yet"]
466561
fn overlapping_contexts() {
467-
#[derive(Debug, PartialEq)]
468-
struct ValueA(&'static str);
469-
#[derive(Debug, PartialEq)]
470-
struct ValueB(u64);
471-
472-
let outer_guard = Context::new().with_value(ValueA("a")).attach();
562+
let outer_guard = Context::new().with_value(ValueA(1)).attach();
473563

474564
// Only `ValueA` is set
475565
let current = Context::current();
476-
assert_eq!(current.get(), Some(&ValueA("a")));
566+
assert_eq!(current.get(), Some(&ValueA(1)));
477567
assert_eq!(current.get::<ValueB>(), None);
478568

479569
let inner_guard = Context::current_with_value(ValueB(42)).attach();
480570
// Both values are set in inner context
481571
let current = Context::current();
482-
assert_eq!(current.get(), Some(&ValueA("a")));
572+
assert_eq!(current.get(), Some(&ValueA(1)));
483573
assert_eq!(current.get(), Some(&ValueB(42)));
484574

485575
assert!(Context::map_current(|cx| {
486-
assert_eq!(cx.get(), Some(&ValueA("a")));
576+
assert_eq!(cx.get(), Some(&ValueA(1)));
487577
assert_eq!(cx.get(), Some(&ValueB(42)));
488578
true
489579
}));
@@ -492,7 +582,7 @@ mod tests {
492582

493583
// `inner_guard` is still alive so both `ValueA` and `ValueB` should still be accessible
494584
let current = Context::current();
495-
assert_eq!(current.get(), Some(&ValueA("a")));
585+
assert_eq!(current.get(), Some(&ValueA(1)));
496586
assert_eq!(current.get(), Some(&ValueB(42)));
497587

498588
drop(inner_guard);
@@ -502,4 +592,60 @@ mod tests {
502592
assert_eq!(current.get::<ValueA>(), None);
503593
assert_eq!(current.get::<ValueB>(), None);
504594
}
595+
596+
#[test]
597+
fn too_many_contexts() {
598+
let mut guards: Vec<ContextGuard> = Vec::with_capacity(ContextStack::MAX_POS as usize);
599+
let stack_max_pos = ContextStack::MAX_POS as u64;
600+
// Fill the stack up until the last position
601+
for i in 1..stack_max_pos {
602+
let cx_guard = Context::current().with_value(ValueB(i)).attach();
603+
assert_eq!(Context::current().get(), Some(&ValueB(i)));
604+
assert_eq!(cx_guard.cx_pos, i as u16);
605+
guards.push(cx_guard);
606+
}
607+
// Let's overflow the stack a couple of times
608+
for _ in 0..16 {
609+
let cx_guard = Context::current().with_value(ValueA(1)).attach();
610+
assert_eq!(cx_guard.cx_pos, ContextStack::MAX_POS);
611+
assert_eq!(Context::current().get::<ValueA>(), None);
612+
assert_eq!(Context::current().get(), Some(&ValueB(stack_max_pos - 1)));
613+
guards.push(cx_guard);
614+
}
615+
// Drop the overflow contexts
616+
for _ in 0..16 {
617+
guards.pop();
618+
assert_eq!(Context::current().get::<ValueA>(), None);
619+
assert_eq!(Context::current().get(), Some(&ValueB(stack_max_pos - 1)));
620+
}
621+
// Drop one more so we can add a new one
622+
guards.pop();
623+
assert_eq!(Context::current().get::<ValueA>(), None);
624+
assert_eq!(Context::current().get(), Some(&ValueB(stack_max_pos - 2)));
625+
// Push a new context and see that it works
626+
let cx_guard = Context::current().with_value(ValueA(2)).attach();
627+
assert_eq!(cx_guard.cx_pos, ContextStack::MAX_POS - 1);
628+
assert_eq!(Context::current().get(), Some(&ValueA(2)));
629+
assert_eq!(Context::current().get(), Some(&ValueB(stack_max_pos - 2)));
630+
guards.push(cx_guard);
631+
// Let's overflow the stack a couple of times again
632+
for _ in 0..16 {
633+
let cx_guard = Context::current().with_value(ValueA(1)).attach();
634+
assert_eq!(cx_guard.cx_pos, ContextStack::MAX_POS);
635+
assert_eq!(Context::current().get(), Some(&ValueA(2)));
636+
assert_eq!(Context::current().get(), Some(&ValueB(stack_max_pos - 2)));
637+
guards.push(cx_guard);
638+
}
639+
}
640+
641+
#[test]
642+
fn context_stack_pop_id() {
643+
// This is to get full line coverage of the `pop_id` function.
644+
// In real life the `Drop` implementation of `ContextGuard` ensures that
645+
// the ids are valid and inside the bounds.
646+
let mut stack = ContextStack::default();
647+
stack.pop_id(ContextStack::BASE_POS);
648+
stack.pop_id(ContextStack::MAX_POS);
649+
stack.pop_id(4711);
650+
}
505651
}

0 commit comments

Comments
 (0)