1
0
mirror of https://github.com/MisskeyIO/misskey synced 2024-11-23 22:56:49 +09:00

enhance(queue): inbox と relationship にも MisskeyIO#745 を適用 (MisskeyIO#752)

This commit is contained in:
riku6460 2024-10-18 23:06:33 +09:00 committed by GitHub
parent a8bbccbefa
commit c47140eab7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 72 additions and 64 deletions

View File

@ -51,9 +51,9 @@ type Source = {
redisForSystemQueue?: RedisOptionsSource; redisForSystemQueue?: RedisOptionsSource;
redisForEndedPollNotificationQueue?: RedisOptionsSource; redisForEndedPollNotificationQueue?: RedisOptionsSource;
redisForDeliverQueues?: Array<RedisOptionsSource>; redisForDeliverQueues?: Array<RedisOptionsSource>;
redisForInboxQueue?: RedisOptionsSource; redisForInboxQueues?: Array<RedisOptionsSource>;
redisForDbQueue?: RedisOptionsSource; redisForDbQueue?: RedisOptionsSource;
redisForRelationshipQueue?: RedisOptionsSource; redisForRelationshipQueues?: Array<RedisOptionsSource>;
redisForObjectStorageQueue?: RedisOptionsSource; redisForObjectStorageQueue?: RedisOptionsSource;
redisForWebhookDeliverQueue?: RedisOptionsSource; redisForWebhookDeliverQueue?: RedisOptionsSource;
redisForTimelines?: RedisOptionsSource; redisForTimelines?: RedisOptionsSource;
@ -221,9 +221,9 @@ export type Config = {
redisForSystemQueue: RedisOptions & RedisOptionsSource; redisForSystemQueue: RedisOptions & RedisOptionsSource;
redisForEndedPollNotificationQueue: RedisOptions & RedisOptionsSource; redisForEndedPollNotificationQueue: RedisOptions & RedisOptionsSource;
redisForDeliverQueues: Array<RedisOptions & RedisOptionsSource>; redisForDeliverQueues: Array<RedisOptions & RedisOptionsSource>;
redisForInboxQueue: RedisOptions & RedisOptionsSource; redisForInboxQueues: Array<RedisOptions & RedisOptionsSource>;
redisForDbQueue: RedisOptions & RedisOptionsSource; redisForDbQueue: RedisOptions & RedisOptionsSource;
redisForRelationshipQueue: RedisOptions & RedisOptionsSource; redisForRelationshipQueues: Array<RedisOptions & RedisOptionsSource>;
redisForObjectStorageQueue: RedisOptions & RedisOptionsSource; redisForObjectStorageQueue: RedisOptions & RedisOptionsSource;
redisForWebhookDeliverQueue: RedisOptions & RedisOptionsSource; redisForWebhookDeliverQueue: RedisOptions & RedisOptionsSource;
redisForTimelines: RedisOptions & RedisOptionsSource; redisForTimelines: RedisOptions & RedisOptionsSource;
@ -297,9 +297,9 @@ export function loadConfig(): Config {
redisForSystemQueue: config.redisForSystemQueue ? convertRedisOptions(config.redisForSystemQueue, host) : redisForJobQueue, redisForSystemQueue: config.redisForSystemQueue ? convertRedisOptions(config.redisForSystemQueue, host) : redisForJobQueue,
redisForEndedPollNotificationQueue: config.redisForEndedPollNotificationQueue ? convertRedisOptions(config.redisForEndedPollNotificationQueue, host) : redisForJobQueue, redisForEndedPollNotificationQueue: config.redisForEndedPollNotificationQueue ? convertRedisOptions(config.redisForEndedPollNotificationQueue, host) : redisForJobQueue,
redisForDeliverQueues: config.redisForDeliverQueues ? config.redisForDeliverQueues.map(config => convertRedisOptions(config, host)) : [redisForJobQueue], redisForDeliverQueues: config.redisForDeliverQueues ? config.redisForDeliverQueues.map(config => convertRedisOptions(config, host)) : [redisForJobQueue],
redisForInboxQueue: config.redisForInboxQueue ? convertRedisOptions(config.redisForInboxQueue, host) : redisForJobQueue, redisForInboxQueues: config.redisForInboxQueues ? config.redisForInboxQueues.map(config => convertRedisOptions(config, host)) : [redisForJobQueue],
redisForDbQueue: config.redisForDbQueue ? convertRedisOptions(config.redisForDbQueue, host) : redisForJobQueue, redisForDbQueue: config.redisForDbQueue ? convertRedisOptions(config.redisForDbQueue, host) : redisForJobQueue,
redisForRelationshipQueue: config.redisForRelationshipQueue ? convertRedisOptions(config.redisForRelationshipQueue, host) : redisForJobQueue, redisForRelationshipQueues: config.redisForRelationshipQueues ? config.redisForRelationshipQueues.map(config => convertRedisOptions(config, host)) : [redisForJobQueue],
redisForObjectStorageQueue: config.redisForObjectStorageQueue ? convertRedisOptions(config.redisForObjectStorageQueue, host) : redisForJobQueue, redisForObjectStorageQueue: config.redisForObjectStorageQueue ? convertRedisOptions(config.redisForObjectStorageQueue, host) : redisForJobQueue,
redisForWebhookDeliverQueue: config.redisForWebhookDeliverQueue ? convertRedisOptions(config.redisForWebhookDeliverQueue, host) : redisForJobQueue, redisForWebhookDeliverQueue: config.redisForWebhookDeliverQueue ? convertRedisOptions(config.redisForWebhookDeliverQueue, host) : redisForJobQueue,
redisForTimelines: config.redisForTimelines ? convertRedisOptions(config.redisForTimelines, host) : redis, redisForTimelines: config.redisForTimelines ? convertRedisOptions(config.redisForTimelines, host) : redis,

View File

@ -16,9 +16,9 @@ import type { DeliverJobData, InboxJobData, EndedPollNotificationJobData, Webhoo
export type SystemQueue = Bull.Queue<Record<string, unknown>>; export type SystemQueue = Bull.Queue<Record<string, unknown>>;
export type EndedPollNotificationQueue = Bull.Queue<EndedPollNotificationJobData>; export type EndedPollNotificationQueue = Bull.Queue<EndedPollNotificationJobData>;
export type DeliverQueue = Queues<DeliverJobData>; export type DeliverQueue = Queues<DeliverJobData>;
export type InboxQueue = Bull.Queue<InboxJobData>; export type InboxQueue = Queues<InboxJobData>;
export type DbQueue = Bull.Queue; export type DbQueue = Bull.Queue;
export type RelationshipQueue = Bull.Queue<RelationshipJobData>; export type RelationshipQueue = Queues<RelationshipJobData>;
export type ObjectStorageQueue = Bull.Queue; export type ObjectStorageQueue = Bull.Queue;
export type WebhookDeliverQueue = Bull.Queue<WebhookDeliverJobData>; export type WebhookDeliverQueue = Bull.Queue<WebhookDeliverJobData>;
@ -42,7 +42,7 @@ const $deliver: Provider = {
const $inbox: Provider = { const $inbox: Provider = {
provide: 'queue:inbox', provide: 'queue:inbox',
useFactory: (config: Config) => new Bull.Queue(QUEUE.INBOX, baseQueueOptions(config.redisForInboxQueue, config.bullmqQueueOptions, QUEUE.INBOX)), useFactory: (config: Config) => new Queues(config.redisForInboxQueues.map(queueConfig => new Bull.Queue(QUEUE.INBOX, baseQueueOptions(queueConfig, config.bullmqQueueOptions, QUEUE.INBOX)))),
inject: [DI.config], inject: [DI.config],
}; };
@ -54,7 +54,7 @@ const $db: Provider = {
const $relationship: Provider = { const $relationship: Provider = {
provide: 'queue:relationship', provide: 'queue:relationship',
useFactory: (config: Config) => new Bull.Queue(QUEUE.RELATIONSHIP, baseQueueOptions(config.redisForRelationshipQueue, config.bullmqQueueOptions, QUEUE.RELATIONSHIP)), useFactory: (config: Config) => new Queues(config.redisForRelationshipQueues.map(queueConfig => new Bull.Queue(QUEUE.RELATIONSHIP, baseQueueOptions(queueConfig, config.bullmqQueueOptions, QUEUE.RELATIONSHIP)))),
inject: [DI.config], inject: [DI.config],
}; };

View File

@ -76,9 +76,9 @@ export class QueueProcessorService implements OnApplicationShutdown {
private systemQueueWorker: Bull.Worker; private systemQueueWorker: Bull.Worker;
private dbQueueWorker: Bull.Worker; private dbQueueWorker: Bull.Worker;
private deliverQueueWorkers: Bull.Worker[]; private deliverQueueWorkers: Bull.Worker[];
private inboxQueueWorker: Bull.Worker; private inboxQueueWorkers: Bull.Worker[];
private webhookDeliverQueueWorker: Bull.Worker; private webhookDeliverQueueWorker: Bull.Worker;
private relationshipQueueWorker: Bull.Worker; private relationshipQueueWorkers: Bull.Worker[];
private objectStorageQueueWorker: Bull.Worker; private objectStorageQueueWorker: Bull.Worker;
private endedPollNotificationQueueWorker: Bull.Worker; private endedPollNotificationQueueWorker: Bull.Worker;
@ -234,8 +234,10 @@ export class QueueProcessorService implements OnApplicationShutdown {
//#endregion //#endregion
//#region inbox //#region inbox
this.inboxQueueWorker = new Bull.Worker(QUEUE.INBOX, (job) => this.inboxProcessorService.process(job), { this.inboxQueueWorkers = this.config.redisForInboxQueues
...baseWorkerOptions(this.config.redisForInboxQueue, this.config.bullmqWorkerOptions, QUEUE.INBOX), .filter((_, index) => process.env.QUEUE_WORKER_INDEX == null || index === Number.parseInt(process.env.QUEUE_WORKER_INDEX, 10))
.map(config => new Bull.Worker(QUEUE.INBOX, (job) => this.inboxProcessorService.process(job), {
...baseWorkerOptions(config, this.config.bullmqWorkerOptions, QUEUE.INBOX),
autorun: false, autorun: false,
concurrency: this.config.inboxJobConcurrency ?? 16, concurrency: this.config.inboxJobConcurrency ?? 16,
limiter: { limiter: {
@ -245,16 +247,18 @@ export class QueueProcessorService implements OnApplicationShutdown {
settings: { settings: {
backoffStrategy: httpRelatedBackoff, backoffStrategy: httpRelatedBackoff,
}, },
}); }));
const inboxLogger = this.logger.createSubLogger('inbox'); this.inboxQueueWorkers.forEach((worker, index) => {
const inboxLogger = this.logger.createSubLogger(`inbox-${index}`);
this.inboxQueueWorker worker
.on('active', (job) => inboxLogger.debug(`active ${getJobInfo(job, true)}`)) .on('active', (job) => inboxLogger.debug(`active ${getJobInfo(job, true)}`))
.on('completed', (job, result) => inboxLogger.debug(`completed(${result}) ${getJobInfo(job, true)}`)) .on('completed', (job, result) => inboxLogger.debug(`completed(${result}) ${getJobInfo(job, true)}`))
.on('failed', (job, err) => inboxLogger.warn(`failed(${err.stack}) ${getJobInfo(job)} activity=${job ? (job.data.activity ? job.data.activity.id : 'none') : '-'}`, { job, error: renderError(err) })) .on('failed', (job, err) => inboxLogger.warn(`failed(${err.stack}) ${getJobInfo(job)} activity=${job ? (job.data.activity ? job.data.activity.id : 'none') : '-'}`, { job, error: renderError(err) }))
.on('error', (err: Error) => inboxLogger.error(`error ${err.stack}`, { error: renderError(err) })) .on('error', (err: Error) => inboxLogger.error(`error ${err.stack}`, { error: renderError(err) }))
.on('stalled', (jobId) => inboxLogger.warn(`stalled id=${jobId}`)); .on('stalled', (jobId) => inboxLogger.warn(`stalled id=${jobId}`));
});
//#endregion //#endregion
//#region webhook deliver //#region webhook deliver
@ -282,7 +286,9 @@ export class QueueProcessorService implements OnApplicationShutdown {
//#endregion //#endregion
//#region relationship //#region relationship
this.relationshipQueueWorker = new Bull.Worker(QUEUE.RELATIONSHIP, (job) => { this.relationshipQueueWorkers = this.config.redisForRelationshipQueues
.filter((_, index) => process.env.QUEUE_WORKER_INDEX == null || index === Number.parseInt(process.env.QUEUE_WORKER_INDEX, 10))
.map(config => new Bull.Worker(QUEUE.RELATIONSHIP, (job) => {
switch (job.name) { switch (job.name) {
case 'follow': return this.relationshipProcessorService.processFollow(job); case 'follow': return this.relationshipProcessorService.processFollow(job);
case 'unfollow': return this.relationshipProcessorService.processUnfollow(job); case 'unfollow': return this.relationshipProcessorService.processUnfollow(job);
@ -291,23 +297,25 @@ export class QueueProcessorService implements OnApplicationShutdown {
default: throw new Error(`unrecognized job type ${job.name} for relationship`); default: throw new Error(`unrecognized job type ${job.name} for relationship`);
} }
}, { }, {
...baseWorkerOptions(this.config.redisForRelationshipQueue, this.config.bullmqWorkerOptions, QUEUE.RELATIONSHIP), ...baseWorkerOptions(config, this.config.bullmqWorkerOptions, QUEUE.RELATIONSHIP),
autorun: false, autorun: false,
concurrency: this.config.relationshipJobConcurrency ?? 16, concurrency: this.config.relationshipJobConcurrency ?? 16,
limiter: { limiter: {
max: this.config.relationshipJobPerSec ?? 64, max: this.config.relationshipJobPerSec ?? 64,
duration: 1000, duration: 1000,
}, },
}); }));
const relationshipLogger = this.logger.createSubLogger('relationship'); this.relationshipQueueWorkers.forEach((worker, index) => {
const relationshipLogger = this.logger.createSubLogger(`relationship-${index}`);
this.relationshipQueueWorker worker
.on('active', (job) => relationshipLogger.debug(`active id=${job.id}`)) .on('active', (job) => relationshipLogger.debug(`active id=${job.id}`))
.on('completed', (job, result) => relationshipLogger.debug(`completed(${result}) id=${job.id}`)) .on('completed', (job, result) => relationshipLogger.debug(`completed(${result}) id=${job.id}`))
.on('failed', (job, err) => relationshipLogger.warn(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, error: renderError(err) })) .on('failed', (job, err) => relationshipLogger.warn(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, error: renderError(err) }))
.on('error', (err: Error) => relationshipLogger.error(`error ${err.stack}`, { error: renderError(err) })) .on('error', (err: Error) => relationshipLogger.error(`error ${err.stack}`, { error: renderError(err) }))
.on('stalled', (jobId) => relationshipLogger.warn(`stalled id=${jobId}`)); .on('stalled', (jobId) => relationshipLogger.warn(`stalled id=${jobId}`));
});
//#endregion //#endregion
//#region object storage //#region object storage
@ -347,9 +355,9 @@ export class QueueProcessorService implements OnApplicationShutdown {
this.systemQueueWorker.run(), this.systemQueueWorker.run(),
this.dbQueueWorker.run(), this.dbQueueWorker.run(),
...this.deliverQueueWorkers.map(worker => worker.run()), ...this.deliverQueueWorkers.map(worker => worker.run()),
this.inboxQueueWorker.run(), this.inboxQueueWorkers.map(worker => worker.run()),
this.webhookDeliverQueueWorker.run(), this.webhookDeliverQueueWorker.run(),
this.relationshipQueueWorker.run(), this.relationshipQueueWorkers.map(worker => worker.run()),
this.objectStorageQueueWorker.run(), this.objectStorageQueueWorker.run(),
this.endedPollNotificationQueueWorker.run(), this.endedPollNotificationQueueWorker.run(),
]); ]);
@ -361,9 +369,9 @@ export class QueueProcessorService implements OnApplicationShutdown {
this.systemQueueWorker.close(), this.systemQueueWorker.close(),
this.dbQueueWorker.close(), this.dbQueueWorker.close(),
...this.deliverQueueWorkers.map(worker => worker.close()), ...this.deliverQueueWorkers.map(worker => worker.close()),
this.inboxQueueWorker.close(), this.inboxQueueWorkers.map(worker => worker.close()),
this.webhookDeliverQueueWorker.close(), this.webhookDeliverQueueWorker.close(),
this.relationshipQueueWorker.close(), this.relationshipQueueWorkers.map(worker => worker.close()),
this.objectStorageQueueWorker.close(), this.objectStorageQueueWorker.close(),
this.endedPollNotificationQueueWorker.close(), this.endedPollNotificationQueueWorker.close(),
]); ]);

View File

@ -245,13 +245,13 @@ export class ClientServerService {
queues: [ queues: [
this.systemQueue, this.systemQueue,
this.endedPollNotificationQueue, this.endedPollNotificationQueue,
this.inboxQueue,
this.dbQueue, this.dbQueue,
this.relationshipQueue,
this.objectStorageQueue, this.objectStorageQueue,
this.webhookDeliverQueue, this.webhookDeliverQueue,
].map(q => new BullMQAdapter(q)) ].map(q => new BullMQAdapter(q))
.concat(this.deliverQueue.queues.map((q, index) => new BullMQAdapter(q, { prefix: `${index}-` }))), .concat(this.deliverQueue.queues.map((q, index) => new BullMQAdapter(q, { prefix: `${index}-` })))
.concat(this.inboxQueue.queues.map((q, index) => new BullMQAdapter(q, { prefix: `${index}-` })))
.concat(this.relationshipQueue.queues.map((q, index) => new BullMQAdapter(q, { prefix: `${index}-` }))),
serverAdapter, serverAdapter,
}); });