|
| 1 | +# Concurrent Size |
| 2 | + |
| 3 | +We recently added a `Size` counter abstraction for keeping track of the size of |
| 4 | +a linearizable data structure. The design is very much inspired by the approach |
| 5 | +described in the paper [Concurrent Size](https://arxiv.org/pdf/2209.07100.pdf) |
| 6 | +by Gal Sela and Erez Petrank. Roughly speaking, their approach uses an array of |
| 7 | +counters, updated using wait-free operations, and wait-free snapshotting of the |
| 8 | +counters to compute the size. This is a clever approach and performs well. In |
| 9 | +this document we'll briefly motivate and describe the changes we made in our |
| 10 | +version. The rest of this document assumes that you are largely familiar with |
| 11 | +the approach presented in the paper. |
| 12 | + |
| 13 | +## The problem |
| 14 | + |
| 15 | +Unfortunately, the wait-free counter updates of |
| 16 | +[Concurrent Size](https://arxiv.org/pdf/2209.07100.pdf) require that there is a |
| 17 | +unique counter for each independent thread of execution. In addition to allowing |
| 18 | +scalability, this is a key requirement for making the counter updates, i.e. |
| 19 | +increments and decrements, idempotent such that multiple threads may safely |
| 20 | +attempt to complete a single counter update in parallel. |
| 21 | + |
| 22 | +While it is definitely possible to create applications where a hard limit on the |
| 23 | +number of threads of execution can be enforced, it is arguably too strong a |
| 24 | +requirement for a general purpose abstraction. Consider the architecture of |
| 25 | +OCaml. While it makes sense to limit the number of domains, and, at the time of |
| 26 | +writing this, OCaml actually has a hard limit on the number of domains, OCaml |
| 27 | +also allows running relatively large numbers of concurrent (sys)threads on those |
| 28 | +domains. This means that there can be too many independent threads of |
| 29 | +execution for it to be practical to give each of them its own counter. |
| 30 | + |
| 31 | +## Idempotent lock-free counter increment |
| 32 | + |
| 33 | +Unlike in [Concurrent Size](https://arxiv.org/pdf/2209.07100.pdf) we do not |
| 34 | +strictly require each thread of execution to have its own counter. Rather, we |
| 35 | +only _try_ to have one counter per domain to allow scalable counter updates. |
| 36 | +This means that counter updates can no longer use the wait-free technique |
| 37 | +described in the paper, because it is possible, for example, that two or more |
| 38 | +threads within a domain might concurrently try to update the counter allocated |
| 39 | +for the domain. Instead, we use a kind of lock-free transactional approach to |
| 40 | +performing counter updates. |
| 41 | + |
| 42 | +To understand how the technique works, it is perhaps helpful to first look at a |
| 43 | +simplified version of the algorithm for performing idempotent lock-free counter |
| 44 | +increments. |
| 45 | + |
| 46 | +Below is a commented signature of what we are trying to achieve: |
| 47 | + |
| 48 | +```ocaml |
| 49 | +module type Counter = sig |
| 50 | + type t |
| 51 | + (** Represents a counter. *) |
| 52 | +
|
| 53 | + val create : unit -> t |
| 54 | + (** [create ()] returns a new counter with an initial value of [0]. *) |
| 55 | +
|
| 56 | + val get : t -> int |
| 57 | + (** [get counter] returns the current value of the [counter]. *) |
| 58 | +
|
| 59 | + type increment |
| 60 | + (** Represents an increment operation that can be performed to increment a |
| 61 | + counter at most once. *) |
| 62 | +
|
| 63 | + val new_increment : unit -> increment |
| 64 | + (** [new_increment ()] returns a new increment operation. *) |
| 65 | +
|
| 66 | + val perform : t -> increment -> unit |
| 67 | + (** [perform counter increment] increments the [counter] or does nothing in |
| 68 | + case the [increment] operation has already been performed on the |
| 69 | + [counter]. |
| 70 | +
|
| 71 | + Note that an [increment] must only be used to target one specific |
| 72 | + [counter]. *) |
| 73 | +end |
| 74 | +``` |
| 75 | + |
| 76 | +As the signature suggests, the idea is that one can create an `increment` |
| 77 | +operation that can then later be performed to increment a counter at most once. |
| 78 | + |
| 79 | +The implementation is mostly straightforward: |
| 80 | + |
| 81 | +```ocaml |
| 82 | +module Make_counter (Atomic : sig |
| 83 | + type 'a t |
| 84 | + val make : 'a -> 'a t |
| 85 | + val get : 'a t -> 'a |
| 86 | + val compare_and_set : 'a t -> 'a -> 'a -> bool |
| 87 | +end) : Counter = struct |
| 88 | + type increment = [ `New | `Done ] ref |
| 89 | + type counter = { value : int; increment : increment } |
| 90 | + type t = counter Atomic.t |
| 91 | +
|
| 92 | + let create () = Atomic.make { value = 0; increment = ref `Done } |
| 93 | +
|
| 94 | + let get counter = |
| 95 | + let tx = Atomic.get counter in |
| 96 | + tx.value |
| 97 | +
|
| 98 | + let new_increment () = ref `New |
| 99 | +
|
| 100 | + let rec perform counter increment = |
| 101 | + let before = Atomic.get counter in |
| 102 | + match !increment with |
| 103 | + | `Done -> () |
| 104 | + | `New -> |
| 105 | + if increment != before.increment then begin |
| 106 | + before.increment := `Done; |
| 107 | + let after = { value = before.value + 1; increment } in |
| 108 | + if not (Atomic.compare_and_set counter before after) then |
| 109 | + perform counter increment |
| 110 | + end |
| 111 | +end |
| 112 | +``` |
| 113 | + |
| 114 | +The tricky bit, of course, is the `perform` operation. It needs to ensure that |
| 115 | +an `increment` operation against a `counter` takes effect at most once. After |
| 116 | +reading the counter, `perform` checks that the increment isn't marked as done |
| 117 | +and that the counter isn't pointing to the increment. If both checks pass, the |
| 118 | +increment had not taken effect when the counter was read, and it is safe to try |
| 119 | +to increment the counter after first marking the previous increment as done. |
| 120 | +The `compare_and_set` can still fail due to a concurrent or parallel update |
| 121 | +attempt with the same or some other increment. If that happens, the operation |
| 122 | +needs to be retried. |
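
To make the two checks concrete, here is a minimal single-threaded sketch that inlines the same `perform` logic over `Stdlib.Atomic`, with the types left transparent so that the intermediate states can be observed. The names `a` and `b` are purely illustrative, and the sketch says nothing about behaviour under contention.

```ocaml
type increment = [ `New | `Done ] ref
type counter = { value : int; increment : increment }

(* Same logic as [perform] above, specialized to [Stdlib.Atomic]. *)
let rec perform counter increment =
  let before = Atomic.get counter in
  match !increment with
  | `Done -> ()
  | `New ->
      if increment != before.increment then begin
        before.increment := `Done;
        let after = { value = before.value + 1; increment } in
        if not (Atomic.compare_and_set counter before after) then
          perform counter increment
      end

let () =
  let counter = Atomic.make { value = 0; increment = ref `Done } in
  let a : increment = ref `New in
  let b : increment = ref `New in
  (* After the first [perform], the counter points to [a], which is still
     marked [`New]: the pointer alone records that [a] took effect. *)
  perform counter a;
  let state = Atomic.get counter in
  assert (state.value = 1 && state.increment == a && !a = `New);
  (* Retrying [a] is a no-op because the counter points to it. *)
  perform counter a;
  assert ((Atomic.get counter).value = 1);
  (* Performing [b] first marks [a] as done and then replaces the pointer. *)
  perform counter b;
  assert (!a = `Done);
  assert ((Atomic.get counter).increment == b);
  assert ((Atomic.get counter).value = 2);
  (* Retrying [a] is still a no-op, now because it is marked [`Done]. *)
  perform counter a;
  assert ((Atomic.get counter).value = 2)
```

At any point, an increment that has taken effect is either the one the counter points to or is marked `` `Done ``, which is why the previous increment has to be marked before the `compare_and_set`.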
| 123 | + |
| 124 | +> As an aside, note that in the real size counting mechanism, a single counter |
| 125 | +> is allocated per domain. This should make `compare_and_set` failures extremely |
| 126 | +> rare and increments should be practically, but not technically, wait-free. |
| 127 | +
|
| 128 | +We can check that the logic works when we perform an increment from a single |
| 129 | +thread of execution: |
| 130 | + |
| 131 | +```ocaml |
| 132 | +# let module Counter = Make_counter (Atomic) in |
| 133 | + let counter = Counter.create () in |
| 134 | + let first = Counter.get counter in |
| 135 | + let increment = Counter.new_increment () in |
| 136 | + Counter.perform counter increment; |
| 137 | + let second = Counter.get counter in |
| 138 | + Counter.perform counter increment; |
| 139 | + let third = Counter.get counter in |
| 140 | + (first, second, third) |
| 141 | +- : int * int * int = (0, 1, 1) |
| 142 | +``` |
| 143 | + |
| 144 | +To build more confidence, we can, for example, use the |
| 145 | +[DSCheck](https://github.com/ocaml-multicore/dscheck) model checker to check |
| 146 | +that interleavings of atomic operations lead to the expected outcome: |
| 147 | + |
| 148 | +```ocaml |
| 149 | +# let module Counter = Make_counter (Dscheck.TracedAtomic) in |
| 150 | + Dscheck.TracedAtomic.trace @@ fun () -> |
| 151 | + let counter = Counter.create () in |
| 152 | + let increments = |
| 153 | + Array.init 2 @@ fun _ -> |
| 154 | + Counter.new_increment () in |
| 155 | + for _=1 to 2 do |
| 156 | + Dscheck.TracedAtomic.spawn @@ fun () -> |
| 157 | + Array.iter (Counter.perform counter) increments |
| 158 | + done; |
| 159 | + Dscheck.TracedAtomic.final @@ fun () -> |
| 160 | + Dscheck.TracedAtomic.check @@ fun () -> |
| 161 | + Counter.get counter = Array.length increments |
| 162 | +- : unit = () |
| 163 | +``` |
| 164 | + |
| 165 | +Hopefully this simplified version of an internal counter used in the size |
| 166 | +counting mechanism helps in understanding how the real thing works. |
| 167 | + |
| 168 | +## Sketch of the size counting mechanism |
| 169 | + |
| 170 | +```ocaml |
| 171 | +module Snapshot = struct |
| 172 | + type t = { |
| 173 | + status : [ `Collecting | `Computing | `Value of int ] Atomic.t; |
| 174 | + counters : int Atomic.t array |
| 175 | + } |
| 176 | +
|
| 177 | + let create n = |
| 178 | + let status = Atomic.make `Collecting in |
| 179 | + let counters = Array.init n @@ fun _ -> Atomic.make 0 in |
| 180 | + { status; counters } |
| 181 | +
|
| 182 | + let is_collecting s = Atomic.get s.status == `Collecting |
| 183 | +
|
| 184 | + let set s i after = |
| 185 | + let snap = s.counters.(i) in |
| 186 | + let before = Atomic.get snap in |
| 187 | + if before < after then |
| 188 | + Atomic.compare_and_set snap before after |> ignore |
| 189 | +
|
| 190 | + let rec forward s i after = |
| 191 | + let snap = s.counters.(i) in |
| 192 | + let before = Atomic.get snap in |
| 193 | + if before < after then |
| 194 | + if not (Atomic.compare_and_set snap before after) then |
| 195 | + forward s i after |
| 196 | +
|
| 197 | + let rec sum_counters s sum i = |
| 198 | + if i < Array.length s.counters then |
| 199 | + let decr = Atomic.get s.counters.(i) in |
| 200 | + let incr = Atomic.get s.counters.(i+1) in |
| 201 | + sum_counters s (sum - decr + incr) (i + 2) |
| 202 | + else |
| 203 | + sum |
| 204 | +
|
| 205 | + let compute s = |
| 206 | + if Atomic.get s.status == `Collecting then |
| 207 | + Atomic.compare_and_set s.status `Collecting `Computing |> ignore; |
| 208 | + if Atomic.get s.status == `Computing then begin |
| 209 | + let n = sum_counters s 0 0 in |
| 210 | + if Atomic.get s.status == `Computing then |
| 211 | + Atomic.compare_and_set s.status `Computing (`Value n) |> ignore |
| 212 | + end; |
| 213 | + match Atomic.get s.status with |
| 214 | + | `Value n -> n |
| 215 | + | (`Collecting | `Computing) -> failwith "impossible" |
| 216 | +end |
| 217 | +``` |
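
As a rough illustration of two details of the `Snapshot` module, the following reduced sketch keeps only the array of collected values and leaves out the `status` state machine entirely. It assumes, as `sum_counters` suggests, that even indices hold decrement counts and odd indices hold increment counts. The free-standing `create`, `forward`, and `sum` below are hypothetical simplifications, not the module's actual interface.

```ocaml
(* Hypothetical reduced snapshot: just the collected counter values. *)
let create n = Array.init n (fun _ -> Atomic.make 0)

(* Move slot [i] forward to [after], retrying on contention.  A value that
   is not larger than the current one is ignored, so a slot never moves
   backwards. *)
let rec forward counters i after =
  let snap = counters.(i) in
  let before = Atomic.get snap in
  if before < after then
    if not (Atomic.compare_and_set snap before after) then
      forward counters i after

(* Sum the pairs: slot [i] is a decrement count, slot [i + 1] the matching
   increment count.  Assumes an even number of slots. *)
let sum counters =
  let rec go sum i =
    if i < Array.length counters then
      let decr = Atomic.get counters.(i) in
      let incr = Atomic.get counters.(i + 1) in
      go (sum - decr + incr) (i + 2)
    else sum
  in
  go 0 0

let () =
  let counters = create 2 in
  forward counters 1 3; (* three increments observed *)
  forward counters 0 1; (* one decrement observed *)
  forward counters 1 2; (* stale increment count: ignored *)
  assert (Atomic.get counters.(1) = 3);
  assert (sum counters = 2)
```

Because each slot only ever moves forward, a helper that arrives late with an older counter value cannot overwrite a newer one.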
| 218 | + |
| 219 | +```ocaml |
| 220 | +type once = |
| 221 | + [ `New of [ `New of int | `Done ] ref |
| 222 | + | `Done ] |
| 223 | +
|
| 224 | +type counter = { |
| 225 | + value : int; |
| 226 | + once : [ `New of int | `Done ] ref |
| 227 | +} |
| 228 | +
|
| 229 | +type size = { |
| 230 | + snapshot : Snapshot.t Atomic.t; |
| 231 | + counters : counter Atomic.t array; |
| 232 | +} |
| 233 | +
|
| 234 | +type t = size Atomic.t |
| 235 | +``` |
| 236 | + |
| 237 | +```ocaml |
| 238 | +let create () : t = |
| 239 | + let width = 2 in |
| 240 | + let snapshot = Atomic.make (Snapshot.create width) in |
| 241 | + let counters = |
| 242 | + Array.init width @@ fun _ -> |
| 243 | + Atomic.make { value = 0; once = ref `Done } |
| 244 | + in |
| 245 | + Atomic.make { snapshot; counters } |
| 246 | +``` |
| 247 | + |
| 248 | +```ocaml |
| 249 | +let rec update_once size once counter = |
| 250 | + let before = Atomic.get counter in |
| 251 | + match !once with |
| 252 | + | `Done -> () |
| 253 | + | `New index -> |
| 254 | + if before.once != once then begin |
| 255 | + before.once := `Done; |
| 256 | + let value = before.value + 1 in |
| 257 | + let after = { value; once } in |
| 258 | + if Atomic.compare_and_set counter before after then begin |
| 259 | + let snapshot = Atomic.get size.snapshot in |
| 260 | + if Snapshot.is_collecting snapshot then |
| 261 | + Snapshot.forward snapshot index value |
| 262 | + end |
| 263 | + else update_once size once size.counters.(index) |
| 264 | + end |
| 265 | +
|
| 266 | +let update_once (t : t) (once : once) = |
| 267 | + match once with |
| 268 | + | `Done -> () |
| 269 | + | `New once -> |
| 270 | + match !once with |
| 271 | + | `Done -> () |
| 272 | + | `New index -> |
| 273 | + let size = Atomic.get t in |
| 274 | + update_once size once size.counters.(index) |
| 275 | +``` |
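
The following self-contained sketch shows how a `once` value might be created and applied. It repeats the type definitions, `create`, and `update_once` from above, together with a reduced, hypothetical stand-in for `Snapshot` that keeps only the parts `update_once` touches. The choice of index `1` for an increment is an assumption based on how `sum_counters` pairs the slots.

```ocaml
(* Reduced, hypothetical stand-in for the [Snapshot] module above. *)
module Snapshot = struct
  type t = {
    status : [ `Collecting | `Computing | `Value of int ] Atomic.t;
    counters : int Atomic.t array;
  }

  let create n =
    let status = Atomic.make `Collecting in
    let counters = Array.init n (fun _ -> Atomic.make 0) in
    { status; counters }

  let is_collecting s = Atomic.get s.status == `Collecting

  let rec forward s i after =
    let snap = s.counters.(i) in
    let before = Atomic.get snap in
    if before < after then
      if not (Atomic.compare_and_set snap before after) then
        forward s i after
end

type once = [ `New of [ `New of int | `Done ] ref | `Done ]
type counter = { value : int; once : [ `New of int | `Done ] ref }

type size = {
  snapshot : Snapshot.t Atomic.t;
  counters : counter Atomic.t array;
}

type t = size Atomic.t

let create () : t =
  let width = 2 in
  let snapshot = Atomic.make (Snapshot.create width) in
  let counters =
    Array.init width @@ fun _ -> Atomic.make { value = 0; once = ref `Done }
  in
  Atomic.make { snapshot; counters }

(* Inner loop: apply [once] to [counter] at most once. *)
let rec update_once size once counter =
  let before = Atomic.get counter in
  match !once with
  | `Done -> ()
  | `New index ->
      if before.once != once then begin
        before.once := `Done;
        let value = before.value + 1 in
        let after = { value; once } in
        if Atomic.compare_and_set counter before after then begin
          let snapshot = Atomic.get size.snapshot in
          if Snapshot.is_collecting snapshot then
            Snapshot.forward snapshot index value
        end
        else update_once size once size.counters.(index)
      end

(* Entry point: shadows the inner loop, which it calls. *)
let update_once (t : t) (once : once) =
  match once with
  | `Done -> ()
  | `New once -> (
      match !once with
      | `Done -> ()
      | `New index ->
          let size = Atomic.get t in
          update_once size once size.counters.(index))

let () =
  let t = create () in
  (* Assumption: index 1 is the increment slot of the first pair. *)
  let incr : once = `New (ref (`New 1)) in
  update_once t incr;
  update_once t incr; (* the second attempt has no effect *)
  let size = Atomic.get t in
  assert ((Atomic.get size.counters.(1)).value = 1);
  assert ((Atomic.get size.counters.(0)).value = 0);
  (* The snapshot was still collecting, so the new value was forwarded. *)
  let snapshot = Atomic.get size.snapshot in
  assert (Atomic.get snapshot.Snapshot.counters.(1) = 1)
```

This only exercises the single-threaded path; it does not show how a snapshot is completed or how the size is read.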