From d072f85d051a141aa410b28d391ceeb3e2e37035 Mon Sep 17 00:00:00 2001 From: Stein Ivar Berghei Date: Sat, 4 Nov 2023 11:16:48 +0100 Subject: [PATCH] Make websockets less jank, fixes #10 --- ambiance.go | 52 ++++++++++------------ events.go | 121 +++++++++++++++++++++++----------------------------- routes.go | 2 +- ws.go | 66 +++++++++++++++------------- ytdl.go | 58 ++++++++++++------------- 5 files changed, 141 insertions(+), 158 deletions(-) diff --git a/ambiance.go b/ambiance.go index 40e6b48..4e1b42c 100644 --- a/ambiance.go +++ b/ambiance.go @@ -69,20 +69,17 @@ func GetAmbiances() (amb []Ambiance, err error) { func AddAmbiance(uri, title string) (Ambiance, error) { var amb Ambiance - msg := make(map[string]interface{}) - msg["event"] = "ambiance_add_start" - data := make(map[string]string) - data["name"] = title - msg["payload"] = data - ws_msg <- msg + ev := Event{"ambiance_add_start", map[string]string{ + "name": title, + }} + + go ws.SendEvent(ev) defer func() { - msg = make(map[string]interface{}) - msg["event"] = "ambiance_add_finish" - data = make(map[string]string) - data["name"] = title - msg["payload"] = data - ws_msg <- msg + ev = Event{"ambiance_add_finish", map[string]string{ + "name": title, + }} + go ws.SendEvent(ev) }() tmpfile, err := exec.Command("mktemp", "/tmp/dnd_XXXXXXXXXXXX.opus").Output() @@ -134,16 +131,13 @@ func AddAmbiance(uri, title string) (Ambiance, error) { log.Printf("Start ffmpeg to extract audio to %s", string(tmpfile)) - msg = make(map[string]interface{}) - msg["event"] = "ambiance_encode_start" - data = make(map[string]string) - data["name"] = title - msg["payload"] = data - ws_msg <- msg + ev = Event{"ambiance_encode_start", map[string]string{ + "name": title, + }} - msg = make(map[string]interface{}) - msg["event"] = "ambiance_encode_progress" - data = make(map[string]string) + go ws.SendEvent(ev) + + data := make(map[string]string) data["name"] = title scanner := bufio.NewScanner(ffprogress) @@ -162,8 +156,7 @@ func AddAmbiance(uri, title string) (Ambiance, error) { data["percent"] = percent } - msg["payload"] = data - ws_msg <- msg + go ws.SendEvent(Event{"ambiance_encode_progress", data}) }) } @@ -176,15 +169,14 @@ func AddAmbiance(uri, title string) (Ambiance, error) { return amb, err } - msg = make(map[string]interface{}) - msg["event"] = "ambiance_encode_complete" - data = make(map[string]string) - data["name"] = title - msg["payload"] = data - ws_msg <- msg + ev = Event{"ambiance_encode_complete", map[string]string{ + "name": title, + }} + + go ws.SendEvent(ev) id := uuid.New() - fn := filepath.Join(config.GetString("ambiance.path"), fmt.Sprintf("%s.opus", id.String())) + fn := filepath.Join(config.GetString("ambiance.path"), fmt.Sprintf("%s.aac", id.String())) log.Printf("Moving to %s", fn) diff --git a/events.go b/events.go index 5d83398..c34c524 100644 --- a/events.go +++ b/events.go @@ -148,74 +148,70 @@ func (app *App) volset(payload ...interface{}) { amb_volume.Volume = vol } - app.sendVolume() + ev := Event{"volume", map[string]float64{ + "playlist": pl_volume.Volume, + "ambiance": amb_volume.Volume, + }} + + go ws.SendEvent(ev) } -func (app *App) sendVolume() { - msg := make(map[string]interface{}) - out := make(map[string]float64) - msg["event"] = "volume" - out["playlist"] = pl_volume.Volume - out["ambiance"] = amb_volume.Volume - msg["payload"] = out - ws_msg <- msg -} +func (app *App) songInfoEvent(event string) (ev Event, err error) { + ev.Event = event -func (app *App) songInfoEvent(event string) map[string]interface{} { - msg := make(map[string]interface{}) - msg["event"] = event status, err := app.mpd.Status() if err != nil { - log.Println(err) - return nil + return } cur, err := app.mpd.CurrentSong() if err != nil { - log.Println(err) - return nil + return } info := new(SongInfo) if status["state"] != "play" { info.Pause = true - msg["payload"] = info - return msg + ev.Payload = info + return } duration, ok := status["duration"] if ok && duration != "" { - slen, err := strconv.ParseFloat(duration, 64) + var slen float64 + slen, err = strconv.ParseFloat(duration, 64) if err != nil { log.Println(err) - return nil + return } info.Length = time.Duration(slen * float64(time.Second)).Milliseconds() } elapsed, ok := status["elapsed"] if ok && elapsed != "" { - spos, err := strconv.ParseFloat(elapsed, 64) + var spos float64 + spos, err = strconv.ParseFloat(elapsed, 64) if err != nil { log.Println(err) - return nil + return } info.Position = time.Duration(spos * float64(time.Second)).Milliseconds() } album, ok := cur["Album"] if ok { - plid, err := uuid.ParseBytes([]byte(album)) + var plid uuid.UUID + plid, err = uuid.ParseBytes([]byte(album)) if err != nil { log.Println(err) - return nil + return } - - pl, err := app.GetPlaylist(plid) + var pl *Playlist + pl, err = app.GetPlaylist(plid) if err != nil { log.Println(err) - return nil + return } info.Playlist = pl.Id @@ -237,9 +233,9 @@ func (app *App) songInfoEvent(event string) map[string]interface{} { info.Song = location } - msg["payload"] = *info + ev.Payload = *info - return msg + return ev, nil } func (app *App) ambiancePlay(payload ...interface{}) { @@ -292,15 +288,13 @@ func (app *App) ambiancePlay(payload ...interface{}) { log.Fatal(err) } - msg := make(map[string]interface{}) - out := make(map[string]interface{}) - app.curamb = amb - msg["event"] = "ambiance_play" - out["id"] = id - msg["payload"] = out - ws_msg <- msg + ev := Event{"ambiance_play", map[string]string{ + "id": id, + }} + + go ws.SendEvent(ev) } func (app *App) ambianceStop(payload ...interface{}) { @@ -319,10 +313,7 @@ func (app *App) ambianceStop(payload ...interface{}) { log.Fatal(err) } - msg := make(map[string]interface{}) - msg["event"] = "ambiance_stop" - ws_msg <- msg - + go ws.SendEvent(Event{"ambiance_stop", nil}) } func (app *App) ambianceAdd(payload ...interface{}) { @@ -359,13 +350,12 @@ func (app *App) ambianceAdd(payload ...interface{}) { return } - msg := make(map[string]interface{}) - out := make(map[string]interface{}) - msg["event"] = "ambiance_add" - out["title"] = amb.Title - out["id"] = amb.Id - msg["payload"] = out - ws_msg <- msg + ev := Event{"ambiance_add", map[string]string{ + "title": amb.Title, + "id": amb.Id, + }} + + go ws.SendEvent(ev) } func (app *App) songPosition(payload ...interface{}) { @@ -380,18 +370,16 @@ func (app *App) songPosition(payload ...interface{}) { } l.Do(func() { - msg := make(map[string]interface{}) - out := make(map[string]interface{}) slen, _ := strconv.ParseFloat(status["duration"], 64) spos, _ := strconv.ParseFloat(status["elapsed"], 64) - msg["event"] = "song_position" - out["len"] = time.Duration(slen * float64(time.Second)).Milliseconds() - out["position"] = time.Duration(spos * float64(time.Second)).Milliseconds() + ev := Event{"song_position", map[string]int64{ + "len": time.Duration(slen * float64(time.Second)).Milliseconds(), + "position": time.Duration(spos * float64(time.Second)).Milliseconds(), + }} - msg["payload"] = out - ws_msg <- msg + go ws.SendEvent(ev) }) } @@ -399,10 +387,12 @@ func (app *App) songPosition(payload ...interface{}) { func (app *App) songInfo(payload ...interface{}) { log.Println("song_info event received") - msg := app.songInfoEvent("song_info") - if msg != nil { - ws_msg <- msg + ev, err := app.songInfoEvent("song_info") + if err != nil { + log.Println(err) + return } + go ws.SendEvent(ev) } func (app *App) stop(payload ...interface{}) { @@ -415,10 +405,7 @@ func (app *App) stop(payload ...interface{}) { app.mpd.Stop() app.plmutex.Unlock() - msg := make(map[string]interface{}) - msg["event"] = "stop" - - ws_msg <- msg + go ws.SendEvent(Event{"stop", nil}) } func (app *App) prevSong(payload ...interface{}) { @@ -492,12 +479,12 @@ func (app *App) addPlaylist(payload ...interface{}) { log.Println("Error getting youtube playlist info,", plid) } - msg := make(map[string]interface{}) + ev := Event{"new_playlist", map[string]string{ + "url": id.String(), + "title": pltitle, + }} - msg["event"] = "new_playlist" - msg["payload"] = map[string]string{"url": id.String(), "title": pltitle} - - ws_msg <- msg + go ws.SendEvent(ev) } func (app *App) loadPlaylist(payload ...interface{}) { diff --git a/routes.go b/routes.go index 58cf005..c5ed9f2 100644 --- a/routes.go +++ b/routes.go @@ -69,7 +69,7 @@ func init() { return } - err = handleWS(conn) + err = ws.join(conn) if err != nil { log.Printf("WS connection closed, %v\n", r.RemoteAddr) } diff --git a/ws.go b/ws.go index 097877f..d9e4df0 100644 --- a/ws.go +++ b/ws.go @@ -12,18 +12,21 @@ import ( "github.com/kataras/go-events" ) +type Websocket struct { + sync.Mutex + + clients *bcast.Group +} + +var ws *Websocket + func init() { log.Println("ws.go loading..") - go ws_clients.Broadcast(0) - ws_msg = make(chan interface{}) + ws = new(Websocket) - go func() { - var msg interface{} - for { - msg = <-ws_msg - ws_clients.Send(msg) - } - }() + ws.clients = bcast.NewGroup() + + go ws.clients.Broadcast(0) log.Println("ws.go done.") } @@ -33,12 +36,19 @@ type WSmsg struct { Payload json.RawMessage } -var ws_clients = bcast.NewGroup() -var ws_msg chan interface{} -var WSMutex = &sync.Mutex{} +type Event struct { + Event string `json:"event"` + Payload any `json:"payload,omitempty"` +} -func handleWS(c *websocket.Conn) error { - memb := ws_clients.Join() +func (ws *Websocket) SendEvent(e Event) { + ws.Lock() + ws.clients.Send(e) + ws.Unlock() +} + +func (ws *Websocket) join(c *websocket.Conn) error { + memb := ws.clients.Join() defer memb.Close() ctx, cancel := context.WithCancel(context.Background()) @@ -64,29 +74,27 @@ func handleWS(c *websocket.Conn) error { return nil }) - msg := app.songInfoEvent("song_info") + msg, err := app.songInfoEvent("song_info") + if err != nil { + return err + } - vol := make(map[string]interface{}) - volout := make(map[string]float64) - vol["event"] = "volume" - volout["playlist"] = pl_volume.Volume - volout["ambiance"] = amb_volume.Volume - vol["payload"] = volout + vol := Event{"volume", map[string]float64{ + "playlist": pl_volume.Volume, + "ambiance": amb_volume.Volume, + }} c.SetWriteDeadline(time.Now().Add(10 * time.Second)) c.WriteJSON(msg) c.WriteJSON(vol) if app.ambiance.Len() > 0 { - msg := make(map[string]interface{}) - out := make(map[string]interface{}) - msg["event"] = "ambiance_play" - out["id"] = app.curamb.Id - msg["payload"] = out + msg := Event{"ambiance_play", map[string]string{ + "id": app.curamb.Id, + }} c.WriteJSON(msg) } else { - msg := make(map[string]interface{}) - msg["event"] = "ambiance_stop" + msg := Event{"ambiance_stop", nil} c.WriteJSON(msg) } @@ -97,6 +105,6 @@ func handleWS(c *websocket.Conn) error { return err } - app.events.Emit(events.EventName(msg.Event), msg.Payload) + app.events.Emit(events.EventName(msg.Event), msg.Payload, memb) } } diff --git a/ytdl.go b/ytdl.go index 93b1c6d..22e3aba 100644 --- a/ytdl.go +++ b/ytdl.go @@ -1,23 +1,42 @@ package main import ( - "bufio" - "encoding/json" "fmt" - "log" - "math" "net/url" - "os" - "os/exec" - "strconv" "time" "golang.org/x/time/rate" ) var prate = rate.Sometimes{Interval: 1 * time.Second} -var yturl = "https://youtu.be/%s" +// var yturl = "https://youtu.be/%s" + +func YTUrl(uri string) (vid string, err error) { + u, err := url.Parse(uri) + if err != nil { + return "", err + } + + switch u.Host { + case "youtu.be": + vid = u.Path[1:] + case "m.youtube.com": + fallthrough + case "youtube.com": + fallthrough + case "www.youtube.com": + vid = u.Query().Get("v") + } + + if vid == "" { + return vid, fmt.Errorf("unable to parse vid") + } + + return +} + +/* func NewYTdlUrl(vid string) ([]byte, error) { ytdl := config.GetString("youtube.ytdl") yt := exec.Command( @@ -132,29 +151,6 @@ func NewYTdl(vid string) ([]byte, error) { return tmpfile, nil } -func YTUrl(uri string) (vid string, err error) { - u, err := url.Parse(uri) - if err != nil { - return "", err - } - - switch u.Host { - case "youtu.be": - vid = u.Path[1:] - case "m.youtube.com": - fallthrough - case "youtube.com": - fallthrough - case "www.youtube.com": - vid = u.Query().Get("v") - } - - if vid == "" { - return vid, fmt.Errorf("unable to parse vid") - } - - return -} /* func DownloadAmbiance(uri string, name string) error {