From f0eb26f7e89fca6d6c070651656e2554196d1b51 Mon Sep 17 00:00:00 2001 From: syuilo Date: Wed, 6 Feb 2019 13:51:02 +0900 Subject: [PATCH] Improve queue configuration Resolve #4157 Resolve #4158 --- src/argv.ts | 6 +++- src/index.ts | 80 ++++++++++++++++++++++++++++++++-------------- src/queue/index.ts | 15 ++++++--- 3 files changed, 71 insertions(+), 30 deletions(-) diff --git a/src/argv.ts b/src/argv.ts index 2a9d324a5..31325d138 100644 --- a/src/argv.ts +++ b/src/argv.ts @@ -5,11 +5,15 @@ program .version(pkg.version) .option('--no-daemons', 'Disable daemon processes (for debbuging)') .option('--disable-clustering', 'Disable clustering') - .option('--disable-queue', 'Disable job queue') + .option('--disable-queue', 'Disable job queue processing') + .option('--only-queue', 'Pocessing job queue only') .option('--quiet', 'Suppress all logs') .option('--verbose', 'Enable all logs') .option('--slow', 'Delay all requests (for debbuging)') .option('--color', 'This option is a dummy for some external program\'s (e.g. forever) issue.') .parse(process.argv); +if (process.env.MK_DISABLE_QUEUE) program.disableQueue = true; +if (process.env.MK_ONLY_QUEUE) program.onlyQueue = true; + export { program }; diff --git a/src/index.ts b/src/index.ts index c17f5ee70..a55251d3b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -35,6 +35,11 @@ const ev = new Xev(); function main() { process.title = `Misskey (${cluster.isMaster ? 'master' : 'worker'})`; + if (program.onlyQueue) { + queueMain(); + return; + } + if (cluster.isMaster || program.disableClustering) { masterMain(); @@ -53,12 +58,7 @@ function main() { } } -/** - * Init master process - */ -async function masterMain() { - let config: Config; - +function greet() { if (!program.quiet) { //#region Misskey logo const v = `v${pkg.version}`; @@ -75,10 +75,34 @@ async function masterMain() { bootLogger.info('Welcome to Misskey!'); bootLogger.info(`Misskey v${pkg.version}`, true); bootLogger.info('Misskey is maintained by @syuilo, @AyaMorisawa, @mei23, and @acid-chicken.'); +} + +/** + * Init master process + */ +async function masterMain() { + greet(); + + let config: Config; try { // initialize app config = await init(); + + if (config.port == null) { + bootLogger.error('The port is not configured. Please configure port.', true); + process.exit(1); + } + + if (process.platform === 'linux' && isWellKnownPort(config.port) && !isRoot()) { + bootLogger.error('You need root privileges to listen on well-known port on Linux', true); + process.exit(1); + } + + if (!await isPortAvailable(config.port)) { + bootLogger.error(`Port ${config.port} is already in use`, true); + process.exit(1); + } } catch (e) { bootLogger.error('Fatal error occurred during initialization', true); process.exit(1); @@ -90,6 +114,9 @@ async function masterMain() { await spawnWorkers(config.clusterLimit); } + // start queue + require('./queue').default(); + bootLogger.succ(`Now listening on port ${config.port} on ${config.url}`, true); } @@ -100,15 +127,35 @@ async function workerMain() { // start server await require('./server').default(); - // start processor - require('./queue').default(); - if (cluster.isWorker) { // Send a 'ready' message to parent process process.send('ready'); } } +async function queueMain() { + greet(); + + try { + // initialize app + await init(); + } catch (e) { + bootLogger.error('Fatal error occurred during initialization', true); + process.exit(1); + } + + bootLogger.succ('Misskey initialized'); + + // start processor + const queue = require('./queue').default(); + + if (queue) { + bootLogger.succ('Queue started', true); + } else { + bootLogger.error('Queue not available'); + } +} + const runningNodejsVersion = process.version.slice(1).split('.').map(x => parseInt(x, 10)); const requiredNodejsVersion = [10, 0, 0]; const satisfyNodejsVersion = !lessThan(runningNodejsVersion, requiredNodejsVersion); @@ -170,21 +217,6 @@ async function init(): Promise { configLogger.succ('Loaded'); - if (config.port == null) { - bootLogger.error('The port is not configured. Please configure port.', true); - process.exit(1); - } - - if (process.platform === 'linux' && isWellKnownPort(config.port) && !isRoot()) { - bootLogger.error('You need root privileges to listen on well-known port on Linux', true); - process.exit(1); - } - - if (!await isPortAvailable(config.port)) { - bootLogger.error(`Port ${config.port} is already in use`, true); - process.exit(1); - } - // Try to connect to MongoDB try { await checkMongoDB(config, bootLogger); diff --git a/src/queue/index.ts b/src/queue/index.ts index cf8af17a4..161b8f9b2 100644 --- a/src/queue/index.ts +++ b/src/queue/index.ts @@ -4,13 +4,15 @@ import config from '../config'; import { ILocalUser } from '../models/user'; import { program } from '../argv'; import handler from './processors'; +import { queueLogger } from './logger'; -const enableQueue = config.redis != null && !program.disableQueue; +const enableQueue = !program.disableQueue; +const queueAvailable = config.redis != null; const queue = initializeQueue(); function initializeQueue() { - if (enableQueue) { + if (queueAvailable) { return new Queue('misskey', { redis: { port: config.redis.port, @@ -30,7 +32,7 @@ function initializeQueue() { } export function createHttpJob(data: any) { - if (enableQueue) { + if (queueAvailable) { return queue.createJob(data) .retries(4) .backoff('exponential', 16384) // 16s @@ -52,7 +54,7 @@ export function deliver(user: ILocalUser, content: any, to: any) { } export function createExportNotesJob(user: ILocalUser) { - if (!enableQueue) throw 'queue disabled'; + if (!queueAvailable) throw 'queue unavailable'; return queue.createJob({ type: 'exportNotes', @@ -62,7 +64,10 @@ export function createExportNotesJob(user: ILocalUser) { } export default function() { - if (enableQueue) { + if (queueAvailable && enableQueue) { queue.process(128, handler); + queueLogger.succ('Processing started'); } + + return queue; }