mirror of
https://github.com/MisskeyIO/misskey
synced 2024-11-23 14:46:40 +09:00
BullMQ + DragonflyDB で Hashtag を使用しすべてをロックしないようにする
This commit is contained in:
parent
30ad8544de
commit
b0d0dbeffc
@ -24,7 +24,8 @@ services:
|
||||
DFLY_snapshot_cron: '* * * * *'
|
||||
DFLY_version_check: false
|
||||
DFLY_tcp_backlog: 2048
|
||||
DFLY_default_lua_flags: allow-undeclared-keys
|
||||
DFLY_cluster_mode: emulated
|
||||
DFLY_lock_on_hashtags: true
|
||||
DFLY_pipeline_squash: 0
|
||||
DFLY_multi_exec_squash: false
|
||||
DFLY_conn_io_threads: 4
|
||||
|
6
.github/workflows/test-backend.yml
vendored
6
.github/workflows/test-backend.yml
vendored
@ -38,7 +38,8 @@ jobs:
|
||||
env:
|
||||
DFLY_version_check: false
|
||||
DFLY_tcp_backlog: 2048
|
||||
DFLY_default_lua_flags: allow-undeclared-keys
|
||||
DFLY_cluster_mode: emulated
|
||||
DFLY_lock_on_hashtags: true
|
||||
DFLY_pipeline_squash: 0
|
||||
DFLY_multi_exec_squash: false
|
||||
DFLY_conn_io_threads: 4
|
||||
@ -100,7 +101,8 @@ jobs:
|
||||
env:
|
||||
DFLY_version_check: false
|
||||
DFLY_tcp_backlog: 2048
|
||||
DFLY_default_lua_flags: allow-undeclared-keys
|
||||
DFLY_cluster_mode: emulated
|
||||
DFLY_lock_on_hashtags: true
|
||||
DFLY_pipeline_squash: 0
|
||||
DFLY_multi_exec_squash: false
|
||||
DFLY_conn_io_threads: 4
|
||||
|
@ -44,8 +44,10 @@ spec:
|
||||
value: false
|
||||
- name: DFLY_tcp_backlog
|
||||
value: 2048
|
||||
- name: DFLY_default_lua_flags
|
||||
value: allow-undeclared-keys
|
||||
- name: DFLY_cluster_mode
|
||||
value: emulated
|
||||
- name: DFLY_lock_on_hashtags
|
||||
value: true
|
||||
- name: DFLY_pipeline_squash
|
||||
value: 0
|
||||
- name: DFLY_multi_exec_squash
|
||||
|
@ -12,7 +12,8 @@ services:
|
||||
DFLY_snapshot_cron: '* * * * *'
|
||||
DFLY_version_check: false
|
||||
DFLY_tcp_backlog: 2048
|
||||
DFLY_default_lua_flags: allow-undeclared-keys
|
||||
DFLY_cluster_mode: emulated
|
||||
DFLY_lock_on_hashtags: true
|
||||
DFLY_pipeline_squash: 0
|
||||
DFLY_multi_exec_squash: false
|
||||
DFLY_conn_io_threads: 4
|
||||
|
@ -32,7 +32,8 @@ services:
|
||||
DFLY_snapshot_cron: '* * * * *'
|
||||
DFLY_version_check: false
|
||||
DFLY_tcp_backlog: 2048
|
||||
DFLY_default_lua_flags: allow-undeclared-keys
|
||||
DFLY_cluster_mode: emulated
|
||||
DFLY_lock_on_hashtags: true
|
||||
DFLY_pipeline_squash: 0
|
||||
DFLY_multi_exec_squash: false
|
||||
DFLY_conn_io_threads: 4
|
||||
|
@ -36,13 +36,13 @@ const $endedPollNotification: Provider = {
|
||||
|
||||
const $deliver: Provider = {
|
||||
provide: 'queue:deliver',
|
||||
useFactory: (config: Config) => new Queues(config.redisForDeliverQueues.map(queueConfig => new Bull.Queue(QUEUE.DELIVER, baseQueueOptions(queueConfig, config.bullmqQueueOptions, QUEUE.DELIVER)))),
|
||||
useFactory: (config: Config) => new Queues(config.redisForDeliverQueues.map((queueConfig, index) => new Bull.Queue(`${QUEUE.DELIVER}-${index}`, baseQueueOptions(queueConfig, config.bullmqQueueOptions, QUEUE.DELIVER, index))),),
|
||||
inject: [DI.config],
|
||||
};
|
||||
|
||||
const $inbox: Provider = {
|
||||
provide: 'queue:inbox',
|
||||
useFactory: (config: Config) => new Queues(config.redisForInboxQueues.map(queueConfig => new Bull.Queue(QUEUE.INBOX, baseQueueOptions(queueConfig, config.bullmqQueueOptions, QUEUE.INBOX)))),
|
||||
useFactory: (config: Config) => new Queues(config.redisForInboxQueues.map((queueConfig, index) => new Bull.Queue(`${QUEUE.INBOX}-${index}`, baseQueueOptions(queueConfig, config.bullmqQueueOptions, QUEUE.INBOX, index)))),
|
||||
inject: [DI.config],
|
||||
};
|
||||
|
||||
@ -54,7 +54,7 @@ const $db: Provider = {
|
||||
|
||||
const $relationship: Provider = {
|
||||
provide: 'queue:relationship',
|
||||
useFactory: (config: Config) => new Queues(config.redisForRelationshipQueues.map(queueConfig => new Bull.Queue(QUEUE.RELATIONSHIP, baseQueueOptions(queueConfig, config.bullmqQueueOptions, QUEUE.RELATIONSHIP)))),
|
||||
useFactory: (config: Config) => new Queues(config.redisForRelationshipQueues.map((queueConfig, index) => new Bull.Queue(`${QUEUE.RELATIONSHIP}-${index}`, baseQueueOptions(queueConfig, config.bullmqQueueOptions, QUEUE.RELATIONSHIP, index)))),
|
||||
inject: [DI.config],
|
||||
};
|
||||
|
||||
|
@ -208,8 +208,8 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
||||
//#region deliver
|
||||
this.deliverQueueWorkers = this.config.redisForDeliverQueues
|
||||
.filter((_, index) => process.env.QUEUE_WORKER_INDEX == null || index === Number.parseInt(process.env.QUEUE_WORKER_INDEX, 10))
|
||||
.map(config => new Bull.Worker(QUEUE.DELIVER, (job) => this.deliverProcessorService.process(job), {
|
||||
...baseWorkerOptions(config, this.config.bullmqWorkerOptions, QUEUE.DELIVER),
|
||||
.map((config, index) => new Bull.Worker(`${QUEUE.DELIVER}-${index}`, (job) => this.deliverProcessorService.process(job), {
|
||||
...baseWorkerOptions(config, this.config.bullmqWorkerOptions, QUEUE.DELIVER, index),
|
||||
autorun: false,
|
||||
concurrency: this.config.deliverJobConcurrency ?? 128,
|
||||
limiter: {
|
||||
@ -236,8 +236,8 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
||||
//#region inbox
|
||||
this.inboxQueueWorkers = this.config.redisForInboxQueues
|
||||
.filter((_, index) => process.env.QUEUE_WORKER_INDEX == null || index === Number.parseInt(process.env.QUEUE_WORKER_INDEX, 10))
|
||||
.map(config => new Bull.Worker(QUEUE.INBOX, (job) => this.inboxProcessorService.process(job), {
|
||||
...baseWorkerOptions(config, this.config.bullmqWorkerOptions, QUEUE.INBOX),
|
||||
.map((config, index) => new Bull.Worker(`${QUEUE.INBOX}-${index}`, (job) => this.inboxProcessorService.process(job), {
|
||||
...baseWorkerOptions(config, this.config.bullmqWorkerOptions, QUEUE.INBOX, index),
|
||||
autorun: false,
|
||||
concurrency: this.config.inboxJobConcurrency ?? 16,
|
||||
limiter: {
|
||||
@ -288,7 +288,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
||||
//#region relationship
|
||||
this.relationshipQueueWorkers = this.config.redisForRelationshipQueues
|
||||
.filter((_, index) => process.env.QUEUE_WORKER_INDEX == null || index === Number.parseInt(process.env.QUEUE_WORKER_INDEX, 10))
|
||||
.map(config => new Bull.Worker(QUEUE.RELATIONSHIP, (job) => {
|
||||
.map((config, index) => new Bull.Worker(`${QUEUE.RELATIONSHIP}-${index}`, (job) => {
|
||||
switch (job.name) {
|
||||
case 'follow': return this.relationshipProcessorService.processFollow(job);
|
||||
case 'unfollow': return this.relationshipProcessorService.processUnfollow(job);
|
||||
@ -297,7 +297,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
||||
default: throw new Error(`unrecognized job type ${job.name} for relationship`);
|
||||
}
|
||||
}, {
|
||||
...baseWorkerOptions(config, this.config.bullmqWorkerOptions, QUEUE.RELATIONSHIP),
|
||||
...baseWorkerOptions(config, this.config.bullmqWorkerOptions, QUEUE.RELATIONSHIP, index),
|
||||
autorun: false,
|
||||
concurrency: this.config.relationshipJobConcurrency ?? 16,
|
||||
limiter: {
|
||||
|
@ -18,7 +18,8 @@ export const QUEUE = {
|
||||
WEBHOOK_DELIVER: 'webhookDeliver',
|
||||
};
|
||||
|
||||
export function baseQueueOptions(config: RedisOptions & RedisOptionsSource, queueOptions: Partial<Bull.QueueOptions>, queueName: typeof QUEUE[keyof typeof QUEUE]): Bull.QueueOptions {
|
||||
export function baseQueueOptions(config: RedisOptions & RedisOptionsSource, queueOptions: Partial<Bull.QueueOptions>, queueName: typeof QUEUE[keyof typeof QUEUE], index?: number): Bull.QueueOptions {
|
||||
const name = typeof index === 'number' ? `${queueName}-${index}` : queueName;
|
||||
return {
|
||||
...queueOptions,
|
||||
connection: {
|
||||
@ -33,11 +34,12 @@ export function baseQueueOptions(config: RedisOptions & RedisOptionsSource, queu
|
||||
return 1;
|
||||
},
|
||||
},
|
||||
prefix: config.prefix ? `${config.prefix}:queue:${queueName}` : `queue:${queueName}`,
|
||||
prefix: config.prefix ? `{${config.prefix}:queue:${name}}` : `{queue:${name}}`,
|
||||
};
|
||||
}
|
||||
|
||||
export function baseWorkerOptions(config: RedisOptions & RedisOptionsSource, workerOptions: Partial<Bull.WorkerOptions>, queueName: typeof QUEUE[keyof typeof QUEUE]): Bull.WorkerOptions {
|
||||
export function baseWorkerOptions(config: RedisOptions & RedisOptionsSource, workerOptions: Partial<Bull.WorkerOptions>, queueName: typeof QUEUE[keyof typeof QUEUE], index?: number): Bull.WorkerOptions {
|
||||
const name = typeof index === 'number' ? `${queueName}-${index}` : queueName;
|
||||
return {
|
||||
...workerOptions,
|
||||
connection: {
|
||||
@ -52,6 +54,6 @@ export function baseWorkerOptions(config: RedisOptions & RedisOptionsSource, wor
|
||||
return 1;
|
||||
},
|
||||
},
|
||||
prefix: config.prefix ? `${config.prefix}:queue:${queueName}` : `queue:${queueName}`,
|
||||
prefix: config.prefix ? `{${config.prefix}:queue:${name}}` : `{queue:${name}}`,
|
||||
};
|
||||
}
|
||||
|
@ -245,13 +245,13 @@ export class ClientServerService {
|
||||
queues: [
|
||||
this.systemQueue,
|
||||
this.endedPollNotificationQueue,
|
||||
...this.deliverQueue.queues,
|
||||
...this.inboxQueue.queues,
|
||||
this.dbQueue,
|
||||
...this.relationshipQueue.queues,
|
||||
this.objectStorageQueue,
|
||||
this.webhookDeliverQueue,
|
||||
].map(q => new BullMQAdapter(q))
|
||||
.concat(this.deliverQueue.queues.map((q, index) => new BullMQAdapter(q, { prefix: `${index}-` })))
|
||||
.concat(this.inboxQueue.queues.map((q, index) => new BullMQAdapter(q, { prefix: `${index}-` })))
|
||||
.concat(this.relationshipQueue.queues.map((q, index) => new BullMQAdapter(q, { prefix: `${index}-` }))),
|
||||
].map(q => new BullMQAdapter(q)),
|
||||
serverAdapter,
|
||||
});
|
||||
|
||||
|
@ -8,7 +8,8 @@ services:
|
||||
environment:
|
||||
DFLY_version_check: false
|
||||
DFLY_tcp_backlog: 2048
|
||||
DFLY_default_lua_flags: allow-undeclared-keys
|
||||
DFLY_cluster_mode: emulated
|
||||
DFLY_lock_on_hashtags: true
|
||||
DFLY_pipeline_squash: 0
|
||||
DFLY_multi_exec_squash: false
|
||||
DFLY_conn_io_threads: 4
|
||||
|
Loading…
Reference in New Issue
Block a user