• 【一起学Rust | 进阶篇 | Fang库】后台任务处理库——Fang



    前言

    Fang是一个Rust的后台任务处理库,采用Postgres DB作为任务队列。同时支持Asynk和Blocking任务。Asynk任务采用的是tokio的特性,Worker工作在tokio下。Blocking任务使用的是std::thread,Worker工作在一个单独的线程。


    一、Fang安装

    1. 添加依赖

    添加Fang到你的Cargo.toml文件中

    注意 Fang仅支持rust 1.62+版本

    仅使用Blocking

    [dependencies]
    fang = { version = "0.7" , features = ["blocking"], default-features = false }
    
    • 1
    • 2

    仅使用Asynk

    [dependencies]
    fang = { version = "0.7" , features = ["asynk"], default-features = false }
    
    • 1
    • 2

    同时使用Blocking和Asynk

    fang = { version = "0.7" }
    
    • 1

    2. 创建数据库

    这里需要使用Diesel CLI来完成数据库的迁移,将在后面的文章中介绍

    在你的Postgres DB中创建fang_tasks表,然后运行以下脚本

    CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
    
    CREATE TYPE fang_task_state AS ENUM ('new', 'in_progress', 'failed', 'finished');
    
    CREATE TABLE fang_tasks (
         id uuid PRIMARY KEY DEFAULT uuid_generate_v4(),
         metadata jsonb NOT NULL,
         error_message TEXT,
         state fang_task_state default 'new' NOT NULL,
         task_type VARCHAR default 'common' NOT NULL,
         created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
         updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
    );
    
    CREATE INDEX fang_tasks_state_index ON fang_tasks(state);
    CREATE INDEX fang_tasks_type_index ON fang_tasks(task_type);
    CREATE INDEX fang_tasks_created_at_index ON fang_tasks(created_at);
    CREATE INDEX fang_tasks_metadata_index ON fang_tasks(metadata);
    
    CREATE TABLE fang_periodic_tasks (
      id uuid PRIMARY KEY DEFAULT uuid_generate_v4(),
      metadata jsonb NOT NULL,
      period_in_seconds INTEGER NOT NULL,
      scheduled_at TIMESTAMP WITH TIME ZONE,
      created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
      updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
    );
    
    CREATE INDEX fang_periodic_tasks_scheduled_at_index ON fang_periodic_tasks(scheduled_at);
    CREATE INDEX fang_periodic_tasks_metadata_index ON fang_periodic_tasks(metadata);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    这些文件可以在源码目录migrations中找到,github搜Fang,然后进入下载源码。

    二、使用

    1.定义一个任务

    Blocking任务

    每个要被Fang执行的任务都必须实现fang::Runnable特质,特质实现#[typetag::serde]使之具有反序列化任务的属性。

    use fang::Error;
    use fang::Runnable;
    use fang::typetag;
    use fang::PgConnection;
    use fang::serde::{Deserialize, Serialize};
    
    #[derive(Serialize, Deserialize)]
    #[serde(crate = "fang::serde")]
    struct MyTask {
        pub number: u16,
    }
    
    #[typetag::serde]
    impl Runnable for MyTask {
        fn run(&self, _connection: &PgConnection) -> Result<(), Error> {
            println!("the number is {}", self.number);
    
            Ok(())
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    run函数的第二个参数是PgConnection,你可以重复使用它来操作任务队列,例如在当前作业执行期间添加一个新任务,或者,如果你要复用,可以在自己的查询中重新使用它。如果你不需要它,就忽略它。

    Asynk任务

    每个要被Fang执行的任务都必须实现fang::AsyncRunnable特质

    注意 不要实现两个同名的AsyncRunnable,这会导致typetag失败

    use fang::AsyncRunnable;
    use fang::asynk::async_queue::AsyncQueueable;
    use fang::serde::{Deserialize, Serialize};
    use fang::async_trait;
    
    #[derive(Serialize, Deserialize)]
    #[serde(crate = "fang::serde")]
    struct AsyncTask {
      pub number: u16,
    }
    
    #[typetag::serde]
    #[async_trait]
    impl AsyncRunnable for AsyncTask {
        async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> {
            Ok(())
        }
        // this func is optional to impl
        // Default task-type it is common
        fn task_type(&self) -> String {
            "my-task-type".to_string()
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    2.任务队列

    Blocking任务

    需要使用Queue::enqueue_task来入队列

    use fang::Queue;
    
    ...
    
    Queue::enqueue_task(&MyTask { number: 10 }).unwrap();
    
    • 1
    • 2
    • 3
    • 4
    • 5

    上面的示例在每次调用时都会创建一个新的 postgres 连接

    重用相同的 postgres 连接来将多个任务排入队列

    let queue = Queue::new();
    
    for id in &unsynced_feed_ids {
        queue.push_task(&SyncFeedMyTask { feed_id: *id }).unwrap();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    或者使用PgConnection结构体

    Queue::push_task_query(pg_connection, &new_task).unwrap();
    
    • 1

    Asynk任务

    使用AsyncQueueable::insert_task来入队,可以根据你自己后端来进行操作,默认为Postgres

    use fang::asynk::async_queue::AsyncQueue;
    use fang::NoTls;
    use fang::AsyncRunnable;
    
    // 创建异步队列
    let max_pool_size: u32 = 2;
    
    let mut queue = AsyncQueue::builder()
        // Postgres 数据库 url
        .uri("postgres://postgres:postgres@localhost/fang")
        // 允许的最大连接数控i昂
        .max_pool_size(max_pool_size)
        // 如果希望任务中的唯一性,则为false
        .duplicated_tasks(true)
        .build();
    
    // 要进行操作之前,总是要先连接
    queue.connect(NoTls).await.unwrap();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    举个简单例子我们用的是NoTls,如果你有特殊需求,如果出于某种原因你想加密 postgres 流量。

    let task = AsyncTask { 8 };
    let task_returned = queue
      .insert_task(&task as &dyn AsyncRunnable)
      .await
      .unwrap();
    
    • 1
    • 2
    • 3
    • 4
    • 5

    3. 启动Worker

    Blocking任务

    每个Worker都在一个单独的线程中运行。如果panic,会重新启动。
    使用WorkerPool来启动Worker,WorkerPool::new接收一个整型参数,Worker的数量

    use fang::WorkerPool;
    
    WorkerPool::new(10).start();
    
    • 1
    • 2
    • 3

    使用shutdown停止线程

    
    use fang::WorkerPool;
    
    worker_pool = WorkerPool::new(10).start().unwrap;
    
    worker_pool.shutdown()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    Asynk任务

    每个Worker都在一个单独的 tokio 任务中运行。如果panic,会重新启动。
    使用AsyncWorkerPool来启动Worker

    use fang::asynk::async_worker_pool::AsyncWorkerPool;
    
    // 必须创建一个队列
    // 插入一些任务
    
    let mut pool: AsyncWorkerPool<AsyncQueue<NoTls>> = AsyncWorkerPool::builder()
            .number_of_workers(max_pool_size)
            .queue(queue.clone())
            .build();
    
    pool.start().await;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    4. 配置

    Blocking任务

    在创建Blocking任务任务的时候,默认只能传入Worker数量参数,如果想要进行自定义配置,需要使用WorkerPool.new_with_params来创建,它接受两个参数——工人数量和WorkerParams结构体。

    WorkerParams的定义是这样的

    pub struct WorkerParams {
        pub retention_mode: Option<RetentionMode>,
        pub sleep_params: Option<SleepParams>,
        pub task_type: Option<String>,
    }
    
    pub enum RetentionMode {
        KeepAll,
        RemoveAll,
        RemoveFinished,
    }
    
    pub struct SleepParams {
        pub sleep_period: u64,
        pub max_sleep_period: u64,
        pub min_sleep_period: u64,
        pub sleep_step: u64,
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    Asynk任务

    使用AsyncWorkerPool的builder方法即可。需要链式调用,创建一个AsyncWorkerPool,然后调用.queue(…),.sleep_params(…)(可选),.retention_mode(…)(可选),.number_of_workers(…)配置,最后调用.build()构建对象。

    5. 配置Worker类型

    可以指定Worker类型,来指定指定类型Worker执行指定类型的任务

    Blocking任务

    Runnable特质中添加方法

    ...
    
    #[typetag::serde]
    impl Runnable for MyTask {
        fn run(&self) -> Result<(), Error> {
            println!("the number is {}", self.number);
    
            Ok(())
        }
    
        fn task_type(&self) -> String {
            "number".to_string()
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    设置task_type

    let mut worker_params = WorkerParams::new();
    worker_params.set_task_type("number".to_string());
    
    WorkerPool::new_with_params(10, worker_params).start();
    
    • 1
    • 2
    • 3
    • 4

    没有设置task_type的Worker可以执行任何任务

    Asynk任务

    功能与Blocking任务相同。使用AsyncWorker的builer来设置

    6. 配置保留模式

    默认情况下,所有成功完成的任务都会从数据库中删除,失败的任务不会。可以使用三种保留模式:

    pub enum RetentionMode {
        KeepAll,        \\ 不删除任务
        RemoveAll,      \\ 删除所有任务
        RemoveFinished, \\ 默认值,完成就删除
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    Blocking任务

    使用set_retention_mode设置保留模式

    let mut worker_params = WorkerParams::new();
    worker_params.set_retention_mode(RetentionMode::RemoveAll);
    
    WorkerPool::new_with_params(10, worker_params).start();
    
    • 1
    • 2
    • 3
    • 4

    Asynk任务

    使用AsyncWorker的builder。

    7. 配置睡眠值

    Blocking任务

    使用 useSleepParams来配置睡眠值:

    pub struct SleepParams {
        pub sleep_period: u64,     \\ 默认值 5
        pub max_sleep_period: u64, \\ 默认值 15
        pub min_sleep_period: u64, \\ 默认值 5
        pub sleep_step: u64,       \\ 默认值 5
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    如果数据库中没有任务,则Worker会休眠sleep_period,并且每次该值都会增加sleep_step,直到达到max_sleep_period. min_sleep_period是sleep_period的初始值。所有值都以秒为单位。

    使用set_sleep_params来设置

    let sleep_params = SleepParams {
        sleep_period: 2,
        max_sleep_period: 6,
        min_sleep_period: 2,
        sleep_step: 1,
    };
    let mut worker_params = WorkerParams::new();
    worker_params.set_sleep_params(sleep_params);
    
    WorkerPool::new_with_params(10, worker_params).start();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    Asynk任务

    使用AsyncWorker的builder。

    8. 定时任务

    如果你从头到尾看的本文,那么什么也不需要做,否则你需要创建fang_periodic_tasks表,就在本文安装那个部分。

    Blocking任务

    use fang::Scheduler;
    use fang::Queue;
    
    let queue = Queue::new();
    
    queue
         .push_periodic_task(&SyncMyTask::default(), 120)
         .unwrap();
    
    queue
         .push_periodic_task(&DeliverMyTask::default(), 60)
         .unwrap();
    
    Scheduler::start(10, 5);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    在上面的示例中,push_periodic_task用于将指定的任务保存到表fang_periodic_tasks中,该表将fang_tasks每隔指定的秒数排队(保存到表中)。

    Scheduler::start(10, 5)启动调度程序。它接受两个参数:

    • 数据库检查周期(以秒为单位)
    • 可接受的错误限制(以秒为单位)

    Asynk任务

    use fang::asynk::async_scheduler::Scheduler;
    use fang::asynk::async_queue::AsyncQueueable;
    use fang::asynk::async_queue::AsyncQueue;
    
    // 在此之前构建一个Async队列
    
    let schedule_in_future = Utc::now() + OtherDuration::seconds(5);
    
    let _periodic_task = queue.insert_periodic_task(
        &AsyncTask { number: 1 },
        schedule_in_future,
        10,
    )
    .await;
    
    let check_period: u64 = 1;
    let error_margin_seconds: u64 = 2;
    
    let mut scheduler = Scheduler::builder()
        .check_period(check_period)
        .error_margin_seconds(error_margin_seconds)
        .queue(&mut queue as &mut dyn AsyncQueueable)
        .build();
    
    // 在其他线程或循环之前添加更多任务
    
    // 调度程序循环
    scheduler.start().await.unwrap();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    总结

    以上就是本文的所有内容,介绍了Rust中借助Fang库来实现后台任务,进行后台任务的处理,还有定时任务,配置等。

  • 相关阅读:
    Clion新增子模块库代码跳转
    Docker安装Yapi
    yarn 设置淘宝镜像配置
    多线程知识汇总
    源码深度剖析SpringBoot启动流程中开启OpenFeign的入口(ImportBeanDefinitionRegistrar详解)
    互联网被裁的程序员,未来有什么方向呢?
    11.25日常总结
    注册阿里云账号,免费领取云服务器!
    Oracle Automatic Database Diagnostic Monitor (ADDM) 学习笔记
    王道考研操作系统——I/O管理
  • 原文地址:https://blog.csdn.net/weixin_47754149/article/details/126230426