@@ -377,18 +377,24 @@ def row_to_singer_message(stream, row, version, columns, time_extracted, md_map,
377
377
time_extracted = time_extracted )
378
378
379
379
380
- # pylint: disable=unused-argument,too-many-locals
381
- def consume_message (streams , state , msg , time_extracted , conn_info ):
382
- # Strip leading comma generated by write-in-chunks and parse valid JSON
383
- try :
384
- payload = json .loads (msg .payload .lstrip (',' ))
385
- except Exception :
386
- return state
380
def check_for_new_columns(columns, target_stream, conn_info):
    """Refresh and re-publish the stream schema when the payload carries unknown columns.

    Compares *columns* (names seen in the replication payload) against the
    properties currently known in ``target_stream['schema']``. When a new
    column appears, discovery is rerun for that stream, the automatic
    properties (_sdc_deleted_at / optional _sdc_lsn) are re-attached, and the
    refreshed schema is emitted downstream.
    """
    new_columns = set(columns) - set(target_stream['schema']['properties'])
    if not new_columns:
        return

    LOGGER.info('Detected new columns "%s", refreshing schema of stream %s', new_columns, target_stream['stream'])
    # Encountered a column that is not in the schema:
    # refresh the stream schema and metadata by running discovery.
    refresh_streams_schema(conn_info, [target_stream])

    # Add the automatic properties back to the stream.
    add_automatic_properties(target_stream, conn_info.get('debug_lsn', False))

    # Publish the new schema to the target.
    sync_common.send_schema_message(target_stream, ['lsn'])
394
+
395
+
396
+ # pylint: disable=too-many-locals
397
+ def consume_message_format_1 (payload , conn_info , streams_lookup , state , time_extracted , lsn ):
392
398
tap_stream_id = post_db .compute_tap_stream_id (payload ['schema' ], payload ['table' ])
393
399
if streams_lookup .get (tap_stream_id ) is None :
394
400
return state
@@ -400,22 +406,8 @@ def consume_message(streams, state, msg, time_extracted, conn_info):
400
406
401
407
# Get the additional fields in payload that are not in schema properties:
402
408
# only inserts and updates have the list of columns that can be used to detect any different in columns
403
- diff = set ()
404
409
if payload ['kind' ] in {'insert' , 'update' }:
405
- diff = set (payload ['columnnames' ]).difference (target_stream ['schema' ]['properties' ].keys ())
406
-
407
- # if there is new columns in the payload that are not in the schema properties then refresh the stream schema
408
- if diff :
409
- LOGGER .info ('Detected new columns "%s", refreshing schema of stream %s' , diff , target_stream ['stream' ])
410
- # encountered a column that is not in the schema
411
- # refresh the stream schema and metadata by running discovery
412
- refresh_streams_schema (conn_info , [target_stream ])
413
-
414
- # add the automatic properties back to the stream
415
- add_automatic_properties (target_stream , conn_info .get ('debug_lsn' , False ))
416
-
417
- # publish new schema
418
- sync_common .send_schema_message (target_stream , ['lsn' ])
410
+ check_for_new_columns (payload ['columnnames' ], target_stream , conn_info )
419
411
420
412
stream_version = get_stream_version (target_stream ['tap_stream_id' ], state )
421
413
stream_md_map = metadata .to_map (target_stream ['metadata' ])
@@ -476,6 +468,109 @@ def consume_message(streams, state, msg, time_extracted, conn_info):
476
468
return state
477
469
478
470
471
def consume_message_format_2(payload, conn_info, streams_lookup, state, time_extracted, lsn):
    """Consume a single wal2json format-version-2 payload and emit a singer RECORD.

    Fix vs. original: ``stream_version``, ``stream_md_map`` and
    ``desired_columns`` were each computed twice (the second, list-typed
    ``desired_columns`` silently shadowed the first set), and
    ``set(desired_columns)`` was rebuilt on every loop iteration. The
    duplicates are removed and the set is built once.

    :param payload: parsed wal2json v2 JSON object (has 'action', 'schema', 'table', ...)
    :param conn_info: tap connection settings; 'debug_lsn' toggles the _sdc_lsn column
    :param streams_lookup: dict of tap_stream_id -> selected stream catalog entry
    :param state: singer state dict; the stream's 'lsn' bookmark is advanced
    :param time_extracted: extraction timestamp attached to the record message
    :param lsn: msg.data_start of the replication message
    :raises UnsupportedPayloadKindError: for actions other than I/U/D
    :return: the (possibly updated) state dict
    """
    ## Action Types:
    #  I = Insert
    #  U = Update
    #  D = Delete
    #  B = Begin Transaction
    #  C = Commit Transaction
    #  M = Message
    #  T = Truncate
    action = payload['action']
    if action not in {'I', 'U', 'D'}:
        raise UnsupportedPayloadKindError(f"unrecognized replication operation: {action}")

    tap_stream_id = post_db.compute_tap_stream_id(payload['schema'], payload['table'])
    target_stream = streams_lookup.get(tap_stream_id)
    if target_stream is None:
        # Table is not selected for replication; nothing to emit.
        return state

    # Get the additional fields in payload that are not in schema properties:
    # only inserts and updates have the list of columns that can be used to
    # detect any difference in columns.
    if action in {'I', 'U'}:
        check_for_new_columns({column['name'] for column in payload['columns']}, target_stream, conn_info)

    stream_version = get_stream_version(target_stream['tap_stream_id'], state)
    stream_md_map = metadata.to_map(target_stream['metadata'])

    # Build the membership set once; it is only used for 'in' checks below.
    desired_columns = {
        col for col in target_stream['schema']['properties'].keys()
        if sync_common.should_sync_column(stream_md_map, col)
    }

    col_names = []
    col_vals = []
    if action in {'I', 'U'}:
        for column in payload['columns']:
            if column['name'] in desired_columns:
                col_names.append(column['name'])
                col_vals.append(column['value'])

        # Live rows carry a null deletion marker.
        col_names.append('_sdc_deleted_at')
        col_vals.append(None)

        if conn_info.get('debug_lsn'):
            col_names.append('_sdc_lsn')
            col_vals.append(str(lsn))
    else:  # action == 'D' — only the replica identity columns are present
        for column in payload['identity']:
            if column['name'] in desired_columns:
                col_names.append(column['name'])
                col_vals.append(column['value'])

        col_names.append('_sdc_deleted_at')
        col_vals.append(singer.utils.strftime(singer.utils.strptime_to_utc(payload['timestamp'])))

        if conn_info.get('debug_lsn'):
            col_vals.append(str(lsn))
            col_names.append('_sdc_lsn')

    # Write 1 record to match the API of V1
    record_message = row_to_singer_message(
        target_stream,
        col_vals,
        stream_version,
        col_names,
        time_extracted,
        stream_md_map,
        conn_info,
    )

    singer.write_message(record_message)
    state = singer.write_bookmark(state, target_stream['tap_stream_id'], 'lsn', lsn)

    return state
550
+
551
+
552
def consume_message(streams, state, msg, time_extracted, conn_info):
    """Parse one replication message and dispatch it to the wal2json-version-specific consumer.

    Returns the (possibly updated) singer state. Messages that do not parse
    as JSON are skipped deliberately (best-effort), returning state unchanged.
    """
    # Strip leading comma generated by write-in-chunks and parse valid JSON
    try:
        payload = json.loads(msg.payload.lstrip(','))
    except Exception:
        return state

    lsn = msg.data_start

    streams_lookup = {stream['tap_stream_id']: stream for stream in streams}

    message_format = conn_info['wal2json_message_format']
    if message_format == 1:
        return consume_message_format_1(payload, conn_info, streams_lookup, state, time_extracted, lsn)
    if message_format == 2:
        return consume_message_format_2(payload, conn_info, streams_lookup, state, time_extracted, lsn)

    raise Exception(f"Unknown wal2json message format version: {message_format}")
572
+
573
+
479
574
def generate_replication_slot_name (dbname , tap_id = None , prefix = 'pipelinewise' ):
480
575
"""Generate replication slot name with
481
576
@@ -591,14 +686,19 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file):
591
686
int_to_lsn (end_lsn ),
592
687
slot )
593
688
# psycopg2 2.8.4 will send a keep-alive message to postgres every status_interval
689
+ options = {
690
+ 'add-tables' : streams_to_wal2json_tables (logical_streams ),
691
+ 'format-version' : conn_info ['wal2json_message_format' ],
692
+ }
693
+ if options ['format-version' ] == 1 :
694
+ options ['write-in-chunks' ] = 1
695
+ else :
696
+ options ['actions' ] = ['insert' , 'update' , 'delete' ]
594
697
cur .start_replication (slot_name = slot ,
595
698
decode = True ,
596
699
start_lsn = start_lsn ,
597
700
status_interval = poll_interval ,
598
- options = {
599
- 'write-in-chunks' : 1 ,
600
- 'add-tables' : streams_to_wal2json_tables (logical_streams )
601
- })
701
+ options = options )
602
702
603
703
except psycopg2 .ProgrammingError as ex :
604
704
raise Exception (f"Unable to start replication with logical replication (slot { ex } )" ) from ex
0 commit comments