diff --git a/streaming/index.js b/streaming/index.js index 5ef1f6f319..b302565a4e 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -111,6 +111,35 @@ const startServer = async () => { const server = http.createServer(); const wss = new WebSocketServer({ noServer: true }); + /** + * Adds a namespace to Redis keys or channel names + * Fixes: https://github.com/redis/ioredis/issues/1910 + * @param {string} keyOrChannel + * @returns {string} + */ + function redisNamespaced(keyOrChannel) { + if (redisConfig.namespace) { + return `${redisConfig.namespace}:${keyOrChannel}`; + } else { + return keyOrChannel; + } + } + + /** + * Removes the redis namespace from a channel name + * @param {string} channel + * @returns {string} + */ + function redisUnnamespaced(channel) { + if (typeof redisConfig.namespace === "string") { + // Note: this removes the configured namespace and the colon that is used + // to separate it: + return channel.slice(redisConfig.namespace.length + 1); + } else { + return channel; + } + } + // Set the X-Request-Id header on WebSockets: wss.on("headers", function onHeaders(headers, req) { headers.push(`X-Request-Id: ${req.id}`); @@ -200,7 +229,6 @@ const startServer = async () => { const subs = {}; const redisSubscribeClient = Redis.createClient(redisConfig, logger); - const { redisPrefix } = redisConfig; // When checking metrics in the browser, the favicon is requested this // prevents the request from falling through to the API Router, which would @@ -222,7 +250,7 @@ const startServer = async () => { const interval = 6 * 60; const tellSubscribed = () => { - channels.forEach(channel => redisClient.set(`${redisPrefix}subscribed:${channel}`, '1', 'EX', interval * 3)); + channels.forEach(channel => redisClient.set(redisNamespaced(`subscribed:${channel}`), '1', 'EX', interval * 3)); }; tellSubscribed(); @@ -240,11 +268,10 @@ const startServer = async () => { */ const onRedisMessage = (channel, message) => { metrics.redisMessagesReceived.inc(); + logger.debug(`New message on channel ${channel}`); - const callbacks = subs[channel]; - - logger.debug(`New message on channel ${redisPrefix}${channel}`); - + const key = redisUnnamespaced(channel); + const callbacks = subs[key]; if (!callbacks) { return; } @@ -273,7 +300,8 @@ const startServer = async () => { if (subs[channel].length === 0) { logger.debug(`Subscribe ${channel}`); - redisSubscribeClient.subscribe(channel, (err, count) => { + + redisSubscribeClient.subscribe(redisNamespaced(channel), (err, count) => { if (err) { logger.error(`Error subscribing to ${channel}`); } else if (typeof count === 'number') { @@ -300,7 +328,9 @@ const startServer = async () => { if (subs[channel].length === 0) { logger.debug(`Unsubscribe ${channel}`); - redisSubscribeClient.unsubscribe(channel, (err, count) => { + + // FIXME: https://github.com/redis/ioredis/issues/1910 + redisSubscribeClient.unsubscribe(redisNamespaced(channel), (err, count) => { if (err) { logger.error(`Error unsubscribing to ${channel}`); } else if (typeof count === 'number') { @@ -481,14 +511,14 @@ const startServer = async () => { }); res.on('close', () => { - unsubscribe(`${redisPrefix}${accessTokenChannelId}`, listener); - unsubscribe(`${redisPrefix}${systemChannelId}`, listener); + unsubscribe(accessTokenChannelId, listener); + unsubscribe(systemChannelId, listener); metrics.connectedChannels.labels({ type: 'eventsource', channel: 'system' }).dec(2); }); - subscribe(`${redisPrefix}${accessTokenChannelId}`, listener); - subscribe(`${redisPrefix}${systemChannelId}`, listener); + subscribe(accessTokenChannelId, listener); + subscribe(systemChannelId, listener); metrics.connectedChannels.labels({ type: 'eventsource', channel: 'system' }).inc(2); }; @@ -805,11 +835,11 @@ const startServer = async () => { }; channelIds.forEach(id => { - subscribe(`${redisPrefix}${id}`, listener); + subscribe(id, listener); }); if (typeof attachCloseHandler === 'function') { - attachCloseHandler(channelIds.map(id => `${redisPrefix}${id}`), listener); + attachCloseHandler(channelIds, listener); } return listener; @@ -1156,7 +1186,7 @@ const startServer = async () => { } channelIds.forEach(channelId => { - unsubscribe(`${redisPrefix}${channelId}`, subscription.listener); + unsubscribe(channelId, subscription.listener); }); metrics.connectedChannels.labels({ type: 'websocket', channel: subscription.channelName }).dec(); @@ -1200,8 +1230,8 @@ const startServer = async () => { }, }); - subscribe(`${redisPrefix}${accessTokenChannelId}`, listener); - subscribe(`${redisPrefix}${systemChannelId}`, listener); + subscribe(accessTokenChannelId, listener); + subscribe(systemChannelId, listener); subscriptions[accessTokenChannelId] = { channelName: 'system', diff --git a/streaming/redis.js b/streaming/redis.js index 208d6ae078..2a36b89dc5 100644 --- a/streaming/redis.js +++ b/streaming/redis.js @@ -4,44 +4,114 @@ import { parseIntFromEnvValue } from './utils.js'; /** * @typedef RedisConfiguration - * @property {import('ioredis').RedisOptions} redisParams - * @property {string} redisPrefix - * @property {string|undefined} redisUrl + * @property {string|undefined} namespace + * @property {string|undefined} url + * @property {import('ioredis').RedisOptions} options */ +/** + * + * @param {NodeJS.ProcessEnv} env + * @returns {boolean} + */ +function hasSentinelConfiguration(env) { + return ( + typeof env.REDIS_SENTINELS === 'string' && + env.REDIS_SENTINELS.length > 0 && + typeof env.REDIS_SENTINEL_MASTER === 'string' && + env.REDIS_SENTINEL_MASTER.length > 0 + ); +} + +/** + * + * @param {NodeJS.ProcessEnv} env + * @param {import('ioredis').SentinelConnectionOptions} commonOptions + * @returns {import('ioredis').SentinelConnectionOptions} + */ +function getSentinelConfiguration(env, commonOptions) { + const redisDatabase = parseIntFromEnvValue(env.REDIS_DB, 0, 'REDIS_DB'); + const sentinelPort = parseIntFromEnvValue(env.REDIS_SENTINEL_PORT, 26379, 'REDIS_SENTINEL_PORT'); + + const sentinels = env.REDIS_SENTINELS.split(',').map((sentinel) => { + const [host, port] = sentinel.split(':', 2); + + /** @type {import('ioredis').SentinelAddress} */ + return { + host: host, + port: port ?? sentinelPort, + // Force support for both IPv6 and IPv4, by default ioredis sets this to 4, + // only allowing IPv4 connections: + // https://github.com/redis/ioredis/issues/1576 + family: 0 + }; + }); + + return { + db: redisDatabase, + name: env.REDIS_SENTINEL_MASTER, + username: env.REDIS_USERNAME, + password: env.REDIS_PASSWORD, + sentinelUsername: env.REDIS_SENTINEL_USERNAME ?? env.REDIS_USERNAME, + sentinelPassword: env.REDIS_SENTINEL_PASSWORD ?? env.REDIS_PASSWORD, + sentinels, + ...commonOptions, + }; +} + /** * @param {NodeJS.ProcessEnv} env the `process.env` value to read configuration from * @returns {RedisConfiguration} configuration for the Redis connection */ export function configFromEnv(env) { - // ioredis *can* transparently add prefixes for us, but it doesn't *in some cases*, - // which means we can't use it. But this is something that should be looked into. - const redisPrefix = env.REDIS_NAMESPACE ? `${env.REDIS_NAMESPACE}:` : ''; + const redisNamespace = env.REDIS_NAMESPACE; + // These options apply for both REDIS_URL based connections and connections + // using the other REDIS_* environment variables: + const commonOptions = { + // Force support for both IPv6 and IPv4, by default ioredis sets this to 4, + // only allowing IPv4 connections: + // https://github.com/redis/ioredis/issues/1576 + family: 0 + // Note: we don't use auto-prefixing of keys since this doesn't apply to + // subscribe/unsubscribe which have "channel" instead of "key" arguments + }; + + // If we receive REDIS_URL, don't continue parsing any other REDIS_* + // environment variables: + if (typeof env.REDIS_URL === 'string' && env.REDIS_URL.length > 0) { + return { + url: env.REDIS_URL, + options: commonOptions, + namespace: redisNamespace + }; + } + + // If we have configuration for Redis Sentinel mode, prefer that: + if (hasSentinelConfiguration(env)) { + return { + options: getSentinelConfiguration(env, commonOptions), + namespace: redisNamespace + }; + } + + // Finally, handle all the other REDIS_* environment variables: let redisPort = parseIntFromEnvValue(env.REDIS_PORT, 6379, 'REDIS_PORT'); let redisDatabase = parseIntFromEnvValue(env.REDIS_DB, 0, 'REDIS_DB'); /** @type {import('ioredis').RedisOptions} */ - const redisParams = { - host: env.REDIS_HOST || '127.0.0.1', + const options = { + host: env.REDIS_HOST ?? '127.0.0.1', port: redisPort, - // Force support for both IPv6 and IPv4, by default ioredis sets this to 4, - // only allowing IPv4 connections: - // https://github.com/redis/ioredis/issues/1576 - family: 0, db: redisDatabase, - password: env.REDIS_PASSWORD || undefined, + username: env.REDIS_USERNAME, + password: env.REDIS_PASSWORD, + ...commonOptions, }; - // redisParams.path takes precedence over host and port. - if (env.REDIS_URL && env.REDIS_URL.startsWith('unix://')) { - redisParams.path = env.REDIS_URL.slice(7); - } - return { - redisParams, - redisPrefix, - redisUrl: typeof env.REDIS_URL === 'string' ? env.REDIS_URL : undefined, + options, + namespace: redisNamespace }; } @@ -50,13 +120,13 @@ export function configFromEnv(env) { * @param {import('pino').Logger} logger * @returns {Redis} */ -export function createClient({ redisParams, redisUrl }, logger) { +export function createClient({ url, options }, logger) { let client; - if (typeof redisUrl === 'string') { - client = new Redis(redisUrl, redisParams); + if (typeof url === 'string') { + client = new Redis(url, options); } else { - client = new Redis(redisParams); + client = new Redis(options); } client.on('error', (err) => logger.error({ err }, 'Redis Client Error!'));