Polling API for the chat room

polling-api
Noah 2023-12-10 18:43:18 -08:00
parent d57d41ea3a
commit 0e0aac991d
6 changed files with 528 additions and 77 deletions

240
pkg/polling_api.go Normal file
View File

@ -0,0 +1,240 @@
package barertc
import (
"context"
"encoding/json"
"net/http"
"time"
"git.kirsle.net/apps/barertc/pkg/log"
"git.kirsle.net/apps/barertc/pkg/messages"
"git.kirsle.net/apps/barertc/pkg/util"
"github.com/google/uuid"
)
// Polling user timeout before disconnecting them.
const PollingUserTimeout = time.Minute
// JSON payload structure for polling API.
type PollMessage struct {
// Send the username after authenticated.
Username string `json:"username,omitempty"`
// SessionID for authentication.
SessionID string `json:"session_id,omitempty"`
// BareRTC protocol message.
Message messages.Message `json:"msg"`
}
type PollResponse struct {
// Session ID.
Username string `json:"username,omitempty"`
SessionID string `json:"session_id,omitempty"`
// Pending messages.
Messages []messages.Message `json:"messages"`
}
// Helper method to send an error as a PollResponse.
func PollResponseError(message string) PollResponse {
return PollResponse{
Messages: []messages.Message{
{
Action: messages.ActionError,
Username: "ChatServer",
Message: message,
},
},
}
}
// KickIdlePollUsers is a goroutine that will disconnect polling API users
// who haven't been seen in a while.
func (s *Server) KickIdlePollUsers() {
log.Debug("KickIdlePollUsers goroutine engaged")
for {
time.Sleep(10 * time.Second)
for _, sub := range s.IterSubscribers() {
if sub.usePolling && time.Since(sub.lastPollAt) > PollingUserTimeout {
// Send an exit message.
if sub.authenticated && sub.ChatStatus != "hidden" {
log.Error("KickIdlePollUsers: %s last seen %s ago", sub.Username, sub.lastPollAt)
sub.authenticated = false
s.Broadcast(messages.Message{
Action: messages.ActionPresence,
Username: sub.Username,
Message: "has timed out!",
})
s.SendWhoList()
}
s.DeleteSubscriber(sub)
}
}
}
}
// FlushPollResponse returns a response for the polling API that will flush
// all pending messages sent to the client.
func (sub *Subscriber) FlushPollResponse() PollResponse {
var msgs = []messages.Message{}
// Drain the messages from the outbox channel.
for len(sub.messages) > 0 {
message := <-sub.messages
var msg messages.Message
json.Unmarshal(message, &msg)
msgs = append(msgs, msg)
}
return PollResponse{
Username: sub.Username,
SessionID: sub.sessionID,
Messages: msgs,
}
}
// Functions for the Polling API as an alternative to WebSockets.
func (s *Server) PollingAPI() http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ip := util.IPAddress(r)
// JSON writer for the response.
w.Header().Set("Content-Type", "application/json")
enc := json.NewEncoder(w)
enc.SetIndent("", " ")
// Parse the request.
if r.Method != http.MethodPost {
w.WriteHeader(http.StatusBadRequest)
enc.Encode(PollResponseError("Only POST methods allowed"))
return
} else if r.Header.Get("Content-Type") != "application/json" {
w.WriteHeader(http.StatusBadRequest)
enc.Encode(PollResponseError("Only application/json content-types allowed"))
return
}
defer r.Body.Close()
// Parse the request payload.
var (
params PollMessage
dec = json.NewDecoder(r.Body)
)
if err := dec.Decode(&params); err != nil {
w.WriteHeader(http.StatusBadRequest)
enc.Encode(PollResponseError(err.Error()))
return
}
// Debug logging.
log.Debug("Polling connection from %s - %s", ip, r.Header.Get("User-Agent"))
// Are they resuming an authenticated session?
var sub *Subscriber
if params.Username != "" || params.SessionID != "" {
if params.Username == "" || params.SessionID == "" {
w.WriteHeader(http.StatusBadRequest)
enc.Encode(PollResponseError("Authentication error: SessionID and Username both required."))
return
}
log.Debug("Polling API: check if %s (%s) is authenticated", params.Username, params.SessionID)
// Look up the subscriber.
var (
authOK bool
err error
)
sub, err = s.GetSubscriber(params.Username)
if err == nil {
// Validate the SessionID.
if sub.sessionID == params.SessionID {
authOK = true
}
}
// Authentication error.
if !authOK {
s.DeleteSubscriber(sub)
w.WriteHeader(http.StatusBadRequest)
enc.Encode(PollResponse{
Messages: []messages.Message{
{
Action: messages.ActionError,
Username: "ChatServer",
Message: "Your authentication has expired, please log back into the chat again.",
},
{
Action: messages.ActionKick,
},
},
})
return
}
// Ping their last seen time.
sub.lastPollAt = time.Now()
}
// If they are authenticated, handle this message.
if sub != nil && sub.authenticated {
s.OnClientMessage(sub, params.Message)
// If they use JWT authentication, give them a ping back with an updated
// JWT once in a while. Equivalent to the WebSockets pinger channel.
if time.Since(sub.lastPollJWT) > PingInterval {
sub.lastPollJWT = time.Now()
if sub.JWTClaims != nil {
if jwt, err := sub.JWTClaims.ReSign(); err != nil {
log.Error("ReSign JWT token for %s#%d: %s", sub.Username, sub.ID, err)
} else {
sub.SendJSON(messages.Message{
Action: messages.ActionPing,
JWTToken: jwt,
})
}
}
}
enc.Encode(sub.FlushPollResponse())
return
}
// Not authenticated: the only acceptable message is login.
if params.Message.Action != messages.ActionLogin {
w.WriteHeader(http.StatusBadRequest)
enc.Encode(PollResponseError("Not logged in."))
return
}
// Prepare a Subscriber object for them. Do not add it to the server
// roster unless their login succeeds.
ctx, cancel := context.WithCancel(r.Context())
sub = s.NewPollingSubscriber(ctx, cancel)
// Tentatively add them to the server. If they don't pass authentication,
// remove their subscriber immediately. Note: they need added here so they
// will receive their own "has entered the room" and WhoList updates.
s.AddSubscriber(sub)
s.OnLogin(sub, params.Message)
// Are they authenticated?
if sub.authenticated {
// Generate a SessionID number.
sessionID := uuid.New().String()
sub.sessionID = sessionID
log.Debug("Polling API: new user authenticated in: %s (sid %s)", sub.Username, sub.sessionID)
} else {
s.DeleteSubscriber(sub)
}
enc.Encode(sub.FlushPollResponse())
})
}

View File

@ -38,6 +38,7 @@ func (s *Server) Setup() error {
mux.Handle("/about", AboutPage())
mux.Handle("/logout", LogoutPage())
mux.Handle("/ws", s.WebSocket())
mux.Handle("/poll", s.PollingAPI())
mux.Handle("/api/statistics", s.Statistics())
mux.Handle("/api/blocklist", s.BlockList())
mux.Handle("/api/block/now", s.BlockNow())
@ -54,5 +55,7 @@ func (s *Server) Setup() error {
// ListenAndServe starts the web server.
func (s *Server) ListenAndServe(address string) error {
// Run the polling user idle kicker.
go s.KickIdlePollUsers()
return http.ListenAndServe(address, s.mux)
}

View File

@ -31,11 +31,19 @@ type Subscriber struct {
JWTClaims *jwt.Claims
authenticated bool // has passed the login step
loginAt time.Time
conn *websocket.Conn
ctx context.Context
cancel context.CancelFunc
messages chan []byte
closeSlow func()
// Connection details (WebSocket).
conn *websocket.Conn // WebSocket user
ctx context.Context
cancel context.CancelFunc
messages chan []byte
closeSlow func()
// Polling API users.
usePolling bool
sessionID string
lastPollAt time.Time
lastPollJWT time.Time // give a new JWT once in a while
muteMu sync.RWMutex
booted map[string]struct{} // usernames booted off your camera
@ -51,6 +59,100 @@ type Subscriber struct {
logfh map[string]io.WriteCloser
}
// NewSubscriber initializes a connected chat user.
func (s *Server) NewSubscriber(ctx context.Context, cancelFunc func()) *Subscriber {
return &Subscriber{
ctx: ctx,
cancel: cancelFunc,
messages: make(chan []byte, s.subscriberMessageBuffer),
booted: make(map[string]struct{}),
muted: make(map[string]struct{}),
blocked: make(map[string]struct{}),
messageIDs: make(map[int64]struct{}),
ChatStatus: "online",
}
}
// NewWebSocketSubscriber returns a new subscriber with a WebSocket connection.
func (s *Server) NewWebSocketSubscriber(ctx context.Context, conn *websocket.Conn, cancelFunc func()) *Subscriber {
sub := s.NewSubscriber(ctx, cancelFunc)
sub.conn = conn
sub.closeSlow = func() {
conn.Close(websocket.StatusPolicyViolation, "connection too slow to keep up with messages")
}
return sub
}
// NewPollingSubscriber returns a new subscriber using the polling API.
func (s *Server) NewPollingSubscriber(ctx context.Context, cancelFunc func()) *Subscriber {
sub := s.NewSubscriber(ctx, cancelFunc)
sub.usePolling = true
sub.lastPollAt = time.Now()
sub.lastPollJWT = time.Now()
sub.closeSlow = func() {
// Their outbox is filled up, disconnect them.
log.Error("Polling subscriber %s#%d: inbox is filled up!", sub.Username, sub.ID)
// Send an exit message.
if sub.authenticated && sub.ChatStatus != "hidden" {
sub.authenticated = false
s.Broadcast(messages.Message{
Action: messages.ActionPresence,
Username: sub.Username,
Message: "has exited the room!",
})
s.SendWhoList()
}
s.DeleteSubscriber(sub)
}
return sub
}
// OnClientMessage handles a chat protocol message from the user's WebSocket or polling API.
func (s *Server) OnClientMessage(sub *Subscriber, msg messages.Message) {
// What action are they performing?
switch msg.Action {
case messages.ActionLogin:
s.OnLogin(sub, msg)
case messages.ActionMessage:
s.OnMessage(sub, msg)
case messages.ActionFile:
s.OnFile(sub, msg)
case messages.ActionMe:
s.OnMe(sub, msg)
case messages.ActionOpen:
s.OnOpen(sub, msg)
case messages.ActionBoot:
s.OnBoot(sub, msg, true)
case messages.ActionUnboot:
s.OnBoot(sub, msg, false)
case messages.ActionMute, messages.ActionUnmute:
s.OnMute(sub, msg, msg.Action == messages.ActionMute)
case messages.ActionBlock:
s.OnBlock(sub, msg)
case messages.ActionBlocklist:
s.OnBlocklist(sub, msg)
case messages.ActionCandidate:
s.OnCandidate(sub, msg)
case messages.ActionSDP:
s.OnSDP(sub, msg)
case messages.ActionWatch:
s.OnWatch(sub, msg)
case messages.ActionUnwatch:
s.OnUnwatch(sub, msg)
case messages.ActionTakeback:
s.OnTakeback(sub, msg)
case messages.ActionReact:
s.OnReact(sub, msg)
case messages.ActionReport:
s.OnReport(sub, msg)
case messages.ActionPing:
default:
sub.ChatServer("Unsupported message type.")
}
}
// ReadLoop spawns a goroutine that reads from the websocket connection.
func (sub *Subscriber) ReadLoop(s *Server) {
go func() {
@ -88,45 +190,8 @@ func (sub *Subscriber) ReadLoop(s *Server) {
log.Debug("Read(%d=%s): %s", sub.ID, sub.Username, data)
}
// What action are they performing?
switch msg.Action {
case messages.ActionLogin:
s.OnLogin(sub, msg)
case messages.ActionMessage:
s.OnMessage(sub, msg)
case messages.ActionFile:
s.OnFile(sub, msg)
case messages.ActionMe:
s.OnMe(sub, msg)
case messages.ActionOpen:
s.OnOpen(sub, msg)
case messages.ActionBoot:
s.OnBoot(sub, msg, true)
case messages.ActionUnboot:
s.OnBoot(sub, msg, false)
case messages.ActionMute, messages.ActionUnmute:
s.OnMute(sub, msg, msg.Action == messages.ActionMute)
case messages.ActionBlock:
s.OnBlock(sub, msg)
case messages.ActionBlocklist:
s.OnBlocklist(sub, msg)
case messages.ActionCandidate:
s.OnCandidate(sub, msg)
case messages.ActionSDP:
s.OnSDP(sub, msg)
case messages.ActionWatch:
s.OnWatch(sub, msg)
case messages.ActionUnwatch:
s.OnUnwatch(sub, msg)
case messages.ActionTakeback:
s.OnTakeback(sub, msg)
case messages.ActionReact:
s.OnReact(sub, msg)
case messages.ActionReport:
s.OnReport(sub, msg)
default:
sub.ChatServer("Unsupported message type.")
}
// Handle their message.
s.OnClientMessage(sub, msg)
}
}()
}
@ -202,20 +267,7 @@ func (s *Server) WebSocket() http.HandlerFunc {
// ctx := c.CloseRead(r.Context())
ctx, cancel := context.WithCancel(r.Context())
sub := &Subscriber{
conn: c,
ctx: ctx,
cancel: cancel,
messages: make(chan []byte, s.subscriberMessageBuffer),
closeSlow: func() {
c.Close(websocket.StatusPolicyViolation, "connection too slow to keep up with messages")
},
booted: make(map[string]struct{}),
muted: make(map[string]struct{}),
blocked: make(map[string]struct{}),
messageIDs: make(map[int64]struct{}),
ChatStatus: "online",
}
sub := s.NewWebSocketSubscriber(ctx, c, cancel)
s.AddSubscriber(sub)
defer s.DeleteSubscriber(sub)
@ -280,6 +332,10 @@ func (s *Server) GetSubscriber(username string) (*Subscriber, error) {
// DeleteSubscriber removes a subscriber from the server.
func (s *Server) DeleteSubscriber(sub *Subscriber) {
if sub == nil {
return
}
log.Error("DeleteSubscriber: %s", sub.Username)
// Cancel its context to clean up the for-loop goroutine.

View File

@ -143,6 +143,7 @@ export default {
// Misc. user preferences (TODO: move all of them here)
prefs: {
usePolling: false, // use the polling API instead of WebSockets.
joinMessages: true, // show "has entered the room" in public channels
exitMessages: false, // hide exit messages by default in public channels
watchNotif: true, // notify in chat about cameras being watched
@ -461,6 +462,12 @@ export default {
"prefs.muteSounds": function () {
LocalStorage.set('muteSounds', this.prefs.muteSounds);
},
"prefs.usePolling": function () {
LocalStorage.set('usePolling', this.prefs.usePolling);
// Reset the chat client on change.
this.resetChatClient();
},
"prefs.closeDMs": function () {
LocalStorage.set('closeDMs', this.prefs.closeDMs);
@ -469,6 +476,12 @@ export default {
},
},
computed: {
connected() {
if (this.client.connected != undefined) {
return this.client.connected();
}
return false;
},
chatHistory() {
if (this.channels[this.channel] == undefined) {
return [];
@ -772,6 +785,9 @@ export default {
}
// Misc preferences
if (settings.usePolling != undefined) {
this.prefs.usePolling = settings.usePolling === true;
}
if (settings.joinMessages != undefined) {
this.prefs.joinMessages = settings.joinMessages === true;
}
@ -827,7 +843,7 @@ export default {
return;
}
if (!this.client.connected()) {
if (!this.connected) {
this.ChatClient("You are not connected to the server.");
return;
}
@ -979,7 +995,7 @@ export default {
// Sync the current user state (such as video broadcasting status) to
// the backend, which will reload everybody's Who List.
sendMe() {
if (!this.client.connected()) return;
if (!this.connected) return;
this.client.send({
action: "me",
video: this.myVideoFlag,
@ -1288,12 +1304,12 @@ export default {
// Dial the WebSocket connection.
dial() {
this.ChatClient("Establishing connection to server...");
// Set up the ChatClient connection.
this.client = new ChatClient({
usePolling: this.prefs.usePolling,
onClientError: this.ChatClient,
username: this.username,
jwt: this.jwt,
prefs: this.prefs,
@ -1317,12 +1333,28 @@ export default {
},
pushHistory: this.pushHistory,
onNewJWT: jwt => {
this.jwt.token = msg.jwt;
this.ChatClient("new jwt: " + jwt);
this.jwt.token = jwt;
},
});
this.client.dial();
},
resetChatClient() {
if (!this.connected) return;
// Reset the ChatClient, e.g. when toggling between WebSocket vs. Polling methods.
this.ChatClient(
"Your connection method to the chat server has been updated; attempting to reconnect now.",
);
window.requestAnimationFrame(() => {
this.client.disconnect();
setTimeout(() => {
this.dial();
}, 1000);
});
},
/**
* WebRTC concerns.
@ -3367,6 +3399,31 @@ export default {
</p>
</div>
<div class="field">
<label class="label mb-0">
Server Connection Method
</label>
<label class="checkbox">
<input type="radio" v-model="prefs.usePolling" :value="false">
WebSockets (realtime connection; recommended for most people)
</label>
<label class="checkbox">
<input type="radio" v-model="prefs.usePolling" :value="true">
Polling (check for new messages every 5 seconds)
</label>
<p class="help">
By default the chat server requires a constant WebSockets connection to stay online.
If you are experiencing frequent disconnects (e.g. because you are on a slow or
unstable network connection), try switching to the "Polling" method which will be
more robust, at the cost of up to 5-seconds latency to receive new messages.
<!-- If disconnected currently, tell them to refresh. -->
<span v-if="!connected" class="has-text-danger">
Notice: you may need to refresh the chat page after changing this setting.
</span>
</p>
</div>
</div>
</div>

View File

@ -11,6 +11,7 @@ class ChatClient {
usePolling=false,
onClientError,
username,
jwt, // JWT token for authorization
prefs, // User preferences for 'me' action (close DMs, etc)
@ -40,6 +41,7 @@ class ChatClient {
// Pointer to the 'ChatClient(message)' command from the main app.
this.ChatClient = onClientError;
this.username = username;
this.jwt = jwt;
this.prefs = prefs;
@ -67,13 +69,25 @@ class ChatClient {
this.ws = {
conn: null,
connected: false,
// Disconnect spamming: don't retry too many times.
reconnect: true, // unless told to go away
disconnectLimit: 2,
disconnectCount: 0,
};
// Polling connection.
this.polling = {
username: "",
sessionID: "",
timeout: null, // setTimeout for next poll.
}
}
// Connected polls if the client is connected.
connected() {
if (this.usePolling) {
return true;
return this.polling.timeout != null && this.polling.sessionID != "";
}
return this.ws.connected;
}
@ -81,16 +95,47 @@ class ChatClient {
// Disconnect from the server.
disconnect() {
if (this.usePolling) {
throw new Exception("Not implemented");
this.polling.sessionID = "";
this.polling.username = "";
this.stopPolling();
this.ChatClient("You have disconnected from the server.");
return;
}
this.ws.conn.close();
this.ws.connected = false;
this.ws.conn.close(1000, "server asked to close the connection");
}
// Common function to send a message to the server. The message
// is a JSON object before stringify.
send(message) {
if (this.usePolling) {
throw new Exception("Not implemented");
fetch("/poll", {
method: "POST",
mode: "same-origin",
cache: "no-cache",
credentials: "same-origin",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
username: this.polling.username,
session_id: this.polling.sessionID,
msg: message,
})
}).then(resp => resp.json()).then(resp => {
console.log(resp);
// Store sessionID information.
this.polling.sessionID = resp.session_id;
this.polling.username = resp.username;
for (let msg of resp.messages) {
this.handle(msg);
}
}).catch(err => {
this.ChatClient("Error from polling API: " + err);
});
return;
}
if (!this.ws.connected) {
@ -98,7 +143,6 @@ class ChatClient {
return;
}
console.log("send:", message);
if (typeof(message) !== "string") {
message = JSON.stringify(message);
}
@ -157,9 +201,8 @@ class ChatClient {
break;
case "disconnect":
this.onWho({ whoList: [] });
this.disconnect = true;
this.ws.connected = false;
this.ws.conn.close(1000, "server asked to close the connection");
this.ws.reconnect = false;
this.disconnect();
break;
case "ping":
// New JWT token?
@ -170,7 +213,7 @@ class ChatClient {
// Reset disconnect retry counter: if we were on long enough to get
// a ping, we're well connected and can reconnect no matter how many
// times the chat server is rebooted.
this.disconnectCount = 0;
this.ws.disconnectCount = 0;
break;
default:
console.error("Unexpected action: %s", JSON.stringify(msg));
@ -179,6 +222,21 @@ class ChatClient {
// Dial the WebSocket.
dial() {
// Polling API?
if (this.usePolling) {
this.ChatClient("Connecting to the server via polling API...");
this.startPolling();
// Log in now.
this.send({
action: "login",
username: this.username,
jwt: this.jwt.token,
dnd: this.prefs.closeDMs,
});
return;
}
this.ChatClient("Establishing connection to server...");
const proto = location.protocol === 'https:' ? 'wss' : 'ws';
@ -191,16 +249,22 @@ class ChatClient {
this.ws.connected = false;
this.ChatClient(`WebSocket Disconnected code: ${ev.code}, reason: ${ev.reason}`);
this.disconnectCount++;
if (this.disconnectCount > this.disconnectLimit) {
this.ChatClient(`It seems there's a problem connecting to the server. Please try some other time.`);
this.ws.disconnectCount++;
if (this.ws.disconnectCount > this.ws.disconnectLimit) {
this.ChatClient(
`It seems there's a problem connecting to the server. Please try some other time.<br><br>` +
`If you experience this problem frequently, try going into the Chat Settings 'Misc' tab ` +
`and switch to the 'Polling' Server Connection method.`
);
return;
}
if (!this.disconnect) {
if (this.ws.reconnect) {
if (ev.code !== 1001 && ev.code !== 1000) {
this.ChatClient("Reconnecting in 5s");
setTimeout(this.dial, 5000);
setTimeout(() => {
this.dial();
}, 5000);
}
}
});
@ -240,6 +304,36 @@ class ChatClient {
this.ws.conn = conn;
}
// Start the polling interval.
startPolling() {
if (!this.usePolling) return;
this.stopPolling();
this.polling.timeout = setTimeout(() => {
this.poll();
this.startPolling();
}, 5000);
}
// Poll the API.
poll() {
if (!this.usePolling) {
this.stopPolling();
return;
}
this.send({
action: "ping",
});
this.startPolling();
}
// Stop polling.
stopPolling() {
if (this.polling.timeout != null) {
clearTimeout(this.polling.timeout);
}
}
}
export default ChatClient;

View File

@ -18,6 +18,7 @@ const keys = {
'rememberExpresslyClosed': Boolean,
// Booleans
'usePolling': Boolean, // use the polling API instead of WebSocket
'joinMessages': Boolean,
'exitMessages': Boolean,
'watchNotif': Boolean,