123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382 |
- ---
- description: 任务与队列(Task)
- globs:
- ---
- # 任务与队列(Task)
- ## 内置任务(代码中配置)
- 内置定时任务能力来自于[midwayjs](https://www.midwayjs.org/docs/extensions/cron)
- ### 引入组件
- ```ts
- import { Configuration } from "@midwayjs/core";
- import * as cron from "@midwayjs/cron"; // 导入模块
- import { join } from "path";
- @Configuration({
- imports: [cron],
- importConfigs: [join(__dirname, "config")],
- })
- export class AutoConfiguration {}
- ```
- ### 使用
- ```ts
- import { Job, IJob } from "@midwayjs/cron";
- import { FORMAT } from "@midwayjs/core";
- @Job({
- cronTime: FORMAT.CRONTAB.EVERY_PER_30_MINUTE,
- start: true,
- })
- export class DataSyncCheckerJob implements IJob {
- async onTick() {
- // ...
- }
- }
- ```
- ```ts
- @Job("syncJob", {
- cronTime: "*/2 * * * * *", // 每隔 2s 执行
- })
- export class DataSyncCheckerJob implements IJob {
- async onTick() {
- // ...
- }
- }
- ```
- ### 规则 cron
- ```ts
- * * * * * *
- ┬ ┬ ┬ ┬ ┬ ┬
- │ │ │ │ │ |
- │ │ │ │ │ └ day of week (0 - 7) (0 or 7 is Sun)
- │ │ │ │ └───── month (1 - 12)
- │ │ │ └────────── day of month (1 - 31)
- │ │ └─────────────── hour (0 - 23)
- │ └──────────────────── minute (0 - 59)
- └───────────────────────── second (0 - 59, optional)
- ```
- ::: warning 警告
- 注意:该方式在多实例部署的情况下无法做到任务之前的协同,任务存在重复执行的可能
- :::
- ## 本地任务(管理后台配置,v8.0 新增)
- 可以到登录后台`/系统管理/任务管理/任务列表`,配置任务。默认是不需要任何依赖的, 旧版需要依赖`redis`才能使用该功能。
- ### 配置任务
- 配置完任务可以调用你配置的 service 方法,如:taskDemoService.test()
- ### 规则 cron
- 规则 cron
- ```ts
- * * * * * *
- ┬ ┬ ┬ ┬ ┬ ┬
- │ │ │ │ │ |
- │ │ │ │ │ └ day of week (0 - 7) (0 or 7 is Sun)
- │ │ │ │ └───── month (1 - 12)
- │ │ │ └────────── day of month (1 - 31)
- │ │ └─────────────── hour (0 - 23)
- │ └──────────────────── minute (0 - 59)
- └───────────────────────── second (0 - 59, optional)
- ```
- 规则示例:
- - 每 5 秒执行一次: `*/5 * * * * *`
- - 每 5 分钟执行一次: `*/5 * * * *`
- - 每小时执行一次: `0 * * * *`
- - 每天执行一次: `0 0 * * *`
- - 每天 1 点执行: `0 1 * * *`
- - 每周执行一次: `0 0 * * 0`
- - 每月执行一次: `0 0 1 * *`
- 
- ## 分布式任务(管理后台配置)
- 当需要分布式部署时,需要开启分布式任务,通过 redis 作为协同整个集群的任务,防止任务重复执行等异常情况。
- #### 引入插件
- `src/configuration.ts`
- ```ts
- import { Configuration, App } from "@midwayjs/core";
- import { join } from "path";
- import * as task from "@cool-midway/task";
- @Configuration({
- imports: [task],
- importConfigs: [join(__dirname, "./config")],
- })
- export class ContainerLifeCycle {
- @App()
- app: koa.Application;
- async onReady() {}
- }
- ```
- #### 配置
- [redis>=5.x](https://redis.io/),推荐[redis>=7.x](https://redis.io/)
- `src/config/config.default.ts`
- ::: warning 注意
- 很多人忽略了这个配置,导致项目包 redis 连接错误!!!
- :::
- ```ts
- import { CoolFileConfig, MODETYPE } from "@cool-midway/file";
- import { MidwayConfig } from "@midwayjs/core";
- import * as fsStore from "cache-manager-fs-hash";
- export default {
- // 修改成你自己独有的key
- keys: "cool-admin for node",
- koa: {
- port: 8001,
- },
- // cool配置
- cool: {
- redis: {
- host: "127.0.0.1",
- port: 6379,
- password: "",
- db: 0,
- },
- },
- } as unknown as MidwayConfig;
- ```
- redis cluster 方式
- ```ts
- [
- {
- host: "192.168.0.103",
- port: 7000,
- },
- {
- host: "192.168.0.103",
- port: 7001,
- },
- {
- host: "192.168.0.103",
- port: 7002,
- },
- {
- host: "192.168.0.103",
- port: 7003,
- },
- {
- host: "192.168.0.103",
- port: 7004,
- },
- {
- host: "192.168.0.103",
- port: 7005,
- },
- ];
- ```
- ### 创建执行任务的 service
- ```ts
- import { Provide } from "@midwayjs/core";
- import { BaseService } from "@cool-midway/core";
- /**
- * 任务执行的demo示例
- */
- @Provide()
- export class DemoTaskService extends BaseService {
- /**
- * 测试任务执行
- * @param params 接收的参数 数组 [] 可不传
- */
- async test(params?: []) {
- // 需要登录后台任务管理配置任务
- console.log("任务执行了", params);
- }
- }
- ```
- ### 配置定时任务
- 登录后台 任务管理/任务列表
- 
- ::: warning
- 截图中的 demoTaskService 为上一步执行任务的 service 的实例 ID,midwayjs 默认为类名首字母小写!!!
- 任务调度基于 redis,所有的任务都需要通过代码去维护任务的创建,启动,暂停。 所以直接改变数据库的任务状态是无效的,redis 中的信息还未清空, 任务将继续执行。
- :::
- ## 队列
- 之前的分布式任务调度,其实是利用了[bullmq](https://docs.bullmq.io/)的重复队列机制。
- 在项目开发过程中特别是较大型、数据量较大、业务较复杂的场景下往往需要用到队列。 如:抢购、批量发送消息、分布式事务、订单 2 小时后失效等。
- 得益于[bullmq](https://docs.bullmq.io/),cool 的队列也支持`延迟`、`重复`、`优先级`等高级特性。
- ### 创建队列
- 一般放在名称为 queue 文件夹下
- #### 普通队列
- 普通队列数据由消费者自动消费,必须重写 data 方法用于被动消费数据。
- `src/modules/demo/queue/comm.ts`
- ```ts
- import { BaseCoolQueue, CoolQueue } from "@cool-midway/task";
- import { IMidwayApplication } from "@midwayjs/core";
- import { App } from "@midwayjs/core";
- /**
- * 普通队列
- */
- @CoolQueue()
- export class DemoCommQueue extends BaseCoolQueue {
- @App()
- app: IMidwayApplication;
- async data(job: any, done: any): Promise<void> {
- // 这边可以执行定时任务具体的业务或队列的业务
- console.log("数据", job.data);
- // 抛出错误 可以让队列重试,默认重试5次
- //throw new Error('错误');
- done();
- }
- }
- ```
- #### 主动队列
- 主动队列数据由消费者主动消费
- `src/modules/demo/queue/getter.ts`
- ```ts
- import { BaseCoolQueue, CoolQueue } from "@cool-midway/task";
- /**
- * 主动消费队列
- */
- @CoolQueue({ type: "getter" })
- export class DemoGetterQueue extends BaseCoolQueue {}
- ```
- 主动消费数据
- ```ts
- // 主动消费队列
- @Inject()
- demoGetterQueue: DemoGetterQueue;
- const job = await this.demoGetterQueue.getters.getJobs(['wait'], 0, 0, true);
- // 获得完将数据从队列移除
- await job[0].remove();
- ```
- ### 发送数据
- ```ts
- import { Get, Inject, Post, Provide } from "@midwayjs/core";
- import { CoolController, BaseController } from "@cool-midway/core";
- import { DemoCommQueue } from "../../queue/comm";
- import { DemoGetterQueue } from "../../queue/getter";
- /**
- * 队列
- */
- @Provide()
- @CoolController()
- export class DemoQueueController extends BaseController {
- // 普通队列
- @Inject()
- demoCommQueue: DemoCommQueue;
- // 主动消费队列
- @Inject()
- demoGetterQueue: DemoGetterQueue;
- /**
- * 发送数据到队列
- */
- @Post("/add", { summary: "发送队列数据" })
- async queue() {
- this.demoCommQueue.add({ a: 2 });
- return this.ok();
- }
- /**
- * 获得队列中的数据,只有当队列类型为getter时有效
- */
- @Get("/getter")
- async getter() {
- const job = await this.demoCommQueue.getters.getJobs(["wait"], 0, 0, true);
- // 获得完将数据从队列移除
- await job[0].remove();
- return this.ok(job[0].data);
- }
- }
- ```
- 队列配置
- ```ts
- interface JobOpts {
- priority: number; // Optional priority value. ranges from 1 (highest priority) to MAX_INT (lowest priority). Note that
- // using priorities has a slight impact on performance, so do not use it if not required.
- delay: number; // An amount of milliseconds to wait until this job can be processed. Note that for accurate delays, both
- // server and clients should have their clocks synchronized. [optional].
- attempts: number; // The total number of attempts to try the job until it completes.
- repeat: RepeatOpts; // Repeat job according to a cron specification.
- backoff: number | BackoffOpts; // Backoff setting for automatic retries if the job fails, default strategy: `fixed`
- lifo: boolean; // if true, adds the job to the right of the queue instead of the left (default false)
- timeout: number; // The number of milliseconds after which the job should be fail with a timeout error [optional]
- jobId: number | string; // Override the job ID - by default, the job ID is a unique
- // integer, but you can use this setting to override it.
- // If you use this option, it is up to you to ensure the
- // jobId is unique. If you attempt to add a job with an id that
- // already exists, it will not be added.
- removeOnComplete: boolean | number; // If true, removes the job when it successfully
- // completes. A number specified the amount of jobs to keep. Default behavior is to keep the job in the completed set.
- removeOnFail: boolean | number; // If true, removes the job when it fails after all attempts. A number specified the amount of jobs to keep
- // Default behavior is to keep the job in the failed set.
- stackTraceLimit: number; // Limits the amount of stack trace lines that will be recorded in the stacktrace.
- }
- ```
- ::: tip
- this.demoQueue.queue 获得的就是 bull 实例,更多 bull 的高级用户可以查看[bull 文档](https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md)
- :::
|