Skip to content

Commit 6f3abdc

Browse files
rohilsuranaspy16
andauthored
feat(firehose): add field stop_by_time to stop firehose by give datetime (#85)
* feat(firehose): add field stop_by_time to stop firehose by give datetime * fix(firehose): tests with updated firehose spec config schema * feat(firehose): created scheduled sync job when stop time is set for firehose * fix(firehose): schedule sync job and change stop_by_time to stop_time field * refactor: refactor Plan struct to use Resource by value * feat: use run-at timestamp value in sync jobid Co-authored-by: Shivaprasad <[email protected]>
1 parent 4a3fb40 commit 6f3abdc

File tree

16 files changed

+253
-144
lines changed

16 files changed

+253
-144
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,5 @@ tmp/
2525
expt/
2626

2727
entropy.dev.yaml
28+
29+
requests.http

cli/serve.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,10 @@ func runServer(baseCtx context.Context, nrApp *newrelic.Application, zapLog *zap
9090
return err
9191
}
9292

93+
if err := asyncWorker.Register(core.JobKindScheduledSyncResource, service.HandleSyncJob); err != nil {
94+
return err
95+
}
96+
9397
return entropyserver.Serve(ctx, cfg.Service.addr(), nrApp, zapLog, service)
9498
}
9599

core/mocks/loggable_module.go

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/mocks/module.go

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/module/module.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package module
55
import (
66
"context"
77
"encoding/json"
8+
"time"
89

910
"github.com/odpf/entropy/core/resource"
1011
"github.com/odpf/entropy/pkg/errors"
@@ -16,7 +17,7 @@ type Module interface {
1617
// Plan SHOULD validate the action on the current version of the resource,
1718
// return the resource with config/status/state changes (if any) applied.
1819
// Plan SHOULD NOT have side effects on anything other than the resource.
19-
Plan(ctx context.Context, spec Spec, act ActionRequest) (*resource.Resource, error)
20+
Plan(ctx context.Context, spec Spec, act ActionRequest) (*Plan, error)
2021

2122
// Sync is called repeatedly by Entropy core until the returned state is
2223
// a terminal status. Module implementation is free to execute an action
@@ -27,6 +28,12 @@ type Module interface {
2728
Sync(ctx context.Context, spec Spec) (*resource.State, error)
2829
}
2930

31+
// Plan represents the changes to be staged and later synced by module.
32+
type Plan struct {
33+
Resource resource.Resource
34+
ScheduleRunAt time.Time
35+
}
36+
3037
// Spec represents the context for Plan() or Sync() invocations.
3138
type Spec struct {
3239
Resource resource.Resource `json:"resource"`

core/module/registry.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func (mr *Registry) get(kind string) (Descriptor, bool) {
5959
return desc, found
6060
}
6161

62-
func (mr *Registry) Plan(ctx context.Context, spec Spec, act ActionRequest) (*resource.Resource, error) {
62+
func (mr *Registry) Plan(ctx context.Context, spec Spec, act ActionRequest) (*Plan, error) {
6363
kind := spec.Resource.Kind
6464

6565
desc, found := mr.get(kind)

core/sync.go

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,23 @@ import (
66
"fmt"
77
"time"
88

9+
"github.com/odpf/entropy/core/module"
910
"github.com/odpf/entropy/core/resource"
1011
"github.com/odpf/entropy/pkg/errors"
1112
"github.com/odpf/entropy/pkg/worker"
1213
)
1314

14-
const JobKindSyncResource = "sync_resource"
15+
const (
16+
JobKindSyncResource = "sync_resource"
17+
JobKindScheduledSyncResource = "sched_sync_resource"
18+
)
1519

1620
type syncJobPayload struct {
1721
ResourceURN string `json:"resource_urn"`
1822
UpdatedAt time.Time `json:"updated_at"`
1923
}
2024

21-
func (s *Service) enqueueSyncJob(ctx context.Context, res resource.Resource) error {
25+
func (s *Service) enqueueSyncJob(ctx context.Context, res resource.Resource, runAt time.Time, jobType string) error {
2226
data := syncJobPayload{
2327
ResourceURN: res.URN,
2428
UpdatedAt: res.UpdatedAt,
@@ -29,12 +33,17 @@ func (s *Service) enqueueSyncJob(ctx context.Context, res resource.Resource) err
2933
return err
3034
}
3135

32-
return s.worker.Enqueue(ctx, worker.Job{
33-
ID: fmt.Sprintf("sync-%s-%d", res.URN, res.UpdatedAt.Unix()),
34-
Kind: JobKindSyncResource,
35-
RunAt: time.Now(),
36+
job := worker.Job{
37+
ID: fmt.Sprintf(jobType+"-%s-%d", res.URN, runAt.Unix()),
38+
Kind: jobType,
39+
RunAt: runAt,
3640
Payload: payload,
37-
})
41+
}
42+
43+
if err := s.worker.Enqueue(ctx, job); err != nil && !errors.Is(err, worker.ErrJobExists) {
44+
return err
45+
}
46+
return nil
3847
}
3948

4049
// HandleSyncJob is meant to be invoked by asyncWorker when an enqueued job is
@@ -94,7 +103,7 @@ func (s *Service) syncChange(ctx context.Context, urn string) (*resource.Resourc
94103
return nil, err
95104
}
96105
} else {
97-
if err := s.upsert(ctx, *res, false); err != nil {
106+
if err := s.upsert(ctx, module.Plan{Resource: *res}, false); err != nil {
98107
return nil, err
99108
}
100109
}

core/write.go

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -55,70 +55,77 @@ func (s *Service) ApplyAction(ctx context.Context, urn string, act module.Action
5555
}
5656

5757
func (s *Service) execAction(ctx context.Context, res resource.Resource, act module.ActionRequest) (*resource.Resource, error) {
58-
plannedRes, err := s.planChange(ctx, res, act)
58+
planned, err := s.planChange(ctx, res, act)
5959
if err != nil {
6060
return nil, err
6161
}
6262

6363
if isCreate(act.Name) {
64-
plannedRes.CreatedAt = s.clock()
65-
plannedRes.UpdatedAt = plannedRes.CreatedAt
64+
planned.Resource.CreatedAt = s.clock()
65+
planned.Resource.UpdatedAt = planned.Resource.CreatedAt
6666
} else {
67-
plannedRes.CreatedAt = res.CreatedAt
68-
plannedRes.UpdatedAt = s.clock()
67+
planned.Resource.CreatedAt = res.CreatedAt
68+
planned.Resource.UpdatedAt = s.clock()
6969
}
7070

71-
if err := s.upsert(ctx, *plannedRes, isCreate(act.Name)); err != nil {
71+
if err := s.upsert(ctx, *planned, isCreate(act.Name)); err != nil {
7272
return nil, err
7373
}
74-
return plannedRes, nil
74+
return &planned.Resource, nil
7575
}
7676

7777
func isCreate(actionName string) bool {
7878
return actionName == module.CreateAction
7979
}
8080

81-
func (s *Service) planChange(ctx context.Context, res resource.Resource, act module.ActionRequest) (*resource.Resource, error) {
81+
func (s *Service) planChange(ctx context.Context, res resource.Resource, act module.ActionRequest) (*module.Plan, error) {
8282
modSpec, err := s.generateModuleSpec(ctx, res)
8383
if err != nil {
8484
return nil, err
8585
}
8686

87-
plannedRes, err := s.rootModule.Plan(ctx, *modSpec, act)
87+
planned, err := s.rootModule.Plan(ctx, *modSpec, act)
8888
if err != nil {
8989
if errors.Is(err, errors.ErrInvalid) {
9090
return nil, err
9191
}
9292
return nil, errors.ErrInternal.WithMsgf("plan() failed").WithCausef(err.Error())
93-
} else if err := plannedRes.Validate(isCreate(act.Name)); err != nil {
93+
} else if err := planned.Resource.Validate(isCreate(act.Name)); err != nil {
9494
return nil, err
9595
}
9696

97-
return plannedRes, nil
97+
return planned, nil
9898
}
9999

100-
func (s *Service) upsert(ctx context.Context, res resource.Resource, isCreate bool) error {
101-
hook := func(ctx context.Context) error {
102-
if res.State.IsTerminal() {
100+
func (s *Service) upsert(ctx context.Context, plan module.Plan, isCreate bool) error {
101+
var hooks []resource.MutationHook
102+
hooks = append(hooks, func(ctx context.Context) error {
103+
if plan.Resource.State.IsTerminal() {
103104
// no need to enqueue if resource has reached terminal state.
104105
return nil
105106
}
106107

107-
return s.enqueueSyncJob(ctx, res)
108+
return s.enqueueSyncJob(ctx, plan.Resource, s.clock(), JobKindSyncResource)
109+
})
110+
111+
if !plan.ScheduleRunAt.IsZero() {
112+
hooks = append(hooks, func(ctx context.Context) error {
113+
return s.enqueueSyncJob(ctx, plan.Resource, plan.ScheduleRunAt, JobKindScheduledSyncResource)
114+
})
108115
}
109116

110117
var err error
111118
if isCreate {
112-
err = s.store.Create(ctx, res, hook)
119+
err = s.store.Create(ctx, plan.Resource, hooks...)
113120
} else {
114-
err = s.store.Update(ctx, res, hook)
121+
err = s.store.Update(ctx, plan.Resource, hooks...)
115122
}
116123

117124
if err != nil {
118125
if isCreate && errors.Is(err, errors.ErrConflict) {
119-
return errors.ErrConflict.WithMsgf("resource with urn '%s' already exists", res.URN)
126+
return errors.ErrConflict.WithMsgf("resource with urn '%s' already exists", plan.Resource.URN)
120127
} else if !isCreate && errors.Is(err, errors.ErrNotFound) {
121-
return errors.ErrNotFound.WithMsgf("resource with urn '%s' does not exist", res.URN)
128+
return errors.ErrNotFound.WithMsgf("resource with urn '%s' does not exist", plan.Resource.URN)
122129
}
123130
return errors.ErrInternal.WithCausef(err.Error())
124131
}

0 commit comments

Comments
 (0)