Skip to content

Commit 4b2253e

Browse files
authored
feat(firehose): check if consumer id is set or generate one (#108)
1 parent dad50ff commit 4b2253e

File tree

2 files changed

+15
-4
lines changed

2 files changed

+15
-4
lines changed

modules/firehose/config.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import (
1111
"github.com/odpf/entropy/pkg/helm"
1212
)
1313

14+
const firehoseConsumerIDStartingSequence = "0001"
15+
1416
var (
1517
//go:embed schema/config.json
1618
completeConfigSchema string
@@ -35,14 +37,23 @@ type moduleConfig struct {
3537
} `json:"firehose"`
3638
}
3739

38-
func (mc *moduleConfig) validate() error {
40+
func (mc *moduleConfig) validateAndSanitize(r resource.Resource) error {
3941
if mc.StopTime != nil && mc.StopTime.Before(time.Now()) {
4042
return errors.ErrInvalid.
4143
WithMsgf("value for stop_time must be greater than current time")
4244
}
45+
46+
if mc.Firehose.KafkaConsumerID == "" {
47+
mc.Firehose.KafkaConsumerID = fmt.Sprintf("%s-%s", generateFirehoseName(r), firehoseConsumerIDStartingSequence)
48+
}
49+
4350
return nil
4451
}
4552

53+
func generateFirehoseName(r resource.Resource) string {
54+
return fmt.Sprintf("%s-%s-firehose", r.Project, r.Name)
55+
}
56+
4657
func (mc moduleConfig) GetHelmReleaseConfig(r resource.Resource) (*helm.ReleaseConfig, error) {
4758
var output Output
4859
err := json.Unmarshal(r.State.Output, &output)
@@ -52,7 +63,7 @@ func (mc moduleConfig) GetHelmReleaseConfig(r resource.Resource) (*helm.ReleaseC
5263
defaults := output.Defaults
5364

5465
rc := helm.DefaultReleaseConfig()
55-
rc.Name = fmt.Sprintf("%s-%s-firehose", r.Project, r.Name)
66+
rc.Name = generateFirehoseName(r)
5667
rc.Repository = defaults.ChartRepository
5768
rc.Chart = defaults.ChartName
5869
rc.Namespace = defaults.Namespace

modules/firehose/plan.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func (m *firehoseModule) planCreate(res module.ExpandedResource, act module.Acti
2828
if err := json.Unmarshal(act.Params, &reqConf); err != nil {
2929
return nil, errors.ErrInvalid.WithMsgf("invalid config json: %v", err)
3030
}
31-
if err := reqConf.validate(); err != nil {
31+
if err := reqConf.validateAndSanitize(res.Resource); err != nil {
3232
return nil, err
3333
}
3434

@@ -68,7 +68,7 @@ func (m *firehoseModule) planChange(res module.ExpandedResource, act module.Acti
6868
if err := json.Unmarshal(act.Params, &reqConf); err != nil {
6969
return nil, errors.ErrInvalid.WithMsgf("invalid config json: %v", err)
7070
}
71-
if err := reqConf.validate(); err != nil {
71+
if err := reqConf.validateAndSanitize(r); err != nil {
7272
return nil, err
7373
}
7474
conf = reqConf

0 commit comments

Comments
 (0)