0
0
Fork 0

Move to ioredis for streaming (#26581)

Co-authored-by: Emelia Smith <ThisIsMissEm@users.noreply.github.com>
This commit is contained in:
Gabriel Simmer 2023-09-01 16:44:28 +01:00 committed by GitHub
parent 9e26cd5503
commit be991f1d18
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 85 additions and 100 deletions

View file

@ -6,12 +6,12 @@ const url = require('url');
const dotenv = require('dotenv');
const express = require('express');
const Redis = require('ioredis');
const { JSDOM } = require('jsdom');
const log = require('npmlog');
const pg = require('pg');
const dbUrlToConfig = require('pg-connection-string').parse;
const metrics = require('prom-client');
const redis = require('redis');
const uuid = require('uuid');
const WebSocket = require('ws');
@ -24,30 +24,12 @@ dotenv.config({
log.level = process.env.LOG_LEVEL || 'verbose';
/**
* @param {Object.<string, any>} defaultConfig
* @param {string} redisUrl
* @param {Object.<string, any>} config
*/
const redisUrlToClient = async (defaultConfig, redisUrl) => {
const config = defaultConfig;
let client;
if (!redisUrl) {
client = redis.createClient(config);
} else if (redisUrl.startsWith('unix://')) {
client = redis.createClient(Object.assign(config, {
socket: {
path: redisUrl.slice(7),
},
}));
} else {
client = redis.createClient(Object.assign(config, {
url: redisUrl,
}));
}
const createRedisClient = async (config) => {
const { redisParams, redisUrl } = config;
const client = new Redis(redisUrl, redisParams);
client.on('error', (err) => log.error('Redis Client Error!', err));
await client.connect();
return client;
};
@ -147,23 +129,22 @@ const pgConfigFromEnv = (env) => {
* @returns {Object.<string, any>} configuration for the Redis connection
*/
const redisConfigFromEnv = (env) => {
const redisNamespace = env.REDIS_NAMESPACE || null;
// ioredis *can* transparently add prefixes for us, but it doesn't *in some cases*,
// which means we can't use it. But this is something that should be looked into.
const redisPrefix = env.REDIS_NAMESPACE ? `${env.REDIS_NAMESPACE}:` : '';
const redisParams = {
socket: {
host: env.REDIS_HOST || '127.0.0.1',
port: env.REDIS_PORT || 6379,
},
database: env.REDIS_DB || 0,
host: env.REDIS_HOST || '127.0.0.1',
port: env.REDIS_PORT || 6379,
db: env.REDIS_DB || 0,
password: env.REDIS_PASSWORD || undefined,
};
if (redisNamespace) {
redisParams.namespace = redisNamespace;
// redisParams.path takes precedence over host and port.
if (env.REDIS_URL && env.REDIS_URL.startsWith('unix://')) {
redisParams.path = env.REDIS_URL.slice(7);
}
const redisPrefix = redisNamespace ? `${redisNamespace}:` : '';
return {
redisParams,
redisPrefix,
@ -179,15 +160,15 @@ const startServer = async () => {
const pgPool = new pg.Pool(pgConfigFromEnv(process.env));
const server = http.createServer(app);
const { redisParams, redisUrl, redisPrefix } = redisConfigFromEnv(process.env);
/**
* @type {Object.<string, Array.<function(Object<string, any>): void>>}
*/
const subs = {};
const redisSubscribeClient = await redisUrlToClient(redisParams, redisUrl);
const redisClient = await redisUrlToClient(redisParams, redisUrl);
const redisConfig = redisConfigFromEnv(process.env);
const redisSubscribeClient = await createRedisClient(redisConfig);
const redisClient = await createRedisClient(redisConfig);
const { redisPrefix } = redisConfig;
// Collect metrics from Node.js
metrics.collectDefaultMetrics();
@ -277,13 +258,13 @@ const startServer = async () => {
};
/**
* @param {string} message
* @param {string} channel
* @param {string} message
*/
const onRedisMessage = (message, channel) => {
const onRedisMessage = (channel, message) => {
const callbacks = subs[channel];
log.silly(`New message on channel ${channel}`);
log.silly(`New message on channel ${redisPrefix}${channel}`);
if (!callbacks) {
return;
@ -294,6 +275,7 @@ const startServer = async () => {
callbacks.forEach(callback => callback(json));
};
redisSubscribeClient.on("message", onRedisMessage);
/**
* @callback SubscriptionListener
@ -312,8 +294,14 @@ const startServer = async () => {
if (subs[channel].length === 0) {
log.verbose(`Subscribe ${channel}`);
redisSubscribeClient.subscribe(channel, onRedisMessage);
redisSubscriptions.inc();
redisSubscribeClient.subscribe(channel, (err, count) => {
if (err) {
log.error(`Error subscribing to ${channel}`);
}
else {
redisSubscriptions.set(count);
}
});
}
subs[channel].push(callback);
@ -334,8 +322,14 @@ const startServer = async () => {
if (subs[channel].length === 0) {
log.verbose(`Unsubscribe ${channel}`);
redisSubscribeClient.unsubscribe(channel);
redisSubscriptions.dec();
redisSubscribeClient.unsubscribe(channel, (err, count) => {
if (err) {
log.error(`Error unsubscribing to ${channel}`);
}
else {
redisSubscriptions.set(count);
}
});
delete subs[channel];
}
};