Java教程

Job System 初探

本文主要是介绍Job System 初探,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

 

作者:i_dovelemon

日期:2023-08-24

主题:Fiber, Atomic Operation, MPMC Queue, Multiple thread, Job system

引言

        现代 CPU 是多核处理器,为了充分利用 CPU 多核处理的特性,游戏引擎会大量使用多线程 (multiple thread) 进行任务处理。

        而为了充分利用多线程,让开发变得简单,很多引擎会提供一个 job system 的系统,从而让开发人员将任务进行多线程并行处理,大大提高程序的性能。比如 unity 的 job system。

        之前阅读 OurMachinery 相关博客的时候,有看到一篇 Fiber based job system ,讲述了它们参考 Naughty Dog 在 GDC 2015 上的演讲 Parallelizing the Naughty Dog Engine Using Fibers 设计它们的 job system 的相关经验。当时看到的时候,就想着以后要来实际编写下,看看这个系统是怎么样的一个情况。

        最近刚好有了时间,所以抽空研究了下,简单做了一个 demo,这里和大家分享下一些心得体会。

        目前的设计,还是非常基本的,不排除有重大 bug 在里面,性能方面也不是最优化的,只能说基本满足了我需要的功能,相关代码地址可以参考这里。

概念

        这个系统的代码虽然很少,但是有很多以前不太接触的概念,这里先简单介绍下。

Fiber

        和 Unity 的 job system 不太一样的地方,这里设计的 job system 利用了 fiber 来作为最终执行 job 的上下文。为什么要使用 fiber 了?这是因为 fiber 的执行能够随时中断,跳转到另外一个 fiber 上去,然后在切换回来,有点类似协程。这样就使得代码的编写变得任意,我们可以随时中断我们的代码进程,让系统跳转到其他的 fiber 去执行任务。同时 fiber 的切换相对于 thread 来说,比较的轻量,性能更好。对于 fiber 的详细介绍,可以参考 Fibers

MPMC Queue

         Job system 里面大量的使用了 queue 这种数据结构,但是由于多线程需要同时多读多写(multi-producer multi-consumer),所以普通的队列无法满足要求。这里使用了一个基于原子操作,长度固定的循环队列来实现,参考这里 Bounded MPMC queue。

实现

接口

        接口设计的非常简单,除了系统的启动关闭之外,就只有两个函数,如下所示:

job_fence
job_kick(job_decal* decal, uint32_t count);

void
job_wait_for_complete(job_fence fence);

 

  其中 job_kick 用于抛发指定数量的任务,并且返回一个 fence 对象,用于同步等待。job_wait_for_complete 的功能就是等待指定的抛发任务全部完成的函数。

        job_decal 定义如下所示:

typedef void (*job_func)(void*);

typedef struct job_decal {
    job_func    job;
    void*       data;
} job_decal;

        下面是一个简单的使用案例:

void
test_job(void* job_data) {
    TracyCZoneN(ctx, "test_job", true);

    Sleep(1);
    for (uint32_t i = 0; i < 100; i++) {
        *(uint8_t*)job_data = sinf(*(uint8_t*)job_data + 100);
        *(uint8_t*)job_data = cosf(*(uint8_t*)job_data);
        *(uint8_t*)job_data = cosf(*(uint8_t*)job_data);
        *(uint8_t*)job_data = sinf(*(uint8_t*)job_data + 100);
        *(uint8_t*)job_data = cosf(*(uint8_t*)job_data);
        *(uint8_t*)job_data = cosf(*(uint8_t*)job_data);
        *(uint8_t*)job_data = sinf(*(uint8_t*)job_data + 100);
        *(uint8_t*)job_data = cosf(*(uint8_t*)job_data);
        *(uint8_t*)job_data = cosf(*(uint8_t*)job_data);
        *(uint8_t*)job_data = sinf(*(uint8_t*)job_data + 100);
        *(uint8_t*)job_data = cosf(*(uint8_t*)job_data);
        *(uint8_t*)job_data = cosf(*(uint8_t*)job_data);
    }

    TracyCZoneEnd(ctx);
}

void
test_1() {
    TracyCZoneN(ctx, "test_1", true);

    SYSTEM_INFO info;
    GetSystemInfo(&info);
    job_init(512, 512, info.dwNumberOfProcessors - 1);

    constexpr uint32_t job_count = 100;

    char test_data[job_count];

    job_decal decal[job_count];
    for (uint32_t i = 0; i < job_count; i++) {
        test_data[i] = i;
        decal[i].job = test_job;
        decal[i].data = &test_data[i];
    }

    job_fence fence = job_kick(decal, job_count);
    job_wait_for_complete(fence);

    job_shutdown();

    TracyCZoneEnd(ctx);
}

核心数据

        整个系统有如下几个重要的成员数据(省略了一些不重要的成员),这些数据在系统启动之后,就全部内存分配完毕,这样当我们整个 job system 运行的时候,就不再有相关的内存分配,提高性能:

typedef struct job_counter {
    volatile atomic_word count;
    volatile atomic_word gen;
} job_counter;

typedef struct job_queue_node {
    job_decal               decal;
    uint32_t                counter_index;
    job_counter*            counter;
} job_queue_node;

typedef struct job_system {
    ......
    atomic_queue<job_queue_node>*   job_queue;

    fiber*                          fiber_pool;
    fiber*                          switch_fiber_pool;  // which fiber switch to fiber in fiber_pool
    job_queue_node*                 fiber_exec_job_pool;  // which job will be executed by fiber
    uint32_t                        fiber_count;
    atomic_queue<uint32_t>*         free_fiber_indices_queue;

    job_counter*                    counter_pool;
    uint32_t                        counter_count;
    atomic_queue<uint32_t>*         free_counter_indices_queue;
} job_system;

 

        成员主要分成 3 个大部分:用于管理抛发任务的任务队列;用于管理 fiber 的相关数据;用于管理 job_counter,形成同步点的相关数据;

Job Queue

        job queue 是一个 mpmc queue 的队列,用于多线程的从这里面 enqueue/dequeue 任务,没有太复杂的地方。

Job Counter

        Job counter 是一组原子计数器,当我们抛发一组任务的时候,会从空闲队列中拿出一个计数器,用来保存当前抛发了多少任务。当每一个任务执行完毕的时候,这个计数器的值就减一。当这个计数器为 0 的时候,就表示所有的任务已经完成。

        我们看到 job_counter 的数据结构,不是只有一个 count,还有一个 gen 成员。这是因为原子计数器会被重复使用,一旦所有抛发的任务执行完毕之后,我们就会将计数器返回队列,那样就有机会被别人使用。这样就有可能出现,明明任务已经完成,但是计数器又不为 0 的情况。为了解决这个问题,特意添加了一个 generation 成员,用于表示当前的计数器是第多少代。这样当我们抛发任务,产生了一个 job_fence 的时候,可以将当前计数器的 gen 保存下来。当出现之前说的同一个计数器被多次使用的时候,我们就可以通过 gen 成员来判断是否为同一个计数器,从而正确的进行任务的同步。

Fiber Pool

        fiber pool 和 counter pool 类似,预先创建好了指定数量的 fiber,同时准备了一个空闲队列,用于获取空闲的 fiber。

        之前说过,这个系统执行任务的单元是 fiber,而不是线程,所以 fiber_exec_job_pool 用于保存对应位置的 fiber 所需要执行的 job 相关信息,这样我们就能够在 fiber 中获取当前需要执行的 job 相关信息,从而执行。

        了解过 fiber 相关概念之后,我们就知道,一个 fiber a 想要执行,就需要让 fiber b 通过调用 switch fiber 来将当前线程的执行核心转移到 fiber a。而一旦我们执行完毕了 fiber a,就需要将执行权转移回 fiber b,从而让 fiber b 的代码继续执行,实现类似协程的效果。所以 switch_fiber_pool 用于保存从哪个 fiber 切换到了当前 fiber,从而当 fiber 执行完毕之后,能够正确的 switch 回去。

重要流程

绑定线程核心

        job system 需要一些 woker thread,这些 worker thread 需要绑定到实际的 cpu 核心上去,这样才能够避免操作系统的调度,导致线程的执行发生中断,影响性能。而这个操作可以通过设置线程的 affinity mask 来完成,windows 下可以参考 SetThreadAffinityMask 。

        我们可以通过在线程中执行一个无限循环函数,来判断线程是否正确的绑定到了 cpu 核心上去。当我们正确的绑定了之后,执行无限循环函数会将对应的 cpu 资源全部沾满,从而能够在 windows 资源管理器中看到如下图所示的样子:

工作线程主循环

        工作线程,除了每个循环检测下是否需要退出线程之外,主要的任务就是判断是否有空闲的 job 需要执行。如果有空闲的 job 需要执行,那就拿取一个空闲的 job,并再拿取一个空闲的 fiber,设置相关的信息,然后将执行权交给对应的 fiber,让它去执行任务。当 fiber 执行完毕任务之后,将执行权再转交给之前的 fiber。

job_wait_for_complete

        这个同步函数,并不是一直阻塞等待任务完成。它在等待的过程中,除了判断对应的 job counter 是否为 0,即所有任务都执行完毕之外,也运行了工作线程的主循环函数,从而能够充分利用等待时间去执行更多的任务。

多线程 profiler

        有了多线程功能之后,需要进行调试,需要知道任务是否正确的并行处理了,依赖关系是否正确的建立了。所以就自然的想到了类似 unity 那样的 profiler ,它能够直观的看到 job 的执行情况。所以搜索了一些 profiler,看看有没有好用的。但是找下来,大部分都是性能分析相关的 profiler,支持多线程 timeline 的大部分都是收费的产品。最终找到了一个名为 Tracy Profiler 的开源工具,只要简单的接入,就能够实现 job 的 timeline profiler,题图就是来自于这个工具的截图,推荐大家使用。

总结

        以上就是这次研究 fiber based job system 的一些经验。虽然还很基础,但是已经基本能满足:多线程任务并发,任务依赖建立,任意线程抛发任务这些最基本的功能了。更多的使用场景还没有覆盖,所以可能有重大的 bug 和性能问题,等待以后多尝试使用之后再来分享经验。

参考

[1] Unity - Job system overview

[2] Fiber based job system

[3] Parallelizing the Naughty Dog Engine Using Fibers

[4] Windows - Fibers

[5] Bounded MPMC queue

[6] SetThreadAffinityMask

[7] Tracy Profiler

这篇关于Job System 初探的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!