BareRTC/pkg/websocket.go

444 lines
11 KiB
Go
Raw Normal View History

2023-01-11 06:38:48 +00:00
package barertc
import (
"context"
"encoding/json"
"errors"
2023-01-11 06:38:48 +00:00
"fmt"
"net/http"
"sort"
"strings"
"sync"
2023-01-11 06:38:48 +00:00
"time"
2023-03-22 04:29:24 +00:00
"git.kirsle.net/apps/barertc/pkg/config"
"git.kirsle.net/apps/barertc/pkg/jwt"
2023-01-11 06:38:48 +00:00
"git.kirsle.net/apps/barertc/pkg/log"
"git.kirsle.net/apps/barertc/pkg/util"
2023-01-11 06:38:48 +00:00
"nhooyr.io/websocket"
)
// Subscriber represents a connected WebSocket session.
type Subscriber struct {
// User properties
ID int // ID assigned by server
Username string
ChatStatus string
VideoStatus int
JWTClaims *jwt.Claims
authenticated bool // has passed the login step
conn *websocket.Conn
ctx context.Context
cancel context.CancelFunc
messages chan []byte
closeSlow func()
muteMu sync.RWMutex
booted map[string]struct{} // usernames booted off your camera
muted map[string]struct{} // usernames you muted
2023-06-24 20:08:15 +00:00
// Record which message IDs belong to this user.
midMu sync.Mutex
messageIDs map[int]struct{}
2023-01-11 06:38:48 +00:00
}
// ReadLoop spawns a goroutine that reads from the websocket connection.
func (sub *Subscriber) ReadLoop(s *Server) {
go func() {
for {
msgType, data, err := sub.conn.Read(sub.ctx)
if err != nil {
log.Error("ReadLoop error(%d=%s): %+v", sub.ID, sub.Username, err)
s.DeleteSubscriber(sub)
2023-03-29 01:09:13 +00:00
// Notify if this user was auth'd and not hidden
if sub.authenticated && sub.ChatStatus != "hidden" {
s.Broadcast(Message{
Action: ActionPresence,
Username: sub.Username,
Message: "has exited the room!",
})
s.SendWhoList()
}
2023-01-11 06:38:48 +00:00
return
}
if msgType != websocket.MessageText {
log.Error("Unexpected MessageType")
continue
}
// Read the user's posted message.
var msg Message
if err := json.Unmarshal(data, &msg); err != nil {
log.Error("Read(%d=%s) Message error: %s", sub.ID, sub.Username, err)
2023-01-11 06:38:48 +00:00
continue
}
2023-03-22 04:29:24 +00:00
if msg.Action != ActionFile {
log.Debug("Read(%d=%s): %s", sub.ID, sub.Username, data)
}
2023-01-11 06:38:48 +00:00
// What action are they performing?
switch msg.Action {
case ActionLogin:
s.OnLogin(sub, msg)
2023-01-11 06:38:48 +00:00
case ActionMessage:
s.OnMessage(sub, msg)
2023-03-22 04:29:24 +00:00
case ActionFile:
s.OnFile(sub, msg)
case ActionMe:
s.OnMe(sub, msg)
case ActionOpen:
s.OnOpen(sub, msg)
case ActionBoot:
s.OnBoot(sub, msg)
case ActionMute, ActionUnmute:
s.OnMute(sub, msg, msg.Action == ActionMute)
case ActionBlocklist:
s.OnBlocklist(sub, msg)
case ActionCandidate:
s.OnCandidate(sub, msg)
case ActionSDP:
s.OnSDP(sub, msg)
case ActionWatch:
s.OnWatch(sub, msg)
case ActionUnwatch:
s.OnUnwatch(sub, msg)
2023-06-24 20:08:15 +00:00
case ActionTakeback:
s.OnTakeback(sub, msg)
2023-07-01 03:00:21 +00:00
case ActionReact:
s.OnReact(sub, msg)
2023-01-11 06:38:48 +00:00
default:
sub.ChatServer("Unsupported message type.")
2023-01-11 06:38:48 +00:00
}
}
}()
}
// IsAdmin reports whether the subscriber's JWT claims mark them as an admin.
// A subscriber without JWT claims is never an admin.
func (sub *Subscriber) IsAdmin() bool {
	if claims := sub.JWTClaims; claims != nil {
		return claims.IsAdmin
	}
	return false
}
2023-01-11 06:38:48 +00:00
// SendJSON sends a JSON message to the websocket client.
func (sub *Subscriber) SendJSON(v interface{}) error {
data, err := json.Marshal(v)
if err != nil {
return err
}
log.Debug("SendJSON(%d=%s): %s", sub.ID, sub.Username, data)
return sub.conn.Write(sub.ctx, websocket.MessageText, data)
2023-01-11 06:38:48 +00:00
}
// SendMe tells the client its own current state: its username and video
// status. Any error from SendJSON is discarded.
func (sub *Subscriber) SendMe() {
	me := Message{
		Action:      ActionMe,
		Username:    sub.Username,
		VideoStatus: sub.VideoStatus,
	}
	_ = sub.SendJSON(me)
}
// ChatServer delivers an error message to the client, attributed to the
// "ChatServer" username. The message is a fmt.Sprintf format string and v
// holds its arguments. Any error from SendJSON is discarded.
func (sub *Subscriber) ChatServer(message string, v ...interface{}) {
	text := fmt.Sprintf(message, v...)
	notice := Message{
		Action:   ActionError,
		Username: "ChatServer",
		Message:  text,
	}
	_ = sub.SendJSON(notice)
}
2023-03-22 04:29:24 +00:00
// WebSocket handles the /ws websocket connection endpoint.
2023-01-11 06:38:48 +00:00
func (s *Server) WebSocket() http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ip := util.IPAddress(r)
log.Info("WebSocket connection from %s - %s", ip, r.Header.Get("User-Agent"))
log.Debug("Headers: %+v", r.Header)
2023-03-31 19:40:55 +00:00
c, err := websocket.Accept(w, r, &websocket.AcceptOptions{
CompressionMode: websocket.CompressionDisabled,
})
2023-01-11 06:38:48 +00:00
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "Could not accept websocket connection: %s", err)
2023-01-11 06:38:48 +00:00
return
}
defer c.Close(websocket.StatusInternalError, "the sky is falling")
log.Debug("WebSocket: %s has connected", ip)
2023-03-22 04:29:24 +00:00
c.SetReadLimit(config.Current.WebSocketReadLimit)
2023-01-11 06:38:48 +00:00
// CloseRead starts a goroutine that will read from the connection
// until it is closed.
// ctx := c.CloseRead(r.Context())
ctx, cancel := context.WithCancel(r.Context())
2023-01-11 06:38:48 +00:00
sub := &Subscriber{
conn: c,
ctx: ctx,
cancel: cancel,
2023-01-11 06:38:48 +00:00
messages: make(chan []byte, s.subscriberMessageBuffer),
closeSlow: func() {
c.Close(websocket.StatusPolicyViolation, "connection too slow to keep up with messages")
},
2023-03-28 04:13:04 +00:00
booted: make(map[string]struct{}),
muted: make(map[string]struct{}),
2023-06-24 20:08:15 +00:00
messageIDs: make(map[int]struct{}),
2023-03-28 04:13:04 +00:00
ChatStatus: "online",
2023-01-11 06:38:48 +00:00
}
s.AddSubscriber(sub)
defer s.DeleteSubscriber(sub)
2023-01-11 06:38:48 +00:00
go sub.ReadLoop(s)
pinger := time.NewTicker(PingInterval)
2023-01-11 06:38:48 +00:00
for {
select {
case msg := <-sub.messages:
err = writeTimeout(ctx, time.Second*5, c, msg)
if err != nil {
return
}
2023-02-11 06:46:39 +00:00
case <-pinger.C:
// Send a ping, and a refreshed JWT token if the user sent one.
var token string
if sub.JWTClaims != nil {
if jwt, err := sub.JWTClaims.ReSign(); err != nil {
log.Error("ReSign JWT token for %s: %s", sub.Username, err)
} else {
token = jwt
}
}
sub.SendJSON(Message{
Action: ActionPing,
JWTToken: token,
})
2023-01-11 06:38:48 +00:00
case <-ctx.Done():
pinger.Stop()
2023-01-11 06:38:48 +00:00
return
}
}
})
}
// SubscriberID is the most recently issued subscriber ID. AddSubscriber
// increments it and assigns the new value to each subscriber it registers.
var SubscriberID int
2023-01-11 06:38:48 +00:00
// AddSubscriber adds a WebSocket subscriber to the server.
func (s *Server) AddSubscriber(sub *Subscriber) {
// Assign a unique ID.
SubscriberID++
sub.ID = SubscriberID
log.Debug("AddSubscriber: ID #%d", sub.ID)
2023-01-11 06:38:48 +00:00
s.subscribersMu.Lock()
s.subscribers[sub] = struct{}{}
s.subscribersMu.Unlock()
}
// GetSubscriber looks up a connected subscriber by exact username. It returns
// a "not found" error when no subscriber has that username.
func (s *Server) GetSubscriber(username string) (*Subscriber, error) {
	candidates := s.IterSubscribers()
	for i := range candidates {
		if candidates[i].Username != username {
			continue
		}
		return candidates[i], nil
	}
	return nil, errors.New("not found")
}
2023-01-11 06:38:48 +00:00
// DeleteSubscriber removes a subscriber from the server.
func (s *Server) DeleteSubscriber(sub *Subscriber) {
log.Error("DeleteSubscriber: %s", sub.Username)
// Cancel its context to clean up the for-loop goroutine.
if sub.cancel != nil {
sub.cancel()
}
2023-01-11 06:38:48 +00:00
s.subscribersMu.Lock()
delete(s.subscribers, sub)
s.subscribersMu.Unlock()
}
// IterSubscribers returns a snapshot slice of the current subscribers, taken
// under the subscriber read lock. A caller that already holds the lock passes
// true as the optional isLocked argument so the lock is not taken again.
func (s *Server) IterSubscribers(isLocked ...bool) []*Subscriber {
	callerHoldsLock := len(isLocked) > 0 && isLocked[0]
	if !callerHoldsLock {
		s.subscribersMu.RLock()
		defer s.subscribersMu.RUnlock()
	}

	// Always a non-nil slice, even when there are no subscribers.
	snapshot := make([]*Subscriber, 0, len(s.subscribers))
	for sub := range s.subscribers {
		snapshot = append(snapshot, sub)
	}
	return snapshot
}
// UniqueUsername returns a username that no current subscriber is using. When
// the requested name is free it is returned with a nil error. Otherwise a
// numeric suffix ("name 2", "name 3", ...) is appended until the result is
// free, and that name is returned together with a non-nil error reporting the
// rename.
func (s *Server) UniqueUsername(username string) (string, error) {
	taken := make(map[string]struct{})
	for _, sub := range s.IterSubscribers() {
		taken[sub.Username] = struct{}{}
	}

	// Try the requested name first, then suffixes counting up from 2.
	candidate := username
	for suffix := 2; ; suffix++ {
		if _, clash := taken[candidate]; !clash {
			break
		}
		candidate = fmt.Sprintf("%s %d", username, suffix)
	}

	if candidate == username {
		return username, nil
	}
	return candidate, errors.New("username was not unique and a unique name has been returned")
}
2023-01-11 06:38:48 +00:00
// Broadcast a message to the chat room.
func (s *Server) Broadcast(msg Message) {
2023-03-22 04:29:24 +00:00
if len(msg.Message) < 1024 {
log.Debug("Broadcast: %+v", msg)
}
// Get the list of users who are online NOW, so we don't hold the mutex lock too long.
// Example: sending a fat GIF to a large audience could hang up the server for a long
// time until every copy of the GIF has been sent.
var subs = s.IterSubscribers()
for _, sub := range subs {
if !sub.authenticated {
continue
}
// Don't deliver it if the receiver has muted us.
if sub.Mutes(msg.Username) {
log.Debug("Do not broadcast message to %s: they have muted or booted %s", sub.Username, msg.Username)
continue
}
2023-02-06 21:27:29 +00:00
sub.SendJSON(msg)
2023-01-11 06:38:48 +00:00
}
}
// SendTo sends a message to a given username.
2023-02-06 21:27:29 +00:00
func (s *Server) SendTo(username string, msg Message) error {
log.Debug("SendTo(%s): %+v", username, msg)
username = strings.TrimPrefix(username, "@")
2023-02-06 21:27:29 +00:00
var found bool
var subs = s.IterSubscribers()
for _, sub := range subs {
if sub.Username == username {
2023-02-06 21:27:29 +00:00
found = true
sub.SendJSON(Message{
2023-06-24 20:47:20 +00:00
Action: msg.Action,
Channel: msg.Channel,
Username: msg.Username,
Message: msg.Message,
MessageID: msg.MessageID,
})
}
}
2023-02-06 21:27:29 +00:00
if !found {
return fmt.Errorf("%s is not online", username)
}
return nil
}
// SendWhoList broadcasts the connected members to everybody in the room.
func (s *Server) SendWhoList() {
var (
subscribers = s.IterSubscribers()
usernames = []string{} // distinct and sorted usernames
userSub = map[string]*Subscriber{}
)
for _, sub := range subscribers {
2023-04-02 06:44:15 +00:00
if !sub.authenticated {
continue
}
usernames = append(usernames, sub.Username)
userSub[sub.Username] = sub
}
sort.Strings(usernames)
// Build the WhoList for each subscriber.
// TODO: it's the only way to fake videoActive for booted user views.
for _, sub := range subscribers {
if !sub.authenticated {
continue
}
var users = []WhoList{}
for _, un := range usernames {
user := userSub[un]
2023-03-29 01:09:13 +00:00
if user.ChatStatus == "hidden" {
continue
}
who := WhoList{
Username: user.Username,
Status: user.ChatStatus,
Video: user.VideoStatus,
}
// If this person had booted us, force their camera to "off"
if (user.Boots(sub.Username) || user.Mutes(sub.Username)) && !sub.IsAdmin() {
who.Video = 0
}
2023-03-29 01:09:13 +00:00
if user.JWTClaims != nil {
who.Operator = user.JWTClaims.IsAdmin
who.Avatar = user.JWTClaims.Avatar
who.ProfileURL = user.JWTClaims.ProfileURL
who.Nickname = user.JWTClaims.Nick
2023-08-06 02:38:04 +00:00
who.Emoji = user.JWTClaims.Emoji
who.Gender = user.JWTClaims.Gender
}
users = append(users, who)
}
sub.SendJSON(Message{
2023-02-11 06:46:39 +00:00
Action: ActionWhoList,
WhoList: users,
})
}
}
// Boots reports whether username is in this subscriber's booted set, i.e.
// whether the subscriber has blocked that user from their camera.
func (sub *Subscriber) Boots(username string) bool {
	sub.muteMu.RLock()
	_, isBooted := sub.booted[username]
	sub.muteMu.RUnlock()
	return isBooted
}
// Mutes reports whether username is in this subscriber's muted set.
func (sub *Subscriber) Mutes(username string) bool {
	sub.muteMu.RLock()
	_, isMuted := sub.muted[username]
	sub.muteMu.RUnlock()
	return isMuted
}
2023-01-11 06:38:48 +00:00
// writeTimeout writes msg to the connection as a text message, using a
// context derived from ctx that expires after timeout.
func writeTimeout(ctx context.Context, timeout time.Duration, c *websocket.Conn, msg []byte) error {
	deadlineCtx, release := context.WithTimeout(ctx, timeout)
	defer release()

	return c.Write(deadlineCtx, websocket.MessageText, msg)
}