From 0281793092ebe99482d01a2a24991885a88cc542 Mon Sep 17 00:00:00 2001
From: lidezhu
Date: Tue, 25 Feb 2025 13:47:50 +0800
Subject: [PATCH 01/29] add some metric

---
 logservice/eventstore/event_store.go        |  1 +
 logservice/logpuller/subscription_client.go | 27 +++++++++++++++------
 2 files changed, 21 insertions(+), 7 deletions(-)

diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go
index 453088631..8c83e51b6 100644
--- a/logservice/eventstore/event_store.go
+++ b/logservice/eventstore/event_store.go
@@ -316,6 +316,7 @@ func (p *writeTaskPool) run(ctx context.Context) {
 				return
 			default:
 				events, ok := p.dataCh.GetMultipleNoGroup(buffer)
+				log.Info("write task pool get events", zap.Int("event count", len(events)))
 				if !ok {
 					return
 				}
diff --git a/logservice/logpuller/subscription_client.go b/logservice/logpuller/subscription_client.go
index 0cdccebd4..5868ade40 100644
--- a/logservice/logpuller/subscription_client.go
+++ b/logservice/logpuller/subscription_client.go
@@ -261,24 +261,37 @@ func (s *SubscriptionClient) initMetrics() {
 }
 
 func (s *SubscriptionClient) updateMetrics(ctx context.Context) error {
-	ticker1 := time.NewTicker(10 * time.Second)
-	ticker2 := time.NewTicker(5 * time.Millisecond)
-	defer ticker1.Stop()
+	ticker := time.NewTicker(10 * time.Second)
+	defer ticker.Stop()
 	for {
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
-		case <-ticker1.C:
+		case <-ticker.C:
 			resolvedTsLag := s.GetResolvedTsLag()
 			if resolvedTsLag > 0 {
 				metrics.LogPullerResolvedTsLag.Set(resolvedTsLag)
 			}
-		case <-ticker2.C:
 			dsMetrics := s.ds.GetMetrics()
 			metricSubscriptionClientDSChannelSize.Set(float64(dsMetrics.EventChanSize))
 			metricSubscriptionClientDSPendingQueueLen.Set(float64(dsMetrics.PendingQueueLen))
-			metricEventStoreDSAddPathNum.Set(float64(dsMetrics.AddPath))
-			metricEventStoreDSRemovePathNum.Set(float64(dsMetrics.RemovePath))
+			if len(dsMetrics.MemoryControl.AreaMemoryMetrics) > 1 {
+				log.Panic("subscription client should have only one area")
+			}
+			// TODO: separate area for schema store and event store
+			if len(dsMetrics.MemoryControl.AreaMemoryMetrics) > 0 {
+				areaMetric := dsMetrics.MemoryControl.AreaMemoryMetrics[0]
+				metrics.DynamicStreamMemoryUsage.WithLabelValues(
+					"log-puller",
+					"max",
+					"default",
+				).Set(float64(areaMetric.MaxMemory()))
+				metrics.DynamicStreamMemoryUsage.WithLabelValues(
+					"log-puller",
+					"used",
+					"default",
+				).Set(float64(areaMetric.MemoryUsage()))
+			}
 		}
 	}
 }
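
Review note: patch 01 folds the two metric tickers into one and exports the dynamic stream's per-area memory through a labeled Prometheus gauge. A minimal, self-contained sketch of that gauge pattern follows; the metric name, namespace, and sample values are illustrative stand-ins, not the project's actual registry entries:

    package main

    import (
    	"fmt"

    	"github.com/prometheus/client_golang/prometheus"
    )

    // A gauge vector keyed by (component, type, area), mirroring how
    // updateMetrics reports "max" and "used" memory for the log puller's
    // single area.
    var memoryUsage = prometheus.NewGaugeVec(
    	prometheus.GaugeOpts{
    		Namespace: "ticdc",
    		Name:      "dynamic_stream_memory_usage_demo", // illustrative name
    		Help:      "Memory usage of dynamic stream areas",
    	},
    	[]string{"component", "type", "area"},
    )

    func main() {
    	prometheus.MustRegister(memoryUsage)
    	// One Set per label combination, exactly as the patch does.
    	memoryUsage.WithLabelValues("log-puller", "max", "default").Set(2 << 30)
    	memoryUsage.WithLabelValues("log-puller", "used", "default").Set(512 << 20)
    	fmt.Println("gauges updated")
    }
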
From 77b1e75dd646d169b0d2606acdf5af33930f3eee Mon Sep 17 00:00:00 2001
From: lidezhu
Date: Tue, 25 Feb 2025 14:40:54 +0800
Subject: [PATCH 02/29] add some log

---
 logservice/eventstore/event_store.go | 12 ++++++------
 utils/dynstream/memory_control.go    |  2 ++
 2 files changed, 8 insertions(+), 6 deletions(-)

diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go
index 8c83e51b6..ba5e4b512 100644
--- a/logservice/eventstore/event_store.go
+++ b/logservice/eventstore/event_store.go
@@ -516,12 +516,12 @@ func (e *eventStore) RegisterDispatcher(
 		}
 		// just do CompareAndSwap once, if failed, it means another goroutine has updated resolvedTs
 		if subStat.resolvedTs.CompareAndSwap(currentResolvedTs, ts) {
-			subStat.dispatchers.Lock()
-			defer subStat.dispatchers.Unlock()
-			for _, notifier := range subStat.dispatchers.notifiers {
-				notifier(ts, subStat.maxEventCommitTs.Load())
-			}
-			CounterResolved.Inc()
+			// subStat.dispatchers.Lock()
+			// defer subStat.dispatchers.Unlock()
+			// for _, notifier := range subStat.dispatchers.notifiers {
+			// 	notifier(ts, subStat.maxEventCommitTs.Load())
+			// }
+			// CounterResolved.Inc()
 		}
 	}
 	// Note: don't hold any lock when call Subscribe
diff --git a/utils/dynstream/memory_control.go b/utils/dynstream/memory_control.go
index 3abaa2429..c13726271 100644
--- a/utils/dynstream/memory_control.go
+++ b/utils/dynstream/memory_control.go
@@ -85,6 +85,7 @@ func (as *areaMemStat[A, P, T, D, H]) appendEvent(
 	// Update the pending size.
 	path.updatePendingSize(int64(event.eventSize))
 	as.totalPendingSize.Add(int64(event.eventSize))
+	log.Info("increase dynamic stream memory size", zap.Int64("size", int64(event.eventSize)))
 	return true
 }
@@ -233,6 +234,7 @@ func (as *areaMemStat[A, P, T, D, H]) shouldPauseArea() (pause bool, resume bool
 }
 
 func (as *areaMemStat[A, P, T, D, H]) decPendingSize(path *pathInfo[A, P, T, D, H], size int64) {
+	log.Info("decrease dynamic stream memory size", zap.Int64("size", size))
 	as.totalPendingSize.Add(int64(-size))
 	if as.totalPendingSize.Load() < 0 {
 		log.Warn("Total pending size is less than 0, reset it to 0", zap.Int64("totalPendingSize", as.totalPendingSize.Load()))
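
Review note: the next patch stops counting resolved-ts events in the region event's size estimate, which is computed with unsafe.Sizeof plus the variable-length payloads. A toy version of that sizing logic, under assumed illustrative type names (not the project's real regionEvent):

    package main

    import (
    	"fmt"
    	"unsafe"
    )

    // A toy event mirroring the shape getSize walks: fixed-size header plus
    // variable-length byte slices.
    type row struct {
    	Key, Value, OldValue []byte
    }

    type event struct {
    	resolvedTs uint64
    	rows       []row
    }

    // estimateSize counts the struct header via unsafe.Sizeof and adds the
    // variable-length payloads by hand. A pure resolved-ts event (no rows)
    // deliberately reports zero so memory control ignores it.
    func estimateSize(e *event) int {
    	if len(e.rows) == 0 {
    		return 0
    	}
    	size := int(unsafe.Sizeof(*e))
    	for i := range e.rows {
    		size += int(unsafe.Sizeof(e.rows[i]))
    		size += len(e.rows[i].Key) + len(e.rows[i].Value) + len(e.rows[i].OldValue)
    	}
    	return size
    }

    func main() {
    	fmt.Println(estimateSize(&event{resolvedTs: 42})) // 0: not accounted
    	fmt.Println(estimateSize(&event{rows: []row{{Key: []byte("k"), Value: []byte("v")}}})) // > 0
    }
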
From 34b480fed890ad54189cfcfc93b490f59bde5180 Mon Sep 17 00:00:00 2001
From: lidezhu
Date: Tue, 25 Feb 2025 15:06:29 +0800
Subject: [PATCH 03/29] ignore resolved ts event

---
 logservice/logpuller/region_event_handler.go | 20 +++++++++++---------
 utils/dynstream/parallel_dynamic_stream.go   |  8 +++++++-
 2 files changed, 18 insertions(+), 10 deletions(-)

diff --git a/logservice/logpuller/region_event_handler.go b/logservice/logpuller/region_event_handler.go
index 2f2f11b18..6bb6f36a2 100644
--- a/logservice/logpuller/region_event_handler.go
+++ b/logservice/logpuller/region_event_handler.go
@@ -50,16 +50,18 @@ func (event *regionEvent) getSize() int {
 	if event == nil {
 		return 0
 	}
+	// don't count the size of resolved ts event
+	if event.entries == nil {
+		return 0
+	}
 	size := int(unsafe.Sizeof(*event))
-	if event.entries != nil {
-		size += int(unsafe.Sizeof(*event.entries))
-		size += int(unsafe.Sizeof(*event.entries.Entries))
-		for _, row := range event.entries.Entries.GetEntries() {
-			size += int(unsafe.Sizeof(*row))
-			size += len(row.Key)
-			size += len(row.Value)
-			size += len(row.OldValue)
-		}
+	size += int(unsafe.Sizeof(*event.entries))
+	size += int(unsafe.Sizeof(*event.entries.Entries))
+	for _, row := range event.entries.Entries.GetEntries() {
+		size += int(unsafe.Sizeof(*row))
+		size += len(row.Key)
+		size += len(row.Value)
+		size += len(row.OldValue)
 	}
 	return size
 }
diff --git a/utils/dynstream/parallel_dynamic_stream.go b/utils/dynstream/parallel_dynamic_stream.go
index 6f9dfef39..580299734 100644
--- a/utils/dynstream/parallel_dynamic_stream.go
+++ b/utils/dynstream/parallel_dynamic_stream.go
@@ -97,12 +97,18 @@ func (s *parallelDynamicStream[A, P, T, D, H]) Push(path P, e T) {
 		}
 	}
 
+	eventSize := s.handler.GetSize(e)
+	// Only count the extra size when the event is not a "zero size" object
+	// to provide more flexibility on memory usage control for the user.
+	if eventSize != 0 {
+		eventSize += s.eventExtraSize
+	}
 	ew := eventWrap[A, P, T, D, H]{
 		event:     e,
 		pathInfo:  pi,
 		paused:    s.handler.IsPaused(e),
 		eventType: s.handler.GetType(e),
-		eventSize: s.eventExtraSize + s.handler.GetSize(e),
+		eventSize: eventSize,
 		timestamp: s.handler.GetTimestamp(e),
 		queueTime: time.Now(),
 	}
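
Review note: patch 04 below guards the pending-size bookkeeping so that zero-size (resolved ts) events skip accounting entirely. A stripped-down sketch of that bookkeeping, with illustrative names only (the real code lives in areaMemStat and also updates per-path counters):

    package main

    import (
    	"fmt"
    	"sync/atomic"
    )

    type areaMemStat struct {
    	totalPendingSize atomic.Int64
    }

    func (a *areaMemStat) add(size int64) {
    	if size == 0 {
    		return // resolved ts events are not accounted
    	}
    	a.totalPendingSize.Add(size)
    }

    func (a *areaMemStat) dec(size int64) {
    	if size == 0 {
    		return
    	}
    	if a.totalPendingSize.Add(-size) < 0 {
    		a.totalPendingSize.Store(0) // defensive clamp, mirrors the log.Warn path
    	}
    }

    func main() {
    	var a areaMemStat
    	a.add(100)
    	a.add(0) // ignored
    	a.dec(40)
    	fmt.Println(a.totalPendingSize.Load()) // 60
    }
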
From 34c78d1139dec673293c885c1f1e0aa7477b5973 Mon Sep 17 00:00:00 2001
From: lidezhu
Date: Tue, 25 Feb 2025 15:21:36 +0800
Subject: [PATCH 04/29] refine log

---
 utils/dynstream/memory_control.go | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/utils/dynstream/memory_control.go b/utils/dynstream/memory_control.go
index c13726271..a9bda3894 100644
--- a/utils/dynstream/memory_control.go
+++ b/utils/dynstream/memory_control.go
@@ -82,10 +82,12 @@ func (as *areaMemStat[A, P, T, D, H]) appendEvent(
 	// Add the event to the pending queue.
 	path.pendingQueue.PushBack(event)
-	// Update the pending size.
-	path.updatePendingSize(int64(event.eventSize))
-	as.totalPendingSize.Add(int64(event.eventSize))
-	log.Info("increase dynamic stream memory size", zap.Int64("size", int64(event.eventSize)))
+	if event.eventSize != 0 {
+		// Update the pending size.
+		path.updatePendingSize(int64(event.eventSize))
+		as.totalPendingSize.Add(int64(event.eventSize))
+		log.Info("increase dynamic stream memory size", zap.Int64("size", int64(event.eventSize)))
+	}
 	return true
 }
@@ -234,6 +236,9 @@ func (as *areaMemStat[A, P, T, D, H]) decPendingSize(path *pathInfo[A, P, T, D, H], size int64) {
+	if size == 0 {
+		return
+	}
 	log.Info("decrease dynamic stream memory size", zap.Int64("size", size))
 	as.totalPendingSize.Add(int64(-size))
 	if as.totalPendingSize.Load() < 0 {
 		log.Warn("Total pending size is less than 0, reset it to 0", zap.Int64("totalPendingSize", as.totalPendingSize.Load()))

From 41e9568098e6feb8dd782216acb988bbffa32015 Mon Sep 17 00:00:00 2001
From: lidezhu
Date: Tue, 25 Feb 2025 15:32:50 +0800
Subject: [PATCH 05/29] remove log

---
 utils/dynstream/memory_control.go | 2 --
 1 file changed, 2 deletions(-)

diff --git a/utils/dynstream/memory_control.go b/utils/dynstream/memory_control.go
index a9bda3894..194e255c1 100644
--- a/utils/dynstream/memory_control.go
+++ b/utils/dynstream/memory_control.go
@@ -86,7 +86,6 @@ func (as *areaMemStat[A, P, T, D, H]) appendEvent(
 		// Update the pending size.
 		path.updatePendingSize(int64(event.eventSize))
 		as.totalPendingSize.Add(int64(event.eventSize))
-		log.Info("increase dynamic stream memory size", zap.Int64("size", int64(event.eventSize)))
 	}
 	return true
 }
@@ -239,7 +238,6 @@ func (as *areaMemStat[A, P, T, D, H]) decPendingSize(path *pathInfo[A, P, T, D,
 	if size == 0 {
 		return
 	}
-	log.Info("decrease dynamic stream memory size", zap.Int64("size", size))
 	as.totalPendingSize.Add(int64(-size))
 	if as.totalPendingSize.Load() < 0 {
 		log.Warn("Total pending size is less than 0, reset it to 0", zap.Int64("totalPendingSize", as.totalPendingSize.Load()))

From df356b508f1f289ba0971b2445c0d6c4dca60621 Mon Sep 17 00:00:00 2001
From: lidezhu
Date: Tue, 25 Feb 2025 15:40:48 +0800
Subject: [PATCH 06/29] increase memory quota

---
 logservice/logpuller/subscription_client.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/logservice/logpuller/subscription_client.go b/logservice/logpuller/subscription_client.go
index 5868ade40..c278d36be 100644
--- a/logservice/logpuller/subscription_client.go
+++ b/logservice/logpuller/subscription_client.go
@@ -328,7 +328,7 @@ func (s *SubscriptionClient) Subscribe(
 	s.totalSpans.Unlock()
 
 	s.ds.AddPath(rt.subID, rt, dynstream.AreaSettings{
-		MaxPendingSize: 2 * 1024 * 1024 * 1024, // 2GB
+		MaxPendingSize: 4 * 1024 * 1024 * 1024, // 4GB
 	})
 
 	s.rangeTaskCh <- rangeTask{span: span, subscribedSpan: rt}

From 5eaf91fbc871999ee7347742386139ef5b13c67a Mon Sep 17 00:00:00 2001
From: lidezhu
Date: Tue, 25 Feb 2025 16:01:56 +0800
Subject: [PATCH 07/29] disable memory control

---
 logservice/logpuller/subscription_client.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/logservice/logpuller/subscription_client.go b/logservice/logpuller/subscription_client.go
index c278d36be..c98274d37 100644
--- a/logservice/logpuller/subscription_client.go
+++ b/logservice/logpuller/subscription_client.go
@@ -232,7 +232,7 @@ func NewSubscriptionClient(
 	option := dynstream.NewOption()
 	option.BatchCount = 1024
 	option.UseBuffer = false
-	option.EnableMemoryControl = true
+	option.EnableMemoryControl = false
 	ds := dynstream.NewParallelDynamicStream(
 		func(subID SubscriptionID) uint64 { return uint64(subID) },
 		&regionEventHandler{subClient: subClient},

From 5f4540160cbbafdfd414b9cae83db1e434d58909 Mon Sep 17 00:00:00 2001
From: lidezhu
Date: Tue, 25 Feb 2025 16:10:39 +0800
Subject: [PATCH 08/29] add some log

---
 logservice/eventstore/event_store.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go
index ba5e4b512..b63cc22c8 100644
--- a/logservice/eventstore/event_store.go
+++ b/logservice/eventstore/event_store.go
@@ -744,6 +744,7 @@ func (e *eventStore) writeEvents(db *pebble.DB, events []eventWithCallback) erro
 		}
 	}
 	CounterKv.Add(float64(kvCount))
+	log.Info("write events batch size", zap.Int("kvCount", kvCount), zap.Int("batchSize", batch.Len()))
 	metrics.EventStoreWriteBatchEventsCountHist.Observe(float64(kvCount))
 	metrics.EventStoreWriteBatchSizeHist.Observe(float64(batch.Len()))
 	metrics.EventStoreWriteBytes.Add(float64(batch.Len()))
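
Review note: patch 08 instruments writeEvents, which accumulates kv pairs into a single pebble batch and commits with NoSync (the store also runs with the WAL disabled). A minimal sketch of that write path; the database path and keys are placeholders:

    package main

    import (
    	"fmt"
    	"log"

    	"github.com/cockroachdb/pebble"
    )

    func main() {
    	db, err := pebble.Open("/tmp/event_store_demo", &pebble.Options{DisableWAL: true})
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer db.Close()

    	// Accumulate several kv pairs in one batch, as writeEvents does.
    	batch := db.NewBatch()
    	for i := 0; i < 3; i++ {
    		key := []byte(fmt.Sprintf("key-%d", i))
    		if err := batch.Set(key, []byte("value"), pebble.NoSync); err != nil {
    			log.Fatal(err)
    		}
    	}
    	fmt.Println("batch bytes:", batch.Len()) // what the added log line reports
    	if err := batch.Commit(pebble.NoSync); err != nil {
    		log.Fatal(err)
    	}
    }
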
From f212bbbe7ff5dc5338e7bb1530b7cf5732862271 Mon Sep 17 00:00:00 2001
From: lidezhu
Date: Tue, 25 Feb 2025 16:11:51 +0800
Subject: [PATCH 09/29] fix

---
 logservice/logpuller/subscription_client.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/logservice/logpuller/subscription_client.go b/logservice/logpuller/subscription_client.go
index c98274d37..f78de7305 100644
--- a/logservice/logpuller/subscription_client.go
+++ b/logservice/logpuller/subscription_client.go
@@ -232,7 +232,7 @@ func NewSubscriptionClient(
 	option := dynstream.NewOption()
 	option.BatchCount = 1024
 	option.UseBuffer = false
-	option.EnableMemoryControl = false
+	option.EnableMemoryControl = true
 	ds := dynstream.NewParallelDynamicStream(
 		func(subID SubscriptionID) uint64 { return uint64(subID) },
 		&regionEventHandler{subClient: subClient},
@@ -328,7 +328,7 @@ func (s *SubscriptionClient) Subscribe(
 	s.totalSpans.Unlock()
 
 	s.ds.AddPath(rt.subID, rt, dynstream.AreaSettings{
-		MaxPendingSize: 4 * 1024 * 1024 * 1024, // 4GB
+		MaxPendingSize: 8 * 1024 * 1024 * 1024, // 8GB
 	})
 
 	s.rangeTaskCh <- rangeTask{span: span, subscribedSpan: rt}

From e199309433176d7d37338f30576716a2a9ac1633 Mon Sep 17 00:00:00 2001
From: lidezhu
Date: Tue, 25 Feb 2025 16:37:29 +0800
Subject: [PATCH 10/29] fix

---
 logservice/logpuller/subscription_client.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/logservice/logpuller/subscription_client.go b/logservice/logpuller/subscription_client.go
index f78de7305..3050e7806 100644
--- a/logservice/logpuller/subscription_client.go
+++ b/logservice/logpuller/subscription_client.go
@@ -328,7 +328,7 @@ func (s *SubscriptionClient) Subscribe(
 	s.totalSpans.Unlock()
 
 	s.ds.AddPath(rt.subID, rt, dynstream.AreaSettings{
-		MaxPendingSize: 8 * 1024 * 1024 * 1024, // 8GB
+		MaxPendingSize: 4 * 1024 * 1024 * 1024, // 4GB
 	})
 
 	s.rangeTaskCh <- rangeTask{span: span, subscribedSpan: rt}

From ac8814d03d607157abfb9e51351fa898064e70f4 Mon Sep 17 00:00:00 2001
From: lidezhu
Date: Tue, 25 Feb 2025 17:07:20 +0800
Subject: [PATCH 11/29] fix

---
 logservice/logpuller/subscription_client.go | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/logservice/logpuller/subscription_client.go b/logservice/logpuller/subscription_client.go
index 3050e7806..7d0064c20 100644
--- a/logservice/logpuller/subscription_client.go
+++ b/logservice/logpuller/subscription_client.go
@@ -231,8 +231,11 @@ func NewSubscriptionClient(
 	option := dynstream.NewOption()
 	option.BatchCount = 1024
-	option.UseBuffer = false
-	option.EnableMemoryControl = true
+	// Note: after enable memory control, UseBuffer must be true.
+	// otherwise, the "wake event" may be blocked which will block the consumer
+	// and results in performance degradation.
+ option.UseBuffer = true + option.EnableMemoryControl = true ds := dynstream.NewParallelDynamicStream( func(subID SubscriptionID) uint64 { return uint64(subID) }, ®ionEventHandler{subClient: subClient}, From 187bf64df3709f6eec8b71e07a955f14450cc7d7 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Tue, 25 Feb 2025 17:07:54 +0800 Subject: [PATCH 12/29] fix --- logservice/logpuller/subscription_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/logservice/logpuller/subscription_client.go b/logservice/logpuller/subscription_client.go index 7d0064c20..1b86fae51 100644 --- a/logservice/logpuller/subscription_client.go +++ b/logservice/logpuller/subscription_client.go @@ -331,7 +331,7 @@ func (s *SubscriptionClient) Subscribe( s.totalSpans.Unlock() s.ds.AddPath(rt.subID, rt, dynstream.AreaSettings{ - MaxPendingSize: 4 * 1024 * 1024 * 1024, // 4GB + MaxPendingSize: 2 * 1024 * 1024 * 1024, // 2GB }) s.rangeTaskCh <- rangeTask{span: span, subscribedSpan: rt} From 4068f3ffec1ee4f24793c4bfe372c284ad486795 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Tue, 25 Feb 2025 17:21:48 +0800 Subject: [PATCH 13/29] fix --- logservice/logpuller/subscription_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/logservice/logpuller/subscription_client.go b/logservice/logpuller/subscription_client.go index 1b86fae51..7d0064c20 100644 --- a/logservice/logpuller/subscription_client.go +++ b/logservice/logpuller/subscription_client.go @@ -331,7 +331,7 @@ func (s *SubscriptionClient) Subscribe( s.totalSpans.Unlock() s.ds.AddPath(rt.subID, rt, dynstream.AreaSettings{ - MaxPendingSize: 2 * 1024 * 1024 * 1024, // 2GB + MaxPendingSize: 4 * 1024 * 1024 * 1024, // 4GB }) s.rangeTaskCh <- rangeTask{span: span, subscribedSpan: rt} From 92ee309356fc6d4449a38aa26427bc5416c0a21a Mon Sep 17 00:00:00 2001 From: lidezhu Date: Tue, 25 Feb 2025 17:38:34 +0800 Subject: [PATCH 14/29] remove callback --- logservice/eventstore/event_store.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index b63cc22c8..b71033b24 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -321,9 +321,9 @@ func (p *writeTaskPool) run(ctx context.Context) { return } p.store.writeEvents(p.db, events) - for i := range events { - events[i].callback() - } + // for i := range events { + // events[i].callback() + // } buffer = buffer[:0] } } @@ -506,7 +506,8 @@ func (e *eventStore) RegisterDispatcher( kvs: kvs, callback: finishCallback, }) - return true + // FIXME + return false } advanceResolvedTs := func(ts uint64) { // filter out identical resolved ts From 4afe07dbfa89c3f9d0703b5f3e9e48003adb5e56 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Tue, 25 Feb 2025 17:49:14 +0800 Subject: [PATCH 15/29] adjust pebble options --- logservice/eventstore/event_store.go | 44 ++++++++++++++-------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index b71033b24..0c5c845f2 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -252,34 +252,34 @@ func newPebbleOptions() *pebble.Options { // Disable WAL to improve performance DisableWAL: true, - // Configure large memtable to keep recent data in memory - MemTableSize: memTableSize, - MemTableStopWritesThreshold: memTableCount, + // // Configure large memtable to keep recent data in memory + // MemTableSize: 
memTableSize, + // MemTableStopWritesThreshold: memTableCount, - // Configure large block cache to keep frequently accessed data in memory - Cache: pebble.NewCache(blockCacheSize), + // // Configure large block cache to keep frequently accessed data in memory + // Cache: pebble.NewCache(blockCacheSize), - // Configure options to optimize read/write performance - Levels: make([]pebble.LevelOptions, 2), + // // Configure options to optimize read/write performance + // Levels: make([]pebble.LevelOptions, 2), } - // Configure level strategy - opts.Levels[0] = pebble.LevelOptions{ // L0 - Latest data fully in memory - BlockSize: 32 << 10, // 32KB block size - IndexBlockSize: 128 << 10, // 128KB index block - Compression: pebble.NoCompression, // No compression in L0 for better performance - } + // // Configure level strategy + // opts.Levels[0] = pebble.LevelOptions{ // L0 - Latest data fully in memory + // BlockSize: 32 << 10, // 32KB block size + // IndexBlockSize: 128 << 10, // 128KB index block + // Compression: pebble.NoCompression, // No compression in L0 for better performance + // } - opts.Levels[1] = pebble.LevelOptions{ // L1 - Data that may be in memory or on disk - BlockSize: 64 << 10, - IndexBlockSize: 256 << 10, - Compression: pebble.SnappyCompression, - TargetFileSize: 256 << 20, // 256MB - } + // opts.Levels[1] = pebble.LevelOptions{ // L1 - Data that may be in memory or on disk + // BlockSize: 64 << 10, + // IndexBlockSize: 256 << 10, + // Compression: pebble.SnappyCompression, + // TargetFileSize: 256 << 20, // 256MB + // } - // Adjust L0 thresholds to delay compaction timing - opts.L0CompactionThreshold = 20 // Allow more files in L0 - opts.L0StopWritesThreshold = 40 // Increase stop-writes threshold + // // Adjust L0 thresholds to delay compaction timing + // opts.L0CompactionThreshold = 20 // Allow more files in L0 + // opts.L0StopWritesThreshold = 40 // Increase stop-writes threshold // Prefetch configuration opts.ReadOnly = false From 6c3f22e2a7c99825a1031a9a5add6a8c6b48b6ec Mon Sep 17 00:00:00 2001 From: lidezhu Date: Tue, 25 Feb 2025 18:00:34 +0800 Subject: [PATCH 16/29] restore pebble config --- logservice/eventstore/event_store.go | 44 ++++++++++++++-------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index 0c5c845f2..b71033b24 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -252,34 +252,34 @@ func newPebbleOptions() *pebble.Options { // Disable WAL to improve performance DisableWAL: true, - // // Configure large memtable to keep recent data in memory - // MemTableSize: memTableSize, - // MemTableStopWritesThreshold: memTableCount, + // Configure large memtable to keep recent data in memory + MemTableSize: memTableSize, + MemTableStopWritesThreshold: memTableCount, - // // Configure large block cache to keep frequently accessed data in memory - // Cache: pebble.NewCache(blockCacheSize), + // Configure large block cache to keep frequently accessed data in memory + Cache: pebble.NewCache(blockCacheSize), - // // Configure options to optimize read/write performance - // Levels: make([]pebble.LevelOptions, 2), + // Configure options to optimize read/write performance + Levels: make([]pebble.LevelOptions, 2), } - // // Configure level strategy - // opts.Levels[0] = pebble.LevelOptions{ // L0 - Latest data fully in memory - // BlockSize: 32 << 10, // 32KB block size - // IndexBlockSize: 128 << 10, // 128KB index block - // 
Compression: pebble.NoCompression, // No compression in L0 for better performance - // } + // Configure level strategy + opts.Levels[0] = pebble.LevelOptions{ // L0 - Latest data fully in memory + BlockSize: 32 << 10, // 32KB block size + IndexBlockSize: 128 << 10, // 128KB index block + Compression: pebble.NoCompression, // No compression in L0 for better performance + } - // opts.Levels[1] = pebble.LevelOptions{ // L1 - Data that may be in memory or on disk - // BlockSize: 64 << 10, - // IndexBlockSize: 256 << 10, - // Compression: pebble.SnappyCompression, - // TargetFileSize: 256 << 20, // 256MB - // } + opts.Levels[1] = pebble.LevelOptions{ // L1 - Data that may be in memory or on disk + BlockSize: 64 << 10, + IndexBlockSize: 256 << 10, + Compression: pebble.SnappyCompression, + TargetFileSize: 256 << 20, // 256MB + } - // // Adjust L0 thresholds to delay compaction timing - // opts.L0CompactionThreshold = 20 // Allow more files in L0 - // opts.L0StopWritesThreshold = 40 // Increase stop-writes threshold + // Adjust L0 thresholds to delay compaction timing + opts.L0CompactionThreshold = 20 // Allow more files in L0 + opts.L0StopWritesThreshold = 40 // Increase stop-writes threshold // Prefetch configuration opts.ReadOnly = false From 0a416ef3f9facb542ce9cf8ab3d3e64e914b398e Mon Sep 17 00:00:00 2001 From: lidezhu Date: Tue, 25 Feb 2025 18:07:29 +0800 Subject: [PATCH 17/29] adjust config --- logservice/eventstore/event_store.go | 11 +++++------ logservice/logpuller/subscription_client.go | 5 +++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index b71033b24..26a37f0ae 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -309,7 +309,7 @@ func (p *writeTaskPool) run(ctx context.Context) { for i := 0; i < p.workerNum; i++ { go func() { defer p.store.wg.Done() - buffer := make([]eventWithCallback, 0, 128) + buffer := make([]eventWithCallback, 0, 32) for { select { case <-ctx.Done(): @@ -321,9 +321,9 @@ func (p *writeTaskPool) run(ctx context.Context) { return } p.store.writeEvents(p.db, events) - // for i := range events { - // events[i].callback() - // } + for i := range events { + events[i].callback() + } buffer = buffer[:0] } } @@ -506,8 +506,7 @@ func (e *eventStore) RegisterDispatcher( kvs: kvs, callback: finishCallback, }) - // FIXME - return false + return true } advanceResolvedTs := func(ts uint64) { // filter out identical resolved ts diff --git a/logservice/logpuller/subscription_client.go b/logservice/logpuller/subscription_client.go index 7d0064c20..d87c02f7d 100644 --- a/logservice/logpuller/subscription_client.go +++ b/logservice/logpuller/subscription_client.go @@ -98,7 +98,8 @@ type rangeTask struct { subscribedSpan *subscribedSpan } -const kvEventsCacheMaxSize = 32 +// TODO: dynamically adjust the size of the cache for some particular stream. +const kvEventsCacheMaxSize = 128 // subscribedSpan represents a span to subscribe. // It contains a sub span of a table(or the total span of a table), @@ -230,7 +231,7 @@ func NewSubscriptionClient( subClient.totalSpans.spanMap = make(map[SubscriptionID]*subscribedSpan) option := dynstream.NewOption() - option.BatchCount = 1024 + option.BatchCount = 10240 // Note: after enable memory control, UseBuffer must be true. // otherwise, the "wake event" may be blocked which will block the consumer // and results in performance degradation. 
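
Review note: the next patch (18) moves resolved-ts calculation out of the dynamic stream and routes resolved-ts events to dedicated worker goroutines, picking the worker as subscription ID modulo the worker count so one subscription is always handled by a single goroutine. A minimal sketch of that deterministic routing idea, with hypothetical names (worker count and channel payload are assumptions):

    package main

    import "fmt"

    type SubscriptionID uint64

    // routeBySubID returns a deterministic worker index for a subscription,
    // so all events of one subscription land on the same goroutine's channel.
    func routeBySubID(subID SubscriptionID, workerCount int) int {
    	return int(uint64(subID) % uint64(workerCount))
    }

    func main() {
    	const workers = 4
    	chs := make([]chan uint64, workers)
    	for i := range chs {
    		chs[i] = make(chan uint64, 16)
    	}
    	// Events of subscription 10 always land on the same channel.
    	idx := routeBySubID(SubscriptionID(10), workers)
    	chs[idx] <- 12345 // a resolved ts
    	fmt.Println("subscription 10 -> worker", idx, "ts", <-chs[idx])
    }
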
From 02ddd8a5de8b47b279cd5f209f595d93eead5502 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Tue, 25 Feb 2025 21:48:05 +0800 Subject: [PATCH 18/29] calculate resolved ts outside ds --- logservice/logpuller/region_event_handler.go | 50 ++------ logservice/logpuller/region_request_worker.go | 11 +- logservice/logpuller/subscription_client.go | 112 ++++++++++++++++-- 3 files changed, 116 insertions(+), 57 deletions(-) diff --git a/logservice/logpuller/region_event_handler.go b/logservice/logpuller/region_event_handler.go index 6bb6f36a2..b680e1778 100644 --- a/logservice/logpuller/region_event_handler.go +++ b/logservice/logpuller/region_event_handler.go @@ -15,7 +15,6 @@ package logpuller import ( "encoding/hex" - "time" "unsafe" "github.com/pingcap/kvproto/pkg/cdcpb" @@ -37,6 +36,7 @@ const ( DataGroupError = 3 ) +// TODO: rename this not a real region event now, because the resolved ts is subscription level now. type regionEvent struct { state *regionFeedState worker *regionRequestWorker // TODO: remove the field @@ -89,7 +89,11 @@ func (h *regionEventHandler) Handle(span *subscribedSpan, events ...regionEvent) if event.entries != nil { handleEventEntries(span, event.state, event.entries) } else if event.resolvedTs != 0 { - handleResolvedTs(span, event.state, event.resolvedTs) + // do some safety check + if event.state != nil || event.worker != nil { + log.Panic("should not happen: resolvedTs event should not have state or worker") + } + span.advanceResolvedTs(event.resolvedTs) } else { log.Panic("should not reach", zap.Any("event", event), zap.Any("events", events)) } @@ -141,9 +145,9 @@ func (h *regionEventHandler) GetType(event regionEvent) dynstream.EventType { if event.entries != nil { return dynstream.EventType{DataGroup: DataGroupEntries, Property: dynstream.BatchableData} } else if event.resolvedTs != 0 { - // Note: resolved ts may from different region, so there are not periodic signal - return dynstream.EventType{DataGroup: DataGroupResolvedTs, Property: dynstream.BatchableData} - } else if event.state.isStale() { + // resolved ts is subscription level now + return dynstream.EventType{DataGroup: DataGroupResolvedTs, Property: dynstream.PeriodicSignal} + } else if event.state != nil && event.state.isStale() { return dynstream.EventType{DataGroup: DataGroupError, Property: dynstream.BatchableData} } else { log.Panic("unknown event type", @@ -274,39 +278,3 @@ func handleEventEntries(span *subscribedSpan, state *regionFeedState, entries *c } } } - -func handleResolvedTs(span *subscribedSpan, state *regionFeedState, resolvedTs uint64) { - if state.isStale() || !state.isInitialized() { - return - } - state.matcher.tryCleanUnmatchedValue() - regionID := state.getRegionID() - lastResolvedTs := state.getLastResolvedTs() - if resolvedTs < lastResolvedTs { - log.Info("The resolvedTs is fallen back in subscription client", - zap.Uint64("subscriptionID", uint64(state.region.subscribedSpan.subID)), - zap.Uint64("regionID", regionID), - zap.Uint64("resolvedTs", resolvedTs), - zap.Uint64("lastResolvedTs", lastResolvedTs)) - return - } - state.updateResolvedTs(resolvedTs) - - now := time.Now().UnixMilli() - lastAdvance := span.lastAdvanceTime.Load() - if now-lastAdvance > span.advanceInterval && span.lastAdvanceTime.CompareAndSwap(lastAdvance, now) { - ts := span.rangeLock.ResolvedTs() - if ts > 0 && span.initialized.CompareAndSwap(false, true) { - log.Info("subscription client is initialized", - zap.Uint64("subscriptionID", uint64(span.subID)), - zap.Uint64("regionID", regionID), - 
zap.Uint64("resolvedTs", ts)) - } - lastResolvedTs := span.resolvedTs.Load() - if ts > lastResolvedTs { - span.resolvedTs.Store(ts) - span.resolvedTsUpdated.Store(time.Now().Unix()) - span.advanceResolvedTs(ts) - } - } -} diff --git a/logservice/logpuller/region_request_worker.go b/logservice/logpuller/region_request_worker.go index e4d7a5e11..9109245e2 100644 --- a/logservice/logpuller/region_request_worker.go +++ b/logservice/logpuller/region_request_worker.go @@ -263,14 +263,15 @@ func (s *regionRequestWorker) dispatchRegionChangeEvents(events []*cdcpb.Event) zap.Any("error", eventData.Error)) state.markStopped(&eventError{err: eventData.Error}) case *cdcpb.Event_ResolvedTs: - regionEvent.resolvedTs = eventData.ResolvedTs + s.client.pushResolvedTsEvent(subscriptionID, state, eventData.ResolvedTs) + continue case *cdcpb.Event_LongTxn_: // ignore continue default: log.Panic("unknown event type", zap.Any("event", event)) } - s.client.pushRegionEventToDS(SubscriptionID(event.RequestId), regionEvent) + s.client.pushRegionEventToDS(subscriptionID, regionEvent) } else { log.Warn("region request worker receives a region event for an untracked region", zap.Uint64("workerID", s.workerID), @@ -286,11 +287,7 @@ func (s *regionRequestWorker) dispatchResolvedTsEvent(resolvedTsEvent *cdcpb.Res s.client.metrics.batchResolvedSize.Observe(float64(len(resolvedTsEvent.Regions))) for _, regionID := range resolvedTsEvent.Regions { if state := s.getRegionState(subscriptionID, regionID); state != nil { - s.client.pushRegionEventToDS(SubscriptionID(resolvedTsEvent.RequestId), regionEvent{ - state: state, - worker: s, - resolvedTs: resolvedTsEvent.Ts, - }) + s.client.pushResolvedTsEvent(subscriptionID, state, resolvedTsEvent.Ts) } else { log.Warn("region request worker receives a resolved ts event for an untracked region", zap.Uint64("workerID", s.workerID), diff --git a/logservice/logpuller/subscription_client.go b/logservice/logpuller/subscription_client.go index d87c02f7d..debac722f 100644 --- a/logservice/logpuller/subscription_client.go +++ b/logservice/logpuller/subscription_client.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/ticdc/pkg/pdutil" "github.com/pingcap/ticdc/pkg/spanz" "github.com/pingcap/ticdc/pkg/util" + "github.com/pingcap/ticdc/utils/chann" "github.com/pingcap/ticdc/utils/dynstream" "github.com/pingcap/tiflow/pkg/security" "github.com/prometheus/client_golang/prometheus" @@ -98,8 +99,18 @@ type rangeTask struct { subscribedSpan *subscribedSpan } +type resolvedTsEvent struct { + subID SubscriptionID + state *regionFeedState + resolvedTs uint64 +} + +func resolvedTsEventSizer(e resolvedTsEvent) int { + return 0 +} + // TODO: dynamically adjust the size of the cache for some particular stream. -const kvEventsCacheMaxSize = 128 +const kvEventsCacheMaxSize = 32 // subscribedSpan represents a span to subscribe. 
 // It contains a sub span of a table(or the total span of a table),
@@ -119,7 +130,7 @@ type subscribedSpan struct {
 
 	advanceResolvedTs func(ts uint64)
 
-	advanceInterval int64
+	lastAdvanceTime atomic.Int64
 
 	kvEventsCache []common.RawKVEntry
 
@@ -130,8 +141,6 @@ type subscribedSpan struct {
 	tryResolveLock     func(regionID uint64, state *regionlock.LockedRangeState)
 	staleLocksTargetTs atomic.Uint64
 
-	lastAdvanceTime atomic.Int64
-
 	initialized       atomic.Bool
 	resolvedTsUpdated atomic.Int64
 	resolvedTs        atomic.Uint64
@@ -148,6 +157,8 @@ func (span *subscribedSpan) clearKVEventsCache() {
 
 type SubscriptionClientConfig struct {
 	// The number of region request workers to send region task for every tikv store
 	RegionRequestWorkerPerStore uint
+	// The number of goroutines to handle resolved ts events
+	ResolvedTsWorkerCh int
 }
 
 type sharedClientMetrics struct {
@@ -189,6 +200,8 @@ type SubscriptionClient struct {
 		spanMap map[SubscriptionID]*subscribedSpan
 	}
 
+	resolvedTsCh []*chann.UnlimitedChannel[resolvedTsEvent, uint64]
+
 	// rangeTaskCh is used to receive range tasks.
 	// The tasks will be handled in `handleRangeTask` goroutine.
 	rangeTaskCh chan rangeTask
@@ -212,6 +225,9 @@ func NewSubscriptionClient(
 	lockResolver txnutil.LockResolver,
 	credential *security.Credential,
 ) *SubscriptionClient {
+	if config.ResolvedTsWorkerCh <= 0 {
+		config.ResolvedTsWorkerCh = 4
+	}
 	subClient := &SubscriptionClient{
 		config:     config,
 		filterLoop: false, // FIXME
@@ -229,12 +245,16 @@ func NewSubscriptionClient(
 		errCache:   newErrCache(),
 	}
 	subClient.totalSpans.spanMap = make(map[SubscriptionID]*subscribedSpan)
+	for i := 0; i < config.ResolvedTsWorkerCh; i++ {
+		subClient.resolvedTsCh = append(subClient.resolvedTsCh, chann.NewUnlimitedChannel[resolvedTsEvent, uint64](nil, resolvedTsEventSizer))
+	}
 
 	option := dynstream.NewOption()
-	option.BatchCount = 10240
+	// Note: it is the upper bound of a batch of kv entries sent from tikv (not committed rows)
+	option.BatchCount = 1024
-	// Note: after enable memory control, UseBuffer must be true.
+	// Note: after enabling memory control, UseBuffer should be true.
 	// otherwise, the "wake event" may be blocked which will block the consumer
-	// and results in performance degradation.
+	// and cause some performance degradation.
option.UseBuffer = true option.EnableMemoryControl = true ds := dynstream.NewParallelDynamicStream( @@ -360,6 +380,16 @@ func (s *SubscriptionClient) wakeSubscription(subID SubscriptionID) { s.ds.Wake(subID) } +func (s *SubscriptionClient) pushResolvedTsEvent(subID SubscriptionID, state *regionFeedState, resolvedTs uint64) { + // make sure a subscription is always handled by a single goroutine + targetChIndex := int(subID) % s.config.ResolvedTsWorkerCh + s.resolvedTsCh[targetChIndex].Push(resolvedTsEvent{ + subID: subID, + state: state, + resolvedTs: resolvedTs, + }) +} + func (s *SubscriptionClient) pushRegionEventToDS(subID SubscriptionID, event regionEvent) { // fast path if !s.paused.Load() { @@ -396,6 +426,69 @@ func (s *SubscriptionClient) handleDSFeedBack(ctx context.Context) error { } } +func (s *SubscriptionClient) handleResolvedTsEvent(ctx context.Context, ch *chann.UnlimitedChannel[resolvedTsEvent, uint64]) error { + buffer := make([]resolvedTsEvent, 0, 1024) + for { + select { + case <-ctx.Done(): + return nil + default: + events, ok := ch.GetMultipleNoGroup(buffer) + if !ok { + return nil + } + for _, event := range events { + state := event.state + if state.isStale() || !state.isInitialized() { + continue + } + state.matcher.tryCleanUnmatchedValue() + regionID := state.getRegionID() + lastResolvedTs := state.getLastResolvedTs() + if event.resolvedTs < lastResolvedTs { + log.Info("The resolvedTs is fallen back in subscription client", + zap.Uint64("subscriptionID", uint64(state.region.subscribedSpan.subID)), + zap.Uint64("regionID", regionID), + zap.Uint64("resolvedTs", event.resolvedTs), + zap.Uint64("lastResolvedTs", lastResolvedTs)) + continue + } + state.updateResolvedTs(event.resolvedTs) + + s.totalSpans.Lock() + subSpan := s.totalSpans.spanMap[event.subID] + s.totalSpans.Unlock() + if subSpan == nil { + log.Warn("unknown subscription", zap.Uint64("subscriptionID", uint64(event.subID))) + continue + } + + now := time.Now().UnixMilli() + lastAdvance := subSpan.lastAdvanceTime.Load() + // TODO: use a config instead + if now-lastAdvance > 600 && subSpan.lastAdvanceTime.CompareAndSwap(lastAdvance, now) { + ts := subSpan.rangeLock.ResolvedTs() + if ts > 0 && subSpan.initialized.CompareAndSwap(false, true) { + log.Info("subscription client is initialized", + zap.Uint64("subscriptionID", uint64(subSpan.subID)), + zap.Uint64("regionID", regionID), + zap.Uint64("resolvedTs", ts)) + } + lastResolvedTs := subSpan.resolvedTs.Load() + if ts > lastResolvedTs { + subSpan.resolvedTs.Store(ts) + subSpan.resolvedTsUpdated.Store(time.Now().Unix()) + s.pushRegionEventToDS(subSpan.subID, regionEvent{ + resolvedTs: ts, + }) + } + } + } + buffer = buffer[:0] + } + } +} + // RegionCount returns subscribed region count for the span. 
 func (s *SubscriptionClient) RegionCount(subID SubscriptionID) uint64 {
 	s.totalSpans.RLock()
@@ -407,7 +500,6 @@ func (s *SubscriptionClient) RegionCount(subID SubscriptionID) uint64 {
 }
 
 func (s *SubscriptionClient) Run(ctx context.Context) error {
-	// s.consume = consume
 	if s.pd == nil {
 		log.Warn("subscription client should be in test mode, skip run")
 		return nil
@@ -417,6 +509,9 @@ func (s *SubscriptionClient) Run(ctx context.Context) error {
 	g, ctx := errgroup.WithContext(ctx)
 
 	g.Go(func() error { return s.updateMetrics(ctx) })
+	for i := 0; i < s.config.ResolvedTsWorkerCh; i++ {
+		g.Go(func() error { return s.handleResolvedTsEvent(ctx, s.resolvedTsCh[i]) })
+	}
 	g.Go(func() error { return s.handleDSFeedBack(ctx) })
 	g.Go(func() error { return s.handleRangeTasks(ctx) })
 	g.Go(func() error { return s.handleRegions(ctx, g) })
@@ -946,7 +1041,6 @@ func (s *SubscriptionClient) newSubscribedSpan(
 
 		consumeKVEvents:   consumeKVEvents,
 		advanceResolvedTs: advanceResolvedTs,
-		advanceInterval:   advanceInterval,
 	}
 	rt.initialized.Store(false)
 	rt.resolvedTsUpdated.Store(time.Now().Unix())

From dc9a2ed4dd793663d777d0dddd7e736fa10a3423 Mon Sep 17 00:00:00 2001
From: lidezhu
Date: Tue, 25 Feb 2025 21:51:38 +0800
Subject: [PATCH 19/29] fix panic

---
 logservice/logpuller/region_event_handler.go | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/logservice/logpuller/region_event_handler.go b/logservice/logpuller/region_event_handler.go
index b680e1778..0dd92f4a5 100644
--- a/logservice/logpuller/region_event_handler.go
+++ b/logservice/logpuller/region_event_handler.go
@@ -82,18 +82,20 @@ func (h *regionEventHandler) Handle(span *subscribedSpan, events ...regionEvent)
 	}
 
 	for _, event := range events {
+		if event.resolvedTs != 0 {
+			// do some safety check
+			if event.state != nil || event.worker != nil {
+				log.Panic("should not happen: resolvedTs event should not have state or worker")
+			}
+			span.advanceResolvedTs(event.resolvedTs)
+			continue
+		}
 		if event.state.isStale() {
 			h.handleRegionError(event.state, event.worker)
 			continue
 		}
 		if event.entries != nil {
 			handleEventEntries(span, event.state, event.entries)
-		} else if event.resolvedTs != 0 {
-			// do some safety check
-			if event.state != nil || event.worker != nil {
-				log.Panic("should not happen: resolvedTs event should not have state or worker")
-			}
-			span.advanceResolvedTs(event.resolvedTs)
 		} else {
 			log.Panic("should not reach", zap.Any("event", event), zap.Any("events", events))
 		}

From 7ee72b94a46d13ddd722f0e6d3d1f528c5996df3 Mon Sep 17 00:00:00 2001
From: lidezhu
Date: Tue, 25 Feb 2025 22:04:45 +0800
Subject: [PATCH 20/29] add log

---
 logservice/eventstore/event_store.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go
index 26a37f0ae..344835d0b 100644
--- a/logservice/eventstore/event_store.go
+++ b/logservice/eventstore/event_store.go
@@ -750,6 +750,7 @@ func (e *eventStore) writeEvents(db *pebble.DB, events []eventWithCallback) erro
 	metrics.EventStoreWriteBytes.Add(float64(batch.Len()))
 	start := time.Now()
 	err := batch.Commit(pebble.NoSync)
+	log.Info("write events batch commit", zap.Int("kvCount", kvCount))
 	metrics.EventStoreWriteDurationHistogram.Observe(float64(time.Since(start).Milliseconds()) / 1000)
 	return err
 }

From 2580b9d7675b406ea8ea51f6535c876582799e5e Mon Sep 17 00:00:00 2001
From: lidezhu
Date: Tue, 25 Feb 2025 22:10:01 +0800
Subject: [PATCH 21/29] remove compression

---
 logservice/eventstore/event_store.go | 58 ++++++++++++++--------------
1 file changed, 28 insertions(+), 30 deletions(-) diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index 344835d0b..0d2c2cab4 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -24,7 +24,6 @@ import ( "time" "github.com/cockroachdb/pebble" - "github.com/klauspost/compress/zstd" "github.com/pingcap/log" "github.com/pingcap/ticdc/heartbeatpb" "github.com/pingcap/ticdc/logservice/logpuller" @@ -172,8 +171,8 @@ type eventStore struct { tableToDispatchers map[int64]map[common.DispatcherID]bool } - encoder *zstd.Encoder - decoder *zstd.Decoder + // encoder *zstd.Encoder + // decoder *zstd.Decoder } const ( @@ -201,16 +200,16 @@ func New( if err != nil { log.Panic("fail to remove path") } - // Create the zstd encoder - encoder, err := zstd.NewWriter(nil) - if err != nil { - log.Panic("Failed to create zstd encoder", zap.Error(err)) - } + // // Create the zstd encoder + // encoder, err := zstd.NewWriter(nil) + // if err != nil { + // log.Panic("Failed to create zstd encoder", zap.Error(err)) + // } - decoder, err := zstd.NewReader(nil) - if err != nil { - log.Panic("Failed to create zstd decoder", zap.Error(err)) - } + // decoder, err := zstd.NewReader(nil) + // if err != nil { + // log.Panic("Failed to create zstd decoder", zap.Error(err)) + // } store := &eventStore{ pdClock: pdClock, subClient: subClient, @@ -220,8 +219,8 @@ func New( writeTaskPools: make([]*writeTaskPool, 0, dbCount), gcManager: newGCManager(), - encoder: encoder, - decoder: decoder, + // encoder: encoder, + // decoder: decoder, } // TODO: update pebble options @@ -265,9 +264,9 @@ func newPebbleOptions() *pebble.Options { // Configure level strategy opts.Levels[0] = pebble.LevelOptions{ // L0 - Latest data fully in memory - BlockSize: 32 << 10, // 32KB block size - IndexBlockSize: 128 << 10, // 128KB index block - Compression: pebble.NoCompression, // No compression in L0 for better performance + BlockSize: 32 << 10, // 32KB block size + IndexBlockSize: 128 << 10, // 128KB index block + Compression: pebble.ZstdCompression, } opts.Levels[1] = pebble.LevelOptions{ // L1 - Data that may be in memory or on disk @@ -680,7 +679,7 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com startTs: dataRange.StartTs, endTs: dataRange.EndTs, rowCount: 0, - decoder: e.decoder, + // decoder: e.decoder, }, nil } @@ -735,10 +734,10 @@ func (e *eventStore) writeEvents(db *pebble.DB, events []eventWithCallback) erro for _, kv := range event.kvs { key := EncodeKey(uint64(event.subID), event.tableID, &kv) value := kv.Encode() - compressedValue := e.encoder.EncodeAll(value, nil) - ratio := float64(len(value)) / float64(len(compressedValue)) - metrics.EventStoreCompressRatio.Set(ratio) - if err := batch.Set(key, compressedValue, pebble.NoSync); err != nil { + // compressedValue := e.encoder.EncodeAll(value, nil) + // ratio := float64(len(value)) / float64(len(compressedValue)) + // metrics.EventStoreCompressRatio.Set(ratio) + if err := batch.Set(key, value, pebble.NoSync); err != nil { log.Panic("failed to update pebble batch", zap.Error(err)) } } @@ -750,7 +749,6 @@ func (e *eventStore) writeEvents(db *pebble.DB, events []eventWithCallback) erro metrics.EventStoreWriteBytes.Add(float64(batch.Len())) start := time.Now() err := batch.Commit(pebble.NoSync) - log.Info("write events batch commit", zap.Int("kvCount", kvCount)) metrics.EventStoreWriteDurationHistogram.Observe(float64(time.Since(start).Milliseconds()) / 1000) return err } @@ 
-774,7 +772,7 @@ type eventStoreIter struct { startTs uint64 endTs uint64 rowCount int64 - decoder *zstd.Decoder + // decoder *zstd.Decoder } func (iter *eventStoreIter) Next() (*common.RawKVEntry, bool, error) { @@ -787,13 +785,13 @@ func (iter *eventStoreIter) Next() (*common.RawKVEntry, bool, error) { } value := iter.innerIter.Value() - decompressedValue, err := iter.decoder.DecodeAll(value, nil) - if err != nil { - log.Panic("failed to decompress value", zap.Error(err)) - } - metrics.EventStoreScanBytes.Add(float64(len(decompressedValue))) + // decompressedValue, err := iter.decoder.DecodeAll(value, nil) + // if err != nil { + // log.Panic("failed to decompress value", zap.Error(err)) + // } + metrics.EventStoreScanBytes.Add(float64(len(value))) rawKV := &common.RawKVEntry{} - rawKV.Decode(decompressedValue) + rawKV.Decode(value) isNewTxn := false if iter.prevCommitTs == 0 || (rawKV.StartTs != iter.prevStartTs || rawKV.CRTs != iter.prevCommitTs) { isNewTxn = true From 221bd0993a32365fb4a107afa89d48fc7a0edfe1 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Tue, 25 Feb 2025 22:28:42 +0800 Subject: [PATCH 22/29] add some log --- logservice/eventstore/event_store.go | 1 + 1 file changed, 1 insertion(+) diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index 0d2c2cab4..cde02ed11 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -749,6 +749,7 @@ func (e *eventStore) writeEvents(db *pebble.DB, events []eventWithCallback) erro metrics.EventStoreWriteBytes.Add(float64(batch.Len())) start := time.Now() err := batch.Commit(pebble.NoSync) + log.Info("write events batch commit", zap.Int("kvCount", kvCount)) metrics.EventStoreWriteDurationHistogram.Observe(float64(time.Since(start).Milliseconds()) / 1000) return err } From ba514eb761c0ad26e7d6cf8e3261a19559032fbb Mon Sep 17 00:00:00 2001 From: lidezhu Date: Tue, 25 Feb 2025 22:39:14 +0800 Subject: [PATCH 23/29] change compression --- logservice/eventstore/event_store.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index cde02ed11..489f889a1 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -266,13 +266,13 @@ func newPebbleOptions() *pebble.Options { opts.Levels[0] = pebble.LevelOptions{ // L0 - Latest data fully in memory BlockSize: 32 << 10, // 32KB block size IndexBlockSize: 128 << 10, // 128KB index block - Compression: pebble.ZstdCompression, + Compression: pebble.SnappyCompression, } opts.Levels[1] = pebble.LevelOptions{ // L1 - Data that may be in memory or on disk BlockSize: 64 << 10, IndexBlockSize: 256 << 10, - Compression: pebble.SnappyCompression, + Compression: pebble.ZstdCompression, TargetFileSize: 256 << 20, // 256MB } From 3eaf9670caf8786aa1e445fd9259861eac3d9a6a Mon Sep 17 00:00:00 2001 From: lidezhu Date: Tue, 25 Feb 2025 22:54:14 +0800 Subject: [PATCH 24/29] Revert "change compression" This reverts commit ba514eb761c0ad26e7d6cf8e3261a19559032fbb. 
--- logservice/eventstore/event_store.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index 489f889a1..cde02ed11 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -266,13 +266,13 @@ func newPebbleOptions() *pebble.Options { opts.Levels[0] = pebble.LevelOptions{ // L0 - Latest data fully in memory BlockSize: 32 << 10, // 32KB block size IndexBlockSize: 128 << 10, // 128KB index block - Compression: pebble.SnappyCompression, + Compression: pebble.ZstdCompression, } opts.Levels[1] = pebble.LevelOptions{ // L1 - Data that may be in memory or on disk BlockSize: 64 << 10, IndexBlockSize: 256 << 10, - Compression: pebble.ZstdCompression, + Compression: pebble.SnappyCompression, TargetFileSize: 256 << 20, // 256MB } From ec582bc4c99d78908af72d55248c5902cf48a11b Mon Sep 17 00:00:00 2001 From: lidezhu Date: Tue, 25 Feb 2025 22:54:40 +0800 Subject: [PATCH 25/29] Revert "remove compression" This reverts commit 2580b9d7675b406ea8ea51f6535c876582799e5e. --- logservice/eventstore/event_store.go | 57 ++++++++++++++-------------- 1 file changed, 29 insertions(+), 28 deletions(-) diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index cde02ed11..344835d0b 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -24,6 +24,7 @@ import ( "time" "github.com/cockroachdb/pebble" + "github.com/klauspost/compress/zstd" "github.com/pingcap/log" "github.com/pingcap/ticdc/heartbeatpb" "github.com/pingcap/ticdc/logservice/logpuller" @@ -171,8 +172,8 @@ type eventStore struct { tableToDispatchers map[int64]map[common.DispatcherID]bool } - // encoder *zstd.Encoder - // decoder *zstd.Decoder + encoder *zstd.Encoder + decoder *zstd.Decoder } const ( @@ -200,16 +201,16 @@ func New( if err != nil { log.Panic("fail to remove path") } - // // Create the zstd encoder - // encoder, err := zstd.NewWriter(nil) - // if err != nil { - // log.Panic("Failed to create zstd encoder", zap.Error(err)) - // } + // Create the zstd encoder + encoder, err := zstd.NewWriter(nil) + if err != nil { + log.Panic("Failed to create zstd encoder", zap.Error(err)) + } - // decoder, err := zstd.NewReader(nil) - // if err != nil { - // log.Panic("Failed to create zstd decoder", zap.Error(err)) - // } + decoder, err := zstd.NewReader(nil) + if err != nil { + log.Panic("Failed to create zstd decoder", zap.Error(err)) + } store := &eventStore{ pdClock: pdClock, subClient: subClient, @@ -219,8 +220,8 @@ func New( writeTaskPools: make([]*writeTaskPool, 0, dbCount), gcManager: newGCManager(), - // encoder: encoder, - // decoder: decoder, + encoder: encoder, + decoder: decoder, } // TODO: update pebble options @@ -264,9 +265,9 @@ func newPebbleOptions() *pebble.Options { // Configure level strategy opts.Levels[0] = pebble.LevelOptions{ // L0 - Latest data fully in memory - BlockSize: 32 << 10, // 32KB block size - IndexBlockSize: 128 << 10, // 128KB index block - Compression: pebble.ZstdCompression, + BlockSize: 32 << 10, // 32KB block size + IndexBlockSize: 128 << 10, // 128KB index block + Compression: pebble.NoCompression, // No compression in L0 for better performance } opts.Levels[1] = pebble.LevelOptions{ // L1 - Data that may be in memory or on disk @@ -679,7 +680,7 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com startTs: dataRange.StartTs, endTs: dataRange.EndTs, rowCount: 0, - // decoder: 
e.decoder, + decoder: e.decoder, }, nil } @@ -734,10 +735,10 @@ func (e *eventStore) writeEvents(db *pebble.DB, events []eventWithCallback) erro for _, kv := range event.kvs { key := EncodeKey(uint64(event.subID), event.tableID, &kv) value := kv.Encode() - // compressedValue := e.encoder.EncodeAll(value, nil) - // ratio := float64(len(value)) / float64(len(compressedValue)) - // metrics.EventStoreCompressRatio.Set(ratio) - if err := batch.Set(key, value, pebble.NoSync); err != nil { + compressedValue := e.encoder.EncodeAll(value, nil) + ratio := float64(len(value)) / float64(len(compressedValue)) + metrics.EventStoreCompressRatio.Set(ratio) + if err := batch.Set(key, compressedValue, pebble.NoSync); err != nil { log.Panic("failed to update pebble batch", zap.Error(err)) } } @@ -773,7 +774,7 @@ type eventStoreIter struct { startTs uint64 endTs uint64 rowCount int64 - // decoder *zstd.Decoder + decoder *zstd.Decoder } func (iter *eventStoreIter) Next() (*common.RawKVEntry, bool, error) { @@ -786,13 +787,13 @@ func (iter *eventStoreIter) Next() (*common.RawKVEntry, bool, error) { } value := iter.innerIter.Value() - // decompressedValue, err := iter.decoder.DecodeAll(value, nil) - // if err != nil { - // log.Panic("failed to decompress value", zap.Error(err)) - // } - metrics.EventStoreScanBytes.Add(float64(len(value))) + decompressedValue, err := iter.decoder.DecodeAll(value, nil) + if err != nil { + log.Panic("failed to decompress value", zap.Error(err)) + } + metrics.EventStoreScanBytes.Add(float64(len(decompressedValue))) rawKV := &common.RawKVEntry{} - rawKV.Decode(value) + rawKV.Decode(decompressedValue) isNewTxn := false if iter.prevCommitTs == 0 || (rawKV.StartTs != iter.prevStartTs || rawKV.CRTs != iter.prevCommitTs) { isNewTxn = true From 73819759bd7ac800d753cd962a0cacf0d9aa9cb6 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Tue, 25 Feb 2025 22:57:30 +0800 Subject: [PATCH 26/29] increase mem quota --- logservice/eventstore/event_store.go | 2 +- logservice/logpuller/subscription_client.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index 344835d0b..861f5cc69 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -309,7 +309,7 @@ func (p *writeTaskPool) run(ctx context.Context) { for i := 0; i < p.workerNum; i++ { go func() { defer p.store.wg.Done() - buffer := make([]eventWithCallback, 0, 32) + buffer := make([]eventWithCallback, 0, 128) for { select { case <-ctx.Done(): diff --git a/logservice/logpuller/subscription_client.go b/logservice/logpuller/subscription_client.go index debac722f..78f8ea2ac 100644 --- a/logservice/logpuller/subscription_client.go +++ b/logservice/logpuller/subscription_client.go @@ -352,7 +352,7 @@ func (s *SubscriptionClient) Subscribe( s.totalSpans.Unlock() s.ds.AddPath(rt.subID, rt, dynstream.AreaSettings{ - MaxPendingSize: 4 * 1024 * 1024 * 1024, // 4GB + MaxPendingSize: 8 * 1024 * 1024 * 1024, // 8GB }) s.rangeTaskCh <- rangeTask{span: span, subscribedSpan: rt} From 39c87c8268a34e06e49e33d26dcbdbca8b32a264 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Tue, 25 Feb 2025 23:07:42 +0800 Subject: [PATCH 27/29] increase mem quota --- logservice/logpuller/subscription_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/logservice/logpuller/subscription_client.go b/logservice/logpuller/subscription_client.go index 78f8ea2ac..30055db15 100644 --- a/logservice/logpuller/subscription_client.go +++ 
b/logservice/logpuller/subscription_client.go @@ -352,7 +352,7 @@ func (s *SubscriptionClient) Subscribe( s.totalSpans.Unlock() s.ds.AddPath(rt.subID, rt, dynstream.AreaSettings{ - MaxPendingSize: 8 * 1024 * 1024 * 1024, // 8GB + MaxPendingSize: 4 * 1024 * 1024 * 1024, // 4 }) s.rangeTaskCh <- rangeTask{span: span, subscribedSpan: rt} From 63ef8eb038d3c7dd265c68218f717e40224f0c89 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Tue, 25 Feb 2025 23:19:58 +0800 Subject: [PATCH 28/29] adjust config --- logservice/eventstore/event_store.go | 6 +++++- logservice/logpuller/subscription_client.go | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index 861f5cc69..789e9fbcc 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -261,6 +261,10 @@ func newPebbleOptions() *pebble.Options { // Configure options to optimize read/write performance Levels: make([]pebble.LevelOptions, 2), + + MaxConcurrentCompactions: func() int { + return 2 + }, } // Configure level strategy @@ -279,7 +283,7 @@ func newPebbleOptions() *pebble.Options { // Adjust L0 thresholds to delay compaction timing opts.L0CompactionThreshold = 20 // Allow more files in L0 - opts.L0StopWritesThreshold = 40 // Increase stop-writes threshold + opts.L0StopWritesThreshold = 80 // Increase stop-writes threshold // Prefetch configuration opts.ReadOnly = false diff --git a/logservice/logpuller/subscription_client.go b/logservice/logpuller/subscription_client.go index 30055db15..debac722f 100644 --- a/logservice/logpuller/subscription_client.go +++ b/logservice/logpuller/subscription_client.go @@ -352,7 +352,7 @@ func (s *SubscriptionClient) Subscribe( s.totalSpans.Unlock() s.ds.AddPath(rt.subID, rt, dynstream.AreaSettings{ - MaxPendingSize: 4 * 1024 * 1024 * 1024, // 4 + MaxPendingSize: 4 * 1024 * 1024 * 1024, // 4GB }) s.rangeTaskCh <- rangeTask{span: span, subscribedSpan: rt} From 278b353b954f26fc67f6dca98c3a765d1ca8cfbb Mon Sep 17 00:00:00 2001 From: lidezhu Date: Tue, 25 Feb 2025 23:36:10 +0800 Subject: [PATCH 29/29] adjust config --- logservice/eventstore/event_store.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index 789e9fbcc..d98f98774 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -178,14 +178,14 @@ type eventStore struct { const ( dataDir = "event_store" - dbCount = 8 - writeWorkerNumPerDB = 2 + dbCount = 4 + writeWorkerNumPerDB = 4 // Pebble options - targetMemoryLimit = 2 << 30 // 2GB + targetMemoryLimit = 4 << 30 // 4GB memTableSize = 256 << 20 // 256MB - memTableCount = 4 - blockCacheSize = targetMemoryLimit - (memTableSize * memTableCount) // 1GB + memTableCount = 8 + blockCacheSize = targetMemoryLimit - (memTableSize * memTableCount) // 2GB ) func New( @@ -263,7 +263,7 @@ func newPebbleOptions() *pebble.Options { Levels: make([]pebble.LevelOptions, 2), MaxConcurrentCompactions: func() int { - return 2 + return 4 }, } @@ -282,8 +282,8 @@ func newPebbleOptions() *pebble.Options { } // Adjust L0 thresholds to delay compaction timing - opts.L0CompactionThreshold = 20 // Allow more files in L0 - opts.L0StopWritesThreshold = 80 // Increase stop-writes threshold + opts.L0CompactionThreshold = 20 // Allow more files in L0 + opts.L0StopWritesThreshold = 160 // Increase stop-writes threshold // Prefetch configuration opts.ReadOnly = false
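
Review note: patches 21 through 25 toggle per-value compression in the event store and end up keeping the zstd encoder/decoder path. A self-contained sketch of the EncodeAll / DecodeAll round trip that writeEvents and the iterator rely on; the sample value and the ratio computation are illustrative:

    package main

    import (
    	"fmt"

    	"github.com/klauspost/compress/zstd"
    )

    func main() {
    	enc, err := zstd.NewWriter(nil) // encoder used only via EncodeAll
    	if err != nil {
    		panic(err)
    	}
    	dec, err := zstd.NewReader(nil)
    	if err != nil {
    		panic(err)
    	}
    	value := []byte("row value bytes, typically much more repetitive than this sample")
    	compressed := enc.EncodeAll(value, nil) // compress one value, as writeEvents does
    	ratio := float64(len(value)) / float64(len(compressed))
    	decompressed, err := dec.DecodeAll(compressed, nil) // what the iterator does on read
    	if err != nil {
    		panic(err)
    	}
    	fmt.Printf("ratio=%.2f roundtrip_ok=%v\n", ratio, string(decompressed) == string(value))
    }
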