// iceshrimp/packages/backend/src/queue/index.ts
import type httpSignature from "@peertube/http-signature";
import { v4 as uuid } from "uuid";
import config from "@/config/index.js";
import type { DriveFile } from "@/models/entities/drive-file.js";
import type { IActivity } from "@/remote/activitypub/type.js";
import type { Webhook, webhookEventTypes } from "@/models/entities/webhook.js";
import { envOption } from "../env.js";
import processDeliver from "./processors/deliver.js";
import processInbox from "./processors/inbox.js";
import processDb from "./processors/db/index.js";
import processObjectStorage from "./processors/object-storage/index.js";
import processSystemQueue from "./processors/system/index.js";
import processWebhookDeliver from "./processors/webhook-deliver.js";
import processBackground from "./processors/background/index.js";
import { endedPollNotification } from "./processors/ended-poll-notification.js";
import { queueLogger } from "./logger.js";
import { getJobInfo } from "./get-job-info.js";
import {
	systemQueue,
	dbQueue,
	deliverQueue,
	inboxQueue,
	objectStorageQueue,
	endedPollNotificationQueue,
	webhookDeliverQueue,
	backgroundQueue,
} from "./queues.js";
import type { ThinUser } from "./types.js";
/**
 * Copies the `stack`, `message` and `name` fields of an error into a plain
 * object so they can be attached to a log entry as structured data.
 *
 * @param e - The error to convert.
 * @returns A plain object holding the error's stack, message and name.
 */
function renderError(e: Error): {
	stack: string | undefined;
	message: string;
	name: string;
} {
	return {
		stack: e.stack,
		message: e.message,
		name: e.name,
	};
}
// One sub-logger per queue, all created from the shared queue logger.
// They are used by the event listeners registered below.
const systemLogger = queueLogger.createSubLogger("system");
const deliverLogger = queueLogger.createSubLogger("deliver");
const webhookLogger = queueLogger.createSubLogger("webhook");
const inboxLogger = queueLogger.createSubLogger("inbox");
const dbLogger = queueLogger.createSubLogger("db");
const objectStorageLogger = queueLogger.createSubLogger("objectStorage");
// Log the lifecycle events of the system queue.
systemQueue
	.on("waiting", (jobId) => systemLogger.debug(`waiting id=${jobId}`))
	.on("active", (job) => systemLogger.debug(`active id=${job.id}`))
	.on("completed", (job, result) =>
		systemLogger.debug(`completed(${result}) id=${job.id}`),
	)
	.on("failed", (job, err) =>
		systemLogger.warn(`failed(${err}) id=${job.id}`, {
			job,
			e: renderError(err),
		}),
	)
	// NOTE(review): assumes a Bull queue, whose "error" event passes the error
	// as its only argument. The previous (job, err) signature left `err`
	// undefined, so renderError threw and the real error was never logged.
	.on("error", (err: Error) =>
		systemLogger.error(`error ${err}`, { e: renderError(err) }),
	)
	.on("stalled", (job) => systemLogger.warn(`stalled id=${job.id}`));
// Log the lifecycle events of the deliver queue, including each job's
// destination (`job.data.to`).
deliverQueue
	.on("waiting", (jobId) => deliverLogger.debug(`waiting id=${jobId}`))
	.on("active", (job) =>
		deliverLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`),
	)
	.on("completed", (job, result) =>
		deliverLogger.debug(
			`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`,
		),
	)
	.on("failed", (job, err) =>
		deliverLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job.data.to}`),
	)
	// NOTE(review): assumes a Bull queue, whose "error" event passes the error
	// as its only argument. The previous (job, err) signature left `err`
	// undefined, so renderError threw and the real error was never logged.
	.on("error", (err: Error) =>
		deliverLogger.error(`error ${err}`, { e: renderError(err) }),
	)
	.on("stalled", (job) =>
		deliverLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`),
	);
// Log the lifecycle events of the inbox queue. Failed and stalled entries
// also include the activity id when the job carries an activity.
inboxQueue
	.on("waiting", (jobId) => inboxLogger.debug(`waiting id=${jobId}`))
	.on("active", (job) => inboxLogger.debug(`active ${getJobInfo(job, true)}`))
	.on("completed", (job, result) =>
		inboxLogger.debug(`completed(${result}) ${getJobInfo(job, true)}`),
	)
	.on("failed", (job, err) =>
		inboxLogger.warn(
			`failed(${err}) ${getJobInfo(job)} activity=${
				job.data.activity ? job.data.activity.id : "none"
			}`,
			{ job, e: renderError(err) },
		),
	)
	// NOTE(review): assumes a Bull queue, whose "error" event passes the error
	// as its only argument. The previous (job, err) signature left `err`
	// undefined, so renderError threw and the real error was never logged.
	.on("error", (err: Error) =>
		inboxLogger.error(`error ${err}`, { e: renderError(err) }),
	)
	.on("stalled", (job) =>
		inboxLogger.warn(
			`stalled ${getJobInfo(job)} activity=${
				job.data.activity ? job.data.activity.id : "none"
			}`,
		),
	);
// Log the lifecycle events of the db queue.
dbQueue
	.on("waiting", (jobId) => dbLogger.debug(`waiting id=${jobId}`))
	.on("active", (job) => dbLogger.debug(`active id=${job.id}`))
	.on("completed", (job, result) =>
		dbLogger.debug(`completed(${result}) id=${job.id}`),
	)
	.on("failed", (job, err) =>
		dbLogger.warn(`failed(${err}) id=${job.id}`, { job, e: renderError(err) }),
	)
	// NOTE(review): assumes a Bull queue, whose "error" event passes the error
	// as its only argument. The previous (job, err) signature left `err`
	// undefined, so renderError threw and the real error was never logged.
	.on("error", (err: Error) =>
		dbLogger.error(`error ${err}`, { e: renderError(err) }),
	)
	.on("stalled", (job) => dbLogger.warn(`stalled id=${job.id}`));
// Log the lifecycle events of the object storage queue.
objectStorageQueue
	.on("waiting", (jobId) => objectStorageLogger.debug(`waiting id=${jobId}`))
	.on("active", (job) => objectStorageLogger.debug(`active id=${job.id}`))
	.on("completed", (job, result) =>
		objectStorageLogger.debug(`completed(${result}) id=${job.id}`),
	)
	.on("failed", (job, err) =>
		objectStorageLogger.warn(`failed(${err}) id=${job.id}`, {
			job,
			e: renderError(err),
		}),
	)
	// NOTE(review): assumes a Bull queue, whose "error" event passes the error
	// as its only argument. The previous (job, err) signature left `err`
	// undefined, so renderError threw and the real error was never logged.
	.on("error", (err: Error) =>
		objectStorageLogger.error(`error ${err}`, { e: renderError(err) }),
	)
	.on("stalled", (job) => objectStorageLogger.warn(`stalled id=${job.id}`));
// Log the lifecycle events of the webhook delivery queue, including each
// job's destination (`job.data.to`).
webhookDeliverQueue
	.on("waiting", (jobId) => webhookLogger.debug(`waiting id=${jobId}`))
	.on("active", (job) =>
		webhookLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`),
	)
	.on("completed", (job, result) =>
		webhookLogger.debug(
			`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`,
		),
	)
	.on("failed", (job, err) =>
		webhookLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job.data.to}`),
	)
	// NOTE(review): assumes a Bull queue, whose "error" event passes the error
	// as its only argument. The previous (job, err) signature left `err`
	// undefined, so renderError threw and the real error was never logged.
	.on("error", (err: Error) =>
		webhookLogger.error(`error ${err}`, { e: renderError(err) }),
	)
	.on("stalled", (job) =>
		webhookLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`),
	);
/**
 * Adds a delivery job to the deliver queue.
 *
 * @param user - Sending user; only its `id` is stored in the job data.
 * @param content - Payload to deliver. Nothing is queued when it is null or undefined.
 * @param to - Destination of the delivery. Nothing is queued when it is null or undefined.
 * @returns The result of `deliverQueue.add`, or `null` when the job was skipped.
 */
export function deliver(user: ThinUser, content: unknown, to: string | null) {
	if (content == null) return null;
	if (to == null) return null;

	const data = {
		user: {
			id: user.id,
		},
		content,
		to,
	};

	return deliverQueue.add(data, {
		// A falsy configured value (including 0) falls back to 12 attempts.
		attempts: config.deliverJobMaxAttempts || 12,
		timeout: 1 * 60 * 1000, // 1min
		backoff: {
			type: "apBackoff",
		},
		removeOnComplete: true,
		removeOnFail: true,
	});
}
/**
 * Adds a received activity to the inbox queue together with its parsed
 * HTTP signature.
 *
 * @param activity - The incoming activity.
 * @param signature - The parsed HTTP signature that accompanied the activity.
 * @returns The result of `inboxQueue.add`.
 */
export function inbox(
	activity: IActivity,
	signature: httpSignature.IParsedSignature,
) {
	const data = {
		activity,
		signature,
	};

	return inboxQueue.add(data, {
		// A falsy configured value (including 0) falls back to 8 attempts.
		attempts: config.inboxJobMaxAttempts || 8,
		timeout: 5 * 60 * 1000, // 5min
		backoff: {
			type: "apBackoff",
		},
		removeOnComplete: true,
		removeOnFail: true,
	});
}
/**
 * Adds a "deleteDriveFiles" job for the given user to the db queue.
 *
 * @param user - User stored in the job data.
 * @returns The result of `dbQueue.add`.
 */
export function createDeleteDriveFilesJob(user: ThinUser) {
	return dbQueue.add(
		"deleteDriveFiles",
		{
			user,
		},
		{
			removeOnComplete: true,
			removeOnFail: true,
		},
	);
}
/**
 * Adds an "exportCustomEmojis" job for the given user to the db queue.
 *
 * @param user - User stored in the job data.
 * @returns The result of `dbQueue.add`.
 */
export function createExportCustomEmojisJob(user: ThinUser) {
	return dbQueue.add(
		"exportCustomEmojis",
		{
			user,
		},
		{
			removeOnComplete: true,
			removeOnFail: true,
		},
	);
}
/**
 * Adds an "exportNotes" job for the given user to the db queue.
 *
 * @param user - User stored in the job data.
 * @returns The result of `dbQueue.add`.
 */
export function createExportNotesJob(user: ThinUser) {
	return dbQueue.add(
		"exportNotes",
		{
			user,
		},
		{
			removeOnComplete: true,
			removeOnFail: true,
		},
	);
}
/**
 * Adds an "exportFollowing" job for the given user to the db queue.
 *
 * @param user - User stored in the job data.
 * @param excludeMuting - Flag passed through to the job data; defaults to `false`.
 * @param excludeInactive - Flag passed through to the job data; defaults to `false`.
 * @returns The result of `dbQueue.add`.
 */
export function createExportFollowingJob(
	user: ThinUser,
	excludeMuting = false,
	excludeInactive = false,
) {
	return dbQueue.add(
		"exportFollowing",
		{
			user,
			excludeMuting,
			excludeInactive,
		},
		{
			removeOnComplete: true,
			removeOnFail: true,
		},
	);
}
/**
 * Adds an "exportMute" job for the given user to the db queue.
 *
 * @param user - User stored in the job data.
 * @returns The result of `dbQueue.add`.
 */
export function createExportMuteJob(user: ThinUser) {
	return dbQueue.add(
		"exportMute",
		{
			user,
		},
		{
			removeOnComplete: true,
			removeOnFail: true,
		},
	);
}
/**
 * Adds an "exportBlocking" job for the given user to the db queue.
 *
 * @param user - User stored in the job data.
 * @returns The result of `dbQueue.add`.
 */
export function createExportBlockingJob(user: ThinUser) {
	return dbQueue.add(
		"exportBlocking",
		{
			user,
		},
		{
			removeOnComplete: true,
			removeOnFail: true,
		},
	);
}
/**
 * Adds an "exportUserLists" job for the given user to the db queue.
 *
 * @param user - User stored in the job data.
 * @returns The result of `dbQueue.add`.
 */
export function createExportUserListsJob(user: ThinUser) {
	return dbQueue.add(
		"exportUserLists",
		{
			user,
		},
		{
			removeOnComplete: true,
			removeOnFail: true,
		},
	);
}
/**
 * Adds an "importFollowing" job to the db queue.
 *
 * @param user - User stored in the job data.
 * @param fileId - Id of the drive file stored in the job data.
 * @returns The result of `dbQueue.add`.
 */
export function createImportFollowingJob(
	user: ThinUser,
	fileId: DriveFile["id"],
) {
	return dbQueue.add(
		"importFollowing",
		{
			user,
			fileId,
		},
		{
			removeOnComplete: true,
			removeOnFail: true,
		},
	);
}
/**
 * Adds an "importPosts" job to the db queue.
 *
 * @param user - User stored in the job data.
 * @param fileId - Id of the drive file stored in the job data.
 * @param signatureCheck - Flag passed through to the job data.
 * @returns The result of `dbQueue.add`.
 */
export function createImportPostsJob(
	user: ThinUser,
	fileId: DriveFile["id"],
	signatureCheck: boolean,
) {
	return dbQueue.add(
		"importPosts",
		{
			user,
			fileId,
			signatureCheck,
		},
		{
			removeOnComplete: true,
			removeOnFail: true,
		},
	);
}
/**
 * Adds an "importMastoPost" job for a single post to the db queue.
 *
 * Unlike the other import jobs in this file, this one sets a retry count
 * (shared with the inbox setting) and a one-minute timeout.
 *
 * @param user - User stored in the job data.
 * @param post - Post object stored in the job data; its shape is not defined here.
 * @param signatureCheck - Flag passed through to the job data.
 * @returns The result of `dbQueue.add`.
 */
export function createImportMastoPostJob(
	user: ThinUser,
	post: any,
	signatureCheck: boolean,
) {
	return dbQueue.add(
		"importMastoPost",
		{
			user,
			post,
			signatureCheck,
		},
		{
			removeOnComplete: true,
			removeOnFail: true,
			// A falsy configured value (including 0) falls back to 8 attempts.
			attempts: config.inboxJobMaxAttempts || 8,
			timeout: 60 * 1000, // 1min
		},
	);
}
/**
 * Adds an "importCkPost" job for a single post to the db queue.
 *
 * @param user - User stored in the job data.
 * @param post - Post object stored in the job data; its shape is not defined here.
 * @param signatureCheck - Flag passed through to the job data.
 * @returns The result of `dbQueue.add`.
 */
export function createImportCkPostJob(
	user: ThinUser,
	post: any,
	signatureCheck: boolean,
) {
	const payload = { user, post, signatureCheck };
	const jobOptions = { removeOnComplete: true, removeOnFail: true };
	return dbQueue.add("importCkPost", payload, jobOptions);
}
/**
 * Adds an "importMuting" job to the db queue.
 *
 * @param user - User stored in the job data.
 * @param fileId - Id of the drive file stored in the job data.
 * @returns The result of `dbQueue.add`.
 */
export function createImportMutingJob(user: ThinUser, fileId: DriveFile["id"]) {
	return dbQueue.add(
		"importMuting",
		{
			user,
			fileId,
		},
		{
			removeOnComplete: true,
			removeOnFail: true,
		},
	);
}
/**
 * Adds an "importBlocking" job to the db queue.
 *
 * @param user - User stored in the job data.
 * @param fileId - Id of the drive file stored in the job data.
 * @returns The result of `dbQueue.add`.
 */
export function createImportBlockingJob(
	user: ThinUser,
	fileId: DriveFile["id"],
) {
	return dbQueue.add(
		"importBlocking",
		{
			user,
			fileId,
		},
		{
			removeOnComplete: true,
			removeOnFail: true,
		},
	);
}
/**
 * Adds an "importUserLists" job to the db queue.
 *
 * @param user - User stored in the job data.
 * @param fileId - Id of the drive file stored in the job data.
 * @returns The result of `dbQueue.add`.
 */
export function createImportUserListsJob(
	user: ThinUser,
	fileId: DriveFile["id"],
) {
	return dbQueue.add(
		"importUserLists",
		{
			user,
			fileId,
		},
		{
			removeOnComplete: true,
			removeOnFail: true,
		},
	);
}
/**
 * Adds an "importCustomEmojis" job to the db queue.
 *
 * @param user - User stored in the job data.
 * @param fileId - Id of the drive file stored in the job data.
 * @returns The result of `dbQueue.add`.
 */
export function createImportCustomEmojisJob(
	user: ThinUser,
	fileId: DriveFile["id"],
) {
	return dbQueue.add(
		"importCustomEmojis",
		{
			user,
			fileId,
		},
		{
			removeOnComplete: true,
			removeOnFail: true,
		},
	);
}
/**
 * Adds a "deleteAccount" job to the db queue.
 *
 * @param user - User stored in the job data.
 * @param opts - Optional settings; `opts.soft` is copied into the job data
 *   and is `undefined` when not given.
 * @returns The result of `dbQueue.add`.
 */
export function createDeleteAccountJob(
	user: ThinUser,
	opts: { soft?: boolean } = {},
) {
	return dbQueue.add(
		"deleteAccount",
		{
			user,
			soft: opts.soft,
		},
		{
			removeOnComplete: true,
			removeOnFail: true,
		},
	);
}
/**
 * Adds a "deleteFile" job for the given key to the object storage queue.
 *
 * @param key - Key stored in the job data.
 * @returns The result of `objectStorageQueue.add`.
 */
export function createDeleteObjectStorageFileJob(key: string) {
	return objectStorageQueue.add(
		"deleteFile",
		{
			key,
		},
		{
			removeOnComplete: true,
			removeOnFail: true,
		},
	);
}
/**
 * Adds a one-off "cleanRemoteFiles" job with empty data to the object
 * storage queue.
 *
 * @returns The result of `objectStorageQueue.add`.
 */
export function createCleanRemoteFilesJob() {
	return objectStorageQueue.add(
		"cleanRemoteFiles",
		{},
		{
			removeOnComplete: true,
			removeOnFail: true,
		},
	);
}
/**
 * Adds an "indexAllNotes" job to the background queue.
 *
 * Failed jobs are kept (`removeOnFail: false`) and the job is given a
 * 24-hour timeout.
 *
 * @param data - Job data; defaults to an empty object.
 * @returns The result of `backgroundQueue.add`.
 */
export function createIndexAllNotesJob(data = {}) {
	return backgroundQueue.add("indexAllNotes", data, {
		removeOnComplete: true,
		removeOnFail: false,
		timeout: 1000 * 60 * 60 * 24, // 24h
	});
}
/**
 * Adds a webhook delivery job to the webhook delivery queue.
 *
 * The job data combines the event type and content with the webhook's id,
 * owner id, URL and secret, plus a creation timestamp and a fresh UUID as
 * event id.
 *
 * @param webhook - Webhook to deliver to.
 * @param type - Event type being delivered.
 * @param content - Event payload.
 * @returns The result of `webhookDeliverQueue.add`.
 */
export function webhookDeliver(
	webhook: Webhook,
	type: typeof webhookEventTypes[number],
	content: unknown,
) {
	const data = {
		type,
		content,
		webhookId: webhook.id,
		userId: webhook.userId,
		to: webhook.url,
		secret: webhook.secret,
		createdAt: Date.now(),
		eventId: uuid(),
	};

	return webhookDeliverQueue.add(data, {
		attempts: 4,
		timeout: 1 * 60 * 1000, // 1min
		backoff: {
			type: "apBackoff",
		},
		removeOnComplete: true,
		removeOnFail: true,
	});
}
/**
 * Attaches the processors to every queue and registers the system jobs.
 *
 * Does nothing when `envOption.onlyServer` is set.
 */
export default function () {
	if (envOption.onlyServer) return;

	// Concurrency falls back to the literal default when the configured
	// value is falsy (including 0).
	deliverQueue.process(config.deliverJobConcurrency || 128, processDeliver);
	inboxQueue.process(config.inboxJobConcurrency || 16, processInbox);
	endedPollNotificationQueue.process(endedPollNotification);
	webhookDeliverQueue.process(64, processWebhookDeliver);
	processDb(dbQueue);

	// The config value only switches the repeating cleanup on; the schedule
	// itself is the fixed cron expression below.
	if (config.mediaCleanup?.cron) {
		objectStorageQueue.add(
			"cleanRemoteFiles",
			{},
			{
				repeat: {
					cron: "0 0 * * *",
				},
				removeOnComplete: true,
				removeOnFail: true,
			},
		);
	}

	processObjectStorage(objectStorageQueue);
	processBackground(backgroundQueue);

	// Repeating system jobs.
	systemQueue.add(
		"tickCharts",
		{},
		{
			repeat: { cron: "55 * * * *" },
			removeOnComplete: true,
		},
	);
	systemQueue.add(
		"resyncCharts",
		{},
		{
			repeat: { cron: "0 0 * * *" },
			removeOnComplete: true,
		},
	);
	systemQueue.add(
		"cleanCharts",
		{},
		{
			repeat: { cron: "0 0 * * *" },
			removeOnComplete: true,
		},
	);
	systemQueue.add(
		"clean",
		{},
		{
			repeat: { cron: "0 0 * * *" },
			removeOnComplete: true,
		},
	);
	systemQueue.add(
		"checkExpiredMutings",
		{},
		{
			repeat: { cron: "*/5 * * * *" },
			removeOnComplete: true,
		},
	);

	// No `repeat` option: added once each time this function runs.
	systemQueue.add(
		"setLocalEmojiSizes",
		{},
		{ removeOnComplete: true, removeOnFail: true },
	);
	systemQueue.add(
		"verifyLinks",
		{},
		{
			repeat: { cron: "0 0 * * 0" },
			removeOnComplete: true,
			removeOnFail: true,
		},
	);

	processSystemQueue(systemQueue);
}
/**
 * Requests removal of the delayed jobs from the deliver and inbox queues.
 *
 * For each queue a one-time "cleaned" listener logs how many jobs were
 * removed. The `clean` calls are not awaited, so this function returns
 * before the cleanup has finished.
 */
export function destroy() {
	deliverQueue.once("cleaned", (jobs, status) => {
		deliverLogger.succ(`Cleaned ${jobs.length} ${status} jobs`);
	});
	deliverQueue.clean(0, "delayed");

	inboxQueue.once("cleaned", (jobs, status) => {
		inboxLogger.succ(`Cleaned ${jobs.length} ${status} jobs`);
	});
	inboxQueue.clean(0, "delayed");
}