memories/go_vod/stream.go

705 lines
15 KiB
Go
Raw Normal View History

2023-10-21 18:36:22 +00:00
package go_vod
2022-11-10 12:09:35 +00:00
import (
2022-11-10 14:54:32 +00:00
"bufio"
2022-11-10 12:09:35 +00:00
"fmt"
2022-11-10 14:54:32 +00:00
"io"
"log"
2022-11-10 12:09:35 +00:00
"net/http"
2022-11-10 14:54:32 +00:00
"os"
"os/exec"
"strconv"
"strings"
"sync"
2022-11-10 15:46:35 +00:00
"syscall"
2022-11-10 14:54:32 +00:00
"time"
2022-11-10 12:09:35 +00:00
)
2023-04-04 01:44:11 +00:00
const (
2023-04-04 01:55:26 +00:00
ENCODER_COPY = "copy"
ENCODER_X264 = "libx264"
2023-04-04 01:44:11 +00:00
ENCODER_VAAPI = "h264_vaapi"
ENCODER_NVENC = "h264_nvenc"
2022-11-10 14:54:32 +00:00
2023-04-04 01:44:11 +00:00
QUALITY_MAX = "max"
2023-04-04 01:55:26 +00:00
CODEC_H264 = "h264"
2023-04-04 01:44:11 +00:00
)
2022-11-10 14:54:32 +00:00
2022-11-10 12:09:35 +00:00
type Stream struct {
2022-11-10 12:27:29 +00:00
c *Config
2022-11-10 12:09:35 +00:00
m *Manager
quality string
2022-12-01 21:05:19 +00:00
order int
2022-11-10 12:09:35 +00:00
height int
width int
bitrate int
2022-11-10 14:54:32 +00:00
2022-11-10 15:46:35 +00:00
goal int
2022-11-12 11:05:30 +00:00
mutex sync.Mutex
chunks map[int]*Chunk
seenChunks map[int]bool // only for stdout reader
2022-11-10 14:54:32 +00:00
coder *exec.Cmd
2022-11-11 02:49:55 +00:00
inactive int
2022-11-11 03:23:28 +00:00
stop chan bool
2022-11-11 02:49:55 +00:00
}
func (s *Stream) Run() {
// run every 5s
t := time.NewTicker(5 * time.Second)
defer t.Stop()
2022-11-11 03:23:28 +00:00
s.stop = make(chan bool)
2022-11-11 02:49:55 +00:00
for {
2022-11-11 03:23:28 +00:00
select {
case <-t.C:
s.mutex.Lock()
// Prune chunks
for id := range s.chunks {
2023-03-09 19:57:15 +00:00
if id < s.goal-s.c.GoalBufferMax {
2022-11-11 03:23:28 +00:00
s.pruneChunk(id)
}
2022-11-11 02:49:55 +00:00
}
2022-11-11 03:23:28 +00:00
s.inactive++
// Nothing done for 2 minutes
2023-03-09 19:57:15 +00:00
if s.inactive >= s.c.StreamIdleTime/5 && s.coder != nil {
2022-11-11 03:23:28 +00:00
t.Stop()
s.clear()
}
s.mutex.Unlock()
case <-s.stop:
2022-11-11 02:49:55 +00:00
t.Stop()
2022-11-11 03:23:28 +00:00
s.mutex.Lock()
s.clear()
2022-11-11 02:49:55 +00:00
s.mutex.Unlock()
return
}
}
}
2022-11-11 03:23:28 +00:00
func (s *Stream) clear() {
2022-11-11 02:49:55 +00:00
log.Printf("%s-%s: stopping stream", s.m.id, s.quality)
for _, chunk := range s.chunks {
// Delete files
s.pruneChunk(chunk.id)
}
s.chunks = make(map[int]*Chunk)
2022-11-12 11:05:30 +00:00
s.seenChunks = make(map[int]bool)
2022-11-11 02:49:55 +00:00
s.goal = 0
if s.coder != nil {
s.coder.Process.Kill()
2022-11-29 21:00:36 +00:00
s.coder.Wait()
2022-11-11 02:49:55 +00:00
s.coder = nil
}
}
2022-11-11 03:23:28 +00:00
func (s *Stream) Stop() {
2022-11-11 04:14:38 +00:00
select {
case s.stop <- true:
default:
}
2022-11-10 12:09:35 +00:00
}
2022-12-03 06:06:03 +00:00
func (s *Stream) ServeList(w http.ResponseWriter, r *http.Request) error {
2022-11-10 12:09:35 +00:00
WriteM3U8ContentType(w)
w.Write([]byte("#EXTM3U\n"))
w.Write([]byte("#EXT-X-VERSION:4\n"))
w.Write([]byte("#EXT-X-MEDIA-SEQUENCE:0\n"))
w.Write([]byte("#EXT-X-PLAYLIST-TYPE:VOD\n"))
2023-03-09 19:57:15 +00:00
w.Write([]byte(fmt.Sprintf("#EXT-X-TARGETDURATION:%d\n", s.c.ChunkSize)))
2022-11-10 12:09:35 +00:00
2022-12-03 06:06:03 +00:00
query := GetQueryString(r)
2022-11-10 12:45:10 +00:00
duration := s.m.probe.Duration.Seconds()
i := 0
for duration > 0 {
2023-03-09 19:57:15 +00:00
size := float64(s.c.ChunkSize)
2022-11-10 12:45:10 +00:00
if duration < size {
size = duration
}
w.Write([]byte(fmt.Sprintf("#EXTINF:%.3f, nodesc\n", size)))
2022-12-03 06:06:03 +00:00
w.Write([]byte(fmt.Sprintf("%s-%06d.ts%s\n", s.quality, i, query)))
2022-11-10 12:45:10 +00:00
2023-03-09 19:57:15 +00:00
duration -= float64(s.c.ChunkSize)
2022-11-10 12:45:10 +00:00
i++
2022-11-10 12:09:35 +00:00
}
w.Write([]byte("#EXT-X-ENDLIST\n"))
return nil
}
2022-11-10 12:45:10 +00:00
2022-11-10 14:54:32 +00:00
func (s *Stream) ServeChunk(w http.ResponseWriter, id int) error {
s.mutex.Lock()
defer s.mutex.Unlock()
2022-11-11 02:49:55 +00:00
s.inactive = 0
2022-11-10 15:46:35 +00:00
s.checkGoal(id)
2022-11-10 14:54:32 +00:00
// Already have this chunk
if chunk, ok := s.chunks[id]; ok {
// Chunk is finished, just return it
if chunk.done {
s.returnChunk(w, chunk)
return nil
}
// Still waiting on transcoder
s.waitForChunk(w, chunk)
return nil
}
// Will have this soon enough
foundBehind := false
2023-03-09 19:57:15 +00:00
for i := id - 1; i > id-s.c.LookBehind && i >= 0; i-- {
2022-11-10 14:54:32 +00:00
if _, ok := s.chunks[i]; ok {
foundBehind = true
}
}
if foundBehind {
// Make sure the chunk exists
chunk := s.createChunk(id)
// Wait for it
s.waitForChunk(w, chunk)
return nil
}
// Let's start over
s.restartAtChunk(w, id)
2022-11-10 12:45:10 +00:00
return nil
}
2022-11-10 14:54:32 +00:00
2022-12-03 15:24:24 +00:00
func (s *Stream) ServeFullVideo(w http.ResponseWriter, r *http.Request) error {
2023-07-22 02:02:31 +00:00
args := s.transcodeArgs(0, false)
2022-11-29 21:00:36 +00:00
2023-04-04 01:44:11 +00:00
if s.m.probe.CodecName == CODEC_H264 && s.quality == QUALITY_MAX {
2022-12-03 15:24:24 +00:00
// try to just send the original file
http.ServeFile(w, r, s.m.path)
return nil
2022-11-29 21:12:35 +00:00
}
2022-11-29 22:16:27 +00:00
// Output mov
2022-11-29 21:00:36 +00:00
args = append(args, []string{
2023-05-15 02:38:13 +00:00
"-movflags", "frag_keyframe+empty_moov+faststart",
2023-09-30 18:34:18 +00:00
"-f", "mp4", "pipe:1",
2022-11-29 21:00:36 +00:00
}...)
2023-03-09 19:57:15 +00:00
coder := exec.Command(s.c.FFmpeg, args...)
2022-11-29 21:00:36 +00:00
log.Printf("%s-%s: %s", s.m.id, s.quality, strings.Join(coder.Args[:], " "))
cmdStdOut, err := coder.StdoutPipe()
if err != nil {
2023-09-29 17:24:41 +00:00
log.Printf("FATAL: ffmpeg command stdout failed with %s\n", err)
2022-11-29 21:00:36 +00:00
}
cmdStdErr, err := coder.StderrPipe()
if err != nil {
2023-09-29 17:24:41 +00:00
log.Printf("FATAL: ffmpeg command stdout failed with %s\n", err)
2022-11-29 21:00:36 +00:00
}
err = coder.Start()
if err != nil {
log.Printf("FATAL: ffmpeg command failed with %s\n", err)
}
go s.monitorStderr(cmdStdErr)
// Write to response
defer cmdStdOut.Close()
stdoutReader := bufio.NewReader(cmdStdOut)
2022-11-29 22:16:27 +00:00
// Write mov headers
2023-09-30 18:34:18 +00:00
w.Header().Set("Content-Type", "video/mp4")
2022-11-29 21:00:36 +00:00
w.WriteHeader(http.StatusOK)
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Server does not support Flusher!",
http.StatusInternalServerError)
return nil
}
// Write data, flusing every 1MB
buf := make([]byte, 1024*1024)
for {
n, err := stdoutReader.Read(buf)
if err != nil {
if err == io.EOF {
break
}
log.Printf("FATAL: ffmpeg command failed with %s\n", err)
break
}
_, err = w.Write(buf[:n])
if err != nil {
log.Printf("%s-%s: client closed connection", s.m.id, s.quality)
log.Println(err)
break
}
flusher.Flush()
}
// Terminate ffmpeg process
coder.Process.Kill()
coder.Wait()
return nil
}
2022-11-10 14:54:32 +00:00
func (s *Stream) createChunk(id int) *Chunk {
if c, ok := s.chunks[id]; ok {
return c
} else {
s.chunks[id] = NewChunk(id)
return s.chunks[id]
}
}
2022-11-11 02:49:55 +00:00
func (s *Stream) pruneChunk(id int) {
delete(s.chunks, id)
// Remove file
filename := s.getTsPath(id)
os.Remove(filename)
}
2022-11-10 14:54:32 +00:00
func (s *Stream) returnChunk(w http.ResponseWriter, chunk *Chunk) {
2022-11-11 02:20:47 +00:00
// This function is called with lock, but we don't need it
s.mutex.Unlock()
defer s.mutex.Lock()
2022-11-10 14:54:32 +00:00
// Read file and write to response
filename := s.getTsPath(chunk.id)
f, err := os.Open(filename)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
defer f.Close()
w.Header().Set("Content-Type", "video/MP2T")
io.Copy(w, f)
}
func (s *Stream) waitForChunk(w http.ResponseWriter, chunk *Chunk) {
if chunk.done {
s.returnChunk(w, chunk)
return
}
// Add our channel
notif := make(chan bool)
chunk.notifs = append(chunk.notifs, notif)
2022-11-22 00:22:28 +00:00
t := time.NewTimer(10 * time.Second)
2023-02-24 08:26:10 +00:00
coder := s.coder
2022-11-10 14:54:32 +00:00
s.mutex.Unlock()
select {
case <-notif:
t.Stop()
case <-t.C:
}
s.mutex.Lock()
// remove channel
for i, c := range chunk.notifs {
if c == notif {
chunk.notifs = append(chunk.notifs[:i], chunk.notifs[i+1:]...)
break
}
}
// check for success
if chunk.done {
s.returnChunk(w, chunk)
2022-11-12 10:41:42 +00:00
return
2022-11-10 14:54:32 +00:00
}
2022-11-12 02:18:16 +00:00
2023-02-24 08:26:10 +00:00
// Check if coder was changed
if coder != s.coder {
w.WriteHeader(http.StatusConflict)
return
}
2022-11-12 02:18:16 +00:00
// Return timeout error
w.WriteHeader(http.StatusRequestTimeout)
2022-11-10 14:54:32 +00:00
}
func (s *Stream) restartAtChunk(w http.ResponseWriter, id int) {
2022-11-10 15:34:07 +00:00
// Stop current transcoder
2022-11-11 03:23:28 +00:00
s.clear()
2022-11-10 14:54:32 +00:00
chunk := s.createChunk(id) // create first chunk
// Start the transcoder
2023-03-09 19:57:15 +00:00
s.goal = id + s.c.GoalBufferMax
2022-11-10 14:54:32 +00:00
s.transcode(id)
s.waitForChunk(w, chunk) // this is also a request
}
2022-11-29 21:00:36 +00:00
// Get arguments to ffmpeg
2023-07-22 02:02:31 +00:00
func (s *Stream) transcodeArgs(startAt float64, isHls bool) []string {
2022-11-10 14:54:32 +00:00
args := []string{
"-loglevel", "warning",
}
if startAt > 0 {
args = append(args, []string{
"-ss", fmt.Sprintf("%.6f", startAt),
}...)
}
2022-11-12 17:50:16 +00:00
// encoder selection
2023-04-04 01:44:11 +00:00
CV := ENCODER_X264
2022-11-12 17:50:16 +00:00
2023-04-17 07:36:36 +00:00
// Check whether hwaccel should be used
if s.c.VAAPI {
CV = ENCODER_VAAPI
extra := "-hwaccel vaapi -hwaccel_device /dev/dri/renderD128 -hwaccel_output_format vaapi"
args = append(args, strings.Split(extra, " ")...)
} else if s.c.NVENC {
CV = ENCODER_NVENC
extra := "-hwaccel cuda"
args = append(args, strings.Split(extra, " ")...)
2022-11-29 21:55:00 +00:00
}
2022-11-10 14:54:32 +00:00
// Disable autorotation (see transpose comments below)
2023-09-29 17:00:22 +00:00
if s.c.UseTranspose {
args = append(args, []string{"-noautorotate"}...)
}
2023-07-22 02:02:31 +00:00
2022-11-11 05:29:38 +00:00
// Input specs
args = append(args, []string{
2023-05-15 02:38:13 +00:00
"-i", s.m.path, // Input file
2022-11-11 05:29:38 +00:00
"-copyts", // So the "-to" refers to the original TS
2023-10-25 02:06:55 +00:00
"-fflags", "+genpts",
2022-11-11 05:29:38 +00:00
}...)
2023-03-17 23:40:31 +00:00
// Filters
format := "format=nv12"
scaler := "scale"
scalerArgs := make([]string, 0)
scalerArgs = append(scalerArgs, "force_original_aspect_ratio=decrease")
2023-04-04 01:44:11 +00:00
if CV == ENCODER_VAAPI {
2022-11-22 17:58:41 +00:00
format = "format=nv12|vaapi,hwupload"
2023-03-17 23:40:31 +00:00
scaler = "scale_vaapi"
scalerArgs = append(scalerArgs, "format=nv12")
2023-04-04 01:44:11 +00:00
} else if CV == ENCODER_NVENC {
format = "format=nv12|cuda,hwupload"
2023-03-17 23:40:31 +00:00
scaler = fmt.Sprintf("scale_%s", s.c.NVENCScale)
// workaround to force scale_cuda to examine all input frames
if s.c.NVENCScale == "cuda" {
scalerArgs = append(scalerArgs, "passthrough=0")
}
2023-03-17 23:40:31 +00:00
}
2023-03-09 21:02:57 +00:00
2023-03-17 23:40:31 +00:00
// Scale height and width if not max quality
2023-04-04 01:44:11 +00:00
if s.quality != QUALITY_MAX {
2023-08-04 04:08:29 +00:00
maxDim := s.height
if s.width > s.height {
maxDim = s.width
}
scalerArgs = append(scalerArgs, fmt.Sprintf("w=%d", maxDim))
scalerArgs = append(scalerArgs, fmt.Sprintf("h=%d", maxDim))
2022-11-10 14:54:32 +00:00
}
2023-03-17 23:40:31 +00:00
// Apply filter
2023-04-04 01:55:26 +00:00
if CV != ENCODER_COPY {
filter := fmt.Sprintf("%s,%s=%s", format, scaler, strings.Join(scalerArgs, ":"))
2023-05-15 01:10:17 +00:00
// Rotation is a mess: https://trac.ffmpeg.org/ticket/8329
2023-07-22 02:02:31 +00:00
// 1/ -noautorotate copies the sidecar metadata to the output
// 2/ autorotation doesn't seem to work with some types of HW (at least not with VAAPI)
2023-07-22 02:02:31 +00:00
// 3/ autorotation doesn't work with HLS streams
// 4/ VAAPI cannot transport on AMD GPUs
// So: give the user to disable autorotation for HLS and use a manual transpose
if isHls && s.c.UseTranspose {
2023-05-15 02:38:13 +00:00
transposer := "transpose"
if CV == ENCODER_VAAPI {
transposer = "transpose_vaapi"
} else if CV == ENCODER_NVENC {
2023-07-22 03:21:39 +00:00
transposer = fmt.Sprintf("transpose_%s", s.c.NVENCScale)
2023-05-15 02:38:13 +00:00
}
2023-07-22 03:21:39 +00:00
if transposer != "transpose_cuda" { // does not exist
if s.m.probe.Rotation == -90 {
filter = fmt.Sprintf("%s,%s=1", filter, transposer)
} else if s.m.probe.Rotation == 90 {
filter = fmt.Sprintf("%s,%s=2", filter, transposer)
} else if s.m.probe.Rotation == 180 || s.m.probe.Rotation == -180 {
filter = fmt.Sprintf("%s,%s=1,%s=1", filter, transposer, transposer)
}
2023-05-15 02:38:13 +00:00
}
2023-05-15 01:10:17 +00:00
}
2023-04-04 01:55:26 +00:00
args = append(args, []string{"-vf", filter}...)
}
2023-03-17 23:40:31 +00:00
2023-03-29 22:54:53 +00:00
// Output specs for video
2022-11-10 14:54:32 +00:00
args = append(args, []string{
2023-03-29 22:54:53 +00:00
"-map", "0:v:0",
2022-11-10 14:54:32 +00:00
"-c:v", CV,
}...)
2022-11-12 10:39:56 +00:00
// Device specific output args
2023-04-04 01:44:11 +00:00
if CV == ENCODER_VAAPI {
2023-10-25 00:17:36 +00:00
args = append(args, []string{"-global_quality", fmt.Sprintf("%d", s.c.QF)}...)
2023-03-09 20:39:43 +00:00
if s.c.VAAPILowPower {
args = append(args, []string{"-low_power", "1"}...)
}
2023-04-04 01:44:11 +00:00
} else if CV == ENCODER_NVENC {
2022-11-29 21:55:00 +00:00
args = append(args, []string{
"-preset", "p6",
"-tune", "ll",
"-rc", "vbr",
"-rc-lookahead", "30",
2023-10-25 00:17:36 +00:00
"-cq", fmt.Sprintf("%d", s.c.QF),
2022-11-29 21:55:00 +00:00
}...)
2023-03-09 21:02:57 +00:00
if s.c.NVENCTemporalAQ {
args = append(args, []string{"-temporal-aq", "1"}...)
}
2023-04-04 01:44:11 +00:00
} else if CV == ENCODER_X264 {
2022-11-10 14:54:32 +00:00
args = append(args, []string{
2022-11-11 01:56:38 +00:00
"-preset", "faster",
2023-10-25 00:17:36 +00:00
"-crf", fmt.Sprintf("%d", s.c.QF),
2022-11-10 14:54:32 +00:00
}...)
}
2023-03-29 22:54:53 +00:00
// Audio output specs
2022-11-10 14:54:32 +00:00
args = append(args, []string{
2023-03-29 22:54:53 +00:00
"-map", "0:a:0?",
2022-11-10 14:54:32 +00:00
"-c:a", "aac",
2022-11-16 15:10:21 +00:00
"-ac", "1",
2022-11-10 14:54:32 +00:00
}...)
2022-11-29 21:00:36 +00:00
return args
}
func (s *Stream) transcode(startId int) {
if startId > 0 {
// Start one frame before
// This ensures that the keyframes are aligned
startId--
}
2023-03-09 19:57:15 +00:00
startAt := float64(startId * s.c.ChunkSize)
2022-11-29 21:00:36 +00:00
2023-07-22 02:02:31 +00:00
args := s.transcodeArgs(startAt, true)
2022-11-29 21:00:36 +00:00
2022-11-10 14:54:32 +00:00
// Segmenting specs
args = append(args, []string{
2023-09-29 17:25:00 +00:00
"-start_number", fmt.Sprintf("%d", startId),
2022-11-10 14:54:32 +00:00
"-avoid_negative_ts", "disabled",
"-f", "hls",
2023-08-31 02:59:03 +00:00
"-hls_flags", "split_by_time",
2023-03-09 19:57:15 +00:00
"-hls_time", fmt.Sprintf("%d", s.c.ChunkSize),
2022-11-10 14:54:32 +00:00
"-hls_segment_type", "mpegts",
"-hls_segment_filename", s.getTsPath(-1),
}...)
2023-09-29 17:25:00 +00:00
// Keyframe specs
if s.c.UseGopSize && s.m.probe.FrameRate > 0 {
// Fix GOP size
args = append(args, []string{
"-g", fmt.Sprintf("%d", s.c.ChunkSize*s.m.probe.FrameRate),
"-keyint_min", fmt.Sprintf("%d", s.c.ChunkSize*s.m.probe.FrameRate),
}...)
} else {
// Force keyframes every chunk
args = append(args, []string{
"-force_key_frames", fmt.Sprintf("expr:gte(t,n_forced*%d)", s.c.ChunkSize),
}...)
}
// Output to stdout
args = append(args, "-")
// Start the process
2023-03-09 19:57:15 +00:00
s.coder = exec.Command(s.c.FFmpeg, args...)
2023-10-25 02:10:20 +00:00
// Log command, quoting the args as needed
quotedArgs := make([]string, len(s.coder.Args))
invalidChars := strings.Join([]string{" ", "=", ":", "\"", "\\", "\n", "\t"}, "")
for i, arg := range s.coder.Args {
if strings.ContainsAny(arg, invalidChars) {
quotedArgs[i] = fmt.Sprintf("\"%s\"", arg)
} else {
quotedArgs[i] = arg
}
}
log.Printf("%s-%s: %s", s.m.id, s.quality, strings.Join(quotedArgs[:], " "))
2022-11-10 14:54:32 +00:00
cmdStdOut, err := s.coder.StdoutPipe()
if err != nil {
2023-09-29 17:24:41 +00:00
log.Printf("FATAL: ffmpeg command stdout failed with %s\n", err)
2022-11-10 14:54:32 +00:00
}
cmdStdErr, err := s.coder.StderrPipe()
if err != nil {
2023-09-29 17:24:41 +00:00
log.Printf("FATAL: ffmpeg command stdout failed with %s\n", err)
2022-11-10 14:54:32 +00:00
}
err = s.coder.Start()
if err != nil {
log.Printf("FATAL: ffmpeg command failed with %s\n", err)
}
go s.monitorTranscodeOutput(cmdStdOut, startAt)
go s.monitorStderr(cmdStdErr)
2023-03-17 21:44:59 +00:00
go s.monitorExit()
2022-11-10 14:54:32 +00:00
}
2022-11-10 15:46:35 +00:00
func (s *Stream) checkGoal(id int) {
2023-03-09 19:57:15 +00:00
goal := id + s.c.GoalBufferMin
2022-11-10 15:46:35 +00:00
if goal > s.goal {
2023-03-09 19:57:15 +00:00
s.goal = id + s.c.GoalBufferMax
2022-11-10 15:46:35 +00:00
// resume encoding
if s.coder != nil {
2022-11-11 02:20:47 +00:00
log.Printf("%s-%s: resuming transcoding", s.m.id, s.quality)
2022-11-10 15:46:35 +00:00
s.coder.Process.Signal(syscall.SIGCONT)
}
}
}
2022-11-10 14:54:32 +00:00
func (s *Stream) getTsPath(id int) string {
if id == -1 {
2022-11-11 03:40:53 +00:00
return fmt.Sprintf("%s/%s-%%06d.ts", s.m.tempDir, s.quality)
2022-11-10 14:54:32 +00:00
}
2022-11-11 03:40:53 +00:00
return fmt.Sprintf("%s/%s-%06d.ts", s.m.tempDir, s.quality, id)
2022-11-10 14:54:32 +00:00
}
// Separate goroutine
func (s *Stream) monitorTranscodeOutput(cmdStdOut io.ReadCloser, startAt float64) {
2022-11-10 15:34:07 +00:00
s.mutex.Lock()
coder := s.coder
s.mutex.Unlock()
2022-11-10 14:54:32 +00:00
defer cmdStdOut.Close()
stdoutReader := bufio.NewReader(cmdStdOut)
for {
2022-11-10 15:34:07 +00:00
if s.coder != coder {
break
}
2022-11-10 14:54:32 +00:00
line, err := stdoutReader.ReadBytes('\n')
if err == io.EOF {
if len(line) == 0 {
break
}
2023-03-17 21:30:30 +00:00
} else if err != nil {
log.Println(err)
break
2022-11-10 14:54:32 +00:00
} else {
line = line[:(len(line) - 1)]
}
l := string(line)
if strings.Contains(l, ".ts") {
2022-11-10 15:46:35 +00:00
// 1080p-000003.ts
idx := strings.Split(strings.Split(l, "-")[1], ".")[0]
id, err := strconv.Atoi(idx)
if err != nil {
log.Println("Error parsing chunk id")
}
2022-11-10 15:34:07 +00:00
2022-11-12 11:05:30 +00:00
if s.seenChunks[id] {
continue
}
s.seenChunks[id] = true
// Debug
log.Printf("%s-%s: recv %s", s.m.id, s.quality, l)
2022-11-11 02:20:47 +00:00
func() {
2022-11-10 15:34:07 +00:00
s.mutex.Lock()
defer s.mutex.Unlock()
2022-11-10 15:46:35 +00:00
// The coder has changed; do nothing
2022-11-10 15:34:07 +00:00
if s.coder != coder {
return
}
2022-11-10 15:46:35 +00:00
// Notify everyone
2022-11-10 15:34:07 +00:00
chunk := s.createChunk(id)
2022-11-10 17:39:09 +00:00
if chunk.done {
return
}
2022-11-10 15:34:07 +00:00
chunk.done = true
for _, n := range chunk.notifs {
n <- true
}
2022-11-10 15:46:35 +00:00
// Check goal satisfied
if id >= s.goal {
2022-11-11 02:20:47 +00:00
log.Printf("%s-%s: goal satisfied: %d", s.m.id, s.quality, s.goal)
2022-11-10 15:46:35 +00:00
s.coder.Process.Signal(syscall.SIGSTOP)
}
2022-11-10 15:34:07 +00:00
}()
2022-11-10 14:54:32 +00:00
}
2022-11-10 15:34:07 +00:00
}
2022-11-10 14:54:32 +00:00
}
func (s *Stream) monitorStderr(cmdStdErr io.ReadCloser) {
stderrReader := bufio.NewReader(cmdStdErr)
for {
line, err := stderrReader.ReadBytes('\n')
if err == io.EOF {
if len(line) == 0 {
break
}
2023-03-17 21:30:30 +00:00
} else if err != nil {
log.Println(err)
break
2022-11-10 14:54:32 +00:00
} else {
line = line[:(len(line) - 1)]
}
log.Println("ffmpeg-error:", string(line))
}
}
2023-03-17 21:44:59 +00:00
func (s *Stream) monitorExit() {
// Join the process
coder := s.coder
err := coder.Wait()
// Try to get exit status
if exitError, ok := err.(*exec.ExitError); ok {
exitcode := exitError.ExitCode()
log.Printf("%s-%s: ffmpeg exited with status: %d", s.m.id, s.quality, exitcode)
s.mutex.Lock()
defer s.mutex.Unlock()
// If error code is >0, there was an error in transcoding
if exitcode > 0 && s.coder == coder {
// Notify all outstanding chunks
for _, chunk := range s.chunks {
for _, n := range chunk.notifs {
n <- true
}
}
}
}
}