monorepo
Varun Patil 2022-11-10 18:49:55 -08:00
parent a094c37fb5
commit d466a1b7b8
2 changed files with 66 additions and 8 deletions

View File

@ -68,6 +68,11 @@ func NewManager(c *Config, path string, id string, close chan string) (*Manager,
bitrate: int(float64(highest) * 1.5),
}
// Start all streams
for _, stream := range m.streams {
go stream.Run()
}
log.Printf("%s: new manager for %s", m.id, m.path)
return m, nil

View File

@ -43,6 +43,56 @@ type Stream struct {
chunks map[int]*Chunk
coder *exec.Cmd
inactive int
}
func (s *Stream) Run() {
// run every 5s
t := time.NewTicker(5 * time.Second)
defer t.Stop()
for {
<-t.C
s.mutex.Lock()
// Prune chunks
for id := range s.chunks {
if id < s.goal-s.c.goalBufferMax {
s.pruneChunk(id)
}
}
s.inactive++
if s.inactive >= 4 {
t.Stop()
s.Stop()
s.mutex.Unlock()
return
}
s.mutex.Unlock()
}
}
func (s *Stream) Stop() {
log.Printf("%s-%s: stopping stream", s.m.id, s.quality)
for _, chunk := range s.chunks {
// Delete files
s.pruneChunk(chunk.id)
}
s.chunks = make(map[int]*Chunk)
s.goal = 0
if s.coder != nil {
s.coder.Process.Kill()
s.coder = nil
}
}
func (s *Stream) StopWithLock() {
s.mutex.Lock()
defer s.mutex.Unlock()
s.Stop()
}
func (s *Stream) ServeList(w http.ResponseWriter) error {
@ -77,6 +127,7 @@ func (s *Stream) ServeChunk(w http.ResponseWriter, id int) error {
s.mutex.Lock()
defer s.mutex.Unlock()
s.inactive = 0
s.checkGoal(id)
// Already have this chunk
@ -122,6 +173,14 @@ func (s *Stream) createChunk(id int) *Chunk {
}
}
func (s *Stream) pruneChunk(id int) {
delete(s.chunks, id)
// Remove file
filename := s.getTsPath(id)
os.Remove(filename)
}
func (s *Stream) returnChunk(w http.ResponseWriter, chunk *Chunk) {
// This function is called with lock, but we don't need it
s.mutex.Unlock()
@ -177,13 +236,7 @@ func (s *Stream) waitForChunk(w http.ResponseWriter, chunk *Chunk) {
func (s *Stream) restartAtChunk(w http.ResponseWriter, id int) {
// Stop current transcoder
if s.coder != nil {
s.coder.Process.Kill()
s.coder = nil
}
// Clear everything
s.chunks = make(map[int]*Chunk)
s.Stop()
chunk := s.createChunk(id) // create first chunk
@ -278,7 +331,7 @@ func (s *Stream) transcode(startId int) {
}...)
s.coder = exec.Command(s.c.ffmpeg, args...)
log.Printf("%s-%s: %s", strings.Join(s.coder.Args[:], " "))
log.Printf("%s-%s: %s", s.m.id, s.quality, strings.Join(s.coder.Args[:], " "))
cmdStdOut, err := s.coder.StdoutPipe()
if err != nil {