Skip to content

Commit 91ad550

Browse files
authored
Fast path for unique insertion (#36)
Updates the Python client to be compatible with the fast unique insertion added to the main River in [1] which uses a unique index instead of advisory lock + fetch as long as uniqueness is constrained to the default set of unique job states. Also, not so much by design originally, but upgrade to sqlc v1.27.0 as we have going in other River projects. [1] riverqueue/river#451
1 parent 6f06010 commit 91ad550

File tree

13 files changed

+503
-93
lines changed

13 files changed

+503
-93
lines changed

.github/workflows/ci.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ jobs:
142142

143143
env:
144144
BIN_PATH: /home/runner/bin
145-
SQLC_VERSION: 1.26.0
145+
SQLC_VERSION: 1.27.0
146146

147147
steps:
148148
- name: Checkout

CHANGELOG.md

+4
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
### Changed
11+
12+
- Now compatible with "fast path" unique job insertion that uses a unique index instead of advisory lock and fetch [as introduced in River #451](https://github.com/riverqueue/river/pull/451). [PR #36](https://github.com/riverqueue/riverqueue-python/pull/36).
13+
1014
## [0.6.3] - 2024-07-08
1115

1216
### Fixed

docs/development.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ $ rye lint
5151
## Run type check (Mypy)
5252

5353
```shell
54-
$ make typecheck
54+
$ make type-check
5555
```
5656

5757
## Format code

src/riverqueue/client.py

+83-58
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
from dataclasses import dataclass, field
22
from datetime import datetime, timezone, timedelta
3+
from hashlib import sha256
34
import re
45
from typing import (
5-
Awaitable,
66
Optional,
77
Protocol,
88
Tuple,
99
List,
10-
Callable,
1110
cast,
1211
runtime_checkable,
1312
)
@@ -206,12 +205,7 @@ def to_json(self) -> str:
206205
insert_opts = InsertOpts()
207206
insert_params, unique_opts = _make_driver_insert_params(args, insert_opts)
208207

209-
async def insert():
210-
return InsertResult(await exec.job_insert(insert_params))
211-
212-
return await self.__check_unique_job(
213-
exec, insert_params, unique_opts, insert
214-
)
208+
return await self.__insert_job_with_unique(exec, insert_params, unique_opts)
215209

216210
async def insert_tx(
217211
self, tx, args: JobArgs, insert_opts: Optional[InsertOpts] = None
@@ -253,10 +247,7 @@ async def insert_tx(
253247
insert_opts = InsertOpts()
254248
insert_params, unique_opts = _make_driver_insert_params(args, insert_opts)
255249

256-
async def insert():
257-
return InsertResult(await exec.job_insert(insert_params))
258-
259-
return await self.__check_unique_job(exec, insert_params, unique_opts, insert)
250+
return await self.__insert_job_with_unique(exec, insert_params, unique_opts)
260251

261252
async def insert_many(self, args: List[JobArgs | InsertManyParams]) -> int:
262253
"""
@@ -327,33 +318,50 @@ async def insert_many_tx(self, tx, args: List[JobArgs | InsertManyParams]) -> in
327318
exec = self.driver.unwrap_executor(tx)
328319
return await exec.job_insert_many(_make_driver_insert_params_many(args))
329320

330-
async def __check_unique_job(
321+
async def __insert_job_with_unique(
331322
self,
332323
exec: AsyncExecutorProtocol,
333324
insert_params: JobInsertParams,
334325
unique_opts: Optional[UniqueOpts],
335-
insert_func: Callable[[], Awaitable[InsertResult]],
336326
) -> InsertResult:
337327
"""
338-
Checks for an existing unique job and runs `insert_func` if one is
339-
present.
328+
Inserts a job, accounting for unique jobs whose insertion may be skipped
329+
if an equivalent job is already present.
340330
"""
341331

342-
get_params, lock_key = _build_unique_get_params_and_lock_key(
343-
self.advisory_lock_prefix, insert_params, unique_opts
332+
get_params, unique_key = _build_unique_get_params_and_unique_key(
333+
insert_params, unique_opts
344334
)
345335

346-
if not get_params:
347-
return await insert_func()
336+
if not get_params or not unique_opts:
337+
return InsertResult(await exec.job_insert(insert_params))
338+
339+
# fast path
340+
if (
341+
not unique_opts.by_state
342+
or unique_opts.by_state.sort == UNIQUE_STATES_DEFAULT
343+
):
344+
job, unique_skipped_as_duplicate = await exec.job_insert_unique(
345+
insert_params, sha256(unique_key.encode("utf-8")).digest()
346+
)
347+
return InsertResult(
348+
job=job, unique_skipped_as_duplicated=unique_skipped_as_duplicate
349+
)
348350

349351
async with exec.transaction():
350-
await exec.advisory_lock(lock_key)
352+
lock_key = "unique_key"
353+
lock_key += "kind=#{insert_params.kind}"
354+
lock_key += unique_key
355+
356+
await exec.advisory_lock(
357+
_hash_lock_key(self.advisory_lock_prefix, lock_key)
358+
)
351359

352360
existing_job = await exec.job_get_by_kind_and_unique_properties(get_params)
353361
if existing_job:
354362
return InsertResult(existing_job, unique_skipped_as_duplicated=True)
355363

356-
return await insert_func()
364+
return InsertResult(await exec.job_insert(insert_params))
357365

358366

359367
class Client:
@@ -451,10 +459,7 @@ def to_json(self) -> str:
451459
insert_opts = InsertOpts()
452460
insert_params, unique_opts = _make_driver_insert_params(args, insert_opts)
453461

454-
def insert():
455-
return InsertResult(exec.job_insert(insert_params))
456-
457-
return self.__check_unique_job(exec, insert_params, unique_opts, insert)
462+
return self.__insert_job_with_unique(exec, insert_params, unique_opts)
458463

459464
def insert_tx(
460465
self, tx, args: JobArgs, insert_opts: Optional[InsertOpts] = None
@@ -496,10 +501,7 @@ def insert_tx(
496501
insert_opts = InsertOpts()
497502
insert_params, unique_opts = _make_driver_insert_params(args, insert_opts)
498503

499-
def insert():
500-
return InsertResult(exec.job_insert(insert_params))
501-
502-
return self.__check_unique_job(exec, insert_params, unique_opts, insert)
504+
return self.__insert_job_with_unique(exec, insert_params, unique_opts)
503505

504506
def insert_many(self, args: List[JobArgs | InsertManyParams]) -> int:
505507
"""
@@ -570,58 +572,72 @@ def insert_many_tx(self, tx, args: List[JobArgs | InsertManyParams]) -> int:
570572
exec = self.driver.unwrap_executor(tx)
571573
return exec.job_insert_many(_make_driver_insert_params_many(args))
572574

573-
def __check_unique_job(
575+
def __insert_job_with_unique(
574576
self,
575577
exec: ExecutorProtocol,
576578
insert_params: JobInsertParams,
577579
unique_opts: Optional[UniqueOpts],
578-
insert_func: Callable[[], InsertResult],
579580
) -> InsertResult:
580581
"""
581-
Checks for an existing unique job and runs `insert_func` if one is
582-
present.
582+
Inserts a job, accounting for unique jobs whose insertion may be skipped
583+
if an equivalent job is already present.
583584
"""
584585

585-
get_params, lock_key = _build_unique_get_params_and_lock_key(
586-
self.advisory_lock_prefix, insert_params, unique_opts
586+
get_params, unique_key = _build_unique_get_params_and_unique_key(
587+
insert_params, unique_opts
587588
)
588589

589-
if not get_params:
590-
return insert_func()
590+
if not get_params or not unique_opts:
591+
return InsertResult(exec.job_insert(insert_params))
592+
593+
# fast path
594+
if (
595+
not unique_opts.by_state
596+
or unique_opts.by_state.sort == UNIQUE_STATES_DEFAULT
597+
):
598+
job, unique_skipped_as_duplicate = exec.job_insert_unique(
599+
insert_params, sha256(unique_key.encode("utf-8")).digest()
600+
)
601+
return InsertResult(
602+
job=job, unique_skipped_as_duplicated=unique_skipped_as_duplicate
603+
)
591604

592605
with exec.transaction():
593-
exec.advisory_lock(lock_key)
606+
lock_key = "unique_key"
607+
lock_key += "kind=#{insert_params.kind}"
608+
lock_key += unique_key
609+
610+
exec.advisory_lock(_hash_lock_key(self.advisory_lock_prefix, lock_key))
594611

595612
existing_job = exec.job_get_by_kind_and_unique_properties(get_params)
596613
if existing_job:
597614
return InsertResult(existing_job, unique_skipped_as_duplicated=True)
598615

599-
return insert_func()
616+
return InsertResult(exec.job_insert(insert_params))
600617

601618

602-
def _build_unique_get_params_and_lock_key(
603-
advisory_lock_prefix: Optional[int],
619+
def _build_unique_get_params_and_unique_key(
604620
insert_params: JobInsertParams,
605621
unique_opts: Optional[UniqueOpts],
606-
) -> tuple[Optional[JobGetByKindAndUniquePropertiesParam], int]:
622+
) -> tuple[Optional[JobGetByKindAndUniquePropertiesParam], str]:
607623
"""
608624
Builds driver get params and an advisory lock key from insert params and
609625
unique options for use during a unique job insertion.
610626
"""
611627

612628
if unique_opts is None:
613-
return (None, 0)
629+
return (None, "")
614630

615631
any_unique_opts = False
616632
get_params = JobGetByKindAndUniquePropertiesParam(kind=insert_params.kind)
617633

618-
lock_str = f"unique_keykind={insert_params.kind}"
634+
unique_key = ""
619635

620636
if unique_opts.by_args:
621637
any_unique_opts = True
622638
get_params.by_args = True
623639
get_params.args = insert_params.args
624-
lock_str += f"&args={insert_params.args}"
640+
unique_key += f"&args={insert_params.args}"
625641

626642
if unique_opts.by_period:
627643
lower_period_bound = _truncate_time(
@@ -634,33 +650,27 @@ def _build_unique_get_params_and_lock_key(
634650
lower_period_bound,
635651
lower_period_bound + timedelta(seconds=unique_opts.by_period),
636652
]
637-
lock_str += f"&period={lower_period_bound.strftime('%FT%TZ')}"
653+
unique_key += f"&period={lower_period_bound.strftime('%FT%TZ')}"
638654

639655
if unique_opts.by_queue:
640656
any_unique_opts = True
641657
get_params.by_queue = True
642658
get_params.queue = insert_params.queue
643-
lock_str += f"&queue={insert_params.queue}"
659+
unique_key += f"&queue={insert_params.queue}"
644660

645661
if unique_opts.by_state:
646662
any_unique_opts = True
647663
get_params.by_state = True
648664
get_params.state = cast(list[str], unique_opts.by_state)
649-
lock_str += f"&state={','.join(unique_opts.by_state)}"
665+
unique_key += f"&state={','.join(unique_opts.by_state)}"
650666
else:
651667
get_params.state = UNIQUE_STATES_DEFAULT
652-
lock_str += f"&state={','.join(UNIQUE_STATES_DEFAULT)}"
668+
unique_key += f"&state={','.join(UNIQUE_STATES_DEFAULT)}"
653669

654670
if not any_unique_opts:
655-
return (None, 0)
671+
return (None, "")
656672

657-
if advisory_lock_prefix is None:
658-
lock_key = fnv1_hash(lock_str.encode("utf-8"), 64)
659-
else:
660-
prefix = advisory_lock_prefix
661-
lock_key = (prefix << 32) | fnv1_hash(lock_str.encode("utf-8"), 32)
662-
663-
return (get_params, _uint64_to_int64(lock_key))
673+
return (get_params, unique_key)
664674

665675

666676
def _check_advisory_lock_prefix_bounds(
@@ -678,6 +688,21 @@ def _check_advisory_lock_prefix_bounds(
678688
return advisory_lock_prefix
679689

680690

691+
def _hash_lock_key(advisory_lock_prefix: Optional[int], lock_key: str) -> int:
692+
"""
693+
Generates an FNV-1 hash from the given lock key string suitable for use with
694+
a PG advisory lock while checking for the existence of a unique job.
695+
"""
696+
697+
if advisory_lock_prefix is None:
698+
lock_key_hash = fnv1_hash(lock_key.encode("utf-8"), 64)
699+
else:
700+
prefix = advisory_lock_prefix
701+
lock_key_hash = (prefix << 32) | fnv1_hash(lock_key.encode("utf-8"), 32)
702+
703+
return _uint64_to_int64(lock_key_hash)
704+
705+
681706
def _make_driver_insert_params(
682707
args: JobArgs,
683708
insert_opts: InsertOpts,

src/riverqueue/driver/driver_protocol.py

+10
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,11 @@ async def job_insert(self, insert_params: JobInsertParams) -> Job:
6464
async def job_insert_many(self, all_params) -> int:
6565
pass
6666

67+
async def job_insert_unique(
68+
self, insert_params: JobInsertParams, unique_key: bytes
69+
) -> tuple[Job, bool]:
70+
pass
71+
6772
async def job_get_by_kind_and_unique_properties(
6873
self, get_params: JobGetByKindAndUniquePropertiesParam
6974
) -> Optional[Job]:
@@ -137,6 +142,11 @@ def job_insert(self, insert_params: JobInsertParams) -> Job:
137142
def job_insert_many(self, all_params) -> int:
138143
pass
139144

145+
def job_insert_unique(
146+
self, insert_params: JobInsertParams, unique_key: bytes
147+
) -> tuple[Job, bool]:
148+
pass
149+
140150
def job_get_by_kind_and_unique_properties(
141151
self, get_params: JobGetByKindAndUniquePropertiesParam
142152
) -> Optional[Job]:

src/riverqueue/driver/riversqlalchemy/dbsqlc/models.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Code generated by sqlc. DO NOT EDIT.
22
# versions:
3-
# sqlc v1.26.0
3+
# sqlc v1.27.0
44
import dataclasses
55
import datetime
66
import enum
@@ -36,3 +36,4 @@ class RiverJob:
3636
state: RiverJobState
3737
scheduled_at: datetime.datetime
3838
tags: List[str]
39+
unique_key: Optional[memoryview]

src/riverqueue/driver/riversqlalchemy/dbsqlc/pg_misc.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Code generated by sqlc. DO NOT EDIT.
22
# versions:
3-
# sqlc v1.26.0
3+
# sqlc v1.27.0
44
# source: pg_misc.sql
55
from typing import Any
66

0 commit comments

Comments
 (0)