Compare commits

..

No commits in common. "f75ad32728f67307f0fd0e2e36648a092e0b824f" and "00c6015148e84c362714793f4f706c5b7bca7f9b" have entirely different histories.

8 changed files with 235 additions and 834 deletions

View File

@ -229,11 +229,14 @@ func (s *Server) BanCommand(words []string, sub *Subscriber) {
log.Info("Operator %s bans %s for %d hours", sub.Username, username, duration/time.Hour)
// Add them to the ban list.
BanUser(username, duration)
other, err := s.GetSubscriber(username)
if err != nil {
sub.ChatServer("/ban: username not found: %s", username)
} else {
// Ban them.
BanUser(username, duration)
// If the target user is currently online, disconnect them and broadcast the ban to everybody.
if other, err := s.GetSubscriber(username); err == nil {
// Broadcast it to everyone.
s.Broadcast(messages.Message{
Action: messages.ActionPresence,
Username: username,
@ -246,9 +249,8 @@ func (s *Server) BanCommand(words []string, sub *Subscriber) {
})
other.authenticated = false
other.Username = ""
sub.ChatServer("%s has been banned from the room for %d hours.", username, duration/time.Hour)
}
sub.ChatServer("%s has been banned from the room for %d hours.", username, duration/time.Hour)
}
// UnbanCommand handles the `/unban` operator command.

View File

@ -83,6 +83,7 @@ func (s *Server) OnLogin(sub *Subscriber, msg messages.Message) {
sub.SendJSON(messages.Message{
Action: messages.ActionKick,
})
s.DeleteSubscriber(sub)
return
}

View File

@ -1,240 +0,0 @@
package barertc
import (
"context"
"encoding/json"
"net/http"
"time"
"git.kirsle.net/apps/barertc/pkg/log"
"git.kirsle.net/apps/barertc/pkg/messages"
"git.kirsle.net/apps/barertc/pkg/util"
"github.com/google/uuid"
)
// Polling user timeout before disconnecting them.
const PollingUserTimeout = time.Minute
// JSON payload structure for polling API.
type PollMessage struct {
// Send the username after authenticated.
Username string `json:"username,omitempty"`
// SessionID for authentication.
SessionID string `json:"session_id,omitempty"`
// BareRTC protocol message.
Message messages.Message `json:"msg"`
}
type PollResponse struct {
// Session ID.
Username string `json:"username,omitempty"`
SessionID string `json:"session_id,omitempty"`
// Pending messages.
Messages []messages.Message `json:"messages"`
}
// Helper method to send an error as a PollResponse.
func PollResponseError(message string) PollResponse {
return PollResponse{
Messages: []messages.Message{
{
Action: messages.ActionError,
Username: "ChatServer",
Message: message,
},
},
}
}
// KickIdlePollUsers is a goroutine that will disconnect polling API users
// who haven't been seen in a while.
func (s *Server) KickIdlePollUsers() {
log.Debug("KickIdlePollUsers goroutine engaged")
for {
time.Sleep(10 * time.Second)
for _, sub := range s.IterSubscribers() {
if sub.usePolling && time.Since(sub.lastPollAt) > PollingUserTimeout {
// Send an exit message.
if sub.authenticated && sub.ChatStatus != "hidden" {
log.Error("KickIdlePollUsers: %s last seen %s ago", sub.Username, sub.lastPollAt)
sub.authenticated = false
s.Broadcast(messages.Message{
Action: messages.ActionPresence,
Username: sub.Username,
Message: "has timed out!",
})
s.SendWhoList()
}
s.DeleteSubscriber(sub)
}
}
}
}
// FlushPollResponse returns a response for the polling API that will flush
// all pending messages sent to the client.
func (sub *Subscriber) FlushPollResponse() PollResponse {
var msgs = []messages.Message{}
// Drain the messages from the outbox channel.
for len(sub.messages) > 0 {
message := <-sub.messages
var msg messages.Message
json.Unmarshal(message, &msg)
msgs = append(msgs, msg)
}
return PollResponse{
Username: sub.Username,
SessionID: sub.sessionID,
Messages: msgs,
}
}
// Functions for the Polling API as an alternative to WebSockets.
func (s *Server) PollingAPI() http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ip := util.IPAddress(r)
// JSON writer for the response.
w.Header().Set("Content-Type", "application/json")
enc := json.NewEncoder(w)
enc.SetIndent("", " ")
// Parse the request.
if r.Method != http.MethodPost {
w.WriteHeader(http.StatusBadRequest)
enc.Encode(PollResponseError("Only POST methods allowed"))
return
} else if r.Header.Get("Content-Type") != "application/json" {
w.WriteHeader(http.StatusBadRequest)
enc.Encode(PollResponseError("Only application/json content-types allowed"))
return
}
defer r.Body.Close()
// Parse the request payload.
var (
params PollMessage
dec = json.NewDecoder(r.Body)
)
if err := dec.Decode(&params); err != nil {
w.WriteHeader(http.StatusBadRequest)
enc.Encode(PollResponseError(err.Error()))
return
}
// Debug logging.
log.Debug("Polling connection from %s - %s", ip, r.Header.Get("User-Agent"))
// Are they resuming an authenticated session?
var sub *Subscriber
if params.Username != "" || params.SessionID != "" {
if params.Username == "" || params.SessionID == "" {
w.WriteHeader(http.StatusBadRequest)
enc.Encode(PollResponseError("Authentication error: SessionID and Username both required."))
return
}
log.Debug("Polling API: check if %s (%s) is authenticated", params.Username, params.SessionID)
// Look up the subscriber.
var (
authOK bool
err error
)
sub, err = s.GetSubscriber(params.Username)
if err == nil {
// Validate the SessionID.
if sub.sessionID == params.SessionID {
authOK = true
}
}
// Authentication error.
if !authOK {
s.DeleteSubscriber(sub)
w.WriteHeader(http.StatusBadRequest)
enc.Encode(PollResponse{
Messages: []messages.Message{
{
Action: messages.ActionError,
Username: "ChatServer",
Message: "Your authentication has expired, please log back into the chat again.",
},
{
Action: messages.ActionKick,
},
},
})
return
}
// Ping their last seen time.
sub.lastPollAt = time.Now()
}
// If they are authenticated, handle this message.
if sub != nil && sub.authenticated {
s.OnClientMessage(sub, params.Message)
// If they use JWT authentication, give them a ping back with an updated
// JWT once in a while. Equivalent to the WebSockets pinger channel.
if time.Since(sub.lastPollJWT) > PingInterval {
sub.lastPollJWT = time.Now()
if sub.JWTClaims != nil {
if jwt, err := sub.JWTClaims.ReSign(); err != nil {
log.Error("ReSign JWT token for %s#%d: %s", sub.Username, sub.ID, err)
} else {
sub.SendJSON(messages.Message{
Action: messages.ActionPing,
JWTToken: jwt,
})
}
}
}
enc.Encode(sub.FlushPollResponse())
return
}
// Not authenticated: the only acceptable message is login.
if params.Message.Action != messages.ActionLogin {
w.WriteHeader(http.StatusBadRequest)
enc.Encode(PollResponseError("Not logged in."))
return
}
// Prepare a Subscriber object for them. Do not add it to the server
// roster unless their login succeeds.
ctx, cancel := context.WithCancel(r.Context())
sub = s.NewPollingSubscriber(ctx, cancel)
// Tentatively add them to the server. If they don't pass authentication,
// remove their subscriber immediately. Note: they need added here so they
// will receive their own "has entered the room" and WhoList updates.
s.AddSubscriber(sub)
s.OnLogin(sub, params.Message)
// Are they authenticated?
if sub.authenticated {
// Generate a SessionID number.
sessionID := uuid.New().String()
sub.sessionID = sessionID
log.Debug("Polling API: new user authenticated in: %s (sid %s)", sub.Username, sub.sessionID)
} else {
s.DeleteSubscriber(sub)
}
enc.Encode(sub.FlushPollResponse())
})
}

View File

@ -38,7 +38,6 @@ func (s *Server) Setup() error {
mux.Handle("/about", AboutPage())
mux.Handle("/logout", LogoutPage())
mux.Handle("/ws", s.WebSocket())
mux.Handle("/poll", s.PollingAPI())
mux.Handle("/api/statistics", s.Statistics())
mux.Handle("/api/blocklist", s.BlockList())
mux.Handle("/api/block/now", s.BlockNow())
@ -55,7 +54,5 @@ func (s *Server) Setup() error {
// ListenAndServe starts the web server.
func (s *Server) ListenAndServe(address string) error {
// Run the polling user idle kicker.
go s.KickIdlePollUsers()
return http.ListenAndServe(address, s.mux)
}

View File

@ -31,19 +31,11 @@ type Subscriber struct {
JWTClaims *jwt.Claims
authenticated bool // has passed the login step
loginAt time.Time
// Connection details (WebSocket).
conn *websocket.Conn // WebSocket user
ctx context.Context
cancel context.CancelFunc
messages chan []byte
closeSlow func()
// Polling API users.
usePolling bool
sessionID string
lastPollAt time.Time
lastPollJWT time.Time // give a new JWT once in a while
conn *websocket.Conn
ctx context.Context
cancel context.CancelFunc
messages chan []byte
closeSlow func()
muteMu sync.RWMutex
booted map[string]struct{} // usernames booted off your camera
@ -59,100 +51,6 @@ type Subscriber struct {
logfh map[string]io.WriteCloser
}
// NewSubscriber initializes a connected chat user.
func (s *Server) NewSubscriber(ctx context.Context, cancelFunc func()) *Subscriber {
return &Subscriber{
ctx: ctx,
cancel: cancelFunc,
messages: make(chan []byte, s.subscriberMessageBuffer),
booted: make(map[string]struct{}),
muted: make(map[string]struct{}),
blocked: make(map[string]struct{}),
messageIDs: make(map[int64]struct{}),
ChatStatus: "online",
}
}
// NewWebSocketSubscriber returns a new subscriber with a WebSocket connection.
func (s *Server) NewWebSocketSubscriber(ctx context.Context, conn *websocket.Conn, cancelFunc func()) *Subscriber {
sub := s.NewSubscriber(ctx, cancelFunc)
sub.conn = conn
sub.closeSlow = func() {
conn.Close(websocket.StatusPolicyViolation, "connection too slow to keep up with messages")
}
return sub
}
// NewPollingSubscriber returns a new subscriber using the polling API.
func (s *Server) NewPollingSubscriber(ctx context.Context, cancelFunc func()) *Subscriber {
sub := s.NewSubscriber(ctx, cancelFunc)
sub.usePolling = true
sub.lastPollAt = time.Now()
sub.lastPollJWT = time.Now()
sub.closeSlow = func() {
// Their outbox is filled up, disconnect them.
log.Error("Polling subscriber %s#%d: inbox is filled up!", sub.Username, sub.ID)
// Send an exit message.
if sub.authenticated && sub.ChatStatus != "hidden" {
sub.authenticated = false
s.Broadcast(messages.Message{
Action: messages.ActionPresence,
Username: sub.Username,
Message: "has exited the room!",
})
s.SendWhoList()
}
s.DeleteSubscriber(sub)
}
return sub
}
// OnClientMessage handles a chat protocol message from the user's WebSocket or polling API.
func (s *Server) OnClientMessage(sub *Subscriber, msg messages.Message) {
// What action are they performing?
switch msg.Action {
case messages.ActionLogin:
s.OnLogin(sub, msg)
case messages.ActionMessage:
s.OnMessage(sub, msg)
case messages.ActionFile:
s.OnFile(sub, msg)
case messages.ActionMe:
s.OnMe(sub, msg)
case messages.ActionOpen:
s.OnOpen(sub, msg)
case messages.ActionBoot:
s.OnBoot(sub, msg, true)
case messages.ActionUnboot:
s.OnBoot(sub, msg, false)
case messages.ActionMute, messages.ActionUnmute:
s.OnMute(sub, msg, msg.Action == messages.ActionMute)
case messages.ActionBlock:
s.OnBlock(sub, msg)
case messages.ActionBlocklist:
s.OnBlocklist(sub, msg)
case messages.ActionCandidate:
s.OnCandidate(sub, msg)
case messages.ActionSDP:
s.OnSDP(sub, msg)
case messages.ActionWatch:
s.OnWatch(sub, msg)
case messages.ActionUnwatch:
s.OnUnwatch(sub, msg)
case messages.ActionTakeback:
s.OnTakeback(sub, msg)
case messages.ActionReact:
s.OnReact(sub, msg)
case messages.ActionReport:
s.OnReport(sub, msg)
case messages.ActionPing:
default:
sub.ChatServer("Unsupported message type.")
}
}
// ReadLoop spawns a goroutine that reads from the websocket connection.
func (sub *Subscriber) ReadLoop(s *Server) {
go func() {
@ -190,8 +88,45 @@ func (sub *Subscriber) ReadLoop(s *Server) {
log.Debug("Read(%d=%s): %s", sub.ID, sub.Username, data)
}
// Handle their message.
s.OnClientMessage(sub, msg)
// What action are they performing?
switch msg.Action {
case messages.ActionLogin:
s.OnLogin(sub, msg)
case messages.ActionMessage:
s.OnMessage(sub, msg)
case messages.ActionFile:
s.OnFile(sub, msg)
case messages.ActionMe:
s.OnMe(sub, msg)
case messages.ActionOpen:
s.OnOpen(sub, msg)
case messages.ActionBoot:
s.OnBoot(sub, msg, true)
case messages.ActionUnboot:
s.OnBoot(sub, msg, false)
case messages.ActionMute, messages.ActionUnmute:
s.OnMute(sub, msg, msg.Action == messages.ActionMute)
case messages.ActionBlock:
s.OnBlock(sub, msg)
case messages.ActionBlocklist:
s.OnBlocklist(sub, msg)
case messages.ActionCandidate:
s.OnCandidate(sub, msg)
case messages.ActionSDP:
s.OnSDP(sub, msg)
case messages.ActionWatch:
s.OnWatch(sub, msg)
case messages.ActionUnwatch:
s.OnUnwatch(sub, msg)
case messages.ActionTakeback:
s.OnTakeback(sub, msg)
case messages.ActionReact:
s.OnReact(sub, msg)
case messages.ActionReport:
s.OnReport(sub, msg)
default:
sub.ChatServer("Unsupported message type.")
}
}
}()
}
@ -267,7 +202,20 @@ func (s *Server) WebSocket() http.HandlerFunc {
// ctx := c.CloseRead(r.Context())
ctx, cancel := context.WithCancel(r.Context())
sub := s.NewWebSocketSubscriber(ctx, c, cancel)
sub := &Subscriber{
conn: c,
ctx: ctx,
cancel: cancel,
messages: make(chan []byte, s.subscriberMessageBuffer),
closeSlow: func() {
c.Close(websocket.StatusPolicyViolation, "connection too slow to keep up with messages")
},
booted: make(map[string]struct{}),
muted: make(map[string]struct{}),
blocked: make(map[string]struct{}),
messageIDs: make(map[int64]struct{}),
ChatStatus: "online",
}
s.AddSubscriber(sub)
defer s.DeleteSubscriber(sub)
@ -332,10 +280,6 @@ func (s *Server) GetSubscriber(username string) (*Subscriber, error) {
// DeleteSubscriber removes a subscriber from the server.
func (s *Server) DeleteSubscriber(sub *Subscriber) {
if sub == nil {
return
}
log.Error("DeleteSubscriber: %s", sub.Username)
// Cancel its context to clean up the for-loop goroutine.

View File

@ -13,7 +13,6 @@ import WhoListRow from './components/WhoListRow.vue';
import VideoFeed from './components/VideoFeed.vue';
import ProfileModal from './components/ProfileModal.vue';
import ChatClient from './lib/ChatClient';
import LocalStorage from './lib/LocalStorage';
import VideoFlag from './lib/VideoFlag';
import { SoundEffects, DefaultSounds } from './lib/sounds';
@ -130,8 +129,10 @@ export default {
idleThreshold: 300, // number of seconds you must be idle
// WebSocket connection.
// Initialized in the dial() function.
client: {},
ws: {
conn: null,
connected: false,
},
// Who List for the room.
whoList: [],
@ -143,7 +144,6 @@ export default {
// Misc. user preferences (TODO: move all of them here)
prefs: {
usePolling: false, // use the polling API instead of WebSockets.
joinMessages: true, // show "has entered the room" in public channels
exitMessages: false, // hide exit messages by default in public channels
watchNotif: true, // notify in chat about cameras being watched
@ -462,12 +462,6 @@ export default {
"prefs.muteSounds": function () {
LocalStorage.set('muteSounds', this.prefs.muteSounds);
},
"prefs.usePolling": function () {
LocalStorage.set('usePolling', this.prefs.usePolling);
// Reset the chat client on change.
this.resetChatClient();
},
"prefs.closeDMs": function () {
LocalStorage.set('closeDMs', this.prefs.closeDMs);
@ -476,12 +470,6 @@ export default {
},
},
computed: {
connected() {
if (this.client.connected != undefined) {
return this.client.connected();
}
return false;
},
chatHistory() {
if (this.channels[this.channel] == undefined) {
return [];
@ -785,9 +773,6 @@ export default {
}
// Misc preferences
if (settings.usePolling != undefined) {
this.prefs.usePolling = settings.usePolling === true;
}
if (settings.joinMessages != undefined) {
this.prefs.joinMessages = settings.joinMessages === true;
}
@ -843,7 +828,7 @@ export default {
return;
}
if (!this.connected) {
if (!this.ws.connected) {
this.ChatClient("You are not connected to the server.");
return;
}
@ -857,12 +842,12 @@ export default {
// If they do it twice, kick them from the room.
if (this.spamWarningCount >= 1) {
// Walk of shame.
this.client.send({
this.ws.conn.send(JSON.stringify({
action: "message",
channel: "lobby",
message: "**(Message of Shame)** I have been naughty and posted spam in chat despite being warned, " +
"and I am now being kicked from the room in shame. ☹️",
});
}));
this.ChatServer(
"It is <strong>not allowed</strong> to promote your Onlyfans (or similar) " +
@ -876,9 +861,9 @@ export default {
action: "presence",
});
this.disconnect = true;
this.client.ws.connected = false;
this.ws.connected = false;
setTimeout(() => {
this.client.disconnect();
this.ws.conn.close();
}, 1000);
return;
}
@ -937,11 +922,11 @@ export default {
}
// console.debug("Send message: %s", this.message);
this.client.send({
this.ws.conn.send(JSON.stringify({
action: "message",
channel: this.channel,
message: this.message,
});
}));
this.message = "";
},
@ -952,11 +937,11 @@ export default {
// Emoji reactions
sendReact(message, emoji) {
this.client.send({
this.ws.conn.send(JSON.stringify({
action: 'react',
msgID: message.msgID,
message: emoji,
});
}));
},
onReact(msg) {
// Search all channels for this message ID and append the reaction.
@ -995,13 +980,13 @@ export default {
// Sync the current user state (such as video broadcasting status) to
// the backend, which will reload everybody's Who List.
sendMe() {
if (!this.connected) return;
this.client.send({
if (!this.ws.connected) return;
this.ws.conn.send(JSON.stringify({
action: "me",
video: this.myVideoFlag,
status: this.status,
dnd: this.prefs.closeDMs,
});
}));
},
onMe(msg) {
// We have had settings pushed to us by the server, such as a change
@ -1160,10 +1145,10 @@ export default {
}
},
sendMute(username, mute) {
this.client.send({
this.ws.conn.send(JSON.stringify({
action: mute ? "mute" : "unmute",
username: username,
});
}));
},
isMutedUser(username) {
return this.muted[this.normalizeUsername(username)] != undefined;
@ -1184,30 +1169,30 @@ export default {
}
// Send the username list to the server.
this.client.send({
this.ws.conn.send(JSON.stringify({
action: "blocklist",
usernames: blocklist,
});
}))
},
// Send a video request to access a user's camera.
sendOpen(username) {
this.client.send({
this.ws.conn.send(JSON.stringify({
action: "open",
username: username,
});
}));
},
sendBoot(username) {
this.client.send({
this.ws.conn.send(JSON.stringify({
action: "boot",
username: username,
});
}));
},
sendUnboot(username) {
this.client.send({
this.ws.conn.send(JSON.stringify({
action: "unboot",
username: username,
});
}));
},
onOpen(msg) {
// Response for the opener to begin WebRTC connection.
@ -1277,22 +1262,12 @@ export default {
isJoin = true;
}
// Push it to the history of the public channels (respecting user preferences).
// Push it to the history of all public channels (depending on user preference).
if ((isJoin && this.prefs.joinMessages) || (isLeave && this.prefs.exitMessages)
|| (!isJoin && !isLeave)) {
// Always put them in the first public channel.
let channel = this.config.channels[0];
this.pushHistory({
channel: channel.ID,
action: msg.action,
username: msg.username,
message: msg.message,
});
// If the current user is focused on another public channel, also post it there.
if (!this.isDM && this.channel !== channel.ID) {
for (let channel of this.config.channels) {
this.pushHistory({
channel: this.channel.ID,
channel: channel.ID,
action: msg.action,
username: msg.username,
message: msg.message,
@ -1314,55 +1289,141 @@ export default {
// Dial the WebSocket connection.
dial() {
// Set up the ChatClient connection.
this.client = new ChatClient({
usePolling: this.prefs.usePolling,
onClientError: this.ChatClient,
this.ChatClient("Establishing connection to server...");
username: this.username,
jwt: this.jwt,
prefs: this.prefs,
const proto = location.protocol === 'https:' ? 'wss' : 'ws';
const conn = new WebSocket(`${proto}://${location.host}/ws`);
onWho: this.onWho,
onMe: this.onMe,
onMessage: this.onMessage,
onTakeback: this.onTakeback,
onReact: this.onReact,
onPresence: this.onPresence,
onRing: this.onRing,
onOpen: this.onOpen,
onCandidate: this.onCandidate,
onSDP: this.onSDP,
onWatch: this.onWatch,
onUnwatch: this.onUnwatch,
onBlock: this.onBlock,
conn.addEventListener("close", ev => {
// Lost connection to server - scrub who list.
this.onWho({ whoList: [] });
this.muted = {};
bulkMuteUsers: this.bulkMuteUsers,
focusMessageBox: () => {
this.ws.connected = false;
this.ChatClient(`WebSocket Disconnected code: ${ev.code}, reason: ${ev.reason}`);
this.disconnectCount++;
if (this.disconnectCount > this.disconnectLimit) {
this.ChatClient(`It seems there's a problem connecting to the server. Please try some other time.`);
return;
}
if (!this.disconnect) {
if (ev.code !== 1001 && ev.code !== 1000) {
this.ChatClient("Reconnecting in 5s");
setTimeout(this.dial, 5000);
}
}
});
conn.addEventListener("open", ev => {
this.ws.connected = true;
this.ChatClient("Websocket connected!");
// Upload our blocklist to the server before login. This resolves a bug where if a block
// was added recently (other user still online in chat), that user would briefly see your
// "has entered the room" message followed by you immediately not being online.
this.bulkMuteUsers();
// Tell the server our username.
this.ws.conn.send(JSON.stringify({
action: "login",
username: this.username,
jwt: this.jwt.token,
dnd: this.prefs.closeDMs,
}));
// Focus the message entry box.
window.requestAnimationFrame(() => {
this.messageBox.focus();
},
pushHistory: this.pushHistory,
onNewJWT: jwt => {
this.jwt.token = jwt;
},
});
});
this.client.dial();
},
resetChatClient() {
if (!this.connected) return;
conn.addEventListener("message", ev => {
if (typeof ev.data !== "string") {
console.error("unexpected message type", typeof ev.data);
return;
}
// Reset the ChatClient, e.g. when toggling between WebSocket vs. Polling methods.
this.ChatClient(
"Your connection method to the chat server has been updated; attempting to reconnect now.",
);
let msg = JSON.parse(ev.data);
try {
// Cast timestamp to date.
msg.at = new Date(msg.at);
} catch (e) {
console.error("Parsing timestamp '%s' on msg: %s", msg.at, e);
}
window.requestAnimationFrame(() => {
this.client.disconnect();
setTimeout(() => {
this.dial();
}, 1000);
switch (msg.action) {
case "who":
this.onWho(msg);
break;
case "me":
this.onMe(msg);
break;
case "message":
this.onMessage(msg);
break;
case "takeback":
this.onTakeback(msg);
break;
case "react":
this.onReact(msg);
break;
case "presence":
this.onPresence(msg);
break;
case "ring":
this.onRing(msg);
break;
case "open":
this.onOpen(msg);
break;
case "candidate":
this.onCandidate(msg);
break;
case "sdp":
this.onSDP(msg);
break;
case "watch":
this.onWatch(msg);
break;
case "unwatch":
this.onUnwatch(msg);
break;
case "block":
this.onBlock(msg);
break;
case "error":
this.pushHistory({
channel: msg.channel,
username: msg.username || 'Internal Server Error',
message: msg.message,
isChatServer: true,
});
break;
case "disconnect":
this.onWho({ whoList: [] });
this.disconnect = true;
this.ws.connected = false;
this.ws.conn.close(1000, "server asked to close the connection");
break;
case "ping":
// New JWT token?
if (msg.jwt) {
this.jwt.token = msg.jwt;
}
// Reset disconnect retry counter: if we were on long enough to get
// a ping, we're well connected and can reconnect no matter how many
// times the chat server is rebooted.
this.disconnectCount = 0;
break;
default:
console.error("Unexpected action: %s", JSON.stringify(msg));
}
});
this.ws.conn = conn;
},
/**
@ -1401,11 +1462,11 @@ export default {
// message to the other peer through the signaling server.
pc.onicecandidate = event => {
if (event.candidate) {
this.client.send({
this.ws.conn.send(JSON.stringify({
action: "candidate",
username: username,
candidate: JSON.stringify(event.candidate),
});
}));
}
};
@ -1526,11 +1587,11 @@ export default {
localDescCreated(pc, username) {
return (desc) => {
pc.setLocalDescription(desc).then(() => {
this.client.send({
this.ws.conn.send(JSON.stringify({
action: "sdp",
username: username,
description: JSON.stringify(pc.localDescription),
});
}));
}).catch(e => {
console.error(`Error sending WebRTC negotiation message (SDP): ${e}`);
});
@ -1598,10 +1659,10 @@ export default {
},
sendWatch(username, watching) {
// Send the watch or unwatch message to backend.
this.client.send({
this.ws.conn.send(JSON.stringify({
action: watching ? "watch" : "unwatch",
username: username,
});
}));
},
isWatchingMe(username) {
// Return whether the user is watching your camera
@ -1764,10 +1825,10 @@ export default {
"Do you want to take this message back? Doing so will remove this message from everybody's view in the chat room."
)) return;
this.client.send({
this.ws.conn.send(JSON.stringify({
action: "takeback",
msgID: msg.msgID,
});
}));
},
removeMessage(msg) {
if (!window.confirm(
@ -2802,9 +2863,10 @@ export default {
// Attach the file to the message.
msg.message = file.name;
msg.bytes = fileByteArray;
msg = JSON.stringify(msg);
// Send it to the chat server.
this.client.send(msg);
this.ws.conn.send(msg);
};
reader.readAsArrayBuffer(file);
@ -3004,7 +3066,7 @@ export default {
let msg = this.reportModal.message;
this.client.send({
this.ws.conn.send(JSON.stringify({
action: "report",
channel: msg.channel,
username: msg.username,
@ -3012,7 +3074,7 @@ export default {
reason: classification,
message: msg.message,
comment: comment,
});
}));
this.reportModal.busy = false;
this.reportModal.visible = false;
@ -3408,31 +3470,6 @@ export default {
</p>
</div>
<div class="field">
<label class="label mb-0">
Server Connection Method
</label>
<label class="checkbox">
<input type="radio" v-model="prefs.usePolling" :value="false">
WebSockets (realtime connection; recommended for most people)
</label>
<label class="checkbox">
<input type="radio" v-model="prefs.usePolling" :value="true">
Polling (check for new messages every 5 seconds)
</label>
<p class="help">
By default the chat server requires a constant WebSockets connection to stay online.
If you are experiencing frequent disconnects (e.g. because you are on a slow or
unstable network connection), try switching to the "Polling" method which will be
more robust, at the cost of up to 5-seconds latency to receive new messages.
<!-- If disconnected currently, tell them to refresh. -->
<span v-if="!connected" class="has-text-danger">
Notice: you may need to refresh the chat page after changing this setting.
</span>
</p>
</div>
</div>
</div>
@ -3955,7 +3992,7 @@ export default {
<!-- My text box -->
<input type="text" class="input" id="messageBox" v-model="message"
placeholder="Write a message" @keydown="sendTypingNotification()" autocomplete="off"
:disabled="!client.connected">
:disabled="!ws.connected">
<!-- At Mention templates-->
<template #no-result>

View File

@ -1,339 +0,0 @@
// WebSocket chat client handler.
class ChatClient {
/**
* Constructor for the client.
*
* @param usePolling: instead of WebSocket use the ajax polling API.
* @param onClientError: function to receive 'ChatClient' messages to
* add to the chat room (this.ChatClient())
*/
constructor({
usePolling=false,
onClientError,
username,
jwt, // JWT token for authorization
prefs, // User preferences for 'me' action (close DMs, etc)
// Chat Protocol handler functions for the caller.
onWho,
onMe,
onMessage,
onTakeback,
onReact,
onPresence,
onRing,
onOpen,
onCandidate,
onSDP,
onWatch,
onUnwatch,
onBlock,
// Misc function registrations for callback.
onNewJWT, // new JWT token from ping response
bulkMuteUsers, // Upload our blocklist on connect.
focusMessageBox, // Tell caller to focus the message entry box.
pushHistory,
}) {
this.usePolling = usePolling;
// Pointer to the 'ChatClient(message)' command from the main app.
this.ChatClient = onClientError;
this.username = username;
this.jwt = jwt;
this.prefs = prefs;
// Register the handler functions.
this.onWho = onWho;
this.onMe = onMe;
this.onMessage = onMessage;
this.onTakeback = onTakeback;
this.onReact = onReact;
this.onPresence = onPresence;
this.onRing = onRing;
this.onOpen = onOpen;
this.onCandidate = onCandidate;
this.onSDP = onSDP;
this.onWatch = onWatch;
this.onUnwatch = onUnwatch;
this.onBlock = onBlock;
this.onNewJWT = onNewJWT;
this.bulkMuteUsers = bulkMuteUsers;
this.focusMessageBox = focusMessageBox;
this.pushHistory = pushHistory;
// WebSocket connection.
this.ws = {
conn: null,
connected: false,
// Disconnect spamming: don't retry too many times.
reconnect: true, // unless told to go away
disconnectLimit: 2,
disconnectCount: 0,
};
// Polling connection.
this.polling = {
username: "",
sessionID: "",
timeout: null, // setTimeout for next poll.
}
}
// Connected polls if the client is connected.
connected() {
if (this.usePolling) {
return this.polling.timeout != null && this.polling.sessionID != "";
}
return this.ws.connected;
}
// Disconnect from the server.
disconnect() {
if (this.usePolling) {
this.polling.sessionID = "";
this.polling.username = "";
this.stopPolling();
this.ChatClient("You have disconnected from the server.");
return;
}
this.ws.connected = false;
this.ws.conn.close(1000, "server asked to close the connection");
}
// Common function to send a message to the server. The message
// is a JSON object before stringify.
send(message) {
if (this.usePolling) {
fetch("/poll", {
method: "POST",
mode: "same-origin",
cache: "no-cache",
credentials: "same-origin",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
username: this.polling.username,
session_id: this.polling.sessionID,
msg: message,
})
}).then(resp => resp.json()).then(resp => {
console.log(resp);
// Store sessionID information.
this.polling.sessionID = resp.session_id;
this.polling.username = resp.username;
for (let msg of resp.messages) {
this.handle(msg);
}
}).catch(err => {
this.ChatClient("Error from polling API: " + err);
});
return;
}
if (!this.ws.connected) {
this.ChatClient("Couldn't send WebSocket message: not connected.");
return;
}
if (typeof(message) !== "string") {
message = JSON.stringify(message);
}
this.ws.conn.send(message);
}
// Common function to handle a message from the server.
handle(msg) {
switch (msg.action) {
case "who":
this.onWho(msg);
break;
case "me":
this.onMe(msg);
break;
case "message":
this.onMessage(msg);
break;
case "takeback":
this.onTakeback(msg);
break;
case "react":
this.onReact(msg);
break;
case "presence":
this.onPresence(msg);
break;
case "ring":
this.onRing(msg);
break;
case "open":
this.onOpen(msg);
break;
case "candidate":
this.onCandidate(msg);
break;
case "sdp":
this.onSDP(msg);
break;
case "watch":
this.onWatch(msg);
break;
case "unwatch":
this.onUnwatch(msg);
break;
case "block":
this.onBlock(msg);
break;
case "error":
this.pushHistory({
channel: msg.channel,
username: msg.username || 'Internal Server Error',
message: msg.message,
isChatServer: true,
});
break;
case "disconnect":
this.onWho({ whoList: [] });
this.ws.reconnect = false;
this.disconnect();
break;
case "ping":
// New JWT token?
if (msg.jwt) {
this.onNewJWT(msg.jwt);
}
// Reset disconnect retry counter: if we were on long enough to get
// a ping, we're well connected and can reconnect no matter how many
// times the chat server is rebooted.
this.ws.disconnectCount = 0;
break;
default:
console.error("Unexpected action: %s", JSON.stringify(msg));
}
}
// Dial the WebSocket.
dial() {
// Polling API?
if (this.usePolling) {
this.ChatClient("Connecting to the server via polling API...");
this.startPolling();
// Log in now.
this.send({
action: "login",
username: this.username,
jwt: this.jwt.token,
dnd: this.prefs.closeDMs,
});
return;
}
this.ChatClient("Establishing connection to server...");
const proto = location.protocol === 'https:' ? 'wss' : 'ws';
const conn = new WebSocket(`${proto}://${location.host}/ws`);
conn.addEventListener("close", ev => {
// Lost connection to server - scrub who list.
this.onWho({ whoList: [] });
this.ws.connected = false;
this.ChatClient(`WebSocket Disconnected code: ${ev.code}, reason: ${ev.reason}`);
this.ws.disconnectCount++;
if (this.ws.disconnectCount > this.ws.disconnectLimit) {
this.ChatClient(
`It seems there's a problem connecting to the server. Please try some other time.<br><br>` +
`If you experience this problem frequently, try going into the Chat Settings 'Misc' tab ` +
`and switch to the 'Polling' Server Connection method.`
);
return;
}
if (this.ws.reconnect) {
if (ev.code !== 1001 && ev.code !== 1000) {
this.ChatClient("Reconnecting in 5s");
setTimeout(() => {
this.dial();
}, 5000);
}
}
});
conn.addEventListener("open", ev => {
this.ws.connected = true;
this.ChatClient("Websocket connected!");
// Upload our blocklist to the server before login. This resolves a bug where if a block
// was added recently (other user still online in chat), that user would briefly see your
// "has entered the room" message followed by you immediately not being online.
this.bulkMuteUsers();
// Tell the server our username.
this.send({
action: "login",
username: this.username,
jwt: this.jwt.token,
dnd: this.prefs.closeDMs,
});
// Focus the message entry box.
window.requestAnimationFrame(() => {
this.focusMessageBox();
});
});
conn.addEventListener("message", ev => {
if (typeof ev.data !== "string") {
console.error("unexpected message type", typeof ev.data);
return;
}
let msg = JSON.parse(ev.data);
this.handle(msg);
});
this.ws.conn = conn;
}
// Start the polling interval.
startPolling() {
if (!this.usePolling) return;
this.stopPolling();
this.polling.timeout = setTimeout(() => {
this.poll();
this.startPolling();
}, 5000);
}
// Poll the API.
poll() {
if (!this.usePolling) {
this.stopPolling();
return;
}
this.send({
action: "ping",
});
this.startPolling();
}
// Stop polling.
stopPolling() {
if (this.polling.timeout != null) {
clearTimeout(this.polling.timeout);
}
}
}
export default ChatClient;

View File

@ -18,7 +18,6 @@ const keys = {
'rememberExpresslyClosed': Boolean,
// Booleans
'usePolling': Boolean, // use the polling API instead of WebSocket
'joinMessages': Boolean,
'exitMessages': Boolean,
'watchNotif': Boolean,