redisForJobQueue の接続を使い回す (MisskeyIO#268)
This commit is contained in:
parent
107cd9788e
commit
7122657f13
6 changed files with 54 additions and 33 deletions
|
@ -5,6 +5,7 @@
|
|||
|
||||
import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common';
|
||||
import * as Bull from 'bullmq';
|
||||
import * as Redis from 'ioredis';
|
||||
import type { Config } from '@/config.js';
|
||||
import { DI } from '@/di-symbols.js';
|
||||
import type Logger from '@/logger.js';
|
||||
|
@ -84,6 +85,9 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
|||
@Inject(DI.config)
|
||||
private config: Config,
|
||||
|
||||
@Inject(DI.redisForJobQueue)
|
||||
private redisForJobQueue: Redis.Redis,
|
||||
|
||||
private queueLoggerService: QueueLoggerService,
|
||||
private webhookDeliverProcessorService: WebhookDeliverProcessorService,
|
||||
private endedPollNotificationProcessorService: EndedPollNotificationProcessorService,
|
||||
|
@ -146,7 +150,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
|||
default: throw new Error(`unrecognized job type ${job.name} for system`);
|
||||
}
|
||||
}, {
|
||||
...baseQueueOptions(this.config, QUEUE.SYSTEM),
|
||||
...baseQueueOptions(this.config, QUEUE.SYSTEM, this.redisForJobQueue),
|
||||
autorun: false,
|
||||
});
|
||||
|
||||
|
@ -185,7 +189,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
|||
default: throw new Error(`unrecognized job type ${job.name} for db`);
|
||||
}
|
||||
}, {
|
||||
...baseQueueOptions(this.config, QUEUE.DB),
|
||||
...baseQueueOptions(this.config, QUEUE.DB, this.redisForJobQueue),
|
||||
autorun: false,
|
||||
});
|
||||
|
||||
|
@ -201,7 +205,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
|||
|
||||
//#region deliver
|
||||
this.deliverQueueWorker = new Bull.Worker(QUEUE.DELIVER, (job) => this.deliverProcessorService.process(job), {
|
||||
...baseQueueOptions(this.config, QUEUE.DELIVER),
|
||||
...baseQueueOptions(this.config, QUEUE.DELIVER, this.redisForJobQueue),
|
||||
autorun: false,
|
||||
concurrency: this.config.deliverJobConcurrency ?? 128,
|
||||
limiter: {
|
||||
|
@ -225,7 +229,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
|||
|
||||
//#region inbox
|
||||
this.inboxQueueWorker = new Bull.Worker(QUEUE.INBOX, (job) => this.inboxProcessorService.process(job), {
|
||||
...baseQueueOptions(this.config, QUEUE.INBOX),
|
||||
...baseQueueOptions(this.config, QUEUE.INBOX, this.redisForJobQueue),
|
||||
autorun: false,
|
||||
concurrency: this.config.inboxJobConcurrency ?? 16,
|
||||
limiter: {
|
||||
|
@ -249,7 +253,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
|||
|
||||
//#region webhook deliver
|
||||
this.webhookDeliverQueueWorker = new Bull.Worker(QUEUE.WEBHOOK_DELIVER, (job) => this.webhookDeliverProcessorService.process(job), {
|
||||
...baseQueueOptions(this.config, QUEUE.WEBHOOK_DELIVER),
|
||||
...baseQueueOptions(this.config, QUEUE.WEBHOOK_DELIVER, this.redisForJobQueue),
|
||||
autorun: false,
|
||||
concurrency: 64,
|
||||
limiter: {
|
||||
|
@ -281,7 +285,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
|||
default: throw new Error(`unrecognized job type ${job.name} for relationship`);
|
||||
}
|
||||
}, {
|
||||
...baseQueueOptions(this.config, QUEUE.RELATIONSHIP),
|
||||
...baseQueueOptions(this.config, QUEUE.RELATIONSHIP, this.redisForJobQueue),
|
||||
autorun: false,
|
||||
concurrency: this.config.relashionshipJobConcurrency ?? 16,
|
||||
limiter: {
|
||||
|
@ -308,7 +312,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
|||
default: throw new Error(`unrecognized job type ${job.name} for objectStorage`);
|
||||
}
|
||||
}, {
|
||||
...baseQueueOptions(this.config, QUEUE.OBJECT_STORAGE),
|
||||
...baseQueueOptions(this.config, QUEUE.OBJECT_STORAGE, this.redisForJobQueue),
|
||||
autorun: false,
|
||||
concurrency: 16,
|
||||
});
|
||||
|
@ -325,7 +329,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
|||
|
||||
//#region ended poll notification
|
||||
this.endedPollNotificationQueueWorker = new Bull.Worker(QUEUE.ENDED_POLL_NOTIFICATION, (job) => this.endedPollNotificationProcessorService.process(job), {
|
||||
...baseQueueOptions(this.config, QUEUE.ENDED_POLL_NOTIFICATION),
|
||||
...baseQueueOptions(this.config, QUEUE.ENDED_POLL_NOTIFICATION, this.redisForJobQueue),
|
||||
autorun: false,
|
||||
});
|
||||
//#endregion
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue