Skip to content

surface rather than hide error handling #135

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
261 changes: 161 additions & 100 deletions cmd/internal/planetscale_edge_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,17 +279,28 @@ func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, w io.Writer, ps Plane
}
}

func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc *psdbconnect.TableCursor, stopPosition string, s Stream, ps PlanetScaleSource, tabletType psdbconnect.TabletType, readDuration time.Duration) (*psdbconnect.TableCursor, int, error) {
func (p PlanetScaleEdgeDatabase) sync(
ctx context.Context, syncMode string,
tc *psdbconnect.TableCursor,
stopPosition string,
s Stream,
ps PlanetScaleSource,
tabletType psdbconnect.TabletType,
readDuration time.Duration,
) (*psdbconnect.TableCursor, int, error) {
preamble := fmt.Sprintf("[%v:%v:%v shard : %v] ", s.Namespace, TabletTypeToString(tabletType), s.Name, tc.Shard)
logf := func(level, message string, args ...any) {
p.Logger.Log(level, fmt.Sprintf("%s%s", preamble, fmt.Sprintf(message, args...)))
}

defer p.Logger.Flush()

ctx, cancel := context.WithTimeout(ctx, readDuration)
defer cancel()

var (
err error
vtgateClient vtgateservice.VitessClient
fields []*query.Field
)

vtgateClient, conn, err := p.initializeVTGateClient(ctx, ps)
Expand All @@ -306,17 +317,17 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc *

isFullSync := syncMode == "full"
vtgateReq := buildVStreamRequest(tabletType, s.Name, tc.Shard, tc.Keyspace, tc.Position, tc.LastKnownPk)
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sRequesting VStream with %+v", preamble, vtgateReq))
logf(LOGLEVEL_INFO, "Requesting VStream with %+v", vtgateReq)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logging change is a very welcome change


if isFullSync {
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sWill stop once COPY COMPLETED event is seen.", preamble))
logf(LOGLEVEL_INFO, "Will stop once COPY COMPLETED event is seen.")
} else {
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sWill stop once stop position [%+v] is found.", preamble, stopPosition))
logf(LOGLEVEL_INFO, "Will stop once stop position [%+v] is found.", stopPosition)
}

c, err := vtgateClient.VStream(ctx, vtgateReq)
if err != nil {
p.Logger.Log(LOGLEVEL_ERROR, fmt.Sprintf("%sExiting sync due to client sync error: %+v", preamble, err))
logf(LOGLEVEL_ERROR, "Exiting sync due to client sync error: %+v", err)
return tc, 0, err
}

Expand All @@ -325,126 +336,176 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc *
keyspaceOrDatabase = ps.Database
}

copyCompletedSeen := false
// Can finish sync once we've synced to the stop position, or finished the VStream COPY phase
canFinishSync := false
resultCount := 0
var (
// Can finish sync once we've synced to the stop position, or finished the
// VStream COPY phase
canFinishSync bool

// Keep track of last seen lastFields. We may receive a field event in one
// recv call, and row events in another.
lastFields []*query.Field

resultCount int
)

for {
res, err := c.Recv()
vstreamResp, err := syncRecv(logf, c)
if err != nil {
if s, ok := status.FromError(err); ok && s.Code() == codes.DeadlineExceeded {
// No next VGTID found
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sExiting sync and flushing records because no new VGTID found after last position or deadline exceeded %+v", preamble, tc))
return tc, resultCount, err
} else if errors.Is(err, io.EOF) {
// EOF is an acceptable error indicating VStream is finished.
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sExiting sync and flushing records because EOF encountered at position %+v", preamble, tc))
return tc, resultCount, io.EOF
} else {
p.Logger.Log(LOGLEVEL_ERROR, fmt.Sprintf("%sExiting sync and flushing records due to error: %+v", preamble, err))
return tc, resultCount, err
}
logf(LOGLEVEL_INFO, "Exiting sync and flushing records at position %+v: %v", tc, err)
return tc, resultCount, err
}

var rows []*query.QueryResult
for _, event := range res.Events {
switch event.Type {
case binlogdata.VEventType_VGTID:
vgtid := event.GetVgtid().ShardGtids[0]
if vgtid != nil {
tc.Position = vgtid.Gtid
if vgtid.TablePKs != nil {
tablePK := vgtid.TablePKs[0]
if tablePK != nil {
// Setting LastKnownPk allows a COPY phase to pick up where it left off
lastPK := tablePK.Lastpk
tc.LastKnownPk = lastPK
} else {
tc.LastKnownPk = nil
}
} else {
tc.LastKnownPk = nil
}
}
case binlogdata.VEventType_LASTPK:
if event.LastPKEvent.TableLastPK != nil {
// Only update last PK because we're in a COPY phase
tc = &psdbconnect.TableCursor{
Shard: tc.Shard,
Keyspace: tc.Keyspace,
LastKnownPk: event.LastPKEvent.TableLastPK.Lastpk,
Position: tc.Position,
}
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sLASTPK event found, setting last PK to %+v", preamble, tc))
}
case binlogdata.VEventType_FIELD:
// Save fields for processing
fields = event.FieldEvent.Fields
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sFIELD event found, setting fields to %+v", preamble, fields))
case binlogdata.VEventType_ROW:
// Collect rows for processing
for _, change := range event.RowEvent.RowChanges {
if change.After != nil {
rows = append(rows, &query.QueryResult{
Fields: fields,
Rows: []*query.Row{change.After},
})
}
}
case binlogdata.VEventType_COPY_COMPLETED:
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sCOPY_COMPLETED event found, copy phase finished", preamble))
copyCompletedSeen = true
case binlogdata.VEventType_BEGIN, binlogdata.VEventType_COMMIT,
binlogdata.VEventType_DDL, binlogdata.VEventType_DELETE,
binlogdata.VEventType_GTID, binlogdata.VEventType_HEARTBEAT,
binlogdata.VEventType_INSERT, binlogdata.VEventType_JOURNAL,
binlogdata.VEventType_OTHER, binlogdata.VEventType_REPLACE,
binlogdata.VEventType_ROLLBACK, binlogdata.VEventType_SAVEPOINT,
binlogdata.VEventType_SET, binlogdata.VEventType_UNKNOWN,
binlogdata.VEventType_UPDATE, binlogdata.VEventType_VERSION:
// No special handling.
default:
panic(fmt.Sprintf("unexpected binlogdata.VEventType: %#v", event.Type))
}
summary, err := summarizeVStreamEvents(logf, vstreamResp.Events, lastFields)
if err != nil {
return tc, resultCount, err
}

// Keep track of last seen lastFields. We may receive a field event in one
// recv call, and row events in another.
lastFields = summary.fields

// TODO: only update these after rows have been flushed to Airbyte.
if summary.position != "" {
tc.Position = summary.position
}
if summary.lastKnownPk != nil {
tc.LastKnownPk = summary.lastKnownPk
}

if isFullSync && copyCompletedSeen {
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sReady to finish sync and flush since copy phase completed or stop VGTID passed", preamble))
if isFullSync && summary.copyCompletedSeen {
logf(LOGLEVEL_INFO, "Ready to finish sync and flush since copy phase completed or stop VGTID passed")
canFinishSync = true
}

if !isFullSync && positionEqual(tc.Position, stopPosition) {
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sReady to finish sync and flush since stop position [%+v] found", preamble, stopPosition))
logf(LOGLEVEL_INFO, "Ready to finish sync and flush since stop position [%+v] found", stopPosition)
canFinishSync = true
}

if len(rows) > 0 {
for _, result := range rows {
qr := sqltypes.Proto3ToResult(result)
for _, row := range qr.Rows {
resultCount += 1
sqlResult := &sqltypes.Result{
Fields: fields,
}
sqlResult.Rows = append(sqlResult.Rows, row)
// Results queued to Airbyte here, and flushed at the end of sync()
p.printQueryResult(sqlResult, keyspaceOrDatabase, s.Name, &ps)
for _, result := range summary.results {
qr := sqltypes.Proto3ToResult(result)
for _, row := range qr.Rows {
resultCount += 1
sqlResult := &sqltypes.Result{
Fields: result.Fields,
Rows: []sqltypes.Row{row},
}
// Results queued to Airbyte here, and flushed at the end of sync()
p.printQueryResult(sqlResult, keyspaceOrDatabase, s.Name, &ps)
}
}

// Exit sync and flush records once the VGTID position is at or past the desired stop position, and we're no longer waiting for COPY phase to complete
// Exit sync and flush records once the VGTID position is at or past the
// desired stop position, and we're no longer waiting for COPY phase to
// complete
if canFinishSync {
if isFullSync {
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sExiting full sync and flushing records because COPY_COMPLETED event was seen, current position is %+v, stop position is %+v", preamble, tc.Position, stopPosition))
logf(LOGLEVEL_INFO,
"Exiting full sync and flushing records because COPY_COMPLETED event was seen, current position is %+v, stop position is %+v",
tc.Position, stopPosition)
} else {
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sExiting incremental sync and flushing records because current position %+v has reached or passed stop position %+v", preamble, tc.Position, stopPosition))
logf(LOGLEVEL_INFO,
"Exiting incremental sync and flushing records because current position %+v has reached or passed stop position %+v",
tc.Position, stopPosition)
}
return tc, resultCount, io.EOF
}
}
}

func syncRecv(
logf func(level, message string, args ...any),
c vtgateservice.Vitess_VStreamClient,
) (*vtgate.VStreamResponse, error) {
resp, err := c.Recv()
if err != nil {
s, ok := status.FromError(err)
switch {
case ok && s.Code() == codes.DeadlineExceeded:
// No next VGTID found within deadline.
logf(LOGLEVEL_ERROR, "no new VGTID found before deadline exceeded: %v", err)
Copy link
Preview

Copilot AI Apr 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The DeadlineExceeded condition is likely an expected scenario; consider using an informational log level instead of LOGLEVEL_ERROR.

Suggested change
logf(LOGLEVEL_ERROR, "no new VGTID found before deadline exceeded: %v", err)
logf(LOGLEVEL_INFO, "no new VGTID found before deadline exceeded: %v", err)

Copilot uses AI. Check for mistakes.

case errors.Is(err, io.EOF):
// EOF is an acceptable error indicating VStream is finished.
logf(LOGLEVEL_ERROR, "encountered EOF, possibly indicating end of VStream")
Copy link
Preview

Copilot AI Apr 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EOF is typically an acceptable signal indicating the end of the stream; consider logging it at an informational level rather than as an error.

Suggested change
logf(LOGLEVEL_ERROR, "encountered EOF, possibly indicating end of VStream")
logf(LOGLEVEL_INFO, "encountered EOF, possibly indicating end of VStream")

Copilot uses AI. Check for mistakes.

}
}
return resp, err
}

type vstreamEventsSummary struct {
copyCompletedSeen bool
fields []*query.Field
lastKnownPk *query.QueryResult
position string
results []*query.QueryResult
}

func summarizeVStreamEvents(
logf func(level, message string, args ...any),
events []*binlogdata.VEvent,
lastFields []*query.Field,
) (vstreamEventsSummary, error) {
summary := vstreamEventsSummary{fields: lastFields}

for _, event := range events {
switch event.Type {
case binlogdata.VEventType_VGTID:
vgtid := event.GetVgtid().ShardGtids[0]
if vgtid != nil {
summary.position = vgtid.Gtid
if vgtid.TablePKs != nil {
tablePK := vgtid.TablePKs[0]
if tablePK != nil {
// Setting LastKnownPk allows a COPY phase to pick up where it left off
lastPK := tablePK.Lastpk
summary.lastKnownPk = lastPK
} else {
summary.lastKnownPk = nil
}
} else {
summary.lastKnownPk = nil
}
}
case binlogdata.VEventType_LASTPK:
if event.LastPKEvent.TableLastPK != nil {
// Only update last PK because we're in a COPY phase
logf(LOGLEVEL_INFO, "LASTPK event found, setting last PK to %+v", event.LastPKEvent.TableLastPK.Lastpk)
summary.lastKnownPk = event.LastPKEvent.TableLastPK.Lastpk
}
case binlogdata.VEventType_FIELD:
// Save fields for processing
logf(LOGLEVEL_INFO, "FIELD event found, setting fields to %+v", event.FieldEvent.Fields)
summary.fields = event.FieldEvent.Fields
case binlogdata.VEventType_ROW:
// Collect rows for processing
for _, change := range event.RowEvent.RowChanges {
if change.After != nil {
summary.results = append(summary.results, &query.QueryResult{
Fields: summary.fields,
Rows: []*query.Row{change.After},
})
}
}
case binlogdata.VEventType_COPY_COMPLETED:
logf(LOGLEVEL_INFO, "COPY_COMPLETED event found, copy phase finished")
summary.copyCompletedSeen = true
case binlogdata.VEventType_BEGIN, binlogdata.VEventType_COMMIT,
binlogdata.VEventType_DDL, binlogdata.VEventType_DELETE,
binlogdata.VEventType_GTID, binlogdata.VEventType_HEARTBEAT,
binlogdata.VEventType_INSERT, binlogdata.VEventType_JOURNAL,
binlogdata.VEventType_OTHER, binlogdata.VEventType_REPLACE,
binlogdata.VEventType_ROLLBACK, binlogdata.VEventType_SAVEPOINT,
binlogdata.VEventType_SET, binlogdata.VEventType_UNKNOWN,
binlogdata.VEventType_UPDATE, binlogdata.VEventType_VERSION:
// No special handling.
default:
return summary, fmt.Errorf("unexpected binlogdata.VEventType: %#v", event.Type)
}
}

return summary, nil
}

func (p PlanetScaleEdgeDatabase) getStopCursorPosition(ctx context.Context, shard, keyspace string, s Stream, ps PlanetScaleSource, tabletType psdbconnect.TabletType) (string, error) {
defer p.Logger.Flush()
timeout := 45 * time.Second
Expand Down
Loading