diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts index ce999d9cef..eb1901d069 100644 --- a/packages/backend/src/queue/QueueProcessorService.ts +++ b/packages/backend/src/queue/QueueProcessorService.ts @@ -5,6 +5,7 @@ import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common'; import * as Bull from 'bullmq'; +import * as Sentry from '@sentry/node'; import type { Config } from '@/config.js'; import { DI } from '@/di-symbols.js'; import type Logger from '@/logger.js'; @@ -135,199 +136,271 @@ export class QueueProcessorService implements OnApplicationShutdown { } //#region system - this.systemQueueWorker = new Bull.Worker(QUEUE.SYSTEM, (job) => { - switch (job.name) { - case 'tickCharts': return this.tickChartsProcessorService.process(); - case 'resyncCharts': return this.resyncChartsProcessorService.process(); - case 'cleanCharts': return this.cleanChartsProcessorService.process(); - case 'aggregateRetention': return this.aggregateRetentionProcessorService.process(); - case 'checkExpiredMutings': return this.checkExpiredMutingsProcessorService.process(); - case 'clean': return this.cleanProcessorService.process(); - default: throw new Error(`unrecognized job type ${job.name} for system`); - } - }, { - ...baseQueueOptions(this.config, QUEUE.SYSTEM), - autorun: false, - }); + { + const processer = (job: Bull.Job) => { + switch (job.name) { + case 'tickCharts': return this.tickChartsProcessorService.process(); + case 'resyncCharts': return this.resyncChartsProcessorService.process(); + case 'cleanCharts': return this.cleanChartsProcessorService.process(); + case 'aggregateRetention': return this.aggregateRetentionProcessorService.process(); + case 'checkExpiredMutings': return this.checkExpiredMutingsProcessorService.process(); + case 'clean': return this.cleanProcessorService.process(); + default: throw new Error(`unrecognized job type ${job.name} for system`); + } + }; - const systemLogger = this.logger.createSubLogger('system'); + this.systemQueueWorker = new Bull.Worker(QUEUE.SYSTEM, (job) => { + if (this.config.sentryForBackend) { + return Sentry.startSpan({ name: 'Queue: System: ' + job.name }, () => processer(job)); + } else { + return processer(job); + } + }, { + ...baseQueueOptions(this.config, QUEUE.SYSTEM), + autorun: false, + }); - this.systemQueueWorker - .on('active', (job) => systemLogger.debug(`active id=${job.id}`)) - .on('completed', (job, result) => systemLogger.debug(`completed(${result}) id=${job.id}`)) - .on('failed', (job, err) => systemLogger.warn(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) })) - .on('error', (err: Error) => systemLogger.error(`error ${err.stack}`, { e: renderError(err) })) - .on('stalled', (jobId) => systemLogger.warn(`stalled id=${jobId}`)); + const systemLogger = this.logger.createSubLogger('system'); + + this.systemQueueWorker + .on('active', (job) => systemLogger.debug(`active id=${job.id}`)) + .on('completed', (job, result) => systemLogger.debug(`completed(${result}) id=${job.id}`)) + .on('failed', (job, err) => systemLogger.warn(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) })) + .on('error', (err: Error) => systemLogger.error(`error ${err.stack}`, { e: renderError(err) })) + .on('stalled', (jobId) => systemLogger.warn(`stalled id=${jobId}`)); + } //#endregion //#region db - this.dbQueueWorker = new Bull.Worker(QUEUE.DB, (job) => { - switch (job.name) { - case 'deleteDriveFiles': return this.deleteDriveFilesProcessorService.process(job); - case 'exportCustomEmojis': return this.exportCustomEmojisProcessorService.process(job); - case 'exportNotes': return this.exportNotesProcessorService.process(job); - case 'exportClips': return this.exportClipsProcessorService.process(job); - case 'exportFavorites': return this.exportFavoritesProcessorService.process(job); - case 'exportFollowing': return this.exportFollowingProcessorService.process(job); - case 'exportMuting': return this.exportMutingProcessorService.process(job); - case 'exportBlocking': return this.exportBlockingProcessorService.process(job); - case 'exportUserLists': return this.exportUserListsProcessorService.process(job); - case 'exportAntennas': return this.exportAntennasProcessorService.process(job); - case 'importFollowing': return this.importFollowingProcessorService.process(job); - case 'importFollowingToDb': return this.importFollowingProcessorService.processDb(job); - case 'importMuting': return this.importMutingProcessorService.process(job); - case 'importBlocking': return this.importBlockingProcessorService.process(job); - case 'importBlockingToDb': return this.importBlockingProcessorService.processDb(job); - case 'importUserLists': return this.importUserListsProcessorService.process(job); - case 'importCustomEmojis': return this.importCustomEmojisProcessorService.process(job); - case 'importAntennas': return this.importAntennasProcessorService.process(job); - case 'deleteAccount': return this.deleteAccountProcessorService.process(job); - default: throw new Error(`unrecognized job type ${job.name} for db`); - } - }, { - ...baseQueueOptions(this.config, QUEUE.DB), - autorun: false, - }); + { + const processer = (job: Bull.Job) => { + switch (job.name) { + case 'deleteDriveFiles': return this.deleteDriveFilesProcessorService.process(job); + case 'exportCustomEmojis': return this.exportCustomEmojisProcessorService.process(job); + case 'exportNotes': return this.exportNotesProcessorService.process(job); + case 'exportClips': return this.exportClipsProcessorService.process(job); + case 'exportFavorites': return this.exportFavoritesProcessorService.process(job); + case 'exportFollowing': return this.exportFollowingProcessorService.process(job); + case 'exportMuting': return this.exportMutingProcessorService.process(job); + case 'exportBlocking': return this.exportBlockingProcessorService.process(job); + case 'exportUserLists': return this.exportUserListsProcessorService.process(job); + case 'exportAntennas': return this.exportAntennasProcessorService.process(job); + case 'importFollowing': return this.importFollowingProcessorService.process(job); + case 'importFollowingToDb': return this.importFollowingProcessorService.processDb(job); + case 'importMuting': return this.importMutingProcessorService.process(job); + case 'importBlocking': return this.importBlockingProcessorService.process(job); + case 'importBlockingToDb': return this.importBlockingProcessorService.processDb(job); + case 'importUserLists': return this.importUserListsProcessorService.process(job); + case 'importCustomEmojis': return this.importCustomEmojisProcessorService.process(job); + case 'importAntennas': return this.importAntennasProcessorService.process(job); + case 'deleteAccount': return this.deleteAccountProcessorService.process(job); + default: throw new Error(`unrecognized job type ${job.name} for db`); + } + }; - const dbLogger = this.logger.createSubLogger('db'); + this.dbQueueWorker = new Bull.Worker(QUEUE.DB, (job) => { + if (this.config.sentryForBackend) { + return Sentry.startSpan({ name: 'Queue: DB: ' + job.name }, () => processer(job)); + } else { + return processer(job); + } + }, { + ...baseQueueOptions(this.config, QUEUE.DB), + autorun: false, + }); - this.dbQueueWorker - .on('active', (job) => dbLogger.debug(`active id=${job.id}`)) - .on('completed', (job, result) => dbLogger.debug(`completed(${result}) id=${job.id}`)) - .on('failed', (job, err) => dbLogger.warn(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) })) - .on('error', (err: Error) => dbLogger.error(`error ${err.stack}`, { e: renderError(err) })) - .on('stalled', (jobId) => dbLogger.warn(`stalled id=${jobId}`)); + const dbLogger = this.logger.createSubLogger('db'); + + this.dbQueueWorker + .on('active', (job) => dbLogger.debug(`active id=${job.id}`)) + .on('completed', (job, result) => dbLogger.debug(`completed(${result}) id=${job.id}`)) + .on('failed', (job, err) => dbLogger.warn(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) })) + .on('error', (err: Error) => dbLogger.error(`error ${err.stack}`, { e: renderError(err) })) + .on('stalled', (jobId) => dbLogger.warn(`stalled id=${jobId}`)); + } //#endregion //#region deliver - this.deliverQueueWorker = new Bull.Worker(QUEUE.DELIVER, (job) => this.deliverProcessorService.process(job), { - ...baseQueueOptions(this.config, QUEUE.DELIVER), - autorun: false, - concurrency: this.config.deliverJobConcurrency ?? 128, - limiter: { - max: this.config.deliverJobPerSec ?? 128, - duration: 1000, - }, - settings: { - backoffStrategy: httpRelatedBackoff, - }, - }); + { + this.deliverQueueWorker = new Bull.Worker(QUEUE.DELIVER, (job) => { + if (this.config.sentryForBackend) { + return Sentry.startSpan({ name: 'Queue: Deliver' }, () => this.deliverProcessorService.process(job)); + } else { + return this.deliverProcessorService.process(job); + } + }, { + ...baseQueueOptions(this.config, QUEUE.DELIVER), + autorun: false, + concurrency: this.config.deliverJobConcurrency ?? 128, + limiter: { + max: this.config.deliverJobPerSec ?? 128, + duration: 1000, + }, + settings: { + backoffStrategy: httpRelatedBackoff, + }, + }); - const deliverLogger = this.logger.createSubLogger('deliver'); + const deliverLogger = this.logger.createSubLogger('deliver'); - this.deliverQueueWorker - .on('active', (job) => deliverLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`)) - .on('completed', (job, result) => deliverLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`)) - .on('failed', (job, err) => deliverLogger.warn(`failed(${err.stack}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`)) - .on('error', (err: Error) => deliverLogger.error(`error ${err.stack}`, { e: renderError(err) })) - .on('stalled', (jobId) => deliverLogger.warn(`stalled id=${jobId}`)); + this.deliverQueueWorker + .on('active', (job) => deliverLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`)) + .on('completed', (job, result) => deliverLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`)) + .on('failed', (job, err) => deliverLogger.warn(`failed(${err.stack}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`)) + .on('error', (err: Error) => deliverLogger.error(`error ${err.stack}`, { e: renderError(err) })) + .on('stalled', (jobId) => deliverLogger.warn(`stalled id=${jobId}`)); + } //#endregion //#region inbox - this.inboxQueueWorker = new Bull.Worker(QUEUE.INBOX, (job) => this.inboxProcessorService.process(job), { - ...baseQueueOptions(this.config, QUEUE.INBOX), - autorun: false, - concurrency: this.config.inboxJobConcurrency ?? 16, - limiter: { - max: this.config.inboxJobPerSec ?? 32, - duration: 1000, - }, - settings: { - backoffStrategy: httpRelatedBackoff, - }, - }); + { + this.inboxQueueWorker = new Bull.Worker(QUEUE.INBOX, (job) => { + if (this.config.sentryForBackend) { + return Sentry.startSpan({ name: 'Queue: Inbox' }, () => this.inboxProcessorService.process(job)); + } else { + return this.inboxProcessorService.process(job); + } + }, { + ...baseQueueOptions(this.config, QUEUE.INBOX), + autorun: false, + concurrency: this.config.inboxJobConcurrency ?? 16, + limiter: { + max: this.config.inboxJobPerSec ?? 32, + duration: 1000, + }, + settings: { + backoffStrategy: httpRelatedBackoff, + }, + }); - const inboxLogger = this.logger.createSubLogger('inbox'); + const inboxLogger = this.logger.createSubLogger('inbox'); - this.inboxQueueWorker - .on('active', (job) => inboxLogger.debug(`active ${getJobInfo(job, true)}`)) - .on('completed', (job, result) => inboxLogger.debug(`completed(${result}) ${getJobInfo(job, true)}`)) - .on('failed', (job, err) => inboxLogger.warn(`failed(${err.stack}) ${getJobInfo(job)} activity=${job ? (job.data.activity ? job.data.activity.id : 'none') : '-'}`, { job, e: renderError(err) })) - .on('error', (err: Error) => inboxLogger.error(`error ${err.stack}`, { e: renderError(err) })) - .on('stalled', (jobId) => inboxLogger.warn(`stalled id=${jobId}`)); + this.inboxQueueWorker + .on('active', (job) => inboxLogger.debug(`active ${getJobInfo(job, true)}`)) + .on('completed', (job, result) => inboxLogger.debug(`completed(${result}) ${getJobInfo(job, true)}`)) + .on('failed', (job, err) => inboxLogger.warn(`failed(${err.stack}) ${getJobInfo(job)} activity=${job ? (job.data.activity ? job.data.activity.id : 'none') : '-'}`, { job, e: renderError(err) })) + .on('error', (err: Error) => inboxLogger.error(`error ${err.stack}`, { e: renderError(err) })) + .on('stalled', (jobId) => inboxLogger.warn(`stalled id=${jobId}`)); + } //#endregion //#region webhook deliver - this.webhookDeliverQueueWorker = new Bull.Worker(QUEUE.WEBHOOK_DELIVER, (job) => this.webhookDeliverProcessorService.process(job), { - ...baseQueueOptions(this.config, QUEUE.WEBHOOK_DELIVER), - autorun: false, - concurrency: 64, - limiter: { - max: 64, - duration: 1000, - }, - settings: { - backoffStrategy: httpRelatedBackoff, - }, - }); + { + this.webhookDeliverQueueWorker = new Bull.Worker(QUEUE.WEBHOOK_DELIVER, (job) => { + if (this.config.sentryForBackend) { + return Sentry.startSpan({ name: 'Queue: WebhookDeliver' }, () => this.webhookDeliverProcessorService.process(job)); + } else { + return this.webhookDeliverProcessorService.process(job); + } + }, { + ...baseQueueOptions(this.config, QUEUE.WEBHOOK_DELIVER), + autorun: false, + concurrency: 64, + limiter: { + max: 64, + duration: 1000, + }, + settings: { + backoffStrategy: httpRelatedBackoff, + }, + }); - const webhookLogger = this.logger.createSubLogger('webhook'); + const webhookLogger = this.logger.createSubLogger('webhook'); - this.webhookDeliverQueueWorker - .on('active', (job) => webhookLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`)) - .on('completed', (job, result) => webhookLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`)) - .on('failed', (job, err) => webhookLogger.warn(`failed(${err.stack}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`)) - .on('error', (err: Error) => webhookLogger.error(`error ${err.stack}`, { e: renderError(err) })) - .on('stalled', (jobId) => webhookLogger.warn(`stalled id=${jobId}`)); + this.webhookDeliverQueueWorker + .on('active', (job) => webhookLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`)) + .on('completed', (job, result) => webhookLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`)) + .on('failed', (job, err) => webhookLogger.warn(`failed(${err.stack}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`)) + .on('error', (err: Error) => webhookLogger.error(`error ${err.stack}`, { e: renderError(err) })) + .on('stalled', (jobId) => webhookLogger.warn(`stalled id=${jobId}`)); + } //#endregion //#region relationship - this.relationshipQueueWorker = new Bull.Worker(QUEUE.RELATIONSHIP, (job) => { - switch (job.name) { - case 'follow': return this.relationshipProcessorService.processFollow(job); - case 'unfollow': return this.relationshipProcessorService.processUnfollow(job); - case 'block': return this.relationshipProcessorService.processBlock(job); - case 'unblock': return this.relationshipProcessorService.processUnblock(job); - default: throw new Error(`unrecognized job type ${job.name} for relationship`); - } - }, { - ...baseQueueOptions(this.config, QUEUE.RELATIONSHIP), - autorun: false, - concurrency: this.config.relationshipJobConcurrency ?? 16, - limiter: { - max: this.config.relationshipJobPerSec ?? 64, - duration: 1000, - }, - }); + { + const processer = (job: Bull.Job) => { + switch (job.name) { + case 'follow': return this.relationshipProcessorService.processFollow(job); + case 'unfollow': return this.relationshipProcessorService.processUnfollow(job); + case 'block': return this.relationshipProcessorService.processBlock(job); + case 'unblock': return this.relationshipProcessorService.processUnblock(job); + default: throw new Error(`unrecognized job type ${job.name} for relationship`); + } + }; - const relationshipLogger = this.logger.createSubLogger('relationship'); + this.relationshipQueueWorker = new Bull.Worker(QUEUE.RELATIONSHIP, (job) => { + if (this.config.sentryForBackend) { + return Sentry.startSpan({ name: 'Queue: Relationship: ' + job.name }, () => processer(job)); + } else { + return processer(job); + } + }, { + ...baseQueueOptions(this.config, QUEUE.RELATIONSHIP), + autorun: false, + concurrency: this.config.relationshipJobConcurrency ?? 16, + limiter: { + max: this.config.relationshipJobPerSec ?? 64, + duration: 1000, + }, + }); - this.relationshipQueueWorker - .on('active', (job) => relationshipLogger.debug(`active id=${job.id}`)) - .on('completed', (job, result) => relationshipLogger.debug(`completed(${result}) id=${job.id}`)) - .on('failed', (job, err) => relationshipLogger.warn(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) })) - .on('error', (err: Error) => relationshipLogger.error(`error ${err.stack}`, { e: renderError(err) })) - .on('stalled', (jobId) => relationshipLogger.warn(`stalled id=${jobId}`)); + const relationshipLogger = this.logger.createSubLogger('relationship'); + + this.relationshipQueueWorker + .on('active', (job) => relationshipLogger.debug(`active id=${job.id}`)) + .on('completed', (job, result) => relationshipLogger.debug(`completed(${result}) id=${job.id}`)) + .on('failed', (job, err) => relationshipLogger.warn(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) })) + .on('error', (err: Error) => relationshipLogger.error(`error ${err.stack}`, { e: renderError(err) })) + .on('stalled', (jobId) => relationshipLogger.warn(`stalled id=${jobId}`)); + } //#endregion //#region object storage - this.objectStorageQueueWorker = new Bull.Worker(QUEUE.OBJECT_STORAGE, (job) => { - switch (job.name) { - case 'deleteFile': return this.deleteFileProcessorService.process(job); - case 'cleanRemoteFiles': return this.cleanRemoteFilesProcessorService.process(job); - default: throw new Error(`unrecognized job type ${job.name} for objectStorage`); - } - }, { - ...baseQueueOptions(this.config, QUEUE.OBJECT_STORAGE), - autorun: false, - concurrency: 16, - }); + { + const processer = (job: Bull.Job) => { + switch (job.name) { + case 'deleteFile': return this.deleteFileProcessorService.process(job); + case 'cleanRemoteFiles': return this.cleanRemoteFilesProcessorService.process(job); + default: throw new Error(`unrecognized job type ${job.name} for objectStorage`); + } + }; - const objectStorageLogger = this.logger.createSubLogger('objectStorage'); + this.objectStorageQueueWorker = new Bull.Worker(QUEUE.OBJECT_STORAGE, (job) => { + if (this.config.sentryForBackend) { + return Sentry.startSpan({ name: 'Queue: ObjectStorage: ' + job.name }, () => processer(job)); + } else { + return processer(job); + } + }, { + ...baseQueueOptions(this.config, QUEUE.OBJECT_STORAGE), + autorun: false, + concurrency: 16, + }); - this.objectStorageQueueWorker - .on('active', (job) => objectStorageLogger.debug(`active id=${job.id}`)) - .on('completed', (job, result) => objectStorageLogger.debug(`completed(${result}) id=${job.id}`)) - .on('failed', (job, err) => objectStorageLogger.warn(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) })) - .on('error', (err: Error) => objectStorageLogger.error(`error ${err.stack}`, { e: renderError(err) })) - .on('stalled', (jobId) => objectStorageLogger.warn(`stalled id=${jobId}`)); + const objectStorageLogger = this.logger.createSubLogger('objectStorage'); + + this.objectStorageQueueWorker + .on('active', (job) => objectStorageLogger.debug(`active id=${job.id}`)) + .on('completed', (job, result) => objectStorageLogger.debug(`completed(${result}) id=${job.id}`)) + .on('failed', (job, err) => objectStorageLogger.warn(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, e: renderError(err) })) + .on('error', (err: Error) => objectStorageLogger.error(`error ${err.stack}`, { e: renderError(err) })) + .on('stalled', (jobId) => objectStorageLogger.warn(`stalled id=${jobId}`)); + } //#endregion //#region ended poll notification - this.endedPollNotificationQueueWorker = new Bull.Worker(QUEUE.ENDED_POLL_NOTIFICATION, (job) => this.endedPollNotificationProcessorService.process(job), { - ...baseQueueOptions(this.config, QUEUE.ENDED_POLL_NOTIFICATION), - autorun: false, - }); + { + this.endedPollNotificationQueueWorker = new Bull.Worker(QUEUE.ENDED_POLL_NOTIFICATION, (job) => { + if (this.config.sentryForBackend) { + return Sentry.startSpan({ name: 'Queue: EndedPollNotification' }, () => this.endedPollNotificationProcessorService.process(job)); + } else { + return this.endedPollNotificationProcessorService.process(job); + } + }, { + ...baseQueueOptions(this.config, QUEUE.ENDED_POLL_NOTIFICATION), + autorun: false, + }); + } //#endregion }