• Unity JobSystem使用及技巧


    什么是JobSystem#

    并行编程#

    在游戏开发过程中我们经常会遇到要处理大量数据计算的需求,因此为了充分发挥硬件的多核性能,我们会需要用到并行编程,多线程编程也是并行编程的一种。

    线程是在进程内的,是共享进程内存的执行流,线程上下文切换的开销是相当高的,大概有2000的CPU Circle,同时会导致缓存失效,导致万级别的CPU Circle,Job System的设计使用了线程池,一开始先将大量的计算任务分配下去尽量减少线程的执行流被打断,也降低了一些thread的切换开销。

    Unreal Unity大部分都是这种模型,分配了一些work thread 然后其他的线程往这个线程塞Task,相比fixed thread模式性能好一些,多出了Task的概念,Unity里称这个为Job。

    建议看看Games104并行架构部分

    Unity JobSystem#

    通常Unity在一个线程上执行代码,该线程默认在程序开始时运行,称为主线程。我们在主线程使用JobSystem的API,去给worker线程下发任务,就是使用多线程

    通常Unity JobSystem会和Burst编译器一起使用,Burst会把IL变成使用LLVM优化的CPU代码,执行效率可以说大幅提升,但是使用Burst时候debug会变得困难,会缺少一些报错的堆栈,此时关闭burst可以看到一些堆栈,更方便debug。
    虽然并行编程有着种种的技巧,比如,线程之间沟通交流数据有需要加锁、原子操作等等的数据交换等操作。但是Unity为了让我们更容易的编写多线程代码,

    通过一些规则的制定,规避了一些复杂行为,同时也限制了一些功能,必要时这些功能也可以通过添加attribute、或者使用指针的方式来打破一些规则。
    规定包括但不限于:

    • 不允许访问静态变量
    • 不允许在Job里调度子Job
    • 只能向Job里传递值类型,并且是通过拷贝的方式从主线程将数据传输进Job,当Job运行结束数据会拷贝回主线程,我们可以在主线程的job对象访问Job的执行结果。
    • 不允许在Native容器里添加托管类型
    • 不允许使用指针
    • 不允许多个Job同时写入同一个地方
    • 不允许在Job里分配额外内存

    可以查看 官方文档

    应用场景#

    基本上所有需要处理数据计算的场景都可以使用,我们可以用它做大量的游戏逻辑的计算,
    我们也可以用它来做一些编辑器下的工具,可以达到加速的效果。

    细节#

    接口#

    unity官方提供了一系列的接口,写一个Struct实现接口便可以执行多线程代码,提供的接口包括:

    • IJob:一个线程
    • IJobParallelFor:多线程,使用时传入一个数组,根据数组长度会划分出任务数量,每个任务的索引就是数组元素的索引
    • IJobParallelForTransform:并行访问Transform组件的,这是unity自己实现的比较特殊的读写Transform信息的Job,实测下来用起来貌似worker还是一个在动,但是经过Burst编译后快不少。
    • IJobFor:几乎没用

    IJobParallelFor是最常用的,对数据源中的每一项都调用一次 Execute 方法。Execute 方法中有一个整数参数。该索引用于访问和操作作业实现中的数据源的单个元素。

    容器#

    Job使用的数据都需要使用Unity提供的Native容器,我们在主线程将要计算的数据装进NativeContainer里然后再传进Job。
    主要会使用的容器就是NativeArray,其实就是一个原生的数组类型,其他的容器这里暂时不提
    这些容器还要指定分配器,分配器包括

    • Allocator.Temp: 最快的配置。将其用于生命周期为一帧或更少的分配。从主线程传数据给Job时,不能使用Temp分配器。
    • Allocator.TempJob: 分配比 慢Temp但比 快Persistent。在四帧的生命周期内使用它进行线程安全分配。
    • Allocator.Persistent: 最慢的分配,但只要你需要它就可以持续,如果有必要,可以贯穿应用程序的整个生命周期。它是直接调用malloc. 较长的作业可以使用此 NativeContainer 分配类型。

    容器在实现Job的Struct里可以打标记,包括ReadOnly、WriteOnly,一方面可以提升性能,另一方面有时候会有读写冲突的情况,此时应该尽量多标记ReadOnly,避免一些数据冲突。

    创建 使用#

    官方文档已经说的很好。
    https://docs.unity3d.com/Manual/JobSystemCreatingJobs.html
    对于ParallelFor的Schedule多了一些参数,innerloopBatchCount这个参数可以留意一下,可以理解为一个线程次性拿走多少任务。

    Job之间互相依赖#

    https://docs.unity3d.com/Manual/JobSystemJobDependencies.html

    其实执行了一个Job之后,在主线再执行另一个Job也不会性能差很多,并且易于debug,可以断点查看多个阶段执行过程中Job的数据情况,但是追求完美还是可以把依赖填上。

    性能测试比较#

    笔者曾经做过简单的使用Job和不用Job的对比,通过打上Unity Profiler的标记,可以方便的在图表里查看运行开销。

    Profiler.BeginSample("Your Target Profiler Name");
    // your code
    Profiler.EndSample();
    

    IJob#

    using System.Collections;
    using System.Collections.Generic;
    using Unity.Collections;
    using Unity.Jobs;
    
    using UnityEngine;
    using Unity.Burst;
    [BurstCompile] 
    public class JobTest : MonoBehaviour
    {
    
        public bool useJob;
        // Update is called once per frame
        void Update()
        {
            float startTime = Time.realtimeSinceStartup;
            if (useJob)
            {
                NativeArray result = new NativeArray(1, Allocator.TempJob);//four frame allocate
                MyJobSystem0 job0 = new MyJobSystem0();
                job0.a = 0;
                job0.b = 1;
                job0.result = result;
                JobHandle handle = job0.Schedule();
                handle.Complete();
                result.Dispose();
                Debug.Log(("Use Job:"+ (Time.realtimeSinceStartup - startTime) * 1000f) + "ms");
            }
            else
            {
                var index = 0;
                for(int i = 0; i < 1000000; i++)
                {
                    index++;
                }
                Debug.Log(("Not Use Job:"+ (Time.realtimeSinceStartup - startTime) * 1000f) + "ms");
            }
        }
        
    }
    [BurstCompile] 
    public struct MyJobSystem0 : IJob
    {
        public int a;
        public int b;
        public NativeArray result;
    
        public void Execute()
        {
            var index = 0;
            for(int i = 0; i < 1000000; i++)
            {
                index++;
            }
            result[0] = a + b;
        }
    }
    

    使用IJob执行一项复杂的工作,没有使用job跑了2-4ms,使用job也是跑了2-4 ms,但是使用了job+burst,这个for循环的速度就变得只有0.2-0.8 ms了,burst对此优化挺大的。

    IJobParallelFor#

    using System;
    using System.Collections;
    using System.Collections.Generic;
    using Unity.Collections;
    using Unity.Jobs;
    using UnityEngine;
    
    public class JobForTest : MonoBehaviour
    {
        public bool useJob;
        public int dataCount;
        private NativeArray a;
    
        private NativeArray b;
    
        private NativeArray result;
    
        private List noJobA;
    
        private List noJobB;
    
        private List noJobResult;
        // Update is called once per frame
        private void Start()
        {
            a = new NativeArray(dataCount, Allocator.Persistent);
            b = new NativeArray(dataCount, Allocator.Persistent);
            result = new NativeArray(dataCount, Allocator.Persistent);
            noJobA = new List();
            noJobB = new List();
            noJobResult = new List();
            
            for (int i = 0; i < dataCount; ++i)
            {
                a[i] = 1.0f;
                b[i] = 2.0f;
                noJobA.Add(1.0f);
                noJobB.Add(2.0f);
                noJobResult.Add(0.0f);
            }
        }
    
        void Update()
        {
            float startTime = Time.realtimeSinceStartup;
            if (useJob)
            {
                MyParallelJob jobData = new MyParallelJob();
                jobData.a = a;  
                jobData.b = b;
                jobData.result = result;
                // 调度作业,为结果数组中的每个索引执行一个 Execute 方法,且每个处理批次只处理一项
                JobHandle handle = jobData.Schedule(result.Length, 1);
                // 等待作业完成
                handle.Complete();
                
                Debug.Log(("Use Job:"+ (Time.realtimeSinceStartup - startTime) * 1000f) + "ms");
    
            }
            else
            {
    
                for(int i = 0; i < dataCount; i++)
                {
                    noJobA[i] = 1;
                    noJobB[i] = 2;
                    noJobResult[i] = noJobA[i]+noJobB[i];
                }
                Debug.Log(("Not Use Job:"+ (Time.realtimeSinceStartup - startTime) * 1000f) + "ms");
            }
        }
    
        private void OnDestroy()
        {
            // 释放数组分配的内存
            a.Dispose();
            b.Dispose();
            result.Dispose();
        }
    }
    
    // 将两个浮点值相加的作业
    public struct MyParallelJob : IJobParallelFor
    {
        [ReadOnly]
        public NativeArray a;
        [ReadOnly]
        public NativeArray b;
        public NativeArray result;
    
        public void Execute(int i)
        {
            result[i] = a[i] + b[i];
        }
    }
    

    普通for寻找两个list,遍历list元素然后相加,数据量10万,每一个批次这里是处理1个execute, 不开job 2.48ms,开job 1.34ms,job开了burst就0.28ms。

    IJobParalForTransform#

    using Unity.Burst;
    using Unity.Collections;
    using Unity.Jobs;
    using Unity.Mathematics;
    using UnityEngine;
    using UnityEngine.Jobs;
    
    public class TransformJobs : MonoBehaviour
    {
        public bool useJob;
        public int dataCount = 100;
        //public int batchCount;
        // 用于存储transform的NativeArray
        private TransformAccessArray m_TransformsAccessArray;
        private NativeArray<Vector3> m_Velocities;
    
        private PositionUpdateJob m_Job;
        private JobHandle m_PositionJobHandle;
        private GameObject[] sphereGameObjects; 
        //[BurstCompile]
        struct PositionUpdateJob : IJobParallelForTransform
        {
            // 给每个物体设置一个速度
            [ReadOnly]
            public NativeArray<Vector3> velocity;
    
            public float deltaTime;
    
            // 实现IJobParallelForTransform的结构体中Execute方法第二个参数可以获取到Transform
            public void Execute(int i, TransformAccess transform)
            {
                transform.position += velocity[i] * deltaTime;
            }
        }
    
        void Start()
        {
            m_Velocities = new NativeArray<Vector3>(dataCount, Allocator.Persistent);
    
            // 用代码生成一个球体,作为复制的模板
            var sphere = GameObject.CreatePrimitive(PrimitiveType.Sphere);
            // 关闭阴影
            var renderer = sphere.GetComponent<MeshRenderer>();
            renderer.shadowCastingMode = UnityEngine.Rendering.ShadowCastingMode.Off;
            renderer.receiveShadows = false;
    
            // 关闭碰撞体
            var collider = sphere.GetComponent<Collider>();
            collider.enabled = false;
    
            // 保存transform的数组,用于生成transform的Native Array
            var transforms = new Transform[dataCount];
            sphereGameObjects = new GameObject[dataCount];
            int row = (int)Mathf.Sqrt(dataCount);
            // 生成1W个球
            for (int i = 0; i < row; i++)
            {
                for (int j = 0; j < row; j++)
                {
                    var go = GameObject.Instantiate(sphere);
                    go.transform.position = new Vector3(j, 0, i);
                    sphereGameObjects[i * row + j] = go;
                    transforms[i*row+j] = go.transform;
                    m_Velocities[i*row+j] = new Vector3(0.1f * j, 0, 0.1f * j);
                }
            }
    
            m_TransformsAccessArray = new TransformAccessArray(transforms);
        }
    
        void Update()
        {
            //float startTime = Time.realtimeSinceStartup;
            if (useJob)
            {
                // 实例化一个job,传入数据
                m_Job = new PositionUpdateJob()
                {
                    deltaTime = Time.deltaTime,
                    velocity = m_Velocities,
                };
    
                // 调度job执行
                m_PositionJobHandle = m_Job.Schedule(m_TransformsAccessArray);
                //Debug.Log(("Use Job:"+ (Time.realtimeSinceStartup - startTime) * 1000f) + "ms");
            }
            else
            {
                for (int i = 0; i < dataCount; ++i)
                {
                    sphereGameObjects[i].transform.position +=  m_Velocities[i] * Time.deltaTime;
                }
                //Debug.Log(("Not Use Job:"+ (Time.realtimeSinceStartup - startTime) * 1000f) + "ms");
            }
           
        }
    
        // 保证当前帧内Job执行完毕
        private void LateUpdate()
        {
            m_PositionJobHandle.Complete();
        }
    
        // OnDestroy中释放NativeArray的内存
        private void OnDestroy()
        {
            m_Velocities.Dispose();
            m_TransformsAccessArray.Dispose();
        }
    }
    
    

    100+vec3,不用job 0.02ms,用job +burst 0.02ms
    1600+vec3,不用job 0.31ms,用job 0.07ms +burst 0.04ms
    1万+vec3,不用job 2.23ms,用job 0.35ms + burst 0.12ms
    1万+float3,不用job 2.55ms,用job 0.4ms
    100万+float3,不用job 199ms ,用job 40ms + burst 31ms
    100万+vec3,不用job 189ms ,用job 35ms + burst 31ms

    高级技巧#

    使用特定的数学库中的实现#

    unity特定的数学库中的数据类型可以获取simd优化,比如vector3就可以换成float3,但是缺少的数学库,就要自己解决了,所以我一般就vector3。

    在合适的时机Schedule和Complete#

    拥有作业所需的数据后就立即在作业上调用 Schedule,并仅在需要结果时才开始在作业上调用 Complete。最好是调度当前不与正在运行的任何其他作业竞争的、不需要等待的作业。例如,如果在一帧结束和下一帧开始之间的一段时间没有作业正在运行,并且可以接受一帧延迟,则可以在一帧结束时调度作业,并在下一帧中使用其结果。另一方面,如果游戏占满了与其他作业的转换期,但在帧中的其他位置存在大量未充分利用的时段,那么在这个时段调度作业会更加有效。

    在单线程里运行JobSystem#

    IJobParallelForExtensions可以调用Run方法,会将所有的Job放到一个Thread里执行,之前我们提到了Schedule的innerloopBatchCount参数,将它调到和数据源一样大,也是在一个Thread里执行,
    当我们的数据量小于1000,分配线程可能都觉得费劲,用单线程的JobSystem配合Burst效果可能更好。
    需要注意的是,如果我们出现了并行写入问题(多个Thread同时写一个位置),在单线程模式下是不会报错的。

    使用NativeDisableUnsafePtrRestriction#

    打上这个标记后可以在Job里使用Unsafe代码块,使用指针
    有多个好处

    • 可以不需要拷贝数组就把主线程的数据塞进子线程,对数据量大,需要频繁调用的可以考虑
    • 可以包装一些托管内存,比如我这里就包装了一个二维数组,每个containsTriangleIndex其实是一个int的NativeArray

    如果struct里有NativeArray,这个struct放进NativeArray的时候会过不了安全检查。
    我这里是在主线程维护好了这些动态的数组,然后再传进了这个结构的。
    在unsafe代码块里,Native容器相关的API中有GetUnsafePtr可以获得指针。

    SamplePointRayTriangleJob samplePointRayTriangleJob = new SamplePointRayTriangleJob();  
    samplePointRayTriangleJob.meshTriangles = jobMeshTriangles;  
    samplePointRayTriangleJob.randomDirs = jobRandomDirs;  
    samplePointRayTriangleJob.useGrid = useGrid;  
    samplePointRayTriangleJob.allStartPoints = startPoints;  
    samplePointRayTriangleJob.allTriangleBoundsJobDatas = (TriangleBoundsJobData*)triangleBoundsJobDatas.GetUnsafePtr();
    

    NativeDisableParallelForRestriction并行写入#

    打上这个标记后,多个Thread同时数组的同一个地方进行写入,unity不会阻拦,但是自己也要处理好逻辑问题。

    举个例子:下面这篇文章里
    https://blog.csdn.net/n5/article/details/123742777
    在Parallel Job里面进行光栅化三角形时,多个三角形有可能并行访问depth buffer/frame buffer的相同地方。这在多线程编程中属于race conditions,Job system内部会检测出来,会直接报错。

    IndexOutOfRangeException: Index 219108 is out of restricted IJobParallelFor range [4392…4392] in ReadWriteBuffer.
    ReadWriteBuffers are restricted to only read & write the element at the job index. You can use double buffering strategies to avoid race conditions due to reading & writing in parallel to the same elements from a job.

    NativeDisableContainerSafetyRestriction#

    使用这个Attribute可以在子线程分配一块内存,比如我这里每个子线程是创建了一个数组来接受光线三角形求交,一根光线击中了多少个点,一个子任务会执行许多次光线遍历Mesh

    这个主要是博主在Github上学习Unity官方的MeshApiExample项目看到的案例,有点像StaticBatch
    可以查看这个链接:把整个场景的Mesh合并

    DeallocateOnJobCompletion#

    容器在job结束之后自动释放
    这个博主用的很少 基本都是主动释放
    可能在用非并行Job的时候 接受外面的NativeArray后自己不想管释放之类的。
    可以查看一个github上别人的案例看看:案例

    自定义Native容器#

    https://docs.unity3d.com/Manual/job-system-custom-nativecontainer-example.html

    思考#

    JobSystem与ComputeShader相比 优势#

    JobSystem主要是利用CPU来降低计算负载,在数量级上远远比不上GPU,在前面的性能测试中数据到万以上就相当吃力了。
    ComputeShader是利用GPU来降低计算负载,,现在GPU Driven的技术也逐渐越来越多。

    思考这两个的取舍主要应该看业务逻辑的数据流向,如果我们的数据是从CPU发起的,那么在把数据从CPU拷贝到GPU也是肯定是不如在CPU内做拷贝要快的,
    如果我们的计算的数据最后是给CPU做下步计算的,如果用GPU做计算就会出现CPU等GPU的回读问题,数据若停留在GPU,那么ComputeShader自然好。

    另外就是考虑两个后端的硬件特性,CPU高主频,处理复杂的逻辑,大量的循环、分支判断上比GPU要有优势,数量级上则GPU更有优势。

    最后也可以考虑一下易用性问题,如果用到了很多原本在CPU里的数学库,在JobSystem里都是可以直接用的,ComputeShader的话则需要自己实现一版,不过脚手架这种东西属于见仁见智,
    只要自己方便就好。

    2023.3.21
    flyingziming

  • 相关阅读:
    【算法】数学之旅,根据素数特征寻找底数
    岩土工程安全监测中振弦采集仪连接振弦传感器时注意事项
    【开源】Sentinel高性能高可用集群限流解决方案
    [kaldi] alignment 对齐 (音素级和词级)
    Django学习日志09
    echarts 树形图
    森林消防泵柱塞泵工作原理深度解析——恒峰智慧科技
    HJ61 放苹果
    做人力资源选择小公司or大公司?看完这篇再做决定
    1509_人月神话阅读笔记_整体与部分
  • 原文地址:https://www.cnblogs.com/FlyingZiming/p/17241013.html