From 198a8ceb3a5d153a3a7ddc9ea9e4a38a517d5037 Mon Sep 17 00:00:00 2001 From: Lamp Date: Tue, 4 Jan 2022 16:01:27 -0800 Subject: [PATCH] rewrite data collector maybe it will work --- datacollector.js | 42 ------------------------------------------ ddpbridge.js | 8 ++++---- lib/datacollector.js | 28 ++++++++++++++++++++++++++++ main.js | 2 +- misc.js | 2 +- mppbridger.js | 15 +++++++-------- 6 files changed, 41 insertions(+), 56 deletions(-) delete mode 100644 datacollector.js create mode 100644 lib/datacollector.js diff --git a/datacollector.js b/datacollector.js deleted file mode 100644 index db23500..0000000 --- a/datacollector.js +++ /dev/null @@ -1,42 +0,0 @@ -var os = require('os'); -var zlib = require('zlib'); - -// WebSocket message data collector. Returns a function for inputting websocket messages. -// Collects inputted messages to a file and gzips them every time it reaches 8mb, -// and sends it to the output callback function. -// For recording websocket data to a discord channel. -module.exports = function createWSMessageCollector(output) { // output func must be async - var filepath = os.tmpdir() + "/" + Math.random().toString(36).substring(2); - var size = 0; - var startDate = new Date(); - - // gzip the data & send to output callback - async function save(callback){ - fs.readFile(filepath, (err, file) => { - if (err) return handleError(err); - zlib.gzip(file, async function(err, data){ - if (err) return handleError(err); - var thisStartDate = startDate, thisEndDate = new Date(); - fs.writeFileSync(filepath, ''); - size = 0; - startDate = new Date(); - await output(data, thisStartDate, thisEndDate); - if (callback) callback(); - }); - }); - } - - // save on exit - exitHook(callback => { - save(()=>callback()); - }); - - return function input(message) { // input for websocket messages - message = message.data || message; - if (message instanceof ArrayBuffer) message = Buffer.from(message).toString('base64'); - var line = `${Date.now()} ${message}\n`; - size += line.length; - fs.appendFileSync(filepath, line); - if (size > 8000000) {save(); size = 0;} - }; -} \ No newline at end of file diff --git a/ddpbridge.js b/ddpbridge.js index a839d50..8132f3c 100644 --- a/ddpbridge.js +++ b/ddpbridge.js @@ -1,14 +1,14 @@ var WebSocket = require('ws'); var Discord = require('discord.js'); -var createWsMessageCollector = require('./datacollector'); +var WebSocketMessageCollector = require('./lib/datacollector'); -var webhook = new Discord.WebhookClient({url: config.webhooks.ddp, allowedMentions: {parse: []}}); +var webhook = new Discord.WebhookClient({url: config.webhooks.ddp}, {allowedMentions: {parse: []}}); var ws; var wasConnected = false; //var myId; -var collectWsMessage = createWsMessageCollector(async function(data, startDate, endDate){ +var wsc = new WebSocketMessageCollector(async function(data, startDate, endDate){ await webhook.send({files:[{ attachment: data, name: `daydun piano main raw data recording from ${startDate.toISOString()} to ${endDate.toISOString()} .txt.gz` @@ -22,7 +22,7 @@ var collectWsMessage = createWsMessageCollector(async function(data, startDate, wasConnected = true; }); ws.on("message", message => { - collectWsMessage(message); + wsc.collect(message); if (typeof message != 'string') return; var transmission = JSON.parse(message); if (transmission.type == 'chat') { diff --git a/lib/datacollector.js b/lib/datacollector.js new file mode 100644 index 0000000..2fed304 --- /dev/null +++ b/lib/datacollector.js @@ -0,0 +1,28 @@ +var gzip = require("util").promisify(require("zlib").gzip); + +module.exports = class WebSocketMessageCollector { + constructor(dispatchFunction) { + this.maxSize = 8000000; + this.data = ""; + this.startDate = new Date(); + this.dispatchFunction = dispatchFunction; + exitHook(cb => { + this.dispatchFunction().then(cb); + }); + } + + collect(message) { + message = message.data || message; + if (message instanceof ArrayBuffer) message = Buffer.from(message).toString('base64'); + var line = `${Date.now()} ${message}\n`; + this.data += line; + if (this.data.length > this.maxSize) this.package(); + } + + async package() { + var data = this.data, startDate = this.startDate, endDate = new Date(); + this.data = "", this.startDate = new Date(); + data = await gzip(data); + await this.dispatchFunction(data, startDate, endDate); + } +} \ No newline at end of file diff --git a/main.js b/main.js index dcdc83d..dad658d 100644 --- a/main.js +++ b/main.js @@ -13,7 +13,7 @@ global.dClient = new Discord.Client({ // error handling { - let webhook = new Discord.WebhookClient({url: config.webhooks.error, allowedMentions: {parse: []}}); + let webhook = new Discord.WebhookClient({url: config.webhooks.error}, {allowedMentions: {parse: []}}); global.handleError = function logError(error, title) { let msg = error && (error.stack || error.message || error); console.error(title + ':\n' + msg); diff --git a/misc.js b/misc.js index dcb9316..33aaa1f 100644 --- a/misc.js +++ b/misc.js @@ -1,6 +1,6 @@ // join/leave (async function(){ - var webhook = new Discord.WebhookClient({url: config.webhooks.welcome, allowedMentions: {parse: []}}); + var webhook = new Discord.WebhookClient({url: config.webhooks.welcome}, {allowedMentions: {parse: []}}); dClient.on('guildMemberAdd', async member => { if (member.guild.id != config.guildID) return; let username = member.user.username.toLowerCase().includes('clyde') ? member.user.username.replace(/C/g,'Q').replace(/c/g,'q') : member.user.username; diff --git a/mppbridger.js b/mppbridger.js index cb143e8..f760144 100644 --- a/mppbridger.js +++ b/mppbridger.js @@ -1,4 +1,5 @@ var Client = require('./lib/Client.js'); +var WebSocketMessageCollector = require("./lib/datacollector"); global.createMPPbridge = async function createMPPbridge({room, channel, uri}) { channel = dClient.channels.resolve(channel); @@ -190,14 +191,12 @@ global.createMPPbridge = async function createMPPbridge({room, channel, uri}) { // addons - { - // record raw data - let createWSMessageCollector = require("./datacollector") - gClient.on("message", createWSMessageCollector(async function(data, startDate, endDate){ - var attachmentName = `${uri} ${room} raw data recording from ${startDate.toISOString()} to ${endDate.toISOString()} .txt.gz`; - await channel.send(new Discord.MessageAttachment(data, attachmentName)); - })); - } + var wsc = new WebSocketMessageCollector(async function(data, startDate, endDate) { + var attachmentName = `${uri} ${room} raw data recording from ${startDate.toISOString()} to ${endDate.toISOString()} .txt.gz`; + await channel.send(new Discord.MessageAttachment(data, attachmentName)); + }); + gClient.on("message", wsc.collect.bind(wsc)); + return gClient; };