From 51386722320f6cd1351d1508023bb59b3598ca01 Mon Sep 17 00:00:00 2001 From: KOBA789 Date: Mon, 16 Sep 2024 20:07:45 +0900 Subject: [PATCH] perf(backend): Defer instance metadata update (#727) --- .../backend/src/core/NoteCreateService.ts | 15 +++++++++++- packages/backend/src/misc/collapsed-queue.ts | 24 +++++++++++++++++++ .../queue/processors/InboxProcessorService.ts | 22 +++++++++++++---- 3 files changed, 56 insertions(+), 5 deletions(-) create mode 100644 packages/backend/src/misc/collapsed-queue.ts diff --git a/packages/backend/src/core/NoteCreateService.ts b/packages/backend/src/core/NoteCreateService.ts index cfe428c9e..d16ebac2a 100644 --- a/packages/backend/src/core/NoteCreateService.ts +++ b/packages/backend/src/core/NoteCreateService.ts @@ -59,6 +59,7 @@ import { isReply } from '@/misc/is-reply.js'; import { trackPromise } from '@/misc/promise-tracker.js'; import { isNotNull } from '@/misc/is-not-null.js'; import { IdentifiableError } from '@/misc/identifiable-error.js'; +import { CollapsedQueue } from '@/misc/collapsed-queue.js'; type NotificationType = 'reply' | 'renote' | 'quote' | 'mention'; @@ -150,6 +151,7 @@ type Option = { export class NoteCreateService implements OnApplicationShutdown { private logger: Logger; #shutdownController = new AbortController(); + private updateNotesCountQueue: CollapsedQueue; constructor( @Inject(DI.config) @@ -217,6 +219,7 @@ export class NoteCreateService implements OnApplicationShutdown { private loggerService: LoggerService, ) { this.logger = this.loggerService.getLogger('note:create'); + this.updateNotesCountQueue = new CollapsedQueue(60 * 1000 * 5, this.collapseNotesCount, this.performUpdateNotesCount); } @bindThis @@ -548,7 +551,7 @@ export class NoteCreateService implements OnApplicationShutdown { // Register host if (this.userEntityService.isRemoteUser(user)) { this.federatedInstanceService.fetch(user.host).then(async i => { - this.instancesRepository.increment({ id: i.id }, 'notesCount', 1); + this.updateNotesCountQueue.enqueue(i.id, 1); if ((await this.metaService.fetch()).enableChartsForFederatedInstances) { this.instanceChart.updateNote(i.host, note, true); } @@ -1092,6 +1095,16 @@ export class NoteCreateService implements OnApplicationShutdown { ); } + @bindThis + private collapseNotesCount(oldValue: number, newValue: number) { + return oldValue + newValue; + } + + @bindThis + private performUpdateNotesCount(id: MiNote['id'], incrBy: number) { + this.instancesRepository.increment({ id: id }, 'notesCount', incrBy); + } + @bindThis public dispose(): void { this.#shutdownController.abort(); diff --git a/packages/backend/src/misc/collapsed-queue.ts b/packages/backend/src/misc/collapsed-queue.ts new file mode 100644 index 000000000..731ff58d2 --- /dev/null +++ b/packages/backend/src/misc/collapsed-queue.ts @@ -0,0 +1,24 @@ +export class CollapsedQueue { + private jobs: Map = new Map(); + + constructor( + private timeout: number, + private collapse: (oldValue: V, newValue: V) => V, + private doJob: (key: K, value: V) => void, + ) { } + + enqueue(key: K, value: V) { + if (this.jobs.has(key)) { + const old = this.jobs.get(key)!; + const merged = this.collapse(old, value); + this.jobs.set(key, merged); + } else { + this.jobs.set(key, value); + setTimeout(() => { + const value = this.jobs.get(key)!; + this.jobs.delete(key); + this.doJob(key, value); + }, this.timeout); + } + } +} diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts index be800bef4..e87a29b54 100644 --- a/packages/backend/src/queue/processors/InboxProcessorService.ts +++ b/packages/backend/src/queue/processors/InboxProcessorService.ts @@ -28,10 +28,13 @@ import { bindThis } from '@/decorators.js'; import { IdentifiableError } from '@/misc/identifiable-error.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type { InboxJobData } from '../types.js'; +import { CollapsedQueue } from '@/misc/collapsed-queue.js'; +import { MiNote } from '@/models/Note.js'; @Injectable() export class InboxProcessorService { private logger: Logger; + private updateInstanceQueue: CollapsedQueue; constructor( private utilityService: UtilityService, @@ -48,6 +51,7 @@ export class InboxProcessorService { private queueLoggerService: QueueLoggerService, ) { this.logger = this.queueLoggerService.logger.createSubLogger('inbox'); + this.updateInstanceQueue = new CollapsedQueue(60 * 1000 * 5, this.collapseUpdateInstanceJobs, this.performUpdateInstance); } @bindThis @@ -180,10 +184,7 @@ export class InboxProcessorService { // Update stats this.federatedInstanceService.fetch(authUser.user.host).then(i => { - this.federatedInstanceService.update(i.id, { - latestRequestReceivedAt: new Date(), - isNotResponding: false, - }); + this.updateInstanceQueue.enqueue(i.id, new Date()); this.fetchInstanceMetadataService.fetchInstanceMetadata(i); @@ -211,4 +212,17 @@ export class InboxProcessorService { } return 'ok'; } + + @bindThis + public collapseUpdateInstanceJobs(oldValue: Date, newValue: Date) { + return oldValue < newValue ? newValue : oldValue; + } + + @bindThis + public performUpdateInstance(id: string, value: Date) { + this.federatedInstanceService.update(id, { + latestRequestReceivedAt: value, + isNotResponding: false, + }); + } }