log improv

monorepo
Varun Patil 2022-11-10 18:20:47 -08:00
parent 1141c358b8
commit a094c37fb5
4 changed files with 26 additions and 23 deletions

View File

@ -10,6 +10,8 @@ type Config struct {
chunkSize int
// How many *chunks* to look behind before restarting transcoding
lookBehind int
// How many chunks to buffer ahead of player position
goalBuffer int
// Number of chunks in goal to restart encoding
goalBufferMin int
// Number of chunks in goal to stop encoding
goalBufferMax int
}

13
main.go
View File

@ -44,7 +44,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
path := "/" + strings.Join(parts[1:len(parts)-1], "/")
chunk := parts[len(parts)-1]
log.Println("Serving", path, streamid, chunk)
// log.Println("Serving", path, streamid, chunk)
if streamid == "" || chunk == "" || path == "" {
w.WriteHeader(http.StatusBadRequest)
@ -109,11 +109,12 @@ func main() {
log.Println("Starting VOD server")
h := NewHandler(&Config{
ffmpeg: "ffmpeg",
ffprobe: "ffprobe",
chunkSize: 3,
lookBehind: 5,
goalBuffer: 5,
ffmpeg: "ffmpeg",
ffprobe: "ffprobe",
chunkSize: 3,
lookBehind: 5,
goalBufferMin: 3,
goalBufferMax: 8,
})
http.Handle("/", h)

View File

@ -68,11 +68,7 @@ func NewManager(c *Config, path string, id string, close chan string) (*Manager,
bitrate: int(float64(highest) * 1.5),
}
log.Println("New manager", m.id,
"with streams:", len(m.streams),
"duration:", m.probe.Duration,
"resolution:", m.probe.Width, "x", m.probe.Height,
)
log.Printf("%s: new manager for %s", m.id, m.path)
return m, nil
}

View File

@ -123,6 +123,10 @@ func (s *Stream) createChunk(id int) *Chunk {
}
func (s *Stream) returnChunk(w http.ResponseWriter, chunk *Chunk) {
// This function is called with lock, but we don't need it
s.mutex.Unlock()
defer s.mutex.Lock()
// Read file and write to response
filename := s.getTsPath(chunk.id)
f, err := os.Open(filename)
@ -132,7 +136,6 @@ func (s *Stream) returnChunk(w http.ResponseWriter, chunk *Chunk) {
return
}
defer f.Close()
log.Printf("Served chunk %d", chunk.id)
w.Header().Set("Content-Type", "video/MP2T")
io.Copy(w, f)
}
@ -185,7 +188,7 @@ func (s *Stream) restartAtChunk(w http.ResponseWriter, id int) {
chunk := s.createChunk(id) // create first chunk
// Start the transcoder
s.goal = id + 5
s.goal = id + s.c.goalBufferMax
s.transcode(id)
s.waitForChunk(w, chunk) // this is also a request
@ -275,7 +278,7 @@ func (s *Stream) transcode(startId int) {
}...)
s.coder = exec.Command(s.c.ffmpeg, args...)
log.Println("Starting FFmpeg process with args", strings.Join(s.coder.Args[:], " "))
log.Printf("%s-%s: %s", strings.Join(s.coder.Args[:], " "))
cmdStdOut, err := s.coder.StdoutPipe()
if err != nil {
@ -297,13 +300,13 @@ func (s *Stream) transcode(startId int) {
}
func (s *Stream) checkGoal(id int) {
goal := id + s.c.goalBuffer
goal := id + s.c.goalBufferMin
if goal > s.goal {
s.goal = goal
s.goal = id + s.c.goalBufferMax
// resume encoding
if s.coder != nil {
log.Println("Resuming encoding")
log.Printf("%s-%s: resuming transcoding", s.m.id, s.quality)
s.coder.Process.Signal(syscall.SIGCONT)
}
}
@ -345,6 +348,9 @@ func (s *Stream) monitorTranscodeOutput(cmdStdOut io.ReadCloser, startAt float64
l := string(line)
if strings.Contains(l, ".ts") {
// Debug
log.Printf("%s-%s: recv %s", s.m.id, s.quality, l)
// 1080p-000003.ts
idx := strings.Split(strings.Split(l, "-")[1], ".")[0]
id, err := strconv.Atoi(idx)
@ -352,7 +358,7 @@ func (s *Stream) monitorTranscodeOutput(cmdStdOut io.ReadCloser, startAt float64
log.Println("Error parsing chunk id")
}
go func() {
func() {
s.mutex.Lock()
defer s.mutex.Unlock()
@ -373,13 +379,11 @@ func (s *Stream) monitorTranscodeOutput(cmdStdOut io.ReadCloser, startAt float64
// Check goal satisfied
if id >= s.goal {
log.Println("Goal satisfied, pausing encoding")
log.Printf("%s-%s: goal satisfied: %d", s.m.id, s.quality, s.goal)
s.coder.Process.Signal(syscall.SIGSTOP)
}
}()
}
log.Println("ffmpeg:", l)
}
// Join the process