rewrite data collector

maybe it will work
This commit is contained in:
Lamp 2022-01-04 16:01:27 -08:00
parent e8b4334e3a
commit 198a8ceb3a
6 changed files with 41 additions and 56 deletions

View File

@ -1,42 +0,0 @@
var os = require('os');
var zlib = require('zlib');
// WebSocket message data collector. Returns a function for inputting websocket messages.
// Collects inputted messages to a file and gzips them every time it reaches 8mb,
// and sends it to the output callback function.
// For recording websocket data to a discord channel.
module.exports = function createWSMessageCollector(output) { // output func must be async
var filepath = os.tmpdir() + "/" + Math.random().toString(36).substring(2);
var size = 0;
var startDate = new Date();
// gzip the data & send to output callback
async function save(callback){
fs.readFile(filepath, (err, file) => {
if (err) return handleError(err);
zlib.gzip(file, async function(err, data){
if (err) return handleError(err);
var thisStartDate = startDate, thisEndDate = new Date();
fs.writeFileSync(filepath, '');
size = 0;
startDate = new Date();
await output(data, thisStartDate, thisEndDate);
if (callback) callback();
});
});
}
// save on exit
exitHook(callback => {
save(()=>callback());
});
return function input(message) { // input for websocket messages
message = message.data || message;
if (message instanceof ArrayBuffer) message = Buffer.from(message).toString('base64');
var line = `${Date.now()} ${message}\n`;
size += line.length;
fs.appendFileSync(filepath, line);
if (size > 8000000) {save(); size = 0;}
};
}

View File

@ -1,14 +1,14 @@
var WebSocket = require('ws'); var WebSocket = require('ws');
var Discord = require('discord.js'); var Discord = require('discord.js');
var createWsMessageCollector = require('./datacollector'); var WebSocketMessageCollector = require('./lib/datacollector');
var webhook = new Discord.WebhookClient({url: config.webhooks.ddp, allowedMentions: {parse: []}}); var webhook = new Discord.WebhookClient({url: config.webhooks.ddp}, {allowedMentions: {parse: []}});
var ws; var ws;
var wasConnected = false; var wasConnected = false;
//var myId; //var myId;
var collectWsMessage = createWsMessageCollector(async function(data, startDate, endDate){ var wsc = new WebSocketMessageCollector(async function(data, startDate, endDate){
await webhook.send({files:[{ await webhook.send({files:[{
attachment: data, attachment: data,
name: `daydun piano main raw data recording from ${startDate.toISOString()} to ${endDate.toISOString()} .txt.gz` name: `daydun piano main raw data recording from ${startDate.toISOString()} to ${endDate.toISOString()} .txt.gz`
@ -22,7 +22,7 @@ var collectWsMessage = createWsMessageCollector(async function(data, startDate,
wasConnected = true; wasConnected = true;
}); });
ws.on("message", message => { ws.on("message", message => {
collectWsMessage(message); wsc.collect(message);
if (typeof message != 'string') return; if (typeof message != 'string') return;
var transmission = JSON.parse(message); var transmission = JSON.parse(message);
if (transmission.type == 'chat') { if (transmission.type == 'chat') {

28
lib/datacollector.js Normal file
View File

@ -0,0 +1,28 @@
var gzip = require("util").promisify(require("zlib").gzip);
module.exports = class WebSocketMessageCollector {
constructor(dispatchFunction) {
this.maxSize = 8000000;
this.data = "";
this.startDate = new Date();
this.dispatchFunction = dispatchFunction;
exitHook(cb => {
this.dispatchFunction().then(cb);
});
}
collect(message) {
message = message.data || message;
if (message instanceof ArrayBuffer) message = Buffer.from(message).toString('base64');
var line = `${Date.now()} ${message}\n`;
this.data += line;
if (this.data.length > this.maxSize) this.package();
}
async package() {
var data = this.data, startDate = this.startDate, endDate = new Date();
this.data = "", this.startDate = new Date();
data = await gzip(data);
await this.dispatchFunction(data, startDate, endDate);
}
}

View File

@ -13,7 +13,7 @@ global.dClient = new Discord.Client({
// error handling // error handling
{ {
let webhook = new Discord.WebhookClient({url: config.webhooks.error, allowedMentions: {parse: []}}); let webhook = new Discord.WebhookClient({url: config.webhooks.error}, {allowedMentions: {parse: []}});
global.handleError = function logError(error, title) { global.handleError = function logError(error, title) {
let msg = error && (error.stack || error.message || error); let msg = error && (error.stack || error.message || error);
console.error(title + ':\n' + msg); console.error(title + ':\n' + msg);

View File

@ -1,6 +1,6 @@
// join/leave // join/leave
(async function(){ (async function(){
var webhook = new Discord.WebhookClient({url: config.webhooks.welcome, allowedMentions: {parse: []}}); var webhook = new Discord.WebhookClient({url: config.webhooks.welcome}, {allowedMentions: {parse: []}});
dClient.on('guildMemberAdd', async member => { dClient.on('guildMemberAdd', async member => {
if (member.guild.id != config.guildID) return; if (member.guild.id != config.guildID) return;
let username = member.user.username.toLowerCase().includes('clyde') ? member.user.username.replace(/C/g,'Q').replace(/c/g,'q') : member.user.username; let username = member.user.username.toLowerCase().includes('clyde') ? member.user.username.replace(/C/g,'Q').replace(/c/g,'q') : member.user.username;

View File

@ -1,4 +1,5 @@
var Client = require('./lib/Client.js'); var Client = require('./lib/Client.js');
var WebSocketMessageCollector = require("./lib/datacollector");
global.createMPPbridge = async function createMPPbridge({room, channel, uri}) { global.createMPPbridge = async function createMPPbridge({room, channel, uri}) {
channel = dClient.channels.resolve(channel); channel = dClient.channels.resolve(channel);
@ -190,14 +191,12 @@ global.createMPPbridge = async function createMPPbridge({room, channel, uri}) {
// addons // addons
{ var wsc = new WebSocketMessageCollector(async function(data, startDate, endDate) {
// record raw data var attachmentName = `${uri} ${room} raw data recording from ${startDate.toISOString()} to ${endDate.toISOString()} .txt.gz`;
let createWSMessageCollector = require("./datacollector") await channel.send(new Discord.MessageAttachment(data, attachmentName));
gClient.on("message", createWSMessageCollector(async function(data, startDate, endDate){ });
var attachmentName = `${uri} ${room} raw data recording from ${startDate.toISOString()} to ${endDate.toISOString()} .txt.gz`; gClient.on("message", wsc.collect.bind(wsc));
await channel.send(new Discord.MessageAttachment(data, attachmentName));
}));
}
return gClient; return gClient;
}; };