diff --git a/client/deadlock_watch.go b/client/deadlock_watch.go new file mode 100644 index 0000000..21443ff --- /dev/null +++ b/client/deadlock_watch.go @@ -0,0 +1,95 @@ +package client + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + "time" + + "git.kirsle.net/apps/barertc/client/config" + "git.kirsle.net/apps/barertc/pkg/log" + "git.kirsle.net/apps/barertc/pkg/messages" +) + +const deadlockTTL = time.Minute + +/* +Deadlock detection for the chat server. + +Part of the chatbot handlers. The bot will send DMs to itself on an interval +and test whether the server is responsive; if it goes down, it will issue the +/api/shutdown command to reboot the server automatically. + +This function is a goroutine spawned in the background. +*/ +func (h *BotHandlers) watchForDeadlock() { + log.Info("Deadlock monitor engaged!") + h.deadlockLastOK = time.Now() + + for { + time.Sleep(15 * time.Second) + h.client.Send(messages.Message{ + Action: messages.ActionMessage, + Channel: "@" + h.client.Username(), + Message: "deadlock ping", + }) + + // Has it been a while since our last ping? + if time.Since(h.deadlockLastOK) > deadlockTTL { + log.Error("Deadlock detected! Rebooting the chat server!") + h.deadlockLastOK = time.Now() + h.rebootChatServer() + } + } +} + +// onMessageFromSelf handles DMs sent to ourself, e.g. for deadlock detection. +func (h *BotHandlers) onMessageFromSelf(msg messages.Message) { + // If it is our own DM channel thread, it's for deadlock detection. + if msg.Channel == "@"+h.client.Username() { + h.deadlockLastOK = time.Now() + } +} + +// Reboot the chat server via web API, in case of deadlock. +func (h *BotHandlers) rebootChatServer() error { + // API request struct for BareRTC /api/shutdown endpoint. + var request = struct { + APIKey string + }{ + APIKey: config.Current.BareRTC.AdminAPIKey, + } + + // JSON request body. + jsonStr, err := json.Marshal(request) + if err != nil { + return err + } + + // Make the API request to BareRTC. + var url = strings.TrimSuffix(config.Current.BareRTC.URL, "/") + "/api/shutdown" + req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonStr)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + + client := &http.Client{ + Timeout: 10 * time.Second, + } + resp, err := client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + body, _ := io.ReadAll(resp.Body) + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("RebootChatServer: error posting to BareRTC: status %d body %s", resp.StatusCode, body) + } + + return nil +} diff --git a/client/handlers.go b/client/handlers.go index d5f872e..b78dd08 100644 --- a/client/handlers.go +++ b/client/handlers.go @@ -62,6 +62,10 @@ type BotHandlers struct { // so we don't accidentally take back our own reactions. reactions map[int]map[string]interface{} reactionsMu sync.Mutex + + // Deadlock detection (deadlock_watch.go): record time of last successful + // ping to self, to detect when the server is deadlocked. + deadlockLastOK time.Time } // SetupChatbot configures a sensible set of default handlers for the BareBot application. @@ -105,6 +109,9 @@ func (c *Client) SetupChatbot() error { c.OnDisconnect = handler.OnDisconnect c.OnPing = handler.OnPing + // Watch for deadlocks. + go handler.watchForDeadlock() + return nil } @@ -157,6 +164,7 @@ func (h *BotHandlers) OnMessage(msg messages.Message) { // Ignore echoed message from ourself. if msg.Username == h.client.Username() { + h.onMessageFromSelf(msg) return } diff --git a/cmd/BareBot/commands/run.go b/cmd/BareBot/commands/run.go index 5d5144c..c9956c6 100644 --- a/cmd/BareBot/commands/run.go +++ b/cmd/BareBot/commands/run.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "path/filepath" + "time" "git.kirsle.net/apps/barertc/client" "git.kirsle.net/apps/barertc/client/config" @@ -74,6 +75,11 @@ func init() { // Run! log.Info("Connecting to ChatServer") + err = client.Run() + if err != nil { + log.Error("Error: %s (and sleeping 5 seconds before exit)", err) + time.Sleep(5 * time.Second) + } return cli.Exit(client.Run(), 1) }, } diff --git a/docs/API.md b/docs/API.md index 0eda7aa..939d909 100644 --- a/docs/API.md +++ b/docs/API.md @@ -50,6 +50,31 @@ The return schema looks like: } ``` +## POST /api/shutdown + +Shut down (and hopefully, reboot) the chat server. It is equivalent to the `/shutdown` operator command issued in chat, but callable from your web application. It is also used as part of deadlock detection on the BareBot chatbot. + +It requires the AdminAPIKey to post: + +```json +{ + "APIKey": "from settings.toml" +} +``` + +The return schema looks like: + +```json +{ + "OK": true, + "Error": "error string, omitted if none" +} +``` + +The HTTP server will respond OK, and then shut down a couple of seconds later, attempting to send a ChatServer broadcast first (as in the `/shutdown` command). If the chat server is deadlocked, this broadcast won't go out but the program will still exit. + +It is up to your process supervisor to automatically restart BareRTC when it exits. + ## POST /api/blocklist Your server may pre-cache the user's blocklist for them **before** they diff --git a/pkg/api.go b/pkg/api.go index d15aa5a..b0ca0da 100644 --- a/pkg/api.go +++ b/pkg/api.go @@ -3,8 +3,10 @@ package barertc import ( "encoding/json" "net/http" + "os" "strings" "sync" + "time" "git.kirsle.net/apps/barertc/pkg/config" "git.kirsle.net/apps/barertc/pkg/jwt" @@ -174,6 +176,101 @@ func (s *Server) Authenticate() http.HandlerFunc { }) } +// Shutdown (/api/shutdown) the chat server, hopefully to reboot it. +// +// This endpoint is equivalent to the operator '/shutdown' command but may be +// invoked by your website, or your chatbot. It requires the AdminAPIKey. +// +// It is a POST request with a json body containing the following schema: +// +// { +// "APIKey": "from settings.toml", +// } +// +// The return schema looks like: +// +// { +// "OK": true, +// "Error": "error string, omitted if none", +// } +func (s *Server) ShutdownAPI() http.HandlerFunc { + type request struct { + APIKey string + Claims jwt.Claims + } + + type result struct { + OK bool + Error string `json:",omitempty"` + } + + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // JSON writer for the response. + w.Header().Set("Content-Type", "application/json") + enc := json.NewEncoder(w) + enc.SetIndent("", " ") + + // Parse the request. + if r.Method != http.MethodPost { + w.WriteHeader(http.StatusBadRequest) + enc.Encode(result{ + Error: "Only POST methods allowed", + }) + return + } else if r.Header.Get("Content-Type") != "application/json" { + w.WriteHeader(http.StatusBadRequest) + enc.Encode(result{ + Error: "Only application/json content-types allowed", + }) + return + } + + defer r.Body.Close() + + // Parse the request payload. + var ( + params request + dec = json.NewDecoder(r.Body) + ) + if err := dec.Decode(¶ms); err != nil { + w.WriteHeader(http.StatusBadRequest) + enc.Encode(result{ + Error: err.Error(), + }) + return + } + + // Validate the API key. + if params.APIKey != config.Current.AdminAPIKey { + w.WriteHeader(http.StatusUnauthorized) + enc.Encode(result{ + Error: "Authentication denied.", + }) + return + } + + // Send the response. + enc.Encode(result{ + OK: true, + }) + + // Defer a shutdown a moment later. + go func() { + time.Sleep(2 * time.Second) + os.Exit(1) + }() + + // Attempt to broadcast, but if deadlocked this might not go out. + go func() { + s.Broadcast(messages.Message{ + Action: messages.ActionError, + Username: "ChatServer", + Message: "The chat server is going down for a reboot NOW!", + }) + }() + }) +} + // BlockList (/api/blocklist) allows your website to pre-sync mute lists between your // user accounts, so that when they see each other in chat they will pre-emptively mute // or boot one another. diff --git a/pkg/commands.go b/pkg/commands.go index 008501f..e9160eb 100644 --- a/pkg/commands.go +++ b/pkg/commands.go @@ -83,6 +83,19 @@ func (s *Server) ProcessCommand(sub *Subscriber, msg messages.Message) bool { case "/deop": s.DeopCommand(words, sub) return true + case "/debug-dangerous-force-deadlock": + // TEMPORARY debug command to willfully force a deadlock. + s.Broadcast(messages.Message{ + Action: messages.ActionError, + Username: "ChatServer", + Message: "NOTICE: The admin is testing a force deadlock of the chat server; things may become unresponsive soon.", + }) + go func() { + time.Sleep(2 * time.Second) + s.subscribersMu.Lock() + s.subscribersMu.Lock() + }() + return true } } diff --git a/pkg/handlers.go b/pkg/handlers.go index d502b9f..88ab71c 100644 --- a/pkg/handlers.go +++ b/pkg/handlers.go @@ -89,6 +89,7 @@ func (s *Server) OnLogin(sub *Subscriber, msg messages.Message) { // Use their username. sub.Username = msg.Username sub.authenticated = true + sub.DND = msg.DND sub.loginAt = time.Now() log.Debug("OnLogin: %s joins the room", sub.Username) @@ -144,9 +145,8 @@ func (s *Server) OnMessage(sub *Subscriber, msg messages.Message) { markdown = s.ExpandMedia(markdown) // Assign a message ID and own it to the sender. - messages.MessageID++ - var mid = messages.MessageID sub.midMu.Lock() + var mid = messages.NextMessageID() sub.messageIDs[mid] = struct{}{} sub.midMu.Unlock() @@ -194,8 +194,9 @@ func (s *Server) OnTakeback(sub *Subscriber, msg messages.Message) { // Permission check. if sub.JWTClaims == nil || !sub.JWTClaims.IsAdmin { sub.midMu.Lock() - defer sub.midMu.Unlock() - if _, ok := sub.messageIDs[msg.MessageID]; !ok { + _, ok := sub.messageIDs[msg.MessageID] + sub.midMu.Unlock() + if !ok { sub.ChatServer("That is not your message to take back.") return } @@ -249,9 +250,8 @@ func (s *Server) OnFile(sub *Subscriber, msg messages.Message) { var dataURL = fmt.Sprintf("data:%s;base64,%s", filetype, base64.StdEncoding.EncodeToString(img)) // Assign a message ID and own it to the sender. - messages.MessageID++ - var mid = messages.MessageID sub.midMu.Lock() + var mid = messages.NextMessageID() sub.messageIDs[mid] = struct{}{} sub.midMu.Unlock() @@ -329,6 +329,7 @@ func (s *Server) OnMe(sub *Subscriber, msg messages.Message) { sub.VideoStatus = msg.VideoStatus sub.ChatStatus = msg.ChatStatus + sub.DND = msg.DND // Sync the WhoList to everybody. s.SendWhoList() diff --git a/pkg/messages/messages.go b/pkg/messages/messages.go index 4bc47af..f16eea6 100644 --- a/pkg/messages/messages.go +++ b/pkg/messages/messages.go @@ -1,7 +1,21 @@ package messages +import "sync" + // Auto incrementing Message ID for anything pushed out by the server. -var MessageID int +var ( + messageID int + mu sync.Mutex +) + +// NextMessageID atomically increments and returns a new MessageID. +func NextMessageID() int { + mu.Lock() + defer mu.Unlock() + messageID++ + var mid = messageID + return mid +} /* Message is the basic carrier of WebSocket chat protocol actions. @@ -25,6 +39,7 @@ type Message struct { // Sent on `me` actions along with Username VideoStatus int `json:"video,omitempty"` // user video flags ChatStatus string `json:"status,omitempty"` // online vs. away + DND bool `json:"dnd,omitempty"` // Do Not Disturb, e.g. DMs are closed // Message ID to support takebacks/local deletions MessageID int `json:"msgID,omitempty"` @@ -87,6 +102,7 @@ type WhoList struct { Nickname string `json:"nickname,omitempty"` Status string `json:"status"` Video int `json:"video"` + DND bool `json:"dnd,omitempty"` LoginAt int64 `json:"loginAt"` // JWT auth extra settings. diff --git a/pkg/server.go b/pkg/server.go index 2d04650..dc03544 100644 --- a/pkg/server.go +++ b/pkg/server.go @@ -36,6 +36,7 @@ func (s *Server) Setup() error { mux.Handle("/api/statistics", s.Statistics()) mux.Handle("/api/blocklist", s.BlockList()) mux.Handle("/api/authenticate", s.Authenticate()) + mux.Handle("/api/shutdown", s.ShutdownAPI()) mux.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir("web/static")))) s.mux = mux diff --git a/pkg/websocket.go b/pkg/websocket.go index 4b2e2f7..09a173f 100644 --- a/pkg/websocket.go +++ b/pkg/websocket.go @@ -26,6 +26,7 @@ type Subscriber struct { Username string ChatStatus string VideoStatus int + DND bool // Do Not Disturb status (DMs are closed) JWTClaims *jwt.Claims authenticated bool // has passed the login step loginAt time.Time @@ -266,20 +267,16 @@ func (s *Server) DeleteSubscriber(sub *Subscriber) { s.subscribersMu.Unlock() } -// IterSubscribers loops over the subscriber list with a read lock. If the -// caller already holds a lock, pass the optional `true` parameter for isLocked. -func (s *Server) IterSubscribers(isLocked ...bool) []*Subscriber { +// IterSubscribers loops over the subscriber list with a read lock. +func (s *Server) IterSubscribers() []*Subscriber { var result = []*Subscriber{} - // Has the caller already taken the read lock or do we get it? - if locked := len(isLocked) > 0 && isLocked[0]; !locked { - s.subscribersMu.RLock() - defer s.subscribersMu.RUnlock() - } - + // Lock for reads. + s.subscribersMu.RLock() for sub := range s.subscribers { result = append(result, sub) } + s.subscribersMu.RUnlock() return result } @@ -399,6 +396,7 @@ func (s *Server) SendWhoList() { Username: user.Username, Status: user.ChatStatus, Video: user.VideoStatus, + DND: user.DND, LoginAt: user.loginAt.Unix(), } diff --git a/web/static/js/BareRTC.js b/web/static/js/BareRTC.js index c672fe2..73a4fca 100644 --- a/web/static/js/BareRTC.js +++ b/web/static/js/BareRTC.js @@ -103,6 +103,7 @@ const app = Vue.createApp({ username: "", //"test", autoLogin: false, // e.g. from JWT auth message: "", + messageBox: null, // HTML element for message entry box typingNotifDebounce: null, status: "online", // away/idle status @@ -127,6 +128,7 @@ const app = Vue.createApp({ prefs: { joinMessages: true, // show "has entered the room" in public channels exitMessages: false, // hide exit messages by default in public channels + watchNotif: true, // notify in chat about cameras being watched closeDMs: false, // ignore unsolicited DMs }, @@ -290,6 +292,7 @@ const app = Vue.createApp({ $center: document.querySelector(".chat-column"), $right: document.querySelector(".right-column"), }; + this.messageBox = document.getElementById("messageBox"); // Reset CSS overrides for responsive display on any window size change. In effect, // making the chat panel the current screen again on phone rotation. @@ -387,8 +390,14 @@ const app = Vue.createApp({ "prefs.exitMessages": function() { localStorage.exitMessages = this.prefs.exitMessages; }, + "prefs.watchNotif": function() { + localStorage.watchNotif = this.prefs.watchNotif; + }, "prefs.closeDMs": function() { localStorage.closeDMs = this.prefs.closeDMs; + + // Tell ChatServer if we have gone to/from DND. + this.sendMe(); }, }, computed: { @@ -588,6 +597,9 @@ const app = Vue.createApp({ if (localStorage.exitMessages != undefined) { this.prefs.exitMessages = localStorage.exitMessages === "true"; } + if (localStorage.watchNotif != undefined) { + this.prefs.watchNotif = localStorage.watchNotif === "true"; + } if (localStorage.closeDMs != undefined) { this.prefs.closeDMs = localStorage.closeDMs === "true"; } @@ -686,10 +698,12 @@ const app = Vue.createApp({ // Sync the current user state (such as video broadcasting status) to // the backend, which will reload everybody's Who List. sendMe() { + if (!this.ws.connected) return; this.ws.conn.send(JSON.stringify({ action: "me", video: this.myVideoFlag, status: this.status, + dnd: this.prefs.closeDMs, })); }, onMe(msg) { @@ -974,7 +988,13 @@ const app = Vue.createApp({ action: "login", username: this.username, jwt: this.jwt.token, + dnd: this.prefs.closeDMs, })); + + // Focus the message entry box. + window.requestAnimationFrame(() => { + this.messageBox.focus(); + }); }); conn.addEventListener("message", ev => { @@ -1252,6 +1272,14 @@ const app = Vue.createApp({ onWatch(msg) { // The user has our video feed open now. if (this.isBootedAdmin(msg.username)) return; + + // Notify in chat if this was the first watch (viewer may send multiple per each track they received) + if (this.prefs.watchNotif && this.webcam.watching[msg.username] != true) { + this.ChatServer( + `${msg.username} is now watching your camera.`, + ); + } + this.webcam.watching[msg.username] = true; this.playSound("Watch"); }, @@ -1295,6 +1323,9 @@ const app = Vue.createApp({ // Edit hyperlinks to open in a new window. this.makeLinksExternal(); + + // Focus the message entry box. + this.messageBox.focus(); }, hasUnread(channel) { if (this.channels[channel] == undefined) { @@ -1382,6 +1413,10 @@ const app = Vue.createApp({ } return username; }, + isUsernameDND(username) { + if (!username) return false; + return this.whoMap[username] != undefined && this.whoMap[username].dnd; + }, isUsernameCamNSFW(username) { // returns true if the username is broadcasting and NSFW, false otherwise. // (used for the color coding of their nickname on their video box - so assumed they are broadcasting) @@ -1490,8 +1525,8 @@ const app = Vue.createApp({ let mediaParams = { audio: true, video: { - width: { max: 1280 }, - height: { max: 720 }, + width: { max: 640 }, + height: { max: 480 }, }, }; diff --git a/web/templates/chat.html b/web/templates/chat.html index c76ab3f..3ab836f 100644 --- a/web/templates/chat.html +++ b/web/templates/chat.html @@ -373,7 +373,7 @@
- +
-

- Whether to show 'has joined the room' style messages in public channels. -

+
+ +
+ +
@@ -1114,7 +1121,8 @@ v-if="!(msg.username === username || isDM)" class="button is-grey is-outlined is-small px-2" @click="openDMs({username: msg.username})" - title="Open a Direct Message (DM) thread"> + :title="isUsernameDND(msg.username) ? 'This person is not accepting new DMs' : 'Open a Direct Message (DM) thread'" + :disabled="isUsernameDND(msg.username)"> @@ -1250,6 +1258,7 @@ :class="{'pl-1': canUploadFile}">
+ :disabled="u.username === username || u.dnd" + :title="u.dnd ? 'This person is not accepting new DMs' : 'Send a Direct Message'">