Add 'go-vod/' from commit 'a37c11daf8c4aa207186c4ef030d8a35f251d433'

git-subtree-dir: go-vod
git-subtree-mainline: a41998f0f0
git-subtree-split: a37c11daf8
monorepo
Varun Patil 2023-11-01 09:32:27 -07:00
commit 4df1ce40f3
19 changed files with 2065 additions and 0 deletions

View File

@ -0,0 +1 @@
Dockerfile

View File

@ -0,0 +1,68 @@
name: release
on:
push:
tags:
- "*"
jobs:
binary:
name: Binary
runs-on: ubuntu-latest
container:
image: golang:1.20-bullseye
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Build
run: |
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -buildvcs=false -ldflags="-s -w" -o go-vod-amd64
CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -buildvcs=false -ldflags="-s -w" -o go-vod-arm64
- name: Upload to releases
uses: svenstaro/upload-release-action@v2
id: attach_to_release
with:
file: go-vod-*
file_glob: true
tag: ${{ github.ref }}
overwrite: true
docker:
runs-on: ubuntu-latest
name: Docker
steps:
- name: Check out the repo
uses: actions/checkout@v4
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Login to DockerHub
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: Get image label
id: image_label
run: echo "label=${GITHUB_REF#refs/tags/}" >> $GITHUB_OUTPUT
- name: Build container image
uses: docker/build-push-action@v5
with:
push: true
platforms: linux/amd64,linux/arm64
context: './'
no-cache: true
file: 'Dockerfile'
tags: radialapps/go-vod:${{ steps.image_label.outputs.label }} , radialapps/go-vod:latest
provenance: false

22
go-vod/.gitignore vendored 100644
View File

@ -0,0 +1,22 @@
# If you prefer the allow list template instead of the deny list, see community template:
# https://github.com/github/gitignore/blob/main/community/Golang/Go.AllowList.gitignore
#
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
go-vod
# Test binary, built with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Dependency directories (remove the comment below to include it)
# vendor/
# Go workspace file
go.work

View File

@ -0,0 +1,7 @@
FROM linuxserver/ffmpeg:latest
COPY run.sh /go-vod.sh
EXPOSE 47788
ENTRYPOINT ["/go-vod.sh"]

202
go-vod/LICENSE 100644
View File

@ -0,0 +1,202 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

26
go-vod/README.md 100644
View File

@ -0,0 +1,26 @@
# go-vod
Extremely minimal on-demand video transcoding server in go. Used by the FOSS photos app, [Memories](https://github.com/pulsejet/memories).
## Filing Issues
Please file issues at the [Memories](https://github.com/pulsejet/memories) repository.
## Usage
Note: this package provides bespoke functionality for Memories. As such it is not intended to be used as a library.
You need go and ffmpeg/ffprobe installed
```bash
CGO_ENABLED=0 go build -ldflags="-s -w"
./go-vod
```
The server exposes all files as HLS streams, at the URL
```
http://localhost:47788/player-id/path/to/file/index.m3u8
```
## Thanks
Partially inspired from [go-transcode](https://github.com/m1k1o/go-transcode). The projects use different approaches for segmenting the transcodes.

View File

@ -0,0 +1,39 @@
#!/bin/bash
set -e
# This script is intended for bare-metal installations.
# It builds ffmpeg and NVENC drivers from source.
apt-get remove -y ffmpeg
apt-get update
apt-get install -y \
sudo curl wget \
autoconf libtool libdrm-dev xorg xorg-dev openbox \
libx11-dev libgl1-mesa-glx libgl1-mesa-dev \
xcb libxcb-xkb-dev x11-xkb-utils libx11-xcb-dev \
libxkbcommon-x11-dev libxcb-dri3-dev \
cmake git nasm build-essential \
libx264-dev \
libffmpeg-nvenc-dev clang
git clone --branch sdk/11.1 https://git.videolan.org/git/ffmpeg/nv-codec-headers.git
cd nv-codec-headers
sudo make install
cd ..
git clone --depth 1 --branch n5.1.3 https://github.com/FFmpeg/FFmpeg
cd FFmpeg
./configure \
--enable-nonfree \
--enable-gpl \
--enable-libx264 \
--enable-nvenc \
--enable-ffnvcodec \
--enable-cuda-llvm
make -j"$(nproc)"
sudo make install
sudo ldconfig
cd ..

View File

@ -0,0 +1,62 @@
#!/bin/bash
# This script is intended for bare-metal installations.
# It builds ffmpeg and VA-API drivers from source.
set -e
apt-get remove -y libva ffmpeg
apt-get update
apt-get install -y \
sudo curl wget \
autoconf libtool libdrm-dev xorg xorg-dev openbox \
libx11-dev libgl1-mesa-glx libgl1-mesa-dev \
xcb libxcb-xkb-dev x11-xkb-utils libx11-xcb-dev \
libxkbcommon-x11-dev libxcb-dri3-dev \
cmake git nasm build-essential \
libx264-dev
mkdir qsvbuild
cd qsvbuild
git clone --depth 1 --branch 2.18.0 https://github.com/intel/libva
cd libva
./autogen.sh
make
sudo make install
sudo ldconfig
cd ..
git clone --depth 1 --branch intel-gmmlib-22.3.5 https://github.com/intel/gmmlib
cd gmmlib
mkdir build && cd build
cmake ..
make -j"$(nproc)"
sudo make install
sudo ldconfig
cd ../..
git clone --depth 1 --branch intel-media-23.1.6 https://github.com/intel/media-driver
mkdir -p build_media
cd build_media
cmake ../media-driver
make -j"$(nproc)"
sudo make install
sudo ldconfig
cd ..
git clone --depth 1 --branch n6.0 https://github.com/FFmpeg/FFmpeg
cd FFmpeg
./configure \
--enable-nonfree \
--enable-gpl \
--enable-libx264
make -j"$(nproc)"
sudo make install
sudo ldconfig
cd ..
cd ..
rm -rf qsvbuild

View File

@ -0,0 +1,12 @@
FROM golang:bullseye AS builder
WORKDIR /app
COPY . .
RUN CGO_ENABLED=0 go build -buildvcs=false -ldflags="-s -w"
FROM linuxserver/ffmpeg:latest
COPY --from=builder /app/go-vod .
EXPOSE 47788
ENTRYPOINT ["/go-vod"]

3
go-vod/go.mod 100644
View File

@ -0,0 +1,3 @@
module github.com/pulsejet/go-vod
go 1.16

48
go-vod/main.go 100644
View File

@ -0,0 +1,48 @@
package main
import (
"fmt"
"log"
"os"
"github.com/pulsejet/go-vod/transcoder"
)
const VERSION = "0.1.28"
func main() {
// Build initial configuration
c := &transcoder.Config{
VersionMonitor: false,
Version: VERSION,
Bind: ":47788",
ChunkSize: 3,
LookBehind: 3,
GoalBufferMin: 1,
GoalBufferMax: 4,
StreamIdleTime: 60,
ManagerIdleTime: 60,
}
// Parse arguments
for _, arg := range os.Args[1:] {
if arg == "-version-monitor" {
c.VersionMonitor = true
} else if arg == "-version" {
fmt.Print("go-vod " + VERSION)
return
} else {
c.FromFile(arg) // config file
}
}
// Auto detect ffmpeg and ffprobe
c.AutoDetect()
// Start server
code := transcoder.NewHandler(c).Start()
// Exit
log.Println("Exiting go-vod with status code", code)
os.Exit(code)
}

67
go-vod/run.sh 100755
View File

@ -0,0 +1,67 @@
#!/bin/bash
# This script fetches the current version of go-vod from Nextcloud
# to the working directory and runs it. If go-vod exits with a restart
# code, the script will restart it.
# This script is intended to be run by systemd if running on bare metal.
# Environment variables
HOST=$NEXTCLOUD_HOST
ALLOW_INSECURE=$NEXTCLOUD_ALLOW_INSECURE
# check if host is set
if [[ -z $HOST ]]; then
echo "fatal: NEXTCLOUD_HOST is not set"
exit 1
fi
# check if scheme is set
if [[ ! $HOST == http://* ]] && [[ ! $HOST == https://* ]]; then
echo "fatal: NEXTCLOUD_HOST must start with http:// or https://"
exit 1
fi
# check if scheme is http and allow_insecure is not set
if [[ $HOST == http://* ]] && [[ -z $ALLOW_INSECURE ]]; then
echo "fatal: NEXTCLOUD_HOST is set to http:// but NEXTCLOUD_ALLOW_INSECURE is not set"
exit 1
fi
# build URL to fetch binary from Nextcloud
ARCH=$(uname -m)
URL="$HOST/index.php/apps/memories/static/go-vod?arch=$ARCH"
# set the -k option in curl if allow_insecure is set
EXTRA_CURL_ARGS=""
if [[ $ALLOW_INSECURE == true ]]; then
EXTRA_CURL_ARGS="$EXTRA_CURL_ARGS -k"
fi
# fetch binary, sleeping 10 seconds between retries
function fetch_binary {
while true; do
rm -f go-vod
curl $EXTRA_CURL_ARGS -L -f -m 10 -s -o go-vod $URL
if [[ $? == 0 ]]; then
chmod +x go-vod
echo "Fetched $URL successfully!"
break
fi
echo "Failed to fetch $URL"
echo "Are you sure the host is reachable and running Memories v6+?"
echo "Retrying in 10 seconds..."
sleep 10
done
}
# infinite loop
while true; do
fetch_binary
./go-vod -version-monitor
if [[ $? != 12 ]]; then
break
fi
sleep 3 # throttle
done

View File

@ -0,0 +1,15 @@
package transcoder
type Chunk struct {
id int
done bool
notifs []chan bool
}
func NewChunk(id int) *Chunk {
return &Chunk{
id: id,
done: false,
notifs: make([]chan bool, 0),
}
}

View File

@ -0,0 +1,111 @@
package transcoder
import (
"encoding/json"
"io/ioutil"
"log"
"os"
"os/exec"
)
type Config struct {
// Current version of go-vod
Version string
// Is this server configured?
Configured bool
// Restart the server if incorrect version detected
VersionMonitor bool
// Bind address
Bind string `json:"bind"`
// FFmpeg binary
FFmpeg string `json:"ffmpeg"`
// FFprobe binary
FFprobe string `json:"ffprobe"`
// Temp files directory
TempDir string `json:"tempdir"`
// Size of each chunk in seconds
ChunkSize int `json:"chunkSize"`
// How many *chunks* to look behind before restarting transcoding
LookBehind int `json:"lookBehind"`
// Number of chunks in goal to restart encoding
GoalBufferMin int `json:"goalBufferMin"`
// Number of chunks in goal to stop encoding
GoalBufferMax int `json:"goalBufferMax"`
// Number of seconds to wait before shutting down encoding
StreamIdleTime int `json:"streamIdleTime"`
// Number of seconds to wait before shutting down a client
ManagerIdleTime int `json:"managerIdleTime"`
// Quality Factor (e.g. CRF / global_quality)
QF int `json:"qf"`
// Hardware acceleration configuration
// VA-API
VAAPI bool `json:"vaapi"`
VAAPILowPower bool `json:"vaapiLowPower"`
// NVENC
NVENC bool `json:"nvenc"`
NVENCTemporalAQ bool `json:"nvencTemporalAQ"`
NVENCScale string `json:"nvencScale"` // cuda, npp
// Use transpose workaround for streaming (VA-API)
UseTranspose bool `json:"useTranspose"`
// Use GOP size workaround for streaming (NVENC)
UseGopSize bool `json:"useGopSize"`
}
func (c *Config) FromFile(path string) {
// load json config
content, err := ioutil.ReadFile(path)
if err != nil {
log.Fatal("Error when opening file: ", err)
}
err = json.Unmarshal(content, &c)
if err != nil {
log.Fatal("Error loading config file", err)
}
// Set config as loaded
c.Configured = true
c.Print()
}
func (c *Config) AutoDetect() {
// Auto-detect ffmpeg and ffprobe paths
if c.FFmpeg == "" || c.FFprobe == "" {
ffmpeg, err := exec.LookPath("ffmpeg")
if err != nil {
log.Fatal("Could not find ffmpeg")
}
ffprobe, err := exec.LookPath("ffprobe")
if err != nil {
log.Fatal("Could not find ffprobe")
}
c.FFmpeg = ffmpeg
c.FFprobe = ffprobe
}
// Auto-choose tempdir
if c.TempDir == "" {
c.TempDir = os.TempDir() + "/go-vod"
}
// Print updated config
c.Print()
}
func (c *Config) Print() {
log.Printf("%+v\n", c)
}

View File

@ -0,0 +1,239 @@
package transcoder
import (
"context"
"encoding/json"
"io/ioutil"
"log"
"net/http"
"os"
"strings"
"sync"
"time"
)
type Handler struct {
c *Config
server *http.Server
managers map[string]*Manager
mutex sync.RWMutex
close chan string
exitCode int
}
func NewHandler(c *Config) *Handler {
h := &Handler{
c: c,
managers: make(map[string]*Manager),
close: make(chan string),
exitCode: 0,
}
// Recreate tempdir
os.RemoveAll(c.TempDir)
os.MkdirAll(c.TempDir, 0755)
return h
}
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Check version if monitoring is enabled
if h.c.VersionMonitor && !h.versionOk(w, r) {
return
}
url := r.URL.Path
parts := make([]string, 0)
// log.Println("Serving", url)
// Break url into parts
for _, part := range strings.Split(url, "/") {
if part != "" {
parts = append(parts, part)
}
}
// Serve actual file from manager
if len(parts) < 3 {
log.Println("Invalid URL", url)
w.WriteHeader(http.StatusBadRequest)
return
}
// Get streamid and chunk
streamid := parts[0]
path := "/" + strings.Join(parts[1:len(parts)-1], "/")
chunk := parts[len(parts)-1]
// Check if POST request to create temp file
if r.Method == "POST" && len(parts) >= 2 && parts[1] == "create" {
var err error
path, err = h.createTempFile(w, r, parts)
if err != nil {
return
}
}
// Check if test request
if chunk == "test" {
w.Header().Set("Content-Type", "application/json")
// check if test file is readable
size := 0
info, err := os.Stat(path)
if err == nil {
size = int(info.Size())
}
json.NewEncoder(w).Encode(map[string]interface{}{
"version": h.c.Version,
"size": size,
})
return
}
// Check if configuration request
if r.Method == "POST" && chunk == "config" {
w.Header().Set("Content-Type", "application/json")
// read new config
body, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Println("Error reading body", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
// Unmarshal config
if err := json.Unmarshal(body, h.c); err != nil {
log.Println("Error unmarshaling config", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
// Set config as loaded
h.c.Configured = true
// Print loaded config
log.Printf("%+v\n", h.c)
return
}
// Check if configured
if !h.c.Configured {
w.WriteHeader(http.StatusServiceUnavailable)
return
}
// Check if valid
if streamid == "" || path == "" {
w.WriteHeader(http.StatusBadRequest)
return
}
// Get existing manager or create new one
manager := h.getManager(path, streamid)
if manager == nil {
manager = h.createManager(path, streamid)
}
// Failed to create manager
if manager == nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
// Serve chunk if asked for
if chunk != "" && chunk != "ignore" {
manager.ServeHTTP(w, r, chunk)
}
}
func (h *Handler) versionOk(w http.ResponseWriter, r *http.Request) bool {
expected := r.Header.Get("X-Go-Vod-Version")
if len(expected) > 0 && expected != h.c.Version {
log.Println("Version mismatch", expected, h.c.Version)
// Try again in some time
w.WriteHeader(http.StatusServiceUnavailable)
// Exit with status code 12
h.exitCode = 12
h.Close()
return false
}
return true
}
func (h *Handler) getManager(path string, streamid string) *Manager {
h.mutex.RLock()
defer h.mutex.RUnlock()
m := h.managers[streamid]
if m == nil || m.path != path {
return nil
}
return m
}
func (h *Handler) createManager(path string, streamid string) *Manager {
manager, err := NewManager(h.c, path, streamid, h.close)
if err != nil {
log.Println("Error creating manager", err)
freeIfTemp(path)
return nil
}
h.mutex.Lock()
defer h.mutex.Unlock()
old := h.managers[streamid]
if old != nil {
old.Destroy()
}
h.managers[streamid] = manager
return manager
}
func (h *Handler) removeManager(streamid string) {
h.mutex.Lock()
defer h.mutex.Unlock()
delete(h.managers, streamid)
}
func (h *Handler) Start() int {
log.Println("Starting go-vod " + h.c.Version + " on " + h.c.Bind)
h.server = &http.Server{Addr: h.c.Bind, Handler: h}
go func() {
err := h.server.ListenAndServe()
if err == http.ErrServerClosed {
log.Println("HTTP server closed")
} else if err != nil {
log.Fatal("Error starting server: ", err)
}
}()
for {
id := <-h.close
if id == "" {
break
}
h.removeManager(id)
}
// Stop server
log.Println("Shutting down HTTP server")
ctx, cancel := context.WithDeadline(context.TODO(), time.Now().Add(5*time.Second))
defer cancel()
h.server.Shutdown(ctx)
// Return status code
return h.exitCode
}
func (h *Handler) Close() {
h.close <- ""
}

View File

@ -0,0 +1,375 @@
package transcoder
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"hash/fnv"
"log"
"math"
"net/http"
"os"
"os/exec"
"sort"
"strconv"
"strings"
"time"
)
type Manager struct {
c *Config
path string
tempDir string
id string
close chan string
inactive int
probe *ProbeVideoData
numChunks int
streams map[string]*Stream
}
type ProbeVideoData struct {
Width int
Height int
Duration time.Duration
FrameRate int
CodecName string
BitRate int
Rotation int
}
func NewManager(c *Config, path string, id string, close chan string) (*Manager, error) {
m := &Manager{c: c, path: path, id: id, close: close}
m.streams = make(map[string]*Stream)
h := fnv.New32a()
h.Write([]byte(path))
ph := fmt.Sprint(h.Sum32())
m.tempDir = fmt.Sprintf("%s/%s-%s", m.c.TempDir, id, ph)
// Delete temp dir if exists
os.RemoveAll(m.tempDir)
os.MkdirAll(m.tempDir, 0755)
if err := m.ffprobe(); err != nil {
return nil, err
}
m.numChunks = int(math.Ceil(m.probe.Duration.Seconds() / float64(c.ChunkSize)))
// Possible streams
m.streams["480p"] = &Stream{c: c, m: m, quality: "480p", height: 480, width: 854, bitrate: 400}
m.streams["720p"] = &Stream{c: c, m: m, quality: "720p", height: 720, width: 1280, bitrate: 700}
m.streams["1080p"] = &Stream{c: c, m: m, quality: "1080p", height: 1080, width: 1920, bitrate: 1000}
m.streams["1440p"] = &Stream{c: c, m: m, quality: "1440p", height: 1440, width: 2560, bitrate: 1400}
m.streams["2160p"] = &Stream{c: c, m: m, quality: "2160p", height: 2160, width: 3840, bitrate: 3000}
// height is our primary dimension for scaling
// using the probed size, we adjust the width of the stream
// the smaller dimemension of the output should match the height here
smDim, lgDim := m.probe.Height, m.probe.Width
if m.probe.Height > m.probe.Width {
smDim, lgDim = lgDim, smDim
}
// Get the reference bitrate. This is the same as the current bitrate
// if the video is H.264, otherwise use double the current bitrate.
refBitrate := int(float64(m.probe.BitRate) / 2.0)
if m.probe.CodecName != CODEC_H264 {
refBitrate *= 2
}
// If bitrate could not be read, use 10Mbps
if refBitrate == 0 {
refBitrate = 10000000
}
// Get the multiplier for the reference bitrate.
// For this get the nearest stream size to the original.
origPixels := float64(m.probe.Height * m.probe.Width)
nearestPixels := float64(0)
nearestStream := ""
for key, stream := range m.streams {
streamPixels := float64(stream.height * stream.width)
if nearestPixels == 0 || math.Abs(origPixels-streamPixels) < math.Abs(origPixels-nearestPixels) {
nearestPixels = streamPixels
nearestStream = key
}
}
// Get the bitrate multiplier. This is the ratio of the reference
// bitrate to the nearest stream bitrate, so we can scale all streams.
bitrateMultiplier := 1.0
if nearestStream != "" {
bitrateMultiplier = float64(refBitrate) / float64(m.streams[nearestStream].bitrate)
}
// Only keep streams that are smaller than the video
for k, stream := range m.streams {
stream.order = 0
// scale bitrate using the multiplier
stream.bitrate = int(math.Ceil(float64(stream.bitrate) * bitrateMultiplier))
// now store the width of the stream as the larger dimension
stream.width = int(math.Ceil(float64(lgDim) * float64(stream.height) / float64(smDim)))
// remove invalid streams
if (stream.height >= smDim || stream.width >= lgDim) || // no upscaling; we're not AI
(float64(stream.bitrate) > float64(m.probe.BitRate)*0.8) || // no more than 80% of original bitrate
(stream.height%2 != 0 || stream.width%2 != 0) { // no odd dimensions
// remove stream
delete(m.streams, k)
continue
}
}
// Original stream
m.streams[QUALITY_MAX] = &Stream{
c: c, m: m,
quality: QUALITY_MAX,
height: m.probe.Height,
width: m.probe.Width,
bitrate: refBitrate,
order: 1,
}
// Start all streams
for _, stream := range m.streams {
go stream.Run()
}
log.Printf("%s: new manager for %s", m.id, m.path)
// Check for inactivity
go func() {
t := time.NewTicker(5 * time.Second)
defer t.Stop()
for {
<-t.C
if m.inactive == -1 {
t.Stop()
return
}
m.inactive++
// Check if any stream is active
for _, stream := range m.streams {
if stream.coder != nil {
m.inactive = 0
break
}
}
// Nothing done for 5 minutes
if m.inactive >= m.c.ManagerIdleTime/5 {
t.Stop()
m.Destroy()
m.close <- m.id
return
}
}
}()
return m, nil
}
// Destroys streams. DOES NOT emit on the close channel.
func (m *Manager) Destroy() {
log.Printf("%s: destroying manager", m.id)
m.inactive = -1
for _, stream := range m.streams {
stream.Stop()
}
// Delete temp dir
os.RemoveAll(m.tempDir)
// Delete file if temp
freeIfTemp(m.path)
}
func (m *Manager) ServeHTTP(w http.ResponseWriter, r *http.Request, chunk string) error {
// Master list
if chunk == "index.m3u8" {
return m.ServeIndex(w, r)
}
// Stream list
m3u8Sfx := ".m3u8"
if strings.HasSuffix(chunk, m3u8Sfx) {
quality := strings.TrimSuffix(chunk, m3u8Sfx)
if stream, ok := m.streams[quality]; ok {
return stream.ServeList(w, r)
}
}
// Stream chunk
tsSfx := ".ts"
if strings.HasSuffix(chunk, tsSfx) {
parts := strings.Split(chunk, "-")
if len(parts) != 2 {
w.WriteHeader(http.StatusBadRequest)
return nil
}
quality := parts[0]
chunkIdStr := strings.TrimSuffix(parts[1], tsSfx)
chunkId, err := strconv.Atoi(chunkIdStr)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return nil
}
if stream, ok := m.streams[quality]; ok {
return stream.ServeChunk(w, chunkId)
}
}
// Stream full video
mp4Sfx := ".mp4"
if strings.HasSuffix(chunk, mp4Sfx) {
quality := strings.TrimSuffix(chunk, mp4Sfx)
if stream, ok := m.streams[quality]; ok {
return stream.ServeFullVideo(w, r)
}
// Fall back to original
return m.streams[QUALITY_MAX].ServeFullVideo(w, r)
}
w.WriteHeader(http.StatusNotFound)
return nil
}
func (m *Manager) ServeIndex(w http.ResponseWriter, r *http.Request) error {
WriteM3U8ContentType(w)
w.Write([]byte("#EXTM3U\n"))
// get sorted streams by bitrate
streams := make([]*Stream, 0)
for _, stream := range m.streams {
streams = append(streams, stream)
}
sort.Slice(streams, func(i, j int) bool {
return streams[i].order < streams[j].order ||
(streams[i].order == streams[j].order && streams[i].bitrate < streams[j].bitrate)
})
// Write all streams
query := GetQueryString(r)
for _, stream := range streams {
s := fmt.Sprintf("#EXT-X-STREAM-INF:BANDWIDTH=%d,RESOLUTION=%dx%d,FRAME-RATE=%d\n%s.m3u8%s\n", stream.bitrate, stream.width, stream.height, m.probe.FrameRate, stream.quality, query)
w.Write([]byte(s))
}
return nil
}
func (m *Manager) ffprobe() error {
args := []string{
// Hide debug information
"-v", "error",
// Show everything
"-show_entries", "format:stream",
"-select_streams", "v", // Video stream only, we're not interested in audio
"-of", "json",
m.path,
}
ctx, cancel := context.WithDeadline(context.TODO(), time.Now().Add(5*time.Second))
defer cancel()
cmd := exec.CommandContext(ctx, m.c.FFprobe, args...)
var stdout, stderr bytes.Buffer
cmd.Stdout = &stdout
cmd.Stderr = &stderr
err := cmd.Run()
if err != nil {
log.Println(stderr.String())
return err
}
out := struct {
Streams []struct {
Width int `json:"width"`
Height int `json:"height"`
Duration string `json:"duration"`
FrameRate string `json:"avg_frame_rate"`
CodecName string `json:"codec_name"`
BitRate string `json:"bit_rate"`
SideDataList []struct {
SideDataType string `json:"side_data_type"`
Rotation int `json:"rotation"`
} `json:"side_data_list"`
} `json:"streams"`
Format struct {
Duration string `json:"duration"`
} `json:"format"`
}{}
if err := json.Unmarshal(stdout.Bytes(), &out); err != nil {
return err
}
if len(out.Streams) == 0 {
return errors.New("no video streams found")
}
var duration time.Duration
if out.Streams[0].Duration != "" {
duration, _ = time.ParseDuration(out.Streams[0].Duration + "s")
} else if out.Format.Duration != "" {
duration, _ = time.ParseDuration(out.Format.Duration + "s")
}
// FrameRate is a fraction string
frac := strings.Split(out.Streams[0].FrameRate, "/")
if len(frac) != 2 {
frac = []string{"30", "1"}
}
num, e1 := strconv.Atoi(frac[0])
den, e2 := strconv.Atoi(frac[1])
if e1 != nil || e2 != nil {
num = 30
den = 1
}
frameRate := float64(num) / float64(den)
// BitRate is a string
bitRate, err := strconv.Atoi(out.Streams[0].BitRate)
if err != nil {
bitRate = 5000000
}
// Get rotation from side data
rotation := 0
for _, sideData := range out.Streams[0].SideDataList {
if sideData.SideDataType == "Display Matrix" {
rotation = sideData.Rotation
}
}
m.probe = &ProbeVideoData{
Width: out.Streams[0].Width,
Height: out.Streams[0].Height,
Duration: duration,
FrameRate: int(frameRate),
CodecName: out.Streams[0].CodecName,
BitRate: bitRate,
Rotation: rotation,
}
return nil
}

View File

@ -0,0 +1,704 @@
package transcoder
import (
"bufio"
"fmt"
"io"
"log"
"net/http"
"os"
"os/exec"
"strconv"
"strings"
"sync"
"syscall"
"time"
)
const (
ENCODER_COPY = "copy"
ENCODER_X264 = "libx264"
ENCODER_VAAPI = "h264_vaapi"
ENCODER_NVENC = "h264_nvenc"
QUALITY_MAX = "max"
CODEC_H264 = "h264"
)
type Stream struct {
c *Config
m *Manager
quality string
order int
height int
width int
bitrate int
goal int
mutex sync.Mutex
chunks map[int]*Chunk
seenChunks map[int]bool // only for stdout reader
coder *exec.Cmd
inactive int
stop chan bool
}
func (s *Stream) Run() {
// run every 5s
t := time.NewTicker(5 * time.Second)
defer t.Stop()
s.stop = make(chan bool)
for {
select {
case <-t.C:
s.mutex.Lock()
// Prune chunks
for id := range s.chunks {
if id < s.goal-s.c.GoalBufferMax {
s.pruneChunk(id)
}
}
s.inactive++
// Nothing done for 2 minutes
if s.inactive >= s.c.StreamIdleTime/5 && s.coder != nil {
t.Stop()
s.clear()
}
s.mutex.Unlock()
case <-s.stop:
t.Stop()
s.mutex.Lock()
s.clear()
s.mutex.Unlock()
return
}
}
}
func (s *Stream) clear() {
log.Printf("%s-%s: stopping stream", s.m.id, s.quality)
for _, chunk := range s.chunks {
// Delete files
s.pruneChunk(chunk.id)
}
s.chunks = make(map[int]*Chunk)
s.seenChunks = make(map[int]bool)
s.goal = 0
if s.coder != nil {
s.coder.Process.Kill()
s.coder.Wait()
s.coder = nil
}
}
func (s *Stream) Stop() {
select {
case s.stop <- true:
default:
}
}
func (s *Stream) ServeList(w http.ResponseWriter, r *http.Request) error {
WriteM3U8ContentType(w)
w.Write([]byte("#EXTM3U\n"))
w.Write([]byte("#EXT-X-VERSION:4\n"))
w.Write([]byte("#EXT-X-MEDIA-SEQUENCE:0\n"))
w.Write([]byte("#EXT-X-PLAYLIST-TYPE:VOD\n"))
w.Write([]byte(fmt.Sprintf("#EXT-X-TARGETDURATION:%d\n", s.c.ChunkSize)))
query := GetQueryString(r)
duration := s.m.probe.Duration.Seconds()
i := 0
for duration > 0 {
size := float64(s.c.ChunkSize)
if duration < size {
size = duration
}
w.Write([]byte(fmt.Sprintf("#EXTINF:%.3f, nodesc\n", size)))
w.Write([]byte(fmt.Sprintf("%s-%06d.ts%s\n", s.quality, i, query)))
duration -= float64(s.c.ChunkSize)
i++
}
w.Write([]byte("#EXT-X-ENDLIST\n"))
return nil
}
func (s *Stream) ServeChunk(w http.ResponseWriter, id int) error {
s.mutex.Lock()
defer s.mutex.Unlock()
s.inactive = 0
s.checkGoal(id)
// Already have this chunk
if chunk, ok := s.chunks[id]; ok {
// Chunk is finished, just return it
if chunk.done {
s.returnChunk(w, chunk)
return nil
}
// Still waiting on transcoder
s.waitForChunk(w, chunk)
return nil
}
// Will have this soon enough
foundBehind := false
for i := id - 1; i > id-s.c.LookBehind && i >= 0; i-- {
if _, ok := s.chunks[i]; ok {
foundBehind = true
}
}
if foundBehind {
// Make sure the chunk exists
chunk := s.createChunk(id)
// Wait for it
s.waitForChunk(w, chunk)
return nil
}
// Let's start over
s.restartAtChunk(w, id)
return nil
}
func (s *Stream) ServeFullVideo(w http.ResponseWriter, r *http.Request) error {
args := s.transcodeArgs(0, false)
if s.m.probe.CodecName == CODEC_H264 && s.quality == QUALITY_MAX {
// try to just send the original file
http.ServeFile(w, r, s.m.path)
return nil
}
// Output mov
args = append(args, []string{
"-movflags", "frag_keyframe+empty_moov+faststart",
"-f", "mp4", "pipe:1",
}...)
coder := exec.Command(s.c.FFmpeg, args...)
log.Printf("%s-%s: %s", s.m.id, s.quality, strings.Join(coder.Args[:], " "))
cmdStdOut, err := coder.StdoutPipe()
if err != nil {
log.Printf("FATAL: ffmpeg command stdout failed with %s\n", err)
}
cmdStdErr, err := coder.StderrPipe()
if err != nil {
log.Printf("FATAL: ffmpeg command stdout failed with %s\n", err)
}
err = coder.Start()
if err != nil {
log.Printf("FATAL: ffmpeg command failed with %s\n", err)
}
go s.monitorStderr(cmdStdErr)
// Write to response
defer cmdStdOut.Close()
stdoutReader := bufio.NewReader(cmdStdOut)
// Write mov headers
w.Header().Set("Content-Type", "video/mp4")
w.WriteHeader(http.StatusOK)
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Server does not support Flusher!",
http.StatusInternalServerError)
return nil
}
// Write data, flusing every 1MB
buf := make([]byte, 1024*1024)
for {
n, err := stdoutReader.Read(buf)
if err != nil {
if err == io.EOF {
break
}
log.Printf("FATAL: ffmpeg command failed with %s\n", err)
break
}
_, err = w.Write(buf[:n])
if err != nil {
log.Printf("%s-%s: client closed connection", s.m.id, s.quality)
log.Println(err)
break
}
flusher.Flush()
}
// Terminate ffmpeg process
coder.Process.Kill()
coder.Wait()
return nil
}
func (s *Stream) createChunk(id int) *Chunk {
if c, ok := s.chunks[id]; ok {
return c
} else {
s.chunks[id] = NewChunk(id)
return s.chunks[id]
}
}
func (s *Stream) pruneChunk(id int) {
delete(s.chunks, id)
// Remove file
filename := s.getTsPath(id)
os.Remove(filename)
}
func (s *Stream) returnChunk(w http.ResponseWriter, chunk *Chunk) {
// This function is called with lock, but we don't need it
s.mutex.Unlock()
defer s.mutex.Lock()
// Read file and write to response
filename := s.getTsPath(chunk.id)
f, err := os.Open(filename)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
defer f.Close()
w.Header().Set("Content-Type", "video/MP2T")
io.Copy(w, f)
}
func (s *Stream) waitForChunk(w http.ResponseWriter, chunk *Chunk) {
if chunk.done {
s.returnChunk(w, chunk)
return
}
// Add our channel
notif := make(chan bool)
chunk.notifs = append(chunk.notifs, notif)
t := time.NewTimer(10 * time.Second)
coder := s.coder
s.mutex.Unlock()
select {
case <-notif:
t.Stop()
case <-t.C:
}
s.mutex.Lock()
// remove channel
for i, c := range chunk.notifs {
if c == notif {
chunk.notifs = append(chunk.notifs[:i], chunk.notifs[i+1:]...)
break
}
}
// check for success
if chunk.done {
s.returnChunk(w, chunk)
return
}
// Check if coder was changed
if coder != s.coder {
w.WriteHeader(http.StatusConflict)
return
}
// Return timeout error
w.WriteHeader(http.StatusRequestTimeout)
}
func (s *Stream) restartAtChunk(w http.ResponseWriter, id int) {
// Stop current transcoder
s.clear()
chunk := s.createChunk(id) // create first chunk
// Start the transcoder
s.goal = id + s.c.GoalBufferMax
s.transcode(id)
s.waitForChunk(w, chunk) // this is also a request
}
// Get arguments to ffmpeg
func (s *Stream) transcodeArgs(startAt float64, isHls bool) []string {
args := []string{
"-loglevel", "warning",
}
if startAt > 0 {
args = append(args, []string{
"-ss", fmt.Sprintf("%.6f", startAt),
}...)
}
// encoder selection
CV := ENCODER_X264
// Check whether hwaccel should be used
if s.c.VAAPI {
CV = ENCODER_VAAPI
extra := "-hwaccel vaapi -hwaccel_device /dev/dri/renderD128 -hwaccel_output_format vaapi"
args = append(args, strings.Split(extra, " ")...)
} else if s.c.NVENC {
CV = ENCODER_NVENC
extra := "-hwaccel cuda"
args = append(args, strings.Split(extra, " ")...)
}
// Disable autorotation (see transpose comments below)
if s.c.UseTranspose {
args = append(args, []string{"-noautorotate"}...)
}
// Input specs
args = append(args, []string{
"-i", s.m.path, // Input file
"-copyts", // So the "-to" refers to the original TS
"-fflags", "+genpts",
}...)
// Filters
format := "format=nv12"
scaler := "scale"
scalerArgs := make([]string, 0)
scalerArgs = append(scalerArgs, "force_original_aspect_ratio=decrease")
if CV == ENCODER_VAAPI {
format = "format=nv12|vaapi,hwupload"
scaler = "scale_vaapi"
scalerArgs = append(scalerArgs, "format=nv12")
} else if CV == ENCODER_NVENC {
format = "format=nv12|cuda,hwupload"
scaler = fmt.Sprintf("scale_%s", s.c.NVENCScale)
// workaround to force scale_cuda to examine all input frames
if s.c.NVENCScale == "cuda" {
scalerArgs = append(scalerArgs, "passthrough=0")
}
}
// Scale height and width if not max quality
if s.quality != QUALITY_MAX {
maxDim := s.height
if s.width > s.height {
maxDim = s.width
}
scalerArgs = append(scalerArgs, fmt.Sprintf("w=%d", maxDim))
scalerArgs = append(scalerArgs, fmt.Sprintf("h=%d", maxDim))
}
// Apply filter
if CV != ENCODER_COPY {
filter := fmt.Sprintf("%s,%s=%s", format, scaler, strings.Join(scalerArgs, ":"))
// Rotation is a mess: https://trac.ffmpeg.org/ticket/8329
// 1/ -noautorotate copies the sidecar metadata to the output
// 2/ autorotation doesn't seem to work with some types of HW (at least not with VAAPI)
// 3/ autorotation doesn't work with HLS streams
// 4/ VAAPI cannot transport on AMD GPUs
// So: give the user to disable autorotation for HLS and use a manual transpose
if isHls && s.c.UseTranspose {
transposer := "transpose"
if CV == ENCODER_VAAPI {
transposer = "transpose_vaapi"
} else if CV == ENCODER_NVENC {
transposer = fmt.Sprintf("transpose_%s", s.c.NVENCScale)
}
if transposer != "transpose_cuda" { // does not exist
if s.m.probe.Rotation == -90 {
filter = fmt.Sprintf("%s,%s=1", filter, transposer)
} else if s.m.probe.Rotation == 90 {
filter = fmt.Sprintf("%s,%s=2", filter, transposer)
} else if s.m.probe.Rotation == 180 || s.m.probe.Rotation == -180 {
filter = fmt.Sprintf("%s,%s=1,%s=1", filter, transposer, transposer)
}
}
}
args = append(args, []string{"-vf", filter}...)
}
// Output specs for video
args = append(args, []string{
"-map", "0:v:0",
"-c:v", CV,
}...)
// Device specific output args
if CV == ENCODER_VAAPI {
args = append(args, []string{"-global_quality", fmt.Sprintf("%d", s.c.QF)}...)
if s.c.VAAPILowPower {
args = append(args, []string{"-low_power", "1"}...)
}
} else if CV == ENCODER_NVENC {
args = append(args, []string{
"-preset", "p6",
"-tune", "ll",
"-rc", "vbr",
"-rc-lookahead", "30",
"-cq", fmt.Sprintf("%d", s.c.QF),
}...)
if s.c.NVENCTemporalAQ {
args = append(args, []string{"-temporal-aq", "1"}...)
}
} else if CV == ENCODER_X264 {
args = append(args, []string{
"-preset", "faster",
"-crf", fmt.Sprintf("%d", s.c.QF),
}...)
}
// Audio output specs
args = append(args, []string{
"-map", "0:a:0?",
"-c:a", "aac",
"-ac", "1",
}...)
return args
}
func (s *Stream) transcode(startId int) {
if startId > 0 {
// Start one frame before
// This ensures that the keyframes are aligned
startId--
}
startAt := float64(startId * s.c.ChunkSize)
args := s.transcodeArgs(startAt, true)
// Segmenting specs
args = append(args, []string{
"-start_number", fmt.Sprintf("%d", startId),
"-avoid_negative_ts", "disabled",
"-f", "hls",
"-hls_flags", "split_by_time",
"-hls_time", fmt.Sprintf("%d", s.c.ChunkSize),
"-hls_segment_type", "mpegts",
"-hls_segment_filename", s.getTsPath(-1),
}...)
// Keyframe specs
if s.c.UseGopSize && s.m.probe.FrameRate > 0 {
// Fix GOP size
args = append(args, []string{
"-g", fmt.Sprintf("%d", s.c.ChunkSize*s.m.probe.FrameRate),
"-keyint_min", fmt.Sprintf("%d", s.c.ChunkSize*s.m.probe.FrameRate),
}...)
} else {
// Force keyframes every chunk
args = append(args, []string{
"-force_key_frames", fmt.Sprintf("expr:gte(t,n_forced*%d)", s.c.ChunkSize),
}...)
}
// Output to stdout
args = append(args, "-")
// Start the process
s.coder = exec.Command(s.c.FFmpeg, args...)
// Log command, quoting the args as needed
quotedArgs := make([]string, len(s.coder.Args))
invalidChars := strings.Join([]string{" ", "=", ":", "\"", "\\", "\n", "\t"}, "")
for i, arg := range s.coder.Args {
if strings.ContainsAny(arg, invalidChars) {
quotedArgs[i] = fmt.Sprintf("\"%s\"", arg)
} else {
quotedArgs[i] = arg
}
}
log.Printf("%s-%s: %s", s.m.id, s.quality, strings.Join(quotedArgs[:], " "))
cmdStdOut, err := s.coder.StdoutPipe()
if err != nil {
log.Printf("FATAL: ffmpeg command stdout failed with %s\n", err)
}
cmdStdErr, err := s.coder.StderrPipe()
if err != nil {
log.Printf("FATAL: ffmpeg command stdout failed with %s\n", err)
}
err = s.coder.Start()
if err != nil {
log.Printf("FATAL: ffmpeg command failed with %s\n", err)
}
go s.monitorTranscodeOutput(cmdStdOut, startAt)
go s.monitorStderr(cmdStdErr)
go s.monitorExit()
}
func (s *Stream) checkGoal(id int) {
goal := id + s.c.GoalBufferMin
if goal > s.goal {
s.goal = id + s.c.GoalBufferMax
// resume encoding
if s.coder != nil {
log.Printf("%s-%s: resuming transcoding", s.m.id, s.quality)
s.coder.Process.Signal(syscall.SIGCONT)
}
}
}
func (s *Stream) getTsPath(id int) string {
if id == -1 {
return fmt.Sprintf("%s/%s-%%06d.ts", s.m.tempDir, s.quality)
}
return fmt.Sprintf("%s/%s-%06d.ts", s.m.tempDir, s.quality, id)
}
// Separate goroutine
func (s *Stream) monitorTranscodeOutput(cmdStdOut io.ReadCloser, startAt float64) {
s.mutex.Lock()
coder := s.coder
s.mutex.Unlock()
defer cmdStdOut.Close()
stdoutReader := bufio.NewReader(cmdStdOut)
for {
if s.coder != coder {
break
}
line, err := stdoutReader.ReadBytes('\n')
if err == io.EOF {
if len(line) == 0 {
break
}
} else if err != nil {
log.Println(err)
break
} else {
line = line[:(len(line) - 1)]
}
l := string(line)
if strings.Contains(l, ".ts") {
// 1080p-000003.ts
idx := strings.Split(strings.Split(l, "-")[1], ".")[0]
id, err := strconv.Atoi(idx)
if err != nil {
log.Println("Error parsing chunk id")
}
if s.seenChunks[id] {
continue
}
s.seenChunks[id] = true
// Debug
log.Printf("%s-%s: recv %s", s.m.id, s.quality, l)
func() {
s.mutex.Lock()
defer s.mutex.Unlock()
// The coder has changed; do nothing
if s.coder != coder {
return
}
// Notify everyone
chunk := s.createChunk(id)
if chunk.done {
return
}
chunk.done = true
for _, n := range chunk.notifs {
n <- true
}
// Check goal satisfied
if id >= s.goal {
log.Printf("%s-%s: goal satisfied: %d", s.m.id, s.quality, s.goal)
s.coder.Process.Signal(syscall.SIGSTOP)
}
}()
}
}
}
func (s *Stream) monitorStderr(cmdStdErr io.ReadCloser) {
stderrReader := bufio.NewReader(cmdStdErr)
for {
line, err := stderrReader.ReadBytes('\n')
if err == io.EOF {
if len(line) == 0 {
break
}
} else if err != nil {
log.Println(err)
break
} else {
line = line[:(len(line) - 1)]
}
log.Println("ffmpeg-error:", string(line))
}
}
func (s *Stream) monitorExit() {
// Join the process
coder := s.coder
err := coder.Wait()
// Try to get exit status
if exitError, ok := err.(*exec.ExitError); ok {
exitcode := exitError.ExitCode()
log.Printf("%s-%s: ffmpeg exited with status: %d", s.m.id, s.quality, exitcode)
s.mutex.Lock()
defer s.mutex.Unlock()
// If error code is >0, there was an error in transcoding
if exitcode > 0 && s.coder == coder {
// Notify all outstanding chunks
for _, chunk := range s.chunks {
for _, n := range chunk.notifs {
n <- true
}
}
}
}
}

View File

@ -0,0 +1,49 @@
package transcoder
import (
"encoding/json"
"io/ioutil"
"log"
"net/http"
"os"
"strings"
)
func (h *Handler) createTempFile(w http.ResponseWriter, r *http.Request, parts []string) (string, error) {
streamid := parts[0]
body, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Println("Error reading body", err)
w.WriteHeader(http.StatusInternalServerError)
return "", err
}
// Create temporary file
file, err := ioutil.TempFile(h.c.TempDir, streamid+"-govod-temp-")
if err != nil {
log.Println("Error creating temp file", err)
w.WriteHeader(http.StatusInternalServerError)
return "", err
}
defer file.Close()
// Write data to file
if _, err := file.Write(body); err != nil {
log.Println("Error writing to temp file", err)
w.WriteHeader(http.StatusInternalServerError)
return "", err
}
// Return full path to file in JSON
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]string{"path": file.Name()})
// Return path to file
return file.Name(), nil
}
func freeIfTemp(path string) {
if strings.Contains(path, "-govod-temp-") {
os.Remove(path)
}
}

View File

@ -0,0 +1,15 @@
package transcoder
import "net/http"
func GetQueryString(r *http.Request) string {
query := r.URL.Query().Encode()
if query != "" {
query = "?" + query
}
return query
}
func WriteM3U8ContentType(w http.ResponseWriter) {
w.Header().Set("Content-Type", "application/x-mpegURL")
}