Add E2EE API (#13820)
This commit is contained in:
parent
9b7e3b4774
commit
5d8398c8b8
72 changed files with 1463 additions and 233 deletions
|
@ -144,13 +144,21 @@ const startWorker = (workerId) => {
|
|||
callbacks.forEach(callback => callback(message));
|
||||
});
|
||||
|
||||
const subscriptionHeartbeat = (channel) => {
|
||||
const interval = 6*60;
|
||||
const subscriptionHeartbeat = channels => {
|
||||
if (!Array.isArray(channels)) {
|
||||
channels = [channels];
|
||||
}
|
||||
|
||||
const interval = 6 * 60;
|
||||
|
||||
const tellSubscribed = () => {
|
||||
redisClient.set(`${redisPrefix}subscribed:${channel}`, '1', 'EX', interval*3);
|
||||
channels.forEach(channel => redisClient.set(`${redisPrefix}subscribed:${channel}`, '1', 'EX', interval * 3));
|
||||
};
|
||||
|
||||
tellSubscribed();
|
||||
const heartbeat = setInterval(tellSubscribed, interval*1000);
|
||||
|
||||
const heartbeat = setInterval(tellSubscribed, interval * 1000);
|
||||
|
||||
return () => {
|
||||
clearInterval(heartbeat);
|
||||
};
|
||||
|
@ -203,7 +211,7 @@ const startWorker = (workerId) => {
|
|||
return;
|
||||
}
|
||||
|
||||
client.query('SELECT oauth_access_tokens.resource_owner_id, users.account_id, users.chosen_languages, oauth_access_tokens.scopes FROM oauth_access_tokens INNER JOIN users ON oauth_access_tokens.resource_owner_id = users.id WHERE oauth_access_tokens.token = $1 AND oauth_access_tokens.revoked_at IS NULL LIMIT 1', [token], (err, result) => {
|
||||
client.query('SELECT oauth_access_tokens.resource_owner_id, users.account_id, users.chosen_languages, oauth_access_tokens.scopes, devices.device_id FROM oauth_access_tokens INNER JOIN users ON oauth_access_tokens.resource_owner_id = users.id LEFT OUTER JOIN devices ON oauth_access_tokens.id = devices.access_token_id WHERE oauth_access_tokens.token = $1 AND oauth_access_tokens.revoked_at IS NULL LIMIT 1', [token], (err, result) => {
|
||||
done();
|
||||
|
||||
if (err) {
|
||||
|
@ -232,6 +240,7 @@ const startWorker = (workerId) => {
|
|||
req.accountId = result.rows[0].account_id;
|
||||
req.chosenLanguages = result.rows[0].chosen_languages;
|
||||
req.allowNotifications = scopes.some(scope => ['read', 'read:notifications'].includes(scope));
|
||||
req.deviceId = result.rows[0].device_id;
|
||||
|
||||
next();
|
||||
});
|
||||
|
@ -353,11 +362,15 @@ const startWorker = (workerId) => {
|
|||
});
|
||||
};
|
||||
|
||||
const streamFrom = (id, req, output, attachCloseHandler, needsFiltering = false, notificationOnly = false) => {
|
||||
const accountId = req.accountId || req.remoteAddress;
|
||||
|
||||
const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false, notificationOnly = false) => {
|
||||
const accountId = req.accountId || req.remoteAddress;
|
||||
const streamType = notificationOnly ? ' (notification)' : '';
|
||||
log.verbose(req.requestId, `Starting stream from ${id} for ${accountId}${streamType}`);
|
||||
|
||||
if (!Array.isArray(ids)) {
|
||||
ids = [ids];
|
||||
}
|
||||
|
||||
log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}${streamType}`);
|
||||
|
||||
const listener = message => {
|
||||
const { event, payload, queued_at } = JSON.parse(message);
|
||||
|
@ -430,8 +443,11 @@ const startWorker = (workerId) => {
|
|||
});
|
||||
};
|
||||
|
||||
subscribe(`${redisPrefix}${id}`, listener);
|
||||
attachCloseHandler(`${redisPrefix}${id}`, listener);
|
||||
ids.forEach(id => {
|
||||
subscribe(`${redisPrefix}${id}`, listener);
|
||||
});
|
||||
|
||||
attachCloseHandler(ids.map(id => `${redisPrefix}${id}`), listener);
|
||||
};
|
||||
|
||||
// Setup stream output to HTTP
|
||||
|
@ -458,9 +474,16 @@ const startWorker = (workerId) => {
|
|||
};
|
||||
|
||||
// Setup stream end for HTTP
|
||||
const streamHttpEnd = (req, closeHandler = false) => (id, listener) => {
|
||||
const streamHttpEnd = (req, closeHandler = false) => (ids, listener) => {
|
||||
if (!Array.isArray(ids)) {
|
||||
ids = [ids];
|
||||
}
|
||||
|
||||
req.on('close', () => {
|
||||
unsubscribe(id, listener);
|
||||
ids.forEach(id => {
|
||||
unsubscribe(id, listener);
|
||||
});
|
||||
|
||||
if (closeHandler) {
|
||||
closeHandler();
|
||||
}
|
||||
|
@ -516,8 +539,13 @@ const startWorker = (workerId) => {
|
|||
app.use(errorMiddleware);
|
||||
|
||||
app.get('/api/v1/streaming/user', (req, res) => {
|
||||
const channel = `timeline:${req.accountId}`;
|
||||
streamFrom(channel, req, streamToHttp(req, res), streamHttpEnd(req, subscriptionHeartbeat(channel)));
|
||||
const channels = [`timeline:${req.accountId}`];
|
||||
|
||||
if (req.deviceId) {
|
||||
channels.push(`timeline:${req.accountId}:${req.deviceId}`);
|
||||
}
|
||||
|
||||
streamFrom(channels, req, streamToHttp(req, res), streamHttpEnd(req, subscriptionHeartbeat(channels)));
|
||||
});
|
||||
|
||||
app.get('/api/v1/streaming/user/notification', (req, res) => {
|
||||
|
@ -597,7 +625,12 @@ const startWorker = (workerId) => {
|
|||
|
||||
switch(location.query.stream) {
|
||||
case 'user':
|
||||
channel = `timeline:${req.accountId}`;
|
||||
channel = [`timeline:${req.accountId}`];
|
||||
|
||||
if (req.deviceId) {
|
||||
channel.push(`timeline:${req.accountId}:${req.deviceId}`);
|
||||
}
|
||||
|
||||
streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)));
|
||||
break;
|
||||
case 'user:notification':
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue