Merge remote-tracking branch 'misskey-dev/develop' into io

This commit is contained in:
まっちゃとーにゅ 2024-03-02 05:23:53 +09:00
commit 9ab785c48f
No known key found for this signature in database
GPG key ID: 6AFBBF529601C1DB
94 changed files with 4169 additions and 2564 deletions

View file

@ -3,7 +3,7 @@
* SPDX-License-Identifier: AGPL-3.0-only
*/
import * as fs from 'node:fs';
import { ReadableStream, TextEncoderStream } from 'node:stream/web';
import { Inject, Injectable } from '@nestjs/common';
import { MoreThan } from 'typeorm';
import { format as dateFormat } from 'date-fns';
@ -12,16 +12,89 @@ import type { NotesRepository, PollsRepository, UsersRepository } from '@/models
import type Logger from '@/logger.js';
import { DriveService } from '@/core/DriveService.js';
import { createTemp } from '@/misc/create-temp.js';
import type { MiUser } from '@/models/User.js';
import type { MiPoll } from '@/models/Poll.js';
import type { MiNote } from '@/models/Note.js';
import { bindThis } from '@/decorators.js';
import { DriveFileEntityService } from '@/core/entities/DriveFileEntityService.js';
import { Packed } from '@/misc/json-schema.js';
import { IdService } from '@/core/IdService.js';
import { JsonArrayStream } from '@/misc/JsonArrayStream.js';
import { FileWriterStream } from '@/misc/FileWriterStream.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import type * as Bull from 'bullmq';
import type { DbJobDataWithUser } from '../types.js';
class NoteStream extends ReadableStream<Record<string, unknown>> {
constructor(
job: Bull.Job,
notesRepository: NotesRepository,
pollsRepository: PollsRepository,
driveFileEntityService: DriveFileEntityService,
idService: IdService,
user: MiUser,
) {
let exportedNotesCount = 0;
let cursor: MiNote['id'] | null = null;
const serialize = (
note: MiNote,
poll: MiPoll | null,
files: Packed<'DriveFile'>[],
): Record<string, unknown> => {
return {
id: note.id,
text: note.text,
createdAt: idService.parse(note.id).date.toISOString(),
fileIds: note.fileIds,
files: files,
replyId: note.replyId,
renoteId: note.renoteId,
poll: poll,
cw: note.cw,
visibility: note.visibility,
visibleUserIds: note.visibleUserIds,
localOnly: note.localOnly,
reactionAcceptance: note.reactionAcceptance,
};
};
super({
async pull(controller): Promise<void> {
const notes = await notesRepository.find({
where: {
userId: user.id,
...(cursor !== null ? { id: MoreThan(cursor) } : {}),
},
take: 100, // 100件ずつ取得
order: { id: 1 },
});
if (notes.length === 0) {
job.updateProgress(100);
controller.close();
}
cursor = notes.at(-1)?.id ?? null;
for (const note of notes) {
const poll = note.hasPoll
? await pollsRepository.findOneByOrFail({ noteId: note.id }) // N+1
: null;
const files = await driveFileEntityService.packManyByIds(note.fileIds, user); // N+1
const content = serialize(note, poll, files);
controller.enqueue(content);
exportedNotesCount++;
}
const total = await notesRepository.countBy({ userId: user.id });
job.updateProgress(exportedNotesCount / total);
},
});
}
}
@Injectable()
export class ExportNotesProcessorService {
private logger: Logger;
@ -59,67 +132,19 @@ export class ExportNotesProcessorService {
this.logger.info(`Temp file is ${path}`);
try {
const stream = fs.createWriteStream(path, { flags: 'a' });
// メモリが足りなくならないようにストリームで処理する
await new NoteStream(
job,
this.notesRepository,
this.pollsRepository,
this.driveFileEntityService,
this.idService,
user,
)
.pipeThrough(new JsonArrayStream())
.pipeThrough(new TextEncoderStream())
.pipeTo(new FileWriterStream(path));
const write = (text: string): Promise<void> => {
return new Promise<void>((res, rej) => {
stream.write(text, err => {
if (err) {
this.logger.error(err);
rej(err);
} else {
res();
}
});
});
};
await write('[');
let exportedNotesCount = 0;
let cursor: MiNote['id'] | null = null;
while (true) {
const notes = await this.notesRepository.find({
where: {
userId: user.id,
...(cursor ? { id: MoreThan(cursor) } : {}),
},
take: 100,
order: {
id: 1,
},
}) as MiNote[];
if (notes.length === 0) {
job.updateProgress(100);
break;
}
cursor = notes[notes.length - 1].id;
for (const note of notes) {
let poll: MiPoll | undefined;
if (note.hasPoll) {
poll = await this.pollsRepository.findOneByOrFail({ noteId: note.id });
}
const files = await this.driveFileEntityService.packManyByIds(note.fileIds, user);
const content = JSON.stringify(this.serialize(note, poll, files));
const isFirst = exportedNotesCount === 0;
await write(isFirst ? content : ',\n' + content);
exportedNotesCount++;
}
const total = await this.notesRepository.countBy({
userId: user.id,
});
job.updateProgress(exportedNotesCount / total);
}
await write(']');
stream.end();
this.logger.succ(`Exported to: ${path}`);
const fileName = 'notes-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.json';
@ -130,22 +155,4 @@ export class ExportNotesProcessorService {
cleanup();
}
}
private serialize(note: MiNote, poll: MiPoll | null = null, files: Packed<'DriveFile'>[]): Record<string, unknown> {
return {
id: note.id,
text: note.text,
createdAt: this.idService.parse(note.id).date.toISOString(),
fileIds: note.fileIds,
files: files,
replyId: note.replyId,
renoteId: note.renoteId,
poll: poll,
cw: note.cw,
visibility: note.visibility,
visibleUserIds: note.visibleUserIds,
localOnly: note.localOnly,
reactionAcceptance: note.reactionAcceptance,
};
}
}

View file

@ -185,8 +185,12 @@ export class InboxProcessorService {
await this.apInboxService.performActivity(authUser.user, activity, job.data.user?.id);
} catch (e) {
if (e instanceof IdentifiableError) {
if (e.id === 'e11b3a16-f543-4885-8eb1-66cad131dbfd') return 'blocked mentions from unfamiliar user';
if (e.id === '057d8d3e-b7ca-4f8b-b38c-dcdcbf34dc30') return 'blocked notes with prohibited words';
if ([
'e11b3a16-f543-4885-8eb1-66cad131dbfd',
'689ee33f-f97c-479a-ac49-1b9f8140af99',
'9f466dab-c856-48cd-9e65-ff90ff750580',
'85ab9bd7-3a41-4530-959d-f07073900109',
].includes(e.id)) return e.message;
}
throw e;
}