diff --git a/pkg/polling_api.go b/pkg/polling_api.go new file mode 100644 index 0000000..7c85fde --- /dev/null +++ b/pkg/polling_api.go @@ -0,0 +1,240 @@ +package barertc + +import ( + "context" + "encoding/json" + "net/http" + "time" + + "git.kirsle.net/apps/barertc/pkg/log" + "git.kirsle.net/apps/barertc/pkg/messages" + "git.kirsle.net/apps/barertc/pkg/util" + "github.com/google/uuid" +) + +// Polling user timeout before disconnecting them. +const PollingUserTimeout = time.Minute + +// JSON payload structure for polling API. +type PollMessage struct { + // Send the username after authenticated. + Username string `json:"username,omitempty"` + + // SessionID for authentication. + SessionID string `json:"session_id,omitempty"` + + // BareRTC protocol message. + Message messages.Message `json:"msg"` +} + +type PollResponse struct { + // Session ID. + Username string `json:"username,omitempty"` + SessionID string `json:"session_id,omitempty"` + + // Pending messages. + Messages []messages.Message `json:"messages"` +} + +// Helper method to send an error as a PollResponse. +func PollResponseError(message string) PollResponse { + return PollResponse{ + Messages: []messages.Message{ + { + Action: messages.ActionError, + Username: "ChatServer", + Message: message, + }, + }, + } +} + +// KickIdlePollUsers is a goroutine that will disconnect polling API users +// who haven't been seen in a while. +func (s *Server) KickIdlePollUsers() { + log.Debug("KickIdlePollUsers goroutine engaged") + for { + time.Sleep(10 * time.Second) + for _, sub := range s.IterSubscribers() { + if sub.usePolling && time.Since(sub.lastPollAt) > PollingUserTimeout { + // Send an exit message. + if sub.authenticated && sub.ChatStatus != "hidden" { + log.Error("KickIdlePollUsers: %s last seen %s ago", sub.Username, sub.lastPollAt) + + sub.authenticated = false + s.Broadcast(messages.Message{ + Action: messages.ActionPresence, + Username: sub.Username, + Message: "has timed out!", + }) + s.SendWhoList() + } + + s.DeleteSubscriber(sub) + } + } + } +} + +// FlushPollResponse returns a response for the polling API that will flush +// all pending messages sent to the client. +func (sub *Subscriber) FlushPollResponse() PollResponse { + var msgs = []messages.Message{} + + // Drain the messages from the outbox channel. + for len(sub.messages) > 0 { + message := <-sub.messages + var msg messages.Message + json.Unmarshal(message, &msg) + msgs = append(msgs, msg) + } + + return PollResponse{ + Username: sub.Username, + SessionID: sub.sessionID, + Messages: msgs, + } +} + +// Functions for the Polling API as an alternative to WebSockets. +func (s *Server) PollingAPI() http.HandlerFunc { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ip := util.IPAddress(r) + + // JSON writer for the response. + w.Header().Set("Content-Type", "application/json") + enc := json.NewEncoder(w) + enc.SetIndent("", " ") + + // Parse the request. + if r.Method != http.MethodPost { + w.WriteHeader(http.StatusBadRequest) + enc.Encode(PollResponseError("Only POST methods allowed")) + return + } else if r.Header.Get("Content-Type") != "application/json" { + w.WriteHeader(http.StatusBadRequest) + enc.Encode(PollResponseError("Only application/json content-types allowed")) + return + } + + defer r.Body.Close() + + // Parse the request payload. + var ( + params PollMessage + dec = json.NewDecoder(r.Body) + ) + if err := dec.Decode(¶ms); err != nil { + w.WriteHeader(http.StatusBadRequest) + enc.Encode(PollResponseError(err.Error())) + return + } + + // Debug logging. + log.Debug("Polling connection from %s - %s", ip, r.Header.Get("User-Agent")) + + // Are they resuming an authenticated session? + var sub *Subscriber + if params.Username != "" || params.SessionID != "" { + if params.Username == "" || params.SessionID == "" { + w.WriteHeader(http.StatusBadRequest) + enc.Encode(PollResponseError("Authentication error: SessionID and Username both required.")) + return + } + + log.Debug("Polling API: check if %s (%s) is authenticated", params.Username, params.SessionID) + + // Look up the subscriber. + var ( + authOK bool + err error + ) + sub, err = s.GetSubscriber(params.Username) + if err == nil { + // Validate the SessionID. + if sub.sessionID == params.SessionID { + authOK = true + } + } + + // Authentication error. + if !authOK { + s.DeleteSubscriber(sub) + w.WriteHeader(http.StatusBadRequest) + enc.Encode(PollResponse{ + Messages: []messages.Message{ + { + Action: messages.ActionError, + Username: "ChatServer", + Message: "Your authentication has expired, please log back into the chat again.", + }, + { + Action: messages.ActionKick, + }, + }, + }) + return + } + + // Ping their last seen time. + sub.lastPollAt = time.Now() + } + + // If they are authenticated, handle this message. + if sub != nil && sub.authenticated { + s.OnClientMessage(sub, params.Message) + + // If they use JWT authentication, give them a ping back with an updated + // JWT once in a while. Equivalent to the WebSockets pinger channel. + if time.Since(sub.lastPollJWT) > PingInterval { + sub.lastPollJWT = time.Now() + + if sub.JWTClaims != nil { + if jwt, err := sub.JWTClaims.ReSign(); err != nil { + log.Error("ReSign JWT token for %s#%d: %s", sub.Username, sub.ID, err) + } else { + sub.SendJSON(messages.Message{ + Action: messages.ActionPing, + JWTToken: jwt, + }) + } + } + } + + enc.Encode(sub.FlushPollResponse()) + return + } + + // Not authenticated: the only acceptable message is login. + if params.Message.Action != messages.ActionLogin { + w.WriteHeader(http.StatusBadRequest) + enc.Encode(PollResponseError("Not logged in.")) + return + } + + // Prepare a Subscriber object for them. Do not add it to the server + // roster unless their login succeeds. + ctx, cancel := context.WithCancel(r.Context()) + sub = s.NewPollingSubscriber(ctx, cancel) + + // Tentatively add them to the server. If they don't pass authentication, + // remove their subscriber immediately. Note: they need added here so they + // will receive their own "has entered the room" and WhoList updates. + s.AddSubscriber(sub) + + s.OnLogin(sub, params.Message) + + // Are they authenticated? + if sub.authenticated { + // Generate a SessionID number. + sessionID := uuid.New().String() + sub.sessionID = sessionID + + log.Debug("Polling API: new user authenticated in: %s (sid %s)", sub.Username, sub.sessionID) + } else { + s.DeleteSubscriber(sub) + } + + enc.Encode(sub.FlushPollResponse()) + }) +} diff --git a/pkg/server.go b/pkg/server.go index 1511c2a..eab6228 100644 --- a/pkg/server.go +++ b/pkg/server.go @@ -38,6 +38,7 @@ func (s *Server) Setup() error { mux.Handle("/about", AboutPage()) mux.Handle("/logout", LogoutPage()) mux.Handle("/ws", s.WebSocket()) + mux.Handle("/poll", s.PollingAPI()) mux.Handle("/api/statistics", s.Statistics()) mux.Handle("/api/blocklist", s.BlockList()) mux.Handle("/api/block/now", s.BlockNow()) @@ -54,5 +55,7 @@ func (s *Server) Setup() error { // ListenAndServe starts the web server. func (s *Server) ListenAndServe(address string) error { + // Run the polling user idle kicker. + go s.KickIdlePollUsers() return http.ListenAndServe(address, s.mux) } diff --git a/pkg/websocket.go b/pkg/websocket.go index e2ad059..8661943 100644 --- a/pkg/websocket.go +++ b/pkg/websocket.go @@ -31,11 +31,19 @@ type Subscriber struct { JWTClaims *jwt.Claims authenticated bool // has passed the login step loginAt time.Time - conn *websocket.Conn - ctx context.Context - cancel context.CancelFunc - messages chan []byte - closeSlow func() + + // Connection details (WebSocket). + conn *websocket.Conn // WebSocket user + ctx context.Context + cancel context.CancelFunc + messages chan []byte + closeSlow func() + + // Polling API users. + usePolling bool + sessionID string + lastPollAt time.Time + lastPollJWT time.Time // give a new JWT once in a while muteMu sync.RWMutex booted map[string]struct{} // usernames booted off your camera @@ -51,6 +59,100 @@ type Subscriber struct { logfh map[string]io.WriteCloser } +// NewSubscriber initializes a connected chat user. +func (s *Server) NewSubscriber(ctx context.Context, cancelFunc func()) *Subscriber { + return &Subscriber{ + ctx: ctx, + cancel: cancelFunc, + messages: make(chan []byte, s.subscriberMessageBuffer), + booted: make(map[string]struct{}), + muted: make(map[string]struct{}), + blocked: make(map[string]struct{}), + messageIDs: make(map[int64]struct{}), + ChatStatus: "online", + } +} + +// NewWebSocketSubscriber returns a new subscriber with a WebSocket connection. +func (s *Server) NewWebSocketSubscriber(ctx context.Context, conn *websocket.Conn, cancelFunc func()) *Subscriber { + sub := s.NewSubscriber(ctx, cancelFunc) + sub.conn = conn + sub.closeSlow = func() { + conn.Close(websocket.StatusPolicyViolation, "connection too slow to keep up with messages") + } + return sub +} + +// NewPollingSubscriber returns a new subscriber using the polling API. +func (s *Server) NewPollingSubscriber(ctx context.Context, cancelFunc func()) *Subscriber { + sub := s.NewSubscriber(ctx, cancelFunc) + sub.usePolling = true + sub.lastPollAt = time.Now() + sub.lastPollJWT = time.Now() + sub.closeSlow = func() { + // Their outbox is filled up, disconnect them. + log.Error("Polling subscriber %s#%d: inbox is filled up!", sub.Username, sub.ID) + + // Send an exit message. + if sub.authenticated && sub.ChatStatus != "hidden" { + sub.authenticated = false + s.Broadcast(messages.Message{ + Action: messages.ActionPresence, + Username: sub.Username, + Message: "has exited the room!", + }) + s.SendWhoList() + } + + s.DeleteSubscriber(sub) + } + return sub +} + +// OnClientMessage handles a chat protocol message from the user's WebSocket or polling API. +func (s *Server) OnClientMessage(sub *Subscriber, msg messages.Message) { + // What action are they performing? + switch msg.Action { + case messages.ActionLogin: + s.OnLogin(sub, msg) + case messages.ActionMessage: + s.OnMessage(sub, msg) + case messages.ActionFile: + s.OnFile(sub, msg) + case messages.ActionMe: + s.OnMe(sub, msg) + case messages.ActionOpen: + s.OnOpen(sub, msg) + case messages.ActionBoot: + s.OnBoot(sub, msg, true) + case messages.ActionUnboot: + s.OnBoot(sub, msg, false) + case messages.ActionMute, messages.ActionUnmute: + s.OnMute(sub, msg, msg.Action == messages.ActionMute) + case messages.ActionBlock: + s.OnBlock(sub, msg) + case messages.ActionBlocklist: + s.OnBlocklist(sub, msg) + case messages.ActionCandidate: + s.OnCandidate(sub, msg) + case messages.ActionSDP: + s.OnSDP(sub, msg) + case messages.ActionWatch: + s.OnWatch(sub, msg) + case messages.ActionUnwatch: + s.OnUnwatch(sub, msg) + case messages.ActionTakeback: + s.OnTakeback(sub, msg) + case messages.ActionReact: + s.OnReact(sub, msg) + case messages.ActionReport: + s.OnReport(sub, msg) + case messages.ActionPing: + default: + sub.ChatServer("Unsupported message type.") + } +} + // ReadLoop spawns a goroutine that reads from the websocket connection. func (sub *Subscriber) ReadLoop(s *Server) { go func() { @@ -88,45 +190,8 @@ func (sub *Subscriber) ReadLoop(s *Server) { log.Debug("Read(%d=%s): %s", sub.ID, sub.Username, data) } - // What action are they performing? - switch msg.Action { - case messages.ActionLogin: - s.OnLogin(sub, msg) - case messages.ActionMessage: - s.OnMessage(sub, msg) - case messages.ActionFile: - s.OnFile(sub, msg) - case messages.ActionMe: - s.OnMe(sub, msg) - case messages.ActionOpen: - s.OnOpen(sub, msg) - case messages.ActionBoot: - s.OnBoot(sub, msg, true) - case messages.ActionUnboot: - s.OnBoot(sub, msg, false) - case messages.ActionMute, messages.ActionUnmute: - s.OnMute(sub, msg, msg.Action == messages.ActionMute) - case messages.ActionBlock: - s.OnBlock(sub, msg) - case messages.ActionBlocklist: - s.OnBlocklist(sub, msg) - case messages.ActionCandidate: - s.OnCandidate(sub, msg) - case messages.ActionSDP: - s.OnSDP(sub, msg) - case messages.ActionWatch: - s.OnWatch(sub, msg) - case messages.ActionUnwatch: - s.OnUnwatch(sub, msg) - case messages.ActionTakeback: - s.OnTakeback(sub, msg) - case messages.ActionReact: - s.OnReact(sub, msg) - case messages.ActionReport: - s.OnReport(sub, msg) - default: - sub.ChatServer("Unsupported message type.") - } + // Handle their message. + s.OnClientMessage(sub, msg) } }() } @@ -202,20 +267,7 @@ func (s *Server) WebSocket() http.HandlerFunc { // ctx := c.CloseRead(r.Context()) ctx, cancel := context.WithCancel(r.Context()) - sub := &Subscriber{ - conn: c, - ctx: ctx, - cancel: cancel, - messages: make(chan []byte, s.subscriberMessageBuffer), - closeSlow: func() { - c.Close(websocket.StatusPolicyViolation, "connection too slow to keep up with messages") - }, - booted: make(map[string]struct{}), - muted: make(map[string]struct{}), - blocked: make(map[string]struct{}), - messageIDs: make(map[int64]struct{}), - ChatStatus: "online", - } + sub := s.NewWebSocketSubscriber(ctx, c, cancel) s.AddSubscriber(sub) defer s.DeleteSubscriber(sub) @@ -280,6 +332,10 @@ func (s *Server) GetSubscriber(username string) (*Subscriber, error) { // DeleteSubscriber removes a subscriber from the server. func (s *Server) DeleteSubscriber(sub *Subscriber) { + if sub == nil { + return + } + log.Error("DeleteSubscriber: %s", sub.Username) // Cancel its context to clean up the for-loop goroutine. diff --git a/src/App.vue b/src/App.vue index d4f22ca..7393ad8 100644 --- a/src/App.vue +++ b/src/App.vue @@ -143,6 +143,7 @@ export default { // Misc. user preferences (TODO: move all of them here) prefs: { + usePolling: false, // use the polling API instead of WebSockets. joinMessages: true, // show "has entered the room" in public channels exitMessages: false, // hide exit messages by default in public channels watchNotif: true, // notify in chat about cameras being watched @@ -461,6 +462,12 @@ export default { "prefs.muteSounds": function () { LocalStorage.set('muteSounds', this.prefs.muteSounds); }, + "prefs.usePolling": function () { + LocalStorage.set('usePolling', this.prefs.usePolling); + + // Reset the chat client on change. + this.resetChatClient(); + }, "prefs.closeDMs": function () { LocalStorage.set('closeDMs', this.prefs.closeDMs); @@ -469,6 +476,12 @@ export default { }, }, computed: { + connected() { + if (this.client.connected != undefined) { + return this.client.connected(); + } + return false; + }, chatHistory() { if (this.channels[this.channel] == undefined) { return []; @@ -772,6 +785,9 @@ export default { } // Misc preferences + if (settings.usePolling != undefined) { + this.prefs.usePolling = settings.usePolling === true; + } if (settings.joinMessages != undefined) { this.prefs.joinMessages = settings.joinMessages === true; } @@ -827,7 +843,7 @@ export default { return; } - if (!this.client.connected()) { + if (!this.connected) { this.ChatClient("You are not connected to the server."); return; } @@ -979,7 +995,7 @@ export default { // Sync the current user state (such as video broadcasting status) to // the backend, which will reload everybody's Who List. sendMe() { - if (!this.client.connected()) return; + if (!this.connected) return; this.client.send({ action: "me", video: this.myVideoFlag, @@ -1288,12 +1304,12 @@ export default { // Dial the WebSocket connection. dial() { - this.ChatClient("Establishing connection to server..."); - // Set up the ChatClient connection. this.client = new ChatClient({ + usePolling: this.prefs.usePolling, onClientError: this.ChatClient, + username: this.username, jwt: this.jwt, prefs: this.prefs, @@ -1317,12 +1333,28 @@ export default { }, pushHistory: this.pushHistory, onNewJWT: jwt => { - this.jwt.token = msg.jwt; + this.ChatClient("new jwt: " + jwt); + this.jwt.token = jwt; }, }); this.client.dial(); }, + resetChatClient() { + if (!this.connected) return; + + // Reset the ChatClient, e.g. when toggling between WebSocket vs. Polling methods. + this.ChatClient( + "Your connection method to the chat server has been updated; attempting to reconnect now.", + ); + + window.requestAnimationFrame(() => { + this.client.disconnect(); + setTimeout(() => { + this.dial(); + }, 1000); + }); + }, /** * WebRTC concerns. @@ -3367,6 +3399,31 @@ export default {

+
+ + + +

+ By default the chat server requires a constant WebSockets connection to stay online. + If you are experiencing frequent disconnects (e.g. because you are on a slow or + unstable network connection), try switching to the "Polling" method which will be + more robust, at the cost of up to 5-seconds latency to receive new messages. + + + + Notice: you may need to refresh the chat page after changing this setting. + +

+
+ diff --git a/src/lib/ChatClient.js b/src/lib/ChatClient.js index b82427c..08385a3 100644 --- a/src/lib/ChatClient.js +++ b/src/lib/ChatClient.js @@ -11,6 +11,7 @@ class ChatClient { usePolling=false, onClientError, + username, jwt, // JWT token for authorization prefs, // User preferences for 'me' action (close DMs, etc) @@ -40,6 +41,7 @@ class ChatClient { // Pointer to the 'ChatClient(message)' command from the main app. this.ChatClient = onClientError; + this.username = username; this.jwt = jwt; this.prefs = prefs; @@ -67,13 +69,25 @@ class ChatClient { this.ws = { conn: null, connected: false, + + // Disconnect spamming: don't retry too many times. + reconnect: true, // unless told to go away + disconnectLimit: 2, + disconnectCount: 0, }; + + // Polling connection. + this.polling = { + username: "", + sessionID: "", + timeout: null, // setTimeout for next poll. + } } // Connected polls if the client is connected. connected() { if (this.usePolling) { - return true; + return this.polling.timeout != null && this.polling.sessionID != ""; } return this.ws.connected; } @@ -81,16 +95,47 @@ class ChatClient { // Disconnect from the server. disconnect() { if (this.usePolling) { - throw new Exception("Not implemented"); + this.polling.sessionID = ""; + this.polling.username = ""; + this.stopPolling(); + this.ChatClient("You have disconnected from the server."); + return; } - this.ws.conn.close(); + + this.ws.connected = false; + this.ws.conn.close(1000, "server asked to close the connection"); } // Common function to send a message to the server. The message // is a JSON object before stringify. send(message) { if (this.usePolling) { - throw new Exception("Not implemented"); + fetch("/poll", { + method: "POST", + mode: "same-origin", + cache: "no-cache", + credentials: "same-origin", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify({ + username: this.polling.username, + session_id: this.polling.sessionID, + msg: message, + }) + }).then(resp => resp.json()).then(resp => { + console.log(resp); + + // Store sessionID information. + this.polling.sessionID = resp.session_id; + this.polling.username = resp.username; + for (let msg of resp.messages) { + this.handle(msg); + } + }).catch(err => { + this.ChatClient("Error from polling API: " + err); + }); + return; } if (!this.ws.connected) { @@ -98,7 +143,6 @@ class ChatClient { return; } - console.log("send:", message); if (typeof(message) !== "string") { message = JSON.stringify(message); } @@ -157,9 +201,8 @@ class ChatClient { break; case "disconnect": this.onWho({ whoList: [] }); - this.disconnect = true; - this.ws.connected = false; - this.ws.conn.close(1000, "server asked to close the connection"); + this.ws.reconnect = false; + this.disconnect(); break; case "ping": // New JWT token? @@ -170,7 +213,7 @@ class ChatClient { // Reset disconnect retry counter: if we were on long enough to get // a ping, we're well connected and can reconnect no matter how many // times the chat server is rebooted. - this.disconnectCount = 0; + this.ws.disconnectCount = 0; break; default: console.error("Unexpected action: %s", JSON.stringify(msg)); @@ -179,6 +222,21 @@ class ChatClient { // Dial the WebSocket. dial() { + // Polling API? + if (this.usePolling) { + this.ChatClient("Connecting to the server via polling API..."); + this.startPolling(); + + // Log in now. + this.send({ + action: "login", + username: this.username, + jwt: this.jwt.token, + dnd: this.prefs.closeDMs, + }); + return; + } + this.ChatClient("Establishing connection to server..."); const proto = location.protocol === 'https:' ? 'wss' : 'ws'; @@ -191,16 +249,22 @@ class ChatClient { this.ws.connected = false; this.ChatClient(`WebSocket Disconnected code: ${ev.code}, reason: ${ev.reason}`); - this.disconnectCount++; - if (this.disconnectCount > this.disconnectLimit) { - this.ChatClient(`It seems there's a problem connecting to the server. Please try some other time.`); + this.ws.disconnectCount++; + if (this.ws.disconnectCount > this.ws.disconnectLimit) { + this.ChatClient( + `It seems there's a problem connecting to the server. Please try some other time.

` + + `If you experience this problem frequently, try going into the Chat Settings 'Misc' tab ` + + `and switch to the 'Polling' Server Connection method.` + ); return; } - if (!this.disconnect) { + if (this.ws.reconnect) { if (ev.code !== 1001 && ev.code !== 1000) { this.ChatClient("Reconnecting in 5s"); - setTimeout(this.dial, 5000); + setTimeout(() => { + this.dial(); + }, 5000); } } }); @@ -240,6 +304,36 @@ class ChatClient { this.ws.conn = conn; } + + // Start the polling interval. + startPolling() { + if (!this.usePolling) return; + this.stopPolling(); + + this.polling.timeout = setTimeout(() => { + this.poll(); + this.startPolling(); + }, 5000); + } + + // Poll the API. + poll() { + if (!this.usePolling) { + this.stopPolling(); + return; + } + this.send({ + action: "ping", + }); + this.startPolling(); + } + + // Stop polling. + stopPolling() { + if (this.polling.timeout != null) { + clearTimeout(this.polling.timeout); + } + } } export default ChatClient; diff --git a/src/lib/LocalStorage.js b/src/lib/LocalStorage.js index 37134b1..0124931 100644 --- a/src/lib/LocalStorage.js +++ b/src/lib/LocalStorage.js @@ -18,6 +18,7 @@ const keys = { 'rememberExpresslyClosed': Boolean, // Booleans + 'usePolling': Boolean, // use the polling API instead of WebSocket 'joinMessages': Boolean, 'exitMessages': Boolean, 'watchNotif': Boolean,