From f41680455c4c1d576403378aee05356e6fe5aaf6 Mon Sep 17 00:00:00 2001 From: Varun Patil Date: Thu, 9 Mar 2023 11:57:15 -0800 Subject: [PATCH] Switch to config file --- config.go | 25 +++++++++++------ main.go | 81 +++++++++++++++++++++++++++++------------------------- manager.go | 8 +++--- stream.go | 32 ++++++++++----------- 4 files changed, 79 insertions(+), 67 deletions(-) diff --git a/config.go b/config.go index ba73034c..688cfc2e 100644 --- a/config.go +++ b/config.go @@ -1,24 +1,31 @@ package main type Config struct { + // Bind address + Bind string `json:"bind"` + // FFmpeg binary - ffmpeg string + FFmpeg string `json:"ffmpeg"` // FFprobe binary - ffprobe string + FFprobe string `json:"ffprobe"` // Temp files directory - tempdir string + TempDir string `json:"tempdir"` // Size of each chunk in seconds - chunkSize int + ChunkSize int `json:"chunkSize"` // How many *chunks* to look behind before restarting transcoding - lookBehind int + LookBehind int `json:"lookBehind"` // Number of chunks in goal to restart encoding - goalBufferMin int + GoalBufferMin int `json:"goalBufferMin"` // Number of chunks in goal to stop encoding - goalBufferMax int + GoalBufferMax int `json:"goalBufferMax"` // Number of seconds to wait before shutting down encoding - streamIdleTime int + StreamIdleTime int `json:"streamIdleTime"` // Number of seconds to wait before shutting down a client - managerIdleTime int + ManagerIdleTime int `json:"managerIdleTime"` + + // Hardware acceleration configuration + VAAPI bool `json:"vaapi"` + NVENC bool `json:"nvenc"` } diff --git a/main.go b/main.go index 4d592535..896db9ad 100644 --- a/main.go +++ b/main.go @@ -1,7 +1,9 @@ package main import ( + "encoding/json" "fmt" + "io/ioutil" "log" "net/http" "os" @@ -24,8 +26,8 @@ func NewHandler(c *Config) *Handler { } // Recreate tempdir - os.RemoveAll(c.tempdir) - os.MkdirAll(c.tempdir, 0755) + os.RemoveAll(c.TempDir) + os.MkdirAll(c.TempDir, 0755) go h.watchClose() return h @@ -120,51 +122,54 @@ func (h *Handler) Close() { h.close <- "" } +func loadConfig(path string, c *Config) { + // load json config + content, err := ioutil.ReadFile(path) + if err != nil { + log.Fatal("Error when opening file: ", err) + } + + err = json.Unmarshal(content, &c) + if err != nil { + log.Fatal("Error loading config file", err) + } + + // Print loaded config + fmt.Printf("%+v\n", c) +} + func main() { if len(os.Args) >= 2 && os.Args[1] == "test" { fmt.Println("test successful") return } + c := &Config{ + Bind: ":47788", + ChunkSize: 3, + LookBehind: 5, + GoalBufferMin: 1, + GoalBufferMax: 4, + StreamIdleTime: 60, + ManagerIdleTime: 60, + } + + // Load config file from second argument + if len(os.Args) >= 2 { + loadConfig(os.Args[1], c) + } else { + log.Fatal("Missing config file") + } + + if c.FFmpeg == "" || c.FFprobe == "" || c.TempDir == "" { + log.Fatal("Missing critical param -- check config file") + } + log.Println("Starting VOD server") - // get executable paths - ffmpeg := os.Getenv("FFMPEG") - if ffmpeg == "" { - ffmpeg = "ffmpeg" - } - - ffprobe := os.Getenv("FFPROBE") - if ffprobe == "" { - ffprobe = "ffprobe" - } - - // get tempdir - tempdir := os.Getenv("GOVOD_TEMPDIR") - if tempdir == "" { - tempdir = "/tmp/go-vod" - } - - // get port - bind := os.Getenv("GOVOD_BIND") - if bind == "" { - bind = ":47788" - } - - h := NewHandler(&Config{ - ffmpeg: ffmpeg, - ffprobe: ffprobe, - tempdir: tempdir, - chunkSize: 3, - lookBehind: 5, - goalBufferMin: 1, - goalBufferMax: 4, - streamIdleTime: 60, - managerIdleTime: 60, - }) - + h := NewHandler(c) http.Handle("/", h) - http.ListenAndServe(bind, nil) + http.ListenAndServe(c.Bind, nil) log.Println("Exiting VOD server") h.Close() diff --git a/manager.go b/manager.go index d4f30110..254bcf68 100644 --- a/manager.go +++ b/manager.go @@ -49,7 +49,7 @@ func NewManager(c *Config, path string, id string, close chan string) (*Manager, h := fnv.New32a() h.Write([]byte(path)) ph := fmt.Sprint(h.Sum32()) - m.tempDir = fmt.Sprintf("%s/%s-%s", m.c.tempdir, id, ph) + m.tempDir = fmt.Sprintf("%s/%s-%s", m.c.TempDir, id, ph) // Delete temp dir if exists os.RemoveAll(m.tempDir) @@ -64,7 +64,7 @@ func NewManager(c *Config, path string, id string, close chan string) (*Manager, m.probe.BitRate *= 2 } - m.numChunks = int(math.Ceil(m.probe.Duration.Seconds() / float64(c.chunkSize))) + m.numChunks = int(math.Ceil(m.probe.Duration.Seconds() / float64(c.ChunkSize))) // Possible streams m.streams["360p"] = &Stream{c: c, m: m, quality: "360p", height: 360, width: 640, bitrate: 500000} @@ -134,7 +134,7 @@ func NewManager(c *Config, path string, id string, close chan string) (*Manager, } // Nothing done for 5 minutes - if m.inactive >= m.c.managerIdleTime/5 { + if m.inactive >= m.c.ManagerIdleTime/5 { t.Stop() m.Destroy() m.close <- m.id @@ -247,7 +247,7 @@ func (m *Manager) ffprobe() error { } ctx, _ := context.WithDeadline(context.TODO(), time.Now().Add(5*time.Second)) - cmd := exec.CommandContext(ctx, m.c.ffprobe, args...) + cmd := exec.CommandContext(ctx, m.c.FFprobe, args...) var stdout, stderr bytes.Buffer cmd.Stdout = &stdout diff --git a/stream.go b/stream.go index dfd40cce..86ad421c 100644 --- a/stream.go +++ b/stream.go @@ -63,7 +63,7 @@ func (s *Stream) Run() { s.mutex.Lock() // Prune chunks for id := range s.chunks { - if id < s.goal-s.c.goalBufferMax { + if id < s.goal-s.c.GoalBufferMax { s.pruneChunk(id) } } @@ -71,7 +71,7 @@ func (s *Stream) Run() { s.inactive++ // Nothing done for 2 minutes - if s.inactive >= s.c.streamIdleTime/5 && s.coder != nil { + if s.inactive >= s.c.StreamIdleTime/5 && s.coder != nil { t.Stop() s.clear() } @@ -119,14 +119,14 @@ func (s *Stream) ServeList(w http.ResponseWriter, r *http.Request) error { w.Write([]byte("#EXT-X-VERSION:4\n")) w.Write([]byte("#EXT-X-MEDIA-SEQUENCE:0\n")) w.Write([]byte("#EXT-X-PLAYLIST-TYPE:VOD\n")) - w.Write([]byte(fmt.Sprintf("#EXT-X-TARGETDURATION:%d\n", s.c.chunkSize))) + w.Write([]byte(fmt.Sprintf("#EXT-X-TARGETDURATION:%d\n", s.c.ChunkSize))) query := GetQueryString(r) duration := s.m.probe.Duration.Seconds() i := 0 for duration > 0 { - size := float64(s.c.chunkSize) + size := float64(s.c.ChunkSize) if duration < size { size = duration } @@ -134,7 +134,7 @@ func (s *Stream) ServeList(w http.ResponseWriter, r *http.Request) error { w.Write([]byte(fmt.Sprintf("#EXTINF:%.3f, nodesc\n", size))) w.Write([]byte(fmt.Sprintf("%s-%06d.ts%s\n", s.quality, i, query))) - duration -= float64(s.c.chunkSize) + duration -= float64(s.c.ChunkSize) i++ } @@ -165,7 +165,7 @@ func (s *Stream) ServeChunk(w http.ResponseWriter, id int) error { // Will have this soon enough foundBehind := false - for i := id - 1; i > id-s.c.lookBehind && i >= 0; i-- { + for i := id - 1; i > id-s.c.LookBehind && i >= 0; i-- { if _, ok := s.chunks[i]; ok { foundBehind = true } @@ -201,7 +201,7 @@ func (s *Stream) ServeFullVideo(w http.ResponseWriter, r *http.Request) error { "-movflags", "frag_keyframe+empty_moov+faststart", "-f", "mov", "pipe:1", }...) - coder := exec.Command(s.c.ffmpeg, args...) + coder := exec.Command(s.c.FFmpeg, args...) log.Printf("%s-%s: %s", s.m.id, s.quality, strings.Join(coder.Args[:], " ")) cmdStdOut, err := coder.StdoutPipe() @@ -350,7 +350,7 @@ func (s *Stream) restartAtChunk(w http.ResponseWriter, id int) { chunk := s.createChunk(id) // create first chunk // Start the transcoder - s.goal = id + s.c.goalBufferMax + s.goal = id + s.c.GoalBufferMax s.transcode(id) s.waitForChunk(w, chunk) // this is also a request @@ -372,11 +372,11 @@ func (s *Stream) transcodeArgs(startAt float64) []string { CV := "libx264" // Check whether hwaccel should be used - if os.Getenv("VAAPI") == "1" { + if s.c.VAAPI { CV = "h264_vaapi" extra := "-hwaccel vaapi -hwaccel_device /dev/dri/renderD128 -hwaccel_output_format vaapi" args = append(args, strings.Split(extra, " ")...) - } else if os.Getenv("NVENC") == "1" { + } else if s.c.NVENC { CV = "h264_nvenc" extra := "-hwaccel cuda -hwaccel_output_format cuda" args = append(args, strings.Split(extra, " ")...) @@ -481,7 +481,7 @@ func (s *Stream) transcode(startId int) { // This ensures that the keyframes are aligned startId-- } - startAt := float64(startId * s.c.chunkSize) + startAt := float64(startId * s.c.ChunkSize) args := s.transcodeArgs(startAt) @@ -489,15 +489,15 @@ func (s *Stream) transcode(startId int) { args = append(args, []string{ "-avoid_negative_ts", "disabled", "-f", "hls", - "-hls_time", fmt.Sprintf("%d", s.c.chunkSize), - "-force_key_frames", fmt.Sprintf("expr:gte(t,n_forced*%d)", s.c.chunkSize), + "-hls_time", fmt.Sprintf("%d", s.c.ChunkSize), + "-force_key_frames", fmt.Sprintf("expr:gte(t,n_forced*%d)", s.c.ChunkSize), "-hls_segment_type", "mpegts", "-start_number", fmt.Sprintf("%d", startId), "-hls_segment_filename", s.getTsPath(-1), "-", }...) - s.coder = exec.Command(s.c.ffmpeg, args...) + s.coder = exec.Command(s.c.FFmpeg, args...) log.Printf("%s-%s: %s", s.m.id, s.quality, strings.Join(s.coder.Args[:], " ")) cmdStdOut, err := s.coder.StdoutPipe() @@ -520,9 +520,9 @@ func (s *Stream) transcode(startId int) { } func (s *Stream) checkGoal(id int) { - goal := id + s.c.goalBufferMin + goal := id + s.c.GoalBufferMin if goal > s.goal { - s.goal = id + s.c.goalBufferMax + s.goal = id + s.c.GoalBufferMax // resume encoding if s.coder != nil {