Skip to content

Commit d997dab

Browse files
committed
Validate logical replication slot name config
1 parent e468f39 commit d997dab

File tree

5 files changed

+54
-4
lines changed

5 files changed

+54
-4
lines changed

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,8 @@ pipelines:
132132
logrepl.publicationName: "conduitpub"
133133
# LogreplSlotName determines the replication slot name in case the
134134
# connector uses logical replication to listen to changes (see
135-
# CDCMode).
135+
# CDCMode). Can only contain lower-case letters, numbers, and the
136+
# underscore character.
136137
# Type: string
137138
# Required: no
138139
logrepl.slotName: "conduitslot"

connector.yaml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,9 +136,12 @@ specification:
136136
description: |-
137137
LogreplSlotName determines the replication slot name in case the
138138
connector uses logical replication to listen to changes (see CDCMode).
139+
Can only contain lower-case letters, numbers, and the underscore character.
139140
type: string
140141
default: conduitslot
141-
validations: []
142+
validations:
143+
- type: regex
144+
value: ^[a-z0-9_]+$
142145
- name: logrepl.withAvroSchema
143146
description: |-
144147
WithAvroSchema determines whether the connector should attach an avro schema on each

source/config.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ type Config struct {
6969
LogreplPublicationName string `json:"logrepl.publicationName" default:"conduitpub"`
7070
// LogreplSlotName determines the replication slot name in case the
7171
// connector uses logical replication to listen to changes (see CDCMode).
72-
LogreplSlotName string `json:"logrepl.slotName" default:"conduitslot"`
72+
// Can only contain lower-case letters, numbers, and the underscore character.
73+
LogreplSlotName string `json:"logrepl.slotName" validate:"regex=^[a-z0-9_]+$" default:"conduitslot"`
7374

7475
// LogreplAutoCleanup determines if the replication slot and publication should be
7576
// removed when the connector is deleted.

source/logrepl/internal/publication_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func TestCreatePublication(t *testing.T) {
2727
ctx := test.Context(t)
2828
pool := test.ConnectPool(ctx, t, test.RegularConnString)
2929

30-
pubNames := []string{"testpub", "123", "test-hyphen", "test=equal"}
30+
pubNames := []string{"testpub", "123", "test-hyphen", "test:semicolon", "test.dot", "test=equal"}
3131
pubParams := [][]string{
3232
nil,
3333
{"publish = 'insert'"},

source_integration_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ import (
1919
"strings"
2020
"testing"
2121

22+
"github.com/conduitio/conduit-commons/config"
23+
"github.com/conduitio/conduit-connector-postgres/source"
2224
"github.com/conduitio/conduit-connector-postgres/source/logrepl"
2325
"github.com/conduitio/conduit-connector-postgres/test"
2426
sdk "github.com/conduitio/conduit-connector-sdk"
@@ -66,3 +68,46 @@ func TestSource_Open(t *testing.T) {
6668
is.NoErr(s.Teardown(ctx))
6769
}()
6870
}
71+
72+
func TestSource_ParseConfig(t *testing.T) {
73+
testCases := []struct {
74+
name string
75+
cfg config.Config
76+
wantErr bool
77+
}{
78+
{
79+
name: "valid postgres replication slot name",
80+
cfg: config.Config{
81+
"url": "postgresql://meroxauser:[email protected]:5432/meroxadb",
82+
"tables": "table1,table2",
83+
"cdcMode": "logrepl",
84+
"logrepl.slotName": "valid_slot_name",
85+
},
86+
wantErr: false,
87+
}, {
88+
name: "invalid postgres replication slot name",
89+
cfg: config.Config{
90+
"url": "postgresql://meroxauser:[email protected]:5432/meroxadb",
91+
"tables": "table1,table2",
92+
"cdcMode": "logrepl",
93+
"logrepl.slotName": "invalid:slot.name",
94+
},
95+
wantErr: true,
96+
},
97+
}
98+
99+
for _, tc := range testCases {
100+
t.Run(tc.name, func(t *testing.T) {
101+
is := is.New(t)
102+
103+
var cfg source.Config
104+
err := sdk.Util.ParseConfig(context.Background(), tc.cfg, cfg, Connector.NewSpecification().SourceParams)
105+
106+
if tc.wantErr {
107+
is.True(err != nil)
108+
return
109+
}
110+
is.NoErr(err)
111+
})
112+
}
113+
}

0 commit comments

Comments
 (0)