From a4de0e364b9f73299ab121340c0f5ab27125be9e Mon Sep 17 00:00:00 2001 From: Emelia Smith Date: Tue, 28 Nov 2023 15:24:41 +0100 Subject: [PATCH] Refactor streaming to simplify for logging change (#28056) --- streaming/index.js | 104 +++++------------------------------------- streaming/metrics.js | 105 +++++++++++++++++++++++++++++++++++++++++++ streaming/utils.js | 22 +++++++++ 3 files changed, 139 insertions(+), 92 deletions(-) create mode 100644 streaming/metrics.js create mode 100644 streaming/utils.js diff --git a/streaming/index.js b/streaming/index.js index e3b63b53a6..fb3e3fb2be 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -12,10 +12,12 @@ const { JSDOM } = require('jsdom'); const log = require('npmlog'); const pg = require('pg'); const dbUrlToConfig = require('pg-connection-string').parse; -const metrics = require('prom-client'); const uuid = require('uuid'); const WebSocket = require('ws'); +const { setupMetrics } = require('./metrics'); +const { isTruthy } = require("./utils"); + const environment = process.env.NODE_ENV || 'development'; // Correctly detect and load .env or .env.production file based on environment: @@ -196,78 +198,15 @@ const startServer = async () => { const redisClient = await createRedisClient(redisConfig); const { redisPrefix } = redisConfig; - // Collect metrics from Node.js - metrics.collectDefaultMetrics(); - - new metrics.Gauge({ - name: 'pg_pool_total_connections', - help: 'The total number of clients existing within the pool', - collect() { - this.set(pgPool.totalCount); - }, - }); - - new metrics.Gauge({ - name: 'pg_pool_idle_connections', - help: 'The number of clients which are not checked out but are currently idle in the pool', - collect() { - this.set(pgPool.idleCount); - }, - }); - - new metrics.Gauge({ - name: 'pg_pool_waiting_queries', - help: 'The number of queued requests waiting on a client when all clients are checked out', - collect() { - this.set(pgPool.waitingCount); - }, - }); - - const connectedClients = new metrics.Gauge({ - name: 'connected_clients', - help: 'The number of clients connected to the streaming server', - labelNames: ['type'], - }); - - const connectedChannels = new metrics.Gauge({ - name: 'connected_channels', - help: 'The number of channels the streaming server is streaming to', - labelNames: [ 'type', 'channel' ] - }); - - const redisSubscriptions = new metrics.Gauge({ - name: 'redis_subscriptions', - help: 'The number of Redis channels the streaming server is subscribed to', - }); - - const redisMessagesReceived = new metrics.Counter({ - name: 'redis_messages_received_total', - help: 'The total number of messages the streaming server has received from redis subscriptions' - }); - - const messagesSent = new metrics.Counter({ - name: 'messages_sent_total', - help: 'The total number of messages the streaming server sent to clients per connection type', - labelNames: [ 'type' ] - }); - - // Prime the gauges so we don't loose metrics between restarts: - redisSubscriptions.set(0); - connectedClients.set({ type: 'websocket' }, 0); - connectedClients.set({ type: 'eventsource' }, 0); - - // For each channel, initialize the gauges at zero; There's only a finite set of channels available - CHANNEL_NAMES.forEach(( channel ) => { - connectedChannels.set({ type: 'websocket', channel }, 0); - connectedChannels.set({ type: 'eventsource', channel }, 0); - }); - - // Prime the counters so that we don't loose metrics between restarts. - // Unfortunately counters don't support the set() API, so instead I'm using - // inc(0) to achieve the same result. - redisMessagesReceived.inc(0); - messagesSent.inc({ type: 'websocket' }, 0); - messagesSent.inc({ type: 'eventsource' }, 0); + const metrics = setupMetrics(CHANNEL_NAMES, pgPool); + // TODO: migrate all metrics to metrics.X.method() instead of just X.method() + const { + connectedClients, + connectedChannels, + redisSubscriptions, + redisMessagesReceived, + messagesSent, + } = metrics; // When checking metrics in the browser, the favicon is requested this // prevents the request from falling through to the API Router, which would @@ -388,25 +327,6 @@ const startServer = async () => { } }; - const FALSE_VALUES = [ - false, - 0, - '0', - 'f', - 'F', - 'false', - 'FALSE', - 'off', - 'OFF', - ]; - - /** - * @param {any} value - * @returns {boolean} - */ - const isTruthy = value => - value && !FALSE_VALUES.includes(value); - /** * @param {any} req * @param {any} res diff --git a/streaming/metrics.js b/streaming/metrics.js new file mode 100644 index 0000000000..d05b4c9b16 --- /dev/null +++ b/streaming/metrics.js @@ -0,0 +1,105 @@ +// @ts-check + +const metrics = require('prom-client'); + +/** + * @typedef StreamingMetrics + * @property {metrics.Registry} register + * @property {metrics.Gauge<"type">} connectedClients + * @property {metrics.Gauge<"type" | "channel">} connectedChannels + * @property {metrics.Gauge} redisSubscriptions + * @property {metrics.Counter} redisMessagesReceived + * @property {metrics.Counter<"type">} messagesSent + */ + +/** + * + * @param {string[]} channels + * @param {import('pg').Pool} pgPool + * @returns {StreamingMetrics} + */ +function setupMetrics(channels, pgPool) { + // Collect metrics from Node.js + metrics.collectDefaultMetrics(); + + new metrics.Gauge({ + name: 'pg_pool_total_connections', + help: 'The total number of clients existing within the pool', + collect() { + this.set(pgPool.totalCount); + }, + }); + + new metrics.Gauge({ + name: 'pg_pool_idle_connections', + help: 'The number of clients which are not checked out but are currently idle in the pool', + collect() { + this.set(pgPool.idleCount); + }, + }); + + new metrics.Gauge({ + name: 'pg_pool_waiting_queries', + help: 'The number of queued requests waiting on a client when all clients are checked out', + collect() { + this.set(pgPool.waitingCount); + }, + }); + + const connectedClients = new metrics.Gauge({ + name: 'connected_clients', + help: 'The number of clients connected to the streaming server', + labelNames: ['type'], + }); + + const connectedChannels = new metrics.Gauge({ + name: 'connected_channels', + help: 'The number of channels the streaming server is streaming to', + labelNames: [ 'type', 'channel' ] + }); + + const redisSubscriptions = new metrics.Gauge({ + name: 'redis_subscriptions', + help: 'The number of Redis channels the streaming server is subscribed to', + }); + + const redisMessagesReceived = new metrics.Counter({ + name: 'redis_messages_received_total', + help: 'The total number of messages the streaming server has received from redis subscriptions' + }); + + const messagesSent = new metrics.Counter({ + name: 'messages_sent_total', + help: 'The total number of messages the streaming server sent to clients per connection type', + labelNames: [ 'type' ] + }); + + // Prime the gauges so we don't loose metrics between restarts: + redisSubscriptions.set(0); + connectedClients.set({ type: 'websocket' }, 0); + connectedClients.set({ type: 'eventsource' }, 0); + + // For each channel, initialize the gauges at zero; There's only a finite set of channels available + channels.forEach(( channel ) => { + connectedChannels.set({ type: 'websocket', channel }, 0); + connectedChannels.set({ type: 'eventsource', channel }, 0); + }); + + // Prime the counters so that we don't loose metrics between restarts. + // Unfortunately counters don't support the set() API, so instead I'm using + // inc(0) to achieve the same result. + redisMessagesReceived.inc(0); + messagesSent.inc({ type: 'websocket' }, 0); + messagesSent.inc({ type: 'eventsource' }, 0); + + return { + register: metrics.register, + connectedClients, + connectedChannels, + redisSubscriptions, + redisMessagesReceived, + messagesSent, + }; +} + +exports.setupMetrics = setupMetrics; diff --git a/streaming/utils.js b/streaming/utils.js new file mode 100644 index 0000000000..ad8dd4889f --- /dev/null +++ b/streaming/utils.js @@ -0,0 +1,22 @@ +// @ts-check + +const FALSE_VALUES = [ + false, + 0, + '0', + 'f', + 'F', + 'false', + 'FALSE', + 'off', + 'OFF', +]; + +/** + * @param {any} value + * @returns {boolean} + */ +const isTruthy = value => + value && !FALSE_VALUES.includes(value); + +exports.isTruthy = isTruthy;