Skip to content

Commit 23cf3a3

Browse files
authored
feat: async worker implementation (#56)
* feat: add worker package with generic constructs * feat: implement job-queue using postgres * test: add worker tests * refactor: restructure pgq * refactor: change job-queue contract * chore: nicer makefile output * chore: upgrade go version
1 parent 8be6526 commit 23cf3a3

17 files changed

+1199
-24
lines changed

.github/workflows/lint.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ jobs:
77
steps:
88
- uses: actions/setup-go@v2
99
with:
10-
go-version: "1.17"
10+
go-version: "1.18"
1111
- uses: actions/checkout@v2
1212
with:
1313
fetch-depth: 0

.github/workflows/release.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ jobs:
1414
fetch-depth: 0
1515
- uses: actions/setup-go@v2
1616
with:
17-
go-version: "1.17"
17+
go-version: "1.18"
1818
- name: Login to DockerHub
1919
uses: docker/login-action@v1
2020
with:

.github/workflows/test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,6 @@ jobs:
88
- uses: actions/checkout@v2
99
- uses: actions/setup-go@v2
1010
with:
11-
go-version: "1.17"
11+
go-version: "1.18"
1212
- name: run tests
1313
run: make test

.golangci.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ output:
66
linters:
77
enable-all: true
88
disable:
9+
- exhaustruct
910
- cyclop
1011
- exhaustive
1112
- exhaustivestruct
@@ -28,6 +29,9 @@ linters:
2829
- scopelint
2930
- tagliatelle
3031
- testpackage
32+
- paralleltest
33+
- tparallel
34+
- containedctx
3135
- varnamelen
3236
- wrapcheck
3337
- wsl

Makefile

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,32 +6,43 @@ COVERAGE_DIR=coverage
66
BUILD_DIR=dist
77
EXE=entropy
88

9-
.PHONY: all build clean
9+
.PHONY: all build clean tidy format test test-coverage
1010

1111
all: clean test build format lint
1212

13-
build:
14-
mkdir -p ${BUILD_DIR}
15-
CGO_ENABLED=0 go build -ldflags '-X "${NAME}/pkg/version.Version=${VERSION}" -X "${NAME}/pkg/version.Commit=${COMMIT}" -X "${NAME}/pkg/version.BuildTime=${BUILD_TIME}"' -o ${BUILD_DIR}/${EXE}
13+
tidy:
14+
@echo "Tidy up go.mod..."
15+
@go mod tidy -v
1616

17-
clean:
18-
rm -rf ${COVERAGE_DIR} ${BUILD_DIR}
17+
format:
18+
@echo "Running gofumpt..."
19+
@gofumpt -l -w .
1920

20-
download:
21-
go mod download
21+
lint:
22+
@echo "Running lint checks using golangci-lint..."
23+
@golangci-lint run
24+
25+
clean: tidy
26+
@echo "Cleaning up build directories..."
27+
@rm -rf ${COVERAGE_DIR} ${BUILD_DIR}
28+
@echo "Running go-generate..."
29+
@go generate ./...
2230

23-
test:
24-
mkdir -p ${COVERAGE_DIR}
25-
go test ./... -coverprofile=${COVERAGE_DIR}/coverage.out
31+
test: tidy
32+
@mkdir -p ${COVERAGE_DIR}
33+
@echo "Running unit tests..."
34+
@go test ./... -coverprofile=${COVERAGE_DIR}/coverage.out
2635

2736
test-coverage: test
28-
go tool cover -html=${COVERAGE_DIR}/coverage.out
37+
@echo "Generating coverage report..."
38+
@go tool cover -html=${COVERAGE_DIR}/coverage.out
2939

30-
generate:
31-
go generate ./...
40+
build: clean
41+
@mkdir -p ${BUILD_DIR}
42+
@echo "Running build for '${VERSION}' in '${BUILD_DIR}/'..."
43+
@CGO_ENABLED=0 go build -ldflags '-X "${NAME}/pkg/version.Version=${VERSION}" -X "${NAME}/pkg/version.Commit=${COMMIT}" -X "${NAME}/pkg/version.BuildTime=${BUILD_TIME}"' -o ${BUILD_DIR}/${EXE}
44+
45+
download:
46+
go mod download
3247

33-
format:
34-
gofumpt -l -w .
3548

36-
lint:
37-
golangci-lint run

go.mod

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@ module github.com/odpf/entropy
33
go 1.18
44

55
require (
6-
github.com/davecgh/go-spew v1.1.1
6+
github.com/Masterminds/squirrel v1.5.2
77
github.com/google/go-cmp v0.5.8
88
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
9+
github.com/lib/pq v1.10.4
910
github.com/mcuadros/go-defaults v1.2.0
1011
github.com/newrelic/go-agent/v3 v3.15.2
1112
github.com/newrelic/go-agent/v3/integrations/nrgrpc v1.3.1
@@ -33,7 +34,6 @@ require (
3334
github.com/Masterminds/goutils v1.1.1 // indirect
3435
github.com/Masterminds/semver/v3 v3.1.1 // indirect
3536
github.com/Masterminds/sprig/v3 v3.2.2 // indirect
36-
github.com/Masterminds/squirrel v1.5.2 // indirect
3737
github.com/PuerkitoBio/purell v1.1.1 // indirect
3838
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
3939
github.com/alecthomas/chroma v0.8.2 // indirect
@@ -47,6 +47,7 @@ require (
4747
github.com/containerd/containerd v1.6.3 // indirect
4848
github.com/cyphar/filepath-securejoin v0.2.3 // indirect
4949
github.com/danwakefield/fnmatch v0.0.0-20160403171240-cbb64ac3d964 // indirect
50+
github.com/davecgh/go-spew v1.1.1 // indirect
5051
github.com/dlclark/regexp2 v1.2.0 // indirect
5152
github.com/docker/cli v20.10.11+incompatible // indirect
5253
github.com/docker/distribution v2.8.1+incompatible // indirect
@@ -93,7 +94,6 @@ require (
9394
github.com/klauspost/compress v1.13.6 // indirect
9495
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect
9596
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect
96-
github.com/lib/pq v1.10.4 // indirect
9797
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de // indirect
9898
github.com/lucasb-eyer/go-colorful v1.2.0 // indirect
9999
github.com/magiconair/properties v1.8.5 // indirect

pkg/worker/job.go

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
package worker
2+
3+
//go:generate mockery --name=JobQueue -r --case underscore --with-expecter --structname JobQueue --filename=job_queue.go --output=./mocks
4+
5+
import (
6+
"context"
7+
"errors"
8+
"fmt"
9+
"strings"
10+
"time"
11+
)
12+
13+
const minRetryBackoff = 5 * time.Second
14+
15+
const (
16+
// StatusDone indicates the Job is successfully finished.
17+
StatusDone = "DONE"
18+
19+
// StatusPanic indicates there was a panic during job-execution.
20+
// This is a terminal status and will NOT be retried.
21+
StatusPanic = "PANIC"
22+
23+
// StatusFailed indicates job failed to succeed even after retries.
24+
// This is a terminal status and will NOT be retried.
25+
StatusFailed = "FAILED"
26+
27+
// StatusPending indicates at-least 1 attempt is still pending.
28+
StatusPending = "PENDING"
29+
)
30+
31+
var (
32+
ErrInvalidJob = errors.New("job is not valid")
33+
ErrKindExists = errors.New("handler for given kind exists")
34+
ErrUnknownKind = errors.New("job kind is invalid")
35+
)
36+
37+
// Job represents the specification for async processing and also maintains
38+
// the progress so far.
39+
type Job struct {
40+
// Specification of the job.
41+
ID string `json:"id"`
42+
Kind string `json:"kind"`
43+
RunAt time.Time `json:"run_at"`
44+
Payload []byte `json:"payload"`
45+
46+
// Internal metadata.
47+
Status string `json:"status"`
48+
CreatedAt time.Time `json:"created_at"`
49+
UpdatedAt time.Time `json:"updated_at"`
50+
51+
// Execution information.
52+
Result []byte `json:"result,omitempty"`
53+
AttemptsDone int64 `json:"attempts_done"`
54+
LastAttemptAt time.Time `json:"last_attempt_at,omitempty"`
55+
LastError string `json:"last_error,omitempty"`
56+
}
57+
58+
// JobQueue represents a special queue that holds jobs and releases them via
59+
// Dequeue() only after their RunAt time.
60+
type JobQueue interface {
61+
// Enqueue all jobs. Enqueue must ensure all-or-nothing behaviour.
62+
// Jobs with zero-value or historical value for ReadyAt must be
63+
// executed immediately.
64+
Enqueue(ctx context.Context, jobs ...Job) error
65+
66+
// Dequeue one job having one of the given kinds and invoke `fn`.
67+
// The job should be 'locked' until `fn` returns. Refer DequeueFn.
68+
Dequeue(ctx context.Context, kinds []string, fn DequeueFn) error
69+
}
70+
71+
// DequeueFn is invoked by the JobQueue for ready jobs. It is responsible for
72+
// handling a ready job and returning the updated version after the attempt.
73+
type DequeueFn func(ctx context.Context, j Job) (*Job, error)
74+
75+
// RetryableError can be returned by a JobFn to instruct the worker to attempt
76+
// retry after time specified by the RetryAfter field. RetryAfter can have min
77+
// of 5 seconds.
78+
type RetryableError struct {
79+
Cause error
80+
RetryAfter time.Duration
81+
}
82+
83+
func (j *Job) Sanitise() error {
84+
now := time.Now()
85+
86+
j.ID = strings.TrimSpace(j.ID)
87+
j.Kind = strings.TrimSpace(strings.ToLower(j.Kind))
88+
89+
if j.ID == "" {
90+
return fmt.Errorf("%w: job id must be set", ErrInvalidJob)
91+
}
92+
93+
if j.Kind == "" {
94+
return fmt.Errorf("%w: job kind must be set", ErrInvalidJob)
95+
}
96+
97+
j.Status = StatusPending
98+
j.CreatedAt = now
99+
j.UpdatedAt = now
100+
101+
if j.RunAt.IsZero() {
102+
j.RunAt = now
103+
}
104+
105+
j.AttemptsDone = 0
106+
j.LastAttemptAt = time.Time{}
107+
j.LastError = ""
108+
return nil
109+
}
110+
111+
// Attempt attempts to safely invoke `fn` for this job. Handles success, failure
112+
// and panic scenarios and updates the job with result in-place.
113+
func (j *Job) Attempt(ctx context.Context, now time.Time, fn JobFn) {
114+
defer func() {
115+
if v := recover(); v != nil {
116+
j.LastError = fmt.Sprintf("panic: %v", v)
117+
j.Status = StatusPanic
118+
}
119+
120+
j.AttemptsDone++
121+
j.LastAttemptAt = now
122+
j.UpdatedAt = now
123+
}()
124+
125+
select {
126+
case <-ctx.Done():
127+
j.Status = StatusPending
128+
j.RunAt = now.Add(minRetryBackoff)
129+
j.LastError = fmt.Sprintf("cancelled: %v", ctx.Err())
130+
131+
default:
132+
res, err := fn(ctx, *j)
133+
if err != nil {
134+
re := &RetryableError{}
135+
if errors.As(err, &re) {
136+
j.RunAt = now.Add(re.backoff())
137+
j.LastError = re.Error()
138+
j.Status = StatusPending
139+
} else {
140+
j.LastError = err.Error()
141+
j.Status = StatusFailed
142+
}
143+
} else {
144+
j.Result = res
145+
j.Status = StatusDone
146+
}
147+
}
148+
}
149+
150+
func (re *RetryableError) Error() string {
151+
return fmt.Sprintf("retryable-error: %v", re.Cause)
152+
}
153+
154+
func (re RetryableError) backoff() time.Duration {
155+
if re.RetryAfter <= minRetryBackoff {
156+
return minRetryBackoff
157+
}
158+
return re.RetryAfter
159+
}

0 commit comments

Comments
 (0)