Filecoin的存储单元称之为扇区(机械硬盘的最小存储单元就叫做扇区)。在miner进行数据存储的时候会向filecoin网络提供一系列的证明保证正确存储了数据。在证明sector存储的过程中需要经历一系列的处理,P1/P2/C1/C2。P-Precommit,预提交;C-commit 提交。 Filecoin的存储管理逻辑主要是现在sector-storage中。 在这之中 Worker、Sechduler、Manager三者是绕不开的话题。
Woekr主要分为localworker以及remoteworker,localworker处理本地服务,remoteworker支持远程服务处理。 Manager-管理多个Worker Sechduler-调度器调度多个Worker,一个Manager通常都会有一个Sehcduler Store-Store存储系统
README中的图分为上下两个部分,上面是Manager它包含了:store存储系统、localWorker和scheduler下面是remoteWorker。下面灰色的部分都是remoteWorker manager对上面的模块进行管理,使用scheduler对Worker进行管理,通常一个Manager对应一个Worker。Worker分为localworker以及remoteworker,
工作流程:Manager管理scheduler调度调度器,调度器管理多个worker。每一个连接到manager的worker都活同步他的CPU内存以及显存的信息。worker分为localworker以及remoteworker scheduler接受请求,根据请求类型以及资源请求选择合适的worker为他进行资源分配
主要模块:scheduler worker manager
模块的主要功能:资源分配与调度 模块文件:resource.go/cbor_gen.go/sched.go/sched_resource.go/sche_worker.go
此模块描述了运行时资源分配情况以及性能参数。
重要数据结构:
描述资源情况,他的方法是对Thread进行资源的分配
type Resources struct { MinMemory uint64 MaxMemory uint64 MaxParallelism int CanGPU bool BaseMinMemory uint64 }
重要函数方法:
线程资源分配函数
资源情况映射表,通过封装任务的Task类型和封装证明映射到对应的资源,用于查询此任务的资源占用情况。
/* Percent of threads to allocate to parallel tasks 12 * 0.92 = 11 16 * 0.92 = 14 24 * 0.92 = 22 32 * 0.92 = 29 64 * 0.92 = 58 128 * 0.92 = 117 */ var ParallelNum uint64 = 92 var ParallelDenom uint64 = 100 n := (wcpus * ParallelNum) / ParallelDenom //计算公式
初始化resrouceTable的函数方法
做各种角色的JSON包和cbor之间的转换 。编码和解码的集合,对象分别是 Call、WorkStae、 WorkID
重要方法函数:
将调用信息的JSON数据转换成cbor简明二进制展现
将cbor数据转换成数据结构打包的JSON数据
进行资源调度和分配的核心模块 是scheduler调度器方法的主要定义部分
关键类型::
用于描述调度窗口,存放了worker的请求数组
type schedWindow struct { allocated activeResources todo []*workerRequest }
优先级类型
用context表示JSON数据格式类型
表示一个匿名函数 type WorkerAction func(ctx context.Context, w Worker) error
一个很重要的类型——调度器——用于分配计算机资源给不同的任务.在Manager类与WorkScheduler类中都有他的身影
//分配器 type scheduler struct { workersLk sync.RWMutex workers map[WorkerID]*workerHandle schedule chan *workerRequest windowRequests chan *schedWindowRequest workerChange chan struct{} // worker added / changed/freed resources workerDisable chan workerDisableReq // owned by the sh.runSched goroutine schedQueue *requestQueue openWindows []*schedWindowRequest workTracker *workTracker info chan func(interface{}) //关闭中的 已经关闭的 测试同步通道 closing chan struct{} closed chan struct{} testSync chan struct{} // used for testing }
这个结构体根据目前的资源情况去调度分配 对sector、Windows、activeResource进行操作
计算机目前的活跃资源
资源分配窗口 显示当前计算机的活跃资源以及矿工提出的请求的序列
矿工请求处理窗口 把矿工所请求的已完成的任务存储到通道中 用此数据结构表示并存储一个矿工已经完成的任务
矿工具体的请求的数据结构。内部的请求信息包含了山区ID、证明信息、封装任务的任务类型、该任务的优先级、任务的开始时间、在heap存储空间中的索引下标、worker错误相应通道等
错误响应信息的数据结构
分配诊断请求信息结构体。包含了扇区ID 、封装任务类型以及该调度信息的优先级
资源诊断信息结构体,包含了请求信息的集合以及打开的窗口的集合
关键函数方法:
从context中获取任务的优先级
给任务赋上优先级
处理worker的错误信息 打包成error类并返会
创建一个新的调度器 返回该调度器的指针
调度器的成员函数 ,开始进入监听状态等待矿工的请求
进行资源分配的起始函数,分配器的成员函数,监听分配器中各个通道的变化并作出相对应的反应。
尝试进行资源分配的核心过程函数。 任务分配基于优先级、Worker工人资源可用性、特定任务Worker的性能、窗口请求年龄 1. 为每个调度队列中的任务找到能处理他们的可容纳窗口 1.1 创建解决任务的窗口容量的列表 acceptableWindws slice 1.2 根据任务选择器的性能为窗口排序 2. 再次遍历调度队列 把任务分配给第一个有可用资源的合适的窗口 3. 把被调度后的窗口提交给Worker
关闭调度处理的函数,遍历矿工列表,清理为他们分配的资源,释放矿工所占用的窗口
获取调度器信息的函数
关闭调度器函数 关闭失败则返回报错信息
分配器-计算机资源方面进行查询判断操作的包
关键函数方法::
(id WorkerID, wr storiface.WorkerResources, r Resources, locker sync.Locker, cb func() error) error 对worker分配资源的入口函数 并把它所需要的资源加入activeResource活动资源的结构体中
把worker所需要的资源加入到activeResource中的具体实现
释放woker所占的资源 并删除冲activeResource中删除对应的资源信息
查询目前的activeResource是否足够该woker所请求的资源 返回布尔值 true表示成功 false表示不能分配
查看目前活跃资源当中CPU占用率 最小内存空间和最大内存空间占用率 返回三个当中一个最大的,活动资源的利用率
返回woker占用的所有活动资源的占用率之和
woker分配数据结构。统揽。定义了schedWorker以及此结构体的一众方法 , Woker资源调度的具体实现
关键数据结构:
用于对Worker进行资源分配的结构体,schedWorker结构 包含了一个资源调度器 wokerer信息查询 workerID 分配窗口的通道 请求窗口数量
type schedWorker struct { sched *scheduler worker *workerHandle wid WorkerID heartbeatTimer *time.Ticker scheduledWindows chan *schedWindow taskDone chan struct{} windowsRequested int }
schedWorker通过workHandle来对worker进行管理 资源分配。workerHandle处理结构体:
type workerHandle struct { workerRpc Worker //工人的rpc调用接口 info storiface.WorkerInfo //工人的信息 preparing *activeResources active *activeResources lk sync.Mutex wndLk sync.Mutex activeWindows []*schedWindow enabled bool // for sync manager goroutine closing cleanupStarted bool closedMgr chan struct{} closingMgr chan struct{} }
这是核心的调度器数据结构的定义, 调度器的workers大映射表是从WorkerID到workerHandle解决方案
type scheduler struct { workersLk sync.RWMutex workers map[WorkerID]*workerHandle schedule chan *workerRequest windowRequests chan *schedWindowRequest workerChange chan struct{} // worker added / changed/freed resources workerDisable chan workerDisableReq // owned by the sh.runSched goroutine schedQueue *requestQueue openWindows []*schedWindowRequest workTracker *workTracker info chan func(interface{}) //关闭中的 已经关闭的 测试同步通道 closing chan struct{} closed chan struct{} testSync chan struct{} // used for testing }
关键函数方法::
该函数运行一个Worker处理其的请求,一个worker对应一个handleWorker的结构体,如果已经存在结构体 也就是在其他线程中解决此worker的情况 直接返回map[workerID]*handleWorker中的handleWorker的结构体指针,如果不存在,则需要配置调度矿工的信息,执行处理矿工的函数;
context类型的参数用于读取其其中的信息以对后续的worker调度进行设置
处理矿工需求的实现函数。 读取context中的内容开始。开始进行对worker请求的处理。如果worker调度器没有更多的窗口来应对请求,则结束。等待更多的窗口到来或者是等待阻塞中的任务结束。如果有新窗口的到来,则退出循环向窗口发送请求信息;如果没有新窗口的到来则等待更多的任务被主调度器分配或者等待worker结束一项任务并空闲下来。 空闲之后的worker要对其分配资源,绑定worker和窗口以及分配进程任务给窗口
worker清空函数,关闭worker的manager管理器以及worker,用内置close()函数关闭manager。通过删除调度器中打开窗口中的wokerID窗口,从而删除worker。
使得context所表示的内容无效化。先在主调度器的线程中等待清理程序,再等待清理程序完成以使其无效化,最终清空活动窗口,使得请求窗口清0
对session进行检查
检查所有的请求窗口
等待窗口的更新或者是worker任务的完成
把老窗口中的任务挪动到新窗口当中,将worker和窗口绑定在一起
将任务分配到目前所有的活动窗口中
开始执行一个任务的入口函数
核心函数: runSched():准备进入资源调度状态,执行此函数则进入监听状态,监听scheduler对象中通道的变化,并作出对应的决策 trySched():调度资源的核心实现,把请求的任务分配给窗口去 runWorker():开始对worker的请求进行处理,如果在其他线程中存在解决方案则直接返回,否则开始处理,开启一个goroutinue
模块文件:Manager.go\Manager_calltracker.go
重要数据结构:
包含manger的各种信息,结构体中包含了本地存储的接口、远程存储信息、本地存储类、Scheduler调度器以及storiface.CallID到WorkerID和chan result的映射等。
存储结果信息,有两个参数:interface{}结构体和Error出错信息类。
参数中含有能并行获取的数量限制以及一系列布尔型的变量,用于配置。 布尔型变量中包含了:AllowCommit 允许提交/AllowAddPiece 允许增加piece/AllowPreCommit1 允许预提交1/AllowPreCommit2 允许预提交2/AllowUnseal 允许解封扇区。
重要函数方法:
New这个函数负责创建一个新的manager,执行过程大致为: 1.首先新建一个本地的存储点,如果创建失败则跳出函数返回错误信息 2.随后新建一个provider,如果创建失败则跳出函数返回错误信息 3.新建一个线上(远距离)的存储点 4.将新的manager信息放入参数m中,设置m的基本参数并运行 5.新建任务,并且设置任务的基本参数 也就是sealerConfig中的参数 6.增加工人,如果增加失败返回错误信息 7.新建manager完成
这函数功能是增加当地存储空间,会报三种错误1.增加储存空间错误2.已经存在该存储空间错误3.存储空间配置错误
增加worker,调用sched文件中的runWork函数。开始进行worker请求处理,如果在其他线程中存在解决方案则直接返回,否则开始处理(这个地方的请求是增加Worker 并且传递过去了Worker类型的参数
提供远程的HTTP服务
空计划指令,什么都不进行操作
获取计划,经理通过等待worker获得计划
从指定扇区中取出一部分数据,通过调用函数tryReadUnsealedPiece()来尝试读取扇区中已经被解封的数据
尝试读取没封装的数据
在sector中新建文件存储片区,执行步骤:1.判锁2.查看是否有存在的片区,如果有则分配,如果没有则新建3.录入片区的一系列信息并返回
执行上述这一系列操作
这个模块就是将需要存储的信息传递给manager并且释放worker中扇区和缓存中无用的信息。新建选择其,并调用manager中的scheduler对象的方法,为worker分配资源。
释放没有被封装的(函数里也没有具体的实现)只有一句提示“即将执行释放操作”
移除封装或者未封装的扇区,或者移除缓冲区
代码复用,返回是否执行成功的结果
该函数方法总结:
此函数定义了大量的接口函数,通过manager进行资源调度、存储分配、远程HTTP服务、读取解封之后的扇区中的数据操作
重要数据结构:
代表了工作的代号,参数有method表示worker所进行封装的封装任务的任务类型
用于表示JSON格式的参数
代表工作状态的结构体,包含了工作的编号WorkID,工作的阶段(已经开始、执行中、已完成)、storiface.CallID到WorkID的绑定容器、专门用于保存work的错误、worker的主机名以及Work开始时间
这是一个枚举类型,具体表示了三种工作状态
const ( wsStarted WorkStatus = "started" // task started, not scheduled/running on a worker yet wsRunning WorkStatus = "running" // task running on a worker, waiting for worker return wsDone WorkStatus = "done" // task returned from the worker, results available )
重要函数方法:
新建工作 会出现调度错误,最后返回WorkID对象
这个模块主要就是负责对正在执行的工作设置追踪器(只有处于状态wsRunning才会设置)
获得工作状态,并执行cancel操作,将出现错误的程序或者没有被追踪的程序cancel掉。 1.新建一个workid 2.查看是否新建成功 3.查看workid是否存在,如果不存在返回错误 4.如果不存在则开始一个新的work,并且根据该work的状态执行操作 5.如果workid存在 说明程序已经运行或者已经被追踪,则返回true
开始工作并设置追踪其
首先获取Work的状态,获取失败则报错,其次判断work是否处于started阶段 是则返回,最后判断work是否被正确追踪,错误则返回
调用waitCall 等待调用
返回结果信息
允许中途结束work
模块文件:worker_ calltracker.go worker_local.go worker_tracked.go
主要是描述call指令的几种状态:开始、完成、结果返回、未完成
关键数据结构:
此类中包含了工作种类和是否允许交换的标志
存储空间、本地存储、扇区标号、返回信息、执行操作、是否允许交换、指令追踪、已经接受的任务、运行、任务锁、阶段、是否可以工作、关闭
关键函数方法:
新建一个本地的工人
申请一块扇区
异步调用 主要负责跟踪调用Call
尝试将操作结果返回给manager,如果操作成功就返回ture
新建扇区
在扇区中新增存储块
结束部分 删除不需要的信息 避免其占用内存
释放未封装的存储数据
解封并清除扇区或者缓存中的信息
转移存储
解封存储块,并且将没有用的信息删除
获取存储、cpu、gpu等信息
静态设置
模块关键包含:selector_alloc.go selector_existing.go selector_task.go
重要函数方法:
新建一个分配选择器
分配选择器初始化:1.获得工人的任务类型2.获得工人的路径3.获取扇区的大小4.寻找最适合的存储空间
compare,比较两个工人的对资源的利用效率,如果a < b返回ture 反之返回false
新建一个存在选择器
存在选择器初始化:1.获得工人的任务类型2.获得工人的路径3.获取扇区的大小4.寻找最适合的存储空间
compare,比较两个工人的对资源的利用效率,如果a< b返回ture 反之返回false
新建一个任务选择器
任务选择器初始化:获取支持工人的工作类型
compare,比较。分别获取ab两个工人的工作类型,选取达到目标做的较少的工人
scheduler: map[WorkerID]*workerHandle chan *workerRequest chan *schedWindowRequest WorkerID schedWindow activeResources []*workerRequest chan workerDisableReq []*schedWindow WorkerID *requestQueue []*workerRequest []*schedWindowRequest schedWindow chan *schedWindow *workTracker sync.Mutex map[storiface.CallID]struct{} abi.SectorID uuid.UUID map[storiface.CallID]trackedWork trackedWork storiface.WorkerJob CallID abi.SectorID sealtasks.TaskType 运行等待时间 开始时间 hostname WorkerID string trackedWorker WorkerID storiface.WorkerInfo *workTracker workerHandle: Worker WorkerInfo activeResources []*schedWindow schedWindow: activeResources []*workerRequest workerRequest WorkerAction sealtasks.TaskType storage.SectorRef WorkerSelector chan<- workerResponse
trySched()工作过程: 尝试进行资源分配的核心过程函数。 任务分配基于优先级、Worker工人资源可用性、特定任务Worker的性能、窗口请求年龄
func (wh *workerHandle) utilization() ->func (a *activeResources) utilization workHandle结构体调用活跃资源的方法来获得当前资源的占用情况
func (sh *scheduler) runWorker() -> func (sw *schedWorker) go sw.handleWorker() 工作过程:->func (sw *schedWorker) workerCompactWindows() ->func (sw *schedWorker) processAssignedWindows()
runWorker()该函数运行一个Worker处理其的请求,一个worker对应一个handleWorker的结构体,如果已经存在结构体 也就是在其他线程中解决此worker的情况 直接返回map[workerID]*handleWorker中的handleWorker的结构体指针,如果不存在,则需要配置调度矿工的信息,执行处理矿工的函数。 在存储好schedWorker worker资源调度清单之后 执行go sw.handleWorker(),开启一个goroutinue在后台并行执行。 handleWorker是一个专门处理worker调度的方法,他是处理矿工需求的实现函数。 读取context中的内容开始。开始进行对worker请求的处理。如果worker调度器没有更多的窗口来应对请求,则结束。等待更多的窗口到来或者是等待阻塞中的任务结束。如果有新窗口的到来,则退出循环向窗口发送请求信息;如果没有新窗口的到来则等待更多的任务被主调度器分配或者等待worker结束一项任务并空闲下来。空闲之后的worker要对其分配资源,绑定worker和窗口以及分配进程任务给窗口
SealPreCommit1() SealPreCommit2() SealCommit1()...等函数调用track方法 ->func (wt *workTracker) track -> 匿名函数
selector模块没有特别的调度线路,仅仅提供了三种选择器的函数方法以供调用。
原文地址:Lotus | Filecoin | Sector-storage部分源码与模块理解 - 知乎 (zhihu.com)