Skip to content

Commit a258b1f

Browse files
authored
Give riverqueue.Job fully defined properties + timestamps as UTC (#26)
Previously, the internal sqlc `RiverJob` row was fully typed by virtue of being generated by sqlc, but the `riverqueue.Job` type was undefined, with typechecks working by using a `cast`. Here, give `riverqueue.Job` a full set of defined properties. This is better for things like conveying type information and autocomplete, but has a few other side benefits: * Make sure to return all timestamps as UTC. Previously, they'd be in whatever your local timezone is. * Give some fields like `args`, `metadata`, and `state` better types (the first two were previously `Any`). Lastly, modify `InsertResult` somewhat to make `job` non-optional since it's always returned, even if insert was skipped, because if it was we look it up via select query.
1 parent 8b5b76a commit a258b1f

File tree

6 files changed

+113
-40
lines changed

6 files changed

+113
-40
lines changed

src/riverqueue/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
AsyncClient as AsyncClient,
44
JobArgs as JobArgs,
55
JobArgsWithInsertOpts as JobArgsWithInsertOpts,
6-
JobState as JobState,
76
Client as Client,
87
InsertManyParams as InsertManyParams,
98
InsertOpts as InsertOpts,
@@ -12,4 +11,5 @@
1211
from .model import (
1312
InsertResult as InsertResult,
1413
Job as Job,
14+
JobState as JobState,
1515
)

src/riverqueue/client.py

+1-13
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
from dataclasses import dataclass
22
from datetime import datetime, timezone, timedelta
3-
from enum import Enum
43
import re
54
from typing import (
65
Any,
@@ -16,21 +15,10 @@
1615

1716
from .driver import GetParams, JobInsertParams, DriverProtocol, ExecutorProtocol
1817
from .driver.driver_protocol import AsyncDriverProtocol, AsyncExecutorProtocol
19-
from .model import InsertResult
18+
from .model import InsertResult, JobState
2019
from .fnv import fnv1_hash
2120

2221

23-
class JobState(str, Enum):
24-
AVAILABLE = "available"
25-
CANCELLED = "cancelled"
26-
COMPLETED = "completed"
27-
DISCARDED = "discarded"
28-
PENDING = "pending"
29-
RETRYABLE = "retryable"
30-
RUNNING = "running"
31-
SCHEDULED = "scheduled"
32-
33-
3422
MAX_ATTEMPTS_DEFAULT: int = 25
3523
PRIORITY_DEFAULT: int = 1
3624
QUEUE_DEFAULT: str = "default"

src/riverqueue/driver/riversqlalchemy/sql_alchemy_driver.py

+51-20
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
)
1717

1818
from ...driver import DriverProtocol, ExecutorProtocol, GetParams, JobInsertParams
19-
from ...model import Job
19+
from ...model import Job, JobState
2020
from .dbsqlc import models, river_job, pg_misc
2121

2222

@@ -30,11 +30,13 @@ async def advisory_lock(self, key: int) -> None:
3030
await self.pg_misc_querier.pg_advisory_xact_lock(key=key)
3131

3232
async def job_insert(self, insert_params: JobInsertParams) -> Job:
33-
return cast(
34-
Job,
35-
await self.job_querier.job_insert_fast(
36-
cast(river_job.JobInsertFastParams, insert_params)
37-
),
33+
return _job_from_row(
34+
cast( # drop Optional[] because insert always returns a row
35+
models.RiverJob,
36+
await self.job_querier.job_insert_fast(
37+
cast(river_job.JobInsertFastParams, insert_params)
38+
),
39+
)
3840
)
3941

4042
async def job_insert_many(self, all_params: list[JobInsertParams]) -> int:
@@ -46,12 +48,10 @@ async def job_insert_many(self, all_params: list[JobInsertParams]) -> int:
4648
async def job_get_by_kind_and_unique_properties(
4749
self, get_params: GetParams
4850
) -> Optional[Job]:
49-
return cast(
50-
Optional[Job],
51-
await self.job_querier.job_get_by_kind_and_unique_properties(
52-
cast(river_job.JobGetByKindAndUniquePropertiesParams, get_params)
53-
),
51+
row = await self.job_querier.job_get_by_kind_and_unique_properties(
52+
cast(river_job.JobGetByKindAndUniquePropertiesParams, get_params)
5453
)
54+
return _job_from_row(row) if row else None
5555

5656
@asynccontextmanager
5757
async def transaction(self) -> AsyncGenerator:
@@ -91,10 +91,12 @@ def advisory_lock(self, key: int) -> None:
9191
self.pg_misc_querier.pg_advisory_xact_lock(key=key)
9292

9393
def job_insert(self, insert_params: JobInsertParams) -> Job:
94-
return cast(
95-
Job,
96-
self.job_querier.job_insert_fast(
97-
cast(river_job.JobInsertFastParams, insert_params)
94+
return _job_from_row(
95+
cast( # drop Optional[] because insert always returns a row
96+
models.RiverJob,
97+
self.job_querier.job_insert_fast(
98+
cast(river_job.JobInsertFastParams, insert_params)
99+
),
98100
),
99101
)
100102

@@ -105,12 +107,10 @@ def job_insert_many(self, all_params: list[JobInsertParams]) -> int:
105107
def job_get_by_kind_and_unique_properties(
106108
self, get_params: GetParams
107109
) -> Optional[Job]:
108-
return cast(
109-
Optional[Job],
110-
self.job_querier.job_get_by_kind_and_unique_properties(
111-
cast(river_job.JobGetByKindAndUniquePropertiesParams, get_params)
112-
),
110+
row = self.job_querier.job_get_by_kind_and_unique_properties(
111+
cast(river_job.JobGetByKindAndUniquePropertiesParams, get_params)
113112
)
113+
return _job_from_row(row) if row else None
114114

115115
@contextmanager
116116
def transaction(self) -> Iterator[None]:
@@ -169,3 +169,34 @@ def _build_insert_many_params(
169169
insert_many_params.tags.append(",".join(insert_params.tags))
170170

171171
return insert_many_params
172+
173+
174+
def _job_from_row(row: models.RiverJob) -> Job:
175+
"""
176+
Converts an internal sqlc generated row to the top level type, issuing a few
177+
minor transformations along the way. Timestamps are changed from local
178+
timezone to UTC.
179+
"""
180+
181+
return Job(
182+
id=row.id,
183+
args=row.args,
184+
attempt=row.attempt,
185+
attempted_at=row.attempted_at.astimezone(timezone.utc)
186+
if row.attempted_at
187+
else None,
188+
attempted_by=row.attempted_by,
189+
created_at=row.created_at.astimezone(timezone.utc),
190+
errors=row.errors,
191+
finalized_at=row.finalized_at.astimezone(timezone.utc)
192+
if row.finalized_at
193+
else None,
194+
kind=row.kind,
195+
max_attempts=row.max_attempts,
196+
metadata=row.metadata,
197+
priority=row.priority,
198+
queue=row.queue,
199+
state=cast(JobState, row.state),
200+
scheduled_at=row.scheduled_at.astimezone(timezone.utc),
201+
tags=row.tags,
202+
)

src/riverqueue/model.py

+31-3
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,41 @@
11
from dataclasses import dataclass, field
2-
from typing import Optional
2+
import datetime
3+
from enum import Enum
4+
from typing import Any, Optional
5+
6+
7+
class JobState(str, Enum):
8+
AVAILABLE = "available"
9+
CANCELLED = "cancelled"
10+
COMPLETED = "completed"
11+
DISCARDED = "discarded"
12+
PENDING = "pending"
13+
RETRYABLE = "retryable"
14+
RUNNING = "running"
15+
SCHEDULED = "scheduled"
316

417

518
@dataclass
619
class InsertResult:
7-
job: Optional["Job"] = field(default=None)
20+
job: "Job"
821
unique_skipped_as_duplicated: bool = field(default=False)
922

1023

1124
@dataclass
1225
class Job:
13-
pass
26+
id: int
27+
args: dict[str, Any]
28+
attempt: int
29+
attempted_at: Optional[datetime.datetime]
30+
attempted_by: Optional[list[str]]
31+
created_at: datetime.datetime
32+
errors: Optional[list[Any]]
33+
finalized_at: Optional[datetime.datetime]
34+
kind: str
35+
max_attempts: int
36+
metadata: dict[str, Any]
37+
priority: int
38+
queue: str
39+
state: JobState
40+
scheduled_at: datetime.datetime
41+
tags: list[str]

tests/client_test.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -267,14 +267,14 @@ def test_tag_validation(client):
267267
with pytest.raises(AssertionError) as ex:
268268
client.insert(SimpleArgs(), insert_opts=InsertOpts(tags=["commas,bad"]))
269269
assert (
270-
"tags should be less than 255 characters in length and match regex \A[\w][\w\-]+[\w]\Z"
270+
r"tags should be less than 255 characters in length and match regex \A[\w][\w\-]+[\w]\Z"
271271
== str(ex.value)
272272
)
273273

274274
with pytest.raises(AssertionError) as ex:
275275
client.insert(SimpleArgs(), insert_opts=InsertOpts(tags=["a" * 256]))
276276
assert (
277-
"tags should be less than 255 characters in length and match regex \A[\w][\w\-]+[\w]\Z"
277+
r"tags should be less than 255 characters in length and match regex \A[\w][\w\-]+[\w]\Z"
278278
== str(ex.value)
279279
)
280280

tests/driver/riversqlalchemy/sqlalchemy_driver_test.py

+27-1
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,20 @@
11
import pytest
22
import pytest_asyncio
3+
from riverqueue.model import JobState
34
import sqlalchemy
45
import sqlalchemy.ext.asyncio
56
from datetime import datetime, timezone
67
from typing import AsyncIterator, Iterator
78
from unittest.mock import patch
89

910
from riverqueue import Client, InsertOpts, UniqueOpts
10-
from riverqueue.client import AsyncClient, InsertManyParams
11+
from riverqueue.client import (
12+
MAX_ATTEMPTS_DEFAULT,
13+
PRIORITY_DEFAULT,
14+
QUEUE_DEFAULT,
15+
AsyncClient,
16+
InsertManyParams,
17+
)
1118
from riverqueue.driver import riversqlalchemy
1219
from riverqueue.driver.driver_protocol import GetParams
1320

@@ -45,6 +52,25 @@ async def client_async(
4552
return AsyncClient(driver_async)
4653

4754

55+
def test_insert_job_from_row(client, driver):
56+
insert_res = client.insert(SimpleArgs())
57+
job = insert_res.job
58+
assert job
59+
assert isinstance(job.args, dict)
60+
assert job.attempt == 0
61+
assert job.attempted_by is None
62+
assert job.created_at.tzinfo == timezone.utc
63+
assert job.errors is None
64+
assert job.kind == "simple"
65+
assert job.max_attempts == MAX_ATTEMPTS_DEFAULT
66+
assert isinstance(job.metadata, dict)
67+
assert job.priority == PRIORITY_DEFAULT
68+
assert job.queue == QUEUE_DEFAULT
69+
assert job.scheduled_at.tzinfo == timezone.utc
70+
assert job.state == JobState.AVAILABLE
71+
assert job.tags == []
72+
73+
4874
def test_insert_with_only_args_sync(client, driver):
4975
insert_res = client.insert(SimpleArgs())
5076
assert insert_res.job

0 commit comments

Comments
 (0)