mirror of
https://iceshrimp.dev/iceshrimp/iceshrimp
synced 2025-01-19 03:52:52 +09:00
Add sonic full-text search support (#9714)
This pull request adds support for the [sonic](https://github.com/valeriansaliou/sonic) full text indexing server into Calckey. In addition to this, a stateful endpoint has been added that will completely (re-)index all notes into any (elasticsearch and/or sonic) indexing server defined in your config at `/api/admin/search/index-all`. It can (optionally) take input data to define the starting point, such as: ``` {"cursor": "9beg3lx6ad"} ``` Currently if both sonic and elasticsearch are defined in the config, sonic will take precedence for searching, but both indexes will continue to be updated for new note creations. Future enhancements may include the ability to choose which indexer to use (or combine multiple). Co-authored-by: Kaitlyn Allan <kaitlyn.allan@enlabs.cloud> Reviewed-on: https://codeberg.org/calckey/calckey/pulls/9714 Co-authored-by: Kaity A <supakaity@noreply.codeberg.org> Co-committed-by: Kaity A <supakaity@noreply.codeberg.org>
This commit is contained in:
parent
e3fb0cbb3b
commit
77c12cba8d
@ -72,6 +72,16 @@ redis:
|
||||
# user:
|
||||
# pass:
|
||||
|
||||
# ┌─────────────────────┐
|
||||
#───┘ Sonic configuration └─────────────────────────────────────
|
||||
|
||||
#sonic:
|
||||
# host: localhost
|
||||
# port: 1491
|
||||
# auth: SecretPassword
|
||||
# collection: notes
|
||||
# bucket: default
|
||||
|
||||
# ┌───────────────┐
|
||||
#───┘ ID generation └───────────────────────────────────────────
|
||||
|
||||
|
1
.gitignore
vendored
1
.gitignore
vendored
@ -44,6 +44,7 @@ ormconfig.json
|
||||
packages/backend/assets/instance.css
|
||||
packages/backend/assets/sounds/None.mp3
|
||||
|
||||
!packages/backend/src/db
|
||||
|
||||
# blender backups
|
||||
*.blend1
|
||||
|
@ -112,6 +112,7 @@
|
||||
"seedrandom": "^3.0.5",
|
||||
"semver": "7.3.8",
|
||||
"sharp": "0.31.3",
|
||||
"sonic-channel": "^1.3.1",
|
||||
"speakeasy": "2.0.0",
|
||||
"stringz": "2.1.0",
|
||||
"summaly": "2.7.0",
|
||||
|
@ -32,6 +32,13 @@ export type Source = {
|
||||
pass?: string;
|
||||
index?: string;
|
||||
};
|
||||
sonic: {
|
||||
host: string;
|
||||
port: number;
|
||||
auth?: string;
|
||||
collection?: string;
|
||||
bucket?: string;
|
||||
};
|
||||
|
||||
proxy?: string;
|
||||
proxySmtp?: string;
|
||||
|
51
packages/backend/src/db/sonic.ts
Normal file
51
packages/backend/src/db/sonic.ts
Normal file
@ -0,0 +1,51 @@
|
||||
import * as SonicChannel from "sonic-channel";
|
||||
import { dbLogger } from "./logger.js";
|
||||
|
||||
import config from "@/config/index.js";
|
||||
|
||||
const logger = dbLogger.createSubLogger("sonic", "gray", false);
|
||||
|
||||
logger.info("Connecting to Sonic");
|
||||
|
||||
const handlers = (type: string): SonicChannel.Handlers => (
|
||||
{
|
||||
connected: () => {
|
||||
logger.succ(`Connected to Sonic ${type}`);
|
||||
},
|
||||
disconnected: (error) => {
|
||||
logger.warn(`Disconnected from Sonic ${type}, error: ${error}`);
|
||||
},
|
||||
error: (error) => {
|
||||
logger.warn(`Sonic ${type} error: ${error}`);
|
||||
},
|
||||
retrying: () => {
|
||||
logger.info(`Sonic ${type} retrying`);
|
||||
},
|
||||
timeout: () => {
|
||||
logger.warn(`Sonic ${type} timeout`);
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
const hasConfig =
|
||||
config.sonic
|
||||
&& ( config.sonic.host
|
||||
|| config.sonic.port
|
||||
|| config.sonic.auth
|
||||
)
|
||||
|
||||
const host = hasConfig ? config.sonic.host ?? "localhost" : "";
|
||||
const port = hasConfig ? config.sonic.port ?? 1491 : 0;
|
||||
const auth = hasConfig ? config.sonic.auth ?? "SecretPassword" : "";
|
||||
const collection = hasConfig ? config.sonic.collection ?? "main" : "";
|
||||
const bucket = hasConfig ? config.sonic.bucket ?? "default" : "";
|
||||
|
||||
export default hasConfig
|
||||
? {
|
||||
search: new SonicChannel.Search({host, port, auth}).connect(handlers("search")),
|
||||
ingest: new SonicChannel.Ingest({host, port, auth}).connect(handlers("ingest")),
|
||||
|
||||
collection,
|
||||
bucket,
|
||||
}
|
||||
: null;
|
@ -13,6 +13,7 @@ import processDb from "./processors/db/index.js";
|
||||
import processObjectStorage from "./processors/object-storage/index.js";
|
||||
import processSystemQueue from "./processors/system/index.js";
|
||||
import processWebhookDeliver from "./processors/webhook-deliver.js";
|
||||
import processBackground from "./processors/background/index.js";
|
||||
import { endedPollNotification } from "./processors/ended-poll-notification.js";
|
||||
import { queueLogger } from "./logger.js";
|
||||
import { getJobInfo } from "./get-job-info.js";
|
||||
@ -24,6 +25,7 @@ import {
|
||||
objectStorageQueue,
|
||||
endedPollNotificationQueue,
|
||||
webhookDeliverQueue,
|
||||
backgroundQueue,
|
||||
} from "./queues.js";
|
||||
import type { ThinUser } from "./types.js";
|
||||
|
||||
@ -418,6 +420,17 @@ export function createCleanRemoteFilesJob() {
|
||||
);
|
||||
}
|
||||
|
||||
export function createIndexAllNotesJob(data = {}) {
|
||||
return backgroundQueue.add(
|
||||
"indexAllNotes",
|
||||
data,
|
||||
{
|
||||
removeOnComplete: true,
|
||||
removeOnFail: true,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
export function webhookDeliver(
|
||||
webhook: Webhook,
|
||||
type: typeof webhookEventTypes[number],
|
||||
@ -454,6 +467,7 @@ export default function () {
|
||||
webhookDeliverQueue.process(64, processWebhookDeliver);
|
||||
processDb(dbQueue);
|
||||
processObjectStorage(objectStorageQueue);
|
||||
processBackground(backgroundQueue);
|
||||
|
||||
systemQueue.add(
|
||||
"tickCharts",
|
||||
|
@ -0,0 +1,76 @@
|
||||
import type Bull from "bull";
|
||||
|
||||
import { queueLogger } from "../../logger.js";
|
||||
import { Notes } from "@/models/index.js";
|
||||
import { MoreThan } from "typeorm";
|
||||
import { index } from "@/services/note/create.js"
|
||||
import { Note } from "@/models/entities/note.js";
|
||||
|
||||
const logger = queueLogger.createSubLogger("index-all-notes");
|
||||
|
||||
export default async function indexAllNotes(
|
||||
job: Bull.Job<Record<string, unknown>>,
|
||||
done: ()=>void,
|
||||
): Promise<void> {
|
||||
logger.info("Indexing all notes...");
|
||||
|
||||
let cursor: string|null = job.data.cursor as string ?? null;
|
||||
let indexedCount: number = job.data.indexedCount as number ?? 0;
|
||||
let total: number = job.data.total as number ?? 0;
|
||||
|
||||
let running = true;
|
||||
const take = 50000;
|
||||
const batch = 100;
|
||||
while (running) {
|
||||
logger.info(`Querying for ${take} notes ${indexedCount}/${total ? total : '?'} at ${cursor}`);
|
||||
|
||||
let notes: Note[] = [];
|
||||
try {
|
||||
notes = await Notes.find({
|
||||
where: {
|
||||
...(cursor ? { id: MoreThan(cursor) } : {}),
|
||||
},
|
||||
take: take,
|
||||
order: {
|
||||
id: 1,
|
||||
},
|
||||
});
|
||||
} catch (e) {
|
||||
logger.error(`Failed to query notes ${e}`);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (notes.length === 0) {
|
||||
job.progress(100);
|
||||
running = false;
|
||||
break;
|
||||
}
|
||||
|
||||
try {
|
||||
const count = await Notes.count();
|
||||
total = count;
|
||||
job.update({ indexedCount, cursor, total })
|
||||
} catch (e) {
|
||||
}
|
||||
|
||||
for (let i = 0; i < notes.length; i += batch) {
|
||||
const chunk = notes.slice(i, i + batch);
|
||||
await Promise.all(chunk.map(note => index(note)));
|
||||
|
||||
indexedCount += chunk.length;
|
||||
const pct = (indexedCount / total)*100;
|
||||
job.update({ indexedCount, cursor, total })
|
||||
job.progress(+(pct.toFixed(1)));
|
||||
logger.info(`Indexed notes ${indexedCount}/${total ? total : '?'}`);
|
||||
}
|
||||
cursor = notes[notes.length - 1].id;
|
||||
job.update({ indexedCount, cursor, total })
|
||||
|
||||
if (notes.length < take) {
|
||||
running = false;
|
||||
}
|
||||
}
|
||||
|
||||
done();
|
||||
logger.info("All notes have been indexed.");
|
||||
}
|
15
packages/backend/src/queue/processors/background/index.ts
Normal file
15
packages/backend/src/queue/processors/background/index.ts
Normal file
@ -0,0 +1,15 @@
|
||||
import type Bull from "bull";
|
||||
import indexAllNotes from "./index-all-notes.js";
|
||||
|
||||
const jobs = {
|
||||
indexAllNotes,
|
||||
} as Record<
|
||||
string,
|
||||
Bull.ProcessCallbackFunction<Record<string, unknown>>
|
||||
>;
|
||||
|
||||
export default function (q: Bull.Queue) {
|
||||
for (const [k, v] of Object.entries(jobs)) {
|
||||
q.process(k, 16, v);
|
||||
}
|
||||
}
|
@ -27,6 +27,7 @@ export const webhookDeliverQueue = initializeQueue<WebhookDeliverJobData>(
|
||||
"webhookDeliver",
|
||||
64,
|
||||
);
|
||||
export const backgroundQueue = initializeQueue<Record<string, unknown>>("bg");
|
||||
|
||||
export const queues = [
|
||||
systemQueue,
|
||||
@ -36,4 +37,5 @@ export const queues = [
|
||||
dbQueue,
|
||||
objectStorageQueue,
|
||||
webhookDeliverQueue,
|
||||
backgroundQueue,
|
||||
];
|
||||
|
@ -51,6 +51,7 @@ import * as ep___admin_relays_list from "./endpoints/admin/relays/list.js";
|
||||
import * as ep___admin_relays_remove from "./endpoints/admin/relays/remove.js";
|
||||
import * as ep___admin_resetPassword from "./endpoints/admin/reset-password.js";
|
||||
import * as ep___admin_resolveAbuseUserReport from "./endpoints/admin/resolve-abuse-user-report.js";
|
||||
import * as ep___admin_search_indexAll from "./endpoints/admin/search/index-all.js";
|
||||
import * as ep___admin_sendEmail from "./endpoints/admin/send-email.js";
|
||||
import * as ep___admin_serverInfo from "./endpoints/admin/server-info.js";
|
||||
import * as ep___admin_showModerationLogs from "./endpoints/admin/show-moderation-logs.js";
|
||||
@ -393,6 +394,7 @@ const eps = [
|
||||
["admin/relays/remove", ep___admin_relays_remove],
|
||||
["admin/reset-password", ep___admin_resetPassword],
|
||||
["admin/resolve-abuse-user-report", ep___admin_resolveAbuseUserReport],
|
||||
["admin/search/index-all", ep___admin_search_indexAll],
|
||||
["admin/send-email", ep___admin_sendEmail],
|
||||
["admin/server-info", ep___admin_serverInfo],
|
||||
["admin/show-moderation-logs", ep___admin_showModerationLogs],
|
||||
|
@ -3,6 +3,7 @@ import {
|
||||
inboxQueue,
|
||||
dbQueue,
|
||||
objectStorageQueue,
|
||||
backgroundQueue,
|
||||
} from "@/queue/queues.js";
|
||||
import define from "../../../define.js";
|
||||
|
||||
@ -37,6 +38,11 @@ export const meta = {
|
||||
nullable: false,
|
||||
ref: "QueueCount",
|
||||
},
|
||||
backgroundQueue: {
|
||||
optional: false,
|
||||
nullable: false,
|
||||
ref: "QueueCount",
|
||||
},
|
||||
},
|
||||
},
|
||||
} as const;
|
||||
@ -52,11 +58,13 @@ export default define(meta, paramDef, async (ps) => {
|
||||
const inboxJobCounts = await inboxQueue.getJobCounts();
|
||||
const dbJobCounts = await dbQueue.getJobCounts();
|
||||
const objectStorageJobCounts = await objectStorageQueue.getJobCounts();
|
||||
const backgroundJobCounts = await backgroundQueue.getJobCounts();
|
||||
|
||||
return {
|
||||
deliver: deliverJobCounts,
|
||||
inbox: inboxJobCounts,
|
||||
db: dbJobCounts,
|
||||
objectStorage: objectStorageJobCounts,
|
||||
backgroundQueue: backgroundJobCounts,
|
||||
};
|
||||
});
|
||||
|
@ -0,0 +1,28 @@
|
||||
import define from "../../../define.js";
|
||||
import { createIndexAllNotesJob } from "@/queue/index.js";
|
||||
|
||||
export const meta = {
|
||||
tags: ["admin"],
|
||||
|
||||
requireCredential: true,
|
||||
requireModerator: true,
|
||||
} as const;
|
||||
|
||||
export const paramDef = {
|
||||
type: "object",
|
||||
properties: {
|
||||
cursor: {
|
||||
type: "string",
|
||||
format: "misskey:id",
|
||||
nullable: true,
|
||||
default: null,
|
||||
},
|
||||
},
|
||||
required: [],
|
||||
} as const;
|
||||
|
||||
export default define(meta, paramDef, async (ps, _me) => {
|
||||
createIndexAllNotesJob({
|
||||
cursor: ps.cursor ?? undefined,
|
||||
});
|
||||
});
|
@ -1,7 +1,9 @@
|
||||
import { In } from "typeorm";
|
||||
import { Notes } from "@/models/index.js";
|
||||
import { Note } from "@/models/entities/note.js";
|
||||
import config from "@/config/index.js";
|
||||
import es from "../../../../db/elasticsearch.js";
|
||||
import sonic from "../../../../db/sonic.js";
|
||||
import define from "../../define.js";
|
||||
import { makePaginationQuery } from "../../common/make-pagination-query.js";
|
||||
import { generateVisibilityQuery } from "../../common/generate-visibility-query.js";
|
||||
@ -59,7 +61,7 @@ export const paramDef = {
|
||||
} as const;
|
||||
|
||||
export default define(meta, paramDef, async (ps, me) => {
|
||||
if (es == null) {
|
||||
if (es == null && sonic == null) {
|
||||
const query = makePaginationQuery(
|
||||
Notes.createQueryBuilder("note"),
|
||||
ps.sinceId,
|
||||
@ -92,9 +94,82 @@ export default define(meta, paramDef, async (ps, me) => {
|
||||
if (me) generateMutedUserQuery(query, me);
|
||||
if (me) generateBlockedUserQuery(query, me);
|
||||
|
||||
const notes = await query.take(ps.limit).getMany();
|
||||
const notes: Note[] = await query.take(ps.limit).getMany();
|
||||
|
||||
return await Notes.packMany(notes, me);
|
||||
} else if (sonic) {
|
||||
let start = 0;
|
||||
const chunkSize = 100;
|
||||
|
||||
// Use sonic to fetch and step through all search results that could match the requirements
|
||||
const ids = [];
|
||||
while (true) {
|
||||
const results = await sonic.search.query(
|
||||
sonic.collection,
|
||||
sonic.bucket,
|
||||
ps.query,
|
||||
{
|
||||
limit: chunkSize,
|
||||
offset: start,
|
||||
},
|
||||
);
|
||||
|
||||
start += chunkSize;
|
||||
|
||||
if (results.length === 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
const res = results
|
||||
.map((k) => JSON.parse(k))
|
||||
.filter((key) => {
|
||||
if (ps.userId && key.userId !== ps.userId) {
|
||||
return false;
|
||||
}
|
||||
if (ps.channelId && key.channelId !== ps.channelId) {
|
||||
return false;
|
||||
}
|
||||
if (ps.sinceId && key.id <= ps.sinceId) {
|
||||
return false;
|
||||
}
|
||||
if (ps.untilId && key.id >= ps.untilId) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
})
|
||||
.map((key) => key.id);
|
||||
|
||||
ids.push(...res);
|
||||
}
|
||||
|
||||
// Sort all the results by note id DESC (newest first)
|
||||
ids.sort((a, b) => b - a);
|
||||
|
||||
// Fetch the notes from the database until we have enough to satisfy the limit
|
||||
start = 0;
|
||||
const found = [];
|
||||
while (found.length < ps.limit && start < ids.length) {
|
||||
const chunk = ids.slice(start, start + chunkSize);
|
||||
const notes: Note[] = await Notes.find({
|
||||
where: {
|
||||
id: In(chunk),
|
||||
},
|
||||
order: {
|
||||
id: "DESC",
|
||||
},
|
||||
});
|
||||
|
||||
// The notes are checked for visibility and muted/blocked users when packed
|
||||
found.push(...await Notes.packMany(notes, me));
|
||||
start += chunkSize;
|
||||
}
|
||||
|
||||
// If we have more results than the limit, trim them
|
||||
if (found.length > ps.limit) {
|
||||
found.length = ps.limit;
|
||||
}
|
||||
|
||||
return found;
|
||||
} else {
|
||||
const userQuery =
|
||||
ps.userId != null
|
||||
|
@ -1,5 +1,6 @@
|
||||
import * as mfm from "mfm-js";
|
||||
import es from "../../db/elasticsearch.js";
|
||||
import sonic from "../../db/sonic.js";
|
||||
import {
|
||||
publishMainStream,
|
||||
publishNotesStream,
|
||||
@ -588,7 +589,7 @@ export default async (
|
||||
}
|
||||
|
||||
// Register to search database
|
||||
index(note);
|
||||
await index(note);
|
||||
});
|
||||
|
||||
async function renderNoteOrRenoteActivity(data: Option, note: Note) {
|
||||
@ -728,18 +729,34 @@ async function insertNote(
|
||||
}
|
||||
}
|
||||
|
||||
function index(note: Note) {
|
||||
if (note.text == null || config.elasticsearch == null) return;
|
||||
export async function index(note: Note): Promise<void> {
|
||||
if (!note.text) return;
|
||||
|
||||
es!.index({
|
||||
index: config.elasticsearch.index || "misskey_note",
|
||||
id: note.id.toString(),
|
||||
body: {
|
||||
text: normalizeForSearch(note.text),
|
||||
userId: note.userId,
|
||||
userHost: note.userHost,
|
||||
},
|
||||
});
|
||||
if (config.elasticsearch && es) {
|
||||
es.index({
|
||||
index: config.elasticsearch.index || "misskey_note",
|
||||
id: note.id.toString(),
|
||||
body: {
|
||||
text: normalizeForSearch(note.text),
|
||||
userId: note.userId,
|
||||
userHost: note.userHost,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
if (sonic) {
|
||||
await sonic.ingest.push(
|
||||
sonic.collection,
|
||||
sonic.bucket,
|
||||
JSON.stringify({
|
||||
id: note.id,
|
||||
userId: note.userId,
|
||||
userHost: note.userHost,
|
||||
channelId: note.channelId,
|
||||
}),
|
||||
note.text,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
async function notifyToWatchersOfRenotee(
|
||||
|
7
pnpm-lock.yaml
generated
7
pnpm-lock.yaml
generated
@ -196,6 +196,7 @@ importers:
|
||||
seedrandom: ^3.0.5
|
||||
semver: 7.3.8
|
||||
sharp: 0.31.3
|
||||
sonic-channel: ^1.3.1
|
||||
speakeasy: 2.0.0
|
||||
strict-event-emitter-types: 2.0.0
|
||||
stringz: 2.1.0
|
||||
@ -310,6 +311,7 @@ importers:
|
||||
seedrandom: 3.0.5
|
||||
semver: 7.3.8
|
||||
sharp: 0.31.3
|
||||
sonic-channel: 1.3.1
|
||||
speakeasy: 2.0.0
|
||||
stringz: 2.1.0
|
||||
summaly: 2.7.0
|
||||
@ -11594,6 +11596,11 @@ packages:
|
||||
smart-buffer: 4.2.0
|
||||
dev: false
|
||||
|
||||
/sonic-channel/1.3.1:
|
||||
resolution: {integrity: sha512-+K4IZVFE7Tf2DB4EFZ23xo7a/+gJaiOHhFzXVZpzkX6Rs/rvf4YbSxnEGdYw8mrTcjtpG+jLVQEhP8sNTtN5VA==}
|
||||
engines: {node: '>= 6.0.0'}
|
||||
dev: false
|
||||
|
||||
/sort-keys-length/1.0.1:
|
||||
resolution: {integrity: sha512-GRbEOUqCxemTAk/b32F2xa8wDTs+Z1QHOkbhJDQTvv/6G3ZkbJ+frYWsTcc7cBB3Fu4wy4XlLCuNtJuMn7Gsvw==}
|
||||
engines: {node: '>=0.10.0'}
|
||||
|
Loading…
Reference in New Issue
Block a user