dndmusicbot/reisenstreamer/streamer.go

179 lines
2.9 KiB
Go
Raw Permalink Normal View History

package reisenstreamer
import (
"bytes"
"context"
"encoding/binary"
"log"
"sync"
"time"
"github.com/gopxl/beep"
"github.com/zergon321/reisen"
)
type Streamer struct {
sync.WaitGroup
sampleBuffer chan [2]float64
Duration time.Duration
SampleRate int
Media *reisen.Media
AudioStream *reisen.AudioStream
closed bool
Repeat bool
Started bool
}
const (
channelCount = 2
bitDepth = 8
sampleBufferSize = 32 * channelCount * bitDepth * 4096
)
func New(ctx context.Context, uri string) *Streamer {
deadline := time.Now().Add(3000 * time.Millisecond)
mediactx, mediaC := context.WithDeadline(ctx, deadline)
defer mediaC()
mediach := make(chan *reisen.Media, 1)
go func() {
if mediactx.Err() != nil {
return
}
media, err := reisen.NewMedia(uri)
if err != nil {
log.Println(err)
return
}
mediach <- media
}()
var media *reisen.Media
select {
case <-mediactx.Done():
log.Println(mediactx.Err())
return nil
case media = <-mediach:
break
}
s := new(Streamer)
s.Media = media
err := media.OpenDecode()
if err != nil {
log.Fatal(err)
}
as := media.AudioStreams()[0]
err = as.Open()
if err != nil {
log.Fatal(err)
}
s.AudioStream = as
s.Duration, _ = as.Duration()
s.SampleRate = as.SampleRate()
s.sampleBuffer = make(chan [2]float64, sampleBufferSize)
s.Add(1)
go func() {
<-ctx.Done()
// cancel sent, drain buffer
for len(s.sampleBuffer) > 0 {
<-s.sampleBuffer
}
}()
go func() {
loop:
for {
select {
case <-ctx.Done():
s.Done()
return
default:
packet, gotPacket, err := media.ReadPacket()
if err != nil {
log.Println(err)
break loop
}
if !gotPacket {
if s.Repeat {
s.AudioStream.Rewind(time.Duration(0))
continue loop
}
break loop
}
st := media.Streams()[packet.StreamIndex()].(*reisen.AudioStream)
audioFrame, gotFrame, err := st.ReadAudioFrame()
if err != nil {
log.Println(err)
break loop
}
if !gotFrame {
break loop
}
if audioFrame == nil {
continue loop
}
reader := bytes.NewReader(audioFrame.Data())
for reader.Len() >= 16 {
sample := [2]float64{0, 0}
err = binary.Read(reader, binary.LittleEndian, sample[:])
if err != nil {
log.Println(err)
break
}
s.sampleBuffer <- sample
if !s.Started {
s.Started = true
}
}
}
}
s.Done()
s.Close()
}()
return s
}
func (s *Streamer) Close() {
if s.closed {
return
}
s.Wait()
s.AudioStream.Close()
s.Media.CloseDecode()
close(s.sampleBuffer)
s.closed = true
}
func (s *Streamer) StreamerFunc() beep.StreamerFunc {
return beep.StreamerFunc(func(samples [][2]float64) (n int, ok bool) {
for i := 0; i < len(samples); i++ {
sample, ok := <-s.sampleBuffer
if !ok {
return i, false
}
samples[i] = sample
}
return len(samples), true
})
}