Merge upstream

This commit is contained in:
オスカー、 2024-10-18 21:41:33 +09:00
commit d34eadc8b0
No known key found for this signature in database
GPG key ID: 139D6573F92DA9F7
35 changed files with 504 additions and 73 deletions

View file

@ -78,7 +78,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
private logger: Logger;
private systemQueueWorker: Bull.Worker;
private dbQueueWorker: Bull.Worker;
private deliverQueueWorker: Bull.Worker;
private deliverQueueWorkers: Bull.Worker[];
private inboxQueueWorker: Bull.Worker;
private webhookDeliverQueueWorker: Bull.Worker;
private relationshipQueueWorker: Bull.Worker;
@ -215,27 +215,31 @@ export class QueueProcessorService implements OnApplicationShutdown {
//#endregion
//#region deliver
this.deliverQueueWorker = new Bull.Worker(QUEUE.DELIVER, (job) => this.deliverProcessorService.process(job), {
...baseWorkerOptions(this.config.redisForDeliverQueue, this.config.bullmqWorkerOptions, QUEUE.DELIVER),
autorun: false,
concurrency: this.config.deliverJobConcurrency ?? 128,
limiter: {
max: this.config.deliverJobPerSec ?? 128,
duration: 1000,
},
settings: {
backoffStrategy: httpRelatedBackoff,
},
this.deliverQueueWorkers = this.config.redisForDeliverQueues
.filter((_, index) => process.env.QUEUE_WORKER_INDEX == null || index === Number.parseInt(process.env.QUEUE_WORKER_INDEX, 10))
.map(config => new Bull.Worker(QUEUE.DELIVER, (job) => this.deliverProcessorService.process(job), {
...baseWorkerOptions(config, this.config.bullmqWorkerOptions, QUEUE.DELIVER),
autorun: false,
concurrency: this.config.deliverJobConcurrency ?? 128,
limiter: {
max: this.config.deliverJobPerSec ?? 128,
duration: 1000,
},
settings: {
backoffStrategy: httpRelatedBackoff,
},
}));
this.deliverQueueWorkers.forEach((worker, index) => {
const deliverLogger = this.logger.createSubLogger(`deliver-${index}`);
worker
.on('active', (job) => deliverLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
.on('completed', (job, result) => deliverLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
.on('failed', (job, err) => deliverLogger.warn(`failed(${err.stack}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`))
.on('error', (err: Error) => deliverLogger.error(`error ${err.stack}`, { error: renderError(err) }))
.on('stalled', (jobId) => deliverLogger.warn(`stalled id=${jobId}`));
});
const deliverLogger = this.logger.createSubLogger('deliver');
this.deliverQueueWorker
.on('active', (job) => deliverLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
.on('completed', (job, result) => deliverLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
.on('failed', (job, err) => deliverLogger.warn(`failed(${err.stack}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`))
.on('error', (err: Error) => deliverLogger.error(`error ${err.stack}`, { error: renderError(err) }))
.on('stalled', (jobId) => deliverLogger.warn(`stalled id=${jobId}`));
//#endregion
//#region inbox
@ -351,7 +355,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
await Promise.all([
this.systemQueueWorker.run(),
this.dbQueueWorker.run(),
this.deliverQueueWorker.run(),
...this.deliverQueueWorkers.map(worker => worker.run()),
this.inboxQueueWorker.run(),
this.webhookDeliverQueueWorker.run(),
this.relationshipQueueWorker.run(),
@ -365,7 +369,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
await Promise.all([
this.systemQueueWorker.close(),
this.dbQueueWorker.close(),
this.deliverQueueWorker.close(),
...this.deliverQueueWorkers.map(worker => worker.close()),
this.inboxQueueWorker.close(),
this.webhookDeliverQueueWorker.close(),
this.relationshipQueueWorker.close(),