优化并发查询:如何处理并发统计查询中的区间锁

发布时间 2023-09-07 23:48:45 作者: 若-飞

背景:需要查询一张有 N 亿条记录的 MySQL 表。

下面的代码使用 Go 的多个协程(goroutine)同时查询同一个 id 区间内的不同统计数据:

// txHashesWorker consumes tasks until the tasks channel is closed, handling
// one task at a time, and signals wg when it returns.
//
// id is part of the worker signature but is not used by this function.
func txHashesWorker(id int, tasks <-chan Task, results chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()

	for job := range tasks {
		runTxHashesTask(job, results)
	}
}

// runTxHashesTask processes a single task. It logs receipt of the task and
// builds a counter from task.SpaceStore. It then runs three counting steps in
// parallel, each given task.StartId and task.EndId.
//
// Each step does the following:
//   - logs its start, row count and end;
//   - stores its rows with lib.InsertToWeb3DatasTemp;
//   - sends the number of rows it produced on results.
//
// Every task therefore yields exactly three sends. The function returns only
// after all three steps have finished.
//
// NOTE(review): running the three steps concurrently over the same id range
// has been reported to cause lock contention in MySQL. Running them
// sequentially is the suggested alternative.
// NOTE(review): all three goroutines share one counter. This assumes the value
// returned by lib.NewCounter is safe for concurrent use — confirm.
// NOTE(review): a send on results blocks while the channel has no free buffer
// space and no ready receiver. Presumably the caller drains results
// concurrently with the workers — verify.
func runTxHashesTask(task Task, results chan<- int) {
	stamp := func() string { return time.Now().Format(lib.TIME_FORMAT) }

	fmt.Printf("%s Process GotTask===> startId:%d,  endId:%d \n", stamp(), task.StartId, task.EndId)
	counter := lib.NewCounter(task.SpaceStore)

	var steps sync.WaitGroup

	// CountChainInteractionCount
	steps.Add(1)
	go func(t Task) {
		defer steps.Done()
		fmt.Printf("%s Process CountChainInteractionCount Start===> startId:%d,  endId:%d \n", stamp(), t.StartId, t.EndId)
		rows := counter.CountChainInteractionCount(t.Config.Collector.Mysql, t.StartId, t.EndId)
		fmt.Printf("%s Process CountChainInteractionCount InsertToWeb3DatasTemp===> web3datas length:%d, \n", stamp(), len(rows))
		lib.InsertToWeb3DatasTemp(t.Config.Web3Data.Mysql, rows)
		fmt.Printf("%s Process CountChainInteractionCount End===> startId:%d,  endId:%d \n", stamp(), t.StartId, t.EndId)
		results <- len(rows)
	}(task)

	// CountCreatedNftCount
	steps.Add(1)
	go func(t Task) {
		defer steps.Done()
		fmt.Printf("%s Process CountCreatedNftCount Start===> startId:%d,  endId:%d \n", stamp(), t.StartId, t.EndId)
		rows := counter.CountCreatedNftCount(t.Config.Collector.Mysql, t.StartId, t.EndId)
		fmt.Printf("%s Process CountCreatedNftCount InsertToWeb3DatasTemp===> web3datas length:%d, \n", stamp(), len(rows))
		lib.InsertToWeb3DatasTemp(t.Config.Web3Data.Mysql, rows)
		fmt.Printf("%s Process CountCreatedNftCount End===> startId:%d,  endId:%d \n", stamp(), t.StartId, t.EndId)
		results <- len(rows)
	}(task)

	// CountNftTransactionsCount
	steps.Add(1)
	go func(t Task) {
		defer steps.Done()
		fmt.Printf("%s Process DoTask CountNftTransactionsCount Start===> startId:%d,  endId:%d \n", stamp(), t.StartId, t.EndId)
		rows := counter.CountNftTransactionsCount(t.Config.Collector.Mysql, t.StartId, t.EndId)
		fmt.Printf("%s Process DoTask CountNftTransactionsCount InsertToWeb3DatasTemp===> web3datas length:%d, \n", stamp(), len(rows))
		lib.InsertToWeb3DatasTemp(t.Config.Web3Data.Mysql, rows)
		fmt.Printf("%s Process DoTask CountNftTransactionsCount End===> startId:%d,  endId:%d \n", stamp(), t.StartId, t.EndId)
		results <- len(rows)
	}(task)

	// Block until every step has sent its result before the worker takes the
	// next task.
	steps.Wait()
}

这段代码的问题在于:3 个统计任务(3 个 goroutine)同时在同一个 id 区间上执行查询,导致触发区间锁,影响性能。可以改为顺序执行。

==============================

在数据库应用程序中,处理并发查询是一个常见的挑战。统计查询往往需要扫描大范围的数据。当多个查询同时扫描同一数据区间,或有其他事务同时修改该区间的数据时,就可能导致性能下降。本文将探讨如何优化并发查询,并重点讨论并发统计查询中的区间锁问题。

假设我们有一个数据库表tx_hashes,存储了交易哈希和相关信息。我们希望对该表进行三种统计查询:统计mint次数、统计转账次数和统计交互次数。每个查询都需要按照游戏名称(game_name)和地址(address)进行分组,并在给定的数据区间内进行统计。

以下是三个统计查询的示例:

  1. 统计mint次数:
sql
SELECT game_name, address, COUNT(*) AS mint_cnt
FROM tx_hashes
WHERE tx_type = 'mint' AND id >= 1000000 AND id < 1100000
GROUP BY game_name, address;
  2. 统计转账次数:
sql
SELECT game_name, address, COUNT(*) AS transfer_cnt
FROM tx_hashes
WHERE tx_type = 'transfer' AND id >= 1000000 AND id < 1100000
GROUP BY game_name, address;
  3. 统计交互次数:
sql
SELECT game_name, address, COUNT(*) AS interaction_cnt
FROM tx_hashes
WHERE tx_type = '' AND id >= 1000000 AND id < 1100000
GROUP BY game_name, address;

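以上查询看起来很简单,但在高并发环境下可能会遇到问题。当多个并发事务同时执行这些查询时,可能会发生区间锁(Range Lock)的情况。区间锁是一种行级锁,在 InnoDB 中对应间隙锁/临键锁(Gap Lock / Next-Key Lock)。它锁定索引上的一个范围,以防止其他事务在该范围内插入或修改数据。需要注意:在 InnoDB 默认的 REPEATABLE READ 隔离级别下,普通 SELECT 是基于 MVCC 的一致性非锁定读,并不加区间锁。只有以下几种情况才会加这类锁:
- 加锁读(SELECT ... FOR UPDATE / LOCK IN SHARE MODE);
- UPDATE、DELETE 等写语句;
- SERIALIZABLE 隔离级别下(关闭 autocommit 时)的查询。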

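假设有两个事务同时执行统计查询,其中一个事务正在统计mint次数,而另一个事务正在统计转账次数。由于它们都在相同的数据区间内进行查询,可能会触发区间锁。共享锁之间是相互兼容的,因此两个纯读事务通常不会互相阻塞。但如果同一区间上还有写事务持有或请求排他锁,就会出现锁等待,导致性能下降。此外,即使没有锁等待,多个查询同时扫描同一个大区间也会争抢磁盘 I/O 和缓冲池(Buffer Pool),同样会拖慢整体速度。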

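为了避免区间锁及资源争抢带来的性能问题,我们可以考虑按顺序执行这些查询,确保每个查询在前一个查询完成后再执行。这样可以避免多个事务同时锁定或扫描同一区间的数据,减少锁等待和 I/O 争抢,从而提高整体吞吐量。

另一种做法是用一条查询同时完成三种统计,这样同一区间只需扫描一次。例如:

`SELECT game_name, address, SUM(tx_type = 'mint') AS mint_cnt, SUM(tx_type = 'transfer') AS transfer_cnt, SUM(tx_type = '') AS interaction_cnt FROM tx_hashes WHERE id >= 1000000 AND id < 1100000 GROUP BY game_name, address`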

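以下是按顺序执行这些查询的示例代码(使用GORM作为ORM框架)。注意:示例中每个查询都把结果写入同一个 statistics 切片,后一次 Find 会覆盖前一次的结果。实际使用时应为每个查询准备单独的切片,或在内存中按 game_name/address 合并结果。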

go
package main

import (
	"fmt"

	"gorm.io/driver/mysql"
	"gorm.io/gorm"
)

// Statistic holds one aggregated result row, keyed by the game_name and
// address columns the queries group on. Under GORM's default naming strategy
// the fields map to these snake_case columns:
// game_name, address, mint_count, transfer_count and interaction_cnt.
type Statistic struct {
	GameName, Address                        string
	MintCount, TransferCount, InteractionCnt int
}

func main() {
	dsn := "user:password@tcp(localhost:3306)/database"
	db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
	if err != nil {
		fmt.Println(err)
		return
	}

	var statistics []Statistic

	// 统计 mint 次数
	mintQuery := db.Model(&TxHash{}).
		Select("game_name, address, COUNT(*) AS mint_count").
		Where("tx_type = 'mint' AND id >= ? AND id < ?", 1000000, 1100000).
		Group("game_name, address").
		Find(&statistics)

	if mintQuery.Error != nil {
		fmt.Println(mintQuery.Error)
		return
	}

	// 统计 transfer 次数
	transferQuery := db.Model(&TxHash{}).
		Select("game_name, address, COUNT(*) AS transfer_count").
		Where("tx_type = 'transfer' AND id >= ? AND id < ?", 1000000, 1100000).
		Group("game_name, address").
		Find(&statistics)

	if transferQuery.Error != nil