日常的工作中,可能需要对分片表进行全表扫描,这里介绍两种并发全表扫描的方法:
a、外层循环遍历每个分片
b、每个分片内分页读取数据
// 初试化分片为0 minId := int64(0) // 遍历分片 for shardId := int64(0); shardId < info.getShardingCount(); { // 获取数据 data, err := info.PageQueryData(shardId, PageSize, minId) // 并发处理 此处省略 // ... // 如果获取到的数据长度小于每页数量,则表明数据读取完,分片数+1,否则继续读取该分片数据 if len(data) < PageSize { shardId++ minId = 0 } }
a、设置最大并发goroutine数量n
b、通过buffersize = n的channel控制并发goroutine数量:每开启一个goroutine向channel中插入一条数据,每个goroutine结束后从channel取出一条数据
c、通过sync.WaitGroup等待所有goroutine执行完后才读取下一批数据
// 初试化分片为0 minId := int64(0) // 遍历分片 for shardId := int64(0); shardId < info.getShardingCount(); { // 获取数据 data, err := info.BatchGetData(shardId, PageSize, minId) // 并发处理 for _, item := range data { info.waitGroup.Add(1) // 向channel插入数据 info.concurrencyCtlChannel <- 1 if item.Id > minId { minId = item.Id } go func(ctx context.Context, data *Data) { defer func() { if err := recover(); err != nil { logs.CtxError(ctx, "Task: task execute error. err: %v", err) } // 运行完后从channel读取数据 <-info.concurrencyCtlChannel info.waitGroup.Done() }() // 业务处理 此处省略 // ... }(ctx, item) } // 等待data全部处理完 info.waitGroup.Wait() // 如果获取到的数据长度小于每页数量,则表明数据读取完,分片数+1,否则继续读取该分片数据 if len(data) < PageSize { shardId++ minId = 0 } }
a、创建一组生产者goroutine和消费者goroutine,以及一组传递消息的channel,不同的生产者goroutine从不同的分片中分页获取数据,向channel中插入数据,消费者goroutine从channel中获取数据进行消费。
b、另外通过一个channel控制生产者goroutine的数量,用一个输入参数控制消费者goroutine的数量。
c、使用sync.WaitGroup等待生产者生产完数据后关闭管道,等待消费者消费完数据后结束任务。
func ProduceData() { for i = minShardingKey; i <= maxShardingKey; i++ { channel := channelGroup[i % channelSize] pwg.Add(1) // 控制produce goroutine数量 produceChannel <- 1 go func(ch chan *db.Data) { defer func() { t.pwg.Done() <- produceChannel } minId := int64(1) for { // 分页从数据库读取数据 data, err := db.GetData(shardingKey, minId, PageSize) // 向channel生产数据 for _, order := range data { if order.Id > minId { minId = order.Id } // 如果channel已满则阻塞在此等待消费goroutine消费channel中的数据 ch <- order } minId++ if len(orderInfos) < PageSize { break } } }(channel) } }
func ConsumeData() { for i = 0; i < consumerGouroutineSize; i++ { cwg.Add(1); ch := channelGroup[i & channelSize] go func(ch chan *db.Data) { defer func() { cwg.Done() } for order := range ch { // do business } } }(channel) } }
func WaidAndClose() { // 等所有生产者goroutine结束 pwg.Wait() // 关闭管道 for { //... } // 等到消费完毕 cwg.wait() // 结束任务 return nil }