Compare commits

...

4 Commits

Author SHA1 Message Date
f75ad32728 Ban command update, join/leave messages
* The /ban command doesn't require the target user to be online at the
  time of the ban.
* Update the presence messages so they will generally only go to the
  primary (first) public channel, and also to another public channel if
  the user is currently looking at one of the others.
2023-12-16 15:10:48 -08:00
264b8f2a46 Cleanup debug log 2023-12-10 18:44:18 -08:00
0e0aac991d Polling API for the chat room 2023-12-10 18:43:18 -08:00
d57d41ea3a Abstract WebSocket client into library 2023-12-10 16:09:00 -08:00
8 changed files with 834 additions and 235 deletions

View File

@ -229,14 +229,11 @@ func (s *Server) BanCommand(words []string, sub *Subscriber) {
log.Info("Operator %s bans %s for %d hours", sub.Username, username, duration/time.Hour)
other, err := s.GetSubscriber(username)
if err != nil {
sub.ChatServer("/ban: username not found: %s", username)
} else {
// Ban them.
BanUser(username, duration)
// Add them to the ban list.
BanUser(username, duration)
// Broadcast it to everyone.
// If the target user is currently online, disconnect them and broadcast the ban to everybody.
if other, err := s.GetSubscriber(username); err == nil {
s.Broadcast(messages.Message{
Action: messages.ActionPresence,
Username: username,
@ -249,8 +246,9 @@ func (s *Server) BanCommand(words []string, sub *Subscriber) {
})
other.authenticated = false
other.Username = ""
sub.ChatServer("%s has been banned from the room for %d hours.", username, duration/time.Hour)
}
sub.ChatServer("%s has been banned from the room for %d hours.", username, duration/time.Hour)
}
// UnbanCommand handles the `/unban` operator command.

View File

@ -83,7 +83,6 @@ func (s *Server) OnLogin(sub *Subscriber, msg messages.Message) {
sub.SendJSON(messages.Message{
Action: messages.ActionKick,
})
s.DeleteSubscriber(sub)
return
}

240
pkg/polling_api.go Normal file
View File

@ -0,0 +1,240 @@
package barertc
import (
"context"
"encoding/json"
"net/http"
"time"
"git.kirsle.net/apps/barertc/pkg/log"
"git.kirsle.net/apps/barertc/pkg/messages"
"git.kirsle.net/apps/barertc/pkg/util"
"github.com/google/uuid"
)
// Polling user timeout before disconnecting them.
const PollingUserTimeout = time.Minute
// JSON payload structure for polling API.
type PollMessage struct {
// Send the username after authenticated.
Username string `json:"username,omitempty"`
// SessionID for authentication.
SessionID string `json:"session_id,omitempty"`
// BareRTC protocol message.
Message messages.Message `json:"msg"`
}
type PollResponse struct {
// Session ID.
Username string `json:"username,omitempty"`
SessionID string `json:"session_id,omitempty"`
// Pending messages.
Messages []messages.Message `json:"messages"`
}
// Helper method to send an error as a PollResponse.
func PollResponseError(message string) PollResponse {
return PollResponse{
Messages: []messages.Message{
{
Action: messages.ActionError,
Username: "ChatServer",
Message: message,
},
},
}
}
// KickIdlePollUsers is a goroutine that will disconnect polling API users
// who haven't been seen in a while.
func (s *Server) KickIdlePollUsers() {
log.Debug("KickIdlePollUsers goroutine engaged")
for {
time.Sleep(10 * time.Second)
for _, sub := range s.IterSubscribers() {
if sub.usePolling && time.Since(sub.lastPollAt) > PollingUserTimeout {
// Send an exit message.
if sub.authenticated && sub.ChatStatus != "hidden" {
log.Error("KickIdlePollUsers: %s last seen %s ago", sub.Username, sub.lastPollAt)
sub.authenticated = false
s.Broadcast(messages.Message{
Action: messages.ActionPresence,
Username: sub.Username,
Message: "has timed out!",
})
s.SendWhoList()
}
s.DeleteSubscriber(sub)
}
}
}
}
// FlushPollResponse returns a response for the polling API that will flush
// all pending messages sent to the client.
func (sub *Subscriber) FlushPollResponse() PollResponse {
var msgs = []messages.Message{}
// Drain the messages from the outbox channel.
for len(sub.messages) > 0 {
message := <-sub.messages
var msg messages.Message
json.Unmarshal(message, &msg)
msgs = append(msgs, msg)
}
return PollResponse{
Username: sub.Username,
SessionID: sub.sessionID,
Messages: msgs,
}
}
// Functions for the Polling API as an alternative to WebSockets.
func (s *Server) PollingAPI() http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ip := util.IPAddress(r)
// JSON writer for the response.
w.Header().Set("Content-Type", "application/json")
enc := json.NewEncoder(w)
enc.SetIndent("", " ")
// Parse the request.
if r.Method != http.MethodPost {
w.WriteHeader(http.StatusBadRequest)
enc.Encode(PollResponseError("Only POST methods allowed"))
return
} else if r.Header.Get("Content-Type") != "application/json" {
w.WriteHeader(http.StatusBadRequest)
enc.Encode(PollResponseError("Only application/json content-types allowed"))
return
}
defer r.Body.Close()
// Parse the request payload.
var (
params PollMessage
dec = json.NewDecoder(r.Body)
)
if err := dec.Decode(&params); err != nil {
w.WriteHeader(http.StatusBadRequest)
enc.Encode(PollResponseError(err.Error()))
return
}
// Debug logging.
log.Debug("Polling connection from %s - %s", ip, r.Header.Get("User-Agent"))
// Are they resuming an authenticated session?
var sub *Subscriber
if params.Username != "" || params.SessionID != "" {
if params.Username == "" || params.SessionID == "" {
w.WriteHeader(http.StatusBadRequest)
enc.Encode(PollResponseError("Authentication error: SessionID and Username both required."))
return
}
log.Debug("Polling API: check if %s (%s) is authenticated", params.Username, params.SessionID)
// Look up the subscriber.
var (
authOK bool
err error
)
sub, err = s.GetSubscriber(params.Username)
if err == nil {
// Validate the SessionID.
if sub.sessionID == params.SessionID {
authOK = true
}
}
// Authentication error.
if !authOK {
s.DeleteSubscriber(sub)
w.WriteHeader(http.StatusBadRequest)
enc.Encode(PollResponse{
Messages: []messages.Message{
{
Action: messages.ActionError,
Username: "ChatServer",
Message: "Your authentication has expired, please log back into the chat again.",
},
{
Action: messages.ActionKick,
},
},
})
return
}
// Ping their last seen time.
sub.lastPollAt = time.Now()
}
// If they are authenticated, handle this message.
if sub != nil && sub.authenticated {
s.OnClientMessage(sub, params.Message)
// If they use JWT authentication, give them a ping back with an updated
// JWT once in a while. Equivalent to the WebSockets pinger channel.
if time.Since(sub.lastPollJWT) > PingInterval {
sub.lastPollJWT = time.Now()
if sub.JWTClaims != nil {
if jwt, err := sub.JWTClaims.ReSign(); err != nil {
log.Error("ReSign JWT token for %s#%d: %s", sub.Username, sub.ID, err)
} else {
sub.SendJSON(messages.Message{
Action: messages.ActionPing,
JWTToken: jwt,
})
}
}
}
enc.Encode(sub.FlushPollResponse())
return
}
// Not authenticated: the only acceptable message is login.
if params.Message.Action != messages.ActionLogin {
w.WriteHeader(http.StatusBadRequest)
enc.Encode(PollResponseError("Not logged in."))
return
}
// Prepare a Subscriber object for them. Do not add it to the server
// roster unless their login succeeds.
ctx, cancel := context.WithCancel(r.Context())
sub = s.NewPollingSubscriber(ctx, cancel)
// Tentatively add them to the server. If they don't pass authentication,
// remove their subscriber immediately. Note: they need added here so they
// will receive their own "has entered the room" and WhoList updates.
s.AddSubscriber(sub)
s.OnLogin(sub, params.Message)
// Are they authenticated?
if sub.authenticated {
// Generate a SessionID number.
sessionID := uuid.New().String()
sub.sessionID = sessionID
log.Debug("Polling API: new user authenticated in: %s (sid %s)", sub.Username, sub.sessionID)
} else {
s.DeleteSubscriber(sub)
}
enc.Encode(sub.FlushPollResponse())
})
}

View File

@ -38,6 +38,7 @@ func (s *Server) Setup() error {
mux.Handle("/about", AboutPage())
mux.Handle("/logout", LogoutPage())
mux.Handle("/ws", s.WebSocket())
mux.Handle("/poll", s.PollingAPI())
mux.Handle("/api/statistics", s.Statistics())
mux.Handle("/api/blocklist", s.BlockList())
mux.Handle("/api/block/now", s.BlockNow())
@ -54,5 +55,7 @@ func (s *Server) Setup() error {
// ListenAndServe starts the web server.
func (s *Server) ListenAndServe(address string) error {
// Run the polling user idle kicker.
go s.KickIdlePollUsers()
return http.ListenAndServe(address, s.mux)
}

View File

@ -31,11 +31,19 @@ type Subscriber struct {
JWTClaims *jwt.Claims
authenticated bool // has passed the login step
loginAt time.Time
conn *websocket.Conn
ctx context.Context
cancel context.CancelFunc
messages chan []byte
closeSlow func()
// Connection details (WebSocket).
conn *websocket.Conn // WebSocket user
ctx context.Context
cancel context.CancelFunc
messages chan []byte
closeSlow func()
// Polling API users.
usePolling bool
sessionID string
lastPollAt time.Time
lastPollJWT time.Time // give a new JWT once in a while
muteMu sync.RWMutex
booted map[string]struct{} // usernames booted off your camera
@ -51,6 +59,100 @@ type Subscriber struct {
logfh map[string]io.WriteCloser
}
// NewSubscriber initializes a connected chat user.
func (s *Server) NewSubscriber(ctx context.Context, cancelFunc func()) *Subscriber {
return &Subscriber{
ctx: ctx,
cancel: cancelFunc,
messages: make(chan []byte, s.subscriberMessageBuffer),
booted: make(map[string]struct{}),
muted: make(map[string]struct{}),
blocked: make(map[string]struct{}),
messageIDs: make(map[int64]struct{}),
ChatStatus: "online",
}
}
// NewWebSocketSubscriber returns a new subscriber with a WebSocket connection.
func (s *Server) NewWebSocketSubscriber(ctx context.Context, conn *websocket.Conn, cancelFunc func()) *Subscriber {
sub := s.NewSubscriber(ctx, cancelFunc)
sub.conn = conn
sub.closeSlow = func() {
conn.Close(websocket.StatusPolicyViolation, "connection too slow to keep up with messages")
}
return sub
}
// NewPollingSubscriber returns a new subscriber using the polling API.
func (s *Server) NewPollingSubscriber(ctx context.Context, cancelFunc func()) *Subscriber {
sub := s.NewSubscriber(ctx, cancelFunc)
sub.usePolling = true
sub.lastPollAt = time.Now()
sub.lastPollJWT = time.Now()
sub.closeSlow = func() {
// Their outbox is filled up, disconnect them.
log.Error("Polling subscriber %s#%d: inbox is filled up!", sub.Username, sub.ID)
// Send an exit message.
if sub.authenticated && sub.ChatStatus != "hidden" {
sub.authenticated = false
s.Broadcast(messages.Message{
Action: messages.ActionPresence,
Username: sub.Username,
Message: "has exited the room!",
})
s.SendWhoList()
}
s.DeleteSubscriber(sub)
}
return sub
}
// OnClientMessage handles a chat protocol message from the user's WebSocket or polling API.
func (s *Server) OnClientMessage(sub *Subscriber, msg messages.Message) {
// What action are they performing?
switch msg.Action {
case messages.ActionLogin:
s.OnLogin(sub, msg)
case messages.ActionMessage:
s.OnMessage(sub, msg)
case messages.ActionFile:
s.OnFile(sub, msg)
case messages.ActionMe:
s.OnMe(sub, msg)
case messages.ActionOpen:
s.OnOpen(sub, msg)
case messages.ActionBoot:
s.OnBoot(sub, msg, true)
case messages.ActionUnboot:
s.OnBoot(sub, msg, false)
case messages.ActionMute, messages.ActionUnmute:
s.OnMute(sub, msg, msg.Action == messages.ActionMute)
case messages.ActionBlock:
s.OnBlock(sub, msg)
case messages.ActionBlocklist:
s.OnBlocklist(sub, msg)
case messages.ActionCandidate:
s.OnCandidate(sub, msg)
case messages.ActionSDP:
s.OnSDP(sub, msg)
case messages.ActionWatch:
s.OnWatch(sub, msg)
case messages.ActionUnwatch:
s.OnUnwatch(sub, msg)
case messages.ActionTakeback:
s.OnTakeback(sub, msg)
case messages.ActionReact:
s.OnReact(sub, msg)
case messages.ActionReport:
s.OnReport(sub, msg)
case messages.ActionPing:
default:
sub.ChatServer("Unsupported message type.")
}
}
// ReadLoop spawns a goroutine that reads from the websocket connection.
func (sub *Subscriber) ReadLoop(s *Server) {
go func() {
@ -88,45 +190,8 @@ func (sub *Subscriber) ReadLoop(s *Server) {
log.Debug("Read(%d=%s): %s", sub.ID, sub.Username, data)
}
// What action are they performing?
switch msg.Action {
case messages.ActionLogin:
s.OnLogin(sub, msg)
case messages.ActionMessage:
s.OnMessage(sub, msg)
case messages.ActionFile:
s.OnFile(sub, msg)
case messages.ActionMe:
s.OnMe(sub, msg)
case messages.ActionOpen:
s.OnOpen(sub, msg)
case messages.ActionBoot:
s.OnBoot(sub, msg, true)
case messages.ActionUnboot:
s.OnBoot(sub, msg, false)
case messages.ActionMute, messages.ActionUnmute:
s.OnMute(sub, msg, msg.Action == messages.ActionMute)
case messages.ActionBlock:
s.OnBlock(sub, msg)
case messages.ActionBlocklist:
s.OnBlocklist(sub, msg)
case messages.ActionCandidate:
s.OnCandidate(sub, msg)
case messages.ActionSDP:
s.OnSDP(sub, msg)
case messages.ActionWatch:
s.OnWatch(sub, msg)
case messages.ActionUnwatch:
s.OnUnwatch(sub, msg)
case messages.ActionTakeback:
s.OnTakeback(sub, msg)
case messages.ActionReact:
s.OnReact(sub, msg)
case messages.ActionReport:
s.OnReport(sub, msg)
default:
sub.ChatServer("Unsupported message type.")
}
// Handle their message.
s.OnClientMessage(sub, msg)
}
}()
}
@ -202,20 +267,7 @@ func (s *Server) WebSocket() http.HandlerFunc {
// ctx := c.CloseRead(r.Context())
ctx, cancel := context.WithCancel(r.Context())
sub := &Subscriber{
conn: c,
ctx: ctx,
cancel: cancel,
messages: make(chan []byte, s.subscriberMessageBuffer),
closeSlow: func() {
c.Close(websocket.StatusPolicyViolation, "connection too slow to keep up with messages")
},
booted: make(map[string]struct{}),
muted: make(map[string]struct{}),
blocked: make(map[string]struct{}),
messageIDs: make(map[int64]struct{}),
ChatStatus: "online",
}
sub := s.NewWebSocketSubscriber(ctx, c, cancel)
s.AddSubscriber(sub)
defer s.DeleteSubscriber(sub)
@ -280,6 +332,10 @@ func (s *Server) GetSubscriber(username string) (*Subscriber, error) {
// DeleteSubscriber removes a subscriber from the server.
func (s *Server) DeleteSubscriber(sub *Subscriber) {
if sub == nil {
return
}
log.Error("DeleteSubscriber: %s", sub.Username)
// Cancel its context to clean up the for-loop goroutine.

View File

@ -13,6 +13,7 @@ import WhoListRow from './components/WhoListRow.vue';
import VideoFeed from './components/VideoFeed.vue';
import ProfileModal from './components/ProfileModal.vue';
import ChatClient from './lib/ChatClient';
import LocalStorage from './lib/LocalStorage';
import VideoFlag from './lib/VideoFlag';
import { SoundEffects, DefaultSounds } from './lib/sounds';
@ -129,10 +130,8 @@ export default {
idleThreshold: 300, // number of seconds you must be idle
// WebSocket connection.
ws: {
conn: null,
connected: false,
},
// Initialized in the dial() function.
client: {},
// Who List for the room.
whoList: [],
@ -144,6 +143,7 @@ export default {
// Misc. user preferences (TODO: move all of them here)
prefs: {
usePolling: false, // use the polling API instead of WebSockets.
joinMessages: true, // show "has entered the room" in public channels
exitMessages: false, // hide exit messages by default in public channels
watchNotif: true, // notify in chat about cameras being watched
@ -462,6 +462,12 @@ export default {
"prefs.muteSounds": function () {
LocalStorage.set('muteSounds', this.prefs.muteSounds);
},
"prefs.usePolling": function () {
LocalStorage.set('usePolling', this.prefs.usePolling);
// Reset the chat client on change.
this.resetChatClient();
},
"prefs.closeDMs": function () {
LocalStorage.set('closeDMs', this.prefs.closeDMs);
@ -470,6 +476,12 @@ export default {
},
},
computed: {
connected() {
if (this.client.connected != undefined) {
return this.client.connected();
}
return false;
},
chatHistory() {
if (this.channels[this.channel] == undefined) {
return [];
@ -773,6 +785,9 @@ export default {
}
// Misc preferences
if (settings.usePolling != undefined) {
this.prefs.usePolling = settings.usePolling === true;
}
if (settings.joinMessages != undefined) {
this.prefs.joinMessages = settings.joinMessages === true;
}
@ -828,7 +843,7 @@ export default {
return;
}
if (!this.ws.connected) {
if (!this.connected) {
this.ChatClient("You are not connected to the server.");
return;
}
@ -842,12 +857,12 @@ export default {
// If they do it twice, kick them from the room.
if (this.spamWarningCount >= 1) {
// Walk of shame.
this.ws.conn.send(JSON.stringify({
this.client.send({
action: "message",
channel: "lobby",
message: "**(Message of Shame)** I have been naughty and posted spam in chat despite being warned, " +
"and I am now being kicked from the room in shame. ☹️",
}));
});
this.ChatServer(
"It is <strong>not allowed</strong> to promote your Onlyfans (or similar) " +
@ -861,9 +876,9 @@ export default {
action: "presence",
});
this.disconnect = true;
this.ws.connected = false;
this.client.ws.connected = false;
setTimeout(() => {
this.ws.conn.close();
this.client.disconnect();
}, 1000);
return;
}
@ -922,11 +937,11 @@ export default {
}
// console.debug("Send message: %s", this.message);
this.ws.conn.send(JSON.stringify({
this.client.send({
action: "message",
channel: this.channel,
message: this.message,
}));
});
this.message = "";
},
@ -937,11 +952,11 @@ export default {
// Emoji reactions
sendReact(message, emoji) {
this.ws.conn.send(JSON.stringify({
this.client.send({
action: 'react',
msgID: message.msgID,
message: emoji,
}));
});
},
onReact(msg) {
// Search all channels for this message ID and append the reaction.
@ -980,13 +995,13 @@ export default {
// Sync the current user state (such as video broadcasting status) to
// the backend, which will reload everybody's Who List.
sendMe() {
if (!this.ws.connected) return;
this.ws.conn.send(JSON.stringify({
if (!this.connected) return;
this.client.send({
action: "me",
video: this.myVideoFlag,
status: this.status,
dnd: this.prefs.closeDMs,
}));
});
},
onMe(msg) {
// We have had settings pushed to us by the server, such as a change
@ -1145,10 +1160,10 @@ export default {
}
},
sendMute(username, mute) {
this.ws.conn.send(JSON.stringify({
this.client.send({
action: mute ? "mute" : "unmute",
username: username,
}));
});
},
isMutedUser(username) {
return this.muted[this.normalizeUsername(username)] != undefined;
@ -1169,30 +1184,30 @@ export default {
}
// Send the username list to the server.
this.ws.conn.send(JSON.stringify({
this.client.send({
action: "blocklist",
usernames: blocklist,
}))
});
},
// Send a video request to access a user's camera.
sendOpen(username) {
this.ws.conn.send(JSON.stringify({
this.client.send({
action: "open",
username: username,
}));
});
},
sendBoot(username) {
this.ws.conn.send(JSON.stringify({
this.client.send({
action: "boot",
username: username,
}));
});
},
sendUnboot(username) {
this.ws.conn.send(JSON.stringify({
this.client.send({
action: "unboot",
username: username,
}));
});
},
onOpen(msg) {
// Response for the opener to begin WebRTC connection.
@ -1262,12 +1277,22 @@ export default {
isJoin = true;
}
// Push it to the history of all public channels (depending on user preference).
// Push it to the history of the public channels (respecting user preferences).
if ((isJoin && this.prefs.joinMessages) || (isLeave && this.prefs.exitMessages)
|| (!isJoin && !isLeave)) {
for (let channel of this.config.channels) {
// Always put them in the first public channel.
let channel = this.config.channels[0];
this.pushHistory({
channel: channel.ID,
action: msg.action,
username: msg.username,
message: msg.message,
});
// If the current user is focused on another public channel, also post it there.
if (!this.isDM && this.channel !== channel.ID) {
this.pushHistory({
channel: channel.ID,
channel: this.channel.ID,
action: msg.action,
username: msg.username,
message: msg.message,
@ -1289,141 +1314,55 @@ export default {
// Dial the WebSocket connection.
dial() {
this.ChatClient("Establishing connection to server...");
// Set up the ChatClient connection.
this.client = new ChatClient({
usePolling: this.prefs.usePolling,
onClientError: this.ChatClient,
const proto = location.protocol === 'https:' ? 'wss' : 'ws';
const conn = new WebSocket(`${proto}://${location.host}/ws`);
username: this.username,
jwt: this.jwt,
prefs: this.prefs,
conn.addEventListener("close", ev => {
// Lost connection to server - scrub who list.
this.onWho({ whoList: [] });
this.muted = {};
onWho: this.onWho,
onMe: this.onMe,
onMessage: this.onMessage,
onTakeback: this.onTakeback,
onReact: this.onReact,
onPresence: this.onPresence,
onRing: this.onRing,
onOpen: this.onOpen,
onCandidate: this.onCandidate,
onSDP: this.onSDP,
onWatch: this.onWatch,
onUnwatch: this.onUnwatch,
onBlock: this.onBlock,
this.ws.connected = false;
this.ChatClient(`WebSocket Disconnected code: ${ev.code}, reason: ${ev.reason}`);
this.disconnectCount++;
if (this.disconnectCount > this.disconnectLimit) {
this.ChatClient(`It seems there's a problem connecting to the server. Please try some other time.`);
return;
}
if (!this.disconnect) {
if (ev.code !== 1001 && ev.code !== 1000) {
this.ChatClient("Reconnecting in 5s");
setTimeout(this.dial, 5000);
}
}
});
conn.addEventListener("open", ev => {
this.ws.connected = true;
this.ChatClient("Websocket connected!");
// Upload our blocklist to the server before login. This resolves a bug where if a block
// was added recently (other user still online in chat), that user would briefly see your
// "has entered the room" message followed by you immediately not being online.
this.bulkMuteUsers();
// Tell the server our username.
this.ws.conn.send(JSON.stringify({
action: "login",
username: this.username,
jwt: this.jwt.token,
dnd: this.prefs.closeDMs,
}));
// Focus the message entry box.
window.requestAnimationFrame(() => {
bulkMuteUsers: this.bulkMuteUsers,
focusMessageBox: () => {
this.messageBox.focus();
});
},
pushHistory: this.pushHistory,
onNewJWT: jwt => {
this.jwt.token = jwt;
},
});
conn.addEventListener("message", ev => {
if (typeof ev.data !== "string") {
console.error("unexpected message type", typeof ev.data);
return;
}
this.client.dial();
},
resetChatClient() {
if (!this.connected) return;
let msg = JSON.parse(ev.data);
try {
// Cast timestamp to date.
msg.at = new Date(msg.at);
} catch (e) {
console.error("Parsing timestamp '%s' on msg: %s", msg.at, e);
}
// Reset the ChatClient, e.g. when toggling between WebSocket vs. Polling methods.
this.ChatClient(
"Your connection method to the chat server has been updated; attempting to reconnect now.",
);
switch (msg.action) {
case "who":
this.onWho(msg);
break;
case "me":
this.onMe(msg);
break;
case "message":
this.onMessage(msg);
break;
case "takeback":
this.onTakeback(msg);
break;
case "react":
this.onReact(msg);
break;
case "presence":
this.onPresence(msg);
break;
case "ring":
this.onRing(msg);
break;
case "open":
this.onOpen(msg);
break;
case "candidate":
this.onCandidate(msg);
break;
case "sdp":
this.onSDP(msg);
break;
case "watch":
this.onWatch(msg);
break;
case "unwatch":
this.onUnwatch(msg);
break;
case "block":
this.onBlock(msg);
break;
case "error":
this.pushHistory({
channel: msg.channel,
username: msg.username || 'Internal Server Error',
message: msg.message,
isChatServer: true,
});
break;
case "disconnect":
this.onWho({ whoList: [] });
this.disconnect = true;
this.ws.connected = false;
this.ws.conn.close(1000, "server asked to close the connection");
break;
case "ping":
// New JWT token?
if (msg.jwt) {
this.jwt.token = msg.jwt;
}
// Reset disconnect retry counter: if we were on long enough to get
// a ping, we're well connected and can reconnect no matter how many
// times the chat server is rebooted.
this.disconnectCount = 0;
break;
default:
console.error("Unexpected action: %s", JSON.stringify(msg));
}
window.requestAnimationFrame(() => {
this.client.disconnect();
setTimeout(() => {
this.dial();
}, 1000);
});
this.ws.conn = conn;
},
/**
@ -1462,11 +1401,11 @@ export default {
// message to the other peer through the signaling server.
pc.onicecandidate = event => {
if (event.candidate) {
this.ws.conn.send(JSON.stringify({
this.client.send({
action: "candidate",
username: username,
candidate: JSON.stringify(event.candidate),
}));
});
}
};
@ -1587,11 +1526,11 @@ export default {
localDescCreated(pc, username) {
return (desc) => {
pc.setLocalDescription(desc).then(() => {
this.ws.conn.send(JSON.stringify({
this.client.send({
action: "sdp",
username: username,
description: JSON.stringify(pc.localDescription),
}));
});
}).catch(e => {
console.error(`Error sending WebRTC negotiation message (SDP): ${e}`);
});
@ -1659,10 +1598,10 @@ export default {
},
sendWatch(username, watching) {
// Send the watch or unwatch message to backend.
this.ws.conn.send(JSON.stringify({
this.client.send({
action: watching ? "watch" : "unwatch",
username: username,
}));
});
},
isWatchingMe(username) {
// Return whether the user is watching your camera
@ -1825,10 +1764,10 @@ export default {
"Do you want to take this message back? Doing so will remove this message from everybody's view in the chat room."
)) return;
this.ws.conn.send(JSON.stringify({
this.client.send({
action: "takeback",
msgID: msg.msgID,
}));
});
},
removeMessage(msg) {
if (!window.confirm(
@ -2863,10 +2802,9 @@ export default {
// Attach the file to the message.
msg.message = file.name;
msg.bytes = fileByteArray;
msg = JSON.stringify(msg);
// Send it to the chat server.
this.ws.conn.send(msg);
this.client.send(msg);
};
reader.readAsArrayBuffer(file);
@ -3066,7 +3004,7 @@ export default {
let msg = this.reportModal.message;
this.ws.conn.send(JSON.stringify({
this.client.send({
action: "report",
channel: msg.channel,
username: msg.username,
@ -3074,7 +3012,7 @@ export default {
reason: classification,
message: msg.message,
comment: comment,
}));
});
this.reportModal.busy = false;
this.reportModal.visible = false;
@ -3470,6 +3408,31 @@ export default {
</p>
</div>
<div class="field">
<label class="label mb-0">
Server Connection Method
</label>
<label class="checkbox">
<input type="radio" v-model="prefs.usePolling" :value="false">
WebSockets (realtime connection; recommended for most people)
</label>
<label class="checkbox">
<input type="radio" v-model="prefs.usePolling" :value="true">
Polling (check for new messages every 5 seconds)
</label>
<p class="help">
By default the chat server requires a constant WebSockets connection to stay online.
If you are experiencing frequent disconnects (e.g. because you are on a slow or
unstable network connection), try switching to the "Polling" method which will be
more robust, at the cost of up to 5-seconds latency to receive new messages.
<!-- If disconnected currently, tell them to refresh. -->
<span v-if="!connected" class="has-text-danger">
Notice: you may need to refresh the chat page after changing this setting.
</span>
</p>
</div>
</div>
</div>
@ -3992,7 +3955,7 @@ export default {
<!-- My text box -->
<input type="text" class="input" id="messageBox" v-model="message"
placeholder="Write a message" @keydown="sendTypingNotification()" autocomplete="off"
:disabled="!ws.connected">
:disabled="!client.connected">
<!-- At Mention templates-->
<template #no-result>

339
src/lib/ChatClient.js Normal file
View File

@ -0,0 +1,339 @@
// WebSocket chat client handler.
class ChatClient {
/**
* Constructor for the client.
*
* @param usePolling: instead of WebSocket use the ajax polling API.
* @param onClientError: function to receive 'ChatClient' messages to
* add to the chat room (this.ChatClient())
*/
constructor({
usePolling=false,
onClientError,
username,
jwt, // JWT token for authorization
prefs, // User preferences for 'me' action (close DMs, etc)
// Chat Protocol handler functions for the caller.
onWho,
onMe,
onMessage,
onTakeback,
onReact,
onPresence,
onRing,
onOpen,
onCandidate,
onSDP,
onWatch,
onUnwatch,
onBlock,
// Misc function registrations for callback.
onNewJWT, // new JWT token from ping response
bulkMuteUsers, // Upload our blocklist on connect.
focusMessageBox, // Tell caller to focus the message entry box.
pushHistory,
}) {
this.usePolling = usePolling;
// Pointer to the 'ChatClient(message)' command from the main app.
this.ChatClient = onClientError;
this.username = username;
this.jwt = jwt;
this.prefs = prefs;
// Register the handler functions.
this.onWho = onWho;
this.onMe = onMe;
this.onMessage = onMessage;
this.onTakeback = onTakeback;
this.onReact = onReact;
this.onPresence = onPresence;
this.onRing = onRing;
this.onOpen = onOpen;
this.onCandidate = onCandidate;
this.onSDP = onSDP;
this.onWatch = onWatch;
this.onUnwatch = onUnwatch;
this.onBlock = onBlock;
this.onNewJWT = onNewJWT;
this.bulkMuteUsers = bulkMuteUsers;
this.focusMessageBox = focusMessageBox;
this.pushHistory = pushHistory;
// WebSocket connection.
this.ws = {
conn: null,
connected: false,
// Disconnect spamming: don't retry too many times.
reconnect: true, // unless told to go away
disconnectLimit: 2,
disconnectCount: 0,
};
// Polling connection.
this.polling = {
username: "",
sessionID: "",
timeout: null, // setTimeout for next poll.
}
}
// Connected polls if the client is connected.
connected() {
if (this.usePolling) {
return this.polling.timeout != null && this.polling.sessionID != "";
}
return this.ws.connected;
}
// Disconnect from the server.
disconnect() {
if (this.usePolling) {
this.polling.sessionID = "";
this.polling.username = "";
this.stopPolling();
this.ChatClient("You have disconnected from the server.");
return;
}
this.ws.connected = false;
this.ws.conn.close(1000, "server asked to close the connection");
}
// Common function to send a message to the server. The message
// is a JSON object before stringify.
send(message) {
if (this.usePolling) {
fetch("/poll", {
method: "POST",
mode: "same-origin",
cache: "no-cache",
credentials: "same-origin",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
username: this.polling.username,
session_id: this.polling.sessionID,
msg: message,
})
}).then(resp => resp.json()).then(resp => {
console.log(resp);
// Store sessionID information.
this.polling.sessionID = resp.session_id;
this.polling.username = resp.username;
for (let msg of resp.messages) {
this.handle(msg);
}
}).catch(err => {
this.ChatClient("Error from polling API: " + err);
});
return;
}
if (!this.ws.connected) {
this.ChatClient("Couldn't send WebSocket message: not connected.");
return;
}
if (typeof(message) !== "string") {
message = JSON.stringify(message);
}
this.ws.conn.send(message);
}
// Common function to handle a message from the server.
handle(msg) {
switch (msg.action) {
case "who":
this.onWho(msg);
break;
case "me":
this.onMe(msg);
break;
case "message":
this.onMessage(msg);
break;
case "takeback":
this.onTakeback(msg);
break;
case "react":
this.onReact(msg);
break;
case "presence":
this.onPresence(msg);
break;
case "ring":
this.onRing(msg);
break;
case "open":
this.onOpen(msg);
break;
case "candidate":
this.onCandidate(msg);
break;
case "sdp":
this.onSDP(msg);
break;
case "watch":
this.onWatch(msg);
break;
case "unwatch":
this.onUnwatch(msg);
break;
case "block":
this.onBlock(msg);
break;
case "error":
this.pushHistory({
channel: msg.channel,
username: msg.username || 'Internal Server Error',
message: msg.message,
isChatServer: true,
});
break;
case "disconnect":
this.onWho({ whoList: [] });
this.ws.reconnect = false;
this.disconnect();
break;
case "ping":
// New JWT token?
if (msg.jwt) {
this.onNewJWT(msg.jwt);
}
// Reset disconnect retry counter: if we were on long enough to get
// a ping, we're well connected and can reconnect no matter how many
// times the chat server is rebooted.
this.ws.disconnectCount = 0;
break;
default:
console.error("Unexpected action: %s", JSON.stringify(msg));
}
}
// Dial the WebSocket.
dial() {
// Polling API?
if (this.usePolling) {
this.ChatClient("Connecting to the server via polling API...");
this.startPolling();
// Log in now.
this.send({
action: "login",
username: this.username,
jwt: this.jwt.token,
dnd: this.prefs.closeDMs,
});
return;
}
this.ChatClient("Establishing connection to server...");
const proto = location.protocol === 'https:' ? 'wss' : 'ws';
const conn = new WebSocket(`${proto}://${location.host}/ws`);
conn.addEventListener("close", ev => {
// Lost connection to server - scrub who list.
this.onWho({ whoList: [] });
this.ws.connected = false;
this.ChatClient(`WebSocket Disconnected code: ${ev.code}, reason: ${ev.reason}`);
this.ws.disconnectCount++;
if (this.ws.disconnectCount > this.ws.disconnectLimit) {
this.ChatClient(
`It seems there's a problem connecting to the server. Please try some other time.<br><br>` +
`If you experience this problem frequently, try going into the Chat Settings 'Misc' tab ` +
`and switch to the 'Polling' Server Connection method.`
);
return;
}
if (this.ws.reconnect) {
if (ev.code !== 1001 && ev.code !== 1000) {
this.ChatClient("Reconnecting in 5s");
setTimeout(() => {
this.dial();
}, 5000);
}
}
});
conn.addEventListener("open", ev => {
this.ws.connected = true;
this.ChatClient("Websocket connected!");
// Upload our blocklist to the server before login. This resolves a bug where if a block
// was added recently (other user still online in chat), that user would briefly see your
// "has entered the room" message followed by you immediately not being online.
this.bulkMuteUsers();
// Tell the server our username.
this.send({
action: "login",
username: this.username,
jwt: this.jwt.token,
dnd: this.prefs.closeDMs,
});
// Focus the message entry box.
window.requestAnimationFrame(() => {
this.focusMessageBox();
});
});
conn.addEventListener("message", ev => {
if (typeof ev.data !== "string") {
console.error("unexpected message type", typeof ev.data);
return;
}
let msg = JSON.parse(ev.data);
this.handle(msg);
});
this.ws.conn = conn;
}
// Start the polling interval.
startPolling() {
if (!this.usePolling) return;
this.stopPolling();
this.polling.timeout = setTimeout(() => {
this.poll();
this.startPolling();
}, 5000);
}
// Poll the API.
poll() {
if (!this.usePolling) {
this.stopPolling();
return;
}
this.send({
action: "ping",
});
this.startPolling();
}
// Stop polling.
stopPolling() {
if (this.polling.timeout != null) {
clearTimeout(this.polling.timeout);
}
}
}
export default ChatClient;

View File

@ -18,6 +18,7 @@ const keys = {
'rememberExpresslyClosed': Boolean,
// Booleans
'usePolling': Boolean, // use the polling API instead of WebSocket
'joinMessages': Boolean,
'exitMessages': Boolean,
'watchNotif': Boolean,