Skip to content

Commit 1279a9f

Browse files
authored
statistics: speed up the background stats update worker (#58490)
ref #57868
1 parent 0b33f50 commit 1279a9f

File tree

15 files changed

+184
-77
lines changed

15 files changed

+184
-77
lines changed

br/pkg/restore/snap_client/pipeline_items.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ func (rc *SnapClient) GoUpdateMetaAndLoadStats(
297297
log.Info("start update metas", zap.Stringer("table", oldTable.Info.Name), zap.Stringer("db", oldTable.DB.Name))
298298
// the total kvs contains the index kvs, but the stats meta needs the count of rows
299299
count := int64(oldTable.TotalKvs / uint64(len(oldTable.Info.Indices)+1))
300-
if statsErr = statsHandler.SaveMetaToStorage(tbl.Table.ID, count, 0, "br restore"); statsErr != nil {
300+
if statsErr = statsHandler.SaveMetaToStorage(tbl.Table.ID, count, 0, "br restore", false); statsErr != nil {
301301
log.Error("update stats meta failed", zap.Any("table", tbl.Table), zap.Error(statsErr))
302302
}
303303
}

br/pkg/restore/snap_client/systable_restore_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -117,5 +117,5 @@ func TestCheckSysTableCompatibility(t *testing.T) {
117117

118118
// The above variables are in the file br/pkg/restore/systable_restore.go
119119
func TestMonitorTheSystemTableIncremental(t *testing.T) {
120-
require.Equal(t, int64(246), session.CurrentBootstrapVersion)
120+
require.Equal(t, int64(247), session.CurrentBootstrapVersion)
121121
}

pkg/session/bootstrap.go

+20-8
Original file line numberDiff line numberDiff line change
@@ -222,11 +222,12 @@ const (
222222

223223
// CreateStatsMetaTable stores the meta of table statistics.
224224
CreateStatsMetaTable = `CREATE TABLE IF NOT EXISTS mysql.stats_meta (
225-
version BIGINT(64) UNSIGNED NOT NULL,
226-
table_id BIGINT(64) NOT NULL,
227-
modify_count BIGINT(64) NOT NULL DEFAULT 0,
228-
count BIGINT(64) UNSIGNED NOT NULL DEFAULT 0,
229-
snapshot BIGINT(64) UNSIGNED NOT NULL DEFAULT 0,
225+
version BIGINT(64) UNSIGNED NOT NULL,
226+
table_id BIGINT(64) NOT NULL,
227+
modify_count BIGINT(64) NOT NULL DEFAULT 0,
228+
count BIGINT(64) UNSIGNED NOT NULL DEFAULT 0,
229+
snapshot BIGINT(64) UNSIGNED NOT NULL DEFAULT 0,
230+
last_stats_histograms_version BIGINT(64) UNSIGNED DEFAULT NULL,
230231
INDEX idx_ver(version),
231232
UNIQUE INDEX tbl(table_id)
232233
);`
@@ -1271,19 +1272,23 @@ const (
12711272
// Add extra_params to tidb_global_task and tidb_global_task_history.
12721273
version243 = 243
12731274

1274-
// version242 add Max_user_connections into mysql.user.
1275+
// version244 adds Max_user_connections into mysql.user.
12751276
version244 = 244
12761277

12771278
// version245 updates column types of mysql.bind_info.
12781279
version245 = 245
12791280

12801281
// version246 adds new unique index for mysql.bind_info.
12811282
version246 = 246
1283+
1284+
// version 247
1285+
// Add last_stats_histograms_version to mysql.stats_meta.
1286+
version247 = 247
12821287
)
12831288

12841289
// currentBootstrapVersion is defined as a variable, so we can modify its value for testing.
12851290
// please make sure this is the largest version
1286-
var currentBootstrapVersion int64 = version246
1291+
var currentBootstrapVersion int64 = version247
12871292

12881293
// DDL owner key's expired time is ManagerSessionTTL seconds, we should wait the time and give more time to have a chance to finish it.
12891294
var internalSQLTimeout = owner.ManagerSessionTTL + 15
@@ -1465,6 +1470,7 @@ var (
14651470
upgradeToVer244,
14661471
upgradeToVer245,
14671472
upgradeToVer246,
1473+
upgradeToVer247,
14681474
}
14691475
)
14701476

@@ -3406,7 +3412,6 @@ func upgradeToVer245(s sessiontypes.Session, ver int64) {
34063412
if ver >= version245 {
34073413
return
34083414
}
3409-
34103415
doReentrantDDL(s, "ALTER TABLE mysql.bind_info MODIFY COLUMN original_sql LONGTEXT NOT NULL")
34113416
doReentrantDDL(s, "ALTER TABLE mysql.bind_info MODIFY COLUMN bind_sql LONGTEXT NOT NULL")
34123417
}
@@ -3465,6 +3470,13 @@ func upgradeToVer246(s sessiontypes.Session, ver int64) {
34653470
doReentrantDDL(s, "ALTER TABLE mysql.bind_info ADD UNIQUE INDEX digest_index(plan_digest, sql_digest)", dbterror.ErrDupKeyName)
34663471
}
34673472

3473+
func upgradeToVer247(s sessiontypes.Session, ver int64) {
3474+
if ver >= version247 {
3475+
return
3476+
}
3477+
doReentrantDDL(s, "ALTER TABLE mysql.stats_meta ADD COLUMN last_stats_histograms_version bigint unsigned DEFAULT NULL", infoschema.ErrColumnExists)
3478+
}
3479+
34683480
// initGlobalVariableIfNotExists initialize a global variable with specific val if it does not exist.
34693481
func initGlobalVariableIfNotExists(s sessiontypes.Session, name string, val any) {
34703482
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBootstrap)

pkg/statistics/handle/bootstrap.go

+16-11
Original file line numberDiff line numberDiff line change
@@ -68,21 +68,26 @@ func (*Handle) initStatsMeta4Chunk(cache statstypes.StatsCache, iter *chunk.Iter
6868
physicalID = row.GetInt64(1)
6969
maxPhysicalID = max(physicalID, maxPhysicalID)
7070
newHistColl := *statistics.NewHistColl(physicalID, row.GetInt64(3), row.GetInt64(2), 4, 4)
71+
// During the initialization phase, we need to initialize LastAnalyzeVersion with the snapshot,
72+
// which ensures that we don't duplicate the auto-analyze of a particular type of table.
73+
// When the predicate columns feature is turned on, if a table has neither predicate columns nor indexes,
74+
// then auto-analyze will only analyze the _row_id and refresh stats_meta,
75+
// but we don't have any histograms or topn's created for _row_id at the moment.
76+
// So if we don't initialize LastAnalyzeVersion with the snapshot here,
77+
// it will stay at 0 and auto-analyze won't be able to detect that the table has been analyzed.
78+
// But in the future, we may create some records for _row_id, see:
79+
// https://github.com/pingcap/tidb/issues/51098
7180
snapshot := row.GetUint64(4)
81+
lastAnalyzeVersion, lastStatsHistUpdateVersion := snapshot, snapshot
82+
if !row.IsNull(5) {
83+
lastStatsHistUpdateVersion = max(lastStatsHistUpdateVersion, row.GetUint64(5))
84+
}
7285
tbl := &statistics.Table{
7386
HistColl: newHistColl,
7487
Version: row.GetUint64(0),
7588
ColAndIdxExistenceMap: statistics.NewColAndIndexExistenceMapWithoutSize(),
76-
// During the initialization phase, we need to initialize LastAnalyzeVersion with the snapshot,
77-
// which ensures that we don't duplicate the auto-analyze of a particular type of table.
78-
// When the predicate columns feature is turned on, if a table has neither predicate columns nor indexes,
79-
// then auto-analyze will only analyze the _row_id and refresh stats_meta,
80-
// but since we don't have any histograms or topn's created for _row_id at the moment.
81-
// So if we don't initialize LastAnalyzeVersion with the snapshot here,
82-
// it will stay at 0 and auto-analyze won't be able to detect that the table has been analyzed.
83-
// But in the future, we maybe will create some records for _row_id, see:
84-
// https://github.com/pingcap/tidb/issues/51098
85-
LastAnalyzeVersion: snapshot,
89+
LastAnalyzeVersion: lastAnalyzeVersion,
90+
LastStatsHistVersion: lastStatsHistUpdateVersion,
8691
}
8792
cache.Put(physicalID, tbl) // put this table again since it is updated
8893
}
@@ -95,7 +100,7 @@ func (*Handle) initStatsMeta4Chunk(cache statstypes.StatsCache, iter *chunk.Iter
95100

96101
func (h *Handle) initStatsMeta(ctx context.Context) (statstypes.StatsCache, error) {
97102
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
98-
sql := "select HIGH_PRIORITY version, table_id, modify_count, count, snapshot from mysql.stats_meta"
103+
sql := "select HIGH_PRIORITY version, table_id, modify_count, count, snapshot, last_stats_histograms_version from mysql.stats_meta"
99104
rc, err := util.Exec(h.initStatsCtx, sql)
100105
if err != nil {
101106
return nil, errors.Trace(err)

pkg/statistics/handle/cache/statscache.go

+37-20
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ func (s *StatsCacheImpl) Update(ctx context.Context, is infoschema.InfoSchema, t
133133
err error
134134
)
135135
if err := util.CallWithSCtx(s.statsHandle.SPool(), func(sctx sessionctx.Context) error {
136-
query := "SELECT version, table_id, modify_count, count, snapshot from mysql.stats_meta where version > %? "
136+
query := "SELECT version, table_id, modify_count, count, snapshot, last_stats_histograms_version from mysql.stats_meta where version > %? "
137137
args := []any{lastVersion}
138138

139139
if onlyForAnalyzedTables {
@@ -174,6 +174,10 @@ func (s *StatsCacheImpl) Update(ctx context.Context, is infoschema.InfoSchema, t
174174
modifyCount := row.GetInt64(2)
175175
count := row.GetInt64(3)
176176
snapshot := row.GetUint64(4)
177+
var latestHistUpdateVersion uint64
178+
if !row.IsNull(5) {
179+
latestHistUpdateVersion = row.GetUint64(5)
180+
}
177181

178182
// Detect the context cancel signal, since it may take a long time for the loop.
179183
// TODO: add context to TableInfoByID and remove this code block?
@@ -192,31 +196,44 @@ func (s *StatsCacheImpl) Update(ctx context.Context, is infoschema.InfoSchema, t
192196
}
193197
tableInfo := table.Meta()
194198
// If the table is not updated, we can skip it.
195-
if oldTbl, ok := s.Get(physicalID); ok &&
196-
oldTbl.Version >= version &&
199+
200+
oldTbl, ok := s.Get(physicalID)
201+
if ok && oldTbl.Version >= version &&
197202
tableInfo.UpdateTS == oldTbl.TblInfoUpdateTS {
198203
continue
199204
}
200-
tbl, err := s.statsHandle.TableStatsFromStorage(
201-
tableInfo,
202-
physicalID,
203-
false,
204-
0,
205-
)
206-
// Error is not nil may mean that there are some ddl changes on this table, we will not update it.
207-
if err != nil {
208-
statslogutil.StatsLogger().Error(
209-
"error occurred when read table stats",
210-
zap.String("table", tableInfo.Name.O),
211-
zap.Error(err),
212-
)
213-
continue
205+
var tbl *statistics.Table
206+
needLoadColAndIdxStats := true
207+
// If the column/index stats have not been updated, we can reuse the old table stats.
208+
// Only need to update the count and modify count.
209+
if ok && latestHistUpdateVersion > 0 && oldTbl.LastStatsHistVersion >= latestHistUpdateVersion {
210+
tbl = oldTbl.Copy()
211+
// count and modify count are updated in finalProcess
212+
needLoadColAndIdxStats = false
214213
}
215-
if tbl == nil {
216-
tblToUpdateOrDelete.addToDelete(physicalID)
217-
continue
214+
if needLoadColAndIdxStats {
215+
tbl, err = s.statsHandle.TableStatsFromStorage(
216+
tableInfo,
217+
physicalID,
218+
false,
219+
0,
220+
)
221+
// Error is not nil may mean that there are some ddl changes on this table, we will not update it.
222+
if err != nil {
223+
statslogutil.StatsLogger().Error(
224+
"error occurred when read table stats",
225+
zap.String("table", tableInfo.Name.O),
226+
zap.Error(err),
227+
)
228+
continue
229+
}
230+
if tbl == nil {
231+
tblToUpdateOrDelete.addToDelete(physicalID)
232+
continue
233+
}
218234
}
219235
tbl.Version = version
236+
tbl.LastStatsHistVersion = latestHistUpdateVersion
220237
tbl.RealtimeCount = count
221238
tbl.ModifyCount = modifyCount
222239
tbl.TblInfoUpdateTS = tableInfo.UpdateTS

pkg/statistics/handle/ddl/ddl_test.go

+16-4
Original file line numberDiff line numberDiff line change
@@ -299,16 +299,28 @@ func TestDDLHistogram(t *testing.T) {
299299
<-h.DDLEventCh()
300300
testKit.MustExec("insert into t values(1,2),(3,4)")
301301
testKit.MustExec("analyze table t")
302-
303-
testKit.MustExec("alter table t add column c_null int")
304-
err := statstestutil.HandleNextDDLEventWithTxn(h)
305-
require.NoError(t, err)
306302
is := do.InfoSchema()
307303
require.Nil(t, h.Update(context.Background(), is))
308304
tbl, err := is.TableByName(context.Background(), ast.NewCIStr("test"), ast.NewCIStr("t"))
309305
require.NoError(t, err)
310306
tableInfo := tbl.Meta()
311307
statsTbl := do.StatsHandle().GetTableStats(tableInfo)
308+
lastHistUpdateVersion1 := statsTbl.LastStatsHistVersion
309+
310+
testKit.MustExec("alter table t add column c_null int")
311+
err = statstestutil.HandleNextDDLEventWithTxn(h)
312+
require.NoError(t, err)
313+
314+
// Check that the last_stats_histograms_version has been updated.
315+
is = do.InfoSchema()
316+
tbl, err = is.TableByName(context.Background(), ast.NewCIStr("test"), ast.NewCIStr("t"))
317+
require.NoError(t, err)
318+
tableInfo = tbl.Meta()
319+
require.Nil(t, h.Update(context.Background(), is))
320+
statsTbl = do.StatsHandle().GetTableStats(tableInfo)
321+
lastHistUpdateVersion2 := statsTbl.LastStatsHistVersion
322+
require.Greater(t, lastHistUpdateVersion2, lastHistUpdateVersion1)
323+
312324
require.True(t, statsTbl.ColAndIdxExistenceMap.HasAnalyzed(2, false))
313325
require.False(t, statsTbl.Pseudo)
314326
require.True(t, statsTbl.GetCol(tableInfo.Columns[2].ID).IsStatsInitialized())

pkg/statistics/handle/ddl/subscriber.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,7 @@ func (h subscriber) delayedDeleteStats4PhysicalID(
316316
sctx sessionctx.Context,
317317
id int64,
318318
) error {
319-
startTS, err2 := storage.UpdateStatsMetaVersion(ctx, sctx, id)
319+
startTS, err2 := storage.UpdateStatsMetaVerAndLastHistUpdateVer(ctx, sctx, id)
320320
if err2 != nil {
321321
return errors.Trace(err2)
322322
}

pkg/statistics/handle/storage/BUILD.bazel

+1-1
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ go_test(
5959
"stats_read_writer_test.go",
6060
],
6161
flaky = True,
62-
shard_count = 27,
62+
shard_count = 28,
6363
deps = [
6464
":storage",
6565
"//pkg/domain",

pkg/statistics/handle/storage/dump_test.go

+27
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,33 @@ func TestLoadGlobalStats(t *testing.T) {
197197
require.Equal(t, 3, len(loadedStats.Partitions)) // p0, p1, global
198198
}
199199

200+
func TestLastStatsHistUpdateVersionAfterLoadStats(t *testing.T) {
201+
store, dom := testkit.CreateMockStoreAndDomain(t)
202+
tk := testkit.NewTestKit(t, store)
203+
tk.MustExec("use test")
204+
tk.MustExec("set @@tidb_analyze_version = 2")
205+
tk.MustExec("use test")
206+
tk.MustExec("drop table if exists t")
207+
tk.MustExec("create table t (a int, key(a))")
208+
tk.MustExec("insert into t values (1), (2)")
209+
tk.MustExec("analyze table t")
210+
211+
statsHandle := dom.StatsHandle()
212+
table, err := dom.InfoSchema().TableByName(context.Background(), ast.NewCIStr("test"), ast.NewCIStr("t"))
213+
require.NoError(t, err)
214+
tableInfo := table.Meta()
215+
statsTbl := statsHandle.GetTableStats(tableInfo)
216+
require.Greater(t, statsTbl.LastStatsHistVersion, uint64(0))
217+
origLastStatsHistVersion := statsTbl.LastStatsHistVersion
218+
219+
jsonTbl := getStatsJSON(t, dom, "test", "t")
220+
dom.StatsHandle().Clear()
221+
require.Nil(t, statsHandle.LoadStatsFromJSON(context.Background(), dom.InfoSchema(), jsonTbl, 0))
222+
require.NoError(t, statsHandle.Update(context.Background(), dom.InfoSchema()))
223+
statsTbl = statsHandle.GetTableStats(tableInfo)
224+
require.Greater(t, statsTbl.LastStatsHistVersion, origLastStatsHistVersion)
225+
}
226+
200227
func TestLoadPartitionStats(t *testing.T) {
201228
store, dom := testkit.CreateMockStoreAndDomain(t)
202229
tk := testkit.NewTestKit(t, store)

pkg/statistics/handle/storage/gc.go

+6-4
Original file line numberDiff line numberDiff line change
@@ -141,8 +141,10 @@ func DeleteTableStatsFromKV(sctx sessionctx.Context, statsIDs []int64, soft bool
141141
return errors.Trace(err)
142142
}
143143
for _, statsID := range statsIDs {
144-
// We only update the version so that other tidb will know that this table is deleted.
145-
if _, err = util.Exec(sctx, "update mysql.stats_meta set version = %? where table_id = %? ", startTS, statsID); err != nil {
144+
// We update the version so that other tidb will know that this table is deleted.
145+
// And we also update the last_stats_histograms_version to tell other tidb that the stats histogram is deleted
146+
// and they should update their memory cache. It's mainly for soft delete triggered by DROP STATS.
147+
if _, err = util.Exec(sctx, "update mysql.stats_meta set version = %?, last_stats_histograms_version = %? where table_id = %? ", startTS, startTS, statsID); err != nil {
146148
return err
147149
}
148150
if soft {
@@ -245,7 +247,7 @@ func deleteHistStatsFromKV(sctx sessionctx.Context, physicalID int64, histID int
245247
return errors.Trace(err)
246248
}
247249
// First of all, we update the version. If this table doesn't exist, it won't have any problem. Because we cannot delete anything.
248-
if _, err = util.Exec(sctx, "update mysql.stats_meta set version = %? where table_id = %? ", startTS, physicalID); err != nil {
250+
if _, err = util.Exec(sctx, "update mysql.stats_meta set version = %?, last_stats_histograms_version = %? where table_id = %? ", startTS, startTS, physicalID); err != nil {
249251
return err
250252
}
251253
// delete histogram meta
@@ -434,7 +436,7 @@ func MarkExtendedStatsDeleted(sctx sessionctx.Context,
434436
if err != nil {
435437
return 0, errors.Trace(err)
436438
}
437-
if _, err = util.Exec(sctx, "UPDATE mysql.stats_meta SET version = %? WHERE table_id = %?", version, tableID); err != nil {
439+
if _, err = util.Exec(sctx, "UPDATE mysql.stats_meta SET version = %?, last_stats_histograms_version = %? WHERE table_id = %?", version, version, tableID); err != nil {
438440
return 0, err
439441
}
440442
statsVer = version

0 commit comments

Comments
 (0)