BareRTC/pkg/websocket.go
Noah Petherbridge 9c77bdb62e New Op commands and fixes with blocking admin users
Add moderation rules:

* You can apply rules in the settings.toml to enforce moderator restrictions on
  certain users, e.g. to force their camera to always be NSFW or bar them from
  sharing their webcam at all anymore.

Chat UI improvements around users blocking admin accounts:

* When a main website block is in place, the DMs button in the Who List shows
  as greyed out with a cross through, as if that user had closed their DMs.
* Admin users are always able to watch the camera of people who have blocked
  them. The broadcaster is not notified about the watch.

New operator commands:

* /cut username: to tell a user to turn off their webcam.
* /unmute-all: to lift all mutes on your side, e.g. so your moderator chatbot
  can still see public messages from users who have blocked it.
* /help-advanced: moved the more dangerous admin command documentation here.

Miscellaneous fixes:

* The admin commands now tolerate an @ prefix in front of usernames.
* The /nsfw command won't fire unless the user's camera is actually active and
  not marked as explicit.
2024-05-17 17:15:48 -07:00

618 lines
16 KiB
Go

package barertc
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"sort"
"strings"
"sync"
"time"
"git.kirsle.net/apps/barertc/pkg/config"
"git.kirsle.net/apps/barertc/pkg/jwt"
"git.kirsle.net/apps/barertc/pkg/log"
"git.kirsle.net/apps/barertc/pkg/messages"
"git.kirsle.net/apps/barertc/pkg/util"
"nhooyr.io/websocket"
)
// Subscriber represents a connected WebSocket session.
type Subscriber struct {
// User properties
ID int // ID assigned by server
Username string
ChatStatus string
VideoStatus int
DND bool // Do Not Disturb status (DMs are closed)
JWTClaims *jwt.Claims
authenticated bool // has passed the login step
loginAt time.Time
// Connection details (WebSocket).
conn *websocket.Conn // WebSocket user
ctx context.Context
cancel context.CancelFunc
messages chan []byte
closeSlow func()
// Polling API users.
usePolling bool
sessionID string
lastPollAt time.Time
lastPollJWT time.Time // give a new JWT once in a while
muteMu sync.RWMutex
booted map[string]struct{} // usernames booted off your camera
blocked map[string]struct{} // usernames you have blocked
muted map[string]struct{} // usernames you muted
// Record which message IDs belong to this user.
midMu sync.Mutex
messageIDs map[int64]struct{}
// Logging.
log bool
logfh map[string]io.WriteCloser
}
// NewSubscriber initializes a connected chat user.
func (s *Server) NewSubscriber(ctx context.Context, cancelFunc func()) *Subscriber {
return &Subscriber{
ctx: ctx,
cancel: cancelFunc,
messages: make(chan []byte, s.subscriberMessageBuffer),
booted: make(map[string]struct{}),
muted: make(map[string]struct{}),
blocked: make(map[string]struct{}),
messageIDs: make(map[int64]struct{}),
ChatStatus: "online",
}
}
// NewWebSocketSubscriber returns a new subscriber with a WebSocket connection.
func (s *Server) NewWebSocketSubscriber(ctx context.Context, conn *websocket.Conn, cancelFunc func()) *Subscriber {
sub := s.NewSubscriber(ctx, cancelFunc)
sub.conn = conn
sub.closeSlow = func() {
conn.Close(websocket.StatusPolicyViolation, "connection too slow to keep up with messages")
}
return sub
}
// NewPollingSubscriber returns a new subscriber using the polling API.
func (s *Server) NewPollingSubscriber(ctx context.Context, cancelFunc func()) *Subscriber {
sub := s.NewSubscriber(ctx, cancelFunc)
sub.usePolling = true
sub.lastPollAt = time.Now()
sub.lastPollJWT = time.Now()
sub.closeSlow = func() {
// Their outbox is filled up, disconnect them.
log.Error("Polling subscriber %s#%d: inbox is filled up!", sub.Username, sub.ID)
// Send an exit message.
if sub.authenticated && sub.ChatStatus != "hidden" {
sub.authenticated = false
s.Broadcast(messages.Message{
Action: messages.ActionPresence,
Username: sub.Username,
Message: messages.PresenceExited,
})
s.SendWhoList()
}
s.DeleteSubscriber(sub)
}
return sub
}
// OnClientMessage handles a chat protocol message from the user's WebSocket or polling API.
func (s *Server) OnClientMessage(sub *Subscriber, msg messages.Message) {
// What action are they performing?
switch msg.Action {
case messages.ActionLogin:
s.OnLogin(sub, msg)
case messages.ActionMessage:
s.OnMessage(sub, msg)
case messages.ActionFile:
s.OnFile(sub, msg)
case messages.ActionMe:
s.OnMe(sub, msg)
case messages.ActionOpen:
s.OnOpen(sub, msg)
case messages.ActionBoot:
s.OnBoot(sub, msg, true)
case messages.ActionUnboot:
s.OnBoot(sub, msg, false)
case messages.ActionMute, messages.ActionUnmute:
s.OnMute(sub, msg, msg.Action == messages.ActionMute)
case messages.ActionBlock:
s.OnBlock(sub, msg)
case messages.ActionBlocklist:
s.OnBlocklist(sub, msg)
case messages.ActionCandidate:
s.OnCandidate(sub, msg)
case messages.ActionSDP:
s.OnSDP(sub, msg)
case messages.ActionWatch:
s.OnWatch(sub, msg)
case messages.ActionUnwatch:
s.OnUnwatch(sub, msg)
case messages.ActionTakeback:
s.OnTakeback(sub, msg)
case messages.ActionReact:
s.OnReact(sub, msg)
case messages.ActionReport:
s.OnReport(sub, msg)
case messages.ActionPing:
default:
sub.ChatServer("Unsupported message type: %s", msg.Action)
}
}
// ReadLoop spawns a goroutine that reads from the websocket connection.
func (sub *Subscriber) ReadLoop(s *Server) {
go func() {
for {
msgType, data, err := sub.conn.Read(sub.ctx)
if err != nil {
log.Error("ReadLoop error(%d=%s): %+v", sub.ID, sub.Username, err)
s.DeleteSubscriber(sub)
// Notify if this user was auth'd and not hidden
if sub.authenticated && sub.ChatStatus != "hidden" {
s.Broadcast(messages.Message{
Action: messages.ActionPresence,
Username: sub.Username,
Message: messages.PresenceExited,
})
s.SendWhoList()
}
return
}
if msgType != websocket.MessageText {
log.Error("Unexpected MessageType")
continue
}
// Read the user's posted message.
var msg messages.Message
if err := json.Unmarshal(data, &msg); err != nil {
log.Error("Read(%d=%s) Message error: %s", sub.ID, sub.Username, err)
continue
}
if msg.Action != messages.ActionFile {
log.Debug("Read(%d=%s): %s", sub.ID, sub.Username, data)
}
// Handle their message.
s.OnClientMessage(sub, msg)
}
}()
}
// IsAdmin safely checks if the subscriber is an admin.
func (sub *Subscriber) IsAdmin() bool {
return sub.JWTClaims != nil && sub.JWTClaims.IsAdmin
}
// IsVIP safely checks if the subscriber has VIP status.
func (sub *Subscriber) IsVIP() bool {
return sub.JWTClaims != nil && sub.JWTClaims.VIP
}
// SendJSON sends a JSON message to the websocket client.
func (sub *Subscriber) SendJSON(v interface{}) error {
data, err := json.Marshal(v)
if err != nil {
return err
}
log.Debug("SendJSON(%d=%s): %s", sub.ID, sub.Username, data)
// Add the message to the recipient's queue. If the queue is too full,
// disconnect the client as they can't keep up.
select {
case sub.messages <- data:
default:
go sub.closeSlow()
}
return nil
}
// SendMe sends the current user state to the client.
func (sub *Subscriber) SendMe() {
sub.SendJSON(messages.Message{
Action: messages.ActionMe,
Username: sub.Username,
VideoStatus: sub.VideoStatus,
})
}
// SendCut sends the client a 'cut' message to deactivate their camera.
func (sub *Subscriber) SendCut() {
sub.SendJSON(messages.Message{
Action: messages.ActionCut,
})
}
// ChatServer is a convenience function to deliver a ChatServer error to the client.
func (sub *Subscriber) ChatServer(message string, v ...interface{}) {
sub.SendJSON(messages.Message{
Action: messages.ActionError,
Username: "ChatServer",
Message: fmt.Sprintf(message, v...),
})
}
// WebSocket handles the /ws websocket connection endpoint.
func (s *Server) WebSocket() http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ip := util.IPAddress(r)
log.Info("WebSocket connection from %s - %s", ip, r.Header.Get("User-Agent"))
log.Debug("Headers: %+v", r.Header)
c, err := websocket.Accept(w, r, &websocket.AcceptOptions{
CompressionMode: websocket.CompressionDisabled,
})
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "Could not accept websocket connection: %s", err)
return
}
defer c.Close(websocket.StatusInternalError, "the sky is falling")
log.Debug("WebSocket: %s has connected", ip)
c.SetReadLimit(config.Current.WebSocketReadLimit)
// CloseRead starts a goroutine that will read from the connection
// until it is closed.
// ctx := c.CloseRead(r.Context())
ctx, cancel := context.WithCancel(r.Context())
sub := s.NewWebSocketSubscriber(ctx, c, cancel)
s.AddSubscriber(sub)
defer s.DeleteSubscriber(sub)
go sub.ReadLoop(s)
pinger := time.NewTicker(PingInterval)
for {
select {
case msg := <-sub.messages:
err = writeTimeout(ctx, time.Second*time.Duration(config.Current.WebSocketSendTimeout), c, msg)
if err != nil {
return
}
case <-pinger.C:
// Send a ping, and a refreshed JWT token if the user sent one.
var token string
if sub.JWTClaims != nil {
if jwt, err := sub.JWTClaims.ReSign(); err != nil {
log.Error("ReSign JWT token for %s#%d: %s", sub.Username, sub.ID, err)
} else {
token = jwt
}
}
sub.SendJSON(messages.Message{
Action: messages.ActionPing,
JWTToken: token,
})
case <-ctx.Done():
pinger.Stop()
return
}
}
})
}
// Auto incrementing Subscriber ID, assigned in AddSubscriber.
var SubscriberID int
// AddSubscriber adds a WebSocket subscriber to the server.
func (s *Server) AddSubscriber(sub *Subscriber) {
// Assign a unique ID.
SubscriberID++
sub.ID = SubscriberID
log.Debug("AddSubscriber: ID #%d", sub.ID)
s.subscribersMu.Lock()
s.subscribers[sub] = struct{}{}
s.subscribersMu.Unlock()
}
// GetSubscriber by username.
func (s *Server) GetSubscriber(username string) (*Subscriber, error) {
for _, sub := range s.IterSubscribers() {
if sub.Username == username {
return sub, nil
}
}
return nil, errors.New("not found")
}
// DeleteSubscriber removes a subscriber from the server.
func (s *Server) DeleteSubscriber(sub *Subscriber) {
if sub == nil {
return
}
log.Error("DeleteSubscriber: %s", sub.Username)
// Cancel its context to clean up the for-loop goroutine.
if sub.cancel != nil {
log.Info("Calling sub.cancel() on subscriber: %s#%d", sub.Username, sub.ID)
sub.cancel()
}
// Clean up any log files.
sub.teardownLogs()
s.subscribersMu.Lock()
delete(s.subscribers, sub)
s.subscribersMu.Unlock()
}
// IterSubscribers loops over the subscriber list with a read lock.
func (s *Server) IterSubscribers() []*Subscriber {
var result = []*Subscriber{}
// Lock for reads.
s.subscribersMu.RLock()
for sub := range s.subscribers {
result = append(result, sub)
}
s.subscribersMu.RUnlock()
return result
}
// UniqueUsername ensures a username will be unique or renames it. If the name is already unique, the error result is nil.
func (s *Server) UniqueUsername(username string) (string, error) {
var (
subs = s.IterSubscribers()
usernames = map[string]interface{}{}
origUsername = username
counter = 2
)
for _, sub := range subs {
usernames[sub.Username] = nil
}
// Check until unique.
for {
if _, ok := usernames[username]; ok {
username = fmt.Sprintf("%s %d", origUsername, counter)
counter++
} else {
break
}
}
if username != origUsername {
return username, errors.New("username was not unique and a unique name has been returned")
}
return username, nil
}
// Broadcast a message to the chat room.
func (s *Server) Broadcast(msg messages.Message) {
if len(msg.Message) < 1024 {
log.Debug("Broadcast: %+v", msg)
}
// Get the sender of this message.
sender, err := s.GetSubscriber(msg.Username)
if err != nil {
log.Error("Broadcast: sender name %s not found as a current subscriber!", msg.Username)
sender = nil
}
// Get the list of users who are online NOW, so we don't hold the mutex lock too long.
// Example: sending a fat GIF to a large audience could hang up the server for a long
// time until every copy of the GIF has been sent.
var subs = s.IterSubscribers()
for _, sub := range subs {
if !sub.authenticated {
continue
}
// Don't deliver it if the receiver has muted us.
if sub.Mutes(msg.Username) {
log.Debug("Do not broadcast message to %s: they have muted or booted %s", sub.Username, msg.Username)
continue
}
// Don't deliver it if there is any blocking between sender and receiver.
if sender != nil && sender.Blocks(sub) {
log.Debug("Do not broadcast message to %s: blocking between them and %s", msg.Username, sub.Username)
continue
}
// VIP channels: only deliver to subscribed VIP users.
if ch, ok := config.Current.GetChannel(msg.Channel); ok && ch.VIP && !sub.IsVIP() && !sub.IsAdmin() {
log.Debug("Do not broadcast message to %s: VIP channel and they are not VIP", sub.Username)
continue
}
sub.SendJSON(msg)
}
}
// SendTo sends a message to a given username.
func (s *Server) SendTo(username string, msg messages.Message) error {
log.Debug("SendTo(%s): %+v", username, msg)
username = strings.TrimPrefix(username, "@")
var found bool
var subs = s.IterSubscribers()
for _, sub := range subs {
if sub.Username == username {
found = true
sub.SendJSON(messages.Message{
Action: msg.Action,
Channel: msg.Channel,
Username: msg.Username,
Message: msg.Message,
MessageID: msg.MessageID,
})
}
}
if !found {
return fmt.Errorf("%s is not online", username)
}
return nil
}
// SendWhoList broadcasts the connected members to everybody in the room.
func (s *Server) SendWhoList() {
var (
subscribers = s.IterSubscribers()
usernames = []string{} // distinct and sorted usernames
userSub = map[string]*Subscriber{}
)
for _, sub := range subscribers {
if !sub.authenticated {
continue
}
usernames = append(usernames, sub.Username)
userSub[sub.Username] = sub
}
sort.Strings(usernames)
// Build the WhoList for each subscriber.
// TODO: it's the only way to fake videoActive for booted user views.
for _, sub := range subscribers {
if !sub.authenticated {
continue
}
var users = []messages.WhoList{}
for _, un := range usernames {
user := userSub[un]
if user.ChatStatus == "hidden" {
continue
}
// Blocking: hide the presence of both people from the Who List.
if user.Blocks(sub) {
log.Debug("WhoList: hide %s from %s (blocking)", user.Username, sub.Username)
continue
}
who := messages.WhoList{
Username: user.Username,
Status: user.ChatStatus,
Video: user.VideoStatus,
DND: user.DND,
LoginAt: user.loginAt.Unix(),
}
// Hide video flags of other users (never for the current user).
if user.Username != sub.Username {
// If this person had booted us, force their camera to "off"
if user.Boots(sub.Username) || user.Mutes(sub.Username) {
if sub.IsAdmin() {
// They kicked the admin off, but admin can reopen the cam if they want.
// But, unset the user's "auto-open your camera" flag, so if the admin
// reopens it, the admin's cam won't open on the recipient's screen.
who.Video ^= messages.VideoFlagMutualOpen
} else {
// Force their video to "off"
who.Video = 0
}
}
// If this person's VideoFlag is set to VIP Only, force their camera to "off"
// except when the person looking has the VIP status.
if (user.VideoStatus&messages.VideoFlagOnlyVIP == messages.VideoFlagOnlyVIP) && (!sub.IsVIP() && !sub.IsAdmin()) {
who.Video = 0
}
}
if user.JWTClaims != nil {
who.Operator = user.JWTClaims.IsAdmin
who.Avatar = user.JWTClaims.Avatar
who.ProfileURL = user.JWTClaims.ProfileURL
who.Nickname = user.JWTClaims.Nick
who.Emoji = user.JWTClaims.Emoji
who.Gender = user.JWTClaims.Gender
// VIP flags: if we are in MutuallySecret mode, only VIPs can see
// other VIP flags on the Who List.
if config.Current.VIP.MutuallySecret {
if sub.IsVIP() {
who.VIP = user.JWTClaims.VIP
}
} else {
who.VIP = user.JWTClaims.VIP
}
}
users = append(users, who)
}
sub.SendJSON(messages.Message{
Action: messages.ActionWhoList,
WhoList: users,
})
}
}
// Boots checks whether the subscriber has blocked username from their camera.
func (s *Subscriber) Boots(username string) bool {
s.muteMu.RLock()
defer s.muteMu.RUnlock()
_, ok := s.booted[username]
return ok
}
// Mutes checks whether the subscriber has muted username.
func (s *Subscriber) Mutes(username string) bool {
s.muteMu.RLock()
defer s.muteMu.RUnlock()
_, ok := s.muted[username]
return ok
}
// Blocks checks whether the subscriber blocks the username, or vice versa (blocking goes both directions).
func (s *Subscriber) Blocks(other *Subscriber) bool {
if s == nil || other == nil {
return false
}
// If either side is an admin, blocking is not allowed.
if s.IsAdmin() || other.IsAdmin() {
return false
}
s.muteMu.RLock()
defer s.muteMu.RUnlock()
// Forward block?
if _, ok := s.blocked[other.Username]; ok {
return true
}
// Reverse block?
other.muteMu.RLock()
defer other.muteMu.RUnlock()
_, ok := other.blocked[s.Username]
return ok
}
func writeTimeout(ctx context.Context, timeout time.Duration, c *websocket.Conn, msg []byte) error {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
return c.Write(ctx, websocket.MessageText, msg)
}