From b1896e9ec1c5e68ccc0dd7f410b9b4c322f01f85 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Wed, 26 Mar 2025 16:08:57 +0800 Subject: [PATCH 1/3] update --- logservice/schemastore/persist_storage.go | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/logservice/schemastore/persist_storage.go b/logservice/schemastore/persist_storage.go index 0d227eb07..4e233517e 100644 --- a/logservice/schemastore/persist_storage.go +++ b/logservice/schemastore/persist_storage.go @@ -85,6 +85,9 @@ type persistentStorage struct { // tableID -> total registered count tableRegisteredCount map[int64]int + + mutexForHack sync.Mutex + tablesForHack map[uint64][]commonEvent.Table } func exists(path string) bool { @@ -149,6 +152,7 @@ func newPersistentStorage( tableTriggerDDLHistory: make([]uint64, 0), tableInfoStoreMap: make(map[int64]*versionedTableInfoStore), tableRegisteredCount: make(map[int64]int), + tablesForHack: make(map[uint64][]commonEvent.Table), } isDataReusable := false @@ -267,7 +271,22 @@ func (p *persistentStorage) getAllPhysicalTables(snapTs uint64, tableFilter filt log.Debug("getAllPhysicalTables finish", zap.Any("duration(s)", time.Since(start).Seconds())) }() - return loadAllPhysicalTablesAtTs(storageSnap, gcTs, snapTs, tableFilter) + + p.mutexForHack.Lock() + if p.tablesForHack[snapTs] != nil { + p.mutexForHack.Unlock() + return p.tablesForHack[snapTs], nil + } + p.mutexForHack.Unlock() + + tables, err := loadAllPhysicalTablesAtTs(storageSnap, gcTs, snapTs, tableFilter) + if err == nil { + p.mutexForHack.Lock() + p.tablesForHack[snapTs] = tables + p.mutexForHack.Unlock() + } + return tables, err + //return loadAllPhysicalTablesAtTs(storageSnap, gcTs, snapTs, tableFilter) } // only return when table info is initialized From e0130b69f1760e95c0ab80a339acdb06ae8701c4 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Wed, 26 Mar 2025 16:22:57 +0800 Subject: [PATCH 2/3] update --- logservice/schemastore/disk_format.go | 39 ++++++++++++++++++----- logservice/schemastore/persist_storage.go | 20 +----------- 2 files changed, 32 insertions(+), 27 deletions(-) diff --git a/logservice/schemastore/disk_format.go b/logservice/schemastore/disk_format.go index 1c6c89be8..f72f865b4 100644 --- a/logservice/schemastore/disk_format.go +++ b/logservice/schemastore/disk_format.go @@ -19,6 +19,7 @@ import ( "encoding/json" "math" "strings" + "sync" "time" "github.com/cockroachdb/pebble" @@ -709,22 +710,44 @@ func cleanObsoleteData(db *pebble.DB, oldGcTs uint64, gcTs uint64) { } } +var mutexForHack sync.Mutex +var tableInfoMapForHack = map[uint64]map[int64]*model.TableInfo{} +var tableMapForHack = map[uint64]map[int64]*BasicTableInfo{} +var partitionMapForHack = map[uint64]map[int64]BasicPartitionInfo{} + func loadAllPhysicalTablesAtTs( storageSnap *pebble.Snapshot, gcTs uint64, snapVersion uint64, tableFilter filter.Filter, ) ([]commonEvent.Table, error) { - // TODO: respect tableFilter(filter table in kv snap is easy, filter ddl jobs need more attention) - databaseMap, err := loadDatabasesInKVSnap(storageSnap, gcTs) - if err != nil { - return nil, err - } + var tableInfoMap map[int64]*model.TableInfo + var tableMap map[int64]*BasicTableInfo + var partitionMap map[int64]BasicPartitionInfo - tableInfoMap, tableMap, partitionMap, err := loadFullTablesInKVSnap(storageSnap, gcTs, databaseMap) - if err != nil { - return nil, err + mutexForHack.Lock() + if tableInfoMapForHack[gcTs] != nil { + tableInfoMap = tableInfoMapForHack[gcTs] + tableMap = tableMapForHack[gcTs] + partitionMap = partitionMapForHack[gcTs] + mutexForHack.Unlock() + } else { + mutexForHack.Unlock() + // TODO: respect tableFilter(filter table in kv snap is easy, filter ddl jobs need more attention) + databaseMap, err := loadDatabasesInKVSnap(storageSnap, gcTs) + if err != nil { + return nil, err + } + + tableInfoMap, tableMap, partitionMap, err = loadFullTablesInKVSnap(storageSnap, gcTs, databaseMap) + if err != nil { + return nil, err + } + tableInfoMapForHack[gcTs] = tableInfoMap + tableMapForHack[gcTs] = tableMap + partitionMapForHack[gcTs] = partitionMap } + log.Info("after load tables in kv snap", zap.Int("tableInfoMapLen", len(tableInfoMap)), zap.Int("tableMapLen", len(tableMap)), diff --git a/logservice/schemastore/persist_storage.go b/logservice/schemastore/persist_storage.go index 4e233517e..a262b38d8 100644 --- a/logservice/schemastore/persist_storage.go +++ b/logservice/schemastore/persist_storage.go @@ -85,9 +85,6 @@ type persistentStorage struct { // tableID -> total registered count tableRegisteredCount map[int64]int - - mutexForHack sync.Mutex - tablesForHack map[uint64][]commonEvent.Table } func exists(path string) bool { @@ -152,7 +149,6 @@ func newPersistentStorage( tableTriggerDDLHistory: make([]uint64, 0), tableInfoStoreMap: make(map[int64]*versionedTableInfoStore), tableRegisteredCount: make(map[int64]int), - tablesForHack: make(map[uint64][]commonEvent.Table), } isDataReusable := false @@ -272,21 +268,7 @@ func (p *persistentStorage) getAllPhysicalTables(snapTs uint64, tableFilter filt zap.Any("duration(s)", time.Since(start).Seconds())) }() - p.mutexForHack.Lock() - if p.tablesForHack[snapTs] != nil { - p.mutexForHack.Unlock() - return p.tablesForHack[snapTs], nil - } - p.mutexForHack.Unlock() - - tables, err := loadAllPhysicalTablesAtTs(storageSnap, gcTs, snapTs, tableFilter) - if err == nil { - p.mutexForHack.Lock() - p.tablesForHack[snapTs] = tables - p.mutexForHack.Unlock() - } - return tables, err - //return loadAllPhysicalTablesAtTs(storageSnap, gcTs, snapTs, tableFilter) + return loadAllPhysicalTablesAtTs(storageSnap, gcTs, snapTs, tableFilter) } // only return when table info is initialized From e2877b89bd4a92bd8de58ca88fa443d9b213c8d7 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Wed, 26 Mar 2025 16:23:51 +0800 Subject: [PATCH 3/3] update --- logservice/schemastore/disk_format.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/logservice/schemastore/disk_format.go b/logservice/schemastore/disk_format.go index f72f865b4..ff99b6077 100644 --- a/logservice/schemastore/disk_format.go +++ b/logservice/schemastore/disk_format.go @@ -725,6 +725,11 @@ func loadAllPhysicalTablesAtTs( var tableMap map[int64]*BasicTableInfo var partitionMap map[int64]BasicPartitionInfo + databaseMap, err := loadDatabasesInKVSnap(storageSnap, gcTs) + if err != nil { + return nil, err + } + mutexForHack.Lock() if tableInfoMapForHack[gcTs] != nil { tableInfoMap = tableInfoMapForHack[gcTs] @@ -734,11 +739,6 @@ func loadAllPhysicalTablesAtTs( } else { mutexForHack.Unlock() // TODO: respect tableFilter(filter table in kv snap is easy, filter ddl jobs need more attention) - databaseMap, err := loadDatabasesInKVSnap(storageSnap, gcTs) - if err != nil { - return nil, err - } - tableInfoMap, tableMap, partitionMap, err = loadFullTablesInKVSnap(storageSnap, gcTs, databaseMap) if err != nil { return nil, err