Merge branch 'develop' of codeberg.org:calckey/calckey into develop

This commit is contained in:
ThatOneCalculator 2023-03-19 01:32:23 -07:00
commit edb694a88a
15 changed files with 328 additions and 14 deletions

View File

@ -72,6 +72,16 @@ redis:
# user:
# pass:
# ┌─────────────────────┐
#───┘ Sonic configuration └─────────────────────────────────────
#sonic:
# host: localhost
# port: 1491
# auth: SecretPassword
# collection: notes
# bucket: default
# ┌───────────────┐
#───┘ ID generation └───────────────────────────────────────────

1
.gitignore vendored
View File

@ -44,6 +44,7 @@ ormconfig.json
packages/backend/assets/instance.css
packages/backend/assets/sounds/None.mp3
!packages/backend/src/db
# blender backups
*.blend1

View File

@ -112,6 +112,7 @@
"seedrandom": "^3.0.5",
"semver": "7.3.8",
"sharp": "0.31.3",
"sonic-channel": "^1.3.1",
"speakeasy": "2.0.0",
"stringz": "2.1.0",
"summaly": "2.7.0",

View File

@ -32,6 +32,13 @@ export type Source = {
pass?: string;
index?: string;
};
sonic: {
host: string;
port: number;
auth?: string;
collection?: string;
bucket?: string;
};
proxy?: string;
proxySmtp?: string;

View File

@ -0,0 +1,51 @@
import * as SonicChannel from "sonic-channel";
import { dbLogger } from "./logger.js";
import config from "@/config/index.js";
const logger = dbLogger.createSubLogger("sonic", "gray", false);
logger.info("Connecting to Sonic");
const handlers = (type: string): SonicChannel.Handlers => (
{
connected: () => {
logger.succ(`Connected to Sonic ${type}`);
},
disconnected: (error) => {
logger.warn(`Disconnected from Sonic ${type}, error: ${error}`);
},
error: (error) => {
logger.warn(`Sonic ${type} error: ${error}`);
},
retrying: () => {
logger.info(`Sonic ${type} retrying`);
},
timeout: () => {
logger.warn(`Sonic ${type} timeout`);
},
}
)
const hasConfig =
config.sonic
&& ( config.sonic.host
|| config.sonic.port
|| config.sonic.auth
)
const host = hasConfig ? config.sonic.host ?? "localhost" : "";
const port = hasConfig ? config.sonic.port ?? 1491 : 0;
const auth = hasConfig ? config.sonic.auth ?? "SecretPassword" : "";
const collection = hasConfig ? config.sonic.collection ?? "main" : "";
const bucket = hasConfig ? config.sonic.bucket ?? "default" : "";
export default hasConfig
? {
search: new SonicChannel.Search({host, port, auth}).connect(handlers("search")),
ingest: new SonicChannel.Ingest({host, port, auth}).connect(handlers("ingest")),
collection,
bucket,
}
: null;

View File

@ -13,6 +13,7 @@ import processDb from "./processors/db/index.js";
import processObjectStorage from "./processors/object-storage/index.js";
import processSystemQueue from "./processors/system/index.js";
import processWebhookDeliver from "./processors/webhook-deliver.js";
import processBackground from "./processors/background/index.js";
import { endedPollNotification } from "./processors/ended-poll-notification.js";
import { queueLogger } from "./logger.js";
import { getJobInfo } from "./get-job-info.js";
@ -24,6 +25,7 @@ import {
objectStorageQueue,
endedPollNotificationQueue,
webhookDeliverQueue,
backgroundQueue,
} from "./queues.js";
import type { ThinUser } from "./types.js";
@ -418,6 +420,17 @@ export function createCleanRemoteFilesJob() {
);
}
export function createIndexAllNotesJob(data = {}) {
return backgroundQueue.add(
"indexAllNotes",
data,
{
removeOnComplete: true,
removeOnFail: true,
},
);
}
export function webhookDeliver(
webhook: Webhook,
type: typeof webhookEventTypes[number],
@ -454,6 +467,7 @@ export default function () {
webhookDeliverQueue.process(64, processWebhookDeliver);
processDb(dbQueue);
processObjectStorage(objectStorageQueue);
processBackground(backgroundQueue);
systemQueue.add(
"tickCharts",

View File

@ -0,0 +1,76 @@
import type Bull from "bull";
import { queueLogger } from "../../logger.js";
import { Notes } from "@/models/index.js";
import { MoreThan } from "typeorm";
import { index } from "@/services/note/create.js"
import { Note } from "@/models/entities/note.js";
const logger = queueLogger.createSubLogger("index-all-notes");
export default async function indexAllNotes(
job: Bull.Job<Record<string, unknown>>,
done: ()=>void,
): Promise<void> {
logger.info("Indexing all notes...");
let cursor: string|null = job.data.cursor as string ?? null;
let indexedCount: number = job.data.indexedCount as number ?? 0;
let total: number = job.data.total as number ?? 0;
let running = true;
const take = 50000;
const batch = 100;
while (running) {
logger.info(`Querying for ${take} notes ${indexedCount}/${total ? total : '?'} at ${cursor}`);
let notes: Note[] = [];
try {
notes = await Notes.find({
where: {
...(cursor ? { id: MoreThan(cursor) } : {}),
},
take: take,
order: {
id: 1,
},
});
} catch (e) {
logger.error(`Failed to query notes ${e}`);
continue;
}
if (notes.length === 0) {
job.progress(100);
running = false;
break;
}
try {
const count = await Notes.count();
total = count;
job.update({ indexedCount, cursor, total })
} catch (e) {
}
for (let i = 0; i < notes.length; i += batch) {
const chunk = notes.slice(i, i + batch);
await Promise.all(chunk.map(note => index(note)));
indexedCount += chunk.length;
const pct = (indexedCount / total)*100;
job.update({ indexedCount, cursor, total })
job.progress(+(pct.toFixed(1)));
logger.info(`Indexed notes ${indexedCount}/${total ? total : '?'}`);
}
cursor = notes[notes.length - 1].id;
job.update({ indexedCount, cursor, total })
if (notes.length < take) {
running = false;
}
}
done();
logger.info("All notes have been indexed.");
}

View File

@ -0,0 +1,15 @@
import type Bull from "bull";
import indexAllNotes from "./index-all-notes.js";
const jobs = {
indexAllNotes,
} as Record<
string,
Bull.ProcessCallbackFunction<Record<string, unknown>>
>;
export default function (q: Bull.Queue) {
for (const [k, v] of Object.entries(jobs)) {
q.process(k, 16, v);
}
}

View File

@ -27,6 +27,7 @@ export const webhookDeliverQueue = initializeQueue<WebhookDeliverJobData>(
"webhookDeliver",
64,
);
export const backgroundQueue = initializeQueue<Record<string, unknown>>("bg");
export const queues = [
systemQueue,
@ -36,4 +37,5 @@ export const queues = [
dbQueue,
objectStorageQueue,
webhookDeliverQueue,
backgroundQueue,
];

View File

@ -51,6 +51,7 @@ import * as ep___admin_relays_list from "./endpoints/admin/relays/list.js";
import * as ep___admin_relays_remove from "./endpoints/admin/relays/remove.js";
import * as ep___admin_resetPassword from "./endpoints/admin/reset-password.js";
import * as ep___admin_resolveAbuseUserReport from "./endpoints/admin/resolve-abuse-user-report.js";
import * as ep___admin_search_indexAll from "./endpoints/admin/search/index-all.js";
import * as ep___admin_sendEmail from "./endpoints/admin/send-email.js";
import * as ep___admin_serverInfo from "./endpoints/admin/server-info.js";
import * as ep___admin_showModerationLogs from "./endpoints/admin/show-moderation-logs.js";
@ -393,6 +394,7 @@ const eps = [
["admin/relays/remove", ep___admin_relays_remove],
["admin/reset-password", ep___admin_resetPassword],
["admin/resolve-abuse-user-report", ep___admin_resolveAbuseUserReport],
["admin/search/index-all", ep___admin_search_indexAll],
["admin/send-email", ep___admin_sendEmail],
["admin/server-info", ep___admin_serverInfo],
["admin/show-moderation-logs", ep___admin_showModerationLogs],

View File

@ -3,6 +3,7 @@ import {
inboxQueue,
dbQueue,
objectStorageQueue,
backgroundQueue,
} from "@/queue/queues.js";
import define from "../../../define.js";
@ -37,6 +38,11 @@ export const meta = {
nullable: false,
ref: "QueueCount",
},
backgroundQueue: {
optional: false,
nullable: false,
ref: "QueueCount",
},
},
},
} as const;
@ -52,11 +58,13 @@ export default define(meta, paramDef, async (ps) => {
const inboxJobCounts = await inboxQueue.getJobCounts();
const dbJobCounts = await dbQueue.getJobCounts();
const objectStorageJobCounts = await objectStorageQueue.getJobCounts();
const backgroundJobCounts = await backgroundQueue.getJobCounts();
return {
deliver: deliverJobCounts,
inbox: inboxJobCounts,
db: dbJobCounts,
objectStorage: objectStorageJobCounts,
backgroundQueue: backgroundJobCounts,
};
});

View File

@ -0,0 +1,28 @@
import define from "../../../define.js";
import { createIndexAllNotesJob } from "@/queue/index.js";
export const meta = {
tags: ["admin"],
requireCredential: true,
requireModerator: true,
} as const;
export const paramDef = {
type: "object",
properties: {
cursor: {
type: "string",
format: "misskey:id",
nullable: true,
default: null,
},
},
required: [],
} as const;
export default define(meta, paramDef, async (ps, _me) => {
createIndexAllNotesJob({
cursor: ps.cursor ?? undefined,
});
});

View File

@ -1,7 +1,9 @@
import { In } from "typeorm";
import { Notes } from "@/models/index.js";
import { Note } from "@/models/entities/note.js";
import config from "@/config/index.js";
import es from "../../../../db/elasticsearch.js";
import sonic from "../../../../db/sonic.js";
import define from "../../define.js";
import { makePaginationQuery } from "../../common/make-pagination-query.js";
import { generateVisibilityQuery } from "../../common/generate-visibility-query.js";
@ -59,7 +61,7 @@ export const paramDef = {
} as const;
export default define(meta, paramDef, async (ps, me) => {
if (es == null) {
if (es == null && sonic == null) {
const query = makePaginationQuery(
Notes.createQueryBuilder("note"),
ps.sinceId,
@ -92,9 +94,82 @@ export default define(meta, paramDef, async (ps, me) => {
if (me) generateMutedUserQuery(query, me);
if (me) generateBlockedUserQuery(query, me);
const notes = await query.take(ps.limit).getMany();
const notes: Note[] = await query.take(ps.limit).getMany();
return await Notes.packMany(notes, me);
} else if (sonic) {
let start = 0;
const chunkSize = 100;
// Use sonic to fetch and step through all search results that could match the requirements
const ids = [];
while (true) {
const results = await sonic.search.query(
sonic.collection,
sonic.bucket,
ps.query,
{
limit: chunkSize,
offset: start,
},
);
start += chunkSize;
if (results.length === 0) {
break;
}
const res = results
.map((k) => JSON.parse(k))
.filter((key) => {
if (ps.userId && key.userId !== ps.userId) {
return false;
}
if (ps.channelId && key.channelId !== ps.channelId) {
return false;
}
if (ps.sinceId && key.id <= ps.sinceId) {
return false;
}
if (ps.untilId && key.id >= ps.untilId) {
return false;
}
return true;
})
.map((key) => key.id);
ids.push(...res);
}
// Sort all the results by note id DESC (newest first)
ids.sort((a, b) => b - a);
// Fetch the notes from the database until we have enough to satisfy the limit
start = 0;
const found = [];
while (found.length < ps.limit && start < ids.length) {
const chunk = ids.slice(start, start + chunkSize);
const notes: Note[] = await Notes.find({
where: {
id: In(chunk),
},
order: {
id: "DESC",
},
});
// The notes are checked for visibility and muted/blocked users when packed
found.push(...await Notes.packMany(notes, me));
start += chunkSize;
}
// If we have more results than the limit, trim them
if (found.length > ps.limit) {
found.length = ps.limit;
}
return found;
} else {
const userQuery =
ps.userId != null

View File

@ -1,5 +1,6 @@
import * as mfm from "mfm-js";
import es from "../../db/elasticsearch.js";
import sonic from "../../db/sonic.js";
import {
publishMainStream,
publishNotesStream,
@ -588,7 +589,7 @@ export default async (
}
// Register to search database
index(note);
await index(note);
});
async function renderNoteOrRenoteActivity(data: Option, note: Note) {
@ -728,18 +729,34 @@ async function insertNote(
}
}
function index(note: Note) {
if (note.text == null || config.elasticsearch == null) return;
export async function index(note: Note): Promise<void> {
if (!note.text) return;
es!.index({
index: config.elasticsearch.index || "misskey_note",
id: note.id.toString(),
body: {
text: normalizeForSearch(note.text),
userId: note.userId,
userHost: note.userHost,
},
});
if (config.elasticsearch && es) {
es.index({
index: config.elasticsearch.index || "misskey_note",
id: note.id.toString(),
body: {
text: normalizeForSearch(note.text),
userId: note.userId,
userHost: note.userHost,
},
});
}
if (sonic) {
await sonic.ingest.push(
sonic.collection,
sonic.bucket,
JSON.stringify({
id: note.id,
userId: note.userId,
userHost: note.userHost,
channelId: note.channelId,
}),
note.text,
);
}
}
async function notifyToWatchersOfRenotee(

7
pnpm-lock.yaml generated
View File

@ -196,6 +196,7 @@ importers:
seedrandom: ^3.0.5
semver: 7.3.8
sharp: 0.31.3
sonic-channel: ^1.3.1
speakeasy: 2.0.0
strict-event-emitter-types: 2.0.0
stringz: 2.1.0
@ -310,6 +311,7 @@ importers:
seedrandom: 3.0.5
semver: 7.3.8
sharp: 0.31.3
sonic-channel: 1.3.1
speakeasy: 2.0.0
stringz: 2.1.0
summaly: 2.7.0
@ -11594,6 +11596,11 @@ packages:
smart-buffer: 4.2.0
dev: false
/sonic-channel/1.3.1:
resolution: {integrity: sha512-+K4IZVFE7Tf2DB4EFZ23xo7a/+gJaiOHhFzXVZpzkX6Rs/rvf4YbSxnEGdYw8mrTcjtpG+jLVQEhP8sNTtN5VA==}
engines: {node: '>= 6.0.0'}
dev: false
/sort-keys-length/1.0.1:
resolution: {integrity: sha512-GRbEOUqCxemTAk/b32F2xa8wDTs+Z1QHOkbhJDQTvv/6G3ZkbJ+frYWsTcc7cBB3Fu4wy4XlLCuNtJuMn7Gsvw==}
engines: {node: '>=0.10.0'}