[DMM] #1041

Open · wants to merge 29 commits into master

33 changes: 20 additions & 13 deletions logservice/eventstore/event_store.go
@@ -178,14 +178,14 @@ type eventStore struct {

const (
	dataDir             = "event_store"
-	dbCount             = 8
-	writeWorkerNumPerDB = 2
+	dbCount             = 4
+	writeWorkerNumPerDB = 4

	// Pebble options
-	targetMemoryLimit = 2 << 30 // 2GB
+	targetMemoryLimit = 4 << 30 // 4GB
	memTableSize      = 256 << 20 // 256MB
-	memTableCount  = 4
-	blockCacheSize = targetMemoryLimit - (memTableSize * memTableCount) // 1GB
+	memTableCount  = 8
+	blockCacheSize = targetMemoryLimit - (memTableSize * memTableCount) // 2GB
)

func New(
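For context, the retuned constants halve the DB shard count, double the write workers per shard, and double the overall memory target; the block cache gets whatever is left of the 4 GB budget after the memtables are carved out. A minimal standalone sketch (constants copied from the new side of this hunk) confirms the arithmetic in the comments:

```go
package main

import "fmt"

const (
	targetMemoryLimit = 4 << 30   // 4GB total budget
	memTableSize      = 256 << 20 // 256MB per memtable
	memTableCount     = 8
	blockCacheSize    = targetMemoryLimit - (memTableSize * memTableCount)
)

func main() {
	// 4GB - 8*256MB = 2GB left for the Pebble block cache.
	fmt.Printf("memtables: %d MB, block cache: %d MB\n",
		memTableSize*memTableCount>>20, blockCacheSize>>20)
}
```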
@@ -261,6 +261,10 @@ func newPebbleOptions() *pebble.Options {

		// Configure options to optimize read/write performance
		Levels: make([]pebble.LevelOptions, 2),
+
+		MaxConcurrentCompactions: func() int {
+			return 4
+		},
	}

	// Configure level strategy
@@ -278,8 +282,8 @@ func newPebbleOptions() *pebble.Options {
	}

	// Adjust L0 thresholds to delay compaction timing
-	opts.L0CompactionThreshold = 20 // Allow more files in L0
-	opts.L0StopWritesThreshold = 40 // Increase stop-writes threshold
+	opts.L0CompactionThreshold = 20  // Allow more files in L0
+	opts.L0StopWritesThreshold = 160 // Increase stop-writes threshold

	// Prefetch configuration
	opts.ReadOnly = false
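Taken together, these hunks trade compaction debt for write availability: L0 is allowed to accumulate more files before a compaction is scheduled, the stop-writes cliff moves from 40 to 160 L0 files, and up to four compactions may run concurrently to drain the backlog. A sketch of how the knobs fit together on a pebble.Options value; only the three fields marked as confirmed appear in this diff, while the cache and memtable wiring is an assumption for illustration:

```go
import "github.com/cockroachdb/pebble"

// tunedOptions assembles the knobs changed in this patch.
func tunedOptions() *pebble.Options {
	opts := &pebble.Options{
		Cache:        pebble.NewCache(blockCacheSize), // assumed: budget from the const block
		MemTableSize: memTableSize,                    // assumed
		// Confirmed by the diff: up to 4 compactions drain L0 in parallel.
		MaxConcurrentCompactions: func() int { return 4 },
	}
	// Confirmed by the diff: compact late, stall writes much later (40 -> 160),
	// so ingestion bursts are absorbed instead of blocking writers.
	opts.L0CompactionThreshold = 20
	opts.L0StopWritesThreshold = 160
	return opts
}
```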
@@ -316,6 +320,7 @@ func (p *writeTaskPool) run(ctx context.Context) {
			return
		default:
			events, ok := p.dataCh.GetMultipleNoGroup(buffer)
+			log.Info("write task pool get events", zap.Int("event count", len(events)))
			if !ok {
				return
			}
@@ -515,12 +520,12 @@ func (e *eventStore) RegisterDispatcher(
		}
		// just do CompareAndSwap once; if it fails, another goroutine has already updated resolvedTs
		if subStat.resolvedTs.CompareAndSwap(currentResolvedTs, ts) {
-			subStat.dispatchers.Lock()
-			defer subStat.dispatchers.Unlock()
-			for _, notifier := range subStat.dispatchers.notifiers {
-				notifier(ts, subStat.maxEventCommitTs.Load())
-			}
-			CounterResolved.Inc()
+			// subStat.dispatchers.Lock()
+			// defer subStat.dispatchers.Unlock()
+			// for _, notifier := range subStat.dispatchers.notifiers {
+			// 	notifier(ts, subStat.maxEventCommitTs.Load())
+			// }
+			// CounterResolved.Inc()
		}
	}
	// Note: don't hold any lock when calling Subscribe
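The notifier fan-out under the dispatchers lock is commented out here (presumably to isolate its cost; the resolved ts itself still advances). What survives is a single compare-and-swap, so at most one goroutine publishes a given advance and losers simply drop their update. A minimal self-contained sketch of that pattern, with hypothetical names (subStat, tryAdvance) standing in for the real ones:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type subStat struct {
	resolvedTs atomic.Uint64
}

// tryAdvance publishes ts only if resolvedTs still holds the value we read.
// If the CAS fails, another goroutine advanced it first and we drop this
// update instead of retrying: a newer ts is already in place.
func (s *subStat) tryAdvance(ts uint64) bool {
	current := s.resolvedTs.Load()
	if ts <= current {
		return false
	}
	return s.resolvedTs.CompareAndSwap(current, ts)
}

func main() {
	var s subStat
	fmt.Println(s.tryAdvance(100)) // true: 0 -> 100
	fmt.Println(s.tryAdvance(90))  // false: stale update is dropped
}
```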
@@ -743,11 +748,13 @@ func (e *eventStore) writeEvents(db *pebble.DB, events []eventWithCallback) error {
		}
	}
	CounterKv.Add(float64(kvCount))
+	log.Info("write events batch size", zap.Int("kvCount", kvCount), zap.Int("batchSize", batch.Len()))
	metrics.EventStoreWriteBatchEventsCountHist.Observe(float64(kvCount))
	metrics.EventStoreWriteBatchSizeHist.Observe(float64(batch.Len()))
	metrics.EventStoreWriteBytes.Add(float64(batch.Len()))
	start := time.Now()
	err := batch.Commit(pebble.NoSync)
+	log.Info("write events batch commit", zap.Int("kvCount", kvCount))
	metrics.EventStoreWriteDurationHistogram.Observe(float64(time.Since(start).Milliseconds()) / 1000)
	return err
}
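For context, writeEvents accumulates all KV entries of a call into one Pebble batch and commits it with pebble.NoSync, deferring durability to the WAL's own flush cycle in exchange for latency. A standalone sketch of that write path (the path and keys are made up for illustration):

```go
package main

import (
	"log"

	"github.com/cockroachdb/pebble"
)

func main() {
	db, err := pebble.Open("/tmp/event_store_demo", &pebble.Options{})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// One batch per group of events: a single Commit amortizes the write cost.
	batch := db.NewBatch()
	defer batch.Close()
	if err := batch.Set([]byte("key-1"), []byte("value-1"), nil); err != nil {
		log.Fatal(err)
	}
	// NoSync: don't fsync the WAL on every commit; some crash-safety is
	// traded for throughput, which suits re-fetchable event data.
	if err := batch.Commit(pebble.NoSync); err != nil {
		log.Fatal(err)
	}
}
```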
74 changes: 23 additions & 51 deletions logservice/logpuller/region_event_handler.go
@@ -15,7 +15,6 @@ package logpuller

import (
	"encoding/hex"
-	"time"
	"unsafe"

"github.com/pingcap/kvproto/pkg/cdcpb"
@@ -37,6 +36,7 @@ const (
	DataGroupError = 3
)

+// TODO: rename this; it is not a real region event anymore, since the resolved ts is at subscription level now.
type regionEvent struct {
state *regionFeedState
worker *regionRequestWorker // TODO: remove the field
@@ -50,16 +50,18 @@ func (event *regionEvent) getSize() int {
	if event == nil {
		return 0
	}
+	// don't count the size of resolved ts events
+	if event.entries == nil {
+		return 0
+	}
	size := int(unsafe.Sizeof(*event))
-	if event.entries != nil {
-		size += int(unsafe.Sizeof(*event.entries))
-		size += int(unsafe.Sizeof(*event.entries.Entries))
-		for _, row := range event.entries.Entries.GetEntries() {
-			size += int(unsafe.Sizeof(*row))
-			size += len(row.Key)
-			size += len(row.Value)
-			size += len(row.OldValue)
-		}
+	size += int(unsafe.Sizeof(*event.entries))
+	size += int(unsafe.Sizeof(*event.entries.Entries))
+	for _, row := range event.entries.Entries.GetEntries() {
+		size += int(unsafe.Sizeof(*row))
+		size += len(row.Key)
+		size += len(row.Value)
+		size += len(row.OldValue)
	}
	return size
}
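With this change a resolved-ts-only event reports size zero, so it never counts against the dynamic stream's memory quota; only entry-carrying events do. A small illustrative test, assuming it lives in the package's own test file (the test itself is hypothetical, not part of this diff):

```go
package logpuller

import "testing"

func TestResolvedTsEventHasZeroSize(t *testing.T) {
	// A subscription-level resolved ts event carries no entries,
	// so it should be free from the memory-quota perspective.
	ev := &regionEvent{resolvedTs: 100}
	if got := ev.getSize(); got != 0 {
		t.Fatalf("expected size 0 for resolved ts event, got %d", got)
	}
}
```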
@@ -80,14 +82,20 @@ func (h *regionEventHandler) Handle(span *subscribedSpan, events ...regionEvent)
	}

	for _, event := range events {
+		if event.resolvedTs != 0 {
+			// safety check: resolved ts events must not carry per-region data
+			if event.state != nil || event.worker != nil {
+				log.Panic("should not happen: resolvedTs event should not have state or worker")
+			}
+			span.advanceResolvedTs(event.resolvedTs)
+			continue
+		}
		if event.state.isStale() {
			h.handleRegionError(event.state, event.worker)
			continue
		}
		if event.entries != nil {
			handleEventEntries(span, event.state, event.entries)
-		} else if event.resolvedTs != 0 {
-			handleResolvedTs(span, event.state, event.resolvedTs)
		} else {
			log.Panic("should not reach", zap.Any("event", event), zap.Any("events", events))
		}
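Handle now short-circuits subscription-level resolved ts events before any per-region logic and enforces the invariant that such events carry nothing but the timestamp. A hypothetical helper (not in this diff) makes that invariant explicit:

```go
// newResolvedTsEvent is a hypothetical constructor illustrating the shape
// Handle expects: state and worker stay nil for resolved ts events, which
// is exactly what the safety check above panics on if violated.
func newResolvedTsEvent(ts uint64) regionEvent {
	return regionEvent{resolvedTs: ts}
}
```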
@@ -139,9 +147,9 @@ func (h *regionEventHandler) GetType(event regionEvent) dynstream.EventType {
	if event.entries != nil {
		return dynstream.EventType{DataGroup: DataGroupEntries, Property: dynstream.BatchableData}
	} else if event.resolvedTs != 0 {
-		// Note: resolved ts may come from different regions, so it is not a periodic signal
-		return dynstream.EventType{DataGroup: DataGroupResolvedTs, Property: dynstream.BatchableData}
-	} else if event.state.isStale() {
+		// resolved ts is at subscription level now, so it acts as a periodic signal
+		return dynstream.EventType{DataGroup: DataGroupResolvedTs, Property: dynstream.PeriodicSignal}
+	} else if event.state != nil && event.state.isStale() {
		return dynstream.EventType{DataGroup: DataGroupError, Property: dynstream.BatchableData}
	} else {
		log.Panic("unknown event type",
@@ -272,39 +280,3 @@ func handleEventEntries(span *subscribedSpan, state *regionFeedState, entries *c
		}
	}
}
-
-func handleResolvedTs(span *subscribedSpan, state *regionFeedState, resolvedTs uint64) {
-	if state.isStale() || !state.isInitialized() {
-		return
-	}
-	state.matcher.tryCleanUnmatchedValue()
-	regionID := state.getRegionID()
-	lastResolvedTs := state.getLastResolvedTs()
-	if resolvedTs < lastResolvedTs {
-		log.Info("The resolvedTs is fallen back in subscription client",
-			zap.Uint64("subscriptionID", uint64(state.region.subscribedSpan.subID)),
-			zap.Uint64("regionID", regionID),
-			zap.Uint64("resolvedTs", resolvedTs),
-			zap.Uint64("lastResolvedTs", lastResolvedTs))
-		return
-	}
-	state.updateResolvedTs(resolvedTs)
-
-	now := time.Now().UnixMilli()
-	lastAdvance := span.lastAdvanceTime.Load()
-	if now-lastAdvance > span.advanceInterval && span.lastAdvanceTime.CompareAndSwap(lastAdvance, now) {
-		ts := span.rangeLock.ResolvedTs()
-		if ts > 0 && span.initialized.CompareAndSwap(false, true) {
-			log.Info("subscription client is initialized",
-				zap.Uint64("subscriptionID", uint64(span.subID)),
-				zap.Uint64("regionID", regionID),
-				zap.Uint64("resolvedTs", ts))
-		}
-		lastResolvedTs := span.resolvedTs.Load()
-		if ts > lastResolvedTs {
-			span.resolvedTs.Store(ts)
-			span.resolvedTsUpdated.Store(time.Now().Unix())
-			span.advanceResolvedTs(ts)
-		}
-	}
-}
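The deleted function coupled two concerns: per-region resolved ts bookkeeping and a time-throttled advance of the subscription-level watermark. The throttle used a CAS on lastAdvanceTime so that at most one goroutine per interval performed the comparatively expensive rangeLock.ResolvedTs() scan. A standalone sketch of that idiom, worth keeping in mind wherever span.advanceResolvedTs absorbed this logic (names here are illustrative, not from the codebase):

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

type throttle struct {
	lastAdvance atomic.Int64 // unix millis of the last accepted advance
	interval    int64        // minimum millis between advances
}

// tryAcquire returns true for at most one caller per interval: the CAS
// guarantees that concurrent callers observing the same lastAdvance value
// elect a single winner.
func (t *throttle) tryAcquire() bool {
	now := time.Now().UnixMilli()
	last := t.lastAdvance.Load()
	return now-last > t.interval && t.lastAdvance.CompareAndSwap(last, now)
}

func main() {
	t := &throttle{interval: 100}
	fmt.Println(t.tryAcquire()) // true: first caller wins the slot
	fmt.Println(t.tryAcquire()) // false: still within the interval
}
```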
11 changes: 4 additions & 7 deletions logservice/logpuller/region_request_worker.go
@@ -263,14 +263,15 @@ func (s *regionRequestWorker) dispatchRegionChangeEvents(events []*cdcpb.Event)
				zap.Any("error", eventData.Error))
			state.markStopped(&eventError{err: eventData.Error})
		case *cdcpb.Event_ResolvedTs:
-			regionEvent.resolvedTs = eventData.ResolvedTs
+			s.client.pushResolvedTsEvent(subscriptionID, state, eventData.ResolvedTs)
+			continue
		case *cdcpb.Event_LongTxn_:
			// ignore
			continue
		default:
			log.Panic("unknown event type", zap.Any("event", event))
		}
-		s.client.pushRegionEventToDS(SubscriptionID(event.RequestId), regionEvent)
+		s.client.pushRegionEventToDS(subscriptionID, regionEvent)
	} else {
		log.Warn("region request worker receives a region event for an untracked region",
			zap.Uint64("workerID", s.workerID),
@@ -286,11 +287,7 @@ func (s *regionRequestWorker) dispatchResolvedTsEvent(resolvedTsEvent *cdcpb.Res
	s.client.metrics.batchResolvedSize.Observe(float64(len(resolvedTsEvent.Regions)))
	for _, regionID := range resolvedTsEvent.Regions {
		if state := s.getRegionState(subscriptionID, regionID); state != nil {
-			s.client.pushRegionEventToDS(SubscriptionID(resolvedTsEvent.RequestId), regionEvent{
-				state:      state,
-				worker:     s,
-				resolvedTs: resolvedTsEvent.Ts,
-			})
+			s.client.pushResolvedTsEvent(subscriptionID, state, resolvedTsEvent.Ts)
		} else {
			log.Warn("region request worker receives a resolved ts event for an untracked region",
				zap.Uint64("workerID", s.workerID),