0
0
Fork 0

Add logging and filtering to the node.js streaming API

This commit is contained in:
Eugen Rochko 2017-02-02 13:56:14 +01:00
parent 0cfd3188b4
commit 1ee4a17f37
4 changed files with 102 additions and 10 deletions

View file

@ -2,6 +2,7 @@ import dotenv from 'dotenv'
import express from 'express'
import redis from 'redis'
import pg from 'pg'
import log from 'npmlog'
dotenv.config()
@ -40,6 +41,7 @@ const authenticationMiddleware = (req, res, next) => {
pgPool.connect((err, client, done) => {
if (err) {
log.error(err)
return next(err)
}
@ -47,6 +49,7 @@ const authenticationMiddleware = (req, res, next) => {
done()
if (err) {
log.error(err)
return next(err)
}
@ -66,10 +69,12 @@ const authenticationMiddleware = (req, res, next) => {
const errorMiddleware = (err, req, res, next) => {
res.writeHead(err.statusCode || 500, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ error: `${err}` }))
res.end(JSON.stringify({ error: err.statusCode ? `${err}` : 'An unexpected error occured' }))
}
const streamFrom = (id, res) => {
const streamFrom = (id, req, res, needsFiltering = false) => {
log.verbose(`Starting stream from ${id} for ${req.accountId}`)
res.setHeader('Content-Type', 'text/event-stream')
res.setHeader('Transfer-Encoding', 'chunked')
@ -78,11 +83,40 @@ const streamFrom = (id, res) => {
redisClient.on('message', (channel, message) => {
const { event, payload } = JSON.parse(message)
res.write(`event: ${event}\n`)
res.write(`data: ${payload}\n\n`)
if (needsFiltering) {
pgPool.connect((err, client, done) => {
if (err) {
log.error(err)
return
}
const unpackedPayload = JSON.parse(payload)
const targetAccountIds = [unpackedPayload.account.id] + unpackedPayload.mentions.map(item => item.id) + (unpackedPayload.reblog ? unpackedPayload.reblog.account.id : [])
client.query('SELECT target_account_id FROM blocks WHERE account_id = $1 AND target_account_id IN ($2)', [req.accountId, targetAccountIds], (err, result) => {
done()
if (err) {
log.error(err)
return
}
if (result.rows.length > 0) {
return
}
res.write(`event: ${event}\n`)
res.write(`data: ${payload}\n\n`)
})
})
} else {
res.write(`event: ${event}\n`)
res.write(`data: ${payload}\n\n`)
}
})
setInterval(() => res.write('\n'), 15000)
// Heartbeat to keep connection alive
setInterval(() => res.write(':thump\n'), 15000)
redisClient.subscribe(id)
}
@ -90,8 +124,11 @@ const streamFrom = (id, res) => {
app.use(authenticationMiddleware)
app.use(errorMiddleware)
app.get('/api/v1/streaming/user', (req, res) => streamFrom(`timeline:${req.accountId}`, res))
app.get('/api/v1/streaming/public', (_, res) => streamFrom('timeline:public', res))
app.get('/api/v1/streaming/hashtag', (req, res) => streamFrom(`timeline:hashtag:${req.params.tag}`, res))
app.get('/api/v1/streaming/user', (req, res) => streamFrom(`timeline:${req.accountId}`, req, res))
app.get('/api/v1/streaming/public', (req, res) => streamFrom('timeline:public', req, res, true))
app.get('/api/v1/streaming/hashtag', (req, res) => streamFrom(`timeline:hashtag:${req.params.tag}`, req, res, true))
app.listen(4000)
log.level = 'verbose'
log.info(`Starting HTTP server on port ${process.env.PORT || 4000}`)
app.listen(process.env.PORT || 4000)