package reisenstreamer import ( "bytes" "context" "encoding/binary" "log" "sync" "time" "github.com/gopxl/beep" "github.com/zergon321/reisen" ) type Streamer struct { sync.WaitGroup sampleBuffer chan [2]float64 Duration time.Duration SampleRate int Media *reisen.Media AudioStream *reisen.AudioStream closed bool Repeat bool Started bool } const ( channelCount = 2 bitDepth = 8 sampleBufferSize = 32 * channelCount * bitDepth * 4096 ) func New(ctx context.Context, uri string) *Streamer { deadline := time.Now().Add(3000 * time.Millisecond) mediactx, mediaC := context.WithDeadline(ctx, deadline) defer mediaC() mediach := make(chan *reisen.Media, 1) go func() { if mediactx.Err() != nil { return } media, err := reisen.NewMedia(uri) if err != nil { log.Println(err) return } mediach <- media }() var media *reisen.Media select { case <-mediactx.Done(): log.Println(mediactx.Err()) return nil case media = <-mediach: break } s := new(Streamer) s.Media = media err := media.OpenDecode() if err != nil { log.Fatal(err) } as := media.AudioStreams()[0] err = as.Open() if err != nil { log.Fatal(err) } s.AudioStream = as s.Duration, _ = as.Duration() s.SampleRate = as.SampleRate() s.sampleBuffer = make(chan [2]float64, sampleBufferSize) s.Add(1) go func() { <-ctx.Done() // cancel sent, drain buffer for len(s.sampleBuffer) > 0 { <-s.sampleBuffer } }() go func() { loop: for { select { case <-ctx.Done(): s.Done() return default: packet, gotPacket, err := media.ReadPacket() if err != nil { log.Println(err) break loop } if !gotPacket { if s.Repeat { s.AudioStream.Rewind(time.Duration(0)) continue loop } break loop } st := media.Streams()[packet.StreamIndex()].(*reisen.AudioStream) audioFrame, gotFrame, err := st.ReadAudioFrame() if err != nil { log.Println(err) break loop } if !gotFrame { break loop } if audioFrame == nil { continue loop } reader := bytes.NewReader(audioFrame.Data()) for reader.Len() >= 16 { sample := [2]float64{0, 0} err = binary.Read(reader, binary.LittleEndian, sample[:]) if err != nil { log.Println(err) break } s.sampleBuffer <- sample if !s.Started { s.Started = true } } } } s.Done() s.Close() }() return s } func (s *Streamer) Close() { if s.closed { return } s.Wait() s.AudioStream.Close() s.Media.CloseDecode() close(s.sampleBuffer) s.closed = true } func (s *Streamer) StreamerFunc() beep.StreamerFunc { return beep.StreamerFunc(func(samples [][2]float64) (n int, ok bool) { for i := 0; i < len(samples); i++ { sample, ok := <-s.sampleBuffer if !ok { return i, false } samples[i] = sample } return len(samples), true }) }