diff --git a/cmd/internal/planetscale_edge_database.go b/cmd/internal/planetscale_edge_database.go
index 383ea55..5dcd24b 100644
--- a/cmd/internal/planetscale_edge_database.go
+++ b/cmd/internal/planetscale_edge_database.go
@@ -279,17 +279,28 @@ func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, w io.Writer, ps Plane
 	}
 }
 
-func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc *psdbconnect.TableCursor, stopPosition string, s Stream, ps PlanetScaleSource, tabletType psdbconnect.TabletType, readDuration time.Duration) (*psdbconnect.TableCursor, int, error) {
+func (p PlanetScaleEdgeDatabase) sync(
+	ctx context.Context, syncMode string,
+	tc *psdbconnect.TableCursor,
+	stopPosition string,
+	s Stream,
+	ps PlanetScaleSource,
+	tabletType psdbconnect.TabletType,
+	readDuration time.Duration,
+) (*psdbconnect.TableCursor, int, error) {
 	preamble := fmt.Sprintf("[%v:%v:%v shard : %v] ", s.Namespace, TabletTypeToString(tabletType), s.Name, tc.Shard)
+	logf := func(level, message string, args ...any) {
+		p.Logger.Log(level, fmt.Sprintf("%s%s", preamble, fmt.Sprintf(message, args...)))
+	}
 	defer p.Logger.Flush()
+
 	ctx, cancel := context.WithTimeout(ctx, readDuration)
 	defer cancel()
 
 	var (
 		err          error
 		vtgateClient vtgateservice.VitessClient
-		fields       []*query.Field
 	)
 
 	vtgateClient, conn, err := p.initializeVTGateClient(ctx, ps)
@@ -306,17 +317,17 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc *
 	isFullSync := syncMode == "full"
 	vtgateReq := buildVStreamRequest(tabletType, s.Name, tc.Shard, tc.Keyspace, tc.Position, tc.LastKnownPk)
-	p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sRequesting VStream with %+v", preamble, vtgateReq))
+	logf(LOGLEVEL_INFO, "Requesting VStream with %+v", vtgateReq)
 
 	if isFullSync {
-		p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sWill stop once COPY COMPLETED event is seen.", preamble))
+		logf(LOGLEVEL_INFO, "Will stop once COPY COMPLETED event is seen.")
 	} else {
-		p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sWill stop once stop position [%+v] is found.", preamble, stopPosition))
+		logf(LOGLEVEL_INFO, "Will stop once stop position [%+v] is found.", stopPosition)
 	}
 
 	c, err := vtgateClient.VStream(ctx, vtgateReq)
 	if err != nil {
-		p.Logger.Log(LOGLEVEL_ERROR, fmt.Sprintf("%sExiting sync due to client sync error: %+v", preamble, err))
+		logf(LOGLEVEL_ERROR, "Exiting sync due to client sync error: %+v", err)
 		return tc, 0, err
 	}
 
@@ -325,126 +336,176 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc *
 		keyspaceOrDatabase = ps.Database
 	}
 
-	copyCompletedSeen := false
-	// Can finish sync once we've synced to the stop position, or finished the VStream COPY phase
-	canFinishSync := false
-	resultCount := 0
+	var (
+		// Can finish sync once we've synced to the stop position, or finished the
+		// VStream COPY phase
+		canFinishSync bool
+
+		// Keep track of last seen lastFields. We may receive a field event in one
+		// recv call, and row events in another.
+		lastFields []*query.Field
+
+		resultCount int
+	)
 
 	for {
-		res, err := c.Recv()
+		vstreamResp, err := syncRecv(logf, c)
 		if err != nil {
-			if s, ok := status.FromError(err); ok && s.Code() == codes.DeadlineExceeded {
-				// No next VGTID found
-				p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sExiting sync and flushing records because no new VGTID found after last position or deadline exceeded %+v", preamble, tc))
-				return tc, resultCount, err
-			} else if errors.Is(err, io.EOF) {
-				// EOF is an acceptable error indicating VStream is finished.
-				p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sExiting sync and flushing records because EOF encountered at position %+v", preamble, tc))
-				return tc, resultCount, io.EOF
-			} else {
-				p.Logger.Log(LOGLEVEL_ERROR, fmt.Sprintf("%sExiting sync and flushing records due to error: %+v", preamble, err))
-				return tc, resultCount, err
-			}
+			logf(LOGLEVEL_INFO, "Exiting sync and flushing records at position %+v: %v", tc, err)
+			return tc, resultCount, err
 		}
 
-		var rows []*query.QueryResult
-		for _, event := range res.Events {
-			switch event.Type {
-			case binlogdata.VEventType_VGTID:
-				vgtid := event.GetVgtid().ShardGtids[0]
-				if vgtid != nil {
-					tc.Position = vgtid.Gtid
-					if vgtid.TablePKs != nil {
-						tablePK := vgtid.TablePKs[0]
-						if tablePK != nil {
-							// Setting LastKnownPk allows a COPY phase to pick up where it left off
-							lastPK := tablePK.Lastpk
-							tc.LastKnownPk = lastPK
-						} else {
-							tc.LastKnownPk = nil
-						}
-					} else {
-						tc.LastKnownPk = nil
-					}
-				}
-			case binlogdata.VEventType_LASTPK:
-				if event.LastPKEvent.TableLastPK != nil {
-					// Only update last PK because we're in a COPY phase
-					tc = &psdbconnect.TableCursor{
-						Shard:       tc.Shard,
-						Keyspace:    tc.Keyspace,
-						LastKnownPk: event.LastPKEvent.TableLastPK.Lastpk,
-						Position:    tc.Position,
-					}
-					p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sLASTPK event found, setting last PK to %+v", preamble, tc))
-				}
-			case binlogdata.VEventType_FIELD:
-				// Save fields for processing
-				fields = event.FieldEvent.Fields
-				p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sFIELD event found, setting fields to %+v", preamble, fields))
-			case binlogdata.VEventType_ROW:
-				// Collect rows for processing
-				for _, change := range event.RowEvent.RowChanges {
-					if change.After != nil {
-						rows = append(rows, &query.QueryResult{
-							Fields: fields,
-							Rows:   []*query.Row{change.After},
-						})
-					}
-				}
-			case binlogdata.VEventType_COPY_COMPLETED:
-				p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sCOPY_COMPLETED event found, copy phase finished", preamble))
-				copyCompletedSeen = true
-			case binlogdata.VEventType_BEGIN, binlogdata.VEventType_COMMIT,
-				binlogdata.VEventType_DDL, binlogdata.VEventType_DELETE,
-				binlogdata.VEventType_GTID, binlogdata.VEventType_HEARTBEAT,
-				binlogdata.VEventType_INSERT, binlogdata.VEventType_JOURNAL,
-				binlogdata.VEventType_OTHER, binlogdata.VEventType_REPLACE,
-				binlogdata.VEventType_ROLLBACK, binlogdata.VEventType_SAVEPOINT,
-				binlogdata.VEventType_SET, binlogdata.VEventType_UNKNOWN,
-				binlogdata.VEventType_UPDATE, binlogdata.VEventType_VERSION:
-				// No special handling.
-			default:
-				panic(fmt.Sprintf("unexpected binlogdata.VEventType: %#v", event.Type))
-			}
+		summary, err := summarizeVStreamEvents(logf, vstreamResp.Events, lastFields)
+		if err != nil {
+			return tc, resultCount, err
+		}
+
+		// Keep track of last seen lastFields. We may receive a field event in one
+		// recv call, and row events in another.
+		lastFields = summary.fields
+
+		// TODO: only update these after rows have been flushed to Airbyte.
+		if summary.position != "" {
+			tc.Position = summary.position
+		}
+		if summary.lastKnownPk != nil {
+			tc.LastKnownPk = summary.lastKnownPk
 		}
 
-		if isFullSync && copyCompletedSeen {
-			p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sReady to finish sync and flush since copy phase completed or stop VGTID passed", preamble))
+		if isFullSync && summary.copyCompletedSeen {
+			logf(LOGLEVEL_INFO, "Ready to finish sync and flush since copy phase completed or stop VGTID passed")
 			canFinishSync = true
 		}
+
 		if !isFullSync && positionEqual(tc.Position, stopPosition) {
-			p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sReady to finish sync and flush since stop position [%+v] found", preamble, stopPosition))
+			logf(LOGLEVEL_INFO, "Ready to finish sync and flush since stop position [%+v] found", stopPosition)
 			canFinishSync = true
 		}
 
-		if len(rows) > 0 {
-			for _, result := range rows {
-				qr := sqltypes.Proto3ToResult(result)
-				for _, row := range qr.Rows {
-					resultCount += 1
-					sqlResult := &sqltypes.Result{
-						Fields: fields,
-					}
-					sqlResult.Rows = append(sqlResult.Rows, row)
-					// Results queued to Airbyte here, and flushed at the end of sync()
-					p.printQueryResult(sqlResult, keyspaceOrDatabase, s.Name, &ps)
+		for _, result := range summary.results {
+			qr := sqltypes.Proto3ToResult(result)
+			for _, row := range qr.Rows {
+				resultCount += 1
+				sqlResult := &sqltypes.Result{
+					Fields: result.Fields,
+					Rows:   []sqltypes.Row{row},
 				}
+				// Results queued to Airbyte here, and flushed at the end of sync()
+				p.printQueryResult(sqlResult, keyspaceOrDatabase, s.Name, &ps)
 			}
 		}
 
-		// Exit sync and flush records once the VGTID position is at or past the desired stop position, and we're no longer waiting for COPY phase to complete
+		// Exit sync and flush records once the VGTID position is at or past the
+		// desired stop position, and we're no longer waiting for COPY phase to
+		// complete
 		if canFinishSync {
 			if isFullSync {
-				p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sExiting full sync and flushing records because COPY_COMPLETED event was seen, current position is %+v, stop position is %+v", preamble, tc.Position, stopPosition))
+				logf(LOGLEVEL_INFO,
+					"Exiting full sync and flushing records because COPY_COMPLETED event was seen, current position is %+v, stop position is %+v",
+					tc.Position, stopPosition)
 			} else {
-				p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sExiting incremental sync and flushing records because current position %+v has reached or passed stop position %+v", preamble, tc.Position, stopPosition))
+				logf(LOGLEVEL_INFO,
+					"Exiting incremental sync and flushing records because current position %+v has reached or passed stop position %+v",
+					tc.Position, stopPosition)
 			}
 			return tc, resultCount, io.EOF
 		}
 	}
 }
 
+func syncRecv(
+	logf func(level, message string, args ...any),
+	c vtgateservice.Vitess_VStreamClient,
+) (*vtgate.VStreamResponse, error) {
+	resp, err := c.Recv()
+	if err != nil {
+		s, ok := status.FromError(err)
+		switch {
+		case ok && s.Code() == codes.DeadlineExceeded:
+			// No next VGTID found within deadline.
+			logf(LOGLEVEL_ERROR, "no new VGTID found before deadline exceeded: %v", err)
+		case errors.Is(err, io.EOF):
+			// EOF is an acceptable error indicating VStream is finished.
+			logf(LOGLEVEL_ERROR, "encountered EOF, possibly indicating end of VStream")
+		}
+	}
+	return resp, err
+}
+
+type vstreamEventsSummary struct {
+	copyCompletedSeen bool
+	fields            []*query.Field
+	lastKnownPk       *query.QueryResult
+	position          string
+	results           []*query.QueryResult
+}
+
+func summarizeVStreamEvents(
+	logf func(level, message string, args ...any),
+	events []*binlogdata.VEvent,
+	lastFields []*query.Field,
+) (vstreamEventsSummary, error) {
+	summary := vstreamEventsSummary{fields: lastFields}
+
+	for _, event := range events {
+		switch event.Type {
+		case binlogdata.VEventType_VGTID:
+			vgtid := event.GetVgtid().ShardGtids[0]
+			if vgtid != nil {
+				summary.position = vgtid.Gtid
+				if vgtid.TablePKs != nil {
+					tablePK := vgtid.TablePKs[0]
+					if tablePK != nil {
+						// Setting LastKnownPk allows a COPY phase to pick up where it left off
+						lastPK := tablePK.Lastpk
+						summary.lastKnownPk = lastPK
+					} else {
+						summary.lastKnownPk = nil
+					}
+				} else {
+					summary.lastKnownPk = nil
+				}
+			}
+		case binlogdata.VEventType_LASTPK:
+			if event.LastPKEvent.TableLastPK != nil {
+				// Only update last PK because we're in a COPY phase
+				logf(LOGLEVEL_INFO, "LASTPK event found, setting last PK to %+v", event.LastPKEvent.TableLastPK.Lastpk)
+				summary.lastKnownPk = event.LastPKEvent.TableLastPK.Lastpk
+			}
+		case binlogdata.VEventType_FIELD:
+			// Save fields for processing
+			logf(LOGLEVEL_INFO, "FIELD event found, setting fields to %+v", event.FieldEvent.Fields)
+			summary.fields = event.FieldEvent.Fields
+		case binlogdata.VEventType_ROW:
+			// Collect rows for processing
+			for _, change := range event.RowEvent.RowChanges {
+				if change.After != nil {
+					summary.results = append(summary.results, &query.QueryResult{
+						Fields: summary.fields,
+						Rows:   []*query.Row{change.After},
+					})
+				}
+			}
+		case binlogdata.VEventType_COPY_COMPLETED:
+			logf(LOGLEVEL_INFO, "COPY_COMPLETED event found, copy phase finished")
+			summary.copyCompletedSeen = true
+		case binlogdata.VEventType_BEGIN, binlogdata.VEventType_COMMIT,
+			binlogdata.VEventType_DDL, binlogdata.VEventType_DELETE,
+			binlogdata.VEventType_GTID, binlogdata.VEventType_HEARTBEAT,
+			binlogdata.VEventType_INSERT, binlogdata.VEventType_JOURNAL,
+			binlogdata.VEventType_OTHER, binlogdata.VEventType_REPLACE,
+			binlogdata.VEventType_ROLLBACK, binlogdata.VEventType_SAVEPOINT,
+			binlogdata.VEventType_SET, binlogdata.VEventType_UNKNOWN,
+			binlogdata.VEventType_UPDATE, binlogdata.VEventType_VERSION:
+			// No special handling.
+		default:
+			return summary, fmt.Errorf("unexpected binlogdata.VEventType: %#v", event.Type)
+		}
+	}
+
+	return summary, nil
+}
+
 func (p PlanetScaleEdgeDatabase) getStopCursorPosition(ctx context.Context, shard, keyspace string, s Stream, ps PlanetScaleSource, tabletType psdbconnect.TabletType) (string, error) {
 	defer p.Logger.Flush()
 	timeout := 45 * time.Second