Skip to content

👌 feat(message:deduplication) implementing the feature #33 #229

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions gbus/abstractions.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ const (
REPLY Semantics = "reply"
)

// DeduplicationPolicy selects how the bus treats a message whose id has
// already been recorded by the Deduplicator.
type DeduplicationPolicy int

const (
// DeduplicationPolicyNone disables deduplication; it is the zero value and
// therefore the default when no policy is configured.
DeduplicationPolicyNone DeduplicationPolicy = iota
// DeduplicationPolicyReject — presumably a duplicate delivery is rejected;
// NOTE(review): the handling lives in the worker, confirm there.
DeduplicationPolicyReject
// DeduplicationPolicyAck — presumably a duplicate delivery is acknowledged
// and dropped; NOTE(review): the handling lives in the worker, confirm there.
DeduplicationPolicyAck
)

//Bus interface provides the majority of functionality to Send, Reply and Publish messages to the Bus
type Bus interface {
HandlerRegister
Expand Down Expand Up @@ -213,6 +221,8 @@ type Builder interface {

//WithLogger set custom logger instance
WithLogger(logger logrus.FieldLogger) Builder

WithDeduplicationPolicy(method DeduplicationPolicy, age time.Duration) Builder
}

//Invocation context for a specific processed message
Expand Down Expand Up @@ -284,3 +294,17 @@ type Logged interface {
SetLogger(entry logrus.FieldLogger)
Log() logrus.FieldLogger
}

// Deduplicator abstracts how ids of processed messages are stored and looked
// up so that duplicate deliveries can be detected.
type Deduplicator interface {
// StoreMessageID stores the message id in the storage using the given transaction.
StoreMessageID(logger logrus.FieldLogger, tx *sql.Tx, id string) error
// MessageIDExists reports whether the message id already exists in the storage.
MessageIDExists(logger logrus.FieldLogger, id string) (bool, error)
// Purge deletes all data from the storage of the deduplicator.
Purge(logger logrus.FieldLogger) error
// Start starts the background process which cleans the storage of the deduplicator.
Start(logger logrus.FieldLogger)
// Stop stops the background cleaning process.
Stop(logger logrus.FieldLogger)
}
61 changes: 40 additions & 21 deletions gbus/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,35 @@ import (
"sync"
"time"

"emperror.dev/errors"
"github.com/sirupsen/logrus"

"github.com/wework/grabbit/gbus"
"github.com/wework/grabbit/gbus/deduplicator"
"github.com/wework/grabbit/gbus/saga"
"github.com/wework/grabbit/gbus/serialization"
"github.com/wework/grabbit/gbus/tx/mysql"
)

type defaultBuilder struct {
PrefetchCount uint
connStr string
purgeOnStartup bool
sagaStoreConnStr string
txnl bool
txConnStr string
txnlProvider string
workerNum uint
serializer gbus.Serializer
dlx string
defaultPolicies []gbus.MessagePolicy
confirm bool
dbPingTimeout time.Duration
usingPingTimeout bool
logger logrus.FieldLogger
busCfg gbus.BusConfiguration
PrefetchCount uint
connStr string
purgeOnStartup bool
sagaStoreConnStr string
txnl bool
txConnStr string
txnlProvider string
workerNum uint
serializer gbus.Serializer
dlx string
defaultPolicies []gbus.MessagePolicy
confirm bool
dbPingTimeout time.Duration
usingPingTimeout bool
logger logrus.FieldLogger
busCfg gbus.BusConfiguration
deduplicationPolicy gbus.DeduplicationPolicy
deduplicationRetentionAge time.Duration
}

func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
Expand All @@ -53,6 +57,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
DefaultPolicies: builder.defaultPolicies,
DbPingTimeout: 3,
Confirm: builder.confirm,
DeduplicationPolicy: builder.deduplicationPolicy,
}

var finalLogger logrus.FieldLogger
Expand Down Expand Up @@ -107,6 +112,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
if builder.usingPingTimeout {
gb.DbPingTimeout = builder.dbPingTimeout
}
gb.Deduplicator = deduplicator.New(svcName, builder.deduplicationPolicy, gb.TxProvider, builder.deduplicationRetentionAge)

//TODO move this into the NewSagaStore factory methods
if builder.purgeOnStartup {
Expand All @@ -115,6 +121,11 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
errMsg := fmt.Errorf("grabbit: saga store faild to purge. error: %v", err)
panic(errMsg)
}
err = gb.Deduplicator.Purge(gb.Log())
if err != nil {
errMsg := errors.NewWithDetails("duplicator failed to purge", "component", "grabbit", "feature", "deduplicator")
panic(errMsg)
}
}
glue := saga.NewGlue(gb, sagaStore, svcName, gb.TxProvider, gb.Log, timeoutManager)
glue.SetLogger(gb.Log())
Expand Down Expand Up @@ -206,6 +217,12 @@ func (builder *defaultBuilder) WithLogger(logger logrus.FieldLogger) gbus.Builde
return builder
}

// WithDeduplicationPolicy records the deduplication policy and the retention
// age of stored message ids on the builder and returns the builder so calls
// can be chained.
func (builder *defaultBuilder) WithDeduplicationPolicy(policy gbus.DeduplicationPolicy, age time.Duration) gbus.Builder {
	builder.deduplicationRetentionAge = age
	builder.deduplicationPolicy = policy
	return builder
}

//New :)
func New() Nu {
return Nu{}
Expand All @@ -218,9 +235,11 @@ type Nu struct {
//Bus inits a new BusBuilder
func (Nu) Bus(brokerConnStr string) gbus.Builder {
return &defaultBuilder{
busCfg: gbus.BusConfiguration{},
PrefetchCount: 1,
connStr: brokerConnStr,
serializer: serialization.NewGobSerializer(),
defaultPolicies: make([]gbus.MessagePolicy, 0)}
busCfg: gbus.BusConfiguration{},
PrefetchCount: 1,
connStr: brokerConnStr,
serializer: serialization.NewGobSerializer(),
defaultPolicies: make([]gbus.MessagePolicy, 0),
deduplicationPolicy: gbus.DeduplicationPolicyNone,
}
}
55 changes: 31 additions & 24 deletions gbus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,17 @@ type DefaultBus struct {
Glue SagaGlue
TxProvider TxProvider

WorkerNum uint
Serializer Serializer
DLX string
DefaultPolicies []MessagePolicy
Confirm bool
healthChan chan error
backpressure bool
DbPingTimeout time.Duration
amqpConnected bool
WorkerNum uint
Serializer Serializer
DLX string
DeduplicationPolicy DeduplicationPolicy
Deduplicator Deduplicator
DefaultPolicies []MessagePolicy
Confirm bool
healthChan chan error
backpressure bool
DbPingTimeout time.Duration
amqpConnected bool
}

var (
Expand Down Expand Up @@ -222,6 +224,8 @@ func (b *DefaultBus) Start() error {
return startErr
}

b.Deduplicator.Start(b.Log())

//declare queue
var q amqp.Queue
if q, e = b.createServiceQueue(); e != nil {
Expand Down Expand Up @@ -280,21 +284,24 @@ func (b *DefaultBus) createBusWorkers(workerNum uint) ([]*worker, error) {
tag := fmt.Sprintf("%s_worker_%d", b.SvcName, i)

w := &worker{
consumerTag: tag,
channel: amqpChan,
q: b.serviceQueue,
rpcq: b.rpcQueue,
svcName: b.SvcName,
txProvider: b.TxProvider,
rpcLock: b.RPCLock,
rpcHandlers: b.RPCHandlers,
deadletterHandler: b.deadletterHandler,
globalRawHandler: b.globalRawHandler,
handlersLock: &sync.Mutex{},
registrations: b.Registrations,
serializer: b.Serializer,
b: b,
amqpErrors: b.amqpErrors}
consumerTag: tag,
channel: amqpChan,
q: b.serviceQueue,
rpcq: b.rpcQueue,
svcName: b.SvcName,
txProvider: b.TxProvider,
rpcLock: b.RPCLock,
rpcHandlers: b.RPCHandlers,
deadletterHandler: b.deadletterHandler,
globalRawHandler: b.globalRawHandler,
handlersLock: &sync.Mutex{},
registrations: b.Registrations,
serializer: b.Serializer,
b: b,
amqpErrors: b.amqpErrors,
deduplicationPolicy: b.DeduplicationPolicy,
deduplicator: b.Deduplicator,
}

err := w.Start()
if err != nil {
Expand Down
162 changes: 162 additions & 0 deletions gbus/deduplicator/deduper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package deduplicator

import (
"database/sql"
"sync"
"time"

"emperror.dev/errors"
"github.com/sirupsen/logrus"

"github.com/wework/grabbit/gbus"
"github.com/wework/grabbit/gbus/tx"
)

// Compile-time check that *dedup satisfies gbus.Deduplicator.
var _ gbus.Deduplicator = &dedup{}

// dedup is the gbus.Deduplicator implementation backed by a SQL table that is
// accessed through transactions obtained from txProvider.
type dedup struct {
// svcName is the service name the deduplicator was created for.
svcName string
// policy is the configured deduplication policy; with
// DeduplicationPolicyNone, MessageIDExists skips the lookup.
policy gbus.DeduplicationPolicy
// txProvider supplies the transactions used for every table access.
txProvider gbus.TxProvider
// age is how long a stored id is kept before the cleanup deletes it.
age time.Duration
// ticker drives the periodic cleanup; it is created in Start.
ticker *time.Ticker
// done is closed by Stop to terminate the cleanup goroutine.
done chan bool
// tableName is the name of the table holding the message ids.
tableName string
// started is set by Start and cleared by Stop.
started bool
// startStopMutex serializes Start and Stop.
startStopMutex sync.Mutex
}

// Purge removes every stored message id by truncating the deduplication table.
//
// The statement runs inside a transaction obtained from the tx provider. When
// the truncate fails the transaction is rolled back and the truncate error is
// returned (with the rollback error appended if the rollback fails as well).
// When the truncate succeeds the transaction is committed and any commit
// error is returned.
func (d *dedup) Purge(logger logrus.FieldLogger) (err error) {
	truncateSQL := "TRUNCATE TABLE " + d.tableName
	txp, err := d.txProvider.New()
	if err != nil {
		logger.WithError(err).WithField("table_name", d.tableName).Error("failed purging duplicates table")
		return err
	}
	// Finish the transaction exactly once: roll back on failure, commit on
	// success. err is the named result, so the outcome reaches the caller.
	defer func() {
		if err != nil {
			if rbErr := txp.Rollback(); rbErr != nil {
				logger.WithError(rbErr).Error("failed rolling back transaction after purge")
				err = errors.Append(err, rbErr)
			}
			return
		}
		if err = txp.Commit(); err != nil {
			logger.WithError(err).WithField("table_name", d.tableName).Error("failed committing transaction after purge")
			return
		}
		logger.WithField("table_name", d.tableName).Info("successfully truncated table")
	}()
	if _, err = txp.Exec(truncateSQL); err != nil {
		logger.WithError(err).WithField("table_name", d.tableName).Error("failed executing truncate on table")
		return err
	}
	return nil
}

// Start launches the background goroutine that, once a minute, deletes the
// message ids older than the configured retention age.
//
// Calling Start on an already started deduplicator is a no-op, so a repeated
// call does not leak the previous ticker and goroutine. Use Stop to terminate
// the background process.
func (d *dedup) Start(l logrus.FieldLogger) {
	d.startStopMutex.Lock()
	defer d.startStopMutex.Unlock()
	if d.started {
		return
	}
	logger := d.decoratedLog(l)
	// The goroutine works on local copies so that a later Stop/Start cycle,
	// which replaces the struct fields, cannot race with it.
	ticker := time.NewTicker(time.Minute)
	done := make(chan bool)
	d.ticker = ticker
	d.done = done
	go func() {
		for {
			select {
			case <-done:
				return
			case <-ticker.C:
				d.deleteExpired(logger)
			}
		}
	}()
	d.started = true
}

// deleteExpired runs one cleanup pass: it deletes, in its own transaction,
// all rows whose created_at is older than the retention age. Failures are
// logged and the transaction is rolled back; they never stop the cleanup loop.
func (d *dedup) deleteExpired(logger logrus.FieldLogger) {
	deleteQuery := "DELETE FROM " + d.tableName + " WHERE `created_at` < ?"
	oldest := time.Now().Add(-1 * d.age)
	tx, err := d.txProvider.New()
	if err != nil {
		logger.WithError(err).Error("failed to acquire a tx")
		return
	}
	result, err := tx.Exec(deleteQuery, oldest)
	if err != nil {
		logger.WithError(err).Error("failed executing delete query")
		// Release the transaction instead of leaving it open.
		if rbErr := tx.Rollback(); rbErr != nil {
			logger.WithError(rbErr).Error("failed rolling back cleanup transaction")
		}
		return
	}
	// Without a commit the delete is never persisted and the transaction leaks.
	if err = tx.Commit(); err != nil {
		logger.WithError(err).Error("failed committing cleanup transaction")
		return
	}
	n, err := result.RowsAffected()
	if err != nil {
		logger.WithError(err).Error("failed to get count of affected rows")
		return
	}
	logger.WithField("table_name", d.tableName).WithField("rows_deleted", n).
		Info("successfully cleanup duplicates table")
}

// decoratedLog returns the given logger extended with the field
// grabbit=dedup so that entries can be attributed to the deduplicator.
func (d *dedup) decoratedLog(l logrus.FieldLogger) logrus.FieldLogger {
	return l.WithField("grabbit", "dedup")
}

// Stop shuts down the background cleanup: it stops the ticker and closes the
// done channel. Calling Stop on a deduplicator that is not started only logs
// the shutdown message.
func (d *dedup) Stop(logger logrus.FieldLogger) {
	d.decoratedLog(logger).Info("shutting down deduplicator")

	d.startStopMutex.Lock()
	defer d.startStopMutex.Unlock()

	if !d.started {
		return
	}
	d.ticker.Stop()
	close(d.done)
	d.started = false
}

// StoreMessageID inserts the message id into the deduplication table using
// the supplied transaction. A failed insert is logged and its error returned.
func (d *dedup) StoreMessageID(logger logrus.FieldLogger, tx *sql.Tx, id string) error {
	query := "INSERT INTO " + d.tableName + " (id) values (?)"
	if _, execErr := tx.Exec(query, id); execErr != nil {
		d.decoratedLog(logger).WithError(execErr).Error("failed to insert the id of the message into the dedup table")
		return execErr
	}
	return nil
}

// MessageIDExists reports whether the message id is present in the
// deduplication table.
//
// With DeduplicationPolicyNone no lookup is performed and (false, nil) is
// returned. If a transaction cannot be obtained or the lookup query fails,
// the error is returned together with true.
func (d *dedup) MessageIDExists(l logrus.FieldLogger, id string) (bool, error) {
	logger := d.decoratedLog(l)
	if d.policy == gbus.DeduplicationPolicyNone {
		logger.Debug("duplication policy is none")
		return false, nil
	}
	tx, err := d.txProvider.New()
	if err != nil {
		logger.WithError(err).Error("failed getting tx from txProvider")
		return true, err
	}
	// The transaction is only read from, so it is always rolled back. The
	// rollback error is kept in its own variable so it cannot clobber err.
	defer func() {
		if rbErr := tx.Rollback(); rbErr != nil {
			logger.WithError(rbErr).Error("could not roll back tx for query MessageIDExists")
		}
	}()
	selectSQL := "SELECT EXISTS (SELECT id FROM " + d.tableName + " WHERE id = ? limit 1)"

	var exists bool
	err = tx.QueryRow(selectSQL, id).Scan(&exists)
	if err == sql.ErrNoRows {
		logger.WithField("table_name", d.tableName).Debug("no rows in result set when looking for messages in duplicates table")
		return false, nil
	}
	if err != nil {
		logger.WithError(err).WithField("table_name", d.tableName).Error("failed executing lookup query in duplicates table")
		return true, err
	}

	return exists, nil
}

// New creates a gbus.Deduplicator for the given service. The returned
// deduplicator is not started; its table name is derived from svcName, and
// age is the retention period used by the periodic cleanup.
func New(svcName string, policy gbus.DeduplicationPolicy, txProvider gbus.TxProvider, age time.Duration) gbus.Deduplicator {
	return &dedup{
		tableName:  tx.GrabbitTableNameTemplate(svcName, "duplicates"),
		svcName:    svcName,
		policy:     policy,
		txProvider: txProvider,
		age:        age,
		started:    false,
	}
}
Loading