Commit 1545a31

hariso and lovromazgon authored
Generate connector.yaml (ConduitIO#232)
* Generate connector.yaml
* Add connector.yaml
* generate connector.yaml
* Set version
* more updates
* make build
* update sdk
* Migrate to specgen
* go.sum fix
* lint
* Upgrade SDK
* Update connector.yaml (Co-authored-by: Lovro Mažgon <[email protected]>)
* validate generated files
* rename
* update sdk, use conn-sdk-cli
* use readmegen
* remove local replacement
* add phony generate
* respect logrepl.withAvroSchema
* use latest sdk from main
* inline errors
* receivers
* upgrade conduit-connector-sdk to v0.13.0
* inline errors
* make generate

Co-authored-by: Lovro Mažgon <[email protected]>
1 parent 626ea6e commit 1545a31

18 files changed: +707 -340 lines

.github/workflows/release.yml

+45 -1
```diff
@@ -3,7 +3,7 @@ name: release
 on:
   push:
     tags:
-      - v*
+      - '*'
 
 permissions:
   contents: write
@@ -18,6 +18,50 @@ jobs:
       with:
         fetch-depth: 0
 
+      - name: Validate Tag Format
+        run: |
+          TAG=${GITHUB_REF#refs/tags/}
+
+          SV_REGEX="^v(0|[1-9][0-9]*)\.(0|[1-9][0-9]*)\.(0|[1-9][0-9]*)(-((0|[1-9][0-9]*|[0-9]*[a-zA-Z-][0-9a-zA-Z-]*)(\.(0|[1-9][0-9]*|[0-9]*[a-zA-Z-][0-9a-zA-Z-]*))*))?(\+([0-9a-zA-Z-]+(\.[0-9a-zA-Z-]+)*))?$"
+
+          if ! [[ $TAG =~ $SV_REGEX ]]; then
+            echo "$TAG is NOT a valid tag (expected format: v<semver>)"
+            exit 1
+          fi
+
+      - name: Check Version Consistency
+        run: |
+          # Extract the tag from the ref
+          TAG=${GITHUB_REF#refs/tags/}
+
+          # Read version from connector.yaml
+          YAML_VERSION=$(yq e '.specification.version' connector.yaml)
+
+          # Compare versions
+          if [[ "$TAG" != "$YAML_VERSION" ]]; then
+            echo "Version mismatch detected:"
+            echo "Git Tag: $TAG"
+            echo "connector.yaml Version: $YAML_VERSION"
+            exit 1
+          fi
+
+      - name: Delete Invalid Tag
+        if: failure()
+        uses: actions/github-script@v7
+        with:
+          github-token: ${{secrets.GITHUB_TOKEN}}
+          script: |
+            const tag = context.ref.replace('refs/tags/', '')
+            try {
+              await github.rest.git.deleteRef({
+                owner: context.repo.owner,
+                repo: context.repo.repo,
+                ref: `tags/${tag}`
+              })
+            } catch (error) {
+              console.log('Error deleting tag:', error)
+            }
+
       - name: Set up Go
         uses: actions/setup-go@v5
         with:
```
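The tag-validation step above can be exercised locally. A minimal sketch, assuming bash and reusing the workflow's `SV_REGEX` verbatim (`check_tag` is a hypothetical helper standing in for the step, with `$1` in place of `${GITHUB_REF#refs/tags/}`):

```shell
#!/usr/bin/env bash
# Sketch of the "Validate Tag Format" step, runnable outside CI.
SV_REGEX="^v(0|[1-9][0-9]*)\.(0|[1-9][0-9]*)\.(0|[1-9][0-9]*)(-((0|[1-9][0-9]*|[0-9]*[a-zA-Z-][0-9a-zA-Z-]*)(\.(0|[1-9][0-9]*|[0-9]*[a-zA-Z-][0-9a-zA-Z-]*))*))?(\+([0-9a-zA-Z-]+(\.[0-9a-zA-Z-]+)*))?$"

check_tag() {
  if [[ $1 =~ $SV_REGEX ]]; then
    echo "$1 is a valid tag"
  else
    echo "$1 is NOT a valid tag (expected format: v<semver>)"
  fi
}

check_tag "v0.11.0"      # plain release tag
check_tag "v1.2.3-rc.1"  # pre-release suffixes are allowed by the regex
check_tag "1.2.3"        # rejected: missing the leading 'v'
```

Note that the regex requires the `v` prefix, which is why the later consistency check compares the tag against a `connector.yaml` version that also carries the `v`.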

Makefile

+1

```diff
@@ -19,6 +19,7 @@ lint:
 .PHONY: generate
 generate:
 	go generate ./...
+	conn-sdk-cli readmegen -w
 
 .PHONY: fmt
 fmt:
```

README.md

+219 -64
Large diffs are not rendered by default.

connector.go

+10 -1

```diff
@@ -12,14 +12,23 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+//go:generate conn-sdk-cli specgen
+
 package postgres
 
 import (
+	_ "embed"
+
 	sdk "github.com/conduitio/conduit-connector-sdk"
 )
 
+//go:embed connector.yaml
+var specs string
+
+var version = "(devel)"
+
 var Connector = sdk.Connector{
-	NewSpecification: Specification,
+	NewSpecification: sdk.YAMLSpecification(specs, version),
 	NewSource:        NewSource,
 	NewDestination:   NewDestination,
 }
```

connector.yaml

+299
@@ -0,0 +1,299 @@ (new file)

````yaml
version: "1.0"
specification:
  name: postgres
  summary: Conduit connector for PostgreSQL
  description: |
    ## Source

    The Postgres Source Connector connects to a database with the provided `url` and
    starts creating records for each change detected in the provided tables.

    Upon starting, the source takes a snapshot of the provided tables in the database,
    then switches into CDC mode. In CDC mode, the plugin reads from a buffer of CDC events.

    ### Snapshot

    When the connector first starts, snapshot mode is enabled. The connector acquires
    a read-only lock on the tables, and then reads all rows of the tables into Conduit.
    Once all rows in that initial snapshot are read, the connector releases its lock and
    switches into CDC mode.

    This behavior is enabled by default, but can be turned off by adding
    `"snapshotMode": "never"` to the Source configuration.

    ### Change Data Capture

    This connector implements Change Data Capture (CDC) features for PostgreSQL by
    creating a logical replication slot and a publication that listens to changes in the
    configured tables. Every detected change is converted into a record. If there are no
    records available, the connector blocks until a record is available or the connector
    receives a stop signal.

    #### Logical Replication Configuration

    When the connector switches to CDC mode, it attempts to run the initial setup commands
    to create its logical replication slot and publication. It will connect to an existing
    slot if one with the configured name exists.

    The Postgres user specified in the connection URL must have sufficient privileges to
    run all of these setup commands, or the connector will fail.

    Example pipeline configuration that uses logical replication:

    ```yaml
    version: 2.2
    pipelines:
      - id: pg-to-log
        status: running
        connectors:
          - id: pg
            type: source
            plugin: builtin:postgres
            settings:
              url: "postgres://exampleuser:examplepass@localhost:5433/exampledb?sslmode=disable"
              tables: "users"
              cdcMode: "logrepl"
              logrepl.publicationName: "examplepub"
              logrepl.slotName: "exampleslot"
          - id: log
            type: destination
            plugin: builtin:log
            settings:
              level: info
    ```

    :warning: When the connector or pipeline is deleted, the connector will automatically
    attempt to delete the replication slot and publication. This is the default behavior
    and can be disabled by setting `logrepl.autoCleanup` to `false`.

    ### Key Handling

    The connector automatically looks up the primary key column for each of the specified
    tables and uses it as the key value. If the primary key can't be determined, the
    connector returns an error.

    ## Destination

    The Postgres Destination takes a Conduit record and stores it using a SQL statement.
    The Destination is designed to handle different payloads and keys. Because of this,
    each record is individually parsed and upserted.

    ### Handling record operations

    Based on the `Operation` field in the record, the destination will either insert,
    update or delete the record in the target table. Snapshot records are always inserted.

    If the target table already contains a record with the same key as a record being
    inserted, the record will be updated (upserted). This can overwrite and thus potentially
    lose data, so keys should be assigned correctly from the Source.

    If the target table does not contain a record with the same key as a record being
    deleted, the record will be ignored.

    If there is no key, the record will simply be appended.
  version: v0.11.0-dev
  author: Meroxa, Inc.
  source:
    parameters:
      - name: url
        description: URL is the connection string for the Postgres database.
        type: string
        default: ""
        validations:
          - type: required
            value: ""
      - name: cdcMode
        description: CDCMode determines how the connector should listen to changes.
        type: string
        default: auto
        validations:
          - type: inclusion
            value: auto,logrepl
      - name: logrepl.autoCleanup
        description: |-
          LogreplAutoCleanup determines if the replication slot and publication should be
          removed when the connector is deleted.
        type: bool
        default: "true"
        validations: []
      - name: logrepl.publicationName
        description: |-
          LogreplPublicationName determines the publication name in case the
          connector uses logical replication to listen to changes (see CDCMode).
        type: string
        default: conduitpub
        validations: []
      - name: logrepl.slotName
        description: |-
          LogreplSlotName determines the replication slot name in case the
          connector uses logical replication to listen to changes (see CDCMode).
        type: string
        default: conduitslot
        validations: []
      - name: logrepl.withAvroSchema
        description: |-
          WithAvroSchema determines whether the connector should attach an avro schema on each
          record.
        type: bool
        default: "true"
        validations: []
      - name: sdk.batch.delay
        description: Maximum delay before an incomplete batch is read from the source.
        type: duration
        default: "0"
        validations:
          - type: greater-than
            value: "-1"
      - name: sdk.batch.size
        description: Maximum size of batch before it gets read from the source.
        type: int
        default: "0"
        validations:
          - type: greater-than
            value: "-1"
      - name: sdk.schema.context.enabled
        description: |-
          Specifies whether to use a schema context name. If set to false, no schema context name will
          be used, and schemas will be saved with the subject name specified in the connector
          (not safe because of name conflicts).
        type: bool
        default: "true"
        validations: []
      - name: sdk.schema.context.name
        description: |-
          Schema context name to be used. Used as a prefix for all schema subject names.
          If empty, defaults to the connector ID.
        type: string
        default: ""
        validations: []
      - name: sdk.schema.extract.key.enabled
        description: Whether to extract and encode the record key with a schema.
        type: bool
        default: "false"
        validations: []
      - name: sdk.schema.extract.key.subject
        description: |-
          The subject of the key schema. If the record metadata contains the field
          "opencdc.collection" it is prepended to the subject name and separated
          with a dot.
        type: string
        default: key
        validations: []
      - name: sdk.schema.extract.payload.enabled
        description: Whether to extract and encode the record payload with a schema.
        type: bool
        default: "false"
        validations: []
      - name: sdk.schema.extract.payload.subject
        description: |-
          The subject of the payload schema. If the record metadata contains the
          field "opencdc.collection" it is prepended to the subject name and
          separated with a dot.
        type: string
        default: payload
        validations: []
      - name: sdk.schema.extract.type
        description: The type of the payload schema.
        type: string
        default: avro
        validations:
          - type: inclusion
            value: avro
      - name: snapshot.fetchSize
        description: Snapshot fetcher size determines the number of rows to retrieve at a time.
        type: int
        default: "50000"
        validations: []
      - name: snapshotMode
        description: SnapshotMode is whether the plugin will take a snapshot of the entire table before starting CDC mode.
        type: string
        default: initial
        validations:
          - type: inclusion
            value: initial,never
      - name: table
        description: 'Deprecated: use `tables` instead.'
        type: string
        default: ""
        validations: []
      - name: tables
        description: |-
          Tables is a list of table names to read from, separated by a comma, e.g.: "table1,table2".
          Use "*" if you'd like to listen to all tables.
        type: string
        default: ""
        validations: []
  destination:
    parameters:
      - name: url
        description: URL is the connection string for the Postgres database.
        type: string
        default: ""
        validations:
          - type: required
            value: ""
      - name: key
        description: Key represents the column name for the key used to identify and update existing rows.
        type: string
        default: ""
        validations: []
      - name: sdk.batch.delay
        description: Maximum delay before an incomplete batch is written to the destination.
        type: duration
        default: "0"
        validations: []
      - name: sdk.batch.size
        description: Maximum size of batch before it gets written to the destination.
        type: int
        default: "0"
        validations:
          - type: greater-than
            value: "-1"
      - name: sdk.rate.burst
        description: |-
          Allow bursts of at most X records (0 or less means that bursts are not
          limited). Only takes effect if a rate limit per second is set. Note that
          if `sdk.batch.size` is bigger than `sdk.rate.burst`, the effective batch
          size will be equal to `sdk.rate.burst`.
        type: int
        default: "0"
        validations:
          - type: greater-than
            value: "-1"
      - name: sdk.rate.perSecond
        description: Maximum number of records written per second (0 means no rate limit).
        type: float
        default: "0"
        validations:
          - type: greater-than
            value: "-1"
      - name: sdk.record.format
        description: |-
          The format of the output record. See the Conduit documentation for a full
          list of supported formats (https://conduit.io/docs/using/connectors/configuration-parameters/output-format).
        type: string
        default: opencdc/json
        validations: []
      - name: sdk.record.format.options
        description: |-
          Options to configure the chosen output record format. Options are normally
          key=value pairs separated with comma (e.g. opt1=val2,opt2=val2), except
          for the `template` record format, where options are a Go template.
        type: string
        default: ""
        validations: []
      - name: sdk.schema.extract.key.enabled
        description: Whether to extract and decode the record key with a schema.
        type: bool
        default: "true"
        validations: []
      - name: sdk.schema.extract.payload.enabled
        description: Whether to extract and decode the record payload with a schema.
        type: bool
        default: "true"
        validations: []
      - name: table
        description: Table is used as the target table into which records are inserted.
        type: string
        default: '{{ index .Metadata "opencdc.collection" }}'
        validations: []
````
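The `specification.version` field in this file is what the release workflow compares against the git tag. A minimal local sketch of that consistency check, substituting `sed` for `yq` (it assumes the two-space indentation of `version:` under `specification:`, and writes a trimmed demo file rather than reading the real `connector.yaml`):

```shell
# Write a trimmed stand-in for connector.yaml, then compare its
# specification version against a tag, mirroring the workflow's
# "Check Version Consistency" step.
cat > /tmp/connector_demo.yaml <<'EOF'
version: "1.0"
specification:
  name: postgres
  version: v0.11.0-dev
EOF

TAG="v0.11.0-dev"
# Matches only the two-space-indented version under specification,
# not the top-level format version.
YAML_VERSION=$(sed -n 's/^  version: //p' /tmp/connector_demo.yaml)

if [ "$TAG" = "$YAML_VERSION" ]; then
  echo "versions match: $TAG"
else
  echo "Version mismatch detected: tag=$TAG yaml=$YAML_VERSION" >&2
fi
```

This is why tagging a release requires bumping `specification.version` first: the generated file, not the tag, is the source of truth the workflow enforces.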
