mirror of
https://github.com/MisskeyIO/misskey
synced 2024-11-27 06:18:40 +09:00
enhance(queue): deliver queue を複数個設定できるようにする (MisskeyIO#745)
This commit is contained in:
parent
14d71e2064
commit
227c85c2cf
@ -50,7 +50,7 @@ type Source = {
|
|||||||
redisForJobQueue?: RedisOptionsSource;
|
redisForJobQueue?: RedisOptionsSource;
|
||||||
redisForSystemQueue?: RedisOptionsSource;
|
redisForSystemQueue?: RedisOptionsSource;
|
||||||
redisForEndedPollNotificationQueue?: RedisOptionsSource;
|
redisForEndedPollNotificationQueue?: RedisOptionsSource;
|
||||||
redisForDeliverQueue?: RedisOptionsSource;
|
redisForDeliverQueues?: Array<RedisOptionsSource>;
|
||||||
redisForInboxQueue?: RedisOptionsSource;
|
redisForInboxQueue?: RedisOptionsSource;
|
||||||
redisForDbQueue?: RedisOptionsSource;
|
redisForDbQueue?: RedisOptionsSource;
|
||||||
redisForRelationshipQueue?: RedisOptionsSource;
|
redisForRelationshipQueue?: RedisOptionsSource;
|
||||||
@ -220,7 +220,7 @@ export type Config = {
|
|||||||
redisForPubsub: RedisOptions & RedisOptionsSource;
|
redisForPubsub: RedisOptions & RedisOptionsSource;
|
||||||
redisForSystemQueue: RedisOptions & RedisOptionsSource;
|
redisForSystemQueue: RedisOptions & RedisOptionsSource;
|
||||||
redisForEndedPollNotificationQueue: RedisOptions & RedisOptionsSource;
|
redisForEndedPollNotificationQueue: RedisOptions & RedisOptionsSource;
|
||||||
redisForDeliverQueue: RedisOptions & RedisOptionsSource;
|
redisForDeliverQueues: Array<RedisOptions & RedisOptionsSource>;
|
||||||
redisForInboxQueue: RedisOptions & RedisOptionsSource;
|
redisForInboxQueue: RedisOptions & RedisOptionsSource;
|
||||||
redisForDbQueue: RedisOptions & RedisOptionsSource;
|
redisForDbQueue: RedisOptions & RedisOptionsSource;
|
||||||
redisForRelationshipQueue: RedisOptions & RedisOptionsSource;
|
redisForRelationshipQueue: RedisOptions & RedisOptionsSource;
|
||||||
@ -296,7 +296,7 @@ export function loadConfig(): Config {
|
|||||||
redisForPubsub: config.redisForPubsub ? convertRedisOptions(config.redisForPubsub, host) : redis,
|
redisForPubsub: config.redisForPubsub ? convertRedisOptions(config.redisForPubsub, host) : redis,
|
||||||
redisForSystemQueue: config.redisForSystemQueue ? convertRedisOptions(config.redisForSystemQueue, host) : redisForJobQueue,
|
redisForSystemQueue: config.redisForSystemQueue ? convertRedisOptions(config.redisForSystemQueue, host) : redisForJobQueue,
|
||||||
redisForEndedPollNotificationQueue: config.redisForEndedPollNotificationQueue ? convertRedisOptions(config.redisForEndedPollNotificationQueue, host) : redisForJobQueue,
|
redisForEndedPollNotificationQueue: config.redisForEndedPollNotificationQueue ? convertRedisOptions(config.redisForEndedPollNotificationQueue, host) : redisForJobQueue,
|
||||||
redisForDeliverQueue: config.redisForDeliverQueue ? convertRedisOptions(config.redisForDeliverQueue, host) : redisForJobQueue,
|
redisForDeliverQueues: config.redisForDeliverQueues ? config.redisForDeliverQueues.map(config => convertRedisOptions(config, host)) : [redisForJobQueue],
|
||||||
redisForInboxQueue: config.redisForInboxQueue ? convertRedisOptions(config.redisForInboxQueue, host) : redisForJobQueue,
|
redisForInboxQueue: config.redisForInboxQueue ? convertRedisOptions(config.redisForInboxQueue, host) : redisForJobQueue,
|
||||||
redisForDbQueue: config.redisForDbQueue ? convertRedisOptions(config.redisForDbQueue, host) : redisForJobQueue,
|
redisForDbQueue: config.redisForDbQueue ? convertRedisOptions(config.redisForDbQueue, host) : redisForJobQueue,
|
||||||
redisForRelationshipQueue: config.redisForRelationshipQueue ? convertRedisOptions(config.redisForRelationshipQueue, host) : redisForJobQueue,
|
redisForRelationshipQueue: config.redisForRelationshipQueue ? convertRedisOptions(config.redisForRelationshipQueue, host) : redisForJobQueue,
|
||||||
|
@ -9,12 +9,13 @@ import { DI } from '@/di-symbols.js';
|
|||||||
import type { Config } from '@/config.js';
|
import type { Config } from '@/config.js';
|
||||||
import { QUEUE, baseQueueOptions } from '@/queue/const.js';
|
import { QUEUE, baseQueueOptions } from '@/queue/const.js';
|
||||||
import { allSettled } from '@/misc/promise-tracker.js';
|
import { allSettled } from '@/misc/promise-tracker.js';
|
||||||
|
import { Queues } from '@/misc/queues.js';
|
||||||
import type { Provider } from '@nestjs/common';
|
import type { Provider } from '@nestjs/common';
|
||||||
import type { DeliverJobData, InboxJobData, EndedPollNotificationJobData, WebhookDeliverJobData, RelationshipJobData } from '../queue/types.js';
|
import type { DeliverJobData, InboxJobData, EndedPollNotificationJobData, WebhookDeliverJobData, RelationshipJobData } from '../queue/types.js';
|
||||||
|
|
||||||
export type SystemQueue = Bull.Queue<Record<string, unknown>>;
|
export type SystemQueue = Bull.Queue<Record<string, unknown>>;
|
||||||
export type EndedPollNotificationQueue = Bull.Queue<EndedPollNotificationJobData>;
|
export type EndedPollNotificationQueue = Bull.Queue<EndedPollNotificationJobData>;
|
||||||
export type DeliverQueue = Bull.Queue<DeliverJobData>;
|
export type DeliverQueue = Queues<DeliverJobData>;
|
||||||
export type InboxQueue = Bull.Queue<InboxJobData>;
|
export type InboxQueue = Bull.Queue<InboxJobData>;
|
||||||
export type DbQueue = Bull.Queue;
|
export type DbQueue = Bull.Queue;
|
||||||
export type RelationshipQueue = Bull.Queue<RelationshipJobData>;
|
export type RelationshipQueue = Bull.Queue<RelationshipJobData>;
|
||||||
@ -35,7 +36,7 @@ const $endedPollNotification: Provider = {
|
|||||||
|
|
||||||
const $deliver: Provider = {
|
const $deliver: Provider = {
|
||||||
provide: 'queue:deliver',
|
provide: 'queue:deliver',
|
||||||
useFactory: (config: Config) => new Bull.Queue(QUEUE.DELIVER, baseQueueOptions(config.redisForDeliverQueue, config.bullmqQueueOptions, QUEUE.DELIVER)),
|
useFactory: (config: Config) => new Queues(config.redisForDeliverQueues.map(queueConfig => new Bull.Queue(QUEUE.DELIVER, baseQueueOptions(queueConfig, config.bullmqQueueOptions, QUEUE.DELIVER)))),
|
||||||
inject: [DI.config],
|
inject: [DI.config],
|
||||||
};
|
};
|
||||||
|
|
||||||
|
69
packages/backend/src/misc/queues.ts
Normal file
69
packages/backend/src/misc/queues.ts
Normal file
@ -0,0 +1,69 @@
|
|||||||
|
import { EventEmitter } from 'node:events';
|
||||||
|
import * as Bull from 'bullmq';
|
||||||
|
|
||||||
|
export class Queues<DataType = any, ResultType = any, NameType extends string = string> {
|
||||||
|
public readonly queues: ReadonlyArray<Bull.Queue<DataType, ResultType, NameType>>;
|
||||||
|
|
||||||
|
constructor(queues: Bull.Queue<DataType, ResultType, NameType>[]) {
|
||||||
|
if (queues.length === 0) {
|
||||||
|
throw new Error('queues cannot be empty.');
|
||||||
|
}
|
||||||
|
this.queues = queues;
|
||||||
|
}
|
||||||
|
|
||||||
|
getRandomQueue(): Bull.Queue<DataType, ResultType, NameType> {
|
||||||
|
return this.queues[Math.floor(Math.random() * this.queues.length)];
|
||||||
|
}
|
||||||
|
|
||||||
|
add(name: NameType, data: DataType, opts?: Bull.JobsOptions): Promise<Bull.Job<DataType, ResultType, NameType>> {
|
||||||
|
return this.getRandomQueue().add(name, data, opts);
|
||||||
|
}
|
||||||
|
|
||||||
|
async addBulk(jobs: { name: NameType; data: DataType; opts?: Bull.BulkJobOptions }[]): Promise<Bull.Job<DataType, ResultType, NameType>[]> {
|
||||||
|
return (await Promise.allSettled(jobs.map(job => this.add(job.name, job.data, job.opts))))
|
||||||
|
.filter((value): value is PromiseFulfilledResult<Bull.Job<DataType, ResultType, NameType>> => value.status === 'fulfilled')
|
||||||
|
.flatMap(value => value.value);
|
||||||
|
}
|
||||||
|
|
||||||
|
async close(): Promise<void> {
|
||||||
|
await Promise.allSettled(this.queues.map(queue => queue.close()));
|
||||||
|
}
|
||||||
|
|
||||||
|
async getDelayed(start?: number, end?: number): Promise<Bull.Job<DataType, ResultType, NameType>[]> {
|
||||||
|
return (await Promise.allSettled(this.queues.map(queue => queue.getDelayed(start, end))))
|
||||||
|
.filter((value): value is PromiseFulfilledResult<Bull.Job<DataType, ResultType, NameType>[]> => value.status === 'fulfilled')
|
||||||
|
.flatMap(value => value.value);
|
||||||
|
}
|
||||||
|
|
||||||
|
async getJobCounts(...types: Bull.JobType[]): Promise<{ [p: string]: number }> {
|
||||||
|
return (await Promise.allSettled(this.queues.map(queue => queue.getJobCounts(...types))))
|
||||||
|
.filter((value): value is PromiseFulfilledResult<Record<string, number>> => value.status === 'fulfilled')
|
||||||
|
.reduce((previousValue, currentValue) => {
|
||||||
|
for (const key in currentValue.value) {
|
||||||
|
previousValue[key] = (previousValue[key] || 0) + currentValue.value[key];
|
||||||
|
}
|
||||||
|
return previousValue;
|
||||||
|
}, {} as Record<string, number>);
|
||||||
|
}
|
||||||
|
|
||||||
|
once<U extends keyof Bull.QueueListener<DataType, ResultType, NameType>>(event: U, listener: Bull.QueueListener<DataType, ResultType, NameType>[U]): void {
|
||||||
|
const e = new EventEmitter();
|
||||||
|
e.once(event, listener);
|
||||||
|
|
||||||
|
const listener1 = (...args: any[]) => e.emit(event, ...args);
|
||||||
|
this.queues.forEach(queue => queue.once(event, listener1));
|
||||||
|
e.once(event, () => this.queues.forEach(queue => queue.off(event, listener1)));
|
||||||
|
}
|
||||||
|
|
||||||
|
async clean(grace: number, limit: number, type?: 'completed' | 'wait' | 'active' | 'paused' | 'prioritized' | 'delayed' | 'failed'): Promise<NameType[]> {
|
||||||
|
return (await Promise.allSettled(this.queues.map(queue => queue.clean(grace, limit, type))))
|
||||||
|
.filter((value): value is PromiseFulfilledResult<NameType[]> => value.status === 'fulfilled')
|
||||||
|
.flatMap(value => value.value);
|
||||||
|
}
|
||||||
|
|
||||||
|
async getJobs(types?: Bull.JobType[] | Bull.JobType, start?: number, end?: number, asc?: boolean): Promise<Bull.Job<DataType, ResultType, NameType>[]> {
|
||||||
|
return (await Promise.allSettled(this.queues.map(queue => queue.getJobs(types, start, end, asc))))
|
||||||
|
.filter((value): value is PromiseFulfilledResult<Bull.Job<DataType, ResultType, NameType>[]> => value.status === 'fulfilled')
|
||||||
|
.flatMap(value => value.value);
|
||||||
|
}
|
||||||
|
}
|
@ -75,7 +75,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
|||||||
private logger: Logger;
|
private logger: Logger;
|
||||||
private systemQueueWorker: Bull.Worker;
|
private systemQueueWorker: Bull.Worker;
|
||||||
private dbQueueWorker: Bull.Worker;
|
private dbQueueWorker: Bull.Worker;
|
||||||
private deliverQueueWorker: Bull.Worker;
|
private deliverQueueWorkers: Bull.Worker[];
|
||||||
private inboxQueueWorker: Bull.Worker;
|
private inboxQueueWorker: Bull.Worker;
|
||||||
private webhookDeliverQueueWorker: Bull.Worker;
|
private webhookDeliverQueueWorker: Bull.Worker;
|
||||||
private relationshipQueueWorker: Bull.Worker;
|
private relationshipQueueWorker: Bull.Worker;
|
||||||
@ -206,27 +206,31 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
|||||||
//#endregion
|
//#endregion
|
||||||
|
|
||||||
//#region deliver
|
//#region deliver
|
||||||
this.deliverQueueWorker = new Bull.Worker(QUEUE.DELIVER, (job) => this.deliverProcessorService.process(job), {
|
this.deliverQueueWorkers = this.config.redisForDeliverQueues
|
||||||
...baseWorkerOptions(this.config.redisForDeliverQueue, this.config.bullmqWorkerOptions, QUEUE.DELIVER),
|
.filter((_, index) => process.env.QUEUE_WORKER_INDEX == null || index === Number.parseInt(process.env.QUEUE_WORKER_INDEX, 10))
|
||||||
autorun: false,
|
.map(config => new Bull.Worker(QUEUE.DELIVER, (job) => this.deliverProcessorService.process(job), {
|
||||||
concurrency: this.config.deliverJobConcurrency ?? 128,
|
...baseWorkerOptions(config, this.config.bullmqWorkerOptions, QUEUE.DELIVER),
|
||||||
limiter: {
|
autorun: false,
|
||||||
max: this.config.deliverJobPerSec ?? 128,
|
concurrency: this.config.deliverJobConcurrency ?? 128,
|
||||||
duration: 1000,
|
limiter: {
|
||||||
},
|
max: this.config.deliverJobPerSec ?? 128,
|
||||||
settings: {
|
duration: 1000,
|
||||||
backoffStrategy: httpRelatedBackoff,
|
},
|
||||||
},
|
settings: {
|
||||||
|
backoffStrategy: httpRelatedBackoff,
|
||||||
|
},
|
||||||
|
}));
|
||||||
|
|
||||||
|
this.deliverQueueWorkers.forEach((worker, index) => {
|
||||||
|
const deliverLogger = this.logger.createSubLogger(`deliver-${index}`);
|
||||||
|
|
||||||
|
worker
|
||||||
|
.on('active', (job) => deliverLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
|
||||||
|
.on('completed', (job, result) => deliverLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
|
||||||
|
.on('failed', (job, err) => deliverLogger.warn(`failed(${err.stack}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`))
|
||||||
|
.on('error', (err: Error) => deliverLogger.error(`error ${err.stack}`, { error: renderError(err) }))
|
||||||
|
.on('stalled', (jobId) => deliverLogger.warn(`stalled id=${jobId}`));
|
||||||
});
|
});
|
||||||
|
|
||||||
const deliverLogger = this.logger.createSubLogger('deliver');
|
|
||||||
|
|
||||||
this.deliverQueueWorker
|
|
||||||
.on('active', (job) => deliverLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
|
|
||||||
.on('completed', (job, result) => deliverLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
|
|
||||||
.on('failed', (job, err) => deliverLogger.warn(`failed(${err.stack}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`))
|
|
||||||
.on('error', (err: Error) => deliverLogger.error(`error ${err.stack}`, { error: renderError(err) }))
|
|
||||||
.on('stalled', (jobId) => deliverLogger.warn(`stalled id=${jobId}`));
|
|
||||||
//#endregion
|
//#endregion
|
||||||
|
|
||||||
//#region inbox
|
//#region inbox
|
||||||
@ -342,7 +346,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
|||||||
await Promise.all([
|
await Promise.all([
|
||||||
this.systemQueueWorker.run(),
|
this.systemQueueWorker.run(),
|
||||||
this.dbQueueWorker.run(),
|
this.dbQueueWorker.run(),
|
||||||
this.deliverQueueWorker.run(),
|
...this.deliverQueueWorkers.map(worker => worker.run()),
|
||||||
this.inboxQueueWorker.run(),
|
this.inboxQueueWorker.run(),
|
||||||
this.webhookDeliverQueueWorker.run(),
|
this.webhookDeliverQueueWorker.run(),
|
||||||
this.relationshipQueueWorker.run(),
|
this.relationshipQueueWorker.run(),
|
||||||
@ -356,7 +360,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
|||||||
await Promise.all([
|
await Promise.all([
|
||||||
this.systemQueueWorker.close(),
|
this.systemQueueWorker.close(),
|
||||||
this.dbQueueWorker.close(),
|
this.dbQueueWorker.close(),
|
||||||
this.deliverQueueWorker.close(),
|
...this.deliverQueueWorkers.map(worker => worker.close()),
|
||||||
this.inboxQueueWorker.close(),
|
this.inboxQueueWorker.close(),
|
||||||
this.webhookDeliverQueueWorker.close(),
|
this.webhookDeliverQueueWorker.close(),
|
||||||
this.relationshipQueueWorker.close(),
|
this.relationshipQueueWorker.close(),
|
||||||
|
@ -235,12 +235,12 @@ export class ClientServerService {
|
|||||||
queues: [
|
queues: [
|
||||||
this.systemQueue,
|
this.systemQueue,
|
||||||
this.endedPollNotificationQueue,
|
this.endedPollNotificationQueue,
|
||||||
this.deliverQueue,
|
|
||||||
this.inboxQueue,
|
this.inboxQueue,
|
||||||
this.dbQueue,
|
this.dbQueue,
|
||||||
this.objectStorageQueue,
|
this.objectStorageQueue,
|
||||||
this.webhookDeliverQueue,
|
this.webhookDeliverQueue,
|
||||||
].map(q => new BullMQAdapter(q)),
|
].map(q => new BullMQAdapter(q))
|
||||||
|
.concat(this.deliverQueue.queues.map((q, index) => new BullMQAdapter(q, { prefix: `${index}-` }))),
|
||||||
serverAdapter,
|
serverAdapter,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -53,6 +53,7 @@ const devConfig = {
|
|||||||
'/cli': httpUrl,
|
'/cli': httpUrl,
|
||||||
'/inbox': httpUrl,
|
'/inbox': httpUrl,
|
||||||
'/emoji/': httpUrl,
|
'/emoji/': httpUrl,
|
||||||
|
'/queue': httpUrl,
|
||||||
'/notes': {
|
'/notes': {
|
||||||
target: httpUrl,
|
target: httpUrl,
|
||||||
headers: {
|
headers: {
|
||||||
|
Loading…
Reference in New Issue
Block a user