Switch to config file

monorepo
Varun Patil 2023-03-09 11:57:15 -08:00
parent 35b4b3a8b2
commit f41680455c
4 changed files with 79 additions and 67 deletions

View File

@ -1,24 +1,31 @@
package main package main
type Config struct { type Config struct {
// Bind address
Bind string `json:"bind"`
// FFmpeg binary // FFmpeg binary
ffmpeg string FFmpeg string `json:"ffmpeg"`
// FFprobe binary // FFprobe binary
ffprobe string FFprobe string `json:"ffprobe"`
// Temp files directory // Temp files directory
tempdir string TempDir string `json:"tempdir"`
// Size of each chunk in seconds // Size of each chunk in seconds
chunkSize int ChunkSize int `json:"chunkSize"`
// How many *chunks* to look behind before restarting transcoding // How many *chunks* to look behind before restarting transcoding
lookBehind int LookBehind int `json:"lookBehind"`
// Number of chunks in goal to restart encoding // Number of chunks in goal to restart encoding
goalBufferMin int GoalBufferMin int `json:"goalBufferMin"`
// Number of chunks in goal to stop encoding // Number of chunks in goal to stop encoding
goalBufferMax int GoalBufferMax int `json:"goalBufferMax"`
// Number of seconds to wait before shutting down encoding // Number of seconds to wait before shutting down encoding
streamIdleTime int StreamIdleTime int `json:"streamIdleTime"`
// Number of seconds to wait before shutting down a client // Number of seconds to wait before shutting down a client
managerIdleTime int ManagerIdleTime int `json:"managerIdleTime"`
// Hardware acceleration configuration
VAAPI bool `json:"vaapi"`
NVENC bool `json:"nvenc"`
} }

81
main.go
View File

@ -1,7 +1,9 @@
package main package main
import ( import (
"encoding/json"
"fmt" "fmt"
"io/ioutil"
"log" "log"
"net/http" "net/http"
"os" "os"
@ -24,8 +26,8 @@ func NewHandler(c *Config) *Handler {
} }
// Recreate tempdir // Recreate tempdir
os.RemoveAll(c.tempdir) os.RemoveAll(c.TempDir)
os.MkdirAll(c.tempdir, 0755) os.MkdirAll(c.TempDir, 0755)
go h.watchClose() go h.watchClose()
return h return h
@ -120,51 +122,54 @@ func (h *Handler) Close() {
h.close <- "" h.close <- ""
} }
func loadConfig(path string, c *Config) {
// load json config
content, err := ioutil.ReadFile(path)
if err != nil {
log.Fatal("Error when opening file: ", err)
}
err = json.Unmarshal(content, &c)
if err != nil {
log.Fatal("Error loading config file", err)
}
// Print loaded config
fmt.Printf("%+v\n", c)
}
func main() { func main() {
if len(os.Args) >= 2 && os.Args[1] == "test" { if len(os.Args) >= 2 && os.Args[1] == "test" {
fmt.Println("test successful") fmt.Println("test successful")
return return
} }
c := &Config{
Bind: ":47788",
ChunkSize: 3,
LookBehind: 5,
GoalBufferMin: 1,
GoalBufferMax: 4,
StreamIdleTime: 60,
ManagerIdleTime: 60,
}
// Load config file from second argument
if len(os.Args) >= 2 {
loadConfig(os.Args[1], c)
} else {
log.Fatal("Missing config file")
}
if c.FFmpeg == "" || c.FFprobe == "" || c.TempDir == "" {
log.Fatal("Missing critical param -- check config file")
}
log.Println("Starting VOD server") log.Println("Starting VOD server")
// get executable paths h := NewHandler(c)
ffmpeg := os.Getenv("FFMPEG")
if ffmpeg == "" {
ffmpeg = "ffmpeg"
}
ffprobe := os.Getenv("FFPROBE")
if ffprobe == "" {
ffprobe = "ffprobe"
}
// get tempdir
tempdir := os.Getenv("GOVOD_TEMPDIR")
if tempdir == "" {
tempdir = "/tmp/go-vod"
}
// get port
bind := os.Getenv("GOVOD_BIND")
if bind == "" {
bind = ":47788"
}
h := NewHandler(&Config{
ffmpeg: ffmpeg,
ffprobe: ffprobe,
tempdir: tempdir,
chunkSize: 3,
lookBehind: 5,
goalBufferMin: 1,
goalBufferMax: 4,
streamIdleTime: 60,
managerIdleTime: 60,
})
http.Handle("/", h) http.Handle("/", h)
http.ListenAndServe(bind, nil) http.ListenAndServe(c.Bind, nil)
log.Println("Exiting VOD server") log.Println("Exiting VOD server")
h.Close() h.Close()

View File

@ -49,7 +49,7 @@ func NewManager(c *Config, path string, id string, close chan string) (*Manager,
h := fnv.New32a() h := fnv.New32a()
h.Write([]byte(path)) h.Write([]byte(path))
ph := fmt.Sprint(h.Sum32()) ph := fmt.Sprint(h.Sum32())
m.tempDir = fmt.Sprintf("%s/%s-%s", m.c.tempdir, id, ph) m.tempDir = fmt.Sprintf("%s/%s-%s", m.c.TempDir, id, ph)
// Delete temp dir if exists // Delete temp dir if exists
os.RemoveAll(m.tempDir) os.RemoveAll(m.tempDir)
@ -64,7 +64,7 @@ func NewManager(c *Config, path string, id string, close chan string) (*Manager,
m.probe.BitRate *= 2 m.probe.BitRate *= 2
} }
m.numChunks = int(math.Ceil(m.probe.Duration.Seconds() / float64(c.chunkSize))) m.numChunks = int(math.Ceil(m.probe.Duration.Seconds() / float64(c.ChunkSize)))
// Possible streams // Possible streams
m.streams["360p"] = &Stream{c: c, m: m, quality: "360p", height: 360, width: 640, bitrate: 500000} m.streams["360p"] = &Stream{c: c, m: m, quality: "360p", height: 360, width: 640, bitrate: 500000}
@ -134,7 +134,7 @@ func NewManager(c *Config, path string, id string, close chan string) (*Manager,
} }
// Nothing done for 5 minutes // Nothing done for 5 minutes
if m.inactive >= m.c.managerIdleTime/5 { if m.inactive >= m.c.ManagerIdleTime/5 {
t.Stop() t.Stop()
m.Destroy() m.Destroy()
m.close <- m.id m.close <- m.id
@ -247,7 +247,7 @@ func (m *Manager) ffprobe() error {
} }
ctx, _ := context.WithDeadline(context.TODO(), time.Now().Add(5*time.Second)) ctx, _ := context.WithDeadline(context.TODO(), time.Now().Add(5*time.Second))
cmd := exec.CommandContext(ctx, m.c.ffprobe, args...) cmd := exec.CommandContext(ctx, m.c.FFprobe, args...)
var stdout, stderr bytes.Buffer var stdout, stderr bytes.Buffer
cmd.Stdout = &stdout cmd.Stdout = &stdout

View File

@ -63,7 +63,7 @@ func (s *Stream) Run() {
s.mutex.Lock() s.mutex.Lock()
// Prune chunks // Prune chunks
for id := range s.chunks { for id := range s.chunks {
if id < s.goal-s.c.goalBufferMax { if id < s.goal-s.c.GoalBufferMax {
s.pruneChunk(id) s.pruneChunk(id)
} }
} }
@ -71,7 +71,7 @@ func (s *Stream) Run() {
s.inactive++ s.inactive++
// Nothing done for 2 minutes // Nothing done for 2 minutes
if s.inactive >= s.c.streamIdleTime/5 && s.coder != nil { if s.inactive >= s.c.StreamIdleTime/5 && s.coder != nil {
t.Stop() t.Stop()
s.clear() s.clear()
} }
@ -119,14 +119,14 @@ func (s *Stream) ServeList(w http.ResponseWriter, r *http.Request) error {
w.Write([]byte("#EXT-X-VERSION:4\n")) w.Write([]byte("#EXT-X-VERSION:4\n"))
w.Write([]byte("#EXT-X-MEDIA-SEQUENCE:0\n")) w.Write([]byte("#EXT-X-MEDIA-SEQUENCE:0\n"))
w.Write([]byte("#EXT-X-PLAYLIST-TYPE:VOD\n")) w.Write([]byte("#EXT-X-PLAYLIST-TYPE:VOD\n"))
w.Write([]byte(fmt.Sprintf("#EXT-X-TARGETDURATION:%d\n", s.c.chunkSize))) w.Write([]byte(fmt.Sprintf("#EXT-X-TARGETDURATION:%d\n", s.c.ChunkSize)))
query := GetQueryString(r) query := GetQueryString(r)
duration := s.m.probe.Duration.Seconds() duration := s.m.probe.Duration.Seconds()
i := 0 i := 0
for duration > 0 { for duration > 0 {
size := float64(s.c.chunkSize) size := float64(s.c.ChunkSize)
if duration < size { if duration < size {
size = duration size = duration
} }
@ -134,7 +134,7 @@ func (s *Stream) ServeList(w http.ResponseWriter, r *http.Request) error {
w.Write([]byte(fmt.Sprintf("#EXTINF:%.3f, nodesc\n", size))) w.Write([]byte(fmt.Sprintf("#EXTINF:%.3f, nodesc\n", size)))
w.Write([]byte(fmt.Sprintf("%s-%06d.ts%s\n", s.quality, i, query))) w.Write([]byte(fmt.Sprintf("%s-%06d.ts%s\n", s.quality, i, query)))
duration -= float64(s.c.chunkSize) duration -= float64(s.c.ChunkSize)
i++ i++
} }
@ -165,7 +165,7 @@ func (s *Stream) ServeChunk(w http.ResponseWriter, id int) error {
// Will have this soon enough // Will have this soon enough
foundBehind := false foundBehind := false
for i := id - 1; i > id-s.c.lookBehind && i >= 0; i-- { for i := id - 1; i > id-s.c.LookBehind && i >= 0; i-- {
if _, ok := s.chunks[i]; ok { if _, ok := s.chunks[i]; ok {
foundBehind = true foundBehind = true
} }
@ -201,7 +201,7 @@ func (s *Stream) ServeFullVideo(w http.ResponseWriter, r *http.Request) error {
"-movflags", "frag_keyframe+empty_moov+faststart", "-f", "mov", "pipe:1", "-movflags", "frag_keyframe+empty_moov+faststart", "-f", "mov", "pipe:1",
}...) }...)
coder := exec.Command(s.c.ffmpeg, args...) coder := exec.Command(s.c.FFmpeg, args...)
log.Printf("%s-%s: %s", s.m.id, s.quality, strings.Join(coder.Args[:], " ")) log.Printf("%s-%s: %s", s.m.id, s.quality, strings.Join(coder.Args[:], " "))
cmdStdOut, err := coder.StdoutPipe() cmdStdOut, err := coder.StdoutPipe()
@ -350,7 +350,7 @@ func (s *Stream) restartAtChunk(w http.ResponseWriter, id int) {
chunk := s.createChunk(id) // create first chunk chunk := s.createChunk(id) // create first chunk
// Start the transcoder // Start the transcoder
s.goal = id + s.c.goalBufferMax s.goal = id + s.c.GoalBufferMax
s.transcode(id) s.transcode(id)
s.waitForChunk(w, chunk) // this is also a request s.waitForChunk(w, chunk) // this is also a request
@ -372,11 +372,11 @@ func (s *Stream) transcodeArgs(startAt float64) []string {
CV := "libx264" CV := "libx264"
// Check whether hwaccel should be used // Check whether hwaccel should be used
if os.Getenv("VAAPI") == "1" { if s.c.VAAPI {
CV = "h264_vaapi" CV = "h264_vaapi"
extra := "-hwaccel vaapi -hwaccel_device /dev/dri/renderD128 -hwaccel_output_format vaapi" extra := "-hwaccel vaapi -hwaccel_device /dev/dri/renderD128 -hwaccel_output_format vaapi"
args = append(args, strings.Split(extra, " ")...) args = append(args, strings.Split(extra, " ")...)
} else if os.Getenv("NVENC") == "1" { } else if s.c.NVENC {
CV = "h264_nvenc" CV = "h264_nvenc"
extra := "-hwaccel cuda -hwaccel_output_format cuda" extra := "-hwaccel cuda -hwaccel_output_format cuda"
args = append(args, strings.Split(extra, " ")...) args = append(args, strings.Split(extra, " ")...)
@ -481,7 +481,7 @@ func (s *Stream) transcode(startId int) {
// This ensures that the keyframes are aligned // This ensures that the keyframes are aligned
startId-- startId--
} }
startAt := float64(startId * s.c.chunkSize) startAt := float64(startId * s.c.ChunkSize)
args := s.transcodeArgs(startAt) args := s.transcodeArgs(startAt)
@ -489,15 +489,15 @@ func (s *Stream) transcode(startId int) {
args = append(args, []string{ args = append(args, []string{
"-avoid_negative_ts", "disabled", "-avoid_negative_ts", "disabled",
"-f", "hls", "-f", "hls",
"-hls_time", fmt.Sprintf("%d", s.c.chunkSize), "-hls_time", fmt.Sprintf("%d", s.c.ChunkSize),
"-force_key_frames", fmt.Sprintf("expr:gte(t,n_forced*%d)", s.c.chunkSize), "-force_key_frames", fmt.Sprintf("expr:gte(t,n_forced*%d)", s.c.ChunkSize),
"-hls_segment_type", "mpegts", "-hls_segment_type", "mpegts",
"-start_number", fmt.Sprintf("%d", startId), "-start_number", fmt.Sprintf("%d", startId),
"-hls_segment_filename", s.getTsPath(-1), "-hls_segment_filename", s.getTsPath(-1),
"-", "-",
}...) }...)
s.coder = exec.Command(s.c.ffmpeg, args...) s.coder = exec.Command(s.c.FFmpeg, args...)
log.Printf("%s-%s: %s", s.m.id, s.quality, strings.Join(s.coder.Args[:], " ")) log.Printf("%s-%s: %s", s.m.id, s.quality, strings.Join(s.coder.Args[:], " "))
cmdStdOut, err := s.coder.StdoutPipe() cmdStdOut, err := s.coder.StdoutPipe()
@ -520,9 +520,9 @@ func (s *Stream) transcode(startId int) {
} }
func (s *Stream) checkGoal(id int) { func (s *Stream) checkGoal(id int) {
goal := id + s.c.goalBufferMin goal := id + s.c.GoalBufferMin
if goal > s.goal { if goal > s.goal {
s.goal = id + s.c.goalBufferMax s.goal = id + s.c.GoalBufferMax
// resume encoding // resume encoding
if s.coder != nil { if s.coder != nil {