task.mdc 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382
  1. ---
  2. description: 任务与队列(Task)
  3. globs:
  4. ---
  5. # 任务与队列(Task)
  6. ## 内置任务(代码中配置)
  7. 内置定时任务能力来自于[midwayjs](https://www.midwayjs.org/docs/extensions/cron)
  8. ### 引入组件
  9. ```ts
  10. import { Configuration } from "@midwayjs/core";
  11. import * as cron from "@midwayjs/cron"; // 导入模块
  12. import { join } from "path";
  13. @Configuration({
  14. imports: [cron],
  15. importConfigs: [join(__dirname, "config")],
  16. })
  17. export class AutoConfiguration {}
  18. ```
  19. ### 使用
  20. ```ts
  21. import { Job, IJob } from "@midwayjs/cron";
  22. import { FORMAT } from "@midwayjs/core";
  23. @Job({
  24. cronTime: FORMAT.CRONTAB.EVERY_PER_30_MINUTE,
  25. start: true,
  26. })
  27. export class DataSyncCheckerJob implements IJob {
  28. async onTick() {
  29. // ...
  30. }
  31. }
  32. ```
  33. ```ts
  34. @Job("syncJob", {
  35. cronTime: "*/2 * * * * *", // 每隔 2s 执行
  36. })
  37. export class DataSyncCheckerJob implements IJob {
  38. async onTick() {
  39. // ...
  40. }
  41. }
  42. ```
  43. ### 规则 cron
  44. ```ts
  45. * * * * * *
  46. ┬ ┬ ┬ ┬ ┬ ┬
  47. │ │ │ │ │ |
  48. │ │ │ │ │ └ day of week (0 - 7) (0 or 7 is Sun)
  49. │ │ │ │ └───── month (1 - 12)
  50. │ │ │ └────────── day of month (1 - 31)
  51. │ │ └─────────────── hour (0 - 23)
  52. │ └──────────────────── minute (0 - 59)
  53. └───────────────────────── second (0 - 59, optional)
  54. ```
  55. ::: warning 警告
  56. 注意:该方式在多实例部署的情况下无法做到任务之前的协同,任务存在重复执行的可能
  57. :::
  58. ## 本地任务(管理后台配置,v8.0 新增)
  59. 可以到登录后台`/系统管理/任务管理/任务列表`,配置任务。默认是不需要任何依赖的, 旧版需要依赖`redis`才能使用该功能。
  60. ### 配置任务
  61. 配置完任务可以调用你配置的 service 方法,如:taskDemoService.test()
  62. ### 规则 cron
  63. 规则 cron
  64. ```ts
  65. * * * * * *
  66. ┬ ┬ ┬ ┬ ┬ ┬
  67. │ │ │ │ │ |
  68. │ │ │ │ │ └ day of week (0 - 7) (0 or 7 is Sun)
  69. │ │ │ │ └───── month (1 - 12)
  70. │ │ │ └────────── day of month (1 - 31)
  71. │ │ └─────────────── hour (0 - 23)
  72. │ └──────────────────── minute (0 - 59)
  73. └───────────────────────── second (0 - 59, optional)
  74. ```
  75. 规则示例:
  76. - 每 5 秒执行一次: `*/5 * * * * *`
  77. - 每 5 分钟执行一次: `*/5 * * * *`
  78. - 每小时执行一次: `0 * * * *`
  79. - 每天执行一次: `0 0 * * *`
  80. - 每天 1 点执行: `0 1 * * *`
  81. - 每周执行一次: `0 0 * * 0`
  82. - 每月执行一次: `0 0 1 * *`
  83. ![](/admin/node/task.png)
  84. ## 分布式任务(管理后台配置)
  85. 当需要分布式部署时,需要开启分布式任务,通过 redis 作为协同整个集群的任务,防止任务重复执行等异常情况。
  86. #### 引入插件
  87. `src/configuration.ts`
  88. ```ts
  89. import { Configuration, App } from "@midwayjs/core";
  90. import { join } from "path";
  91. import * as task from "@cool-midway/task";
  92. @Configuration({
  93. imports: [task],
  94. importConfigs: [join(__dirname, "./config")],
  95. })
  96. export class ContainerLifeCycle {
  97. @App()
  98. app: koa.Application;
  99. async onReady() {}
  100. }
  101. ```
  102. #### 配置
  103. [redis>=5.x](https://redis.io/),推荐[redis>=7.x](https://redis.io/)
  104. `src/config/config.default.ts`
  105. ::: warning 注意
  106. 很多人忽略了这个配置,导致项目包 redis 连接错误!!!
  107. :::
  108. ```ts
  109. import { CoolFileConfig, MODETYPE } from "@cool-midway/file";
  110. import { MidwayConfig } from "@midwayjs/core";
  111. import * as fsStore from "cache-manager-fs-hash";
  112. export default {
  113. // 修改成你自己独有的key
  114. keys: "cool-admin for node",
  115. koa: {
  116. port: 8001,
  117. },
  118. // cool配置
  119. cool: {
  120. redis: {
  121. host: "127.0.0.1",
  122. port: 6379,
  123. password: "",
  124. db: 0,
  125. },
  126. },
  127. } as unknown as MidwayConfig;
  128. ```
  129. redis cluster 方式
  130. ```ts
  131. [
  132. {
  133. host: "192.168.0.103",
  134. port: 7000,
  135. },
  136. {
  137. host: "192.168.0.103",
  138. port: 7001,
  139. },
  140. {
  141. host: "192.168.0.103",
  142. port: 7002,
  143. },
  144. {
  145. host: "192.168.0.103",
  146. port: 7003,
  147. },
  148. {
  149. host: "192.168.0.103",
  150. port: 7004,
  151. },
  152. {
  153. host: "192.168.0.103",
  154. port: 7005,
  155. },
  156. ];
  157. ```
  158. ### 创建执行任务的 service
  159. ```ts
  160. import { Provide } from "@midwayjs/core";
  161. import { BaseService } from "@cool-midway/core";
  162. /**
  163. * 任务执行的demo示例
  164. */
  165. @Provide()
  166. export class DemoTaskService extends BaseService {
  167. /**
  168. * 测试任务执行
  169. * @param params 接收的参数 数组 [] 可不传
  170. */
  171. async test(params?: []) {
  172. // 需要登录后台任务管理配置任务
  173. console.log("任务执行了", params);
  174. }
  175. }
  176. ```
  177. ### 配置定时任务
  178. 登录后台 任务管理/任务列表
  179. ![](/admin/node/task.png)
  180. ::: warning
  181. 截图中的 demoTaskService 为上一步执行任务的 service 的实例 ID,midwayjs 默认为类名首字母小写!!!
  182. 任务调度基于 redis,所有的任务都需要通过代码去维护任务的创建,启动,暂停。 所以直接改变数据库的任务状态是无效的,redis 中的信息还未清空, 任务将继续执行。
  183. :::
  184. ## 队列
  185. 之前的分布式任务调度,其实是利用了[bullmq](https://docs.bullmq.io/)的重复队列机制。
  186. 在项目开发过程中特别是较大型、数据量较大、业务较复杂的场景下往往需要用到队列。 如:抢购、批量发送消息、分布式事务、订单 2 小时后失效等。
  187. 得益于[bullmq](https://docs.bullmq.io/),cool 的队列也支持`延迟`、`重复`、`优先级`等高级特性。
  188. ### 创建队列
  189. 一般放在名称为 queue 文件夹下
  190. #### 普通队列
  191. 普通队列数据由消费者自动消费,必须重写 data 方法用于被动消费数据。
  192. `src/modules/demo/queue/comm.ts`
  193. ```ts
  194. import { BaseCoolQueue, CoolQueue } from "@cool-midway/task";
  195. import { IMidwayApplication } from "@midwayjs/core";
  196. import { App } from "@midwayjs/core";
  197. /**
  198. * 普通队列
  199. */
  200. @CoolQueue()
  201. export class DemoCommQueue extends BaseCoolQueue {
  202. @App()
  203. app: IMidwayApplication;
  204. async data(job: any, done: any): Promise<void> {
  205. // 这边可以执行定时任务具体的业务或队列的业务
  206. console.log("数据", job.data);
  207. // 抛出错误 可以让队列重试,默认重试5次
  208. //throw new Error('错误');
  209. done();
  210. }
  211. }
  212. ```
  213. #### 主动队列
  214. 主动队列数据由消费者主动消费
  215. `src/modules/demo/queue/getter.ts`
  216. ```ts
  217. import { BaseCoolQueue, CoolQueue } from "@cool-midway/task";
  218. /**
  219. * 主动消费队列
  220. */
  221. @CoolQueue({ type: "getter" })
  222. export class DemoGetterQueue extends BaseCoolQueue {}
  223. ```
  224. 主动消费数据
  225. ```ts
  226. // 主动消费队列
  227. @Inject()
  228. demoGetterQueue: DemoGetterQueue;
  229. const job = await this.demoGetterQueue.getters.getJobs(['wait'], 0, 0, true);
  230. // 获得完将数据从队列移除
  231. await job[0].remove();
  232. ```
  233. ### 发送数据
  234. ```ts
  235. import { Get, Inject, Post, Provide } from "@midwayjs/core";
  236. import { CoolController, BaseController } from "@cool-midway/core";
  237. import { DemoCommQueue } from "../../queue/comm";
  238. import { DemoGetterQueue } from "../../queue/getter";
  239. /**
  240. * 队列
  241. */
  242. @Provide()
  243. @CoolController()
  244. export class DemoQueueController extends BaseController {
  245. // 普通队列
  246. @Inject()
  247. demoCommQueue: DemoCommQueue;
  248. // 主动消费队列
  249. @Inject()
  250. demoGetterQueue: DemoGetterQueue;
  251. /**
  252. * 发送数据到队列
  253. */
  254. @Post("/add", { summary: "发送队列数据" })
  255. async queue() {
  256. this.demoCommQueue.add({ a: 2 });
  257. return this.ok();
  258. }
  259. /**
  260. * 获得队列中的数据,只有当队列类型为getter时有效
  261. */
  262. @Get("/getter")
  263. async getter() {
  264. const job = await this.demoCommQueue.getters.getJobs(["wait"], 0, 0, true);
  265. // 获得完将数据从队列移除
  266. await job[0].remove();
  267. return this.ok(job[0].data);
  268. }
  269. }
  270. ```
  271. 队列配置
  272. ```ts
  273. interface JobOpts {
  274. priority: number; // Optional priority value. ranges from 1 (highest priority) to MAX_INT (lowest priority). Note that
  275. // using priorities has a slight impact on performance, so do not use it if not required.
  276. delay: number; // An amount of milliseconds to wait until this job can be processed. Note that for accurate delays, both
  277. // server and clients should have their clocks synchronized. [optional].
  278. attempts: number; // The total number of attempts to try the job until it completes.
  279. repeat: RepeatOpts; // Repeat job according to a cron specification.
  280. backoff: number | BackoffOpts; // Backoff setting for automatic retries if the job fails, default strategy: `fixed`
  281. lifo: boolean; // if true, adds the job to the right of the queue instead of the left (default false)
  282. timeout: number; // The number of milliseconds after which the job should be fail with a timeout error [optional]
  283. jobId: number | string; // Override the job ID - by default, the job ID is a unique
  284. // integer, but you can use this setting to override it.
  285. // If you use this option, it is up to you to ensure the
  286. // jobId is unique. If you attempt to add a job with an id that
  287. // already exists, it will not be added.
  288. removeOnComplete: boolean | number; // If true, removes the job when it successfully
  289. // completes. A number specified the amount of jobs to keep. Default behavior is to keep the job in the completed set.
  290. removeOnFail: boolean | number; // If true, removes the job when it fails after all attempts. A number specified the amount of jobs to keep
  291. // Default behavior is to keep the job in the failed set.
  292. stackTraceLimit: number; // Limits the amount of stack trace lines that will be recorded in the stacktrace.
  293. }
  294. ```
  295. ::: tip
  296. this.demoQueue.queue 获得的就是 bull 实例,更多 bull 的高级用户可以查看[bull 文档](https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md)
  297. :::