From 8f02d2ac3f41fda5414660bbbfa26645c54b13ff Mon Sep 17 00:00:00 2001 From: Lamp Date: Fri, 7 Dec 2018 23:11:06 -0800 Subject: [PATCH] Refactor data collector & add to ddpbridge --- src/datacollector.js | 42 +++++++++++++++++++++++++++++++++ src/ddpbridge.js | 15 +++++++++--- src/mppbridger/datacollector.js | 35 --------------------------- src/mppbridger/index.js | 7 +++++- 4 files changed, 60 insertions(+), 39 deletions(-) create mode 100644 src/datacollector.js delete mode 100644 src/mppbridger/datacollector.js diff --git a/src/datacollector.js b/src/datacollector.js new file mode 100644 index 0000000..9ec41a7 --- /dev/null +++ b/src/datacollector.js @@ -0,0 +1,42 @@ +var os = require('os'); +var zlib = require('zlib'); + +// WebSocket message data collector. Returns a function for inputting websocket messages. +// Collects inputted messages to a file and gzips them every time it reaches 8mb, +// and sends it to the output callback function. +// For recording websocket data to a discord channel. +module.exports = async function createWSMessageCollector(output) { // output func must be async + var filepath = os.tmpdir() + "/" + Math.random().toString(36).substring(2); + var size = 0; + var startDate = new Date(); + + // gzip the data & send to output callback + async function save(callback){ + fs.readFile(filepath, (err, file) => { + if (err) return console.error(err); + zlib.gzip(file, async function(err, data){ + if (err) return console.error(err); + var thisStartDate = startDate, thisEndDate = new Date(); + fs.writeFileSync(filepath, ''); + size = 0; + startDate = new Date(); + await output(data, thisStartDate, thisEndDate); + if (callback) callback(); + }); + }); + } + + // save on exit + exitHook(callback => { + save(()=>callback()); + }); + + return function input(message) { // input for websocket messages + message = message.data || message; + if (message instanceof ArrayBuffer) message = Buffer.from(message).toString('base64'); + var line = `${Date.now()} ${message}\n`; + size += line.length; + fs.appendFile(filepath, line, ()=>{}); + if (size > 8000000) {save(); size = 0;} + }; +} \ No newline at end of file diff --git a/src/ddpbridge.js b/src/ddpbridge.js index a7dd97b..70713fb 100644 --- a/src/ddpbridge.js +++ b/src/ddpbridge.js @@ -1,12 +1,22 @@ -process.env['NODE_TLS_REJECT_UNAUTHORIZED'] = 0; +process.env['NODE_TLS_REJECT_UNAUTHORIZED'] = 0; //TODO fix var WebSocket = require('ws'); var Discord = require('discord.js'); +var createWsMessageCollector = require('./datacollector'); + +var webhook = new Discord.WebhookClient(config.webhooks.ddp[0], config.webhooks.ddp[1], {disableEveryone:true}); var ws; var wasConnected = false; //var myId; +var collectWsMessage = createWsMessageCollector(async function(data, startDate, endDate){ + await ws.send({files:[{ + attachment: data, + name: `daydun piano main raw data recording from ${startDate.toISOString()} to ${endDate.toISOString()} .txt.gz` + }]}); +}); + (function connect() { ws = new WebSocket("wss://daydun.com:5012/?nick=%5Bdiscord.gg%2Fk44Eqha%5D"); ws.on("open", () => { @@ -14,6 +24,7 @@ var wasConnected = false; wasConnected = true; }); ws.on("message", message => { + collectWsMessage(message); if (typeof message != 'string') return; var transmission = JSON.parse(message); if (transmission.type == 'chat') { @@ -39,8 +50,6 @@ var wasConnected = false; }); })(); -var webhook = new Discord.WebhookClient(config.webhooks.ddp[0], config.webhooks.ddp[1], {disableEveryone:true}); - function send2discord(message) { webhook.send(message, {split:{char:'',maxLength:2000}}); } diff --git a/src/mppbridger/datacollector.js b/src/mppbridger/datacollector.js deleted file mode 100644 index c813103..0000000 --- a/src/mppbridger/datacollector.js +++ /dev/null @@ -1,35 +0,0 @@ -module.exports = async function(gClient, site, room, DiscordChannel) { - var path = require('os').tmpdir(); - var filename = `${site} ${room} .txt`.replace(/\//g, ':'); - var filepath = path + "/" + filename; - var size = 0; - var startDate = new Date(); - gClient.on('message', function(msg){ - var data = msg.data; - if (data instanceof ArrayBuffer) data = Buffer.from(data).toString('base64'); - var line = `${Date.now()} ${data}\n`; - size += line.length; - fs.appendFile(filepath, line, ()=>{}); - if (size > 8000000) {save(); size = 0;} - }); - async function save(callback){ - console.log(`saving data recording`, filename) - fs.readFile(filepath, (err, file) => { - if (err) return console.error(err); - require('zlib').gzip(file, async function(err, gzip){ - if (err) return console.error(err); - var attachmentName = `${site} ${room} raw data recording from ${startDate.toISOString()} to ${new Date().toISOString()} .txt.gz`; - await DiscordChannel.send(new Discord.MessageAttachment(gzip, attachmentName)); - fs.writeFileSync(filepath, ''); - size = 0; - startDate = new Date(); - console.log(`saved raw data recording`, attachmentName); - if (callback) callback(); - }); - }); - } - exitHook(callback => { - save(()=>callback()); - }); - gClient.dataCollectorSave = function(){save()}; // test -} \ No newline at end of file diff --git a/src/mppbridger/index.js b/src/mppbridger/index.js index e9ca9ce..263b1c7 100755 --- a/src/mppbridger/index.js +++ b/src/mppbridger/index.js @@ -246,7 +246,12 @@ global.createMPPbridge = function createMPPbridge(room, DiscordChannelID, site = require('./namecollector').collect(participant); }); // record raw data - require('./datacollector')(gClient, site, room, DiscordChannel); + //require('./datacollector')(gClient, site, room, DiscordChannel); + let createWSMessageCollector = require("../datacollector") + gClient.on("message", createWSMessageCollector(async function(data, startDate, endDate){ + var attachmentName = `${site} ${room} raw data recording from ${startDate.toISOString()} to ${endDate.toISOString()} .txt.gz`; + await DiscordChannel.send(new Discord.MessageAttachment(data, attachmentName)); + })); } if (!clients[site]) clients[site] = {};