[backend] Switch job queue to BullMQ

Signed-off-by: mia <mia@mia.jetzt>
This commit is contained in:
mia 2024-10-04 19:39:57 -07:00 committed by Laura Hausmann
parent 7074a2efaf
commit 23533c1aaa
No known key found for this signature in database
GPG Key ID: D044E84C5BE01605
68 changed files with 1163 additions and 1501 deletions

1
.gitignore vendored
View File

@ -34,7 +34,6 @@ coverage
# misskey
built
db
elasticsearch
redis
meili_data

156
.pnp.cjs generated
View File

@ -1029,6 +1029,27 @@ const RAW_RUNTIME_STATE =
],\
"linkType": "SOFT"\
}],\
["npm:6.0.0", {\
"packageLocation": "./.yarn/cache/@bull-board-api-npm-6.0.0-78af39dbec-922e46f2c5.zip/node_modules/@bull-board/api/",\
"packageDependencies": [\
["@bull-board/api", "npm:6.0.0"]\
],\
"linkType": "SOFT"\
}],\
["virtual:1a204444c80de4bcde4565c72a3b0d6571f4eb3f918d7d48d8cd7ddf8392bb94fcf59c7a20ab1f49717ca9715358d5e7cac35936b84d89d5579286a3dc50f306#npm:6.0.0", {\
"packageLocation": "./.yarn/__virtual__/@bull-board-api-virtual-0339be2ffb/0/cache/@bull-board-api-npm-6.0.0-78af39dbec-922e46f2c5.zip/node_modules/@bull-board/api/",\
"packageDependencies": [\
["@bull-board/api", "virtual:1a204444c80de4bcde4565c72a3b0d6571f4eb3f918d7d48d8cd7ddf8392bb94fcf59c7a20ab1f49717ca9715358d5e7cac35936b84d89d5579286a3dc50f306#npm:6.0.0"],\
["@bull-board/ui", "npm:6.0.0"],\
["@types/bull-board__ui", null],\
["redis-info", "npm:3.1.0"]\
],\
"packagePeers": [\
"@bull-board/ui",\
"@types/bull-board__ui"\
],\
"linkType": "HARD"\
}],\
["virtual:6d3c013820dba430e71ebb352cb5205445a13ea3c7a848f57a7ff58fb0d6469fe4d374280277dac42cb77a6dbf8e924e64f2f0b3413c28a02da9d890c199e6d7#npm:5.6.0", {\
"packageLocation": "./.yarn/__virtual__/@bull-board-api-virtual-7db979021f/0/cache/@bull-board-api-npm-5.6.0-a1466ed4c8-951b27f057.zip/node_modules/@bull-board/api/",\
"packageDependencies": [\
@ -1045,18 +1066,18 @@ const RAW_RUNTIME_STATE =
}]\
]],\
["@bull-board/koa", [\
["npm:5.6.0", {\
"packageLocation": "./.yarn/cache/@bull-board-koa-npm-5.6.0-9bfe284953-54aef8b937.zip/node_modules/@bull-board/koa/",\
["npm:6.0.0", {\
"packageLocation": "./.yarn/cache/@bull-board-koa-npm-6.0.0-47e7552797-0d1bac34e8.zip/node_modules/@bull-board/koa/",\
"packageDependencies": [\
["@bull-board/koa", "npm:5.6.0"],\
["@bull-board/api", "virtual:6d3c013820dba430e71ebb352cb5205445a13ea3c7a848f57a7ff58fb0d6469fe4d374280277dac42cb77a6dbf8e924e64f2f0b3413c28a02da9d890c199e6d7#npm:5.6.0"],\
["@bull-board/ui", "npm:5.6.0"],\
["ejs", "npm:3.1.9"],\
["@bull-board/koa", "npm:6.0.0"],\
["@bull-board/api", "virtual:1a204444c80de4bcde4565c72a3b0d6571f4eb3f918d7d48d8cd7ddf8392bb94fcf59c7a20ab1f49717ca9715358d5e7cac35936b84d89d5579286a3dc50f306#npm:6.0.0"],\
["@bull-board/ui", "npm:6.0.0"],\
["ejs", "npm:3.1.10"],\
["koa", "npm:2.14.2"],\
["koa-mount", "npm:4.0.0"],\
["koa-router", "npm:10.1.1"],\
["koa-static", "npm:5.0.0"],\
["koa-views", "virtual:9bfe284953271243c18a986cde1b0b605f8cc92af0dd289bd01addbff3c00d207acc42c3b36ff53fa242e7954e50922cd3937d7d58025dbb42ac550d5824bf67#npm:7.0.2"]\
["koa-views", "virtual:47e7552797ac54a9c2afddff4b4e04d9d332a7c4ae9063d663783ce2e037b701b60b632afdcf95d0a9161f0abbca22d39846d9260e4899a8593e974d45bbf1e0#npm:7.0.2"]\
],\
"linkType": "HARD"\
}]\
@ -1069,6 +1090,14 @@ const RAW_RUNTIME_STATE =
["@bull-board/api", "virtual:6d3c013820dba430e71ebb352cb5205445a13ea3c7a848f57a7ff58fb0d6469fe4d374280277dac42cb77a6dbf8e924e64f2f0b3413c28a02da9d890c199e6d7#npm:5.6.0"]\
],\
"linkType": "HARD"\
}],\
["npm:6.0.0", {\
"packageLocation": "./.yarn/cache/@bull-board-ui-npm-6.0.0-1a204444c8-6d083063a3.zip/node_modules/@bull-board/ui/",\
"packageDependencies": [\
["@bull-board/ui", "npm:6.0.0"],\
["@bull-board/api", "virtual:1a204444c80de4bcde4565c72a3b0d6571f4eb3f918d7d48d8cd7ddf8392bb94fcf59c7a20ab1f49717ca9715358d5e7cac35936b84d89d5579286a3dc50f306#npm:6.0.0"]\
],\
"linkType": "HARD"\
}]\
]],\
["@colors/colors", [\
@ -7136,9 +7165,9 @@ const RAW_RUNTIME_STATE =
"packageLocation": "./packages/backend/",\
"packageDependencies": [\
["backend", "workspace:packages/backend"],\
["@bull-board/api", "virtual:6d3c013820dba430e71ebb352cb5205445a13ea3c7a848f57a7ff58fb0d6469fe4d374280277dac42cb77a6dbf8e924e64f2f0b3413c28a02da9d890c199e6d7#npm:5.6.0"],\
["@bull-board/koa", "npm:5.6.0"],\
["@bull-board/ui", "npm:5.6.0"],\
["@bull-board/api", "virtual:1a204444c80de4bcde4565c72a3b0d6571f4eb3f918d7d48d8cd7ddf8392bb94fcf59c7a20ab1f49717ca9715358d5e7cac35936b84d89d5579286a3dc50f306#npm:6.0.0"],\
["@bull-board/koa", "npm:6.0.0"],\
["@bull-board/ui", "npm:6.0.0"],\
["@discordapp/twemoji", "npm:14.1.2"],\
["@iceshrimp/summaly", "npm:2.7.2::__archiveUrl=https%3A%2F%2Ficeshrimp.dev%2Fapi%2Fpackages%2Ficeshrimp%2Fnpm%2F%2540iceshrimp%252Fsummaly%2F-%2F2.7.2%2Fsummaly-2.7.2.tgz"],\
["@koa/cors", "npm:3.4.3"],\
@ -7209,7 +7238,7 @@ const RAW_RUNTIME_STATE =
["axios", "npm:1.4.0"],\
["bcryptjs", "npm:2.4.3"],\
["blurhash", "npm:2.0.5"],\
["bull", "npm:4.10.4"],\
["bullmq", "npm:5.16.0"],\
["cacheable-lookup", "npm:7.0.0"],\
["cbor", "npm:8.1.0"],\
["chalk", "npm:5.3.0"],\
@ -7234,7 +7263,7 @@ const RAW_RUNTIME_STATE =
["happy-dom", "npm:12.10.3"],\
["hpagent", "npm:0.1.2"],\
["iceshrimp-sdk", "workspace:packages/iceshrimp-sdk"],\
["ioredis", "npm:5.3.2"],\
["ioredis", "npm:5.4.1"],\
["ip-cidr", "npm:3.1.0"],\
["is-svg", "npm:4.3.2"],\
["js-yaml", "npm:4.1.0"],\
@ -7803,19 +7832,18 @@ const RAW_RUNTIME_STATE =
"linkType": "HARD"\
}]\
]],\
["bull", [\
["npm:4.10.4", {\
"packageLocation": "./.yarn/cache/bull-npm-4.10.4-3465f09e40-8a99097317.zip/node_modules/bull/",\
["bullmq", [\
["npm:5.16.0", {\
"packageLocation": "./.yarn/cache/bullmq-npm-5.16.0-fe493a4098-39febf6e4a.zip/node_modules/bullmq/",\
"packageDependencies": [\
["bull", "npm:4.10.4"],\
["cron-parser", "npm:4.8.1"],\
["debuglog", "npm:1.0.1"],\
["get-port", "npm:5.1.1"],\
["ioredis", "npm:5.3.2"],\
["lodash", "npm:4.17.21"],\
["msgpackr", "npm:1.9.5"],\
["bullmq", "npm:5.16.0"],\
["cron-parser", "npm:4.9.0"],\
["ioredis", "npm:5.4.1"],\
["msgpackr", "npm:1.11.0"],\
["node-abort-controller", "npm:3.1.1"],\
["semver", "npm:7.5.4"],\
["uuid", "npm:8.3.2"]\
["tslib", "npm:2.7.0"],\
["uuid", "npm:9.0.0"]\
],\
"linkType": "HARD"\
}]\
@ -9093,10 +9121,10 @@ const RAW_RUNTIME_STATE =
],\
"linkType": "SOFT"\
}],\
["virtual:92dc05b84fde30e8037028575c990319f08cb08b0698e442e4c8d1ac4aceb7d666cb7b6454e308d767b637ef0226cb4e16e0c82a358c214878c7185041270440#npm:0.16.0", {\
"packageLocation": "./.yarn/__virtual__/consolidate-virtual-185da77412/0/cache/consolidate-npm-0.16.0-1a9b3c81f9-74b9bc2f1c.zip/node_modules/consolidate/",\
["virtual:86ee2a8ef0b659363be6b28aece63a5c4d251db21e8e56fe2ea2e03233d27c1fc9c744ff2a904e3dbda7f6a5fedaa3054e931f4e747f5b9deb58af95c389db98#npm:0.16.0", {\
"packageLocation": "./.yarn/__virtual__/consolidate-virtual-be4eb30fd6/0/cache/consolidate-npm-0.16.0-1a9b3c81f9-74b9bc2f1c.zip/node_modules/consolidate/",\
"packageDependencies": [\
["consolidate", "virtual:92dc05b84fde30e8037028575c990319f08cb08b0698e442e4c8d1ac4aceb7d666cb7b6454e308d767b637ef0226cb4e16e0c82a358c214878c7185041270440#npm:0.16.0"],\
["consolidate", "virtual:86ee2a8ef0b659363be6b28aece63a5c4d251db21e8e56fe2ea2e03233d27c1fc9c744ff2a904e3dbda7f6a5fedaa3054e931f4e747f5b9deb58af95c389db98#npm:0.16.0"],\
["@types/arc-templates", null],\
["@types/atpl", null],\
["@types/babel-core", null],\
@ -9448,10 +9476,10 @@ const RAW_RUNTIME_STATE =
}]\
]],\
["cron-parser", [\
["npm:4.8.1", {\
"packageLocation": "./.yarn/cache/cron-parser-npm-4.8.1-53e673fffa-5deb3f8216.zip/node_modules/cron-parser/",\
["npm:4.9.0", {\
"packageLocation": "./.yarn/cache/cron-parser-npm-4.9.0-2a573f98e9-ffca5e532a.zip/node_modules/cron-parser/",\
"packageDependencies": [\
["cron-parser", "npm:4.8.1"],\
["cron-parser", "npm:4.9.0"],\
["luxon", "npm:3.3.0"]\
],\
"linkType": "HARD"\
@ -9896,15 +9924,6 @@ const RAW_RUNTIME_STATE =
"linkType": "HARD"\
}]\
]],\
["debuglog", [\
["npm:1.0.1", {\
"packageLocation": "./.yarn/cache/debuglog-npm-1.0.1-c553c84ea5-942a319695.zip/node_modules/debuglog/",\
"packageDependencies": [\
["debuglog", "npm:1.0.1"]\
],\
"linkType": "HARD"\
}]\
]],\
["decamelize", [\
["npm:1.2.0", {\
"packageLocation": "./.yarn/cache/decamelize-npm-1.2.0-c5a2fdc622-ad8c51a7e7.zip/node_modules/decamelize/",\
@ -10593,6 +10612,14 @@ const RAW_RUNTIME_STATE =
}]\
]],\
["ejs", [\
["npm:3.1.10", {\
"packageLocation": "./.yarn/cache/ejs-npm-3.1.10-4e8cf4bdc1-a9cb7d7cd1.zip/node_modules/ejs/",\
"packageDependencies": [\
["ejs", "npm:3.1.10"],\
["jake", "npm:10.8.7"]\
],\
"linkType": "HARD"\
}],\
["npm:3.1.9", {\
"packageLocation": "./.yarn/cache/ejs-npm-3.1.9-e201b2088c-71f56d3754.zip/node_modules/ejs/",\
"packageDependencies": [\
@ -13093,15 +13120,6 @@ const RAW_RUNTIME_STATE =
"linkType": "HARD"\
}]\
]],\
["get-port", [\
["npm:5.1.1", {\
"packageLocation": "./.yarn/cache/get-port-npm-5.1.1-2f6074007a-0162663ffe.zip/node_modules/get-port/",\
"packageDependencies": [\
["get-port", "npm:5.1.1"]\
],\
"linkType": "HARD"\
}]\
]],\
["get-stream", [\
["npm:2.3.1", {\
"packageLocation": "./.yarn/cache/get-stream-npm-2.3.1-1755f3cab9-712738e6a3.zip/node_modules/get-stream/",\
@ -14423,10 +14441,10 @@ const RAW_RUNTIME_STATE =
}]\
]],\
["ioredis", [\
["npm:5.3.2", {\
"packageLocation": "./.yarn/cache/ioredis-npm-5.3.2-58471071b1-0140f055ef.zip/node_modules/ioredis/",\
["npm:5.4.1", {\
"packageLocation": "./.yarn/cache/ioredis-npm-5.4.1-c96e18ae67-9043b812ac.zip/node_modules/ioredis/",\
"packageDependencies": [\
["ioredis", "npm:5.3.2"],\
["ioredis", "npm:5.4.1"],\
["@ioredis/commands", "npm:1.2.0"],\
["cluster-key-slot", "npm:1.1.2"],\
["debug", "virtual:ac3d8e680759ce54399273724d44e041d6c9b73454d191d411a8c44bb27e22f02aaf6ed9d3ad0ac1c298eac4833cff369c9c7b84c573016112c4f84be2cd8543#npm:4.3.4"],\
@ -16822,12 +16840,12 @@ const RAW_RUNTIME_STATE =
],\
"linkType": "SOFT"\
}],\
["virtual:9bfe284953271243c18a986cde1b0b605f8cc92af0dd289bd01addbff3c00d207acc42c3b36ff53fa242e7954e50922cd3937d7d58025dbb42ac550d5824bf67#npm:7.0.2", {\
"packageLocation": "./.yarn/__virtual__/koa-views-virtual-92dc05b84f/0/cache/koa-views-npm-7.0.2-f4a5c0091b-edff754c9f.zip/node_modules/koa-views/",\
["virtual:47e7552797ac54a9c2afddff4b4e04d9d332a7c4ae9063d663783ce2e037b701b60b632afdcf95d0a9161f0abbca22d39846d9260e4899a8593e974d45bbf1e0#npm:7.0.2", {\
"packageLocation": "./.yarn/__virtual__/koa-views-virtual-86ee2a8ef0/0/cache/koa-views-npm-7.0.2-f4a5c0091b-edff754c9f.zip/node_modules/koa-views/",\
"packageDependencies": [\
["koa-views", "virtual:9bfe284953271243c18a986cde1b0b605f8cc92af0dd289bd01addbff3c00d207acc42c3b36ff53fa242e7954e50922cd3937d7d58025dbb42ac550d5824bf67#npm:7.0.2"],\
["koa-views", "virtual:47e7552797ac54a9c2afddff4b4e04d9d332a7c4ae9063d663783ce2e037b701b60b632afdcf95d0a9161f0abbca22d39846d9260e4899a8593e974d45bbf1e0#npm:7.0.2"],\
["@types/koa", null],\
["consolidate", "virtual:92dc05b84fde30e8037028575c990319f08cb08b0698e442e4c8d1ac4aceb7d666cb7b6454e308d767b637ef0226cb4e16e0c82a358c214878c7185041270440#npm:0.16.0"],\
["consolidate", "virtual:86ee2a8ef0b659363be6b28aece63a5c4d251db21e8e56fe2ea2e03233d27c1fc9c744ff2a904e3dbda7f6a5fedaa3054e931f4e747f5b9deb58af95c389db98#npm:0.16.0"],\
["debug", "virtual:ac3d8e680759ce54399273724d44e041d6c9b73454d191d411a8c44bb27e22f02aaf6ed9d3ad0ac1c298eac4833cff369c9c7b84c573016112c4f84be2cd8543#npm:4.3.4"],\
["get-paths", "npm:0.0.7"],\
["koa-send", "npm:5.0.1"],\
@ -16845,7 +16863,7 @@ const RAW_RUNTIME_STATE =
"packageDependencies": [\
["koa-views", "virtual:aa59773ac87791c4813d53447077fcf8a847d6de5a301d34dc31286584b1dbb26d30d3adb5b4c41c1e8aea04371e926fda05c09c6253647c432e11d872a304ba#npm:7.0.2"],\
["@types/koa", "npm:2.13.6"],\
["consolidate", "virtual:92dc05b84fde30e8037028575c990319f08cb08b0698e442e4c8d1ac4aceb7d666cb7b6454e308d767b637ef0226cb4e16e0c82a358c214878c7185041270440#npm:0.16.0"],\
["consolidate", "virtual:86ee2a8ef0b659363be6b28aece63a5c4d251db21e8e56fe2ea2e03233d27c1fc9c744ff2a904e3dbda7f6a5fedaa3054e931f4e747f5b9deb58af95c389db98#npm:0.16.0"],\
["debug", "virtual:ac3d8e680759ce54399273724d44e041d6c9b73454d191d411a8c44bb27e22f02aaf6ed9d3ad0ac1c298eac4833cff369c9c7b84c573016112c4f84be2cd8543#npm:4.3.4"],\
["get-paths", "npm:0.0.7"],\
["koa-send", "npm:5.0.1"],\
@ -18107,6 +18125,14 @@ const RAW_RUNTIME_STATE =
}]\
]],\
["msgpackr", [\
["npm:1.11.0", {\
"packageLocation": "./.yarn/cache/msgpackr-npm-1.11.0-c075b2537e-e95edf511a.zip/node_modules/msgpackr/",\
"packageDependencies": [\
["msgpackr", "npm:1.11.0"],\
["msgpackr-extract", "npm:3.0.2"]\
],\
"linkType": "HARD"\
}],\
["npm:1.9.5", {\
"packageLocation": "./.yarn/cache/msgpackr-npm-1.9.5-69f0e8f5b8-d95fbee39b.zip/node_modules/msgpackr/",\
"packageDependencies": [\
@ -18300,6 +18326,15 @@ const RAW_RUNTIME_STATE =
"linkType": "HARD"\
}]\
]],\
["node-abort-controller", [\
["npm:3.1.1", {\
"packageLocation": "./.yarn/cache/node-abort-controller-npm-3.1.1-e246ed42cd-0a2cdb7ec0.zip/node_modules/node-abort-controller/",\
"packageDependencies": [\
["node-abort-controller", "npm:3.1.1"]\
],\
"linkType": "HARD"\
}]\
]],\
["node-addon-api", [\
["npm:5.1.0", {\
"packageLocation": "./.yarn/unplugged/node-addon-api-npm-5.1.0-b50d00f739/node_modules/node-addon-api/",\
@ -21142,7 +21177,7 @@ const RAW_RUNTIME_STATE =
["redis-semaphore", "virtual:aa59773ac87791c4813d53447077fcf8a847d6de5a301d34dc31286584b1dbb26d30d3adb5b4c41c1e8aea04371e926fda05c09c6253647c432e11d872a304ba#npm:5.3.1"],\
["@types/ioredis", null],\
["debug", "virtual:ac3d8e680759ce54399273724d44e041d6c9b73454d191d411a8c44bb27e22f02aaf6ed9d3ad0ac1c298eac4833cff369c9c7b84c573016112c4f84be2cd8543#npm:4.3.4"],\
["ioredis", "npm:5.3.2"]\
["ioredis", "npm:5.4.1"]\
],\
"packagePeers": [\
"@types/ioredis",\
@ -23900,6 +23935,13 @@ const RAW_RUNTIME_STATE =
["tslib", "npm:2.6.2"]\
],\
"linkType": "HARD"\
}],\
["npm:2.7.0", {\
"packageLocation": "./.yarn/cache/tslib-npm-2.7.0-21668f5c21-9a5b47ddac.zip/node_modules/tslib/",\
"packageDependencies": [\
["tslib", "npm:2.7.0"]\
],\
"linkType": "HARD"\
}]\
]],\
["tsscmp", [\
@ -24171,7 +24213,7 @@ const RAW_RUNTIME_STATE =
["dotenv", "npm:16.3.1"],\
["glob", "npm:8.1.0"],\
["hdb-pool", null],\
["ioredis", "npm:5.3.2"],\
["ioredis", "npm:5.4.1"],\
["mkdirp", "npm:2.1.6"],\
["mongodb", null],\
["mssql", null],\

BIN
.yarn/cache/@bull-board-api-npm-6.0.0-78af39dbec-922e46f2c5.zip (Stored with Git LFS) vendored Normal file

Binary file not shown.

Binary file not shown.

BIN
.yarn/cache/@bull-board-koa-npm-6.0.0-47e7552797-0d1bac34e8.zip (Stored with Git LFS) vendored Normal file

Binary file not shown.

BIN
.yarn/cache/@bull-board-ui-npm-6.0.0-1a204444c8-6d083063a3.zip (Stored with Git LFS) vendored Normal file

Binary file not shown.

BIN
.yarn/cache/bull-npm-4.10.4-3465f09e40-8a99097317.zip (Stored with Git LFS) vendored

Binary file not shown.

BIN
.yarn/cache/bullmq-npm-5.16.0-fe493a4098-39febf6e4a.zip (Stored with Git LFS) vendored Normal file

Binary file not shown.

Binary file not shown.

BIN
.yarn/cache/cron-parser-npm-4.9.0-2a573f98e9-ffca5e532a.zip (Stored with Git LFS) vendored Normal file

Binary file not shown.

Binary file not shown.

BIN
.yarn/cache/ejs-npm-3.1.10-4e8cf4bdc1-a9cb7d7cd1.zip (Stored with Git LFS) vendored Normal file

Binary file not shown.

Binary file not shown.

BIN
.yarn/cache/ioredis-npm-5.3.2-58471071b1-0140f055ef.zip (Stored with Git LFS) vendored

Binary file not shown.

BIN
.yarn/cache/ioredis-npm-5.4.1-c96e18ae67-9043b812ac.zip (Stored with Git LFS) vendored Normal file

Binary file not shown.

BIN
.yarn/cache/msgpackr-npm-1.11.0-c075b2537e-e95edf511a.zip (Stored with Git LFS) vendored Normal file

Binary file not shown.

BIN
.yarn/cache/node-abort-controller-npm-3.1.1-e246ed42cd-0a2cdb7ec0.zip (Stored with Git LFS) vendored Normal file

Binary file not shown.

BIN
.yarn/cache/tslib-npm-2.7.0-21668f5c21-9a5b47ddac.zip (Stored with Git LFS) vendored Normal file

Binary file not shown.

View File

@ -23,9 +23,9 @@
"@types/formidable": "^2.0.5"
},
"dependencies": {
"@bull-board/api": "5.6.0",
"@bull-board/koa": "5.6.0",
"@bull-board/ui": "5.6.0",
"@bull-board/api": "6.0.0",
"@bull-board/koa": "6.0.0",
"@bull-board/ui": "6.0.0",
"@discordapp/twemoji": "14.1.2",
"@iceshrimp/summaly": "2.7.2",
"@koa/cors": "3.4.3",
@ -47,7 +47,7 @@
"axios": "^1.4.0",
"bcryptjs": "2.4.3",
"blurhash": "2.0.5",
"bull": "4.10.4",
"bullmq": "5.16.0",
"cacheable-lookup": "7.0.0",
"cbor": "8.1.0",
"chalk": "5.3.0",
@ -69,7 +69,7 @@
"happy-dom": "^12.10.3",
"hpagent": "0.1.2",
"iceshrimp-sdk": "workspace:*",
"ioredis": "5.3.2",
"ioredis": "5.4.1",
"ip-cidr": "3.1.0",
"is-svg": "4.3.2",
"js-yaml": "4.1.0",

View File

@ -1,18 +0,0 @@
import type Bull from "bull";
export function getJobInfo(job: Bull.Job, increment = false) {
const age = Date.now() - job.timestamp;
const formated =
age > 60000
? `${Math.floor(age / 1000 / 60)}m`
: age > 10000
? `${Math.floor(age / 1000)}s`
: `${age}ms`;
// onActiveとかonCompletedのattemptsMadeがなぜか0始まりなのでインクリメントする
const currentAttempts = job.attemptsMade + (increment ? 1 : 0);
const maxAttempts = job.opts ? job.opts.attempts : 0;
return `id=${job.id} attempts=${currentAttempts}/${maxAttempts} age=${formated}`;
}

View File

@ -1,617 +1,56 @@
import type httpSignature from "@peertube/http-signature";
import { v4 as uuid } from "uuid";
import { deliverQueue, inboxQueue } from "./queues.js";
import { dbInit } from "./queues/db/index.js";
import { deliverInit, deliverLogger } from "./queues/deliver.js";
import { endedPollNotificationInit } from "./queues/ended-poll-notification.js";
import { inboxInit, inboxLogger } from "./queues/inbox.js";
import { objectStorageInit } from "./queues/object-storage/index.js";
import { systemInit } from "./queues/system/index.js";
import { webhookDeliverInit } from "./queues/webhook-deliver.js";
import config from "@/config/index.js";
import type { DriveFile } from "@/models/entities/drive-file.js";
import type { IActivity } from "@/remote/activitypub/type.js";
import type { Webhook, webhookEventTypes } from "@/models/entities/webhook.js";
import { envOption } from "../env.js";
export {
createDeleteDriveFilesJob,
createExportCustomEmojisJob,
createExportNotesJob,
createExportFollowingJob,
createExportMuteJob,
createExportBlockingJob,
createExportUserListsJob,
createImportFollowingJob,
createImportMutingJob,
createImportBlockingJob,
createImportUserListsJob,
createImportCustomEmojisJob,
createDeleteAccountJob,
} from "./queues/db/index.js";
export {
createDeleteObjectStorageFileJob,
createCleanRemoteFilesJob,
} from "./queues/object-storage/index.js";
export { systemQueue } from "./queues/system/index.js";
export { deliverJob as deliver, deliverQueue } from "./queues/deliver.js";
export { endedPollNotificationQueue } from "./queues/ended-poll-notification.js";
export { inboxJob as inbox, inboxQueue } from "./queues/inbox.js";
export { webhookDeliverJob as webhookDeliver, webhookDeliverQueue } from "./queues/webhook-deliver.js";
import processDeliver from "./processors/deliver.js";
import processInbox from "./processors/inbox.js";
import processDb from "./processors/db/index.js";
import processObjectStorage from "./processors/object-storage/index.js";
import processSystemQueue from "./processors/system/index.js";
import processWebhookDeliver from "./processors/webhook-deliver.js";
import processBackground from "./processors/background/index.js";
import { endedPollNotification } from "./processors/ended-poll-notification.js";
import { queueLogger } from "./logger.js";
import { getJobInfo } from "./get-job-info.js";
import {
systemQueue,
dbQueue,
deliverQueue,
inboxQueue,
objectStorageQueue,
endedPollNotificationQueue,
webhookDeliverQueue,
backgroundQueue,
} from "./queues.js";
import type { ThinUser } from "./types.js";
function renderError(e: Error): any {
return {
stack: e.stack,
message: e.message,
name: e.name,
};
}
const systemLogger = queueLogger.createSubLogger("system");
const deliverLogger = queueLogger.createSubLogger("deliver");
const webhookLogger = queueLogger.createSubLogger("webhook");
const inboxLogger = queueLogger.createSubLogger("inbox");
const dbLogger = queueLogger.createSubLogger("db");
const objectStorageLogger = queueLogger.createSubLogger("objectStorage");
systemQueue
.on("waiting", (jobId) => systemLogger.debug(`waiting id=${jobId}`))
.on("active", (job) => systemLogger.debug(`active id=${job.id}`))
.on("completed", (job, result) =>
systemLogger.debug(`completed(${result}) id=${job.id}`),
)
.on("failed", (job, err) =>
systemLogger.warn(`failed(${err}) id=${job.id}`, {
job,
e: renderError(err),
}),
)
.on("error", (job: any, err: Error) =>
systemLogger.error(`error ${err}`, { job, e: renderError(err) }),
)
.on("stalled", (job) => systemLogger.warn(`stalled id=${job.id}`));
deliverQueue
.on("waiting", (jobId) => deliverLogger.debug(`waiting id=${jobId}`))
.on("active", (job) =>
deliverLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`),
)
.on("completed", (job, result) =>
deliverLogger.debug(
`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`,
),
)
.on("failed", (job, err) =>
deliverLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job.data.to}`),
)
.on("error", (job: any, err: Error) =>
deliverLogger.error(`error ${err}`, { job, e: renderError(err) }),
)
.on("stalled", (job) =>
deliverLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`),
);
inboxQueue
.on("waiting", (jobId) => inboxLogger.debug(`waiting id=${jobId}`))
.on("active", (job) => inboxLogger.debug(`active ${getJobInfo(job, true)}`))
.on("completed", (job, result) =>
inboxLogger.debug(`completed(${result}) ${getJobInfo(job, true)}`),
)
.on("failed", (job, err) =>
inboxLogger.warn(
`failed(${err}) ${getJobInfo(job)} activity=${
job.data.activity ? job.data.activity.id : "none"
}`,
{ job, e: renderError(err) },
),
)
.on("error", (job: any, err: Error) =>
inboxLogger.error(`error ${err}`, { job, e: renderError(err) }),
)
.on("stalled", (job) =>
inboxLogger.warn(
`stalled ${getJobInfo(job)} activity=${
job.data.activity ? job.data.activity.id : "none"
}`,
),
);
dbQueue
.on("waiting", (jobId) => dbLogger.debug(`waiting id=${jobId}`))
.on("active", (job) => dbLogger.debug(`active id=${job.id}`))
.on("completed", (job, result) =>
dbLogger.debug(`completed(${result}) id=${job.id}`),
)
.on("failed", (job, err) =>
dbLogger.warn(`failed(${err}) id=${job.id}`, { job, e: renderError(err) }),
)
.on("error", (job: any, err: Error) =>
dbLogger.error(`error ${err}`, { job, e: renderError(err) }),
)
.on("stalled", (job) => dbLogger.warn(`stalled id=${job.id}`));
objectStorageQueue
.on("waiting", (jobId) => objectStorageLogger.debug(`waiting id=${jobId}`))
.on("active", (job) => objectStorageLogger.debug(`active id=${job.id}`))
.on("completed", (job, result) =>
objectStorageLogger.debug(`completed(${result}) id=${job.id}`),
)
.on("failed", (job, err) =>
objectStorageLogger.warn(`failed(${err}) id=${job.id}`, {
job,
e: renderError(err),
}),
)
.on("error", (job: any, err: Error) =>
objectStorageLogger.error(`error ${err}`, { job, e: renderError(err) }),
)
.on("stalled", (job) => objectStorageLogger.warn(`stalled id=${job.id}`));
webhookDeliverQueue
.on("waiting", (jobId) => webhookLogger.debug(`waiting id=${jobId}`))
.on("active", (job) =>
webhookLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`),
)
.on("completed", (job, result) =>
webhookLogger.debug(
`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`,
),
)
.on("failed", (job, err) =>
webhookLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job.data.to}`),
)
.on("error", (job: any, err: Error) =>
webhookLogger.error(`error ${err}`, { job, e: renderError(err) }),
)
.on("stalled", (job) =>
webhookLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`),
);
export function deliver(user: ThinUser, content: unknown, to: string | null) {
if (content == null) return null;
if (to == null) return null;
const data = {
user: {
id: user.id,
},
content,
to,
};
return deliverQueue.add(data, {
attempts: config.deliverJobMaxAttempts || 12,
timeout: 1 * 60 * 1000, // 1min
backoff: {
type: "apBackoff",
},
removeOnComplete: true,
removeOnFail: true,
});
}
export function inbox(
activity: IActivity,
signature: httpSignature.IParsedSignature,
) {
const data = {
activity: activity,
signature,
};
return inboxQueue.add(data, {
attempts: config.inboxJobMaxAttempts || 8,
timeout: 5 * 60 * 1000, // 5min
backoff: {
type: "apBackoff",
},
removeOnComplete: true,
removeOnFail: true,
});
}
export function createDeleteDriveFilesJob(user: ThinUser) {
return dbQueue.add(
"deleteDriveFiles",
{
user: user,
},
{
removeOnComplete: true,
removeOnFail: true,
},
);
}
export function createExportCustomEmojisJob(user: ThinUser) {
return dbQueue.add(
"exportCustomEmojis",
{
user: user,
},
{
removeOnComplete: true,
removeOnFail: true,
},
);
}
export function createExportNotesJob(user: ThinUser) {
return dbQueue.add(
"exportNotes",
{
user: user,
},
{
removeOnComplete: true,
removeOnFail: true,
},
);
}
export function createExportFollowingJob(
user: ThinUser,
excludeMuting = false,
excludeInactive = false,
) {
return dbQueue.add(
"exportFollowing",
{
user: user,
excludeMuting,
excludeInactive,
},
{
removeOnComplete: true,
removeOnFail: true,
},
);
}
export function createExportMuteJob(user: ThinUser) {
return dbQueue.add(
"exportMute",
{
user: user,
},
{
removeOnComplete: true,
removeOnFail: true,
},
);
}
export function createExportBlockingJob(user: ThinUser) {
return dbQueue.add(
"exportBlocking",
{
user: user,
},
{
removeOnComplete: true,
removeOnFail: true,
},
);
}
export function createExportUserListsJob(user: ThinUser) {
return dbQueue.add(
"exportUserLists",
{
user: user,
},
{
removeOnComplete: true,
removeOnFail: true,
},
);
}
export function createImportFollowingJob(
user: ThinUser,
fileId: DriveFile["id"],
) {
return dbQueue.add(
"importFollowing",
{
user: user,
fileId: fileId,
},
{
removeOnComplete: true,
removeOnFail: true,
},
);
}
export function createImportPostsJob(
user: ThinUser,
fileId: DriveFile["id"],
signatureCheck: boolean,
) {
return dbQueue.add(
"importPosts",
{
user: user,
fileId: fileId,
signatureCheck: signatureCheck,
},
{
removeOnComplete: true,
removeOnFail: true,
},
);
}
export function createImportMastoPostJob(
user: ThinUser,
post: any,
signatureCheck: boolean,
) {
return dbQueue.add(
"importMastoPost",
{
user: user,
post: post,
signatureCheck: signatureCheck,
},
{
removeOnComplete: true,
removeOnFail: true,
attempts: config.inboxJobMaxAttempts || 8,
timeout: 60 * 1000, // 1min
},
);
}
export function createImportCkPostJob(
user: ThinUser,
post: any,
signatureCheck: boolean,
) {
return dbQueue.add(
"importCkPost",
{
user: user,
post: post,
signatureCheck: signatureCheck,
},
{
removeOnComplete: true,
removeOnFail: true,
},
);
}
export function createImportMutingJob(user: ThinUser, fileId: DriveFile["id"]) {
return dbQueue.add(
"importMuting",
{
user: user,
fileId: fileId,
},
{
removeOnComplete: true,
removeOnFail: true,
},
);
}
export function createImportBlockingJob(
user: ThinUser,
fileId: DriveFile["id"],
) {
return dbQueue.add(
"importBlocking",
{
user: user,
fileId: fileId,
},
{
removeOnComplete: true,
removeOnFail: true,
},
);
}
export function createImportUserListsJob(
user: ThinUser,
fileId: DriveFile["id"],
) {
return dbQueue.add(
"importUserLists",
{
user: user,
fileId: fileId,
},
{
removeOnComplete: true,
removeOnFail: true,
},
);
}
export function createImportCustomEmojisJob(
user: ThinUser,
fileId: DriveFile["id"],
) {
return dbQueue.add(
"importCustomEmojis",
{
user: user,
fileId: fileId,
},
{
removeOnComplete: true,
removeOnFail: true,
},
);
}
export function createDeleteAccountJob(
user: ThinUser,
opts: { soft?: boolean } = {},
) {
return dbQueue.add(
"deleteAccount",
{
user: user,
soft: opts.soft,
},
{
removeOnComplete: true,
removeOnFail: true,
},
);
}
export function createDeleteObjectStorageFileJob(key: string) {
return objectStorageQueue.add(
"deleteFile",
{
key: key,
},
{
removeOnComplete: true,
removeOnFail: true,
},
);
}
export function createCleanRemoteFilesJob() {
return objectStorageQueue.add(
"cleanRemoteFiles",
{},
{
removeOnComplete: true,
removeOnFail: true,
},
);
}
export function createIndexAllNotesJob(data = {}) {
return backgroundQueue.add("indexAllNotes", data, {
removeOnComplete: true,
removeOnFail: false,
timeout: 1000 * 60 * 60 * 24,
});
}
export function webhookDeliver(
webhook: Webhook,
type: typeof webhookEventTypes[number],
content: unknown,
) {
const data = {
type,
content,
webhookId: webhook.id,
userId: webhook.userId,
to: webhook.url,
secret: webhook.secret,
createdAt: Date.now(),
eventId: uuid(),
};
return webhookDeliverQueue.add(data, {
attempts: 4,
timeout: 1 * 60 * 1000, // 1min
backoff: {
type: "apBackoff",
},
removeOnComplete: true,
removeOnFail: true,
});
}
export default function () {
if (envOption.onlyServer) return;
deliverQueue.process(config.deliverJobConcurrency || 128, processDeliver);
inboxQueue.process(config.inboxJobConcurrency || 16, processInbox);
endedPollNotificationQueue.process(endedPollNotification);
webhookDeliverQueue.process(64, processWebhookDeliver);
processDb(dbQueue);
if (config.mediaCleanup?.cron) {
objectStorageQueue.add(
"cleanRemoteFiles",
{},
{
repeat: {
cron: "0 0 * * *",
},
removeOnComplete: true,
removeOnFail: true,
},
);
}
processObjectStorage(objectStorageQueue);
processBackground(backgroundQueue);
systemQueue.add(
"tickCharts",
{},
{
repeat: { cron: "55 * * * *" },
removeOnComplete: true,
},
);
systemQueue.add(
"resyncCharts",
{},
{
repeat: { cron: "0 0 * * *" },
removeOnComplete: true,
},
);
systemQueue.add(
"cleanCharts",
{},
{
repeat: { cron: "0 0 * * *" },
removeOnComplete: true,
},
);
systemQueue.add(
"clean",
{},
{
repeat: { cron: "0 0 * * *" },
removeOnComplete: true,
},
);
systemQueue.add(
"checkExpiredMutings",
{},
{
repeat: { cron: "*/5 * * * *" },
removeOnComplete: true,
},
);
systemQueue.add(
"setLocalEmojiSizes",
{},
{ removeOnComplete: true, removeOnFail: true },
);
systemQueue.add(
"verifyLinks",
{},
{
repeat: { cron: "0 0 * * 0" },
removeOnComplete: true,
removeOnFail: true,
},
);
processSystemQueue(systemQueue);
}
export default async function () {
// initialize queue workers
await dbInit();
await objectStorageInit();
await systemInit();
await deliverInit();
await endedPollNotificationInit();
await inboxInit();
await webhookDeliverInit();
};
export function destroy() {
deliverQueue.once("cleaned", (jobs, status) => {
deliverLogger.succ(`Cleaned ${jobs.length} ${status} jobs`);
});
deliverQueue.clean(0, "delayed");
deliverQueue.clean(0, Infinity, "delayed");
inboxQueue.once("cleaned", (jobs, status) => {
inboxLogger.succ(`Cleaned ${jobs.length} ${status} jobs`);
});
inboxQueue.clean(0, "delayed");
inboxQueue.clean(0, Infinity, "delayed");
}

View File

@ -1,41 +0,0 @@
import Bull from "bull";
import config from "@/config/index.js";
export function initialize<T>(name: string, limitPerSec = -1) {
return new Bull<T>(name, {
redis: {
port: config.redis.port,
host: config.redis.host,
family: config.redis.family == null ? 0 : config.redis.family,
username: config.redis.user ?? "default",
password: config.redis.pass,
db: config.redis.db || 0,
tls: config.redis.tls,
},
prefix: config.redis.prefix ? `${config.redis.prefix}:queue` : "queue",
limiter:
limitPerSec > 0
? {
max: limitPerSec,
duration: 1000,
}
: undefined,
settings: {
stalledInterval: 60,
maxStalledCount: 2,
backoffStrategies: {
apBackoff,
},
},
});
}
// ref. https://github.com/misskey-dev/misskey/pull/7635#issue-971097019
function apBackoff(attemptsMade: number, err: Error) {
const baseDelay = 60 * 1000; // 1min
const maxBackoff = 8 * 60 * 60 * 1000; // 8hours
let backoff = (Math.pow(2, attemptsMade) - 1) * baseDelay;
backoff = Math.min(backoff, maxBackoff);
backoff += Math.round(backoff * Math.random() * 0.2);
return backoff;
}

View File

@ -1,3 +1,11 @@
import Logger from "@/services/logger.js";
export const queueLogger = new Logger("queue", "orange");
export function renderError(e: Error): any {
return {
stack: e.stack,
message: e.message,
name: e.name,
};
}

View File

@ -1,12 +0,0 @@
import type Bull from "bull";
import { noop } from "@/queue/processors/noop.js";
const jobs = {} as Record<string, Bull.ProcessCallbackFunction<Record<string, unknown>>>;
export default function (q: Bull.Queue) {
for (const [k, v] of Object.entries(jobs)) {
q.process(k, 16, v);
}
q.process(noop);
}

View File

@ -1,150 +0,0 @@
import type Bull from "bull";
import * as fs from "node:fs";
import AdmZip from "adm-zip";
import { queueLogger } from "../../logger.js";
import { createTempDir } from "@/misc/create-temp.js";
import { downloadUrl } from "@/misc/download-url.js";
import { DriveFiles, Emojis } from "@/models/index.js";
import type { DbUserImportJobData } from "@/queue/types.js";
import { addFile } from "@/services/drive/add-file.js";
import { genId } from "@/misc/gen-id.js";
import { db } from "@/db/postgre.js";
import probeImageSize from "probe-image-size";
import * as path from "path";
const logger = queueLogger.createSubLogger("import-custom-emojis");
// TODO: 名前衝突時の動作を選べるようにする
export async function importCustomEmojis(
job: Bull.Job<DbUserImportJobData>,
done: any,
): Promise<void> {
logger.info("Importing custom emojis ...");
const file = await DriveFiles.findOneBy({
id: job.data.fileId,
});
if (file == null) {
done();
return;
}
const [tempPath, cleanup] = await createTempDir();
logger.info(`Temp dir is ${tempPath}`);
const destPath = `${tempPath}/emojis.zip`;
try {
fs.writeFileSync(destPath, "", "binary");
await downloadUrl(file.url, destPath);
} catch (e) {
// TODO: 何度か再試行
if (e instanceof Error || typeof e === "string") {
logger.error(e);
}
throw e;
}
const outputPath = `${tempPath}/emojis`;
const unzipStream = fs.createReadStream(destPath);
const zip = new AdmZip(destPath);
zip.extractAllToAsync(outputPath, true, false, async (error) => {
if (error) throw error;
if (fs.existsSync(`${outputPath}/meta.json`)) {
logger.info("starting emoji import with metadata");
const metaRaw = fs.readFileSync(`${outputPath}/meta.json`, "utf-8");
const meta = JSON.parse(metaRaw);
for (const record of meta.emojis) {
if (!record.downloaded) continue;
const emojiInfo = record.emoji;
const emojiPath = `${outputPath}/${record.fileName}`;
await Emojis.delete({
name: emojiInfo.name,
});
const driveFile = await addFile({
user: null,
path: emojiPath,
name: record.fileName,
force: true,
});
const file = fs.createReadStream(emojiPath);
const size = await probeImageSize(file);
file.destroy();
await Emojis.insert({
id: genId(),
updatedAt: new Date(),
name: emojiInfo.name,
category: emojiInfo.category,
host: null,
aliases: emojiInfo.aliases,
originalUrl: driveFile.url,
publicUrl: driveFile.webpublicUrl ?? driveFile.url,
type: driveFile.webpublicType ?? driveFile.type,
license: emojiInfo.license,
width: size.width || null,
height: size.height || null,
}).then((x) => Emojis.findOneByOrFail(x.identifiers[0]));
}
} else {
logger.info("starting emoji import without metadata");
// Since we lack metadata, we import into a randomized category name instead
let categoryName = genId();
let containedEmojis = fs.readdirSync(outputPath);
// Filter out accidental JSON files
containedEmojis = containedEmojis.filter(
(emoji) => !emoji.match(/\.(json)$/i),
);
for (const emojiFilename of containedEmojis) {
// strip extension and get filename to use as name
const name = path.basename(emojiFilename, path.extname(emojiFilename));
const emojiPath = `${outputPath}/${emojiFilename}`;
logger.info(`importing ${name}`);
await Emojis.delete({
name: name,
});
const driveFile = await addFile({
user: null,
path: emojiPath,
name: path.basename(emojiFilename),
force: true,
});
const file = fs.createReadStream(emojiPath);
const size = await probeImageSize(file);
file.destroy();
logger.info(`emoji size: ${size.width}x${size.height}`);
await Emojis.insert({
id: genId(),
updatedAt: new Date(),
name: name,
category: categoryName,
host: null,
aliases: [],
originalUrl: driveFile.url,
publicUrl: driveFile.webpublicUrl ?? driveFile.url,
type: driveFile.webpublicType ?? driveFile.type,
license: null,
width: size.width || null,
height: size.height || null,
}).then((x) => Emojis.findOneByOrFail(x.identifiers[0]));
}
}
await db.queryResultCache!.remove(["meta_emojis"]);
cleanup();
logger.succ("Imported");
done();
});
logger.succ(`Unzipping to ${outputPath}`);
}

View File

@ -1,15 +0,0 @@
import * as Post from "@/misc/post.js";
import create from "@/services/note/create.js";
import { Users } from "@/models/index.js";
import type { DbUserImportMastoPostJobData } from "@/queue/types.js";
import { queueLogger } from "../../logger.js";
import type Bull from "bull";
const logger = queueLogger.createSubLogger("import-firefish-post");
export async function importCkPost(
job: Bull.Job<DbUserImportMastoPostJobData>,
done: any,
): Promise<void> {
done();
}

View File

@ -1,19 +0,0 @@
import create from "@/services/note/create.js";
import { Users } from "@/models/index.js";
import type { DbUserImportMastoPostJobData } from "@/queue/types.js";
import { queueLogger } from "../../logger.js";
import type Bull from "bull";
import { htmlToMfm } from "@/remote/activitypub/misc/html-to-mfm.js";
import { resolveNote } from "@/remote/activitypub/models/note.js";
import { Note } from "@/models/entities/note.js";
import { uploadFromUrl } from "@/services/drive/upload-from-url.js";
import type { DriveFile } from "@/models/entities/drive-file.js";
const logger = queueLogger.createSubLogger("import-masto-post");
export async function importMastoPost(
job: Bull.Job<DbUserImportMastoPostJobData>,
done: any,
): Promise<void> {
done();
}

View File

@ -1,76 +0,0 @@
import { downloadTextFile } from "@/misc/download-text-file.js";
import { processMastoNotes } from "@/misc/process-masto-notes.js";
import { Users, DriveFiles } from "@/models/index.js";
import type { DbUserImportPostsJobData } from "@/queue/types.js";
import { queueLogger } from "../../logger.js";
import type Bull from "bull";
import {
createImportCkPostJob,
createImportMastoPostJob,
} from "@/queue/index.js";
const logger = queueLogger.createSubLogger("import-posts");
export async function importPosts(
job: Bull.Job<DbUserImportPostsJobData>,
done: any,
): Promise<void> {
logger.info(`Importing posts of ${job.data.user.id} ...`);
const user = await Users.findOneBy({ id: job.data.user.id });
if (user == null) {
done();
return;
}
const file = await DriveFiles.findOneBy({
id: job.data.fileId,
});
if (file == null) {
done();
return;
}
if (file.name.endsWith("tar.gz") || file.name.endsWith("zip")) {
try {
logger.info("Reading Mastodon archive");
const outbox = await processMastoNotes(
file.name,
file.url,
job.data.user.id,
);
for (const post of outbox.orderedItems) {
createImportMastoPostJob(job.data.user, post, job.data.signatureCheck);
}
} catch (e) {
// handle error
logger.warn(`Failed reading Mastodon archive: ${e}`);
}
logger.succ("Mastodon archive imported");
done();
return;
}
const json = await downloadTextFile(file.url);
try {
const parsed = JSON.parse(json);
if (parsed instanceof Array) {
logger.info("Parsing key style posts");
for (const post of JSON.parse(json)) {
createImportCkPostJob(job.data.user, post, job.data.signatureCheck);
}
} else if (parsed instanceof Object) {
logger.info("Parsing animal style posts");
for (const post of parsed.orderedItems) {
createImportMastoPostJob(job.data.user, post, job.data.signatureCheck);
}
}
} catch (e) {
// handle error
logger.warn(`Error reading: ${e}`);
}
logger.succ("Imported");
done();
}

View File

@ -1,50 +0,0 @@
import type Bull from "bull";
import type { DbJobData } from "@/queue/types.js";
import { deleteDriveFiles } from "./delete-drive-files.js";
import { exportCustomEmojis } from "./export-custom-emojis.js";
import { exportNotes } from "./export-notes.js";
import { exportFollowing } from "./export-following.js";
import { exportMute } from "./export-mute.js";
import { exportBlocking } from "./export-blocking.js";
import { exportUserLists } from "./export-user-lists.js";
import { importFollowing } from "./import-following.js";
import { importUserLists } from "./import-user-lists.js";
import { deleteAccount } from "./delete-account.js";
import { importMuting } from "./import-muting.js";
import { importPosts } from "./import-posts.js";
import { importMastoPost } from "./import-masto-post.js";
import { importCkPost } from "./import-firefish-post.js";
import { importBlocking } from "./import-blocking.js";
import { importCustomEmojis } from "./import-custom-emojis.js";
import { noop } from "@/queue/processors/noop.js";
const jobs = {
deleteDriveFiles,
exportCustomEmojis,
exportNotes,
exportFollowing,
exportMute,
exportBlocking,
exportUserLists,
importFollowing,
importMuting,
importBlocking,
importUserLists,
importPosts,
importMastoPost,
importCkPost,
importCustomEmojis,
deleteAccount,
} as Record<
string,
| Bull.ProcessCallbackFunction<DbJobData>
| Bull.ProcessPromiseFunction<DbJobData>
>;
export default function (dbQueue: Bull.Queue<DbJobData>) {
for (const [k, v] of Object.entries(jobs)) {
dbQueue.process(k, v);
}
dbQueue.process(noop);
}

View File

@ -1,7 +0,0 @@
import Bull from "bull";
// Processor to be registered for jobs with __default__ (unnamed) handlers in queues that only have named handlers
// Prevents sporadic bogus jobs from clogging up the queues
export async function noop(job: Bull.Job): Promise<void> {
job.opts.removeOnComplete = true;
}

View File

@ -1,22 +0,0 @@
import type Bull from "bull";
import type { ObjectStorageJobData } from "@/queue/types.js";
import deleteFile from "./delete-file.js";
import cleanRemoteFiles from "./clean-remote-files.js";
import { noop } from "@/queue/processors/noop.js";
const jobs = {
deleteFile,
cleanRemoteFiles,
} as Record<
string,
| Bull.ProcessCallbackFunction<ObjectStorageJobData>
| Bull.ProcessPromiseFunction<ObjectStorageJobData>
>;
export default function (q: Bull.Queue) {
for (const [k, v] of Object.entries(jobs)) {
q.process(k, 16, v);
}
q.process(noop);
}

View File

@ -1,31 +0,0 @@
import type Bull from "bull";
import { tickCharts } from "./tick-charts.js";
import { resyncCharts } from "./resync-charts.js";
import { cleanCharts } from "./clean-charts.js";
import { checkExpiredMutings } from "./check-expired-mutings.js";
import { clean } from "./clean.js";
import { setLocalEmojiSizes } from "./local-emoji-size.js";
import { verifyLinks } from "./verify-links.js";
import { noop } from "@/queue/processors/noop.js";
const jobs = {
tickCharts,
resyncCharts,
cleanCharts,
checkExpiredMutings,
clean,
setLocalEmojiSizes,
verifyLinks,
} as Record<
string,
| Bull.ProcessCallbackFunction<Record<string, unknown>>
| Bull.ProcessPromiseFunction<Record<string, unknown>>
>;
export default function (dbQueue: Bull.Queue<Record<string, unknown>>) {
for (const [k, v] of Object.entries(jobs)) {
dbQueue.process(k, v);
}
dbQueue.process(noop);
}

View File

@ -1,42 +0,0 @@
import type Bull from "bull";
import { IsNull } from "typeorm";
import { Emojis } from "@/models/index.js";
import { queueLogger } from "../../logger.js";
import { getEmojiSize } from "@/misc/emoji-meta.js";
const logger = queueLogger.createSubLogger("local-emoji-size");
export async function setLocalEmojiSizes(
_job: Bull.Job<Record<string, unknown>>,
done: any,
): Promise<void> {
logger.info("Setting sizes of local emojis...");
const emojis = await Emojis.findBy([
{ host: IsNull(), width: IsNull(), height: IsNull() },
]);
logger.info(`${emojis.length} emojis need to be fetched.`);
for (let i = 0; i < emojis.length; i++) {
try {
const size = await getEmojiSize(emojis[i].publicUrl);
await Emojis.update(emojis[i].id, {
width: size.width || null,
height: size.height || null,
});
} catch (e) {
logger.error(
`Unable to set emoji size (${i + 1}/${emojis.length}): ${e}`,
);
/* skip if any error happens */
} finally {
// wait for 1sec so that this would not overwhelm the object storage.
await new Promise((resolve) => setTimeout(resolve, 1000));
if (i % 10 === 9) logger.succ(`fetched ${i + 1}/${emojis.length} emojis`);
}
}
logger.succ("Done.");
done();
}

View File

@ -1,41 +1,25 @@
import config from "@/config/index.js";
import { initialize as initializeQueue } from "./initialize.js";
import type {
DeliverJobData,
InboxJobData,
DbJobData,
ObjectStorageJobData,
EndedPollNotificationJobData,
WebhookDeliverJobData,
} from "./types.js";
import { dbQueue } from "./queues/db/index.js";
import { deliverQueue } from "./queues/deliver.js";
import { endedPollNotificationQueue } from "./queues/ended-poll-notification.js";
import { inboxQueue } from "./queues/inbox.js";
import { objectStorageQueue } from "./queues/object-storage/index.js";
import { systemQueue } from "./queues/system/index.js";
import { webhookDeliverQueue } from "./queues/webhook-deliver.js";
export const systemQueue = initializeQueue<Record<string, unknown>>("system");
export const endedPollNotificationQueue =
initializeQueue<EndedPollNotificationJobData>("endedPollNotification");
export const deliverQueue = initializeQueue<DeliverJobData>(
"deliver",
config.deliverJobPerSec || 128,
);
export const inboxQueue = initializeQueue<InboxJobData>(
"inbox",
config.inboxJobPerSec || 16,
);
export const dbQueue = initializeQueue<DbJobData>("db", 256);
export const objectStorageQueue =
initializeQueue<ObjectStorageJobData>("objectStorage");
export const webhookDeliverQueue = initializeQueue<WebhookDeliverJobData>(
"webhookDeliver",
64,
);
export const backgroundQueue = initializeQueue<Record<string, unknown>>("bg");
export { dbQueue } from "./queues/db/index.js";
export { deliverQueue } from "./queues/deliver.js";
export { endedPollNotificationQueue } from "./queues/ended-poll-notification.js";
export { inboxQueue } from "./queues/inbox.js";
export { objectStorageQueue } from "./queues/object-storage/index.js";
export { systemQueue } from "./queues/system/index.js";
export { webhookDeliverQueue } from "./queues/webhook-deliver.js";
export const queues = [
systemQueue,
endedPollNotificationQueue,
deliverQueue,
inboxQueue,
dbQueue,
deliverQueue,
endedPollNotificationQueue,
inboxQueue,
objectStorageQueue,
systemQueue,
webhookDeliverQueue,
backgroundQueue,
];

View File

@ -1,4 +1,3 @@
import type Bull from "bull";
import { queueLogger } from "../../logger.js";
import { DriveFiles, Notes, UserProfiles, Users } from "@/models/index.js";
import type { DbUserDeleteJobData } from "@/queue/types.js";
@ -8,16 +7,17 @@ import { MoreThan } from "typeorm";
import { deleteFileSync } from "@/services/drive/delete-file.js";
import { sendEmail } from "@/services/send-email.js";
import { publishInternalEvent } from "@/services/stream.js";
import { Job } from "bullmq";
const logger = queueLogger.createSubLogger("delete-account");
export async function deleteAccount(
job: Bull.Job<DbUserDeleteJobData>,
): Promise<string | void> {
job: Job<DbUserDeleteJobData>,
): Promise<string> {
logger.info(`Deleting account of ${job.data.user.id} ...`);
const user = await Users.findOneBy({ id: job.data.user.id });
if (!user) return;
if (!user) return "skip: User not found";
const isLocal = Users.isLocalUser(user);
{

View File

@ -1,23 +1,20 @@
import type Bull from "bull";
import { queueLogger } from "../../logger.js";
import { deleteFileSync } from "@/services/drive/delete-file.js";
import { Users, DriveFiles } from "@/models/index.js";
import { MoreThan } from "typeorm";
import type { DbUserJobData } from "@/queue/types.js";
import { Job } from "bullmq";
const logger = queueLogger.createSubLogger("delete-drive-files");
export async function deleteDriveFiles(
job: Bull.Job<DbUserJobData>,
done: any,
): Promise<void> {
job: Job<DbUserJobData>,
): Promise<string> {
logger.info(`Deleting drive files of ${job.data.user.id} ...`);
const user = await Users.findOneBy({ id: job.data.user.id });
if (user == null) {
done();
return;
return "skip: User not found";
}
let deletedCount = 0;
@ -36,7 +33,7 @@ export async function deleteDriveFiles(
});
if (files.length === 0) {
job.progress(100);
job.updateProgress(100);
break;
}
@ -51,11 +48,11 @@ export async function deleteDriveFiles(
userId: user.id,
});
job.progress(deletedCount / total);
job.updateProgress(deletedCount / total);
}
logger.succ(
`All drive files (${deletedCount}) of ${user.id} has been deleted.`,
);
done();
return "Success";
}

View File

@ -1,4 +1,3 @@
import type Bull from "bull";
import * as fs from "node:fs";
import { queueLogger } from "../../logger.js";
@ -9,19 +8,18 @@ import { createTemp } from "@/misc/create-temp.js";
import { Users, Blockings } from "@/models/index.js";
import { MoreThan } from "typeorm";
import type { DbUserJobData } from "@/queue/types.js";
import { Job } from "bullmq";
const logger = queueLogger.createSubLogger("export-blocking");
export async function exportBlocking(
job: Bull.Job<DbUserJobData>,
done: any,
): Promise<void> {
job: Job<DbUserJobData>,
): Promise<string> {
logger.info(`Exporting blocking of ${job.data.user.id} ...`);
const user = await Users.findOneBy({ id: job.data.user.id });
if (user == null) {
done();
return;
return "skip: User not found";
}
// Create temp file
@ -48,7 +46,7 @@ export async function exportBlocking(
});
if (blockings.length === 0) {
job.progress(100);
job.updateProgress(100);
break;
}
@ -79,7 +77,7 @@ export async function exportBlocking(
blockerId: user.id,
});
job.progress(exportedCount / total);
job.updateProgress(exportedCount / total);
}
stream.end();
@ -101,5 +99,5 @@ export async function exportBlocking(
cleanup();
}
done();
return "Success";
}

View File

@ -1,4 +1,3 @@
import type Bull from "bull";
import * as fs from "node:fs";
import { ulid } from "ulid";
@ -8,24 +7,23 @@ import { queueLogger } from "../../logger.js";
import { addFile } from "@/services/drive/add-file.js";
import { format as dateFormat } from "date-fns";
import { Users, Emojis } from "@/models/index.js";
import {} from "@/queue/types.js";
import { DbUserJobData } from "../../types.js";
import { createTemp, createTempDir } from "@/misc/create-temp.js";
import { downloadUrl } from "@/misc/download-url.js";
import config from "@/config/index.js";
import { IsNull } from "typeorm";
import { Job } from "bullmq";
const logger = queueLogger.createSubLogger("export-custom-emojis");
export async function exportCustomEmojis(
job: Bull.Job,
done: () => void,
): Promise<void> {
job: Job<DbUserJobData>,
): Promise<string> {
logger.info("Exporting custom emojis ...");
const user = await Users.findOneBy({ id: job.data.user.id });
if (user == null) {
done();
return;
return "skip: User not found";
}
const [path, cleanup] = await createTempDir();
@ -102,31 +100,39 @@ export async function exportCustomEmojis(
metaStream.end();
// Create archive
const [archivePath, archiveCleanup] = await createTemp();
const archiveStream = fs.createWriteStream(archivePath);
const archive = archiver("zip", {
zlib: { level: 0 },
});
archiveStream.on("close", async () => {
logger.succ(`Exported to: ${archivePath}`);
await new Promise(async (resolve, reject) => {
try {
const [archivePath, archiveCleanup] = await createTemp();
const archiveStream = fs.createWriteStream(archivePath);
const archive = archiver("zip", {
zlib: { level: 0 },
});
archiveStream.on("close", async () => {
logger.succ(`Exported to: ${archivePath}`);
const fileName = `custom-emojis-${dateFormat(
new Date(),
"yyyy-MM-dd-HH-mm-ss",
)}.zip`;
const driveFile = await addFile({
user,
path: archivePath,
name: fileName,
force: true,
});
const fileName = `custom-emojis-${dateFormat(
new Date(),
"yyyy-MM-dd-HH-mm-ss",
)}.zip`;
const driveFile = await addFile({
user,
path: archivePath,
name: fileName,
force: true,
});
logger.succ(`Exported to: ${driveFile.id}`);
cleanup();
archiveCleanup();
done();
logger.succ(`Exported to: ${driveFile.id}`);
cleanup();
archiveCleanup();
resolve(undefined);
});
archive.pipe(archiveStream);
archive.directory(path, false);
archive.finalize();
} catch (e) {
reject(e);
}
});
archive.pipe(archiveStream);
archive.directory(path, false);
archive.finalize();
return "Success";
}

View File

@ -1,4 +1,3 @@
import type Bull from "bull";
import * as fs from "node:fs";
import { queueLogger } from "../../logger.js";
@ -10,19 +9,18 @@ import { Users, Followings, Mutings } from "@/models/index.js";
import { In, MoreThan, Not } from "typeorm";
import type { DbUserJobData } from "@/queue/types.js";
import type { Following } from "@/models/entities/following.js";
import { Job } from "bullmq";
const logger = queueLogger.createSubLogger("export-following");
export async function exportFollowing(
job: Bull.Job<DbUserJobData>,
done: () => void,
): Promise<void> {
job: Job<DbUserJobData>,
): Promise<string> {
logger.info(`Exporting following of ${job.data.user.id} ...`);
const user = await Users.findOneBy({ id: job.data.user.id });
if (user == null) {
done();
return;
return "skip: User not found";
}
// Create temp file
@ -109,5 +107,5 @@ export async function exportFollowing(
cleanup();
}
done();
return "Success";
}

View File

@ -1,4 +1,3 @@
import type Bull from "bull";
import * as fs from "node:fs";
import { queueLogger } from "../../logger.js";
@ -9,19 +8,18 @@ import { createTemp } from "@/misc/create-temp.js";
import { Users, Mutings } from "@/models/index.js";
import { IsNull, MoreThan } from "typeorm";
import type { DbUserJobData } from "@/queue/types.js";
import { Job } from "bullmq";
const logger = queueLogger.createSubLogger("export-mute");
export async function exportMute(
job: Bull.Job<DbUserJobData>,
done: any,
): Promise<void> {
job: Job<DbUserJobData>,
): Promise<string> {
logger.info(`Exporting mute of ${job.data.user.id} ...`);
const user = await Users.findOneBy({ id: job.data.user.id });
if (user == null) {
done();
return;
return "skip: User not found";
}
// Create temp file
@ -49,7 +47,7 @@ export async function exportMute(
});
if (mutes.length === 0) {
job.progress(100);
job.updateProgress(100);
break;
}
@ -80,7 +78,7 @@ export async function exportMute(
muterId: user.id,
});
job.progress(exportedCount / total);
job.updateProgress(exportedCount / total);
}
stream.end();
@ -102,5 +100,5 @@ export async function exportMute(
cleanup();
}
done();
return "Success";
}

View File

@ -1,4 +1,3 @@
import type Bull from "bull";
import * as fs from "node:fs";
import { queueLogger } from "../../logger.js";
@ -10,19 +9,18 @@ import type { Note } from "@/models/entities/note.js";
import type { Poll } from "@/models/entities/poll.js";
import type { DbUserJobData } from "@/queue/types.js";
import { createTemp } from "@/misc/create-temp.js";
import { Job } from "bullmq";
const logger = queueLogger.createSubLogger("export-notes");
export async function exportNotes(
job: Bull.Job<DbUserJobData>,
done: any,
): Promise<void> {
job: Job<DbUserJobData>,
): Promise<string> {
logger.info(`Exporting notes of ${job.data.user.id} ...`);
const user = await Users.findOneBy({ id: job.data.user.id });
if (user == null) {
done();
return;
return "skip: User not found";
}
// Create temp file
@ -64,7 +62,7 @@ export async function exportNotes(
})) as Note[];
if (notes.length === 0) {
job.progress(100);
job.updateProgress(100);
break;
}
@ -85,7 +83,7 @@ export async function exportNotes(
userId: user.id,
});
job.progress(exportedNotesCount / total);
job.updateProgress(exportedNotesCount / total);
}
await write("]");
@ -109,7 +107,7 @@ export async function exportNotes(
cleanup();
}
done();
return "Success";
}
async function serialize(

View File

@ -1,4 +1,3 @@
import type Bull from "bull";
import * as fs from "node:fs";
import { queueLogger } from "../../logger.js";
@ -9,19 +8,18 @@ import { createTemp } from "@/misc/create-temp.js";
import { Users, UserLists, UserListJoinings } from "@/models/index.js";
import { In } from "typeorm";
import type { DbUserJobData } from "@/queue/types.js";
import { Job } from "bullmq";
const logger = queueLogger.createSubLogger("export-user-lists");
export async function exportUserLists(
job: Bull.Job<DbUserJobData>,
done: any,
): Promise<void> {
job: Job<DbUserJobData>,
): Promise<string> {
logger.info(`Exporting user lists of ${job.data.user.id} ...`);
const user = await Users.findOneBy({ id: job.data.user.id });
if (user == null) {
done();
return;
return "skip: User not found";
}
const lists = await UserLists.findBy({
@ -77,5 +75,5 @@ export async function exportUserLists(
cleanup();
}
done();
return "Success";
}

View File

@ -1,5 +1,3 @@
import type Bull from "bull";
import { queueLogger } from "../../logger.js";
import * as Acct from "@/misc/acct.js";
import { resolveUser } from "@/remote/resolve-user.js";
@ -9,27 +7,25 @@ import { Users, DriveFiles, Blockings } from "@/models/index.js";
import type { DbUserImportJobData } from "@/queue/types.js";
import block from "@/services/blocking/create.js";
import { IsNull } from "typeorm";
import { Job } from "bullmq";
const logger = queueLogger.createSubLogger("import-blocking");
export async function importBlocking(
job: Bull.Job<DbUserImportJobData>,
done: any,
): Promise<void> {
job: Job<DbUserImportJobData>,
): Promise<string> {
logger.info(`Importing blocking of ${job.data.user.id} ...`);
const user = await Users.findOneBy({ id: job.data.user.id });
if (user == null) {
done();
return;
return "skip: User not found";
}
const file = await DriveFiles.findOneBy({
id: job.data.fileId,
});
if (file == null) {
done();
return;
return "skip: File not found";
}
const csv = await downloadTextFile(file.url);
@ -75,5 +71,5 @@ export async function importBlocking(
}
logger.succ("Imported");
done();
return "Success";
}

View File

@ -0,0 +1,153 @@
import * as fs from "node:fs";
import AdmZip from "adm-zip";
import { queueLogger } from "../../logger.js";
import { createTempDir } from "@/misc/create-temp.js";
import { downloadUrl } from "@/misc/download-url.js";
import { DriveFiles, Emojis } from "@/models/index.js";
import type { DbUserImportJobData } from "@/queue/types.js";
import { addFile } from "@/services/drive/add-file.js";
import { genId } from "@/misc/gen-id.js";
import { db } from "@/db/postgre.js";
import probeImageSize from "probe-image-size";
import * as path from "path";
import { Job } from "bullmq";
const logger = queueLogger.createSubLogger("import-custom-emojis");
// TODO: 名前衝突時の動作を選べるようにする
export async function importCustomEmojis(
job: Job<DbUserImportJobData>,
): Promise<string> {
logger.info("Importing custom emojis ...");
const file = await DriveFiles.findOneBy({
id: job.data.fileId,
});
if (file == null) {
return "skip: File not found";
}
const [tempPath, cleanup] = await createTempDir();
logger.info(`Temp dir is ${tempPath}`);
const destPath = `${tempPath}/emojis.zip`;
try {
fs.writeFileSync(destPath, "", "binary");
await downloadUrl(file.url, destPath);
} catch (e) {
// TODO: 何度か再試行
if (e instanceof Error || typeof e === "string") {
logger.error(e);
}
throw e;
}
const outputPath = `${tempPath}/emojis`;
const unzipStream = fs.createReadStream(destPath);
const zip = new AdmZip(destPath);
logger.succ(`Unzipping to ${outputPath}`);
await new Promise((resolve, reject) => {
zip.extractAllToAsync(outputPath, true, false, async (error) => {
if (error) reject(error);
if (fs.existsSync(`${outputPath}/meta.json`)) {
logger.info("starting emoji import with metadata");
const metaRaw = fs.readFileSync(`${outputPath}/meta.json`, "utf-8");
const meta = JSON.parse(metaRaw);
for (const record of meta.emojis) {
if (!record.downloaded) continue;
const emojiInfo = record.emoji;
const emojiPath = `${outputPath}/${record.fileName}`;
await Emojis.delete({
name: emojiInfo.name,
});
const driveFile = await addFile({
user: null,
path: emojiPath,
name: record.fileName,
force: true,
});
const file = fs.createReadStream(emojiPath);
const size = await probeImageSize(file);
file.destroy();
await Emojis.insert({
id: genId(),
updatedAt: new Date(),
name: emojiInfo.name,
category: emojiInfo.category,
host: null,
aliases: emojiInfo.aliases,
originalUrl: driveFile.url,
publicUrl: driveFile.webpublicUrl ?? driveFile.url,
type: driveFile.webpublicType ?? driveFile.type,
license: emojiInfo.license,
width: size.width || null,
height: size.height || null,
}).then((x) => Emojis.findOneByOrFail(x.identifiers[0]));
}
} else {
logger.info("starting emoji import without metadata");
// Since we lack metadata, we import into a randomized category name instead
let categoryName = genId();
let containedEmojis = fs.readdirSync(outputPath);
// Filter out accidental JSON files
containedEmojis = containedEmojis.filter(
(emoji) => !emoji.match(/\.(json)$/i),
);
for (const emojiFilename of containedEmojis) {
// strip extension and get filename to use as name
const name = path.basename(emojiFilename, path.extname(emojiFilename));
const emojiPath = `${outputPath}/${emojiFilename}`;
logger.info(`importing ${name}`);
await Emojis.delete({
name: name,
});
const driveFile = await addFile({
user: null,
path: emojiPath,
name: path.basename(emojiFilename),
force: true,
});
const file = fs.createReadStream(emojiPath);
const size = await probeImageSize(file);
file.destroy();
logger.info(`emoji size: ${size.width}x${size.height}`);
await Emojis.insert({
id: genId(),
updatedAt: new Date(),
name: name,
category: categoryName,
host: null,
aliases: [],
originalUrl: driveFile.url,
publicUrl: driveFile.webpublicUrl ?? driveFile.url,
type: driveFile.webpublicType ?? driveFile.type,
license: null,
width: size.width || null,
height: size.height || null,
}).then((x) => Emojis.findOneByOrFail(x.identifiers[0]));
}
}
await db.queryResultCache!.remove(["meta_emojis"]);
cleanup();
logger.succ("Imported");
resolve(undefined);
});
});
return "Success";
}

View File

@ -8,29 +8,26 @@ import { isSelfHost, toPuny } from "@/misc/convert-host.js";
import { Users, DriveFiles } from "@/models/index.js";
import type { DbUserImportJobData } from "@/queue/types.js";
import { queueLogger } from "../../logger.js";
import type Bull from "bull";
import { cache as heuristic } from "@/server/api/common/generate-following-query.js";
import { Job } from "bullmq";
const logger = queueLogger.createSubLogger("import-following");
export async function importFollowing(
job: Bull.Job<DbUserImportJobData>,
done: any,
): Promise<void> {
job: Job<DbUserImportJobData>,
): Promise<string> {
logger.info(`Importing following of ${job.data.user.id} ...`);
const user = await Users.findOneBy({ id: job.data.user.id });
if (user == null) {
done();
return;
return "skip: User not found";
}
const file = await DriveFiles.findOneBy({
id: job.data.fileId,
});
if (file == null) {
done();
return;
return "skip: File not found";
}
const csv = await downloadTextFile(file.url);
@ -114,5 +111,5 @@ export async function importFollowing(
await heuristic.delete(user.id);
logger.succ("Imported");
done();
return "Success";
}

View File

@ -1,5 +1,3 @@
import type Bull from "bull";
import { queueLogger } from "../../logger.js";
import * as Acct from "@/misc/acct.js";
import { resolveUser } from "@/remote/resolve-user.js";
@ -10,27 +8,25 @@ import type { DbUserImportJobData } from "@/queue/types.js";
import type { User } from "@/models/entities/user.js";
import { genId } from "@/misc/gen-id.js";
import { IsNull } from "typeorm";
import { Job } from "bullmq";
const logger = queueLogger.createSubLogger("import-muting");
export async function importMuting(
job: Bull.Job<DbUserImportJobData>,
done: any,
): Promise<void> {
job: Job<DbUserImportJobData>,
): Promise<string> {
logger.info(`Importing muting of ${job.data.user.id} ...`);
const user = await Users.findOneBy({ id: job.data.user.id });
if (user == null) {
done();
return;
return "skip: User not found";
}
const file = await DriveFiles.findOneBy({
id: job.data.fileId,
});
if (file == null) {
done();
return;
return "skip: File not found";
}
const csv = await downloadTextFile(file.url);
@ -76,7 +72,7 @@ export async function importMuting(
}
logger.succ("Imported");
done();
return "Success";
}
async function mute(user: User, target: User) {

View File

@ -1,5 +1,3 @@
import type Bull from "bull";
import { queueLogger } from "../../logger.js";
import * as Acct from "@/misc/acct.js";
import { resolveUser } from "@/remote/resolve-user.js";
@ -15,27 +13,25 @@ import {
import { genId } from "@/misc/gen-id.js";
import type { DbUserImportJobData } from "@/queue/types.js";
import { IsNull } from "typeorm";
import { Job } from "bullmq";
const logger = queueLogger.createSubLogger("import-user-lists");
export async function importUserLists(
job: Bull.Job<DbUserImportJobData>,
done: any,
): Promise<void> {
job: Job<DbUserImportJobData>,
): Promise<string> {
logger.info(`Importing user lists of ${job.data.user.id} ...`);
const user = await Users.findOneBy({ id: job.data.user.id });
if (user == null) {
done();
return;
return "skip: User not found";
}
const file = await DriveFiles.findOneBy({
id: job.data.fileId,
});
if (file == null) {
done();
return;
return "skip: File not found";
}
const csv = await downloadTextFile(file.url);
@ -107,5 +103,5 @@ export async function importUserLists(
}
logger.succ("Imported");
done();
return "Success";
}

View File

@ -0,0 +1,278 @@
import { Job, Processor } from "bullmq";
import { deleteAccount } from "./delete-account.js";
import { deleteDriveFiles } from "./delete-drive-files.js";
import { exportBlocking } from "./export-blocking.js";
import { exportCustomEmojis } from "./export-custom-emojis.js";
import { exportFollowing } from "./export-following.js";
import { exportMute } from "./export-mute.js";
import { exportNotes } from "./export-notes.js";
import { exportUserLists } from "./export-user-lists.js";
import { importBlocking } from "./import-blocking.js";
import { importCustomEmojis } from "./import-custom-emojis.js";
import { importFollowing } from "./import-following.js";
import { importMuting } from "./import-muting.js";
import { importUserLists } from "./import-user-lists.js";
import { createQueue } from "../index.js";
import { ThinUser } from "@/queue/types.js";
import { DriveFile } from "@/models/entities/drive-file.js";
import config from "@/config/index.js";
const processors = {
deleteAccount,
deleteDriveFiles,
exportBlocking,
exportCustomEmojis,
exportFollowing,
exportMute,
exportNotes,
exportUserLists,
importBlocking,
importCustomEmojis,
importFollowing,
importMuting,
importUserLists,
} as Record<string, Processor>;
async function process(job: Job<any>): Promise<string> {
const processor = processors[job.name];
if (processor === undefined) return "skip: unknown job name";
return await processor(job);
}
export const [dbQueue, dbInit] =
createQueue("db", process, { limitPerSec: 256, concurrency: 16 });
export function createDeleteDriveFilesJob(user: ThinUser) {
return dbQueue.add(
"deleteDriveFiles",
{
user: user,
},
{
removeOnComplete: true,
removeOnFail: true,
},
);
}
export function createExportCustomEmojisJob(user: ThinUser) {
return dbQueue.add(
"exportCustomEmojis",
{
user: user,
},
{
removeOnComplete: true,
removeOnFail: true,
},
);
}
export function createExportNotesJob(user: ThinUser) {
return dbQueue.add(
"exportNotes",
{
user: user,
},
{
removeOnComplete: true,
removeOnFail: true,
},
);
}
export function createExportFollowingJob(
user: ThinUser,
excludeMuting = false,
excludeInactive = false,
) {
return dbQueue.add(
"exportFollowing",
{
user: user,
excludeMuting,
excludeInactive,
},
{
removeOnComplete: true,
removeOnFail: true,
},
);
}
export function createExportMuteJob(user: ThinUser) {
return dbQueue.add(
"exportMute",
{
user: user,
},
{
removeOnComplete: true,
removeOnFail: true,
},
);
}
export function createExportBlockingJob(user: ThinUser) {
return dbQueue.add(
"exportBlocking",
{
user: user,
},
{
removeOnComplete: true,
removeOnFail: true,
},
);
}
export function createExportUserListsJob(user: ThinUser) {
return dbQueue.add(
"exportUserLists",
{
user: user,
},
{
removeOnComplete: true,
removeOnFail: true,
},
);
}
export function createImportFollowingJob(
user: ThinUser,
fileId: DriveFile["id"],
) {
return dbQueue.add(
"importFollowing",
{
user: user,
fileId: fileId,
},
{
removeOnComplete: true,
removeOnFail: true,
},
);
}
export function createImportMastoPostJob(
user: ThinUser,
post: any,
signatureCheck: boolean,
) {
return dbQueue.add(
"importMastoPost",
{
user: user,
post: post,
signatureCheck: signatureCheck,
},
{
removeOnComplete: true,
removeOnFail: true,
attempts: config.inboxJobMaxAttempts || 8,
},
);
}
export function createImportCkPostJob(
user: ThinUser,
post: any,
signatureCheck: boolean,
) {
return dbQueue.add(
"importCkPost",
{
user: user,
post: post,
signatureCheck: signatureCheck,
},
{
removeOnComplete: true,
removeOnFail: true,
},
);
}
export function createImportMutingJob(user: ThinUser, fileId: DriveFile["id"]) {
return dbQueue.add(
"importMuting",
{
user: user,
fileId: fileId,
},
{
removeOnComplete: true,
removeOnFail: true,
},
);
}
export function createImportBlockingJob(
user: ThinUser,
fileId: DriveFile["id"],
) {
return dbQueue.add(
"importBlocking",
{
user: user,
fileId: fileId,
},
{
removeOnComplete: true,
removeOnFail: true,
},
);
}
export function createImportUserListsJob(
user: ThinUser,
fileId: DriveFile["id"],
) {
return dbQueue.add(
"importUserLists",
{
user: user,
fileId: fileId,
},
{
removeOnComplete: true,
removeOnFail: true,
},
);
}
export function createImportCustomEmojisJob(
user: ThinUser,
fileId: DriveFile["id"],
) {
return dbQueue.add(
"importCustomEmojis",
{
user: user,
fileId: fileId,
},
{
removeOnComplete: true,
removeOnFail: true,
},
);
}
export function createDeleteAccountJob(
user: ThinUser,
opts: { soft?: boolean } = {},
) {
return dbQueue.add(
"deleteAccount",
{
user: user,
soft: opts.soft,
},
{
removeOnComplete: true,
removeOnFail: true,
},
);
}

View File

@ -12,14 +12,17 @@ import { fetchInstanceMetadata } from "@/services/fetch-instance-metadata.js";
import { toPuny } from "@/misc/convert-host.js";
import { StatusError } from "@/misc/fetch.js";
import { shouldSkipInstance } from "@/misc/skipped-instances.js";
import type { DeliverJobData } from "@/queue/types.js";
import type Bull from "bull";
import type { DeliverJobData } from "../types.js";
import config from "@/config/index.js";
import { createQueue, processorTimeout } from "./index.js";
import { ThinUser } from "../types.js";
import { Job } from "bullmq";
const logger = new Logger("deliver");
export const deliverLogger = new Logger("deliver");
let latest: string | null = null;
export default async (job: Bull.Job<DeliverJobData>) => {
async function process(job: Job<DeliverJobData>) {
if (job.data == null || Object.keys(job.data).length === 0) {
job.opts.removeOnComplete = true;
return "Skip (data was null or empty)";
@ -31,7 +34,7 @@ export default async (job: Bull.Job<DeliverJobData>) => {
try {
if (latest !== (latest = JSON.stringify(job.data.content, null, 2))) {
logger.debug(`delivering ${latest}`);
deliverLogger.debug(`delivering ${latest}`);
}
await request(job.data.user, job.data.to, job.data.content);
@ -83,3 +86,34 @@ export default async (job: Bull.Job<DeliverJobData>) => {
}
}
};
export const [deliverQueue, deliverInit] = createQueue(
"deliver",
processorTimeout(process, 60),
{
limitPerSec: config.deliverJobPerSec || 128,
concurrency: config.deliverJobConcurrency || 128,
},
);
export function deliverJob(user: ThinUser, content: unknown, to: string | null) {
if (content == null) return null;
if (to == null) return null;
const data = {
user: {
id: user.id,
},
content,
to,
};
return deliverQueue.add("default", data, {
attempts: config.deliverJobMaxAttempts || 12,
backoff: {
type: "custom",
},
removeOnComplete: true,
removeOnFail: true,
});
}

View File

@ -1,25 +1,18 @@
import type Bull from "bull";
import { Notes, PollVotes } from "@/models/index.js";
import { queueLogger } from "../logger.js";
import type { EndedPollNotificationJobData } from "@/queue/types.js";
import { createNotification } from "@/services/create-notification.js";
import { deliverQuestionUpdate } from "@/services/note/polls/update.js";
import { Job } from "bullmq";
import { createQueue } from "./index.js";
const logger = queueLogger.createSubLogger("ended-poll-notification");
export async function endedPollNotification(
job: Bull.Job<EndedPollNotificationJobData>,
done: any,
): Promise<void> {
async function process(job: Job<EndedPollNotificationJobData>): Promise<string> {
if (job.data == null || Object.keys(job.data).length === 0) {
job.opts.removeOnComplete = true;
done();
return;
return "skip: corrupt job";
}
const note = await Notes.findOneBy({ id: job.data.noteId });
if (note == null || !note.hasPoll) {
done();
return;
return "skip: note not found";
}
const votes = await PollVotes.createQueryBuilder("vote")
@ -41,5 +34,8 @@ export async function endedPollNotification(
// Broadcast the poll result once it ends
if (!note.localOnly) await deliverQuestionUpdate(note.id);
done();
return "complete"
}
export const [endedPollNotificationQueue, endedPollNotificationInit] =
createQueue("endedPollNotification", process);

View File

@ -1,5 +1,4 @@
import { URL } from "node:url";
import type Bull from "bull";
import httpSignature from "@peertube/http-signature";
import perform from "@/remote/activitypub/perform.js";
import Logger from "@/services/logger.js";
@ -12,7 +11,7 @@ import {
} from "@/services/chart/index.js";
import { fetchMeta } from "@/misc/fetch-meta.js";
import { toPuny, extractDbHost } from "@/misc/convert-host.js";
import { getApId } from "@/remote/activitypub/type.js";
import { IActivity, getApId } from "@/remote/activitypub/type.js";
import { fetchInstanceMetadata } from "@/services/fetch-instance-metadata.js";
import type { InboxJobData } from "../types.js";
import DbResolver from "@/remote/activitypub/db-resolver.js";
@ -23,11 +22,14 @@ import type { CacheableRemoteUser } from "@/models/entities/user.js";
import type { UserPublickey } from "@/models/entities/user-publickey.js";
import { shouldBlockInstance } from "@/misc/should-block-instance.js";
import { verifySignature } from "@/remote/activitypub/check-fetch.js";
import { Job } from "bullmq";
import { createQueue, processorTimeout } from "./index.js";
import config from "@/config/index.js";
const logger = new Logger("inbox");
export const inboxLogger = new Logger("inbox");
// Processing when an activity arrives in the user's inbox
export default async (job: Bull.Job<InboxJobData>): Promise<string> => {
async function process(job: Job<InboxJobData>): Promise<string> {
if (job.data == null || Object.keys(job.data).length === 0) {
job.opts.removeOnComplete = true;
return "Skip (data was null or empty)";
@ -38,12 +40,10 @@ export default async (job: Bull.Job<InboxJobData>): Promise<string> => {
//#region Log
const info = Object.assign({}, activity) as any;
info["@context"] = undefined;
logger.debug(JSON.stringify(info, null, 2));
inboxLogger.debug(JSON.stringify(info, null, 2));
if (!signature?.keyId) {
const err = `Invalid signature: ${signature}`;
job.moveToFailed({ message: err });
return err;
throw new Error(`Invalid signature: ${signature}`);
}
//#endregion
const host = toPuny(new URL(signature.keyId).hostname);
@ -167,7 +167,7 @@ export default async (job: Bull.Job<InboxJobData>): Promise<string> => {
}
// ブロックしてたら中断
const ldHost = extractDbHost(authUser.user.uri);
const ldHost = extractDbHost(authUser.user.uri!);
if (await shouldBlockInstance(ldHost, meta)) {
return `Blocked request: ${ldHost}`;
}
@ -225,3 +225,31 @@ export default async (job: Bull.Job<InboxJobData>): Promise<string> => {
await perform(authUser.user, activity);
return "ok";
};
export const [inboxQueue, inboxInit] = createQueue(
"inbox",
processorTimeout(process, 5 * 60),
{
limitPerSec: config.inboxJobPerSec || 16,
concurrency: config.inboxJobConcurrency || 16,
},
);
export function inboxJob(
activity: IActivity,
signature: httpSignature.IParsedSignature,
) {
const data = {
activity: activity,
signature,
};
return inboxQueue.add("default", data, {
attempts: config.inboxJobMaxAttempts || 8,
backoff: {
type: "custom",
},
removeOnComplete: true,
removeOnFail: true,
});
}

View File

@ -0,0 +1,98 @@
import { Worker, Queue, BackoffStrategy, Processor, Job } from "bullmq";
import config from "@/config/index.js";
import { queueLogger, renderError } from "../logger.js";
const connectionDetails = {
port: config.redis.port,
host: config.redis.host,
family: config.redis.family == null ? 0 : config.redis.family,
username: config.redis.user ?? "default",
password: config.redis.pass,
db: config.redis.db || 0,
tls: config.redis.tls,
lazyConnect: true,
maxRetriesPerRequest: null,
};
// ref. https://github.com/misskey-dev/misskey/pull/7635#issue-971097019
function apBackoff(attemptsMade: number) {
const baseDelay = 60 * 1000; // 1min
const maxBackoff = 8 * 60 * 60 * 1000; // 8hours
let backoff = (Math.pow(2, attemptsMade) - 1) * baseDelay;
backoff = Math.min(backoff, maxBackoff);
backoff += Math.round(backoff * Math.random() * 0.2);
return backoff;
}
export function createQueue<D>(
name: string,
processor: Processor<D>,
opts: {
backoff?: BackoffStrategy,
limitPerSec?: number,
concurrency?: number,
} = {}
): [Queue<D>, () => Promise<void>] {
const backoffStrategy = opts.backoff || apBackoff;
const sublogger = queueLogger.createSubLogger(name);
const queue = new Queue<D>(
name,
{
connection: connectionDetails,
prefix: config.redis.prefix,
},
);
queue.on("waiting", (job) => sublogger.debug(`waiting id=${job.id}`));
const initWorker = () => {
return new Worker<D>(
name,
processor,
{
connection: connectionDetails,
prefix: config.redis.prefix,
settings: {
backoffStrategy,
},
limiter: (opts.limitPerSec === undefined || opts.limitPerSec === -1)
? undefined : {
max: opts.limitPerSec,
duration: 1000,
},
concurrency: opts.concurrency || 1,
},
)
.on("ready", () => {})
.on("active", (job) => sublogger.debug(`active id=${job.id}`))
.on("completed", (job) => sublogger.debug(`completed id=${job.id}`))
.on("failed", (job, err) =>
sublogger.warn(`failed(${err}) id=${job?.id}`, {
job,
e: renderError(err),
}),
)
.on("error", (err) =>
sublogger.error(`error(${err})`, { e: renderError(err) }),
)
.on("stalled", (jobId) =>
sublogger.warn(`stalled id=${jobId}`),
)
.waitUntilReady()
.then(() => {});
}
return [queue, initWorker];
}
export function processorTimeout<P extends Processor>(processor: P, timeout: number) {
return (job: Job) => {
return new Promise((resolve, reject) => {
const timer = setTimeout(() => reject(new Error("timeout reached")), timeout * 1000);
processor(job)
.then(resolve)
.catch(reject)
.finally(() => clearTimeout(timer));
});
};
}

View File

@ -1,18 +1,15 @@
import type Bull from "bull";
import { queueLogger } from "../../logger.js";
import { deleteFileSync } from "@/services/drive/delete-file.js";
import { DriveFiles, Users } from "@/models/index.js";
import { MoreThan, Not, IsNull } from "typeorm";
import { DriveFiles } from "@/models/index.js";
import { User } from "@/models/entities/user.js";
import config from "@/config/index.js";
import { Job } from "bullmq";
const logger = queueLogger.createSubLogger("clean-remote-files");
export default async function cleanRemoteFiles(
job: Bull.Job<Record<string, unknown>>,
done: any,
): Promise<void> {
export async function cleanRemoteFiles(
job: Job<Record<string, unknown>>,
): Promise<string> {
let progress = 0;
const untilDate = new Date(Date.now() - ((new Date()).getTimezoneOffset() * 60000));
untilDate.setDate(untilDate.getDate() - (config.mediaCleanup?.maxAgeDays ?? 0));
@ -54,7 +51,7 @@ export default async function cleanRemoteFiles(
const files = await query.getMany();
if (files.length === 0) {
job.progress(100);
job.updateProgress(100);
break;
}
@ -62,9 +59,9 @@ export default async function cleanRemoteFiles(
progress += files.length;
job.progress((progress / total) * 100);
job.updateProgress((progress / total) * 100);
}
logger.succ(`Remote media cleanup job completed successfully.`);
done();
return "Success";
}

View File

@ -1,8 +1,8 @@
import type { ObjectStorageFileJobData } from "@/queue/types.js";
import type Bull from "bull";
import { deleteObjectStorageFile } from "@/services/drive/delete-file.js";
import { Job } from "bullmq";
export default async (job: Bull.Job<ObjectStorageFileJobData>) => {
export async function deleteFile(job: Job<ObjectStorageFileJobData>): Promise<string> {
const key: string = job.data.key;
await deleteObjectStorageFile(key);

View File

@ -0,0 +1,42 @@
import { Job, Processor } from "bullmq";
import { createQueue } from "../index.js";
import { cleanRemoteFiles } from "./clean-remote-files.js";
import { deleteFile } from "./delete-file.js";
const processors = {
cleanRemoteFiles,
deleteFile,
} as Record<string, Processor>;
async function process(job: Job<any>): Promise<string> {
const processor = processors[job.name];
if (processor === undefined) return "skip: unknown job name";
return await processor(job);
}
export const [objectStorageQueue, objectStorageInit] =
createQueue("objectStorage", process, { concurrency: 16 });
export function createDeleteObjectStorageFileJob(key: string) {
return objectStorageQueue.add(
"deleteFile",
{
key: key,
},
{
removeOnComplete: true,
removeOnFail: true,
},
);
}
export function createCleanRemoteFilesJob() {
return objectStorageQueue.add(
"cleanRemoteFiles",
{},
{
removeOnComplete: true,
removeOnFail: true,
},
);
}

View File

@ -1,4 +1,3 @@
import type Bull from "bull";
import { In } from "typeorm";
import { Mutings } from "@/models/index.js";
import { queueLogger } from "../../logger.js";
@ -6,10 +5,7 @@ import { publishUserEvent } from "@/services/stream.js";
const logger = queueLogger.createSubLogger("check-expired-mutings");
export async function checkExpiredMutings(
job: Bull.Job<Record<string, unknown>>,
done: any,
): Promise<void> {
export async function checkExpiredMutings(): Promise<string> {
logger.info("Checking expired mutings...");
const expired = await Mutings.createQueryBuilder("muting")
@ -29,5 +25,5 @@ export async function checkExpiredMutings(
}
logger.succ("All expired mutings checked.");
done();
return "Success";
}

View File

@ -1,5 +1,4 @@
import type Bull from "bull";
import { Job } from "bullmq";
import { queueLogger } from "../../logger.js";
import {
activeUsersChart,
@ -18,10 +17,7 @@ import {
const logger = queueLogger.createSubLogger("clean-charts");
export async function cleanCharts(
job: Bull.Job<Record<string, unknown>>,
done: any,
): Promise<void> {
export async function cleanCharts(): Promise<string> {
logger.info("Clean charts...");
await Promise.all([
@ -40,5 +36,5 @@ export async function cleanCharts(
]);
logger.succ("All charts successfully cleaned.");
done();
return "Success";
}

View File

@ -1,4 +1,3 @@
import type Bull from "bull";
import { LessThan } from "typeorm";
import { UserIps } from "@/models/index.js";
@ -6,16 +5,13 @@ import { queueLogger } from "../../logger.js";
const logger = queueLogger.createSubLogger("clean");
export async function clean(
job: Bull.Job<Record<string, unknown>>,
done: any,
): Promise<void> {
export async function clean(): Promise<string> {
logger.info("Cleaning...");
UserIps.delete({
await UserIps.delete({
createdAt: LessThan(new Date(Date.now() - 1000 * 60 * 60 * 24 * 90)),
});
logger.succ("Cleaned.");
done();
return "Success";
}

View File

@ -0,0 +1,45 @@
import { Job, Processor } from "bullmq";
import { createQueue } from "../index.js";
import { clean } from "./clean.js";
import { cleanCharts } from "./clean-charts.js";
import { checkExpiredMutings } from "./check-expired-mutings.js";
import { resyncCharts } from "./resync-charts.js";
import { tickCharts } from "./tick-charts.js";
import { verifyLinks } from "./verify-links.js";
const processors = {
clean,
cleanCharts,
checkExpiredMutings,
resyncCharts,
tickCharts,
verifyLinks,
} as Record<string, Processor>;
async function process(job: Job<any>): Promise<string> {
const processor = processors[job.name];
if (processor === undefined) return "skip: unknown job name";
return await processor(job);
}
const [systemQueue_, systemInitQueue] =
createQueue("system", process, { concurrency: Object.keys(processors).length });
export const systemQueue = systemQueue_;
export async function systemInit() {
await systemInitQueue();
for (const { name, seconds } of [
{ name: "clean", seconds: 60 * 60 },
{ name: "cleanCharts", seconds: 60 * 60 },
{ name: "checkExpiredMutings", seconds: 5 * 60 },
{ name: "resyncCharts", seconds: 60 * 60 },
{ name: "tickCharts", seconds: 60 },
{ name: "verifyLinks", seconds: 60 * 60 * 24 },
]) {
await systemQueue.upsertJobScheduler(
name,
{ every: seconds * 1000 },
{ opts: { removeOnComplete: true } }
);
}
}

View File

@ -1,14 +1,9 @@
import type Bull from "bull";
import { queueLogger } from "../../logger.js";
import { driveChart, notesChart, usersChart } from "@/services/chart/index.js";
const logger = queueLogger.createSubLogger("resync-charts");
export async function resyncCharts(
job: Bull.Job<Record<string, unknown>>,
done: any,
): Promise<void> {
export async function resyncCharts(): Promise<string> {
logger.info("Resync charts...");
// TODO: ユーザーごとのチャートも更新する
@ -20,5 +15,5 @@ export async function resyncCharts(
]);
logger.succ("All charts successfully resynced.");
done();
return "Success";
}

View File

@ -1,5 +1,3 @@
import type Bull from "bull";
import { queueLogger } from "../../logger.js";
import {
activeUsersChart,
@ -18,10 +16,7 @@ import {
const logger = queueLogger.createSubLogger("tick-charts");
export async function tickCharts(
job: Bull.Job<Record<string, unknown>>,
done: any,
): Promise<void> {
export async function tickCharts(): Promise<string> {
logger.info("Tick charts...");
await Promise.all([
@ -40,5 +35,5 @@ export async function tickCharts(
]);
logger.succ("All charts successfully ticked.");
done();
return "Success";
}

View File

@ -1,17 +1,11 @@
import type Bull from "bull";
import { UserProfiles } from "@/models/index.js";
import { Not } from "typeorm";
import { queueLogger } from "../../logger.js";
import { verifyLink } from "@/services/fetch-rel-me.js";
import config from "@/config/index.js";
const logger = queueLogger.createSubLogger("verify-links");
export async function verifyLinks(
job: Bull.Job<Record<string, unknown>>,
done: any,
): Promise<void> {
export async function verifyLinks(): Promise<string> {
logger.info("Verifying links...");
const usersToVerify = await UserProfiles.findBy({
@ -34,11 +28,11 @@ export async function verifyLinks(
});
} catch (e) {
logger.error(`Failed to update user ${user.userId} ${e}`);
done(e);
throw e;
}
}
}
logger.succ("All links successfully verified.");
done();
return "Success";
}

View File

@ -1,14 +1,16 @@
import { URL } from "node:url";
import type Bull from "bull";
import Logger from "@/services/logger.js";
import type { WebhookDeliverJobData } from "../types.js";
import { getResponse, StatusError } from "@/misc/fetch.js";
import { Webhooks } from "@/models/index.js";
import config from "@/config/index.js";
import { Job } from "bullmq";
import { Webhook, webhookEventTypes } from "@/models/entities/webhook.js";
import { v4 as uuid } from "uuid";
import { createQueue, processorTimeout } from "./index.js";
const logger = new Logger("webhook");
export default async (job: Bull.Job<WebhookDeliverJobData>) => {
async function process(job: Job<WebhookDeliverJobData>) {
if (job.data == null || Object.keys(job.data).length === 0) {
job.opts.removeOnComplete = true;
return "Skip (data was null or empty)";
@ -68,3 +70,36 @@ export default async (job: Bull.Job<WebhookDeliverJobData>) => {
}
}
};
export const [webhookDeliverQueue, webhookDeliverInit] =
createQueue(
"webhookDeliver",
processorTimeout(process, 60),
{ limitPerSec: 64, concurrency: 64 },
);
export function webhookDeliverJob(
webhook: Webhook,
type: typeof webhookEventTypes[number],
content: unknown,
) {
const data = {
type,
content,
webhookId: webhook.id,
userId: webhook.userId,
to: webhook.url,
secret: webhook.secret,
createdAt: Date.now(),
eventId: uuid(),
};
return webhookDeliverQueue.add("default", data, {
attempts: 4,
backoff: {
type: "custom",
},
removeOnComplete: true,
removeOnFail: true,
});
}

View File

@ -47,7 +47,6 @@ import * as ep___admin_relays_list from "./endpoints/admin/relays/list.js";
import * as ep___admin_relays_remove from "./endpoints/admin/relays/remove.js";
import * as ep___admin_resetPassword from "./endpoints/admin/reset-password.js";
import * as ep___admin_resolveAbuseUserReport from "./endpoints/admin/resolve-abuse-user-report.js";
import * as ep___admin_search_indexAll from "./endpoints/admin/search/index-all.js";
import * as ep___admin_sendEmail from "./endpoints/admin/send-email.js";
import * as ep___admin_sendModMail from "./endpoints/admin/send-mod-mail.js";
import * as ep___admin_serverInfo from "./endpoints/admin/server-info.js";
@ -393,7 +392,6 @@ const eps = [
["admin/relays/remove", ep___admin_relays_remove],
["admin/reset-password", ep___admin_resetPassword],
["admin/resolve-abuse-user-report", ep___admin_resolveAbuseUserReport],
["admin/search/index-all", ep___admin_search_indexAll],
["admin/send-email", ep___admin_sendEmail],
["admin/send-mod-mail", ep___admin_sendModMail],
["admin/server-info", ep___admin_serverInfo],

View File

@ -3,7 +3,6 @@ import {
inboxQueue,
dbQueue,
objectStorageQueue,
backgroundQueue,
} from "@/queue/queues.js";
import define from "../../../define.js";
@ -38,11 +37,6 @@ export const meta = {
nullable: false,
ref: "QueueCount",
},
backgroundQueue: {
optional: false,
nullable: false,
ref: "QueueCount",
},
},
},
} as const;
@ -58,13 +52,11 @@ export default define(meta, paramDef, async (ps) => {
const inboxJobCounts = await inboxQueue.getJobCounts();
const dbJobCounts = await dbQueue.getJobCounts();
const objectStorageJobCounts = await objectStorageQueue.getJobCounts();
const backgroundJobCounts = await backgroundQueue.getJobCounts();
return {
deliver: deliverJobCounts,
inbox: inboxJobCounts,
db: dbJobCounts,
objectStorage: objectStorageJobCounts,
backgroundQueue: backgroundJobCounts,
};
});

View File

@ -1,28 +0,0 @@
import define from "../../../define.js";
import { createIndexAllNotesJob } from "@/queue/index.js";
export const meta = {
tags: ["admin"],
requireCredential: true,
requireModerator: true,
} as const;
export const paramDef = {
type: "object",
properties: {
cursor: {
type: "string",
format: "misskey:id",
nullable: true,
default: null,
},
},
required: [],
} as const;
export default define(meta, paramDef, async (ps, _me) => {
createIndexAllNotesJob({
cursor: ps.cursor ?? undefined,
});
});

View File

@ -386,6 +386,7 @@ export default async (
if (data.poll?.expiresAt) {
const delay = data.poll.expiresAt.getTime() - Date.now();
endedPollNotificationQueue.add(
"default",
{
noteId: note.id,
},

138
yarn.lock
View File

@ -545,19 +545,30 @@ __metadata:
languageName: node
linkType: hard
"@bull-board/koa@npm:5.6.0":
version: 5.6.0
resolution: "@bull-board/koa@npm:5.6.0"
"@bull-board/api@npm:6.0.0":
version: 6.0.0
resolution: "@bull-board/api@npm:6.0.0"
dependencies:
"@bull-board/api": "npm:5.6.0"
"@bull-board/ui": "npm:5.6.0"
ejs: "npm:^3.1.7"
redis-info: "npm:^3.0.8"
peerDependencies:
"@bull-board/ui": 6.0.0
checksum: 10/922e46f2c567e51f5dd657b6bac2a2c49de69f9bf75d7819a03b48666d5bef9a9e991f825b413545fad04c0fcbb68d7eadbe98c9c96cb8b4ce06a3a39bbde9d4
languageName: node
linkType: hard
"@bull-board/koa@npm:6.0.0":
version: 6.0.0
resolution: "@bull-board/koa@npm:6.0.0"
dependencies:
"@bull-board/api": "npm:6.0.0"
"@bull-board/ui": "npm:6.0.0"
ejs: "npm:^3.1.10"
koa: "npm:^2.13.1"
koa-mount: "npm:^4.0.0"
koa-router: "npm:^10.0.0"
koa-static: "npm:^5.0.0"
koa-views: "npm:^7.0.1"
checksum: 10/54aef8b937e78aedc4c1d109562c8f362b571ebdf9eadc1b4a1e6d6dde10a2e96761fe470ef254ccd7b947dbfba53c92d05f238a395f62ccd51bccfa3aed94a6
checksum: 10/0d1bac34e860a1c3b744db8f7011a7c607bf6e630827b4f2a4d56f25db66a4d8da28ee041a961ce2b94c356e1d1bd238c638189b56595844dca44bd24c412479
languageName: node
linkType: hard
@ -570,6 +581,15 @@ __metadata:
languageName: node
linkType: hard
"@bull-board/ui@npm:6.0.0":
version: 6.0.0
resolution: "@bull-board/ui@npm:6.0.0"
dependencies:
"@bull-board/api": "npm:6.0.0"
checksum: 10/6d083063a39b988648d2cc4001cc9ffdb2b7c9fef4876d83b7f0a11417bc3deaa311505c1d16bc1b540ed77bf097f31726abf692f127dc2fedebebeab061dd98
languageName: node
linkType: hard
"@colors/colors@npm:1.5.0":
version: 1.5.0
resolution: "@colors/colors@npm:1.5.0"
@ -5445,9 +5465,9 @@ __metadata:
version: 0.0.0-use.local
resolution: "backend@workspace:packages/backend"
dependencies:
"@bull-board/api": "npm:5.6.0"
"@bull-board/koa": "npm:5.6.0"
"@bull-board/ui": "npm:5.6.0"
"@bull-board/api": "npm:6.0.0"
"@bull-board/koa": "npm:6.0.0"
"@bull-board/ui": "npm:6.0.0"
"@discordapp/twemoji": "npm:14.1.2"
"@iceshrimp/summaly": "npm:2.7.2"
"@koa/cors": "npm:3.4.3"
@ -5518,7 +5538,7 @@ __metadata:
axios: "npm:^1.4.0"
bcryptjs: "npm:2.4.3"
blurhash: "npm:2.0.5"
bull: "npm:4.10.4"
bullmq: "npm:5.16.0"
cacheable-lookup: "npm:7.0.0"
cbor: "npm:8.1.0"
chalk: "npm:5.3.0"
@ -5543,7 +5563,7 @@ __metadata:
happy-dom: "npm:^12.10.3"
hpagent: "npm:0.1.2"
iceshrimp-sdk: "workspace:*"
ioredis: "npm:5.3.2"
ioredis: "npm:5.4.1"
ip-cidr: "npm:3.1.0"
is-svg: "npm:4.3.2"
js-yaml: "npm:4.1.0"
@ -6060,19 +6080,18 @@ __metadata:
languageName: node
linkType: hard
"bull@npm:4.10.4":
version: 4.10.4
resolution: "bull@npm:4.10.4"
"bullmq@npm:5.16.0":
version: 5.16.0
resolution: "bullmq@npm:5.16.0"
dependencies:
cron-parser: "npm:^4.2.1"
debuglog: "npm:^1.0.0"
get-port: "npm:^5.1.1"
ioredis: "npm:^5.0.0"
lodash: "npm:^4.17.21"
msgpackr: "npm:^1.5.2"
semver: "npm:^7.3.2"
uuid: "npm:^8.3.0"
checksum: 10/8a9909731741e49700639d5e4102829dea0c0a959894d61d521f12ec6a762b012389765fb58ba745cdf51cbeeb8e07e24367361819206bbebac89eb24b189332
cron-parser: "npm:^4.6.0"
ioredis: "npm:^5.4.1"
msgpackr: "npm:^1.10.1"
node-abort-controller: "npm:^3.1.1"
semver: "npm:^7.5.4"
tslib: "npm:^2.0.0"
uuid: "npm:^9.0.0"
checksum: 10/39febf6e4a43c27a47e43c6d4cbfa0173cf801b5d804a0a2aac871fed376fdaf49a60eb1a0394292de1a21c11916086b424d4a244499be8a013c0d4b383f5c17
languageName: node
linkType: hard
@ -7311,12 +7330,12 @@ __metadata:
languageName: node
linkType: hard
"cron-parser@npm:^4.2.1":
version: 4.8.1
resolution: "cron-parser@npm:4.8.1"
"cron-parser@npm:^4.6.0":
version: 4.9.0
resolution: "cron-parser@npm:4.9.0"
dependencies:
luxon: "npm:^3.2.1"
checksum: 10/5deb3f82166e5b55bf307e824888e0d661bbbf607dd7947b53e544bfbd7981ebda8c1e73416879ccc3b5ec093178718365e886e947d6a3962c4386a3eea5980f
checksum: 10/ffca5e532a5ee0923412ee6e4c7f9bbceacc6ddf8810c16d3e9fb4fe5ec7e2de1b6896d7956f304bb6bc96b0ce37ad7e3935304179d52951c18d84107184faa7
languageName: node
linkType: hard
@ -7694,13 +7713,6 @@ __metadata:
languageName: node
linkType: hard
"debuglog@npm:^1.0.0":
version: 1.0.1
resolution: "debuglog@npm:1.0.1"
checksum: 10/942a3196951ef139e3c19dc55583c1f9532fad92e293ffc6cbf8bb67562ea1aa013b5b86b4a89c2dd89e5e1c16e00b975e5ba3aa0a11070a3577e81162e6e29d
languageName: node
linkType: hard
"decamelize-keys@npm:^1.1.0":
version: 1.1.1
resolution: "decamelize-keys@npm:1.1.1"
@ -8315,6 +8327,17 @@ __metadata:
languageName: node
linkType: hard
"ejs@npm:^3.1.10":
version: 3.1.10
resolution: "ejs@npm:3.1.10"
dependencies:
jake: "npm:^10.8.5"
bin:
ejs: bin/cli.js
checksum: 10/a9cb7d7cd13b7b1cd0be5c4788e44dd10d92f7285d2f65b942f33e127230c054f99a42db4d99f766d8dbc6c57e94799593ee66a14efd7c8dd70c4812bf6aa384
languageName: node
linkType: hard
"ejs@npm:^3.1.7":
version: 3.1.9
resolution: "ejs@npm:3.1.9"
@ -10429,13 +10452,6 @@ __metadata:
languageName: node
linkType: hard
"get-port@npm:^5.1.1":
version: 5.1.1
resolution: "get-port@npm:5.1.1"
checksum: 10/0162663ffe5c09e748cd79d97b74cd70e5a5c84b760a475ce5767b357fb2a57cb821cee412d646aa8a156ed39b78aab88974eddaa9e5ee926173c036c0713787
languageName: node
linkType: hard
"get-stream@npm:^2.2.0":
version: 2.3.1
resolution: "get-stream@npm:2.3.1"
@ -11678,9 +11694,9 @@ __metadata:
languageName: node
linkType: hard
"ioredis@npm:5.3.2, ioredis@npm:^5.0.0":
version: 5.3.2
resolution: "ioredis@npm:5.3.2"
"ioredis@npm:5.4.1, ioredis@npm:^5.4.1":
version: 5.4.1
resolution: "ioredis@npm:5.4.1"
dependencies:
"@ioredis/commands": "npm:^1.1.1"
cluster-key-slot: "npm:^1.1.0"
@ -11691,7 +11707,7 @@ __metadata:
redis-errors: "npm:^1.2.0"
redis-parser: "npm:^3.0.0"
standard-as-callback: "npm:^2.1.0"
checksum: 10/0140f055ef81d28e16ca8400b99dabb9ce82009f54afd83cba952c7d0c5d736841e43247765b8ee1af1f02843531c5b8df240af18bd3d7e2ca3d60b36e76213f
checksum: 10/9043b812ac58065e80c759d130602cc64490fcaeaacf93723453fda04c7ba61dab0e2f50380eacb045592378ededf44f270c0d43e13e3e8b8d7c5a8d7fecb823
languageName: node
linkType: hard
@ -15004,7 +15020,7 @@ __metadata:
languageName: node
linkType: hard
"msgpackr@npm:1.9.5, msgpackr@npm:^1.5.2":
"msgpackr@npm:1.9.5":
version: 1.9.5
resolution: "msgpackr@npm:1.9.5"
dependencies:
@ -15016,6 +15032,18 @@ __metadata:
languageName: node
linkType: hard
"msgpackr@npm:^1.10.1":
version: 1.11.0
resolution: "msgpackr@npm:1.11.0"
dependencies:
msgpackr-extract: "npm:^3.0.2"
dependenciesMeta:
msgpackr-extract:
optional: true
checksum: 10/e95edf511ab269b34e312a7bd058c203e1ef4dc0656df8ccf1a10e9cdb40fac4c4b62b42ea0b2d199f85a1a53704f7f47e28ed5af5311f66097c591eafbbf8f3
languageName: node
linkType: hard
"multer@npm:1.4.4-lts.1":
version: 1.4.4-lts.1
resolution: "multer@npm:1.4.4-lts.1"
@ -15164,6 +15192,13 @@ __metadata:
languageName: node
linkType: hard
"node-abort-controller@npm:^3.1.1":
version: 3.1.1
resolution: "node-abort-controller@npm:3.1.1"
checksum: 10/0a2cdb7ec0aeaf3cb31e1ca0e192f5add48f1c5c9c9ed822129f9dddbd9432f69b7425982f94ce803c56a2104884530aa67cd57696e5774b2e5b8ec2f58de042
languageName: node
linkType: hard
"node-addon-api@npm:^5.0.0":
version: 5.1.0
resolution: "node-addon-api@npm:5.1.0"
@ -20181,6 +20216,13 @@ __metadata:
languageName: node
linkType: hard
"tslib@npm:^2.0.0":
version: 2.7.0
resolution: "tslib@npm:2.7.0"
checksum: 10/9a5b47ddac65874fa011c20ff76db69f97cf90c78cff5934799ab8894a5342db2d17b4e7613a087046bc1d133d21547ddff87ac558abeec31ffa929c88b7fce6
languageName: node
linkType: hard
"tslib@npm:^2.1.0, tslib@npm:^2.3.0, tslib@npm:^2.5.0, tslib@npm:^2.5.2, tslib@npm:^2.6.0":
version: 2.6.1
resolution: "tslib@npm:2.6.1"
@ -20816,7 +20858,7 @@ __metadata:
languageName: node
linkType: hard
"uuid@npm:^8.3.0, uuid@npm:^8.3.2":
"uuid@npm:^8.3.2":
version: 8.3.2
resolution: "uuid@npm:8.3.2"
bin: