179 lines
2.9 KiB
Go
179 lines
2.9 KiB
Go
|
package reisenstreamer
|
||
|
|
||
|
import (
|
||
|
"bytes"
|
||
|
"context"
|
||
|
"encoding/binary"
|
||
|
"log"
|
||
|
"sync"
|
||
|
"time"
|
||
|
|
||
|
"github.com/gopxl/beep"
|
||
|
"github.com/zergon321/reisen"
|
||
|
)
|
||
|
|
||
|
type Streamer struct {
|
||
|
sync.WaitGroup
|
||
|
|
||
|
sampleBuffer chan [2]float64
|
||
|
Duration time.Duration
|
||
|
SampleRate int
|
||
|
Media *reisen.Media
|
||
|
AudioStream *reisen.AudioStream
|
||
|
closed bool
|
||
|
Repeat bool
|
||
|
Started bool
|
||
|
}
|
||
|
|
||
|
const (
	// channelCount is the number of audio channels per sample (stereo).
	channelCount = 2
	// bitDepth is a factor used to size the sample buffer.
	// NOTE(review): the value 8 matches the byte width of one float64 channel
	// value rather than a bit depth — confirm the intended meaning.
	bitDepth = 8
	// sampleBufferSize is the capacity, in stereo samples, of the channel
	// that connects the decoding goroutine to the beep streamer function.
	sampleBufferSize = 32 * channelCount * bitDepth * 4096
)
|
||
|
|
||
|
func New(ctx context.Context, uri string) *Streamer {
|
||
|
deadline := time.Now().Add(3000 * time.Millisecond)
|
||
|
|
||
|
mediactx, mediaC := context.WithDeadline(ctx, deadline)
|
||
|
defer mediaC()
|
||
|
|
||
|
mediach := make(chan *reisen.Media, 1)
|
||
|
go func() {
|
||
|
if mediactx.Err() != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
media, err := reisen.NewMedia(uri)
|
||
|
if err != nil {
|
||
|
log.Println(err)
|
||
|
return
|
||
|
}
|
||
|
mediach <- media
|
||
|
}()
|
||
|
|
||
|
var media *reisen.Media
|
||
|
select {
|
||
|
case <-mediactx.Done():
|
||
|
log.Println(mediactx.Err())
|
||
|
return nil
|
||
|
case media = <-mediach:
|
||
|
break
|
||
|
}
|
||
|
|
||
|
s := new(Streamer)
|
||
|
s.Media = media
|
||
|
err := media.OpenDecode()
|
||
|
if err != nil {
|
||
|
log.Fatal(err)
|
||
|
}
|
||
|
|
||
|
as := media.AudioStreams()[0]
|
||
|
err = as.Open()
|
||
|
if err != nil {
|
||
|
log.Fatal(err)
|
||
|
}
|
||
|
s.AudioStream = as
|
||
|
s.Duration, _ = as.Duration()
|
||
|
s.SampleRate = as.SampleRate()
|
||
|
|
||
|
s.sampleBuffer = make(chan [2]float64, sampleBufferSize)
|
||
|
s.Add(1)
|
||
|
|
||
|
go func() {
|
||
|
<-ctx.Done()
|
||
|
|
||
|
// cancel sent, drain buffer
|
||
|
for len(s.sampleBuffer) > 0 {
|
||
|
<-s.sampleBuffer
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
go func() {
|
||
|
|
||
|
loop:
|
||
|
for {
|
||
|
select {
|
||
|
case <-ctx.Done():
|
||
|
s.Done()
|
||
|
return
|
||
|
default:
|
||
|
packet, gotPacket, err := media.ReadPacket()
|
||
|
if err != nil {
|
||
|
log.Println(err)
|
||
|
break loop
|
||
|
}
|
||
|
if !gotPacket {
|
||
|
if s.Repeat {
|
||
|
s.AudioStream.Rewind(time.Duration(0))
|
||
|
continue loop
|
||
|
}
|
||
|
|
||
|
break loop
|
||
|
}
|
||
|
st := media.Streams()[packet.StreamIndex()].(*reisen.AudioStream)
|
||
|
audioFrame, gotFrame, err := st.ReadAudioFrame()
|
||
|
if err != nil {
|
||
|
log.Println(err)
|
||
|
break loop
|
||
|
}
|
||
|
|
||
|
if !gotFrame {
|
||
|
break loop
|
||
|
}
|
||
|
|
||
|
if audioFrame == nil {
|
||
|
continue loop
|
||
|
}
|
||
|
|
||
|
reader := bytes.NewReader(audioFrame.Data())
|
||
|
for reader.Len() >= 16 {
|
||
|
sample := [2]float64{0, 0}
|
||
|
err = binary.Read(reader, binary.LittleEndian, sample[:])
|
||
|
if err != nil {
|
||
|
log.Println(err)
|
||
|
break
|
||
|
}
|
||
|
|
||
|
s.sampleBuffer <- sample
|
||
|
if !s.Started {
|
||
|
s.Started = true
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
s.Done()
|
||
|
s.Close()
|
||
|
}()
|
||
|
|
||
|
return s
|
||
|
}
|
||
|
|
||
|
func (s *Streamer) Close() {
|
||
|
if s.closed {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
s.Wait()
|
||
|
|
||
|
s.AudioStream.Close()
|
||
|
s.Media.CloseDecode()
|
||
|
|
||
|
close(s.sampleBuffer)
|
||
|
s.closed = true
|
||
|
}
|
||
|
|
||
|
func (s *Streamer) StreamerFunc() beep.StreamerFunc {
|
||
|
return beep.StreamerFunc(func(samples [][2]float64) (n int, ok bool) {
|
||
|
for i := 0; i < len(samples); i++ {
|
||
|
sample, ok := <-s.sampleBuffer
|
||
|
if !ok {
|
||
|
return i, false
|
||
|
}
|
||
|
|
||
|
samples[i] = sample
|
||
|
}
|
||
|
|
||
|
return len(samples), true
|
||
|
})
|
||
|
}
|