From d466a1b7b8e1805270edae71f697d43dab02b5f3 Mon Sep 17 00:00:00 2001 From: Varun Patil Date: Thu, 10 Nov 2022 18:49:55 -0800 Subject: [PATCH] Pruning --- manager.go | 5 ++++ stream.go | 69 +++++++++++++++++++++++++++++++++++++++++++++++------- 2 files changed, 66 insertions(+), 8 deletions(-) diff --git a/manager.go b/manager.go index 33f43e2b..9330dfa9 100644 --- a/manager.go +++ b/manager.go @@ -68,6 +68,11 @@ func NewManager(c *Config, path string, id string, close chan string) (*Manager, bitrate: int(float64(highest) * 1.5), } + // Start all streams + for _, stream := range m.streams { + go stream.Run() + } + log.Printf("%s: new manager for %s", m.id, m.path) return m, nil diff --git a/stream.go b/stream.go index 3d2e754e..ce9b2e26 100644 --- a/stream.go +++ b/stream.go @@ -43,6 +43,56 @@ type Stream struct { chunks map[int]*Chunk coder *exec.Cmd + + inactive int +} + +func (s *Stream) Run() { + // run every 5s + t := time.NewTicker(5 * time.Second) + defer t.Stop() + + for { + <-t.C + s.mutex.Lock() + // Prune chunks + for id := range s.chunks { + if id < s.goal-s.c.goalBufferMax { + s.pruneChunk(id) + } + } + s.inactive++ + if s.inactive >= 4 { + t.Stop() + s.Stop() + s.mutex.Unlock() + return + } + s.mutex.Unlock() + } +} + +func (s *Stream) Stop() { + log.Printf("%s-%s: stopping stream", s.m.id, s.quality) + + for _, chunk := range s.chunks { + // Delete files + s.pruneChunk(chunk.id) + } + + s.chunks = make(map[int]*Chunk) + s.goal = 0 + + if s.coder != nil { + s.coder.Process.Kill() + s.coder = nil + } +} + +func (s *Stream) StopWithLock() { + s.mutex.Lock() + defer s.mutex.Unlock() + s.Stop() } func (s *Stream) ServeList(w http.ResponseWriter) error { @@ -77,6 +127,7 @@ func (s *Stream) ServeChunk(w http.ResponseWriter, id int) error { s.mutex.Lock() defer s.mutex.Unlock() + s.inactive = 0 s.checkGoal(id) // Already have this chunk @@ -122,6 +173,14 @@ func (s *Stream) createChunk(id int) *Chunk { } } +func (s *Stream) pruneChunk(id int) { + delete(s.chunks, id) + + // Remove file + filename := s.getTsPath(id) + os.Remove(filename) +} + func (s *Stream) returnChunk(w http.ResponseWriter, chunk *Chunk) { // This function is called with lock, but we don't need it s.mutex.Unlock() @@ -177,13 +236,7 @@ func (s *Stream) waitForChunk(w http.ResponseWriter, chunk *Chunk) { func (s *Stream) restartAtChunk(w http.ResponseWriter, id int) { // Stop current transcoder - if s.coder != nil { - s.coder.Process.Kill() - s.coder = nil - } - - // Clear everything - s.chunks = make(map[int]*Chunk) + s.Stop() chunk := s.createChunk(id) // create first chunk @@ -278,7 +331,7 @@ func (s *Stream) transcode(startId int) { }...) s.coder = exec.Command(s.c.ffmpeg, args...) - log.Printf("%s-%s: %s", strings.Join(s.coder.Args[:], " ")) + log.Printf("%s-%s: %s", s.m.id, s.quality, strings.Join(s.coder.Args[:], " ")) cmdStdOut, err := s.coder.StdoutPipe() if err != nil {