Refactor data collector & add to ddpbridge

This commit is contained in:
Lamp 2018-12-07 23:11:06 -08:00
parent 9ddc81ae59
commit 8f02d2ac3f
No known key found for this signature in database
GPG Key ID: 0F1F8704BEDE369E
4 changed files with 60 additions and 39 deletions

42
src/datacollector.js Normal file
View File

@ -0,0 +1,42 @@
var os = require('os');
var zlib = require('zlib');
// WebSocket message data collector. Returns a function for inputting websocket messages.
// Collects inputted messages to a file and gzips them every time it reaches 8mb,
// and sends it to the output callback function.
// For recording websocket data to a discord channel.
module.exports = async function createWSMessageCollector(output) { // output func must be async
var filepath = os.tmpdir() + "/" + Math.random().toString(36).substring(2);
var size = 0;
var startDate = new Date();
// gzip the data & send to output callback
async function save(callback){
fs.readFile(filepath, (err, file) => {
if (err) return console.error(err);
zlib.gzip(file, async function(err, data){
if (err) return console.error(err);
var thisStartDate = startDate, thisEndDate = new Date();
fs.writeFileSync(filepath, '');
size = 0;
startDate = new Date();
await output(data, thisStartDate, thisEndDate);
if (callback) callback();
});
});
}
// save on exit
exitHook(callback => {
save(()=>callback());
});
return function input(message) { // input for websocket messages
message = message.data || message;
if (message instanceof ArrayBuffer) message = Buffer.from(message).toString('base64');
var line = `${Date.now()} ${message}\n`;
size += line.length;
fs.appendFile(filepath, line, ()=>{});
if (size > 8000000) {save(); size = 0;}
};
}

View File

@ -1,12 +1,22 @@
process.env['NODE_TLS_REJECT_UNAUTHORIZED'] = 0;
process.env['NODE_TLS_REJECT_UNAUTHORIZED'] = 0; //TODO fix
var WebSocket = require('ws');
var Discord = require('discord.js');
var createWsMessageCollector = require('./datacollector');
var webhook = new Discord.WebhookClient(config.webhooks.ddp[0], config.webhooks.ddp[1], {disableEveryone:true});
var ws;
var wasConnected = false;
//var myId;
var collectWsMessage = createWsMessageCollector(async function(data, startDate, endDate){
await ws.send({files:[{
attachment: data,
name: `daydun piano main raw data recording from ${startDate.toISOString()} to ${endDate.toISOString()} .txt.gz`
}]});
});
(function connect() {
ws = new WebSocket("wss://daydun.com:5012/?nick=%5Bdiscord.gg%2Fk44Eqha%5D");
ws.on("open", () => {
@ -14,6 +24,7 @@ var wasConnected = false;
wasConnected = true;
});
ws.on("message", message => {
collectWsMessage(message);
if (typeof message != 'string') return;
var transmission = JSON.parse(message);
if (transmission.type == 'chat') {
@ -39,8 +50,6 @@ var wasConnected = false;
});
})();
var webhook = new Discord.WebhookClient(config.webhooks.ddp[0], config.webhooks.ddp[1], {disableEveryone:true});
function send2discord(message) {
webhook.send(message, {split:{char:'',maxLength:2000}});
}

View File

@ -1,35 +0,0 @@
module.exports = async function(gClient, site, room, DiscordChannel) {
var path = require('os').tmpdir();
var filename = `${site} ${room} .txt`.replace(/\//g, ':');
var filepath = path + "/" + filename;
var size = 0;
var startDate = new Date();
gClient.on('message', function(msg){
var data = msg.data;
if (data instanceof ArrayBuffer) data = Buffer.from(data).toString('base64');
var line = `${Date.now()} ${data}\n`;
size += line.length;
fs.appendFile(filepath, line, ()=>{});
if (size > 8000000) {save(); size = 0;}
});
async function save(callback){
console.log(`saving data recording`, filename)
fs.readFile(filepath, (err, file) => {
if (err) return console.error(err);
require('zlib').gzip(file, async function(err, gzip){
if (err) return console.error(err);
var attachmentName = `${site} ${room} raw data recording from ${startDate.toISOString()} to ${new Date().toISOString()} .txt.gz`;
await DiscordChannel.send(new Discord.MessageAttachment(gzip, attachmentName));
fs.writeFileSync(filepath, '');
size = 0;
startDate = new Date();
console.log(`saved raw data recording`, attachmentName);
if (callback) callback();
});
});
}
exitHook(callback => {
save(()=>callback());
});
gClient.dataCollectorSave = function(){save()}; // test
}

View File

@ -246,7 +246,12 @@ global.createMPPbridge = function createMPPbridge(room, DiscordChannelID, site =
require('./namecollector').collect(participant);
});
// record raw data
require('./datacollector')(gClient, site, room, DiscordChannel);
//require('./datacollector')(gClient, site, room, DiscordChannel);
let createWSMessageCollector = require("../datacollector")
gClient.on("message", createWSMessageCollector(async function(data, startDate, endDate){
var attachmentName = `${site} ${room} raw data recording from ${startDate.toISOString()} to ${endDate.toISOString()} .txt.gz`;
await DiscordChannel.send(new Discord.MessageAttachment(data, attachmentName));
}));
}
if (!clients[site]) clients[site] = {};