Skip to content

surface errors for flushing and recording #136

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: maxeng-error-handling-1
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 76 additions & 29 deletions cmd/internal/logger.go
Original file line number Diff line number Diff line change
@@ -1,61 +1,85 @@
package internal

import (
"bytes"
"encoding/json"
"fmt"
"io"
"os"
"time"

"github.com/pkg/errors"
)

type AirbyteLogger interface {
Log(level, message string)
Catalog(catalog Catalog)
ConnectionStatus(status ConnectionStatus)
Record(tableNamespace, tableName string, data map[string]interface{})
Flush()
Record(tableNamespace, tableName string, data map[string]interface{}) error
Flush() error
State(syncState SyncState)
Error(error string)
QueueFull() bool
}

const MaxBatchSize = 10000

var (
errExceededMaxRecordBatchSize = errors.New("exceeded max record batch size")

_ AirbyteLogger = (*airbyteLogger)(nil)
)

func NewLogger(w io.Writer) AirbyteLogger {
al := airbyteLogger{}
al.writer = w
al.recordEncoder = json.NewEncoder(w)
al.records = make([]AirbyteMessage, 0, MaxBatchSize)
return &al
buffer := &bytes.Buffer{}
return &airbyteLogger{
buffer: buffer,
encoder: json.NewEncoder(buffer),
records: make([]AirbyteMessage, 0, MaxBatchSize),
writer: w,
}
}

type airbyteLogger struct {
recordEncoder *json.Encoder
writer io.Writer
records []AirbyteMessage
buffer *bytes.Buffer
encoder *json.Encoder
writer io.Writer
records []AirbyteMessage
}

// QueueFull implements AirbyteLogger.
func (a *airbyteLogger) QueueFull() bool {
return len(a.records) >= MaxBatchSize
}

func (a *airbyteLogger) Log(level, message string) {
if err := a.recordEncoder.Encode(AirbyteMessage{
if err := a.encodeAndWriteMessages([]AirbyteMessage{{
Type: LOG,
Log: &AirbyteLogMessage{
Level: level,
Message: preamble() + message,
},
}); err != nil {
}}); err != nil {
// TODO: return this as an error
fmt.Fprintf(os.Stderr, "%sFailed to write log message: %v", preamble(), err)
}
}

func (a *airbyteLogger) Catalog(catalog Catalog) {
if err := a.recordEncoder.Encode(AirbyteMessage{
if err := a.encodeAndWriteMessages([]AirbyteMessage{{
Type: CATALOG,
Catalog: &catalog,
}); err != nil {
}}); err != nil {
// TODO: return this as an error
a.Error(fmt.Sprintf("catalog encoding error: %v", err))
}
}

func (a *airbyteLogger) Record(tableNamespace, tableName string, data map[string]interface{}) {
func (a *airbyteLogger) Record(tableNamespace, tableName string, data map[string]interface{}) error {
if len(a.records) >= MaxBatchSize {
return errExceededMaxRecordBatchSize
}

now := time.Now()
amsg := AirbyteMessage{
Type: RECORD,
Expand All @@ -68,50 +92,73 @@ func (a *airbyteLogger) Record(tableNamespace, tableName string, data map[string
}

a.records = append(a.records, amsg)
if len(a.records) == MaxBatchSize {
a.Flush()
}
return nil
}

func (a *airbyteLogger) Flush() {
for _, record := range a.records {
if err := a.recordEncoder.Encode(record); err != nil {
a.Error(fmt.Sprintf("flush encoding error: %v", err))
}
func (a *airbyteLogger) Flush() error {
if len(a.records) == 0 {
return nil
}
if err := a.encodeAndWriteMessages(a.records); err != nil {
return fmt.Errorf("encode and write messages: %w", err)
}
a.records = a.records[:0]
return nil
}

func (a *airbyteLogger) State(syncState SyncState) {
if err := a.recordEncoder.Encode(AirbyteMessage{
if err := a.encodeAndWriteMessages([]AirbyteMessage{{
Type: STATE,
State: &AirbyteState{syncState},
}); err != nil {
}}); err != nil {
// TODO: return this as an error
a.Error(fmt.Sprintf("state encoding error: %v", err))
}
}

func (a *airbyteLogger) Error(error string) {
if err := a.recordEncoder.Encode(AirbyteMessage{
if err := a.encodeAndWriteMessages([]AirbyteMessage{{
Type: LOG,
Log: &AirbyteLogMessage{
Level: LOGLEVEL_ERROR,
Message: error,
},
}); err != nil {
}}); err != nil {
// TODO: return this as an error
fmt.Fprintf(os.Stderr, "%sFailed to write error: %v", preamble(), err)
}
}

func (a *airbyteLogger) ConnectionStatus(status ConnectionStatus) {
if err := a.recordEncoder.Encode(AirbyteMessage{
if err := a.encodeAndWriteMessages([]AirbyteMessage{{
Type: CONNECTION_STATUS,
ConnectionStatus: &status,
}); err != nil {
}}); err != nil {
// TODO: return this as an error
a.Error(fmt.Sprintf("connection status encoding error: %v", err))
}
}

func (a *airbyteLogger) encodeAndWriteMessages(ms []AirbyteMessage) error {
if len(ms) == 0 {
return nil
}

defer a.buffer.Reset()

for _, m := range ms {
if err := a.encoder.Encode(m); err != nil {
return fmt.Errorf("encode: %w", err)
}
}

if _, err := a.writer.Write(a.buffer.Bytes()); err != nil {
return fmt.Errorf("write: %w", err)
}

return nil
}

func preamble() string {
return "PlanetScale Source :: "
}
13 changes: 11 additions & 2 deletions cmd/internal/mock_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,19 @@ type testAirbyteLogEntry struct {
message string
}

var _ AirbyteLogger = (*testAirbyteLogger)(nil)

type testAirbyteLogger struct {
logMessages []testAirbyteLogEntry
logMessagesByLevel map[string][]string
records map[string][]map[string]interface{}
}

// QueueFull implements AirbyteLogger.
func (tal *testAirbyteLogger) QueueFull() bool {
return len(tal.records) > 0
}

func (tal *testAirbyteLogger) Log(level, message string) {
if tal.logMessagesByLevel == nil {
tal.logMessagesByLevel = map[string][]string{}
Expand All @@ -39,15 +46,17 @@ func (testAirbyteLogger) ConnectionStatus(status ConnectionStatus) {
panic("implement me")
}

func (tal *testAirbyteLogger) Record(tableNamespace, tableName string, data map[string]interface{}) {
func (tal *testAirbyteLogger) Record(tableNamespace, tableName string, data map[string]interface{}) error {
if tal.records == nil {
tal.records = map[string][]map[string]interface{}{}
}
key := tableNamespace + "." + tableName
tal.records[key] = append(tal.records[key], data)
return nil
}

func (testAirbyteLogger) Flush() {
func (testAirbyteLogger) Flush() error {
return nil
}

func (testAirbyteLogger) State(syncState SyncState) {
Expand Down
Loading