diff --git a/pkg/config.go b/pkg/config.go new file mode 100644 index 0000000..64c9648 --- /dev/null +++ b/pkg/config.go @@ -0,0 +1,5 @@ +package barertc + +import "time" + +const PingInterval = 60 * time.Second diff --git a/pkg/messages.go b/pkg/messages.go index 932e0ff..387e4f1 100644 --- a/pkg/messages.go +++ b/pkg/messages.go @@ -15,8 +15,8 @@ type Message struct { OpenSecret string `json:"openSecret,omitempty"` // Parameters sent on WebRTC signaling messages. - Candidate string `json:"candidate,omitempty"` // candidate - Description string `json:"description,omitempty"` // sdp + Candidate map[string]interface{} `json:"candidate,omitempty"` // candidate + Description map[string]interface{} `json:"description,omitempty"` // sdp } const ( @@ -30,6 +30,7 @@ const ( ActionRing = "ring" // receiver of a WebRTC open request // Actions sent by server only + ActionPing = "ping" ActionWhoList = "who" // server pushes the Who List ActionPresence = "presence" // a user joined or left the room ActionError = "error" // ChatServer errors diff --git a/pkg/websocket.go b/pkg/websocket.go index 7b98afa..7b5fdd8 100644 --- a/pkg/websocket.go +++ b/pkg/websocket.go @@ -20,6 +20,7 @@ type Subscriber struct { VideoActive bool conn *websocket.Conn ctx context.Context + cancel context.CancelFunc messages chan []byte closeSlow func() } @@ -30,7 +31,7 @@ func (sub *Subscriber) ReadLoop(s *Server) { for { msgType, data, err := sub.conn.Read(sub.ctx) if err != nil { - log.Error("ReadLoop error: %+v", err) + log.Error("ReadLoop error(%s): %+v", sub.Username, err) s.DeleteSubscriber(sub) s.Broadcast(Message{ Action: ActionPresence, @@ -119,11 +120,12 @@ func (s *Server) WebSocket() http.HandlerFunc { // CloseRead starts a goroutine that will read from the connection // until it is closed. // ctx := c.CloseRead(r.Context()) - ctx, _ := context.WithCancel(r.Context()) + ctx, cancel := context.WithCancel(r.Context()) sub := &Subscriber{ conn: c, ctx: ctx, + cancel: cancel, messages: make(chan []byte, s.subscriberMessageBuffer), closeSlow: func() { c.Close(websocket.StatusPolicyViolation, "connection too slow to keep up with messages") @@ -134,6 +136,7 @@ func (s *Server) WebSocket() http.HandlerFunc { // defer s.DeleteSubscriber(sub) go sub.ReadLoop(s) + pinger := time.NewTicker(PingInterval) for { select { case msg := <-sub.messages: @@ -141,7 +144,13 @@ func (s *Server) WebSocket() http.HandlerFunc { if err != nil { return } + case timestamp := <-pinger.C: + sub.SendJSON(Message{ + Action: ActionPing, + Message: timestamp.Format(time.RFC3339), + }) case <-ctx.Done(): + pinger.Stop() return } } @@ -179,6 +188,12 @@ func (s *Server) GetSubscriber(username string) (*Subscriber, error) { // DeleteSubscriber removes a subscriber from the server. func (s *Server) DeleteSubscriber(sub *Subscriber) { log.Error("DeleteSubscriber: %s", sub.Username) + + // Cancel its context to clean up the for-loop goroutine. + if sub.cancel != nil { + sub.cancel() + } + s.subscribersMu.Lock() delete(s.subscribers, sub) s.subscribersMu.Unlock() diff --git a/web/static/js/BareRTC.js b/web/static/js/BareRTC.js index 79bdafa..1cb9825 100644 --- a/web/static/js/BareRTC.js +++ b/web/static/js/BareRTC.js @@ -125,7 +125,7 @@ const app = Vue.createApp({ // Response for the opener to begin WebRTC connection. const secret = msg.openSecret; console.log("OPEN: connect to %s with secret %s", msg.username, secret); - this.ChatClient(`Connecting to stream for ${msg.username}.`); + this.ChatClient(`onOpen called for ${msg.username}.`); this.startWebRTC(msg.username, true); }, @@ -137,6 +137,11 @@ const app = Vue.createApp({ this.startWebRTC(msg.username, false); }, + onUserExited(msg) { + // A user has logged off the server. Clean up any WebRTC connections. + delete(this.WebRTC.streams[msg.username]); + delete(this.WebRTC.pc[msg.username]); + }, // Handle messages sent in chat. onMessage(msg) { @@ -149,7 +154,8 @@ const app = Vue.createApp({ // Dial the WebSocket connection. dial() { console.log("Dialing WebSocket..."); - const conn = new WebSocket(`ws://${location.host}/ws`); + const proto = location.protocol === 'https:' ? 'wss' : 'ws'; + const conn = new WebSocket(`${proto}://${location.host}/ws`); conn.addEventListener("close", ev => { this.ws.connected = false; @@ -193,6 +199,11 @@ const app = Vue.createApp({ this.onMessage(msg); break; case "presence": + // TODO: make a dedicated leave event + if (msg.message.indexOf("has exited the room!") > -1) { + // Clean up data about this user. + this.onUserExited(msg); + } this.pushHistory({ action: msg.action, username: msg.username, @@ -235,14 +246,21 @@ const app = Vue.createApp({ let pc = new RTCPeerConnection(configuration); this.WebRTC.pc[username] = pc; + // Create a data channel so we have something to connect over even if + // the local user is not broadcasting their own camera. + let dataChannel = pc.createDataChannel("data"); + dataChannel.addEventListener("open", event => { + // beginTransmission(dataChannel); + }) + // 'onicecandidate' notifies us whenever an ICE agent needs to deliver a // message to the other peer through the signaling server. pc.onicecandidate = event => { - this.ChatClient("On ICE Candidate called"); - console.log(event); - console.log(event.candidate); + console.error("WebRTC OnICECandidate called!", event); + // this.ChatClient("On ICE Candidate called!"); if (event.candidate) { - this.ChatClient(`Send ICE candidate: ${event.candidate}`); + // this.ChatClient(`Send ICE candidate: ${JSON.stringify(event.candidate)}`); + console.log("Sending candidate to websockets:", event.candidate); this.ws.conn.send(JSON.stringify({ action: "candidate", username: username, @@ -253,8 +271,9 @@ const app = Vue.createApp({ // If the user is offerer let the 'negotiationneeded' event create the offer. if (isOfferer) { - this.ChatClient("Sending offer:"); + this.ChatClient("We are the offerer - set up onNegotiationNeeded"); pc.onnegotiationneeded = () => { + console.error("WebRTC OnNegotiationNeeded called!"); this.ChatClient("Negotiation Needed, creating WebRTC offer."); pc.createOffer().then(this.localDescCreated(pc, username)).catch(this.ChatClient); }; @@ -262,6 +281,8 @@ const app = Vue.createApp({ // When a remote stream arrives. pc.ontrack = event => { + this.ChatServer("ON TRACK CALLED!!!"); + console.error("WebRTC OnTrack called!", event); const stream = event.streams[0]; // Do we already have it? @@ -270,13 +291,28 @@ const app = Vue.createApp({ this.WebRTC.streams[username].id !== stream.id) { this.WebRTC.streams[username] = stream; } + + window.requestAnimationFrame(() => { + this.ChatServer("Setting -
- y + +
+ [[username]] - [[stream]]
y