monorepo
Varun Patil 2022-11-10 07:34:07 -08:00
parent 339b7f1e9e
commit 31bab910d2
1 changed files with 42 additions and 15 deletions

View File

@ -89,7 +89,7 @@ func (s *Stream) ServeChunk(w http.ResponseWriter, id int) error {
// Will have this soon enough
foundBehind := false
for i := id - 1; i > id-4 && i >= 0; i++ {
for i := id - 1; i > id-4 && i >= 0; i-- {
if _, ok := s.chunks[i]; ok {
foundBehind = true
}
@ -168,6 +168,12 @@ func (s *Stream) waitForChunk(w http.ResponseWriter, chunk *Chunk) {
}
func (s *Stream) restartAtChunk(w http.ResponseWriter, id int) {
// Stop current transcoder
if s.coder != nil {
s.coder.Process.Kill()
s.coder.Wait()
}
// Clear everything
s.chunks = make(map[int]*Chunk)
@ -288,10 +294,18 @@ func (s *Stream) getTsPath(id int) string {
// Separate goroutine
func (s *Stream) monitorTranscodeOutput(cmdStdOut io.ReadCloser, startAt float64) {
s.mutex.Lock()
coder := s.coder
s.mutex.Unlock()
defer cmdStdOut.Close()
stdoutReader := bufio.NewReader(cmdStdOut)
for {
if s.coder != coder {
break
}
line, err := stdoutReader.ReadBytes('\n')
if err == io.EOF {
if len(line) == 0 {
@ -307,6 +321,7 @@ func (s *Stream) monitorTranscodeOutput(cmdStdOut io.ReadCloser, startAt float64
l := string(line)
if strings.Contains(l, ".ts") {
go func() {
// 1080p-000003.ts
idx := strings.Split(strings.Split(l, "-")[1], ".")[0]
id, err := strconv.Atoi(idx)
@ -315,15 +330,27 @@ func (s *Stream) monitorTranscodeOutput(cmdStdOut io.ReadCloser, startAt float64
}
s.mutex.Lock()
defer s.mutex.Unlock()
if s.coder != coder {
return
}
chunk := s.createChunk(id)
chunk.done = true
for _, n := range chunk.notifs {
n <- true
}
s.mutex.Unlock()
}()
}
// log.Println("ffmpeg:", l)
log.Println("ffmpeg:", l)
}
// Join the process
err := s.coder.Wait()
if err != nil {
log.Println("FFmpeg process wait failed with", err)
}
}