Skip to content

Commit 7cf4f3a

Browse files
Shivaprasadspy16
Shivaprasad
authored andcommitted
feat: add worker-count config and launch multiple workers
1 parent 8579741 commit 7cf4f3a

File tree

4 files changed

+30
-9
lines changed

4 files changed

+30
-9
lines changed

cli/config.go

+1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ type Config struct {
2727
}
2828

2929
type syncerConf struct {
30+
WorkerCount int `mapstructure:"worker_count" default:"1"`
3031
SyncInterval time.Duration `mapstructure:"sync_interval" default:"1s"`
3132
RefreshInterval time.Duration `mapstructure:"refresh_interval" default:"3s"`
3233
ExtendLockBy time.Duration `mapstructure:"extend_lock_by" default:"5s"`

cli/serve.go

+8-5
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ func cmdServe() *cobra.Command {
4949
newrelic.ConfigAppName(cfg.Telemetry.ServiceName),
5050
newrelic.ConfigLicense(cfg.Telemetry.NewRelicAPIKey),
5151
)
52+
if err != nil {
53+
return err
54+
}
5255

5356
store := setupStorage(cfg.PGConnStr, cfg.Syncer)
5457
moduleService := module.NewService(setupRegistry(), store)
@@ -61,11 +64,11 @@ func cmdServe() *cobra.Command {
6164
}
6265

6366
if spawnWorker {
64-
go func() {
65-
if runErr := resourceService.RunSyncer(cmd.Context(), cfg.Syncer.SyncInterval); runErr != nil {
66-
zap.L().Error("syncer exited with error", zap.Error(err))
67-
}
68-
}()
67+
go resourceService.RunSyncer(
68+
cmd.Context(),
69+
cfg.Syncer.WorkerCount,
70+
cfg.Syncer.SyncInterval,
71+
)
6972
}
7073

7174
return entropyserver.Serve(cmd.Context(),

core/sync.go

+20-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package core
22

33
import (
44
"context"
5+
"sync"
56
"time"
67

78
"go.uber.org/zap"
@@ -12,7 +13,24 @@ import (
1213

1314
// RunSyncer runs the syncer thread that keeps performing resource-sync at
1415
// regular intervals.
15-
func (svc *Service) RunSyncer(ctx context.Context, interval time.Duration) error {
16+
func (svc *Service) RunSyncer(ctx context.Context, workerCount int, interval time.Duration) {
17+
wg := &sync.WaitGroup{}
18+
for i := 0; i < workerCount; i++ {
19+
wg.Add(1)
20+
go func(id int) {
21+
defer wg.Done()
22+
23+
if err := svc.runWorker(ctx, id, interval); err != nil {
24+
zap.L().Error("worker-%d failed", zap.Error(err))
25+
}
26+
}(i)
27+
}
28+
wg.Wait()
29+
30+
zap.L().Info("all syncer workers exited")
31+
}
32+
33+
func (svc *Service) runWorker(ctx context.Context, _ int, interval time.Duration) error {
1634
tick := time.NewTimer(interval)
1735
defer tick.Stop()
1836

@@ -27,6 +45,7 @@ func (svc *Service) RunSyncer(ctx context.Context, interval time.Duration) error
2745
err := svc.store.SyncOne(ctx, svc.handleSync)
2846
if err != nil {
2947
zap.L().Warn("SyncOne() failed", zap.Error(err))
48+
continue
3049
}
3150
}
3251
}

modules/firehose/driver_plan.go

+1-3
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,7 @@ import (
1616

1717
const SourceKafkaConsumerAutoOffsetReset = "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET"
1818

19-
var (
20-
suffixRegex = regexp.MustCompile(`^([A-Za-z0-9-]+)-([0-9]+)$`)
21-
)
19+
var suffixRegex = regexp.MustCompile(`^([A-Za-z0-9-]+)-([0-9]+)$`)
2220

2321
var errCauseInvalidNamespaceUpdate = "cannot update kube namespace of a running firehose"
2422

0 commit comments

Comments
 (0)