127.0.0.1:8000 watch-together / master server / websocket / index.js
master

Tree @master (Download .tar.gz)

index.js @masterraw · history · blame

const utils = require('../utils');
const ws_utils = require('./utils');
const watchrooms = require('../utils/watchrooms');
const state = require('./state');
const onmessage = require('./onmessage');
const broadcast = require('./broadcast');
const http = require('http');
/* jshint -W079 */
const WebSocket = require('ws');
/* jshint +W079 */

const wss = new WebSocket.Server({
    clientTracking: false,
    noServer: true
});

function initWS(user_id, ws) {
    state.servers[user_id] = ws;

    ws.onmessage = function(message) {
        // ws_utils.log.v(`Received message from user ${user_id}`);
        onmessage.process(ws, user_id, JSON.parse(message.data));
    };

    ws.onclose = ws.onerror = function() {
        if (state.servers[user_id]) {
            ws_utils.log.general.info("session closed for", user_id);
            delete state.servers[user_id];
            onmessage.handlers["stop-watching"](user_id);
        }
    };

    for (const [video_id, watchroom] of Object.entries(watchrooms.get_all())) {
        broadcast({
            command: "announce",
            data: {
                target: "all",
                type: "watchroom-open",
                video_id: video_id,
                host: watchroom.viewers[watchroom.host]
            }
        }, {
            [user_id]: true
        });
    }

    ws_utils.broadcast_storage({
        [user_id]: true
    });

    utils.query("SELECT video_id, name, status, expires FROM videos WHERE ?", {
        created_by: user_id
    }).then(function(videos) {
        broadcast({
            command: "init-library",
            data: videos
        }, {
            [user_id]: true
        });

        return utils.query(
            "SELECT * FROM user_prefs WHERE user_id=?",
            [user_id]
        );
    }).then(function([prefs]) {
        broadcast({
            command: "set-user-prefs",
            data: prefs
        }, {
            [user_id]: true
        });

        return utils.query(
            "SELECT display_name FROM users WHERE user_id=?",
            [user_id]
        );
    }).then(function([user]) {
        broadcast({
            command: "announce-message",
            data: `${user.display_name} Has connected!`
        });
    }).catch(utils.handle_err.sql(ws_utils.log, true));
}

module.exports = function(app, authenticate) {
    const server = http.createServer(app);
    server.on('upgrade', function(req, socket, head) {
        ws_utils.log.v('Parsing session from request...');

        authenticate(req, function(user_id) {
            if (user_id) {
                ws_utils.log.v('Session is parsed!', user_id);

                if (state.servers[user_id]) {
                    state.servers[user_id].onclose = function() {
                        wss.handleUpgrade(req, socket, head, function(ws) {
                            initWS(user_id, ws);
                        });
                    };
                    state.servers[user_id].close(4030);
                } else {
                    wss.handleUpgrade(req, socket, head, function(ws) {
                        initWS(user_id, ws);
                    });
                }
            } else {
                ws_utils.log.v("Could not restore session");
                socket.destroy();
            }
        });
    });

    return server;
};