diff --git a/.gitignore b/.gitignore index 712d9fec6..a98d5b998 100644 --- a/.gitignore +++ b/.gitignore @@ -34,7 +34,6 @@ coverage # misskey built -db elasticsearch redis meili_data diff --git a/.pnp.cjs b/.pnp.cjs index 2802ec0fe..68a836cb6 100755 --- a/.pnp.cjs +++ b/.pnp.cjs @@ -1029,6 +1029,27 @@ const RAW_RUNTIME_STATE = ],\ "linkType": "SOFT"\ }],\ + ["npm:6.0.0", {\ + "packageLocation": "./.yarn/cache/@bull-board-api-npm-6.0.0-78af39dbec-922e46f2c5.zip/node_modules/@bull-board/api/",\ + "packageDependencies": [\ + ["@bull-board/api", "npm:6.0.0"]\ + ],\ + "linkType": "SOFT"\ + }],\ + ["virtual:1a204444c80de4bcde4565c72a3b0d6571f4eb3f918d7d48d8cd7ddf8392bb94fcf59c7a20ab1f49717ca9715358d5e7cac35936b84d89d5579286a3dc50f306#npm:6.0.0", {\ + "packageLocation": "./.yarn/__virtual__/@bull-board-api-virtual-0339be2ffb/0/cache/@bull-board-api-npm-6.0.0-78af39dbec-922e46f2c5.zip/node_modules/@bull-board/api/",\ + "packageDependencies": [\ + ["@bull-board/api", "virtual:1a204444c80de4bcde4565c72a3b0d6571f4eb3f918d7d48d8cd7ddf8392bb94fcf59c7a20ab1f49717ca9715358d5e7cac35936b84d89d5579286a3dc50f306#npm:6.0.0"],\ + ["@bull-board/ui", "npm:6.0.0"],\ + ["@types/bull-board__ui", null],\ + ["redis-info", "npm:3.1.0"]\ + ],\ + "packagePeers": [\ + "@bull-board/ui",\ + "@types/bull-board__ui"\ + ],\ + "linkType": "HARD"\ + }],\ ["virtual:6d3c013820dba430e71ebb352cb5205445a13ea3c7a848f57a7ff58fb0d6469fe4d374280277dac42cb77a6dbf8e924e64f2f0b3413c28a02da9d890c199e6d7#npm:5.6.0", {\ "packageLocation": "./.yarn/__virtual__/@bull-board-api-virtual-7db979021f/0/cache/@bull-board-api-npm-5.6.0-a1466ed4c8-951b27f057.zip/node_modules/@bull-board/api/",\ "packageDependencies": [\ @@ -1045,18 +1066,18 @@ const RAW_RUNTIME_STATE = }]\ ]],\ ["@bull-board/koa", [\ - ["npm:5.6.0", {\ - "packageLocation": "./.yarn/cache/@bull-board-koa-npm-5.6.0-9bfe284953-54aef8b937.zip/node_modules/@bull-board/koa/",\ + ["npm:6.0.0", {\ + "packageLocation": "./.yarn/cache/@bull-board-koa-npm-6.0.0-47e7552797-0d1bac34e8.zip/node_modules/@bull-board/koa/",\ "packageDependencies": [\ - ["@bull-board/koa", "npm:5.6.0"],\ - ["@bull-board/api", "virtual:6d3c013820dba430e71ebb352cb5205445a13ea3c7a848f57a7ff58fb0d6469fe4d374280277dac42cb77a6dbf8e924e64f2f0b3413c28a02da9d890c199e6d7#npm:5.6.0"],\ - ["@bull-board/ui", "npm:5.6.0"],\ - ["ejs", "npm:3.1.9"],\ + ["@bull-board/koa", "npm:6.0.0"],\ + ["@bull-board/api", "virtual:1a204444c80de4bcde4565c72a3b0d6571f4eb3f918d7d48d8cd7ddf8392bb94fcf59c7a20ab1f49717ca9715358d5e7cac35936b84d89d5579286a3dc50f306#npm:6.0.0"],\ + ["@bull-board/ui", "npm:6.0.0"],\ + ["ejs", "npm:3.1.10"],\ ["koa", "npm:2.14.2"],\ ["koa-mount", "npm:4.0.0"],\ ["koa-router", "npm:10.1.1"],\ ["koa-static", "npm:5.0.0"],\ - ["koa-views", "virtual:9bfe284953271243c18a986cde1b0b605f8cc92af0dd289bd01addbff3c00d207acc42c3b36ff53fa242e7954e50922cd3937d7d58025dbb42ac550d5824bf67#npm:7.0.2"]\ + ["koa-views", "virtual:47e7552797ac54a9c2afddff4b4e04d9d332a7c4ae9063d663783ce2e037b701b60b632afdcf95d0a9161f0abbca22d39846d9260e4899a8593e974d45bbf1e0#npm:7.0.2"]\ ],\ "linkType": "HARD"\ }]\ @@ -1069,6 +1090,14 @@ const RAW_RUNTIME_STATE = ["@bull-board/api", "virtual:6d3c013820dba430e71ebb352cb5205445a13ea3c7a848f57a7ff58fb0d6469fe4d374280277dac42cb77a6dbf8e924e64f2f0b3413c28a02da9d890c199e6d7#npm:5.6.0"]\ ],\ "linkType": "HARD"\ + }],\ + ["npm:6.0.0", {\ + "packageLocation": "./.yarn/cache/@bull-board-ui-npm-6.0.0-1a204444c8-6d083063a3.zip/node_modules/@bull-board/ui/",\ + "packageDependencies": [\ + ["@bull-board/ui", "npm:6.0.0"],\ + ["@bull-board/api", "virtual:1a204444c80de4bcde4565c72a3b0d6571f4eb3f918d7d48d8cd7ddf8392bb94fcf59c7a20ab1f49717ca9715358d5e7cac35936b84d89d5579286a3dc50f306#npm:6.0.0"]\ + ],\ + "linkType": "HARD"\ }]\ ]],\ ["@colors/colors", [\ @@ -7136,9 +7165,9 @@ const RAW_RUNTIME_STATE = "packageLocation": "./packages/backend/",\ "packageDependencies": [\ ["backend", "workspace:packages/backend"],\ - ["@bull-board/api", "virtual:6d3c013820dba430e71ebb352cb5205445a13ea3c7a848f57a7ff58fb0d6469fe4d374280277dac42cb77a6dbf8e924e64f2f0b3413c28a02da9d890c199e6d7#npm:5.6.0"],\ - ["@bull-board/koa", "npm:5.6.0"],\ - ["@bull-board/ui", "npm:5.6.0"],\ + ["@bull-board/api", "virtual:1a204444c80de4bcde4565c72a3b0d6571f4eb3f918d7d48d8cd7ddf8392bb94fcf59c7a20ab1f49717ca9715358d5e7cac35936b84d89d5579286a3dc50f306#npm:6.0.0"],\ + ["@bull-board/koa", "npm:6.0.0"],\ + ["@bull-board/ui", "npm:6.0.0"],\ ["@discordapp/twemoji", "npm:14.1.2"],\ ["@iceshrimp/summaly", "npm:2.7.2::__archiveUrl=https%3A%2F%2Ficeshrimp.dev%2Fapi%2Fpackages%2Ficeshrimp%2Fnpm%2F%2540iceshrimp%252Fsummaly%2F-%2F2.7.2%2Fsummaly-2.7.2.tgz"],\ ["@koa/cors", "npm:3.4.3"],\ @@ -7209,7 +7238,7 @@ const RAW_RUNTIME_STATE = ["axios", "npm:1.4.0"],\ ["bcryptjs", "npm:2.4.3"],\ ["blurhash", "npm:2.0.5"],\ - ["bull", "npm:4.10.4"],\ + ["bullmq", "npm:5.16.0"],\ ["cacheable-lookup", "npm:7.0.0"],\ ["cbor", "npm:8.1.0"],\ ["chalk", "npm:5.3.0"],\ @@ -7234,7 +7263,7 @@ const RAW_RUNTIME_STATE = ["happy-dom", "npm:12.10.3"],\ ["hpagent", "npm:0.1.2"],\ ["iceshrimp-sdk", "workspace:packages/iceshrimp-sdk"],\ - ["ioredis", "npm:5.3.2"],\ + ["ioredis", "npm:5.4.1"],\ ["ip-cidr", "npm:3.1.0"],\ ["is-svg", "npm:4.3.2"],\ ["js-yaml", "npm:4.1.0"],\ @@ -7803,19 +7832,18 @@ const RAW_RUNTIME_STATE = "linkType": "HARD"\ }]\ ]],\ - ["bull", [\ - ["npm:4.10.4", {\ - "packageLocation": "./.yarn/cache/bull-npm-4.10.4-3465f09e40-8a99097317.zip/node_modules/bull/",\ + ["bullmq", [\ + ["npm:5.16.0", {\ + "packageLocation": "./.yarn/cache/bullmq-npm-5.16.0-fe493a4098-39febf6e4a.zip/node_modules/bullmq/",\ "packageDependencies": [\ - ["bull", "npm:4.10.4"],\ - ["cron-parser", "npm:4.8.1"],\ - ["debuglog", "npm:1.0.1"],\ - ["get-port", "npm:5.1.1"],\ - ["ioredis", "npm:5.3.2"],\ - ["lodash", "npm:4.17.21"],\ - ["msgpackr", "npm:1.9.5"],\ + ["bullmq", "npm:5.16.0"],\ + ["cron-parser", "npm:4.9.0"],\ + ["ioredis", "npm:5.4.1"],\ + ["msgpackr", "npm:1.11.0"],\ + ["node-abort-controller", "npm:3.1.1"],\ ["semver", "npm:7.5.4"],\ - ["uuid", "npm:8.3.2"]\ + ["tslib", "npm:2.7.0"],\ + ["uuid", "npm:9.0.0"]\ ],\ "linkType": "HARD"\ }]\ @@ -9093,10 +9121,10 @@ const RAW_RUNTIME_STATE = ],\ "linkType": "SOFT"\ }],\ - ["virtual:92dc05b84fde30e8037028575c990319f08cb08b0698e442e4c8d1ac4aceb7d666cb7b6454e308d767b637ef0226cb4e16e0c82a358c214878c7185041270440#npm:0.16.0", {\ - "packageLocation": "./.yarn/__virtual__/consolidate-virtual-185da77412/0/cache/consolidate-npm-0.16.0-1a9b3c81f9-74b9bc2f1c.zip/node_modules/consolidate/",\ + ["virtual:86ee2a8ef0b659363be6b28aece63a5c4d251db21e8e56fe2ea2e03233d27c1fc9c744ff2a904e3dbda7f6a5fedaa3054e931f4e747f5b9deb58af95c389db98#npm:0.16.0", {\ + "packageLocation": "./.yarn/__virtual__/consolidate-virtual-be4eb30fd6/0/cache/consolidate-npm-0.16.0-1a9b3c81f9-74b9bc2f1c.zip/node_modules/consolidate/",\ "packageDependencies": [\ - ["consolidate", "virtual:92dc05b84fde30e8037028575c990319f08cb08b0698e442e4c8d1ac4aceb7d666cb7b6454e308d767b637ef0226cb4e16e0c82a358c214878c7185041270440#npm:0.16.0"],\ + ["consolidate", "virtual:86ee2a8ef0b659363be6b28aece63a5c4d251db21e8e56fe2ea2e03233d27c1fc9c744ff2a904e3dbda7f6a5fedaa3054e931f4e747f5b9deb58af95c389db98#npm:0.16.0"],\ ["@types/arc-templates", null],\ ["@types/atpl", null],\ ["@types/babel-core", null],\ @@ -9448,10 +9476,10 @@ const RAW_RUNTIME_STATE = }]\ ]],\ ["cron-parser", [\ - ["npm:4.8.1", {\ - "packageLocation": "./.yarn/cache/cron-parser-npm-4.8.1-53e673fffa-5deb3f8216.zip/node_modules/cron-parser/",\ + ["npm:4.9.0", {\ + "packageLocation": "./.yarn/cache/cron-parser-npm-4.9.0-2a573f98e9-ffca5e532a.zip/node_modules/cron-parser/",\ "packageDependencies": [\ - ["cron-parser", "npm:4.8.1"],\ + ["cron-parser", "npm:4.9.0"],\ ["luxon", "npm:3.3.0"]\ ],\ "linkType": "HARD"\ @@ -9896,15 +9924,6 @@ const RAW_RUNTIME_STATE = "linkType": "HARD"\ }]\ ]],\ - ["debuglog", [\ - ["npm:1.0.1", {\ - "packageLocation": "./.yarn/cache/debuglog-npm-1.0.1-c553c84ea5-942a319695.zip/node_modules/debuglog/",\ - "packageDependencies": [\ - ["debuglog", "npm:1.0.1"]\ - ],\ - "linkType": "HARD"\ - }]\ - ]],\ ["decamelize", [\ ["npm:1.2.0", {\ "packageLocation": "./.yarn/cache/decamelize-npm-1.2.0-c5a2fdc622-ad8c51a7e7.zip/node_modules/decamelize/",\ @@ -10593,6 +10612,14 @@ const RAW_RUNTIME_STATE = }]\ ]],\ ["ejs", [\ + ["npm:3.1.10", {\ + "packageLocation": "./.yarn/cache/ejs-npm-3.1.10-4e8cf4bdc1-a9cb7d7cd1.zip/node_modules/ejs/",\ + "packageDependencies": [\ + ["ejs", "npm:3.1.10"],\ + ["jake", "npm:10.8.7"]\ + ],\ + "linkType": "HARD"\ + }],\ ["npm:3.1.9", {\ "packageLocation": "./.yarn/cache/ejs-npm-3.1.9-e201b2088c-71f56d3754.zip/node_modules/ejs/",\ "packageDependencies": [\ @@ -13093,15 +13120,6 @@ const RAW_RUNTIME_STATE = "linkType": "HARD"\ }]\ ]],\ - ["get-port", [\ - ["npm:5.1.1", {\ - "packageLocation": "./.yarn/cache/get-port-npm-5.1.1-2f6074007a-0162663ffe.zip/node_modules/get-port/",\ - "packageDependencies": [\ - ["get-port", "npm:5.1.1"]\ - ],\ - "linkType": "HARD"\ - }]\ - ]],\ ["get-stream", [\ ["npm:2.3.1", {\ "packageLocation": "./.yarn/cache/get-stream-npm-2.3.1-1755f3cab9-712738e6a3.zip/node_modules/get-stream/",\ @@ -14423,10 +14441,10 @@ const RAW_RUNTIME_STATE = }]\ ]],\ ["ioredis", [\ - ["npm:5.3.2", {\ - "packageLocation": "./.yarn/cache/ioredis-npm-5.3.2-58471071b1-0140f055ef.zip/node_modules/ioredis/",\ + ["npm:5.4.1", {\ + "packageLocation": "./.yarn/cache/ioredis-npm-5.4.1-c96e18ae67-9043b812ac.zip/node_modules/ioredis/",\ "packageDependencies": [\ - ["ioredis", "npm:5.3.2"],\ + ["ioredis", "npm:5.4.1"],\ ["@ioredis/commands", "npm:1.2.0"],\ ["cluster-key-slot", "npm:1.1.2"],\ ["debug", "virtual:ac3d8e680759ce54399273724d44e041d6c9b73454d191d411a8c44bb27e22f02aaf6ed9d3ad0ac1c298eac4833cff369c9c7b84c573016112c4f84be2cd8543#npm:4.3.4"],\ @@ -16822,12 +16840,12 @@ const RAW_RUNTIME_STATE = ],\ "linkType": "SOFT"\ }],\ - ["virtual:9bfe284953271243c18a986cde1b0b605f8cc92af0dd289bd01addbff3c00d207acc42c3b36ff53fa242e7954e50922cd3937d7d58025dbb42ac550d5824bf67#npm:7.0.2", {\ - "packageLocation": "./.yarn/__virtual__/koa-views-virtual-92dc05b84f/0/cache/koa-views-npm-7.0.2-f4a5c0091b-edff754c9f.zip/node_modules/koa-views/",\ + ["virtual:47e7552797ac54a9c2afddff4b4e04d9d332a7c4ae9063d663783ce2e037b701b60b632afdcf95d0a9161f0abbca22d39846d9260e4899a8593e974d45bbf1e0#npm:7.0.2", {\ + "packageLocation": "./.yarn/__virtual__/koa-views-virtual-86ee2a8ef0/0/cache/koa-views-npm-7.0.2-f4a5c0091b-edff754c9f.zip/node_modules/koa-views/",\ "packageDependencies": [\ - ["koa-views", "virtual:9bfe284953271243c18a986cde1b0b605f8cc92af0dd289bd01addbff3c00d207acc42c3b36ff53fa242e7954e50922cd3937d7d58025dbb42ac550d5824bf67#npm:7.0.2"],\ + ["koa-views", "virtual:47e7552797ac54a9c2afddff4b4e04d9d332a7c4ae9063d663783ce2e037b701b60b632afdcf95d0a9161f0abbca22d39846d9260e4899a8593e974d45bbf1e0#npm:7.0.2"],\ ["@types/koa", null],\ - ["consolidate", "virtual:92dc05b84fde30e8037028575c990319f08cb08b0698e442e4c8d1ac4aceb7d666cb7b6454e308d767b637ef0226cb4e16e0c82a358c214878c7185041270440#npm:0.16.0"],\ + ["consolidate", "virtual:86ee2a8ef0b659363be6b28aece63a5c4d251db21e8e56fe2ea2e03233d27c1fc9c744ff2a904e3dbda7f6a5fedaa3054e931f4e747f5b9deb58af95c389db98#npm:0.16.0"],\ ["debug", "virtual:ac3d8e680759ce54399273724d44e041d6c9b73454d191d411a8c44bb27e22f02aaf6ed9d3ad0ac1c298eac4833cff369c9c7b84c573016112c4f84be2cd8543#npm:4.3.4"],\ ["get-paths", "npm:0.0.7"],\ ["koa-send", "npm:5.0.1"],\ @@ -16845,7 +16863,7 @@ const RAW_RUNTIME_STATE = "packageDependencies": [\ ["koa-views", "virtual:aa59773ac87791c4813d53447077fcf8a847d6de5a301d34dc31286584b1dbb26d30d3adb5b4c41c1e8aea04371e926fda05c09c6253647c432e11d872a304ba#npm:7.0.2"],\ ["@types/koa", "npm:2.13.6"],\ - ["consolidate", "virtual:92dc05b84fde30e8037028575c990319f08cb08b0698e442e4c8d1ac4aceb7d666cb7b6454e308d767b637ef0226cb4e16e0c82a358c214878c7185041270440#npm:0.16.0"],\ + ["consolidate", "virtual:86ee2a8ef0b659363be6b28aece63a5c4d251db21e8e56fe2ea2e03233d27c1fc9c744ff2a904e3dbda7f6a5fedaa3054e931f4e747f5b9deb58af95c389db98#npm:0.16.0"],\ ["debug", "virtual:ac3d8e680759ce54399273724d44e041d6c9b73454d191d411a8c44bb27e22f02aaf6ed9d3ad0ac1c298eac4833cff369c9c7b84c573016112c4f84be2cd8543#npm:4.3.4"],\ ["get-paths", "npm:0.0.7"],\ ["koa-send", "npm:5.0.1"],\ @@ -18107,6 +18125,14 @@ const RAW_RUNTIME_STATE = }]\ ]],\ ["msgpackr", [\ + ["npm:1.11.0", {\ + "packageLocation": "./.yarn/cache/msgpackr-npm-1.11.0-c075b2537e-e95edf511a.zip/node_modules/msgpackr/",\ + "packageDependencies": [\ + ["msgpackr", "npm:1.11.0"],\ + ["msgpackr-extract", "npm:3.0.2"]\ + ],\ + "linkType": "HARD"\ + }],\ ["npm:1.9.5", {\ "packageLocation": "./.yarn/cache/msgpackr-npm-1.9.5-69f0e8f5b8-d95fbee39b.zip/node_modules/msgpackr/",\ "packageDependencies": [\ @@ -18300,6 +18326,15 @@ const RAW_RUNTIME_STATE = "linkType": "HARD"\ }]\ ]],\ + ["node-abort-controller", [\ + ["npm:3.1.1", {\ + "packageLocation": "./.yarn/cache/node-abort-controller-npm-3.1.1-e246ed42cd-0a2cdb7ec0.zip/node_modules/node-abort-controller/",\ + "packageDependencies": [\ + ["node-abort-controller", "npm:3.1.1"]\ + ],\ + "linkType": "HARD"\ + }]\ + ]],\ ["node-addon-api", [\ ["npm:5.1.0", {\ "packageLocation": "./.yarn/unplugged/node-addon-api-npm-5.1.0-b50d00f739/node_modules/node-addon-api/",\ @@ -21142,7 +21177,7 @@ const RAW_RUNTIME_STATE = ["redis-semaphore", "virtual:aa59773ac87791c4813d53447077fcf8a847d6de5a301d34dc31286584b1dbb26d30d3adb5b4c41c1e8aea04371e926fda05c09c6253647c432e11d872a304ba#npm:5.3.1"],\ ["@types/ioredis", null],\ ["debug", "virtual:ac3d8e680759ce54399273724d44e041d6c9b73454d191d411a8c44bb27e22f02aaf6ed9d3ad0ac1c298eac4833cff369c9c7b84c573016112c4f84be2cd8543#npm:4.3.4"],\ - ["ioredis", "npm:5.3.2"]\ + ["ioredis", "npm:5.4.1"]\ ],\ "packagePeers": [\ "@types/ioredis",\ @@ -23900,6 +23935,13 @@ const RAW_RUNTIME_STATE = ["tslib", "npm:2.6.2"]\ ],\ "linkType": "HARD"\ + }],\ + ["npm:2.7.0", {\ + "packageLocation": "./.yarn/cache/tslib-npm-2.7.0-21668f5c21-9a5b47ddac.zip/node_modules/tslib/",\ + "packageDependencies": [\ + ["tslib", "npm:2.7.0"]\ + ],\ + "linkType": "HARD"\ }]\ ]],\ ["tsscmp", [\ @@ -24171,7 +24213,7 @@ const RAW_RUNTIME_STATE = ["dotenv", "npm:16.3.1"],\ ["glob", "npm:8.1.0"],\ ["hdb-pool", null],\ - ["ioredis", "npm:5.3.2"],\ + ["ioredis", "npm:5.4.1"],\ ["mkdirp", "npm:2.1.6"],\ ["mongodb", null],\ ["mssql", null],\ diff --git a/.yarn/cache/@bull-board-api-npm-6.0.0-78af39dbec-922e46f2c5.zip b/.yarn/cache/@bull-board-api-npm-6.0.0-78af39dbec-922e46f2c5.zip new file mode 100644 index 000000000..35d6ff6f2 --- /dev/null +++ b/.yarn/cache/@bull-board-api-npm-6.0.0-78af39dbec-922e46f2c5.zip @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:e9c73db82e0ba55256f0070fc178fd1b2c914e5633109aba22d27ba4df3bd354 +size 46484 diff --git a/.yarn/cache/@bull-board-koa-npm-5.6.0-9bfe284953-54aef8b937.zip b/.yarn/cache/@bull-board-koa-npm-5.6.0-9bfe284953-54aef8b937.zip deleted file mode 100644 index ef1ca3e88..000000000 --- a/.yarn/cache/@bull-board-koa-npm-5.6.0-9bfe284953-54aef8b937.zip +++ /dev/null @@ -1,3 +0,0 @@ -version https://git-lfs.github.com/spec/v1 -oid sha256:191914ba23a575444b1b80981509c07c49c1a9715ab7cec8ec655318d5fcb62b -size 5621 diff --git a/.yarn/cache/@bull-board-koa-npm-6.0.0-47e7552797-0d1bac34e8.zip b/.yarn/cache/@bull-board-koa-npm-6.0.0-47e7552797-0d1bac34e8.zip new file mode 100644 index 000000000..c7a05fadb --- /dev/null +++ b/.yarn/cache/@bull-board-koa-npm-6.0.0-47e7552797-0d1bac34e8.zip @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:18f00a957a9a0a0abdbb050e2705a4d9710ebdf8cc859c9cc13b2803a29b5d31 +size 5652 diff --git a/.yarn/cache/@bull-board-ui-npm-6.0.0-1a204444c8-6d083063a3.zip b/.yarn/cache/@bull-board-ui-npm-6.0.0-1a204444c8-6d083063a3.zip new file mode 100644 index 000000000..34b720327 --- /dev/null +++ b/.yarn/cache/@bull-board-ui-npm-6.0.0-1a204444c8-6d083063a3.zip @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:e4d46610c76408f1cf7b7ba10bf1e1b43cc4ae713fae15390c64ce184427e888 +size 1151166 diff --git a/.yarn/cache/bull-npm-4.10.4-3465f09e40-8a99097317.zip b/.yarn/cache/bull-npm-4.10.4-3465f09e40-8a99097317.zip deleted file mode 100644 index 3fd8e120d..000000000 --- a/.yarn/cache/bull-npm-4.10.4-3465f09e40-8a99097317.zip +++ /dev/null @@ -1,3 +0,0 @@ -version https://git-lfs.github.com/spec/v1 -oid sha256:670f483e64c5e13843d259dc3d88cb1d636fe1e995b9f86c698d016b51328254 -size 80309 diff --git a/.yarn/cache/bullmq-npm-5.16.0-fe493a4098-39febf6e4a.zip b/.yarn/cache/bullmq-npm-5.16.0-fe493a4098-39febf6e4a.zip new file mode 100644 index 000000000..71e50a358 --- /dev/null +++ b/.yarn/cache/bullmq-npm-5.16.0-fe493a4098-39febf6e4a.zip @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:ff67d6013ac99a18e78969ffd267a989364af4204ee0b5542aa0e45e1ad43899 +size 668616 diff --git a/.yarn/cache/cron-parser-npm-4.8.1-53e673fffa-5deb3f8216.zip b/.yarn/cache/cron-parser-npm-4.8.1-53e673fffa-5deb3f8216.zip deleted file mode 100644 index 628febda2..000000000 --- a/.yarn/cache/cron-parser-npm-4.8.1-53e673fffa-5deb3f8216.zip +++ /dev/null @@ -1,3 +0,0 @@ -version https://git-lfs.github.com/spec/v1 -oid sha256:3b02b0cd5ad2b4508a5c5946f7724351c3e6c000172c11b0a6e3e957b99d835d -size 36352 diff --git a/.yarn/cache/cron-parser-npm-4.9.0-2a573f98e9-ffca5e532a.zip b/.yarn/cache/cron-parser-npm-4.9.0-2a573f98e9-ffca5e532a.zip new file mode 100644 index 000000000..940207761 --- /dev/null +++ b/.yarn/cache/cron-parser-npm-4.9.0-2a573f98e9-ffca5e532a.zip @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:23072578db81c6c70abe64d41bce6a8baf7f81838c1d49ba6ed76329552cf860 +size 17361 diff --git a/.yarn/cache/debuglog-npm-1.0.1-c553c84ea5-942a319695.zip b/.yarn/cache/debuglog-npm-1.0.1-c553c84ea5-942a319695.zip deleted file mode 100644 index 43c82be09..000000000 --- a/.yarn/cache/debuglog-npm-1.0.1-c553c84ea5-942a319695.zip +++ /dev/null @@ -1,3 +0,0 @@ -version https://git-lfs.github.com/spec/v1 -oid sha256:d8075bb1439b687a080b9d33c3d2bb4aa3b3fc90a22239c4b3fc6b924f2bdde5 -size 2612 diff --git a/.yarn/cache/ejs-npm-3.1.10-4e8cf4bdc1-a9cb7d7cd1.zip b/.yarn/cache/ejs-npm-3.1.10-4e8cf4bdc1-a9cb7d7cd1.zip new file mode 100644 index 000000000..ec2f8fe19 --- /dev/null +++ b/.yarn/cache/ejs-npm-3.1.10-4e8cf4bdc1-a9cb7d7cd1.zip @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:9cb13c8bbf3aa79b70e4e157d4855e3cf21d627d4fa995f8a1211d0c8cae8b54 +size 47501 diff --git a/.yarn/cache/get-port-npm-5.1.1-2f6074007a-0162663ffe.zip b/.yarn/cache/get-port-npm-5.1.1-2f6074007a-0162663ffe.zip deleted file mode 100644 index 87e54ba59..000000000 --- a/.yarn/cache/get-port-npm-5.1.1-2f6074007a-0162663ffe.zip +++ /dev/null @@ -1,3 +0,0 @@ -version https://git-lfs.github.com/spec/v1 -oid sha256:15e9dcc31aa95b26c1adf7e3e1c08772d8bab0604f567005e46bcfd20df9df57 -size 4875 diff --git a/.yarn/cache/ioredis-npm-5.3.2-58471071b1-0140f055ef.zip b/.yarn/cache/ioredis-npm-5.3.2-58471071b1-0140f055ef.zip deleted file mode 100644 index ff80ba178..000000000 --- a/.yarn/cache/ioredis-npm-5.3.2-58471071b1-0140f055ef.zip +++ /dev/null @@ -1,3 +0,0 @@ -version https://git-lfs.github.com/spec/v1 -oid sha256:605cbe4bbad49a073adcd7fa90edd537912a98f2152abbde98ddd73c0f987f2a -size 138174 diff --git a/.yarn/cache/ioredis-npm-5.4.1-c96e18ae67-9043b812ac.zip b/.yarn/cache/ioredis-npm-5.4.1-c96e18ae67-9043b812ac.zip new file mode 100644 index 000000000..5a6348ee3 --- /dev/null +++ b/.yarn/cache/ioredis-npm-5.4.1-c96e18ae67-9043b812ac.zip @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:5a0c1828d5cace6a3a035ea95315d2d2607bd7cf008bacdb93b0020592e20f1b +size 138153 diff --git a/.yarn/cache/msgpackr-npm-1.11.0-c075b2537e-e95edf511a.zip b/.yarn/cache/msgpackr-npm-1.11.0-c075b2537e-e95edf511a.zip new file mode 100644 index 000000000..6afe550f0 --- /dev/null +++ b/.yarn/cache/msgpackr-npm-1.11.0-c075b2537e-e95edf511a.zip @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:6137fe5bb7a88a2277a4701624d086739efbf764552f3ac8d496a8b2bff10d9a +size 421467 diff --git a/.yarn/cache/node-abort-controller-npm-3.1.1-e246ed42cd-0a2cdb7ec0.zip b/.yarn/cache/node-abort-controller-npm-3.1.1-e246ed42cd-0a2cdb7ec0.zip new file mode 100644 index 000000000..8db814211 --- /dev/null +++ b/.yarn/cache/node-abort-controller-npm-3.1.1-e246ed42cd-0a2cdb7ec0.zip @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:81c60b3a7e57dcdc07db256a408180ce8fb4b0f936b918758d6efa65dfff09de +size 9551 diff --git a/.yarn/cache/tslib-npm-2.7.0-21668f5c21-9a5b47ddac.zip b/.yarn/cache/tslib-npm-2.7.0-21668f5c21-9a5b47ddac.zip new file mode 100644 index 000000000..bf132241b --- /dev/null +++ b/.yarn/cache/tslib-npm-2.7.0-21668f5c21-9a5b47ddac.zip @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:f3f9ecac0553575163487f51cded797c1722c8a953ada4cacf33736e26540ae1 +size 27094 diff --git a/packages/backend/package.json b/packages/backend/package.json index fa96642bf..f898ae03f 100644 --- a/packages/backend/package.json +++ b/packages/backend/package.json @@ -23,9 +23,9 @@ "@types/formidable": "^2.0.5" }, "dependencies": { - "@bull-board/api": "5.6.0", - "@bull-board/koa": "5.6.0", - "@bull-board/ui": "5.6.0", + "@bull-board/api": "6.0.0", + "@bull-board/koa": "6.0.0", + "@bull-board/ui": "6.0.0", "@discordapp/twemoji": "14.1.2", "@iceshrimp/summaly": "2.7.2", "@koa/cors": "3.4.3", @@ -47,7 +47,7 @@ "axios": "^1.4.0", "bcryptjs": "2.4.3", "blurhash": "2.0.5", - "bull": "4.10.4", + "bullmq": "5.16.0", "cacheable-lookup": "7.0.0", "cbor": "8.1.0", "chalk": "5.3.0", @@ -69,7 +69,7 @@ "happy-dom": "^12.10.3", "hpagent": "0.1.2", "iceshrimp-sdk": "workspace:*", - "ioredis": "5.3.2", + "ioredis": "5.4.1", "ip-cidr": "3.1.0", "is-svg": "4.3.2", "js-yaml": "4.1.0", diff --git a/packages/backend/src/queue/get-job-info.ts b/packages/backend/src/queue/get-job-info.ts deleted file mode 100644 index ae3532cda..000000000 --- a/packages/backend/src/queue/get-job-info.ts +++ /dev/null @@ -1,18 +0,0 @@ -import type Bull from "bull"; - -export function getJobInfo(job: Bull.Job, increment = false) { - const age = Date.now() - job.timestamp; - - const formated = - age > 60000 - ? `${Math.floor(age / 1000 / 60)}m` - : age > 10000 - ? `${Math.floor(age / 1000)}s` - : `${age}ms`; - - // onActiveとかonCompletedのattemptsMadeがなぜか0始まりなのでインクリメントする - const currentAttempts = job.attemptsMade + (increment ? 1 : 0); - const maxAttempts = job.opts ? job.opts.attempts : 0; - - return `id=${job.id} attempts=${currentAttempts}/${maxAttempts} age=${formated}`; -} diff --git a/packages/backend/src/queue/index.ts b/packages/backend/src/queue/index.ts index 787fa848f..1ec02cba2 100644 --- a/packages/backend/src/queue/index.ts +++ b/packages/backend/src/queue/index.ts @@ -1,617 +1,56 @@ -import type httpSignature from "@peertube/http-signature"; -import { v4 as uuid } from "uuid"; +import { deliverQueue, inboxQueue } from "./queues.js"; +import { dbInit } from "./queues/db/index.js"; +import { deliverInit, deliverLogger } from "./queues/deliver.js"; +import { endedPollNotificationInit } from "./queues/ended-poll-notification.js"; +import { inboxInit, inboxLogger } from "./queues/inbox.js"; +import { objectStorageInit } from "./queues/object-storage/index.js"; +import { systemInit } from "./queues/system/index.js"; +import { webhookDeliverInit } from "./queues/webhook-deliver.js"; -import config from "@/config/index.js"; -import type { DriveFile } from "@/models/entities/drive-file.js"; -import type { IActivity } from "@/remote/activitypub/type.js"; -import type { Webhook, webhookEventTypes } from "@/models/entities/webhook.js"; -import { envOption } from "../env.js"; +export { + createDeleteDriveFilesJob, + createExportCustomEmojisJob, + createExportNotesJob, + createExportFollowingJob, + createExportMuteJob, + createExportBlockingJob, + createExportUserListsJob, + createImportFollowingJob, + createImportMutingJob, + createImportBlockingJob, + createImportUserListsJob, + createImportCustomEmojisJob, + createDeleteAccountJob, +} from "./queues/db/index.js"; +export { + createDeleteObjectStorageFileJob, + createCleanRemoteFilesJob, +} from "./queues/object-storage/index.js"; +export { systemQueue } from "./queues/system/index.js"; +export { deliverJob as deliver, deliverQueue } from "./queues/deliver.js"; +export { endedPollNotificationQueue } from "./queues/ended-poll-notification.js"; +export { inboxJob as inbox, inboxQueue } from "./queues/inbox.js"; +export { webhookDeliverJob as webhookDeliver, webhookDeliverQueue } from "./queues/webhook-deliver.js"; -import processDeliver from "./processors/deliver.js"; -import processInbox from "./processors/inbox.js"; -import processDb from "./processors/db/index.js"; -import processObjectStorage from "./processors/object-storage/index.js"; -import processSystemQueue from "./processors/system/index.js"; -import processWebhookDeliver from "./processors/webhook-deliver.js"; -import processBackground from "./processors/background/index.js"; -import { endedPollNotification } from "./processors/ended-poll-notification.js"; -import { queueLogger } from "./logger.js"; -import { getJobInfo } from "./get-job-info.js"; -import { - systemQueue, - dbQueue, - deliverQueue, - inboxQueue, - objectStorageQueue, - endedPollNotificationQueue, - webhookDeliverQueue, - backgroundQueue, -} from "./queues.js"; -import type { ThinUser } from "./types.js"; - -function renderError(e: Error): any { - return { - stack: e.stack, - message: e.message, - name: e.name, - }; -} - -const systemLogger = queueLogger.createSubLogger("system"); -const deliverLogger = queueLogger.createSubLogger("deliver"); -const webhookLogger = queueLogger.createSubLogger("webhook"); -const inboxLogger = queueLogger.createSubLogger("inbox"); -const dbLogger = queueLogger.createSubLogger("db"); -const objectStorageLogger = queueLogger.createSubLogger("objectStorage"); - -systemQueue - .on("waiting", (jobId) => systemLogger.debug(`waiting id=${jobId}`)) - .on("active", (job) => systemLogger.debug(`active id=${job.id}`)) - .on("completed", (job, result) => - systemLogger.debug(`completed(${result}) id=${job.id}`), - ) - .on("failed", (job, err) => - systemLogger.warn(`failed(${err}) id=${job.id}`, { - job, - e: renderError(err), - }), - ) - .on("error", (job: any, err: Error) => - systemLogger.error(`error ${err}`, { job, e: renderError(err) }), - ) - .on("stalled", (job) => systemLogger.warn(`stalled id=${job.id}`)); - -deliverQueue - .on("waiting", (jobId) => deliverLogger.debug(`waiting id=${jobId}`)) - .on("active", (job) => - deliverLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`), - ) - .on("completed", (job, result) => - deliverLogger.debug( - `completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`, - ), - ) - .on("failed", (job, err) => - deliverLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job.data.to}`), - ) - .on("error", (job: any, err: Error) => - deliverLogger.error(`error ${err}`, { job, e: renderError(err) }), - ) - .on("stalled", (job) => - deliverLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`), - ); - -inboxQueue - .on("waiting", (jobId) => inboxLogger.debug(`waiting id=${jobId}`)) - .on("active", (job) => inboxLogger.debug(`active ${getJobInfo(job, true)}`)) - .on("completed", (job, result) => - inboxLogger.debug(`completed(${result}) ${getJobInfo(job, true)}`), - ) - .on("failed", (job, err) => - inboxLogger.warn( - `failed(${err}) ${getJobInfo(job)} activity=${ - job.data.activity ? job.data.activity.id : "none" - }`, - { job, e: renderError(err) }, - ), - ) - .on("error", (job: any, err: Error) => - inboxLogger.error(`error ${err}`, { job, e: renderError(err) }), - ) - .on("stalled", (job) => - inboxLogger.warn( - `stalled ${getJobInfo(job)} activity=${ - job.data.activity ? job.data.activity.id : "none" - }`, - ), - ); - -dbQueue - .on("waiting", (jobId) => dbLogger.debug(`waiting id=${jobId}`)) - .on("active", (job) => dbLogger.debug(`active id=${job.id}`)) - .on("completed", (job, result) => - dbLogger.debug(`completed(${result}) id=${job.id}`), - ) - .on("failed", (job, err) => - dbLogger.warn(`failed(${err}) id=${job.id}`, { job, e: renderError(err) }), - ) - .on("error", (job: any, err: Error) => - dbLogger.error(`error ${err}`, { job, e: renderError(err) }), - ) - .on("stalled", (job) => dbLogger.warn(`stalled id=${job.id}`)); - -objectStorageQueue - .on("waiting", (jobId) => objectStorageLogger.debug(`waiting id=${jobId}`)) - .on("active", (job) => objectStorageLogger.debug(`active id=${job.id}`)) - .on("completed", (job, result) => - objectStorageLogger.debug(`completed(${result}) id=${job.id}`), - ) - .on("failed", (job, err) => - objectStorageLogger.warn(`failed(${err}) id=${job.id}`, { - job, - e: renderError(err), - }), - ) - .on("error", (job: any, err: Error) => - objectStorageLogger.error(`error ${err}`, { job, e: renderError(err) }), - ) - .on("stalled", (job) => objectStorageLogger.warn(`stalled id=${job.id}`)); - -webhookDeliverQueue - .on("waiting", (jobId) => webhookLogger.debug(`waiting id=${jobId}`)) - .on("active", (job) => - webhookLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`), - ) - .on("completed", (job, result) => - webhookLogger.debug( - `completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`, - ), - ) - .on("failed", (job, err) => - webhookLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job.data.to}`), - ) - .on("error", (job: any, err: Error) => - webhookLogger.error(`error ${err}`, { job, e: renderError(err) }), - ) - .on("stalled", (job) => - webhookLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`), - ); - -export function deliver(user: ThinUser, content: unknown, to: string | null) { - if (content == null) return null; - if (to == null) return null; - - const data = { - user: { - id: user.id, - }, - content, - to, - }; - - return deliverQueue.add(data, { - attempts: config.deliverJobMaxAttempts || 12, - timeout: 1 * 60 * 1000, // 1min - backoff: { - type: "apBackoff", - }, - removeOnComplete: true, - removeOnFail: true, - }); -} - -export function inbox( - activity: IActivity, - signature: httpSignature.IParsedSignature, -) { - const data = { - activity: activity, - signature, - }; - - return inboxQueue.add(data, { - attempts: config.inboxJobMaxAttempts || 8, - timeout: 5 * 60 * 1000, // 5min - backoff: { - type: "apBackoff", - }, - removeOnComplete: true, - removeOnFail: true, - }); -} - -export function createDeleteDriveFilesJob(user: ThinUser) { - return dbQueue.add( - "deleteDriveFiles", - { - user: user, - }, - { - removeOnComplete: true, - removeOnFail: true, - }, - ); -} - -export function createExportCustomEmojisJob(user: ThinUser) { - return dbQueue.add( - "exportCustomEmojis", - { - user: user, - }, - { - removeOnComplete: true, - removeOnFail: true, - }, - ); -} - -export function createExportNotesJob(user: ThinUser) { - return dbQueue.add( - "exportNotes", - { - user: user, - }, - { - removeOnComplete: true, - removeOnFail: true, - }, - ); -} - -export function createExportFollowingJob( - user: ThinUser, - excludeMuting = false, - excludeInactive = false, -) { - return dbQueue.add( - "exportFollowing", - { - user: user, - excludeMuting, - excludeInactive, - }, - { - removeOnComplete: true, - removeOnFail: true, - }, - ); -} - -export function createExportMuteJob(user: ThinUser) { - return dbQueue.add( - "exportMute", - { - user: user, - }, - { - removeOnComplete: true, - removeOnFail: true, - }, - ); -} - -export function createExportBlockingJob(user: ThinUser) { - return dbQueue.add( - "exportBlocking", - { - user: user, - }, - { - removeOnComplete: true, - removeOnFail: true, - }, - ); -} - -export function createExportUserListsJob(user: ThinUser) { - return dbQueue.add( - "exportUserLists", - { - user: user, - }, - { - removeOnComplete: true, - removeOnFail: true, - }, - ); -} - -export function createImportFollowingJob( - user: ThinUser, - fileId: DriveFile["id"], -) { - return dbQueue.add( - "importFollowing", - { - user: user, - fileId: fileId, - }, - { - removeOnComplete: true, - removeOnFail: true, - }, - ); -} - -export function createImportPostsJob( - user: ThinUser, - fileId: DriveFile["id"], - signatureCheck: boolean, -) { - return dbQueue.add( - "importPosts", - { - user: user, - fileId: fileId, - signatureCheck: signatureCheck, - }, - { - removeOnComplete: true, - removeOnFail: true, - }, - ); -} - -export function createImportMastoPostJob( - user: ThinUser, - post: any, - signatureCheck: boolean, -) { - return dbQueue.add( - "importMastoPost", - { - user: user, - post: post, - signatureCheck: signatureCheck, - }, - { - removeOnComplete: true, - removeOnFail: true, - attempts: config.inboxJobMaxAttempts || 8, - timeout: 60 * 1000, // 1min - }, - ); -} - -export function createImportCkPostJob( - user: ThinUser, - post: any, - signatureCheck: boolean, -) { - return dbQueue.add( - "importCkPost", - { - user: user, - post: post, - signatureCheck: signatureCheck, - }, - { - removeOnComplete: true, - removeOnFail: true, - }, - ); -} - -export function createImportMutingJob(user: ThinUser, fileId: DriveFile["id"]) { - return dbQueue.add( - "importMuting", - { - user: user, - fileId: fileId, - }, - { - removeOnComplete: true, - removeOnFail: true, - }, - ); -} - -export function createImportBlockingJob( - user: ThinUser, - fileId: DriveFile["id"], -) { - return dbQueue.add( - "importBlocking", - { - user: user, - fileId: fileId, - }, - { - removeOnComplete: true, - removeOnFail: true, - }, - ); -} - -export function createImportUserListsJob( - user: ThinUser, - fileId: DriveFile["id"], -) { - return dbQueue.add( - "importUserLists", - { - user: user, - fileId: fileId, - }, - { - removeOnComplete: true, - removeOnFail: true, - }, - ); -} - -export function createImportCustomEmojisJob( - user: ThinUser, - fileId: DriveFile["id"], -) { - return dbQueue.add( - "importCustomEmojis", - { - user: user, - fileId: fileId, - }, - { - removeOnComplete: true, - removeOnFail: true, - }, - ); -} - -export function createDeleteAccountJob( - user: ThinUser, - opts: { soft?: boolean } = {}, -) { - return dbQueue.add( - "deleteAccount", - { - user: user, - soft: opts.soft, - }, - { - removeOnComplete: true, - removeOnFail: true, - }, - ); -} - -export function createDeleteObjectStorageFileJob(key: string) { - return objectStorageQueue.add( - "deleteFile", - { - key: key, - }, - { - removeOnComplete: true, - removeOnFail: true, - }, - ); -} - -export function createCleanRemoteFilesJob() { - return objectStorageQueue.add( - "cleanRemoteFiles", - {}, - { - removeOnComplete: true, - removeOnFail: true, - }, - ); -} - -export function createIndexAllNotesJob(data = {}) { - return backgroundQueue.add("indexAllNotes", data, { - removeOnComplete: true, - removeOnFail: false, - timeout: 1000 * 60 * 60 * 24, - }); -} - -export function webhookDeliver( - webhook: Webhook, - type: typeof webhookEventTypes[number], - content: unknown, -) { - const data = { - type, - content, - webhookId: webhook.id, - userId: webhook.userId, - to: webhook.url, - secret: webhook.secret, - createdAt: Date.now(), - eventId: uuid(), - }; - - return webhookDeliverQueue.add(data, { - attempts: 4, - timeout: 1 * 60 * 1000, // 1min - backoff: { - type: "apBackoff", - }, - removeOnComplete: true, - removeOnFail: true, - }); -} - -export default function () { - if (envOption.onlyServer) return; - - deliverQueue.process(config.deliverJobConcurrency || 128, processDeliver); - inboxQueue.process(config.inboxJobConcurrency || 16, processInbox); - endedPollNotificationQueue.process(endedPollNotification); - webhookDeliverQueue.process(64, processWebhookDeliver); - processDb(dbQueue); - - if (config.mediaCleanup?.cron) { - objectStorageQueue.add( - "cleanRemoteFiles", - {}, - { - repeat: { - cron: "0 0 * * *", - }, - removeOnComplete: true, - removeOnFail: true, - }, - ); - } - - processObjectStorage(objectStorageQueue); - processBackground(backgroundQueue); - - systemQueue.add( - "tickCharts", - {}, - { - repeat: { cron: "55 * * * *" }, - removeOnComplete: true, - }, - ); - - systemQueue.add( - "resyncCharts", - {}, - { - repeat: { cron: "0 0 * * *" }, - removeOnComplete: true, - }, - ); - - systemQueue.add( - "cleanCharts", - {}, - { - repeat: { cron: "0 0 * * *" }, - removeOnComplete: true, - }, - ); - - systemQueue.add( - "clean", - {}, - { - repeat: { cron: "0 0 * * *" }, - removeOnComplete: true, - }, - ); - - systemQueue.add( - "checkExpiredMutings", - {}, - { - repeat: { cron: "*/5 * * * *" }, - removeOnComplete: true, - }, - ); - - systemQueue.add( - "setLocalEmojiSizes", - {}, - { removeOnComplete: true, removeOnFail: true }, - ); - - systemQueue.add( - "verifyLinks", - {}, - { - repeat: { cron: "0 0 * * 0" }, - removeOnComplete: true, - removeOnFail: true, - }, - ); - - processSystemQueue(systemQueue); -} +export default async function () { + // initialize queue workers + await dbInit(); + await objectStorageInit(); + await systemInit(); + await deliverInit(); + await endedPollNotificationInit(); + await inboxInit(); + await webhookDeliverInit(); +}; export function destroy() { deliverQueue.once("cleaned", (jobs, status) => { deliverLogger.succ(`Cleaned ${jobs.length} ${status} jobs`); }); - deliverQueue.clean(0, "delayed"); + deliverQueue.clean(0, Infinity, "delayed"); inboxQueue.once("cleaned", (jobs, status) => { inboxLogger.succ(`Cleaned ${jobs.length} ${status} jobs`); }); - inboxQueue.clean(0, "delayed"); + inboxQueue.clean(0, Infinity, "delayed"); } diff --git a/packages/backend/src/queue/initialize.ts b/packages/backend/src/queue/initialize.ts deleted file mode 100644 index 16e623d13..000000000 --- a/packages/backend/src/queue/initialize.ts +++ /dev/null @@ -1,41 +0,0 @@ -import Bull from "bull"; -import config from "@/config/index.js"; - -export function initialize(name: string, limitPerSec = -1) { - return new Bull(name, { - redis: { - port: config.redis.port, - host: config.redis.host, - family: config.redis.family == null ? 0 : config.redis.family, - username: config.redis.user ?? "default", - password: config.redis.pass, - db: config.redis.db || 0, - tls: config.redis.tls, - }, - prefix: config.redis.prefix ? `${config.redis.prefix}:queue` : "queue", - limiter: - limitPerSec > 0 - ? { - max: limitPerSec, - duration: 1000, - } - : undefined, - settings: { - stalledInterval: 60, - maxStalledCount: 2, - backoffStrategies: { - apBackoff, - }, - }, - }); -} - -// ref. https://github.com/misskey-dev/misskey/pull/7635#issue-971097019 -function apBackoff(attemptsMade: number, err: Error) { - const baseDelay = 60 * 1000; // 1min - const maxBackoff = 8 * 60 * 60 * 1000; // 8hours - let backoff = (Math.pow(2, attemptsMade) - 1) * baseDelay; - backoff = Math.min(backoff, maxBackoff); - backoff += Math.round(backoff * Math.random() * 0.2); - return backoff; -} diff --git a/packages/backend/src/queue/logger.ts b/packages/backend/src/queue/logger.ts index 929c207e3..c55dd5e2d 100644 --- a/packages/backend/src/queue/logger.ts +++ b/packages/backend/src/queue/logger.ts @@ -1,3 +1,11 @@ import Logger from "@/services/logger.js"; export const queueLogger = new Logger("queue", "orange"); + +export function renderError(e: Error): any { + return { + stack: e.stack, + message: e.message, + name: e.name, + }; +} diff --git a/packages/backend/src/queue/processors/background/index.ts b/packages/backend/src/queue/processors/background/index.ts deleted file mode 100644 index ac3d1d32e..000000000 --- a/packages/backend/src/queue/processors/background/index.ts +++ /dev/null @@ -1,12 +0,0 @@ -import type Bull from "bull"; -import { noop } from "@/queue/processors/noop.js"; - -const jobs = {} as Record>>; - -export default function (q: Bull.Queue) { - for (const [k, v] of Object.entries(jobs)) { - q.process(k, 16, v); - } - - q.process(noop); -} diff --git a/packages/backend/src/queue/processors/db/import-custom-emojis.ts b/packages/backend/src/queue/processors/db/import-custom-emojis.ts deleted file mode 100644 index 9e8b3b174..000000000 --- a/packages/backend/src/queue/processors/db/import-custom-emojis.ts +++ /dev/null @@ -1,150 +0,0 @@ -import type Bull from "bull"; -import * as fs from "node:fs"; -import AdmZip from "adm-zip"; - -import { queueLogger } from "../../logger.js"; -import { createTempDir } from "@/misc/create-temp.js"; -import { downloadUrl } from "@/misc/download-url.js"; -import { DriveFiles, Emojis } from "@/models/index.js"; -import type { DbUserImportJobData } from "@/queue/types.js"; -import { addFile } from "@/services/drive/add-file.js"; -import { genId } from "@/misc/gen-id.js"; -import { db } from "@/db/postgre.js"; -import probeImageSize from "probe-image-size"; -import * as path from "path"; - -const logger = queueLogger.createSubLogger("import-custom-emojis"); - -// TODO: 名前衝突時の動作を選べるようにする -export async function importCustomEmojis( - job: Bull.Job, - done: any, -): Promise { - logger.info("Importing custom emojis ..."); - - const file = await DriveFiles.findOneBy({ - id: job.data.fileId, - }); - if (file == null) { - done(); - return; - } - - const [tempPath, cleanup] = await createTempDir(); - - logger.info(`Temp dir is ${tempPath}`); - - const destPath = `${tempPath}/emojis.zip`; - - try { - fs.writeFileSync(destPath, "", "binary"); - await downloadUrl(file.url, destPath); - } catch (e) { - // TODO: 何度か再試行 - if (e instanceof Error || typeof e === "string") { - logger.error(e); - } - throw e; - } - - const outputPath = `${tempPath}/emojis`; - const unzipStream = fs.createReadStream(destPath); - const zip = new AdmZip(destPath); - zip.extractAllToAsync(outputPath, true, false, async (error) => { - if (error) throw error; - - if (fs.existsSync(`${outputPath}/meta.json`)) { - logger.info("starting emoji import with metadata"); - const metaRaw = fs.readFileSync(`${outputPath}/meta.json`, "utf-8"); - const meta = JSON.parse(metaRaw); - - for (const record of meta.emojis) { - if (!record.downloaded) continue; - const emojiInfo = record.emoji; - const emojiPath = `${outputPath}/${record.fileName}`; - await Emojis.delete({ - name: emojiInfo.name, - }); - const driveFile = await addFile({ - user: null, - path: emojiPath, - name: record.fileName, - force: true, - }); - const file = fs.createReadStream(emojiPath); - const size = await probeImageSize(file); - file.destroy(); - await Emojis.insert({ - id: genId(), - updatedAt: new Date(), - name: emojiInfo.name, - category: emojiInfo.category, - host: null, - aliases: emojiInfo.aliases, - originalUrl: driveFile.url, - publicUrl: driveFile.webpublicUrl ?? driveFile.url, - type: driveFile.webpublicType ?? driveFile.type, - license: emojiInfo.license, - width: size.width || null, - height: size.height || null, - }).then((x) => Emojis.findOneByOrFail(x.identifiers[0])); - } - } else { - logger.info("starting emoji import without metadata"); - // Since we lack metadata, we import into a randomized category name instead - let categoryName = genId(); - - let containedEmojis = fs.readdirSync(outputPath); - - // Filter out accidental JSON files - containedEmojis = containedEmojis.filter( - (emoji) => !emoji.match(/\.(json)$/i), - ); - - for (const emojiFilename of containedEmojis) { - // strip extension and get filename to use as name - const name = path.basename(emojiFilename, path.extname(emojiFilename)); - const emojiPath = `${outputPath}/${emojiFilename}`; - - logger.info(`importing ${name}`); - - await Emojis.delete({ - name: name, - }); - const driveFile = await addFile({ - user: null, - path: emojiPath, - name: path.basename(emojiFilename), - force: true, - }); - const file = fs.createReadStream(emojiPath); - const size = await probeImageSize(file); - file.destroy(); - logger.info(`emoji size: ${size.width}x${size.height}`); - - await Emojis.insert({ - id: genId(), - updatedAt: new Date(), - name: name, - category: categoryName, - host: null, - aliases: [], - originalUrl: driveFile.url, - publicUrl: driveFile.webpublicUrl ?? driveFile.url, - type: driveFile.webpublicType ?? driveFile.type, - license: null, - width: size.width || null, - height: size.height || null, - }).then((x) => Emojis.findOneByOrFail(x.identifiers[0])); - } - } - - await db.queryResultCache!.remove(["meta_emojis"]); - - cleanup(); - - logger.succ("Imported"); - done(); - }); - logger.succ(`Unzipping to ${outputPath}`); -} diff --git a/packages/backend/src/queue/processors/db/import-firefish-post.ts b/packages/backend/src/queue/processors/db/import-firefish-post.ts deleted file mode 100644 index c7a6fd7f0..000000000 --- a/packages/backend/src/queue/processors/db/import-firefish-post.ts +++ /dev/null @@ -1,15 +0,0 @@ -import * as Post from "@/misc/post.js"; -import create from "@/services/note/create.js"; -import { Users } from "@/models/index.js"; -import type { DbUserImportMastoPostJobData } from "@/queue/types.js"; -import { queueLogger } from "../../logger.js"; -import type Bull from "bull"; - -const logger = queueLogger.createSubLogger("import-firefish-post"); - -export async function importCkPost( - job: Bull.Job, - done: any, -): Promise { - done(); -} diff --git a/packages/backend/src/queue/processors/db/import-masto-post.ts b/packages/backend/src/queue/processors/db/import-masto-post.ts deleted file mode 100644 index c6ac7346b..000000000 --- a/packages/backend/src/queue/processors/db/import-masto-post.ts +++ /dev/null @@ -1,19 +0,0 @@ -import create from "@/services/note/create.js"; -import { Users } from "@/models/index.js"; -import type { DbUserImportMastoPostJobData } from "@/queue/types.js"; -import { queueLogger } from "../../logger.js"; -import type Bull from "bull"; -import { htmlToMfm } from "@/remote/activitypub/misc/html-to-mfm.js"; -import { resolveNote } from "@/remote/activitypub/models/note.js"; -import { Note } from "@/models/entities/note.js"; -import { uploadFromUrl } from "@/services/drive/upload-from-url.js"; -import type { DriveFile } from "@/models/entities/drive-file.js"; - -const logger = queueLogger.createSubLogger("import-masto-post"); - -export async function importMastoPost( - job: Bull.Job, - done: any, -): Promise { - done(); -} diff --git a/packages/backend/src/queue/processors/db/import-posts.ts b/packages/backend/src/queue/processors/db/import-posts.ts deleted file mode 100644 index 9bde7479e..000000000 --- a/packages/backend/src/queue/processors/db/import-posts.ts +++ /dev/null @@ -1,76 +0,0 @@ -import { downloadTextFile } from "@/misc/download-text-file.js"; -import { processMastoNotes } from "@/misc/process-masto-notes.js"; -import { Users, DriveFiles } from "@/models/index.js"; -import type { DbUserImportPostsJobData } from "@/queue/types.js"; -import { queueLogger } from "../../logger.js"; -import type Bull from "bull"; -import { - createImportCkPostJob, - createImportMastoPostJob, -} from "@/queue/index.js"; - -const logger = queueLogger.createSubLogger("import-posts"); - -export async function importPosts( - job: Bull.Job, - done: any, -): Promise { - logger.info(`Importing posts of ${job.data.user.id} ...`); - - const user = await Users.findOneBy({ id: job.data.user.id }); - if (user == null) { - done(); - return; - } - - const file = await DriveFiles.findOneBy({ - id: job.data.fileId, - }); - if (file == null) { - done(); - return; - } - - if (file.name.endsWith("tar.gz") || file.name.endsWith("zip")) { - try { - logger.info("Reading Mastodon archive"); - const outbox = await processMastoNotes( - file.name, - file.url, - job.data.user.id, - ); - for (const post of outbox.orderedItems) { - createImportMastoPostJob(job.data.user, post, job.data.signatureCheck); - } - } catch (e) { - // handle error - logger.warn(`Failed reading Mastodon archive: ${e}`); - } - logger.succ("Mastodon archive imported"); - done(); - return; - } - - const json = await downloadTextFile(file.url); - - try { - const parsed = JSON.parse(json); - if (parsed instanceof Array) { - logger.info("Parsing key style posts"); - for (const post of JSON.parse(json)) { - createImportCkPostJob(job.data.user, post, job.data.signatureCheck); - } - } else if (parsed instanceof Object) { - logger.info("Parsing animal style posts"); - for (const post of parsed.orderedItems) { - createImportMastoPostJob(job.data.user, post, job.data.signatureCheck); - } - } - } catch (e) { - // handle error - logger.warn(`Error reading: ${e}`); - } - - logger.succ("Imported"); - done(); -} diff --git a/packages/backend/src/queue/processors/db/index.ts b/packages/backend/src/queue/processors/db/index.ts deleted file mode 100644 index 9100d458f..000000000 --- a/packages/backend/src/queue/processors/db/index.ts +++ /dev/null @@ -1,50 +0,0 @@ -import type Bull from "bull"; -import type { DbJobData } from "@/queue/types.js"; -import { deleteDriveFiles } from "./delete-drive-files.js"; -import { exportCustomEmojis } from "./export-custom-emojis.js"; -import { exportNotes } from "./export-notes.js"; -import { exportFollowing } from "./export-following.js"; -import { exportMute } from "./export-mute.js"; -import { exportBlocking } from "./export-blocking.js"; -import { exportUserLists } from "./export-user-lists.js"; -import { importFollowing } from "./import-following.js"; -import { importUserLists } from "./import-user-lists.js"; -import { deleteAccount } from "./delete-account.js"; -import { importMuting } from "./import-muting.js"; -import { importPosts } from "./import-posts.js"; -import { importMastoPost } from "./import-masto-post.js"; -import { importCkPost } from "./import-firefish-post.js"; -import { importBlocking } from "./import-blocking.js"; -import { importCustomEmojis } from "./import-custom-emojis.js"; -import { noop } from "@/queue/processors/noop.js"; - -const jobs = { - deleteDriveFiles, - exportCustomEmojis, - exportNotes, - exportFollowing, - exportMute, - exportBlocking, - exportUserLists, - importFollowing, - importMuting, - importBlocking, - importUserLists, - importPosts, - importMastoPost, - importCkPost, - importCustomEmojis, - deleteAccount, -} as Record< - string, - | Bull.ProcessCallbackFunction - | Bull.ProcessPromiseFunction ->; - -export default function (dbQueue: Bull.Queue) { - for (const [k, v] of Object.entries(jobs)) { - dbQueue.process(k, v); - } - - dbQueue.process(noop); -} diff --git a/packages/backend/src/queue/processors/noop.ts b/packages/backend/src/queue/processors/noop.ts deleted file mode 100644 index 575a75209..000000000 --- a/packages/backend/src/queue/processors/noop.ts +++ /dev/null @@ -1,7 +0,0 @@ -import Bull from "bull"; - -// Processor to be registered for jobs with __default__ (unnamed) handlers in queues that only have named handlers -// Prevents sporadic bogus jobs from clogging up the queues -export async function noop(job: Bull.Job): Promise { - job.opts.removeOnComplete = true; -} diff --git a/packages/backend/src/queue/processors/object-storage/index.ts b/packages/backend/src/queue/processors/object-storage/index.ts deleted file mode 100644 index 1c24f5244..000000000 --- a/packages/backend/src/queue/processors/object-storage/index.ts +++ /dev/null @@ -1,22 +0,0 @@ -import type Bull from "bull"; -import type { ObjectStorageJobData } from "@/queue/types.js"; -import deleteFile from "./delete-file.js"; -import cleanRemoteFiles from "./clean-remote-files.js"; -import { noop } from "@/queue/processors/noop.js"; - -const jobs = { - deleteFile, - cleanRemoteFiles, -} as Record< - string, - | Bull.ProcessCallbackFunction - | Bull.ProcessPromiseFunction ->; - -export default function (q: Bull.Queue) { - for (const [k, v] of Object.entries(jobs)) { - q.process(k, 16, v); - } - - q.process(noop); -} diff --git a/packages/backend/src/queue/processors/system/index.ts b/packages/backend/src/queue/processors/system/index.ts deleted file mode 100644 index 2ff2c636e..000000000 --- a/packages/backend/src/queue/processors/system/index.ts +++ /dev/null @@ -1,31 +0,0 @@ -import type Bull from "bull"; -import { tickCharts } from "./tick-charts.js"; -import { resyncCharts } from "./resync-charts.js"; -import { cleanCharts } from "./clean-charts.js"; -import { checkExpiredMutings } from "./check-expired-mutings.js"; -import { clean } from "./clean.js"; -import { setLocalEmojiSizes } from "./local-emoji-size.js"; -import { verifyLinks } from "./verify-links.js"; -import { noop } from "@/queue/processors/noop.js"; - -const jobs = { - tickCharts, - resyncCharts, - cleanCharts, - checkExpiredMutings, - clean, - setLocalEmojiSizes, - verifyLinks, -} as Record< - string, - | Bull.ProcessCallbackFunction> - | Bull.ProcessPromiseFunction> ->; - -export default function (dbQueue: Bull.Queue>) { - for (const [k, v] of Object.entries(jobs)) { - dbQueue.process(k, v); - } - - dbQueue.process(noop); -} diff --git a/packages/backend/src/queue/processors/system/local-emoji-size.ts b/packages/backend/src/queue/processors/system/local-emoji-size.ts deleted file mode 100644 index d696bbd86..000000000 --- a/packages/backend/src/queue/processors/system/local-emoji-size.ts +++ /dev/null @@ -1,42 +0,0 @@ -import type Bull from "bull"; -import { IsNull } from "typeorm"; -import { Emojis } from "@/models/index.js"; - -import { queueLogger } from "../../logger.js"; -import { getEmojiSize } from "@/misc/emoji-meta.js"; - -const logger = queueLogger.createSubLogger("local-emoji-size"); - -export async function setLocalEmojiSizes( - _job: Bull.Job>, - done: any, -): Promise { - logger.info("Setting sizes of local emojis..."); - - const emojis = await Emojis.findBy([ - { host: IsNull(), width: IsNull(), height: IsNull() }, - ]); - logger.info(`${emojis.length} emojis need to be fetched.`); - - for (let i = 0; i < emojis.length; i++) { - try { - const size = await getEmojiSize(emojis[i].publicUrl); - await Emojis.update(emojis[i].id, { - width: size.width || null, - height: size.height || null, - }); - } catch (e) { - logger.error( - `Unable to set emoji size (${i + 1}/${emojis.length}): ${e}`, - ); - /* skip if any error happens */ - } finally { - // wait for 1sec so that this would not overwhelm the object storage. - await new Promise((resolve) => setTimeout(resolve, 1000)); - if (i % 10 === 9) logger.succ(`fetched ${i + 1}/${emojis.length} emojis`); - } - } - - logger.succ("Done."); - done(); -} diff --git a/packages/backend/src/queue/queues.ts b/packages/backend/src/queue/queues.ts index 6b0eb2de4..709db513f 100644 --- a/packages/backend/src/queue/queues.ts +++ b/packages/backend/src/queue/queues.ts @@ -1,41 +1,25 @@ -import config from "@/config/index.js"; -import { initialize as initializeQueue } from "./initialize.js"; -import type { - DeliverJobData, - InboxJobData, - DbJobData, - ObjectStorageJobData, - EndedPollNotificationJobData, - WebhookDeliverJobData, -} from "./types.js"; +import { dbQueue } from "./queues/db/index.js"; +import { deliverQueue } from "./queues/deliver.js"; +import { endedPollNotificationQueue } from "./queues/ended-poll-notification.js"; +import { inboxQueue } from "./queues/inbox.js"; +import { objectStorageQueue } from "./queues/object-storage/index.js"; +import { systemQueue } from "./queues/system/index.js"; +import { webhookDeliverQueue } from "./queues/webhook-deliver.js"; -export const systemQueue = initializeQueue>("system"); -export const endedPollNotificationQueue = - initializeQueue("endedPollNotification"); -export const deliverQueue = initializeQueue( - "deliver", - config.deliverJobPerSec || 128, -); -export const inboxQueue = initializeQueue( - "inbox", - config.inboxJobPerSec || 16, -); -export const dbQueue = initializeQueue("db", 256); -export const objectStorageQueue = - initializeQueue("objectStorage"); -export const webhookDeliverQueue = initializeQueue( - "webhookDeliver", - 64, -); -export const backgroundQueue = initializeQueue>("bg"); +export { dbQueue } from "./queues/db/index.js"; +export { deliverQueue } from "./queues/deliver.js"; +export { endedPollNotificationQueue } from "./queues/ended-poll-notification.js"; +export { inboxQueue } from "./queues/inbox.js"; +export { objectStorageQueue } from "./queues/object-storage/index.js"; +export { systemQueue } from "./queues/system/index.js"; +export { webhookDeliverQueue } from "./queues/webhook-deliver.js"; export const queues = [ - systemQueue, - endedPollNotificationQueue, - deliverQueue, - inboxQueue, dbQueue, + deliverQueue, + endedPollNotificationQueue, + inboxQueue, objectStorageQueue, + systemQueue, webhookDeliverQueue, - backgroundQueue, ]; diff --git a/packages/backend/src/queue/processors/db/delete-account.ts b/packages/backend/src/queue/queues/db/delete-account.ts similarity index 94% rename from packages/backend/src/queue/processors/db/delete-account.ts rename to packages/backend/src/queue/queues/db/delete-account.ts index 1dca4ae1a..2c5b2908c 100644 --- a/packages/backend/src/queue/processors/db/delete-account.ts +++ b/packages/backend/src/queue/queues/db/delete-account.ts @@ -1,4 +1,3 @@ -import type Bull from "bull"; import { queueLogger } from "../../logger.js"; import { DriveFiles, Notes, UserProfiles, Users } from "@/models/index.js"; import type { DbUserDeleteJobData } from "@/queue/types.js"; @@ -8,16 +7,17 @@ import { MoreThan } from "typeorm"; import { deleteFileSync } from "@/services/drive/delete-file.js"; import { sendEmail } from "@/services/send-email.js"; import { publishInternalEvent } from "@/services/stream.js"; +import { Job } from "bullmq"; const logger = queueLogger.createSubLogger("delete-account"); export async function deleteAccount( - job: Bull.Job, -): Promise { + job: Job, +): Promise { logger.info(`Deleting account of ${job.data.user.id} ...`); const user = await Users.findOneBy({ id: job.data.user.id }); - if (!user) return; + if (!user) return "skip: User not found"; const isLocal = Users.isLocalUser(user); { diff --git a/packages/backend/src/queue/processors/db/delete-drive-files.ts b/packages/backend/src/queue/queues/db/delete-drive-files.ts similarity index 84% rename from packages/backend/src/queue/processors/db/delete-drive-files.ts rename to packages/backend/src/queue/queues/db/delete-drive-files.ts index 28e477132..ebccee093 100644 --- a/packages/backend/src/queue/processors/db/delete-drive-files.ts +++ b/packages/backend/src/queue/queues/db/delete-drive-files.ts @@ -1,23 +1,20 @@ -import type Bull from "bull"; - import { queueLogger } from "../../logger.js"; import { deleteFileSync } from "@/services/drive/delete-file.js"; import { Users, DriveFiles } from "@/models/index.js"; import { MoreThan } from "typeorm"; import type { DbUserJobData } from "@/queue/types.js"; +import { Job } from "bullmq"; const logger = queueLogger.createSubLogger("delete-drive-files"); export async function deleteDriveFiles( - job: Bull.Job, - done: any, -): Promise { + job: Job, +): Promise { logger.info(`Deleting drive files of ${job.data.user.id} ...`); const user = await Users.findOneBy({ id: job.data.user.id }); if (user == null) { - done(); - return; + return "skip: User not found"; } let deletedCount = 0; @@ -36,7 +33,7 @@ export async function deleteDriveFiles( }); if (files.length === 0) { - job.progress(100); + job.updateProgress(100); break; } @@ -51,11 +48,11 @@ export async function deleteDriveFiles( userId: user.id, }); - job.progress(deletedCount / total); + job.updateProgress(deletedCount / total); } logger.succ( `All drive files (${deletedCount}) of ${user.id} has been deleted.`, ); - done(); + return "Success"; } diff --git a/packages/backend/src/queue/processors/db/export-blocking.ts b/packages/backend/src/queue/queues/db/export-blocking.ts similarity index 91% rename from packages/backend/src/queue/processors/db/export-blocking.ts rename to packages/backend/src/queue/queues/db/export-blocking.ts index 90da76b87..fb5f6be24 100644 --- a/packages/backend/src/queue/processors/db/export-blocking.ts +++ b/packages/backend/src/queue/queues/db/export-blocking.ts @@ -1,4 +1,3 @@ -import type Bull from "bull"; import * as fs from "node:fs"; import { queueLogger } from "../../logger.js"; @@ -9,19 +8,18 @@ import { createTemp } from "@/misc/create-temp.js"; import { Users, Blockings } from "@/models/index.js"; import { MoreThan } from "typeorm"; import type { DbUserJobData } from "@/queue/types.js"; +import { Job } from "bullmq"; const logger = queueLogger.createSubLogger("export-blocking"); export async function exportBlocking( - job: Bull.Job, - done: any, -): Promise { + job: Job, +): Promise { logger.info(`Exporting blocking of ${job.data.user.id} ...`); const user = await Users.findOneBy({ id: job.data.user.id }); if (user == null) { - done(); - return; + return "skip: User not found"; } // Create temp file @@ -48,7 +46,7 @@ export async function exportBlocking( }); if (blockings.length === 0) { - job.progress(100); + job.updateProgress(100); break; } @@ -79,7 +77,7 @@ export async function exportBlocking( blockerId: user.id, }); - job.progress(exportedCount / total); + job.updateProgress(exportedCount / total); } stream.end(); @@ -101,5 +99,5 @@ export async function exportBlocking( cleanup(); } - done(); + return "Success"; } diff --git a/packages/backend/src/queue/processors/db/export-custom-emojis.ts b/packages/backend/src/queue/queues/db/export-custom-emojis.ts similarity index 71% rename from packages/backend/src/queue/processors/db/export-custom-emojis.ts rename to packages/backend/src/queue/queues/db/export-custom-emojis.ts index 1374d6c4e..cc7d76bfb 100644 --- a/packages/backend/src/queue/processors/db/export-custom-emojis.ts +++ b/packages/backend/src/queue/queues/db/export-custom-emojis.ts @@ -1,4 +1,3 @@ -import type Bull from "bull"; import * as fs from "node:fs"; import { ulid } from "ulid"; @@ -8,24 +7,23 @@ import { queueLogger } from "../../logger.js"; import { addFile } from "@/services/drive/add-file.js"; import { format as dateFormat } from "date-fns"; import { Users, Emojis } from "@/models/index.js"; -import {} from "@/queue/types.js"; +import { DbUserJobData } from "../../types.js"; import { createTemp, createTempDir } from "@/misc/create-temp.js"; import { downloadUrl } from "@/misc/download-url.js"; import config from "@/config/index.js"; import { IsNull } from "typeorm"; +import { Job } from "bullmq"; const logger = queueLogger.createSubLogger("export-custom-emojis"); export async function exportCustomEmojis( - job: Bull.Job, - done: () => void, -): Promise { + job: Job, +): Promise { logger.info("Exporting custom emojis ..."); const user = await Users.findOneBy({ id: job.data.user.id }); if (user == null) { - done(); - return; + return "skip: User not found"; } const [path, cleanup] = await createTempDir(); @@ -102,31 +100,39 @@ export async function exportCustomEmojis( metaStream.end(); // Create archive - const [archivePath, archiveCleanup] = await createTemp(); - const archiveStream = fs.createWriteStream(archivePath); - const archive = archiver("zip", { - zlib: { level: 0 }, + await new Promise(async (resolve, reject) => { + try { + const [archivePath, archiveCleanup] = await createTemp(); + const archiveStream = fs.createWriteStream(archivePath); + const archive = archiver("zip", { + zlib: { level: 0 }, + }); + archiveStream.on("close", async () => { + logger.succ(`Exported to: ${archivePath}`); + + const fileName = `custom-emojis-${dateFormat( + new Date(), + "yyyy-MM-dd-HH-mm-ss", + )}.zip`; + const driveFile = await addFile({ + user, + path: archivePath, + name: fileName, + force: true, + }); + + logger.succ(`Exported to: ${driveFile.id}`); + cleanup(); + archiveCleanup(); + resolve(undefined); + }); + archive.pipe(archiveStream); + archive.directory(path, false); + archive.finalize(); + } catch (e) { + reject(e); + } }); - archiveStream.on("close", async () => { - logger.succ(`Exported to: ${archivePath}`); - const fileName = `custom-emojis-${dateFormat( - new Date(), - "yyyy-MM-dd-HH-mm-ss", - )}.zip`; - const driveFile = await addFile({ - user, - path: archivePath, - name: fileName, - force: true, - }); - - logger.succ(`Exported to: ${driveFile.id}`); - cleanup(); - archiveCleanup(); - done(); - }); - archive.pipe(archiveStream); - archive.directory(path, false); - archive.finalize(); + return "Success"; } diff --git a/packages/backend/src/queue/processors/db/export-following.ts b/packages/backend/src/queue/queues/db/export-following.ts similarity index 95% rename from packages/backend/src/queue/processors/db/export-following.ts rename to packages/backend/src/queue/queues/db/export-following.ts index 80e8e6b92..f921a987a 100644 --- a/packages/backend/src/queue/processors/db/export-following.ts +++ b/packages/backend/src/queue/queues/db/export-following.ts @@ -1,4 +1,3 @@ -import type Bull from "bull"; import * as fs from "node:fs"; import { queueLogger } from "../../logger.js"; @@ -10,19 +9,18 @@ import { Users, Followings, Mutings } from "@/models/index.js"; import { In, MoreThan, Not } from "typeorm"; import type { DbUserJobData } from "@/queue/types.js"; import type { Following } from "@/models/entities/following.js"; +import { Job } from "bullmq"; const logger = queueLogger.createSubLogger("export-following"); export async function exportFollowing( - job: Bull.Job, - done: () => void, -): Promise { + job: Job, +): Promise { logger.info(`Exporting following of ${job.data.user.id} ...`); const user = await Users.findOneBy({ id: job.data.user.id }); if (user == null) { - done(); - return; + return "skip: User not found"; } // Create temp file @@ -109,5 +107,5 @@ export async function exportFollowing( cleanup(); } - done(); + return "Success"; } diff --git a/packages/backend/src/queue/processors/db/export-mute.ts b/packages/backend/src/queue/queues/db/export-mute.ts similarity index 91% rename from packages/backend/src/queue/processors/db/export-mute.ts rename to packages/backend/src/queue/queues/db/export-mute.ts index 87b140b76..53cc660a6 100644 --- a/packages/backend/src/queue/processors/db/export-mute.ts +++ b/packages/backend/src/queue/queues/db/export-mute.ts @@ -1,4 +1,3 @@ -import type Bull from "bull"; import * as fs from "node:fs"; import { queueLogger } from "../../logger.js"; @@ -9,19 +8,18 @@ import { createTemp } from "@/misc/create-temp.js"; import { Users, Mutings } from "@/models/index.js"; import { IsNull, MoreThan } from "typeorm"; import type { DbUserJobData } from "@/queue/types.js"; +import { Job } from "bullmq"; const logger = queueLogger.createSubLogger("export-mute"); export async function exportMute( - job: Bull.Job, - done: any, -): Promise { + job: Job, +): Promise { logger.info(`Exporting mute of ${job.data.user.id} ...`); const user = await Users.findOneBy({ id: job.data.user.id }); if (user == null) { - done(); - return; + return "skip: User not found"; } // Create temp file @@ -49,7 +47,7 @@ export async function exportMute( }); if (mutes.length === 0) { - job.progress(100); + job.updateProgress(100); break; } @@ -80,7 +78,7 @@ export async function exportMute( muterId: user.id, }); - job.progress(exportedCount / total); + job.updateProgress(exportedCount / total); } stream.end(); @@ -102,5 +100,5 @@ export async function exportMute( cleanup(); } - done(); + return "Success"; } diff --git a/packages/backend/src/queue/processors/db/export-notes.ts b/packages/backend/src/queue/queues/db/export-notes.ts similarity index 92% rename from packages/backend/src/queue/processors/db/export-notes.ts rename to packages/backend/src/queue/queues/db/export-notes.ts index bf53f8360..9a2cc9639 100644 --- a/packages/backend/src/queue/processors/db/export-notes.ts +++ b/packages/backend/src/queue/queues/db/export-notes.ts @@ -1,4 +1,3 @@ -import type Bull from "bull"; import * as fs from "node:fs"; import { queueLogger } from "../../logger.js"; @@ -10,19 +9,18 @@ import type { Note } from "@/models/entities/note.js"; import type { Poll } from "@/models/entities/poll.js"; import type { DbUserJobData } from "@/queue/types.js"; import { createTemp } from "@/misc/create-temp.js"; +import { Job } from "bullmq"; const logger = queueLogger.createSubLogger("export-notes"); export async function exportNotes( - job: Bull.Job, - done: any, -): Promise { + job: Job, +): Promise { logger.info(`Exporting notes of ${job.data.user.id} ...`); const user = await Users.findOneBy({ id: job.data.user.id }); if (user == null) { - done(); - return; + return "skip: User not found"; } // Create temp file @@ -64,7 +62,7 @@ export async function exportNotes( })) as Note[]; if (notes.length === 0) { - job.progress(100); + job.updateProgress(100); break; } @@ -85,7 +83,7 @@ export async function exportNotes( userId: user.id, }); - job.progress(exportedNotesCount / total); + job.updateProgress(exportedNotesCount / total); } await write("]"); @@ -109,7 +107,7 @@ export async function exportNotes( cleanup(); } - done(); + return "Success"; } async function serialize( diff --git a/packages/backend/src/queue/processors/db/export-user-lists.ts b/packages/backend/src/queue/queues/db/export-user-lists.ts similarity index 93% rename from packages/backend/src/queue/processors/db/export-user-lists.ts rename to packages/backend/src/queue/queues/db/export-user-lists.ts index e0c9cd8f3..648295afd 100644 --- a/packages/backend/src/queue/processors/db/export-user-lists.ts +++ b/packages/backend/src/queue/queues/db/export-user-lists.ts @@ -1,4 +1,3 @@ -import type Bull from "bull"; import * as fs from "node:fs"; import { queueLogger } from "../../logger.js"; @@ -9,19 +8,18 @@ import { createTemp } from "@/misc/create-temp.js"; import { Users, UserLists, UserListJoinings } from "@/models/index.js"; import { In } from "typeorm"; import type { DbUserJobData } from "@/queue/types.js"; +import { Job } from "bullmq"; const logger = queueLogger.createSubLogger("export-user-lists"); export async function exportUserLists( - job: Bull.Job, - done: any, -): Promise { + job: Job, +): Promise { logger.info(`Exporting user lists of ${job.data.user.id} ...`); const user = await Users.findOneBy({ id: job.data.user.id }); if (user == null) { - done(); - return; + return "skip: User not found"; } const lists = await UserLists.findBy({ @@ -77,5 +75,5 @@ export async function exportUserLists( cleanup(); } - done(); + return "Success"; } diff --git a/packages/backend/src/queue/processors/db/import-blocking.ts b/packages/backend/src/queue/queues/db/import-blocking.ts similarity index 91% rename from packages/backend/src/queue/processors/db/import-blocking.ts rename to packages/backend/src/queue/queues/db/import-blocking.ts index 2fdf80a6e..2c480bd61 100644 --- a/packages/backend/src/queue/processors/db/import-blocking.ts +++ b/packages/backend/src/queue/queues/db/import-blocking.ts @@ -1,5 +1,3 @@ -import type Bull from "bull"; - import { queueLogger } from "../../logger.js"; import * as Acct from "@/misc/acct.js"; import { resolveUser } from "@/remote/resolve-user.js"; @@ -9,27 +7,25 @@ import { Users, DriveFiles, Blockings } from "@/models/index.js"; import type { DbUserImportJobData } from "@/queue/types.js"; import block from "@/services/blocking/create.js"; import { IsNull } from "typeorm"; +import { Job } from "bullmq"; const logger = queueLogger.createSubLogger("import-blocking"); export async function importBlocking( - job: Bull.Job, - done: any, -): Promise { + job: Job, +): Promise { logger.info(`Importing blocking of ${job.data.user.id} ...`); const user = await Users.findOneBy({ id: job.data.user.id }); if (user == null) { - done(); - return; + return "skip: User not found"; } const file = await DriveFiles.findOneBy({ id: job.data.fileId, }); if (file == null) { - done(); - return; + return "skip: File not found"; } const csv = await downloadTextFile(file.url); @@ -75,5 +71,5 @@ export async function importBlocking( } logger.succ("Imported"); - done(); + return "Success"; } diff --git a/packages/backend/src/queue/queues/db/import-custom-emojis.ts b/packages/backend/src/queue/queues/db/import-custom-emojis.ts new file mode 100644 index 000000000..2e0474a5b --- /dev/null +++ b/packages/backend/src/queue/queues/db/import-custom-emojis.ts @@ -0,0 +1,153 @@ +import * as fs from "node:fs"; +import AdmZip from "adm-zip"; + +import { queueLogger } from "../../logger.js"; +import { createTempDir } from "@/misc/create-temp.js"; +import { downloadUrl } from "@/misc/download-url.js"; +import { DriveFiles, Emojis } from "@/models/index.js"; +import type { DbUserImportJobData } from "@/queue/types.js"; +import { addFile } from "@/services/drive/add-file.js"; +import { genId } from "@/misc/gen-id.js"; +import { db } from "@/db/postgre.js"; +import probeImageSize from "probe-image-size"; +import * as path from "path"; +import { Job } from "bullmq"; + +const logger = queueLogger.createSubLogger("import-custom-emojis"); + +// TODO: 名前衝突時の動作を選べるようにする +export async function importCustomEmojis( + job: Job, +): Promise { + logger.info("Importing custom emojis ..."); + + const file = await DriveFiles.findOneBy({ + id: job.data.fileId, + }); + if (file == null) { + return "skip: File not found"; + } + + const [tempPath, cleanup] = await createTempDir(); + + logger.info(`Temp dir is ${tempPath}`); + + const destPath = `${tempPath}/emojis.zip`; + + try { + fs.writeFileSync(destPath, "", "binary"); + await downloadUrl(file.url, destPath); + } catch (e) { + // TODO: 何度か再試行 + if (e instanceof Error || typeof e === "string") { + logger.error(e); + } + throw e; + } + + const outputPath = `${tempPath}/emojis`; + const unzipStream = fs.createReadStream(destPath); + const zip = new AdmZip(destPath); + logger.succ(`Unzipping to ${outputPath}`); + + await new Promise((resolve, reject) => { + zip.extractAllToAsync(outputPath, true, false, async (error) => { + if (error) reject(error); + + if (fs.existsSync(`${outputPath}/meta.json`)) { + logger.info("starting emoji import with metadata"); + const metaRaw = fs.readFileSync(`${outputPath}/meta.json`, "utf-8"); + const meta = JSON.parse(metaRaw); + + for (const record of meta.emojis) { + if (!record.downloaded) continue; + const emojiInfo = record.emoji; + const emojiPath = `${outputPath}/${record.fileName}`; + await Emojis.delete({ + name: emojiInfo.name, + }); + const driveFile = await addFile({ + user: null, + path: emojiPath, + name: record.fileName, + force: true, + }); + const file = fs.createReadStream(emojiPath); + const size = await probeImageSize(file); + file.destroy(); + await Emojis.insert({ + id: genId(), + updatedAt: new Date(), + name: emojiInfo.name, + category: emojiInfo.category, + host: null, + aliases: emojiInfo.aliases, + originalUrl: driveFile.url, + publicUrl: driveFile.webpublicUrl ?? driveFile.url, + type: driveFile.webpublicType ?? driveFile.type, + license: emojiInfo.license, + width: size.width || null, + height: size.height || null, + }).then((x) => Emojis.findOneByOrFail(x.identifiers[0])); + } + } else { + logger.info("starting emoji import without metadata"); + // Since we lack metadata, we import into a randomized category name instead + let categoryName = genId(); + + let containedEmojis = fs.readdirSync(outputPath); + + // Filter out accidental JSON files + containedEmojis = containedEmojis.filter( + (emoji) => !emoji.match(/\.(json)$/i), + ); + + for (const emojiFilename of containedEmojis) { + // strip extension and get filename to use as name + const name = path.basename(emojiFilename, path.extname(emojiFilename)); + const emojiPath = `${outputPath}/${emojiFilename}`; + + logger.info(`importing ${name}`); + + await Emojis.delete({ + name: name, + }); + const driveFile = await addFile({ + user: null, + path: emojiPath, + name: path.basename(emojiFilename), + force: true, + }); + const file = fs.createReadStream(emojiPath); + const size = await probeImageSize(file); + file.destroy(); + logger.info(`emoji size: ${size.width}x${size.height}`); + + await Emojis.insert({ + id: genId(), + updatedAt: new Date(), + name: name, + category: categoryName, + host: null, + aliases: [], + originalUrl: driveFile.url, + publicUrl: driveFile.webpublicUrl ?? driveFile.url, + type: driveFile.webpublicType ?? driveFile.type, + license: null, + width: size.width || null, + height: size.height || null, + }).then((x) => Emojis.findOneByOrFail(x.identifiers[0])); + } + } + + await db.queryResultCache!.remove(["meta_emojis"]); + + cleanup(); + + logger.succ("Imported"); + resolve(undefined); + }); + }); + + return "Success"; +} diff --git a/packages/backend/src/queue/processors/db/import-following.ts b/packages/backend/src/queue/queues/db/import-following.ts similarity index 94% rename from packages/backend/src/queue/processors/db/import-following.ts rename to packages/backend/src/queue/queues/db/import-following.ts index d58c71276..88020e0cd 100644 --- a/packages/backend/src/queue/processors/db/import-following.ts +++ b/packages/backend/src/queue/queues/db/import-following.ts @@ -8,29 +8,26 @@ import { isSelfHost, toPuny } from "@/misc/convert-host.js"; import { Users, DriveFiles } from "@/models/index.js"; import type { DbUserImportJobData } from "@/queue/types.js"; import { queueLogger } from "../../logger.js"; -import type Bull from "bull"; import { cache as heuristic } from "@/server/api/common/generate-following-query.js"; +import { Job } from "bullmq"; const logger = queueLogger.createSubLogger("import-following"); export async function importFollowing( - job: Bull.Job, - done: any, -): Promise { + job: Job, +): Promise { logger.info(`Importing following of ${job.data.user.id} ...`); const user = await Users.findOneBy({ id: job.data.user.id }); if (user == null) { - done(); - return; + return "skip: User not found"; } const file = await DriveFiles.findOneBy({ id: job.data.fileId, }); if (file == null) { - done(); - return; + return "skip: File not found"; } const csv = await downloadTextFile(file.url); @@ -114,5 +111,5 @@ export async function importFollowing( await heuristic.delete(user.id); logger.succ("Imported"); - done(); + return "Success"; } diff --git a/packages/backend/src/queue/processors/db/import-muting.ts b/packages/backend/src/queue/queues/db/import-muting.ts similarity index 92% rename from packages/backend/src/queue/processors/db/import-muting.ts rename to packages/backend/src/queue/queues/db/import-muting.ts index 80e056739..a2aebb716 100644 --- a/packages/backend/src/queue/processors/db/import-muting.ts +++ b/packages/backend/src/queue/queues/db/import-muting.ts @@ -1,5 +1,3 @@ -import type Bull from "bull"; - import { queueLogger } from "../../logger.js"; import * as Acct from "@/misc/acct.js"; import { resolveUser } from "@/remote/resolve-user.js"; @@ -10,27 +8,25 @@ import type { DbUserImportJobData } from "@/queue/types.js"; import type { User } from "@/models/entities/user.js"; import { genId } from "@/misc/gen-id.js"; import { IsNull } from "typeorm"; +import { Job } from "bullmq"; const logger = queueLogger.createSubLogger("import-muting"); export async function importMuting( - job: Bull.Job, - done: any, -): Promise { + job: Job, +): Promise { logger.info(`Importing muting of ${job.data.user.id} ...`); const user = await Users.findOneBy({ id: job.data.user.id }); if (user == null) { - done(); - return; + return "skip: User not found"; } const file = await DriveFiles.findOneBy({ id: job.data.fileId, }); if (file == null) { - done(); - return; + return "skip: File not found"; } const csv = await downloadTextFile(file.url); @@ -76,7 +72,7 @@ export async function importMuting( } logger.succ("Imported"); - done(); + return "Success"; } async function mute(user: User, target: User) { diff --git a/packages/backend/src/queue/processors/db/import-user-lists.ts b/packages/backend/src/queue/queues/db/import-user-lists.ts similarity index 93% rename from packages/backend/src/queue/processors/db/import-user-lists.ts rename to packages/backend/src/queue/queues/db/import-user-lists.ts index 68ba001ec..75652d949 100644 --- a/packages/backend/src/queue/processors/db/import-user-lists.ts +++ b/packages/backend/src/queue/queues/db/import-user-lists.ts @@ -1,5 +1,3 @@ -import type Bull from "bull"; - import { queueLogger } from "../../logger.js"; import * as Acct from "@/misc/acct.js"; import { resolveUser } from "@/remote/resolve-user.js"; @@ -15,27 +13,25 @@ import { import { genId } from "@/misc/gen-id.js"; import type { DbUserImportJobData } from "@/queue/types.js"; import { IsNull } from "typeorm"; +import { Job } from "bullmq"; const logger = queueLogger.createSubLogger("import-user-lists"); export async function importUserLists( - job: Bull.Job, - done: any, -): Promise { + job: Job, +): Promise { logger.info(`Importing user lists of ${job.data.user.id} ...`); const user = await Users.findOneBy({ id: job.data.user.id }); if (user == null) { - done(); - return; + return "skip: User not found"; } const file = await DriveFiles.findOneBy({ id: job.data.fileId, }); if (file == null) { - done(); - return; + return "skip: File not found"; } const csv = await downloadTextFile(file.url); @@ -107,5 +103,5 @@ export async function importUserLists( } logger.succ("Imported"); - done(); + return "Success"; } diff --git a/packages/backend/src/queue/queues/db/index.ts b/packages/backend/src/queue/queues/db/index.ts new file mode 100644 index 000000000..edf107b38 --- /dev/null +++ b/packages/backend/src/queue/queues/db/index.ts @@ -0,0 +1,278 @@ +import { Job, Processor } from "bullmq"; +import { deleteAccount } from "./delete-account.js"; +import { deleteDriveFiles } from "./delete-drive-files.js"; +import { exportBlocking } from "./export-blocking.js"; +import { exportCustomEmojis } from "./export-custom-emojis.js"; +import { exportFollowing } from "./export-following.js"; +import { exportMute } from "./export-mute.js"; +import { exportNotes } from "./export-notes.js"; +import { exportUserLists } from "./export-user-lists.js"; +import { importBlocking } from "./import-blocking.js"; +import { importCustomEmojis } from "./import-custom-emojis.js"; +import { importFollowing } from "./import-following.js"; +import { importMuting } from "./import-muting.js"; +import { importUserLists } from "./import-user-lists.js"; +import { createQueue } from "../index.js"; +import { ThinUser } from "@/queue/types.js"; +import { DriveFile } from "@/models/entities/drive-file.js"; +import config from "@/config/index.js"; + +const processors = { + deleteAccount, + deleteDriveFiles, + exportBlocking, + exportCustomEmojis, + exportFollowing, + exportMute, + exportNotes, + exportUserLists, + importBlocking, + importCustomEmojis, + importFollowing, + importMuting, + importUserLists, +} as Record; + +async function process(job: Job): Promise { + const processor = processors[job.name]; + if (processor === undefined) return "skip: unknown job name"; + return await processor(job); +} + +export const [dbQueue, dbInit] = + createQueue("db", process, { limitPerSec: 256, concurrency: 16 }); + +export function createDeleteDriveFilesJob(user: ThinUser) { + return dbQueue.add( + "deleteDriveFiles", + { + user: user, + }, + { + removeOnComplete: true, + removeOnFail: true, + }, + ); +} + +export function createExportCustomEmojisJob(user: ThinUser) { + return dbQueue.add( + "exportCustomEmojis", + { + user: user, + }, + { + removeOnComplete: true, + removeOnFail: true, + }, + ); +} + +export function createExportNotesJob(user: ThinUser) { + return dbQueue.add( + "exportNotes", + { + user: user, + }, + { + removeOnComplete: true, + removeOnFail: true, + }, + ); +} + +export function createExportFollowingJob( + user: ThinUser, + excludeMuting = false, + excludeInactive = false, +) { + return dbQueue.add( + "exportFollowing", + { + user: user, + excludeMuting, + excludeInactive, + }, + { + removeOnComplete: true, + removeOnFail: true, + }, + ); +} + +export function createExportMuteJob(user: ThinUser) { + return dbQueue.add( + "exportMute", + { + user: user, + }, + { + removeOnComplete: true, + removeOnFail: true, + }, + ); +} + +export function createExportBlockingJob(user: ThinUser) { + return dbQueue.add( + "exportBlocking", + { + user: user, + }, + { + removeOnComplete: true, + removeOnFail: true, + }, + ); +} + +export function createExportUserListsJob(user: ThinUser) { + return dbQueue.add( + "exportUserLists", + { + user: user, + }, + { + removeOnComplete: true, + removeOnFail: true, + }, + ); +} + +export function createImportFollowingJob( + user: ThinUser, + fileId: DriveFile["id"], +) { + return dbQueue.add( + "importFollowing", + { + user: user, + fileId: fileId, + }, + { + removeOnComplete: true, + removeOnFail: true, + }, + ); +} + +export function createImportMastoPostJob( + user: ThinUser, + post: any, + signatureCheck: boolean, +) { + return dbQueue.add( + "importMastoPost", + { + user: user, + post: post, + signatureCheck: signatureCheck, + }, + { + removeOnComplete: true, + removeOnFail: true, + attempts: config.inboxJobMaxAttempts || 8, + }, + ); +} + +export function createImportCkPostJob( + user: ThinUser, + post: any, + signatureCheck: boolean, +) { + return dbQueue.add( + "importCkPost", + { + user: user, + post: post, + signatureCheck: signatureCheck, + }, + { + removeOnComplete: true, + removeOnFail: true, + }, + ); +} + +export function createImportMutingJob(user: ThinUser, fileId: DriveFile["id"]) { + return dbQueue.add( + "importMuting", + { + user: user, + fileId: fileId, + }, + { + removeOnComplete: true, + removeOnFail: true, + }, + ); +} + +export function createImportBlockingJob( + user: ThinUser, + fileId: DriveFile["id"], +) { + return dbQueue.add( + "importBlocking", + { + user: user, + fileId: fileId, + }, + { + removeOnComplete: true, + removeOnFail: true, + }, + ); +} + +export function createImportUserListsJob( + user: ThinUser, + fileId: DriveFile["id"], +) { + return dbQueue.add( + "importUserLists", + { + user: user, + fileId: fileId, + }, + { + removeOnComplete: true, + removeOnFail: true, + }, + ); +} + +export function createImportCustomEmojisJob( + user: ThinUser, + fileId: DriveFile["id"], +) { + return dbQueue.add( + "importCustomEmojis", + { + user: user, + fileId: fileId, + }, + { + removeOnComplete: true, + removeOnFail: true, + }, + ); +} + +export function createDeleteAccountJob( + user: ThinUser, + opts: { soft?: boolean } = {}, +) { + return dbQueue.add( + "deleteAccount", + { + user: user, + soft: opts.soft, + }, + { + removeOnComplete: true, + removeOnFail: true, + }, + ); +} diff --git a/packages/backend/src/queue/processors/deliver.ts b/packages/backend/src/queue/queues/deliver.ts similarity index 70% rename from packages/backend/src/queue/processors/deliver.ts rename to packages/backend/src/queue/queues/deliver.ts index 0e5b4ac79..a3544247c 100644 --- a/packages/backend/src/queue/processors/deliver.ts +++ b/packages/backend/src/queue/queues/deliver.ts @@ -12,14 +12,17 @@ import { fetchInstanceMetadata } from "@/services/fetch-instance-metadata.js"; import { toPuny } from "@/misc/convert-host.js"; import { StatusError } from "@/misc/fetch.js"; import { shouldSkipInstance } from "@/misc/skipped-instances.js"; -import type { DeliverJobData } from "@/queue/types.js"; -import type Bull from "bull"; +import type { DeliverJobData } from "../types.js"; +import config from "@/config/index.js"; +import { createQueue, processorTimeout } from "./index.js"; +import { ThinUser } from "../types.js"; +import { Job } from "bullmq"; -const logger = new Logger("deliver"); +export const deliverLogger = new Logger("deliver"); let latest: string | null = null; -export default async (job: Bull.Job) => { +async function process(job: Job) { if (job.data == null || Object.keys(job.data).length === 0) { job.opts.removeOnComplete = true; return "Skip (data was null or empty)"; @@ -31,7 +34,7 @@ export default async (job: Bull.Job) => { try { if (latest !== (latest = JSON.stringify(job.data.content, null, 2))) { - logger.debug(`delivering ${latest}`); + deliverLogger.debug(`delivering ${latest}`); } await request(job.data.user, job.data.to, job.data.content); @@ -83,3 +86,34 @@ export default async (job: Bull.Job) => { } } }; + +export const [deliverQueue, deliverInit] = createQueue( + "deliver", + processorTimeout(process, 60), + { + limitPerSec: config.deliverJobPerSec || 128, + concurrency: config.deliverJobConcurrency || 128, + }, +); + +export function deliverJob(user: ThinUser, content: unknown, to: string | null) { + if (content == null) return null; + if (to == null) return null; + + const data = { + user: { + id: user.id, + }, + content, + to, + }; + + return deliverQueue.add("default", data, { + attempts: config.deliverJobMaxAttempts || 12, + backoff: { + type: "custom", + }, + removeOnComplete: true, + removeOnFail: true, + }); +} diff --git a/packages/backend/src/queue/processors/ended-poll-notification.ts b/packages/backend/src/queue/queues/ended-poll-notification.ts similarity index 74% rename from packages/backend/src/queue/processors/ended-poll-notification.ts rename to packages/backend/src/queue/queues/ended-poll-notification.ts index dafa11719..5f5610010 100644 --- a/packages/backend/src/queue/processors/ended-poll-notification.ts +++ b/packages/backend/src/queue/queues/ended-poll-notification.ts @@ -1,25 +1,18 @@ -import type Bull from "bull"; import { Notes, PollVotes } from "@/models/index.js"; -import { queueLogger } from "../logger.js"; import type { EndedPollNotificationJobData } from "@/queue/types.js"; import { createNotification } from "@/services/create-notification.js"; import { deliverQuestionUpdate } from "@/services/note/polls/update.js"; +import { Job } from "bullmq"; +import { createQueue } from "./index.js"; -const logger = queueLogger.createSubLogger("ended-poll-notification"); - -export async function endedPollNotification( - job: Bull.Job, - done: any, -): Promise { +async function process(job: Job): Promise { if (job.data == null || Object.keys(job.data).length === 0) { job.opts.removeOnComplete = true; - done(); - return; + return "skip: corrupt job"; } const note = await Notes.findOneBy({ id: job.data.noteId }); if (note == null || !note.hasPoll) { - done(); - return; + return "skip: note not found"; } const votes = await PollVotes.createQueryBuilder("vote") @@ -41,5 +34,8 @@ export async function endedPollNotification( // Broadcast the poll result once it ends if (!note.localOnly) await deliverQuestionUpdate(note.id); - done(); + return "complete" } + +export const [endedPollNotificationQueue, endedPollNotificationInit] = + createQueue("endedPollNotification", process); diff --git a/packages/backend/src/queue/processors/inbox.ts b/packages/backend/src/queue/queues/inbox.ts similarity index 87% rename from packages/backend/src/queue/processors/inbox.ts rename to packages/backend/src/queue/queues/inbox.ts index 5c52fb11d..bb0bc2438 100644 --- a/packages/backend/src/queue/processors/inbox.ts +++ b/packages/backend/src/queue/queues/inbox.ts @@ -1,5 +1,4 @@ import { URL } from "node:url"; -import type Bull from "bull"; import httpSignature from "@peertube/http-signature"; import perform from "@/remote/activitypub/perform.js"; import Logger from "@/services/logger.js"; @@ -12,7 +11,7 @@ import { } from "@/services/chart/index.js"; import { fetchMeta } from "@/misc/fetch-meta.js"; import { toPuny, extractDbHost } from "@/misc/convert-host.js"; -import { getApId } from "@/remote/activitypub/type.js"; +import { IActivity, getApId } from "@/remote/activitypub/type.js"; import { fetchInstanceMetadata } from "@/services/fetch-instance-metadata.js"; import type { InboxJobData } from "../types.js"; import DbResolver from "@/remote/activitypub/db-resolver.js"; @@ -23,11 +22,14 @@ import type { CacheableRemoteUser } from "@/models/entities/user.js"; import type { UserPublickey } from "@/models/entities/user-publickey.js"; import { shouldBlockInstance } from "@/misc/should-block-instance.js"; import { verifySignature } from "@/remote/activitypub/check-fetch.js"; +import { Job } from "bullmq"; +import { createQueue, processorTimeout } from "./index.js"; +import config from "@/config/index.js"; -const logger = new Logger("inbox"); +export const inboxLogger = new Logger("inbox"); // Processing when an activity arrives in the user's inbox -export default async (job: Bull.Job): Promise => { +async function process(job: Job): Promise { if (job.data == null || Object.keys(job.data).length === 0) { job.opts.removeOnComplete = true; return "Skip (data was null or empty)"; @@ -38,12 +40,10 @@ export default async (job: Bull.Job): Promise => { //#region Log const info = Object.assign({}, activity) as any; info["@context"] = undefined; - logger.debug(JSON.stringify(info, null, 2)); + inboxLogger.debug(JSON.stringify(info, null, 2)); if (!signature?.keyId) { - const err = `Invalid signature: ${signature}`; - job.moveToFailed({ message: err }); - return err; + throw new Error(`Invalid signature: ${signature}`); } //#endregion const host = toPuny(new URL(signature.keyId).hostname); @@ -167,7 +167,7 @@ export default async (job: Bull.Job): Promise => { } // ブロックしてたら中断 - const ldHost = extractDbHost(authUser.user.uri); + const ldHost = extractDbHost(authUser.user.uri!); if (await shouldBlockInstance(ldHost, meta)) { return `Blocked request: ${ldHost}`; } @@ -225,3 +225,31 @@ export default async (job: Bull.Job): Promise => { await perform(authUser.user, activity); return "ok"; }; + +export const [inboxQueue, inboxInit] = createQueue( + "inbox", + processorTimeout(process, 5 * 60), + { + limitPerSec: config.inboxJobPerSec || 16, + concurrency: config.inboxJobConcurrency || 16, + }, +); + +export function inboxJob( + activity: IActivity, + signature: httpSignature.IParsedSignature, +) { + const data = { + activity: activity, + signature, + }; + + return inboxQueue.add("default", data, { + attempts: config.inboxJobMaxAttempts || 8, + backoff: { + type: "custom", + }, + removeOnComplete: true, + removeOnFail: true, + }); +} diff --git a/packages/backend/src/queue/queues/index.ts b/packages/backend/src/queue/queues/index.ts new file mode 100644 index 000000000..5a0240f70 --- /dev/null +++ b/packages/backend/src/queue/queues/index.ts @@ -0,0 +1,98 @@ +import { Worker, Queue, BackoffStrategy, Processor, Job } from "bullmq"; +import config from "@/config/index.js"; +import { queueLogger, renderError } from "../logger.js"; + +const connectionDetails = { + port: config.redis.port, + host: config.redis.host, + family: config.redis.family == null ? 0 : config.redis.family, + username: config.redis.user ?? "default", + password: config.redis.pass, + db: config.redis.db || 0, + tls: config.redis.tls, + lazyConnect: true, + maxRetriesPerRequest: null, +}; + +// ref. https://github.com/misskey-dev/misskey/pull/7635#issue-971097019 +function apBackoff(attemptsMade: number) { + const baseDelay = 60 * 1000; // 1min + const maxBackoff = 8 * 60 * 60 * 1000; // 8hours + let backoff = (Math.pow(2, attemptsMade) - 1) * baseDelay; + backoff = Math.min(backoff, maxBackoff); + backoff += Math.round(backoff * Math.random() * 0.2); + return backoff; +} + +export function createQueue( + name: string, + processor: Processor, + opts: { + backoff?: BackoffStrategy, + limitPerSec?: number, + concurrency?: number, + } = {} +): [Queue, () => Promise] { + const backoffStrategy = opts.backoff || apBackoff; + const sublogger = queueLogger.createSubLogger(name); + + const queue = new Queue( + name, + { + connection: connectionDetails, + prefix: config.redis.prefix, + }, + ); + queue.on("waiting", (job) => sublogger.debug(`waiting id=${job.id}`)); + + const initWorker = () => { + return new Worker( + name, + processor, + { + connection: connectionDetails, + prefix: config.redis.prefix, + settings: { + backoffStrategy, + }, + limiter: (opts.limitPerSec === undefined || opts.limitPerSec === -1) + ? undefined : { + max: opts.limitPerSec, + duration: 1000, + }, + concurrency: opts.concurrency || 1, + }, + ) + .on("ready", () => {}) + .on("active", (job) => sublogger.debug(`active id=${job.id}`)) + .on("completed", (job) => sublogger.debug(`completed id=${job.id}`)) + .on("failed", (job, err) => + sublogger.warn(`failed(${err}) id=${job?.id}`, { + job, + e: renderError(err), + }), + ) + .on("error", (err) => + sublogger.error(`error(${err})`, { e: renderError(err) }), + ) + .on("stalled", (jobId) => + sublogger.warn(`stalled id=${jobId}`), + ) + .waitUntilReady() + .then(() => {}); + } + + return [queue, initWorker]; +} + +export function processorTimeout

(processor: P, timeout: number) { + return (job: Job) => { + return new Promise((resolve, reject) => { + const timer = setTimeout(() => reject(new Error("timeout reached")), timeout * 1000); + processor(job) + .then(resolve) + .catch(reject) + .finally(() => clearTimeout(timer)); + }); + }; +} diff --git a/packages/backend/src/queue/processors/object-storage/clean-remote-files.ts b/packages/backend/src/queue/queues/object-storage/clean-remote-files.ts similarity index 84% rename from packages/backend/src/queue/processors/object-storage/clean-remote-files.ts rename to packages/backend/src/queue/queues/object-storage/clean-remote-files.ts index 6db84aabe..3cbc50a75 100644 --- a/packages/backend/src/queue/processors/object-storage/clean-remote-files.ts +++ b/packages/backend/src/queue/queues/object-storage/clean-remote-files.ts @@ -1,18 +1,15 @@ -import type Bull from "bull"; - import { queueLogger } from "../../logger.js"; import { deleteFileSync } from "@/services/drive/delete-file.js"; -import { DriveFiles, Users } from "@/models/index.js"; -import { MoreThan, Not, IsNull } from "typeorm"; +import { DriveFiles } from "@/models/index.js"; import { User } from "@/models/entities/user.js"; import config from "@/config/index.js"; +import { Job } from "bullmq"; const logger = queueLogger.createSubLogger("clean-remote-files"); -export default async function cleanRemoteFiles( - job: Bull.Job>, - done: any, -): Promise { +export async function cleanRemoteFiles( + job: Job>, +): Promise { let progress = 0; const untilDate = new Date(Date.now() - ((new Date()).getTimezoneOffset() * 60000)); untilDate.setDate(untilDate.getDate() - (config.mediaCleanup?.maxAgeDays ?? 0)); @@ -54,7 +51,7 @@ export default async function cleanRemoteFiles( const files = await query.getMany(); if (files.length === 0) { - job.progress(100); + job.updateProgress(100); break; } @@ -62,9 +59,9 @@ export default async function cleanRemoteFiles( progress += files.length; - job.progress((progress / total) * 100); + job.updateProgress((progress / total) * 100); } logger.succ(`Remote media cleanup job completed successfully.`); - done(); + return "Success"; } diff --git a/packages/backend/src/queue/processors/object-storage/delete-file.ts b/packages/backend/src/queue/queues/object-storage/delete-file.ts similarity index 66% rename from packages/backend/src/queue/processors/object-storage/delete-file.ts rename to packages/backend/src/queue/queues/object-storage/delete-file.ts index 174aa1906..bb57bc185 100644 --- a/packages/backend/src/queue/processors/object-storage/delete-file.ts +++ b/packages/backend/src/queue/queues/object-storage/delete-file.ts @@ -1,8 +1,8 @@ import type { ObjectStorageFileJobData } from "@/queue/types.js"; -import type Bull from "bull"; import { deleteObjectStorageFile } from "@/services/drive/delete-file.js"; +import { Job } from "bullmq"; -export default async (job: Bull.Job) => { +export async function deleteFile(job: Job): Promise { const key: string = job.data.key; await deleteObjectStorageFile(key); diff --git a/packages/backend/src/queue/queues/object-storage/index.ts b/packages/backend/src/queue/queues/object-storage/index.ts new file mode 100644 index 000000000..29df7f935 --- /dev/null +++ b/packages/backend/src/queue/queues/object-storage/index.ts @@ -0,0 +1,42 @@ +import { Job, Processor } from "bullmq"; +import { createQueue } from "../index.js"; +import { cleanRemoteFiles } from "./clean-remote-files.js"; +import { deleteFile } from "./delete-file.js"; + +const processors = { + cleanRemoteFiles, + deleteFile, +} as Record; + +async function process(job: Job): Promise { + const processor = processors[job.name]; + if (processor === undefined) return "skip: unknown job name"; + return await processor(job); +} + +export const [objectStorageQueue, objectStorageInit] = + createQueue("objectStorage", process, { concurrency: 16 }); + +export function createDeleteObjectStorageFileJob(key: string) { + return objectStorageQueue.add( + "deleteFile", + { + key: key, + }, + { + removeOnComplete: true, + removeOnFail: true, + }, + ); +} + +export function createCleanRemoteFilesJob() { + return objectStorageQueue.add( + "cleanRemoteFiles", + {}, + { + removeOnComplete: true, + removeOnFail: true, + }, + ); +} diff --git a/packages/backend/src/queue/processors/system/check-expired-mutings.ts b/packages/backend/src/queue/queues/system/check-expired-mutings.ts similarity index 83% rename from packages/backend/src/queue/processors/system/check-expired-mutings.ts rename to packages/backend/src/queue/queues/system/check-expired-mutings.ts index a482d0218..15c229ed8 100644 --- a/packages/backend/src/queue/processors/system/check-expired-mutings.ts +++ b/packages/backend/src/queue/queues/system/check-expired-mutings.ts @@ -1,4 +1,3 @@ -import type Bull from "bull"; import { In } from "typeorm"; import { Mutings } from "@/models/index.js"; import { queueLogger } from "../../logger.js"; @@ -6,10 +5,7 @@ import { publishUserEvent } from "@/services/stream.js"; const logger = queueLogger.createSubLogger("check-expired-mutings"); -export async function checkExpiredMutings( - job: Bull.Job>, - done: any, -): Promise { +export async function checkExpiredMutings(): Promise { logger.info("Checking expired mutings..."); const expired = await Mutings.createQueryBuilder("muting") @@ -29,5 +25,5 @@ export async function checkExpiredMutings( } logger.succ("All expired mutings checked."); - done(); + return "Success"; } diff --git a/packages/backend/src/queue/processors/system/clean-charts.ts b/packages/backend/src/queue/queues/system/clean-charts.ts similarity index 84% rename from packages/backend/src/queue/processors/system/clean-charts.ts rename to packages/backend/src/queue/queues/system/clean-charts.ts index dde5d95fe..15170e3c5 100644 --- a/packages/backend/src/queue/processors/system/clean-charts.ts +++ b/packages/backend/src/queue/queues/system/clean-charts.ts @@ -1,5 +1,4 @@ -import type Bull from "bull"; - +import { Job } from "bullmq"; import { queueLogger } from "../../logger.js"; import { activeUsersChart, @@ -18,10 +17,7 @@ import { const logger = queueLogger.createSubLogger("clean-charts"); -export async function cleanCharts( - job: Bull.Job>, - done: any, -): Promise { +export async function cleanCharts(): Promise { logger.info("Clean charts..."); await Promise.all([ @@ -40,5 +36,5 @@ export async function cleanCharts( ]); logger.succ("All charts successfully cleaned."); - done(); + return "Success"; } diff --git a/packages/backend/src/queue/processors/system/clean.ts b/packages/backend/src/queue/queues/system/clean.ts similarity index 66% rename from packages/backend/src/queue/processors/system/clean.ts rename to packages/backend/src/queue/queues/system/clean.ts index fbd45b0bb..1edfef40f 100644 --- a/packages/backend/src/queue/processors/system/clean.ts +++ b/packages/backend/src/queue/queues/system/clean.ts @@ -1,4 +1,3 @@ -import type Bull from "bull"; import { LessThan } from "typeorm"; import { UserIps } from "@/models/index.js"; @@ -6,16 +5,13 @@ import { queueLogger } from "../../logger.js"; const logger = queueLogger.createSubLogger("clean"); -export async function clean( - job: Bull.Job>, - done: any, -): Promise { +export async function clean(): Promise { logger.info("Cleaning..."); - UserIps.delete({ + await UserIps.delete({ createdAt: LessThan(new Date(Date.now() - 1000 * 60 * 60 * 24 * 90)), }); logger.succ("Cleaned."); - done(); + return "Success"; } diff --git a/packages/backend/src/queue/queues/system/index.ts b/packages/backend/src/queue/queues/system/index.ts new file mode 100644 index 000000000..42755ee9b --- /dev/null +++ b/packages/backend/src/queue/queues/system/index.ts @@ -0,0 +1,45 @@ +import { Job, Processor } from "bullmq"; +import { createQueue } from "../index.js"; +import { clean } from "./clean.js"; +import { cleanCharts } from "./clean-charts.js"; +import { checkExpiredMutings } from "./check-expired-mutings.js"; +import { resyncCharts } from "./resync-charts.js"; +import { tickCharts } from "./tick-charts.js"; +import { verifyLinks } from "./verify-links.js"; + +const processors = { + clean, + cleanCharts, + checkExpiredMutings, + resyncCharts, + tickCharts, + verifyLinks, +} as Record; + +async function process(job: Job): Promise { + const processor = processors[job.name]; + if (processor === undefined) return "skip: unknown job name"; + return await processor(job); +} + +const [systemQueue_, systemInitQueue] = + createQueue("system", process, { concurrency: Object.keys(processors).length }); +export const systemQueue = systemQueue_; + +export async function systemInit() { + await systemInitQueue(); + for (const { name, seconds } of [ + { name: "clean", seconds: 60 * 60 }, + { name: "cleanCharts", seconds: 60 * 60 }, + { name: "checkExpiredMutings", seconds: 5 * 60 }, + { name: "resyncCharts", seconds: 60 * 60 }, + { name: "tickCharts", seconds: 60 }, + { name: "verifyLinks", seconds: 60 * 60 * 24 }, + ]) { + await systemQueue.upsertJobScheduler( + name, + { every: seconds * 1000 }, + { opts: { removeOnComplete: true } } + ); + } +} diff --git a/packages/backend/src/queue/processors/system/resync-charts.ts b/packages/backend/src/queue/queues/system/resync-charts.ts similarity index 77% rename from packages/backend/src/queue/processors/system/resync-charts.ts rename to packages/backend/src/queue/queues/system/resync-charts.ts index dbea0df73..32634e586 100644 --- a/packages/backend/src/queue/processors/system/resync-charts.ts +++ b/packages/backend/src/queue/queues/system/resync-charts.ts @@ -1,14 +1,9 @@ -import type Bull from "bull"; - import { queueLogger } from "../../logger.js"; import { driveChart, notesChart, usersChart } from "@/services/chart/index.js"; const logger = queueLogger.createSubLogger("resync-charts"); -export async function resyncCharts( - job: Bull.Job>, - done: any, -): Promise { +export async function resyncCharts(): Promise { logger.info("Resync charts..."); // TODO: ユーザーごとのチャートも更新する @@ -20,5 +15,5 @@ export async function resyncCharts( ]); logger.succ("All charts successfully resynced."); - done(); + return "Success"; } diff --git a/packages/backend/src/queue/processors/system/tick-charts.ts b/packages/backend/src/queue/queues/system/tick-charts.ts similarity index 85% rename from packages/backend/src/queue/processors/system/tick-charts.ts rename to packages/backend/src/queue/queues/system/tick-charts.ts index 33eed8a59..110928f2f 100644 --- a/packages/backend/src/queue/processors/system/tick-charts.ts +++ b/packages/backend/src/queue/queues/system/tick-charts.ts @@ -1,5 +1,3 @@ -import type Bull from "bull"; - import { queueLogger } from "../../logger.js"; import { activeUsersChart, @@ -18,10 +16,7 @@ import { const logger = queueLogger.createSubLogger("tick-charts"); -export async function tickCharts( - job: Bull.Job>, - done: any, -): Promise { +export async function tickCharts(): Promise { logger.info("Tick charts..."); await Promise.all([ @@ -40,5 +35,5 @@ export async function tickCharts( ]); logger.succ("All charts successfully ticked."); - done(); + return "Success"; } diff --git a/packages/backend/src/queue/processors/system/verify-links.ts b/packages/backend/src/queue/queues/system/verify-links.ts similarity index 82% rename from packages/backend/src/queue/processors/system/verify-links.ts rename to packages/backend/src/queue/queues/system/verify-links.ts index 3ddda9baf..d5b4435d4 100644 --- a/packages/backend/src/queue/processors/system/verify-links.ts +++ b/packages/backend/src/queue/queues/system/verify-links.ts @@ -1,17 +1,11 @@ -import type Bull from "bull"; - import { UserProfiles } from "@/models/index.js"; import { Not } from "typeorm"; import { queueLogger } from "../../logger.js"; import { verifyLink } from "@/services/fetch-rel-me.js"; -import config from "@/config/index.js"; const logger = queueLogger.createSubLogger("verify-links"); -export async function verifyLinks( - job: Bull.Job>, - done: any, -): Promise { +export async function verifyLinks(): Promise { logger.info("Verifying links..."); const usersToVerify = await UserProfiles.findBy({ @@ -34,11 +28,11 @@ export async function verifyLinks( }); } catch (e) { logger.error(`Failed to update user ${user.userId} ${e}`); - done(e); + throw e; } } } logger.succ("All links successfully verified."); - done(); + return "Success"; } diff --git a/packages/backend/src/queue/processors/webhook-deliver.ts b/packages/backend/src/queue/queues/webhook-deliver.ts similarity index 64% rename from packages/backend/src/queue/processors/webhook-deliver.ts rename to packages/backend/src/queue/queues/webhook-deliver.ts index e821026dd..71601d289 100644 --- a/packages/backend/src/queue/processors/webhook-deliver.ts +++ b/packages/backend/src/queue/queues/webhook-deliver.ts @@ -1,14 +1,16 @@ -import { URL } from "node:url"; -import type Bull from "bull"; import Logger from "@/services/logger.js"; import type { WebhookDeliverJobData } from "../types.js"; import { getResponse, StatusError } from "@/misc/fetch.js"; import { Webhooks } from "@/models/index.js"; import config from "@/config/index.js"; +import { Job } from "bullmq"; +import { Webhook, webhookEventTypes } from "@/models/entities/webhook.js"; +import { v4 as uuid } from "uuid"; +import { createQueue, processorTimeout } from "./index.js"; const logger = new Logger("webhook"); -export default async (job: Bull.Job) => { +async function process(job: Job) { if (job.data == null || Object.keys(job.data).length === 0) { job.opts.removeOnComplete = true; return "Skip (data was null or empty)"; @@ -68,3 +70,36 @@ export default async (job: Bull.Job) => { } } }; + +export const [webhookDeliverQueue, webhookDeliverInit] = + createQueue( + "webhookDeliver", + processorTimeout(process, 60), + { limitPerSec: 64, concurrency: 64 }, + ); + +export function webhookDeliverJob( + webhook: Webhook, + type: typeof webhookEventTypes[number], + content: unknown, +) { + const data = { + type, + content, + webhookId: webhook.id, + userId: webhook.userId, + to: webhook.url, + secret: webhook.secret, + createdAt: Date.now(), + eventId: uuid(), + }; + + return webhookDeliverQueue.add("default", data, { + attempts: 4, + backoff: { + type: "custom", + }, + removeOnComplete: true, + removeOnFail: true, + }); +} diff --git a/packages/backend/src/server/api/endpoints.ts b/packages/backend/src/server/api/endpoints.ts index 430a0496e..32669d0b9 100644 --- a/packages/backend/src/server/api/endpoints.ts +++ b/packages/backend/src/server/api/endpoints.ts @@ -47,7 +47,6 @@ import * as ep___admin_relays_list from "./endpoints/admin/relays/list.js"; import * as ep___admin_relays_remove from "./endpoints/admin/relays/remove.js"; import * as ep___admin_resetPassword from "./endpoints/admin/reset-password.js"; import * as ep___admin_resolveAbuseUserReport from "./endpoints/admin/resolve-abuse-user-report.js"; -import * as ep___admin_search_indexAll from "./endpoints/admin/search/index-all.js"; import * as ep___admin_sendEmail from "./endpoints/admin/send-email.js"; import * as ep___admin_sendModMail from "./endpoints/admin/send-mod-mail.js"; import * as ep___admin_serverInfo from "./endpoints/admin/server-info.js"; @@ -393,7 +392,6 @@ const eps = [ ["admin/relays/remove", ep___admin_relays_remove], ["admin/reset-password", ep___admin_resetPassword], ["admin/resolve-abuse-user-report", ep___admin_resolveAbuseUserReport], - ["admin/search/index-all", ep___admin_search_indexAll], ["admin/send-email", ep___admin_sendEmail], ["admin/send-mod-mail", ep___admin_sendModMail], ["admin/server-info", ep___admin_serverInfo], diff --git a/packages/backend/src/server/api/endpoints/admin/queue/stats.ts b/packages/backend/src/server/api/endpoints/admin/queue/stats.ts index 4a437c3d1..ecd67d893 100644 --- a/packages/backend/src/server/api/endpoints/admin/queue/stats.ts +++ b/packages/backend/src/server/api/endpoints/admin/queue/stats.ts @@ -3,7 +3,6 @@ import { inboxQueue, dbQueue, objectStorageQueue, - backgroundQueue, } from "@/queue/queues.js"; import define from "../../../define.js"; @@ -38,11 +37,6 @@ export const meta = { nullable: false, ref: "QueueCount", }, - backgroundQueue: { - optional: false, - nullable: false, - ref: "QueueCount", - }, }, }, } as const; @@ -58,13 +52,11 @@ export default define(meta, paramDef, async (ps) => { const inboxJobCounts = await inboxQueue.getJobCounts(); const dbJobCounts = await dbQueue.getJobCounts(); const objectStorageJobCounts = await objectStorageQueue.getJobCounts(); - const backgroundJobCounts = await backgroundQueue.getJobCounts(); return { deliver: deliverJobCounts, inbox: inboxJobCounts, db: dbJobCounts, objectStorage: objectStorageJobCounts, - backgroundQueue: backgroundJobCounts, }; }); diff --git a/packages/backend/src/server/api/endpoints/admin/search/index-all.ts b/packages/backend/src/server/api/endpoints/admin/search/index-all.ts deleted file mode 100644 index 135b48ecc..000000000 --- a/packages/backend/src/server/api/endpoints/admin/search/index-all.ts +++ /dev/null @@ -1,28 +0,0 @@ -import define from "../../../define.js"; -import { createIndexAllNotesJob } from "@/queue/index.js"; - -export const meta = { - tags: ["admin"], - - requireCredential: true, - requireModerator: true, -} as const; - -export const paramDef = { - type: "object", - properties: { - cursor: { - type: "string", - format: "misskey:id", - nullable: true, - default: null, - }, - }, - required: [], -} as const; - -export default define(meta, paramDef, async (ps, _me) => { - createIndexAllNotesJob({ - cursor: ps.cursor ?? undefined, - }); -}); diff --git a/packages/backend/src/services/note/create.ts b/packages/backend/src/services/note/create.ts index 87dc63a48..46e4d6feb 100644 --- a/packages/backend/src/services/note/create.ts +++ b/packages/backend/src/services/note/create.ts @@ -386,6 +386,7 @@ export default async ( if (data.poll?.expiresAt) { const delay = data.poll.expiresAt.getTime() - Date.now(); endedPollNotificationQueue.add( + "default", { noteId: note.id, }, diff --git a/yarn.lock b/yarn.lock index 181c2d342..e996c1e17 100644 --- a/yarn.lock +++ b/yarn.lock @@ -545,19 +545,30 @@ __metadata: languageName: node linkType: hard -"@bull-board/koa@npm:5.6.0": - version: 5.6.0 - resolution: "@bull-board/koa@npm:5.6.0" +"@bull-board/api@npm:6.0.0": + version: 6.0.0 + resolution: "@bull-board/api@npm:6.0.0" dependencies: - "@bull-board/api": "npm:5.6.0" - "@bull-board/ui": "npm:5.6.0" - ejs: "npm:^3.1.7" + redis-info: "npm:^3.0.8" + peerDependencies: + "@bull-board/ui": 6.0.0 + checksum: 10/922e46f2c567e51f5dd657b6bac2a2c49de69f9bf75d7819a03b48666d5bef9a9e991f825b413545fad04c0fcbb68d7eadbe98c9c96cb8b4ce06a3a39bbde9d4 + languageName: node + linkType: hard + +"@bull-board/koa@npm:6.0.0": + version: 6.0.0 + resolution: "@bull-board/koa@npm:6.0.0" + dependencies: + "@bull-board/api": "npm:6.0.0" + "@bull-board/ui": "npm:6.0.0" + ejs: "npm:^3.1.10" koa: "npm:^2.13.1" koa-mount: "npm:^4.0.0" koa-router: "npm:^10.0.0" koa-static: "npm:^5.0.0" koa-views: "npm:^7.0.1" - checksum: 10/54aef8b937e78aedc4c1d109562c8f362b571ebdf9eadc1b4a1e6d6dde10a2e96761fe470ef254ccd7b947dbfba53c92d05f238a395f62ccd51bccfa3aed94a6 + checksum: 10/0d1bac34e860a1c3b744db8f7011a7c607bf6e630827b4f2a4d56f25db66a4d8da28ee041a961ce2b94c356e1d1bd238c638189b56595844dca44bd24c412479 languageName: node linkType: hard @@ -570,6 +581,15 @@ __metadata: languageName: node linkType: hard +"@bull-board/ui@npm:6.0.0": + version: 6.0.0 + resolution: "@bull-board/ui@npm:6.0.0" + dependencies: + "@bull-board/api": "npm:6.0.0" + checksum: 10/6d083063a39b988648d2cc4001cc9ffdb2b7c9fef4876d83b7f0a11417bc3deaa311505c1d16bc1b540ed77bf097f31726abf692f127dc2fedebebeab061dd98 + languageName: node + linkType: hard + "@colors/colors@npm:1.5.0": version: 1.5.0 resolution: "@colors/colors@npm:1.5.0" @@ -5445,9 +5465,9 @@ __metadata: version: 0.0.0-use.local resolution: "backend@workspace:packages/backend" dependencies: - "@bull-board/api": "npm:5.6.0" - "@bull-board/koa": "npm:5.6.0" - "@bull-board/ui": "npm:5.6.0" + "@bull-board/api": "npm:6.0.0" + "@bull-board/koa": "npm:6.0.0" + "@bull-board/ui": "npm:6.0.0" "@discordapp/twemoji": "npm:14.1.2" "@iceshrimp/summaly": "npm:2.7.2" "@koa/cors": "npm:3.4.3" @@ -5518,7 +5538,7 @@ __metadata: axios: "npm:^1.4.0" bcryptjs: "npm:2.4.3" blurhash: "npm:2.0.5" - bull: "npm:4.10.4" + bullmq: "npm:5.16.0" cacheable-lookup: "npm:7.0.0" cbor: "npm:8.1.0" chalk: "npm:5.3.0" @@ -5543,7 +5563,7 @@ __metadata: happy-dom: "npm:^12.10.3" hpagent: "npm:0.1.2" iceshrimp-sdk: "workspace:*" - ioredis: "npm:5.3.2" + ioredis: "npm:5.4.1" ip-cidr: "npm:3.1.0" is-svg: "npm:4.3.2" js-yaml: "npm:4.1.0" @@ -6060,19 +6080,18 @@ __metadata: languageName: node linkType: hard -"bull@npm:4.10.4": - version: 4.10.4 - resolution: "bull@npm:4.10.4" +"bullmq@npm:5.16.0": + version: 5.16.0 + resolution: "bullmq@npm:5.16.0" dependencies: - cron-parser: "npm:^4.2.1" - debuglog: "npm:^1.0.0" - get-port: "npm:^5.1.1" - ioredis: "npm:^5.0.0" - lodash: "npm:^4.17.21" - msgpackr: "npm:^1.5.2" - semver: "npm:^7.3.2" - uuid: "npm:^8.3.0" - checksum: 10/8a9909731741e49700639d5e4102829dea0c0a959894d61d521f12ec6a762b012389765fb58ba745cdf51cbeeb8e07e24367361819206bbebac89eb24b189332 + cron-parser: "npm:^4.6.0" + ioredis: "npm:^5.4.1" + msgpackr: "npm:^1.10.1" + node-abort-controller: "npm:^3.1.1" + semver: "npm:^7.5.4" + tslib: "npm:^2.0.0" + uuid: "npm:^9.0.0" + checksum: 10/39febf6e4a43c27a47e43c6d4cbfa0173cf801b5d804a0a2aac871fed376fdaf49a60eb1a0394292de1a21c11916086b424d4a244499be8a013c0d4b383f5c17 languageName: node linkType: hard @@ -7311,12 +7330,12 @@ __metadata: languageName: node linkType: hard -"cron-parser@npm:^4.2.1": - version: 4.8.1 - resolution: "cron-parser@npm:4.8.1" +"cron-parser@npm:^4.6.0": + version: 4.9.0 + resolution: "cron-parser@npm:4.9.0" dependencies: luxon: "npm:^3.2.1" - checksum: 10/5deb3f82166e5b55bf307e824888e0d661bbbf607dd7947b53e544bfbd7981ebda8c1e73416879ccc3b5ec093178718365e886e947d6a3962c4386a3eea5980f + checksum: 10/ffca5e532a5ee0923412ee6e4c7f9bbceacc6ddf8810c16d3e9fb4fe5ec7e2de1b6896d7956f304bb6bc96b0ce37ad7e3935304179d52951c18d84107184faa7 languageName: node linkType: hard @@ -7694,13 +7713,6 @@ __metadata: languageName: node linkType: hard -"debuglog@npm:^1.0.0": - version: 1.0.1 - resolution: "debuglog@npm:1.0.1" - checksum: 10/942a3196951ef139e3c19dc55583c1f9532fad92e293ffc6cbf8bb67562ea1aa013b5b86b4a89c2dd89e5e1c16e00b975e5ba3aa0a11070a3577e81162e6e29d - languageName: node - linkType: hard - "decamelize-keys@npm:^1.1.0": version: 1.1.1 resolution: "decamelize-keys@npm:1.1.1" @@ -8315,6 +8327,17 @@ __metadata: languageName: node linkType: hard +"ejs@npm:^3.1.10": + version: 3.1.10 + resolution: "ejs@npm:3.1.10" + dependencies: + jake: "npm:^10.8.5" + bin: + ejs: bin/cli.js + checksum: 10/a9cb7d7cd13b7b1cd0be5c4788e44dd10d92f7285d2f65b942f33e127230c054f99a42db4d99f766d8dbc6c57e94799593ee66a14efd7c8dd70c4812bf6aa384 + languageName: node + linkType: hard + "ejs@npm:^3.1.7": version: 3.1.9 resolution: "ejs@npm:3.1.9" @@ -10429,13 +10452,6 @@ __metadata: languageName: node linkType: hard -"get-port@npm:^5.1.1": - version: 5.1.1 - resolution: "get-port@npm:5.1.1" - checksum: 10/0162663ffe5c09e748cd79d97b74cd70e5a5c84b760a475ce5767b357fb2a57cb821cee412d646aa8a156ed39b78aab88974eddaa9e5ee926173c036c0713787 - languageName: node - linkType: hard - "get-stream@npm:^2.2.0": version: 2.3.1 resolution: "get-stream@npm:2.3.1" @@ -11678,9 +11694,9 @@ __metadata: languageName: node linkType: hard -"ioredis@npm:5.3.2, ioredis@npm:^5.0.0": - version: 5.3.2 - resolution: "ioredis@npm:5.3.2" +"ioredis@npm:5.4.1, ioredis@npm:^5.4.1": + version: 5.4.1 + resolution: "ioredis@npm:5.4.1" dependencies: "@ioredis/commands": "npm:^1.1.1" cluster-key-slot: "npm:^1.1.0" @@ -11691,7 +11707,7 @@ __metadata: redis-errors: "npm:^1.2.0" redis-parser: "npm:^3.0.0" standard-as-callback: "npm:^2.1.0" - checksum: 10/0140f055ef81d28e16ca8400b99dabb9ce82009f54afd83cba952c7d0c5d736841e43247765b8ee1af1f02843531c5b8df240af18bd3d7e2ca3d60b36e76213f + checksum: 10/9043b812ac58065e80c759d130602cc64490fcaeaacf93723453fda04c7ba61dab0e2f50380eacb045592378ededf44f270c0d43e13e3e8b8d7c5a8d7fecb823 languageName: node linkType: hard @@ -15004,7 +15020,7 @@ __metadata: languageName: node linkType: hard -"msgpackr@npm:1.9.5, msgpackr@npm:^1.5.2": +"msgpackr@npm:1.9.5": version: 1.9.5 resolution: "msgpackr@npm:1.9.5" dependencies: @@ -15016,6 +15032,18 @@ __metadata: languageName: node linkType: hard +"msgpackr@npm:^1.10.1": + version: 1.11.0 + resolution: "msgpackr@npm:1.11.0" + dependencies: + msgpackr-extract: "npm:^3.0.2" + dependenciesMeta: + msgpackr-extract: + optional: true + checksum: 10/e95edf511ab269b34e312a7bd058c203e1ef4dc0656df8ccf1a10e9cdb40fac4c4b62b42ea0b2d199f85a1a53704f7f47e28ed5af5311f66097c591eafbbf8f3 + languageName: node + linkType: hard + "multer@npm:1.4.4-lts.1": version: 1.4.4-lts.1 resolution: "multer@npm:1.4.4-lts.1" @@ -15164,6 +15192,13 @@ __metadata: languageName: node linkType: hard +"node-abort-controller@npm:^3.1.1": + version: 3.1.1 + resolution: "node-abort-controller@npm:3.1.1" + checksum: 10/0a2cdb7ec0aeaf3cb31e1ca0e192f5add48f1c5c9c9ed822129f9dddbd9432f69b7425982f94ce803c56a2104884530aa67cd57696e5774b2e5b8ec2f58de042 + languageName: node + linkType: hard + "node-addon-api@npm:^5.0.0": version: 5.1.0 resolution: "node-addon-api@npm:5.1.0" @@ -20181,6 +20216,13 @@ __metadata: languageName: node linkType: hard +"tslib@npm:^2.0.0": + version: 2.7.0 + resolution: "tslib@npm:2.7.0" + checksum: 10/9a5b47ddac65874fa011c20ff76db69f97cf90c78cff5934799ab8894a5342db2d17b4e7613a087046bc1d133d21547ddff87ac558abeec31ffa929c88b7fce6 + languageName: node + linkType: hard + "tslib@npm:^2.1.0, tslib@npm:^2.3.0, tslib@npm:^2.5.0, tslib@npm:^2.5.2, tslib@npm:^2.6.0": version: 2.6.1 resolution: "tslib@npm:2.6.1" @@ -20816,7 +20858,7 @@ __metadata: languageName: node linkType: hard -"uuid@npm:^8.3.0, uuid@npm:^8.3.2": +"uuid@npm:^8.3.2": version: 8.3.2 resolution: "uuid@npm:8.3.2" bin: