BareRTC/pkg/websocket.go
Noah Petherbridge c5c8d08c7a Boot and Mute
* Users can now boot viewers off their camera. From the viewer's POV the
  booter has just turned off their camera and it will remain "off" for
  the remainder of the booter's session.
* Users can now mute one another: if you mute a user, you will no longer
  see that user's messages or DMs; and the muted user will never see
  your video as being active (like a boot but revokable if you unmute
  later).
2023-03-22 20:21:04 -07:00

366 lines
9.0 KiB
Go

package barertc
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
"sync"
"time"
"git.kirsle.net/apps/barertc/pkg/config"
"git.kirsle.net/apps/barertc/pkg/jwt"
"git.kirsle.net/apps/barertc/pkg/log"
"git.kirsle.net/apps/barertc/pkg/util"
"nhooyr.io/websocket"
)
// Subscriber represents a connected WebSocket session.
type Subscriber struct {
// User properties
ID int // ID assigned by server
Username string
VideoActive bool
VideoNSFW bool
JWTClaims *jwt.Claims
authenticated bool // has passed the login step
conn *websocket.Conn
ctx context.Context
cancel context.CancelFunc
messages chan []byte
closeSlow func()
muteMu sync.RWMutex
booted map[string]struct{} // usernames booted off your camera
muted map[string]struct{} // usernames you muted
}
// ReadLoop spawns a goroutine that reads from the websocket connection.
func (sub *Subscriber) ReadLoop(s *Server) {
go func() {
for {
msgType, data, err := sub.conn.Read(sub.ctx)
if err != nil {
log.Error("ReadLoop error(%d=%s): %+v", sub.ID, sub.Username, err)
s.DeleteSubscriber(sub)
// Notify if this user was auth'd
if sub.authenticated {
s.Broadcast(Message{
Action: ActionPresence,
Username: sub.Username,
Message: "has exited the room!",
})
s.SendWhoList()
}
return
}
if msgType != websocket.MessageText {
log.Error("Unexpected MessageType")
continue
}
// Read the user's posted message.
var msg Message
if err := json.Unmarshal(data, &msg); err != nil {
log.Error("Read(%d=%s) Message error: %s", sub.ID, sub.Username, err)
continue
}
if msg.Action != ActionFile {
log.Debug("Read(%d=%s): %s", sub.ID, sub.Username, data)
}
// What action are they performing?
switch msg.Action {
case ActionLogin:
s.OnLogin(sub, msg)
case ActionMessage:
s.OnMessage(sub, msg)
case ActionFile:
s.OnFile(sub, msg)
case ActionMe:
s.OnMe(sub, msg)
case ActionOpen:
s.OnOpen(sub, msg)
case ActionBoot:
s.OnBoot(sub, msg)
case ActionMute, ActionUnmute:
s.OnMute(sub, msg, msg.Action == ActionMute)
case ActionCandidate:
s.OnCandidate(sub, msg)
case ActionSDP:
s.OnSDP(sub, msg)
case ActionWatch:
s.OnWatch(sub, msg)
case ActionUnwatch:
s.OnUnwatch(sub, msg)
default:
sub.ChatServer("Unsupported message type.")
}
}
}()
}
// SendJSON sends a JSON message to the websocket client.
func (sub *Subscriber) SendJSON(v interface{}) error {
data, err := json.Marshal(v)
if err != nil {
return err
}
log.Debug("SendJSON(%d=%s): %s", sub.ID, sub.Username, data)
return sub.conn.Write(sub.ctx, websocket.MessageText, data)
}
// SendMe sends the current user state to the client.
func (sub *Subscriber) SendMe() {
sub.SendJSON(Message{
Action: ActionMe,
Username: sub.Username,
VideoActive: sub.VideoActive,
})
}
// ChatServer is a convenience function to deliver a ChatServer error to the client.
func (sub *Subscriber) ChatServer(message string, v ...interface{}) {
sub.SendJSON(Message{
Action: ActionError,
Username: "ChatServer",
Message: fmt.Sprintf(message, v...),
})
}
// WebSocket handles the /ws websocket connection endpoint.
func (s *Server) WebSocket() http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ip := util.IPAddress(r)
log.Info("WebSocket connection from %s - %s", ip, r.Header.Get("User-Agent"))
log.Debug("Headers: %+v", r.Header)
c, err := websocket.Accept(w, r, nil)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "Could not accept websocket connection: %s", err)
return
}
defer c.Close(websocket.StatusInternalError, "the sky is falling")
log.Debug("WebSocket: %s has connected", ip)
c.SetReadLimit(config.Current.WebSocketReadLimit)
// CloseRead starts a goroutine that will read from the connection
// until it is closed.
// ctx := c.CloseRead(r.Context())
ctx, cancel := context.WithCancel(r.Context())
sub := &Subscriber{
conn: c,
ctx: ctx,
cancel: cancel,
messages: make(chan []byte, s.subscriberMessageBuffer),
closeSlow: func() {
c.Close(websocket.StatusPolicyViolation, "connection too slow to keep up with messages")
},
booted: make(map[string]struct{}),
muted: make(map[string]struct{}),
}
s.AddSubscriber(sub)
defer s.DeleteSubscriber(sub)
go sub.ReadLoop(s)
pinger := time.NewTicker(PingInterval)
for {
select {
case msg := <-sub.messages:
err = writeTimeout(ctx, time.Second*5, c, msg)
if err != nil {
return
}
case <-pinger.C:
sub.SendJSON(Message{
Action: ActionPing,
})
case <-ctx.Done():
pinger.Stop()
return
}
}
})
}
// Auto incrementing Subscriber ID, assigned in AddSubscriber.
var SubscriberID int
// AddSubscriber adds a WebSocket subscriber to the server.
func (s *Server) AddSubscriber(sub *Subscriber) {
// Assign a unique ID.
SubscriberID++
sub.ID = SubscriberID
log.Debug("AddSubscriber: ID #%d", sub.ID)
s.subscribersMu.Lock()
s.subscribers[sub] = struct{}{}
s.subscribersMu.Unlock()
}
// GetSubscriber by username.
func (s *Server) GetSubscriber(username string) (*Subscriber, error) {
s.subscribersMu.RLock()
defer s.subscribersMu.RUnlock()
for _, sub := range s.IterSubscribers(true) {
if sub.Username == username {
return sub, nil
}
}
return nil, errors.New("not found")
}
// DeleteSubscriber removes a subscriber from the server.
func (s *Server) DeleteSubscriber(sub *Subscriber) {
log.Error("DeleteSubscriber: %s", sub.Username)
// Cancel its context to clean up the for-loop goroutine.
if sub.cancel != nil {
sub.cancel()
}
s.subscribersMu.Lock()
delete(s.subscribers, sub)
s.subscribersMu.Unlock()
}
// IterSubscribers loops over the subscriber list with a read lock. If the
// caller already holds a lock, pass the optional `true` parameter for isLocked.
func (s *Server) IterSubscribers(isLocked ...bool) []*Subscriber {
var result = []*Subscriber{}
// Has the caller already taken the read lock or do we get it?
if locked := len(isLocked) > 0 && isLocked[0]; !locked {
s.subscribersMu.RLock()
defer s.subscribersMu.RUnlock()
}
for sub := range s.subscribers {
result = append(result, sub)
}
return result
}
// Broadcast a message to the chat room.
func (s *Server) Broadcast(msg Message) {
if len(msg.Message) < 1024 {
log.Debug("Broadcast: %+v", msg)
}
s.subscribersMu.RLock()
defer s.subscribersMu.RUnlock()
for _, sub := range s.IterSubscribers(true) {
if !sub.authenticated {
continue
}
// Don't deliver it if the receiver has muted us.
if sub.Mutes(msg.Username) {
log.Debug("Do not broadcast message to %s: they have muted or booted %s", sub.Username, msg.Username)
continue
}
sub.SendJSON(msg)
}
}
// SendTo sends a message to a given username.
func (s *Server) SendTo(username string, msg Message) error {
log.Debug("SendTo(%s): %+v", username, msg)
username = strings.TrimPrefix(username, "@")
s.subscribersMu.RLock()
defer s.subscribersMu.RUnlock()
var found bool
for _, sub := range s.IterSubscribers(true) {
if sub.Username == username {
found = true
sub.SendJSON(Message{
Action: msg.Action,
Channel: msg.Channel,
Username: msg.Username,
Message: msg.Message,
})
}
}
if !found {
return fmt.Errorf("%s is not online", username)
}
return nil
}
// SendWhoList broadcasts the connected members to everybody in the room.
func (s *Server) SendWhoList() {
var (
subscribers = s.IterSubscribers()
)
// Build the WhoList for each subscriber.
// TODO: it's the only way to fake videoActive for booted user views.
for _, sub := range subscribers {
if !sub.authenticated {
continue
}
var users = []WhoList{}
for _, user := range subscribers {
who := WhoList{
Username: user.Username,
VideoActive: user.VideoActive,
NSFW: user.VideoNSFW,
}
// If this person had booted us, force their camera to "off"
if user.Boots(sub.Username) || user.Mutes(sub.Username) {
who.VideoActive = false
who.NSFW = false
}
if sub.JWTClaims != nil {
who.Operator = user.JWTClaims.IsAdmin
who.Avatar = user.JWTClaims.Avatar
who.ProfileURL = user.JWTClaims.ProfileURL
}
users = append(users, who)
}
sub.SendJSON(Message{
Action: ActionWhoList,
WhoList: users,
})
}
}
// Boots checks whether the subscriber has blocked username from their camera.
func (s *Subscriber) Boots(username string) bool {
s.muteMu.RLock()
defer s.muteMu.RUnlock()
_, ok := s.booted[username]
return ok
}
// Mutes checks whether the subscriber has muted username.
func (s *Subscriber) Mutes(username string) bool {
s.muteMu.RLock()
defer s.muteMu.RUnlock()
_, ok := s.muted[username]
return ok
}
func writeTimeout(ctx context.Context, timeout time.Duration, c *websocket.Conn, msg []byte) error {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
return c.Write(ctx, websocket.MessageText, msg)
}