From 31bab910d2aea47b77f959ffb765b8dd88644da4 Mon Sep 17 00:00:00 2001 From: Varun Patil Date: Thu, 10 Nov 2022 07:34:07 -0800 Subject: [PATCH] Ugh --- stream.go | 57 ++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 42 insertions(+), 15 deletions(-) diff --git a/stream.go b/stream.go index ec6b9a27..edac4603 100644 --- a/stream.go +++ b/stream.go @@ -89,7 +89,7 @@ func (s *Stream) ServeChunk(w http.ResponseWriter, id int) error { // Will have this soon enough foundBehind := false - for i := id - 1; i > id-4 && i >= 0; i++ { + for i := id - 1; i > id-4 && i >= 0; i-- { if _, ok := s.chunks[i]; ok { foundBehind = true } @@ -168,6 +168,12 @@ func (s *Stream) waitForChunk(w http.ResponseWriter, chunk *Chunk) { } func (s *Stream) restartAtChunk(w http.ResponseWriter, id int) { + // Stop current transcoder + if s.coder != nil { + s.coder.Process.Kill() + s.coder.Wait() + } + // Clear everything s.chunks = make(map[int]*Chunk) @@ -288,10 +294,18 @@ func (s *Stream) getTsPath(id int) string { // Separate goroutine func (s *Stream) monitorTranscodeOutput(cmdStdOut io.ReadCloser, startAt float64) { + s.mutex.Lock() + coder := s.coder + s.mutex.Unlock() + defer cmdStdOut.Close() stdoutReader := bufio.NewReader(cmdStdOut) for { + if s.coder != coder { + break + } + line, err := stdoutReader.ReadBytes('\n') if err == io.EOF { if len(line) == 0 { @@ -307,23 +321,36 @@ func (s *Stream) monitorTranscodeOutput(cmdStdOut io.ReadCloser, startAt float64 l := string(line) if strings.Contains(l, ".ts") { - // 1080p-000003.ts - idx := strings.Split(strings.Split(l, "-")[1], ".")[0] - id, err := strconv.Atoi(idx) - if err != nil { - log.Println("Error parsing chunk id") - } + go func() { + // 1080p-000003.ts + idx := strings.Split(strings.Split(l, "-")[1], ".")[0] + id, err := strconv.Atoi(idx) + if err != nil { + log.Println("Error parsing chunk id") + } - s.mutex.Lock() - chunk := s.createChunk(id) - chunk.done = true - for _, n := range chunk.notifs { - n <- true - } - s.mutex.Unlock() + s.mutex.Lock() + defer s.mutex.Unlock() + + if s.coder != coder { + return + } + + chunk := s.createChunk(id) + chunk.done = true + for _, n := range chunk.notifs { + n <- true + } + }() } - // log.Println("ffmpeg:", l) + log.Println("ffmpeg:", l) + } + + // Join the process + err := s.coder.Wait() + if err != nil { + log.Println("FFmpeg process wait failed with", err) } }