diff --git a/pkg/handlers.go b/pkg/handlers.go index 146cc18..66b86be 100644 --- a/pkg/handlers.go +++ b/pkg/handlers.go @@ -11,7 +11,7 @@ import ( func (s *Server) OnLogin(sub *Subscriber, msg Message) { // Ensure the username is unique, or rename it. var duplicate bool - for other := range s.IterSubscribers() { + for _, other := range s.IterSubscribers() { if other.ID != sub.ID && other.Username == msg.Username { duplicate = true break @@ -48,11 +48,7 @@ func (s *Server) OnLogin(sub *Subscriber, msg Message) { func (s *Server) OnMessage(sub *Subscriber, msg Message) { log.Info("[%s] %s", sub.Username, msg.Message) if sub.Username == "" { - sub.SendJSON(Message{ - Action: ActionMessage, - Username: "ChatServer", - Message: "You must log in first.", - }) + sub.ChatServer("You must log in first.") return } @@ -75,3 +71,63 @@ func (s *Server) OnMe(sub *Subscriber, msg Message) { // Sync the WhoList to everybody. s.SendWhoList() } + +// OnOpen is a client wanting to start WebRTC with another, e.g. to see their camera. +func (s *Server) OnOpen(sub *Subscriber, msg Message) { + // Look up the other subscriber. + other, err := s.GetSubscriber(msg.Username) + if err != nil { + sub.ChatServer(err.Error()) + return + } + + // Make up a WebRTC shared secret and send it to both of them. + secret := RandomString(16) + log.Info("WebRTC: %s opens %s with secret %s", sub.Username, other.Username, secret) + + // Ring the target of this request and give them the secret. + other.SendJSON(Message{ + Action: ActionRing, + Username: sub.Username, + OpenSecret: secret, + }) + + // To the caller, echo back the Open along with the secret. + sub.SendJSON(Message{ + Action: ActionOpen, + Username: other.Username, + OpenSecret: secret, + }) +} + +// OnCandidate handles WebRTC candidate signaling. +func (s *Server) OnCandidate(sub *Subscriber, msg Message) { + // Look up the other subscriber. + other, err := s.GetSubscriber(msg.Username) + if err != nil { + sub.ChatServer(err.Error()) + return + } + + other.SendJSON(Message{ + Action: ActionCandidate, + Username: sub.Username, + Candidate: msg.Candidate, + }) +} + +// OnSDP handles WebRTC sdp signaling. +func (s *Server) OnSDP(sub *Subscriber, msg Message) { + // Look up the other subscriber. + other, err := s.GetSubscriber(msg.Username) + if err != nil { + sub.ChatServer(err.Error()) + return + } + + other.SendJSON(Message{ + Action: ActionSDP, + Username: sub.Username, + Description: msg.Description, + }) +} diff --git a/pkg/messages.go b/pkg/messages.go index d290527..932e0ff 100644 --- a/pkg/messages.go +++ b/pkg/messages.go @@ -2,14 +2,21 @@ package barertc type Message struct { Action string `json:"action,omitempty"` - Username string `json:"username"` - Message string `json:"message"` + Username string `json:"username,omitempty"` + Message string `json:"message",omitempty` // WhoList for `who` actions - WhoList []WhoList `json:"whoList"` + WhoList []WhoList `json:"whoList,omitempty"` // Sent on `me` actions along with Username - VideoActive bool `json:"videoActive"` // user tells us their cam status + VideoActive bool `json:"videoActive,omitempty"` // user tells us their cam status + + // Sent on `open` actions along with the (other) Username. + OpenSecret string `json:"openSecret,omitempty"` + + // Parameters sent on WebRTC signaling messages. + Candidate string `json:"candidate,omitempty"` // candidate + Description string `json:"description,omitempty"` // sdp } const ( @@ -19,10 +26,17 @@ const ( // Actions sent by server or client ActionMessage = "message" // post a message to the room ActionMe = "me" // user self-info sent by FE or BE + ActionOpen = "open" // user wants to view a webcam (open WebRTC) + ActionRing = "ring" // receiver of a WebRTC open request // Actions sent by server only ActionWhoList = "who" // server pushes the Who List ActionPresence = "presence" // a user joined or left the room + ActionError = "error" // ChatServer errors + + // WebRTC signaling messages. + ActionCandidate = "candidate" + ActionSDP = "sdp" ) // WhoList is a member entry in the chat room. diff --git a/pkg/pages.go b/pkg/pages.go index 82a7c3a..bf7f240 100644 --- a/pkg/pages.go +++ b/pkg/pages.go @@ -16,12 +16,7 @@ func IndexPage() http.HandlerFunc { tmpl.Funcs(template.FuncMap{ // Cache busting random string for JS and CSS dependency. "CacheHash": func() string { - const charset = "abcdefghijklmnopqrstuvwxyz" - var result = make([]byte, 8) - for i := 0; i < 8; i++ { - result[i] = charset[rand.Intn(len(charset))] - } - return string(result) + return RandomString(8) }, }) tmpl, err := tmpl.ParseFiles("web/templates/chat.html") @@ -34,3 +29,13 @@ func IndexPage() http.HandlerFunc { tmpl.ExecuteTemplate(w, "index", nil) }) } + +// RandomString returns a random string of any length. +func RandomString(n int) string { + const charset = "abcdefghijklmnopqrstuvwxyz" + var result = make([]byte, n) + for i := 0; i < n; i++ { + result[i] = charset[rand.Intn(len(charset))] + } + return string(result) +} diff --git a/pkg/websocket.go b/pkg/websocket.go index ca64450..7b98afa 100644 --- a/pkg/websocket.go +++ b/pkg/websocket.go @@ -3,6 +3,7 @@ package barertc import ( "context" "encoding/json" + "errors" "fmt" "net/http" "time" @@ -61,12 +62,14 @@ func (sub *Subscriber) ReadLoop(s *Server) { s.OnMessage(sub, msg) case ActionMe: s.OnMe(sub, msg) + case ActionOpen: + s.OnOpen(sub, msg) + case ActionCandidate: + s.OnCandidate(sub, msg) + case ActionSDP: + s.OnSDP(sub, msg) default: - sub.SendJSON(Message{ - Action: ActionMessage, - Username: "ChatServer", - Message: "Unsupported message type.", - }) + sub.ChatServer("Unsupported message type.") } } }() @@ -91,6 +94,15 @@ func (sub *Subscriber) SendMe() { }) } +// ChatServer is a convenience function to deliver a ChatServer error to the client. +func (sub *Subscriber) ChatServer(message string, v ...interface{}) { + sub.SendJSON(Message{ + Action: ActionError, + Username: "ChatServer", + Message: fmt.Sprintf(message, v...), + }) +} + // WebSocket handles the /ws websocket connection. func (s *Server) WebSocket() http.HandlerFunc { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -152,6 +164,18 @@ func (s *Server) AddSubscriber(sub *Subscriber) { s.subscribersMu.Unlock() } +// GetSubscriber by username. +func (s *Server) GetSubscriber(username string) (*Subscriber, error) { + s.subscribersMu.RLock() + defer s.subscribersMu.RUnlock() + for _, sub := range s.IterSubscribers(true) { + if sub.Username == username { + return sub, nil + } + } + return nil, errors.New("not found") +} + // DeleteSubscriber removes a subscriber from the server. func (s *Server) DeleteSubscriber(sub *Subscriber) { log.Error("DeleteSubscriber: %s", sub.Username) @@ -162,31 +186,24 @@ func (s *Server) DeleteSubscriber(sub *Subscriber) { // IterSubscribers loops over the subscriber list with a read lock. If the // caller already holds a lock, pass the optional `true` parameter for isLocked. -func (s *Server) IterSubscribers(isLocked ...bool) chan *Subscriber { - var out = make(chan *Subscriber) - go func() { - log.Debug("IterSubscribers START..") +func (s *Server) IterSubscribers(isLocked ...bool) []*Subscriber { + log.Debug("IterSubscribers START..") - var result = []*Subscriber{} + var result = []*Subscriber{} - // Has the caller already taken the read lock or do we get it? - if locked := len(isLocked) > 0 && isLocked[0]; !locked { - log.Debug("Taking the lock") - s.subscribersMu.RLock() - defer s.subscribersMu.RUnlock() - } + // Has the caller already taken the read lock or do we get it? + if locked := len(isLocked) > 0 && isLocked[0]; !locked { + log.Debug("Taking the lock") + s.subscribersMu.RLock() + defer s.subscribersMu.RUnlock() + } - for sub := range s.subscribers { - result = append(result, sub) - } + for sub := range s.subscribers { + result = append(result, sub) + } - for _, r := range result { - out <- r - } - close(out) - log.Debug("IterSubscribers STOP!") - }() - return out + log.Debug("IterSubscribers STOP..") + return result } // Broadcast a message to the chat room. @@ -194,7 +211,7 @@ func (s *Server) Broadcast(msg Message) { log.Debug("Broadcast: %+v", msg) s.subscribersMu.RLock() defer s.subscribersMu.RUnlock() - for sub := range s.IterSubscribers(true) { + for _, sub := range s.IterSubscribers(true) { sub.SendJSON(Message{ Action: msg.Action, Username: msg.Username, @@ -205,15 +222,19 @@ func (s *Server) Broadcast(msg Message) { // SendWhoList broadcasts the connected members to everybody in the room. func (s *Server) SendWhoList() { - var users = []WhoList{} - for sub := range s.IterSubscribers() { + var ( + users = []WhoList{} + subscribers = s.IterSubscribers() + ) + + for _, sub := range subscribers { users = append(users, WhoList{ Username: sub.Username, VideoActive: sub.VideoActive, }) } - for sub := range s.IterSubscribers() { + for _, sub := range subscribers { sub.SendJSON(Message{ Action: ActionWhoList, WhoList: users, diff --git a/web/static/js/BareRTC.js b/web/static/js/BareRTC.js index 5eb7ad2..79bdafa 100644 --- a/web/static/js/BareRTC.js +++ b/web/static/js/BareRTC.js @@ -1,5 +1,13 @@ console.log("BareRTC!"); +// WebRTC configuration. +const configuration = { + iceServers: [{ + urls: 'stun:stun.l.google.com:19302' + }] +}; + + const app = Vue.createApp({ delimiters: ['[[', ']]'], data() { @@ -27,6 +35,15 @@ const app = Vue.createApp({ stream: null, // MediaStream object }, + // WebRTC sessions with other users. + WebRTC: { + // Streams per username. + streams: {}, + + // RTCPeerConnections per username. + pc: {}, + }, + // Chat history. history: [], historyScrollbox: null, @@ -55,6 +72,10 @@ const app = Vue.createApp({ this.dial(); }, + /** + * Chat API Methods (WebSocket packets sent/received) + */ + sendMessage() { if (!this.message) { return; @@ -87,11 +108,36 @@ const app = Vue.createApp({ // in our choice of username. if (this.username != msg.username) { this.ChatServer(`Your username has been changed to ${msg.username}.`); + this.username = msg.username; } this.ChatClient(`User sync from backend: ${JSON.stringify(msg)}`); }, + // Send a video request to access a user's camera. + sendOpen(username) { + this.ws.conn.send(JSON.stringify({ + action: "open", + username: username, + })); + }, + onOpen(msg) { + // Response for the opener to begin WebRTC connection. + const secret = msg.openSecret; + console.log("OPEN: connect to %s with secret %s", msg.username, secret); + this.ChatClient(`Connecting to stream for ${msg.username}.`); + + this.startWebRTC(msg.username, true); + }, + onRing(msg) { + // Message for the receiver to begin WebRTC connection. + const secret = msg.openSecret; + console.log("RING: connection from %s with secret %s", msg.username, secret); + this.ChatServer(`${msg.username} has opened your camera.`); + + this.startWebRTC(msg.username, false); + }, + // Handle messages sent in chat. onMessage(msg) { this.pushHistory({ @@ -102,6 +148,7 @@ const app = Vue.createApp({ // Dial the WebSocket connection. dial() { + console.log("Dialing WebSocket..."); const conn = new WebSocket(`ws://${location.host}/ws`); conn.addEventListener("close", ev => { @@ -152,6 +199,24 @@ const app = Vue.createApp({ message: msg.message, }); break; + case "ring": + this.onRing(msg); + break; + case "open": + this.onOpen(msg); + break; + case "candidate": + this.onCandidate(msg); + break; + case "sdp": + this.onSDP(msg); + break; + case "error": + this.pushHistory({ + username: msg.username || 'Internal Server Error', + message: msg.message, + isChatServer: true, + }); default: console.error("Unexpected action: %s", JSON.stringify(msg)); } @@ -160,6 +225,120 @@ const app = Vue.createApp({ this.ws.conn = conn; }, + /** + * WebRTC concerns. + */ + + // Start WebRTC with the other username. + startWebRTC(username, isOfferer) { + this.ChatClient(`Begin WebRTC with ${username}.`); + let pc = new RTCPeerConnection(configuration); + this.WebRTC.pc[username] = pc; + + // 'onicecandidate' notifies us whenever an ICE agent needs to deliver a + // message to the other peer through the signaling server. + pc.onicecandidate = event => { + this.ChatClient("On ICE Candidate called"); + console.log(event); + console.log(event.candidate); + if (event.candidate) { + this.ChatClient(`Send ICE candidate: ${event.candidate}`); + this.ws.conn.send(JSON.stringify({ + action: "candidate", + username: username, + candidate: event.candidate, + })); + } + }; + + // If the user is offerer let the 'negotiationneeded' event create the offer. + if (isOfferer) { + this.ChatClient("Sending offer:"); + pc.onnegotiationneeded = () => { + this.ChatClient("Negotiation Needed, creating WebRTC offer."); + pc.createOffer().then(this.localDescCreated(pc, username)).catch(this.ChatClient); + }; + } + + // When a remote stream arrives. + pc.ontrack = event => { + const stream = event.streams[0]; + + // Do we already have it? + this.ChatClient(`Received a video stream from ${username}.`); + if (this.WebRTC.streams[username] == undefined || + this.WebRTC.streams[username].id !== stream.id) { + this.WebRTC.streams[username] = stream; + } + }; + + // If we were already broadcasting video, send our stream to + // the connecting user. + if (!isOfferer && this.webcam.active) { + this.ChatClient(`Sharing our video stream to ${username}.`); + this.webcam.stream.getTracks().forEach(track => pc.addTrack(track, this.webcam.stream)); + } + + // If we are the offerer, begin the connection. + if (isOfferer) { + pc.createOffer().then(this.localDescCreated(pc, username)).catch(this.ChatClient); + } + }, + + // Common handler function for + localDescCreated(pc, username) { + return (desc) => { + this.ChatClient(`setLocalDescription ${JSON.stringify(desc)}`); + pc.setLocalDescription( + new RTCSessionDescription(desc), + () => { + this.ws.conn.send(JSON.stringify({ + action: "sdp", + username: username, + description: JSON.stringify(pc.localDescription), + })); + }, + console.error, + ) + }; + }, + + // Handle inbound WebRTC signaling messages proxied by the websocket. + onCandidate(msg) { + if (this.WebRTC.pc[msg.username] == undefined) return; + let pc = this.WebRTC.pc[msg.username]; + + // Add the new ICE candidate. + console.log("Add ICE candidate: %s", msg.candidate); + this.ChatClient(`Received an ICE candidate from ${username}: ${msg.candidate}`); + pc.addIceCandidate( + new RTCIceCandidate( + msg.candidate, + () => {}, + console.error, + ) + ); + }, + onSDP(msg) { + if (this.WebRTC.pc[msg.username] == undefined) return; + let pc = this.WebRTC.pc[msg.username]; + let description = JSON.parse(msg.description); + + // Add the new ICE candidate. + console.log("Set description: %s", description); + this.ChatClient(`Received a Remote Description from ${msg.username}: ${msg.description}.`); + pc.setRemoteDescription(new RTCSessionDescription(description), () => { + // When receiving an offer let's answer it. + if (pc.remoteDescription.type === 'offer') { + pc.createAnswer().then(this.localDescCreated(pc, msg.username)).catch(this.ChatClient); + } + }, console.error); + }, + + /** + * Front-end web app concerns. + */ + // Start broadcasting my webcam. startVideo() { if (this.webcam.busy) return; @@ -182,6 +361,16 @@ const app = Vue.createApp({ }) }, + // Begin connecting to someone else's webcam. + openVideo(user) { + if (user.username === this.username) { + this.ChatClient("You can already see your own webcam."); + return; + } + + this.sendOpen(user.username); + }, + // Stop broadcasting. stopVideo() { this.webcam.elem.srcObject = null; diff --git a/web/templates/chat.html b/web/templates/chat.html index 3083143..a0aaca6 100644 --- a/web/templates/chat.html +++ b/web/templates/chat.html @@ -223,7 +223,8 @@