enhance(backend): migrate bull to bullmq (#10910)

* wip

* wip

* Update QueueService.ts

* wip

* refactor

* ✌️

* fix

* Update QueueStatsService.ts

* refactor

* Update ApNoteService.ts

* Update mock-resolver.ts

* refactor

* Update mock-resolver.ts
This commit is contained in:
syuilo 2023-05-29 11:54:49 +09:00 committed by GitHub
parent 7cbd852fe5
commit fd7b77c542
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
53 changed files with 532 additions and 490 deletions

View file

@ -1,8 +1,8 @@
import { URL } from 'node:url';
import { Inject, Injectable } from '@nestjs/common';
import httpSignature from '@peertube/http-signature';
import * as Bull from 'bullmq';
import { DI } from '@/di-symbols.js';
import type { InstancesRepository, DriveFilesRepository } from '@/models/index.js';
import type { Config } from '@/config.js';
import type Logger from '@/logger.js';
import { MetaService } from '@/core/MetaService.js';
@ -23,10 +23,8 @@ import { LdSignatureService } from '@/core/activitypub/LdSignatureService.js';
import { ApInboxService } from '@/core/activitypub/ApInboxService.js';
import { bindThis } from '@/decorators.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import type Bull from 'bull';
import type { InboxJobData } from '../types.js';
// ユーザーのinboxにアクティビティが届いた時の処理
@Injectable()
export class InboxProcessorService {
private logger: Logger;
@ -35,12 +33,6 @@ export class InboxProcessorService {
@Inject(DI.config)
private config: Config,
@Inject(DI.instancesRepository)
private instancesRepository: InstancesRepository,
@Inject(DI.driveFilesRepository)
private driveFilesRepository: DriveFilesRepository,
private utilityService: UtilityService,
private metaService: MetaService,
private apInboxService: ApInboxService,
@ -93,24 +85,24 @@ export class InboxProcessorService {
try {
authUser = await this.apDbResolverService.getAuthUserFromApId(getApId(activity.actor));
} catch (err) {
// 対象が4xxならスキップ
// 対象が4xxならスキップ
if (err instanceof StatusError) {
if (err.isClientError) {
return `skip: Ignored deleted actors on both ends ${activity.actor} - ${err.statusCode}`;
throw new Bull.UnrecoverableError(`skip: Ignored deleted actors on both ends ${activity.actor} - ${err.statusCode}`);
}
throw `Error in actor ${activity.actor} - ${err.statusCode ?? err}`;
throw new Error(`Error in actor ${activity.actor} - ${err.statusCode ?? err}`);
}
}
}
// それでもわからなければ終了
if (authUser == null) {
return 'skip: failed to resolve user';
throw new Bull.UnrecoverableError('skip: failed to resolve user');
}
// publicKey がなくても終了
if (authUser.key == null) {
return 'skip: failed to resolve user publicKey';
throw new Bull.UnrecoverableError('skip: failed to resolve user publicKey');
}
// HTTP-Signatureの検証
@ -118,10 +110,10 @@ export class InboxProcessorService {
// また、signatureのsignerは、activity.actorと一致する必要がある
if (!httpSignatureValidated || authUser.user.uri !== activity.actor) {
// 一致しなくても、でもLD-Signatureがありそうならそっちも見る
// 一致しなくても、でもLD-Signatureがありそうならそっちも見る
if (activity.signature) {
if (activity.signature.type !== 'RsaSignature2017') {
return `skip: unsupported LD-signature type ${activity.signature.type}`;
throw new Bull.UnrecoverableError(`skip: unsupported LD-signature type ${activity.signature.type}`);
}
// activity.signature.creator: https://example.oom/users/user#main-key
@ -134,32 +126,32 @@ export class InboxProcessorService {
// keyIdからLD-Signatureのユーザーを取得
authUser = await this.apDbResolverService.getAuthUserFromKeyId(activity.signature.creator);
if (authUser == null) {
return 'skip: LD-Signatureのユーザーが取得できませんでした';
throw new Bull.UnrecoverableError('skip: LD-Signatureのユーザーが取得できませんでした');
}
if (authUser.key == null) {
return 'skip: LD-SignatureのユーザーはpublicKeyを持っていませんでした';
throw new Bull.UnrecoverableError('skip: LD-SignatureのユーザーはpublicKeyを持っていませんでした');
}
// LD-Signature検証
const ldSignature = this.ldSignatureService.use();
const verified = await ldSignature.verifyRsaSignature2017(activity, authUser.key.keyPem).catch(() => false);
if (!verified) {
return 'skip: LD-Signatureの検証に失敗しました';
throw new Bull.UnrecoverableError('skip: LD-Signatureの検証に失敗しました');
}
// もう一度actorチェック
if (authUser.user.uri !== activity.actor) {
return `skip: LD-Signature user(${authUser.user.uri}) !== activity.actor(${activity.actor})`;
throw new Bull.UnrecoverableError(`skip: LD-Signature user(${authUser.user.uri}) !== activity.actor(${activity.actor})`);
}
// ブロックしてたら中断
const ldHost = this.utilityService.extractDbHost(authUser.user.uri);
if (this.utilityService.isBlockedHost(meta.blockedHosts, ldHost)) {
return `Blocked request: ${ldHost}`;
throw new Bull.UnrecoverableError(`Blocked request: ${ldHost}`);
}
} else {
return `skip: http-signature verification failed and no LD-Signature. keyId=${signature.keyId}`;
throw new Bull.UnrecoverableError(`skip: http-signature verification failed and no LD-Signature. keyId=${signature.keyId}`);
}
}
@ -168,7 +160,7 @@ export class InboxProcessorService {
const signerHost = this.utilityService.extractDbHost(authUser.user.uri!);
const activityIdHost = this.utilityService.extractDbHost(activity.id);
if (signerHost !== activityIdHost) {
return `skip: signerHost(${signerHost}) !== activity.id host(${activityIdHost}`;
throw new Bull.UnrecoverableError(`skip: signerHost(${signerHost}) !== activity.id host(${activityIdHost}`);
}
}