diff --git a/go-vod/.dockerignore b/go-vod/.dockerignore new file mode 100644 index 00000000..94143827 --- /dev/null +++ b/go-vod/.dockerignore @@ -0,0 +1 @@ +Dockerfile diff --git a/go-vod/.github/workflows/release.yml b/go-vod/.github/workflows/release.yml new file mode 100644 index 00000000..54dd8f0d --- /dev/null +++ b/go-vod/.github/workflows/release.yml @@ -0,0 +1,68 @@ +name: release + +on: + push: + tags: + - "*" + +jobs: + binary: + name: Binary + runs-on: ubuntu-latest + + container: + image: golang:1.20-bullseye + + steps: + - name: Checkout + uses: actions/checkout@v3 + + - name: Build + run: | + CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -buildvcs=false -ldflags="-s -w" -o go-vod-amd64 + CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -buildvcs=false -ldflags="-s -w" -o go-vod-arm64 + + - name: Upload to releases + uses: svenstaro/upload-release-action@v2 + id: attach_to_release + with: + file: go-vod-* + file_glob: true + tag: ${{ github.ref }} + overwrite: true + + docker: + runs-on: ubuntu-latest + + name: Docker + + steps: + - name: Check out the repo + uses: actions/checkout@v4 + + - name: Set up QEMU + uses: docker/setup-qemu-action@v3 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Login to DockerHub + uses: docker/login-action@v3 + with: + username: ${{ secrets.DOCKERHUB_USERNAME }} + password: ${{ secrets.DOCKERHUB_TOKEN }} + + - name: Get image label + id: image_label + run: echo "label=${GITHUB_REF#refs/tags/}" >> $GITHUB_OUTPUT + + - name: Build container image + uses: docker/build-push-action@v5 + with: + push: true + platforms: linux/amd64,linux/arm64 + context: './' + no-cache: true + file: 'Dockerfile' + tags: radialapps/go-vod:${{ steps.image_label.outputs.label }} , radialapps/go-vod:latest + provenance: false diff --git a/go-vod/.gitignore b/go-vod/.gitignore new file mode 100644 index 00000000..5543a277 --- /dev/null +++ b/go-vod/.gitignore @@ -0,0 +1,22 @@ +# If you prefer the allow list template instead of the deny list, see community template: +# https://github.com/github/gitignore/blob/main/community/Golang/Go.AllowList.gitignore +# +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib +go-vod + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Dependency directories (remove the comment below to include it) +# vendor/ + +# Go workspace file +go.work diff --git a/go-vod/Dockerfile b/go-vod/Dockerfile new file mode 100644 index 00000000..360e1d88 --- /dev/null +++ b/go-vod/Dockerfile @@ -0,0 +1,7 @@ +FROM linuxserver/ffmpeg:latest + +COPY run.sh /go-vod.sh + +EXPOSE 47788 + +ENTRYPOINT ["/go-vod.sh"] diff --git a/go-vod/LICENSE b/go-vod/LICENSE new file mode 100644 index 00000000..7a4a3ea2 --- /dev/null +++ b/go-vod/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. \ No newline at end of file diff --git a/go-vod/README.md b/go-vod/README.md new file mode 100644 index 00000000..3be9c5f4 --- /dev/null +++ b/go-vod/README.md @@ -0,0 +1,26 @@ +# go-vod + +Extremely minimal on-demand video transcoding server in go. Used by the FOSS photos app, [Memories](https://github.com/pulsejet/memories). + +## Filing Issues + +Please file issues at the [Memories](https://github.com/pulsejet/memories) repository. + +## Usage + +Note: this package provides bespoke functionality for Memories. As such it is not intended to be used as a library. + +You need go and ffmpeg/ffprobe installed + +```bash +CGO_ENABLED=0 go build -ldflags="-s -w" +./go-vod +``` + +The server exposes all files as HLS streams, at the URL +``` +http://localhost:47788/player-id/path/to/file/index.m3u8 +``` + +## Thanks +Partially inspired from [go-transcode](https://github.com/m1k1o/go-transcode). The projects use different approaches for segmenting the transcodes. diff --git a/go-vod/build-ffmpeg-nvidia.sh b/go-vod/build-ffmpeg-nvidia.sh new file mode 100755 index 00000000..107ed36a --- /dev/null +++ b/go-vod/build-ffmpeg-nvidia.sh @@ -0,0 +1,39 @@ +#!/bin/bash + +set -e + +# This script is intended for bare-metal installations. +# It builds ffmpeg and NVENC drivers from source. + +apt-get remove -y ffmpeg + +apt-get update +apt-get install -y \ + sudo curl wget \ + autoconf libtool libdrm-dev xorg xorg-dev openbox \ + libx11-dev libgl1-mesa-glx libgl1-mesa-dev \ + xcb libxcb-xkb-dev x11-xkb-utils libx11-xcb-dev \ + libxkbcommon-x11-dev libxcb-dri3-dev \ + cmake git nasm build-essential \ + libx264-dev \ + libffmpeg-nvenc-dev clang + +git clone --branch sdk/11.1 https://git.videolan.org/git/ffmpeg/nv-codec-headers.git +cd nv-codec-headers +sudo make install +cd .. + +git clone --depth 1 --branch n5.1.3 https://github.com/FFmpeg/FFmpeg +cd FFmpeg +./configure \ + --enable-nonfree \ + --enable-gpl \ + --enable-libx264 \ + --enable-nvenc \ + --enable-ffnvcodec \ + --enable-cuda-llvm + +make -j"$(nproc)" +sudo make install +sudo ldconfig +cd .. diff --git a/go-vod/build-ffmpeg.sh b/go-vod/build-ffmpeg.sh new file mode 100755 index 00000000..96bf7288 --- /dev/null +++ b/go-vod/build-ffmpeg.sh @@ -0,0 +1,62 @@ +#!/bin/bash + +# This script is intended for bare-metal installations. +# It builds ffmpeg and VA-API drivers from source. + +set -e + +apt-get remove -y libva ffmpeg + +apt-get update +apt-get install -y \ + sudo curl wget \ + autoconf libtool libdrm-dev xorg xorg-dev openbox \ + libx11-dev libgl1-mesa-glx libgl1-mesa-dev \ + xcb libxcb-xkb-dev x11-xkb-utils libx11-xcb-dev \ + libxkbcommon-x11-dev libxcb-dri3-dev \ + cmake git nasm build-essential \ + libx264-dev + +mkdir qsvbuild +cd qsvbuild + +git clone --depth 1 --branch 2.18.0 https://github.com/intel/libva +cd libva +./autogen.sh +make +sudo make install +sudo ldconfig +cd .. + +git clone --depth 1 --branch intel-gmmlib-22.3.5 https://github.com/intel/gmmlib +cd gmmlib +mkdir build && cd build +cmake .. +make -j"$(nproc)" +sudo make install +sudo ldconfig +cd ../.. + +git clone --depth 1 --branch intel-media-23.1.6 https://github.com/intel/media-driver +mkdir -p build_media +cd build_media +cmake ../media-driver +make -j"$(nproc)" +sudo make install +sudo ldconfig +cd .. + +git clone --depth 1 --branch n6.0 https://github.com/FFmpeg/FFmpeg +cd FFmpeg +./configure \ + --enable-nonfree \ + --enable-gpl \ + --enable-libx264 + +make -j"$(nproc)" +sudo make install +sudo ldconfig +cd .. + +cd .. +rm -rf qsvbuild diff --git a/go-vod/dev.Dockerfile b/go-vod/dev.Dockerfile new file mode 100644 index 00000000..d3aececf --- /dev/null +++ b/go-vod/dev.Dockerfile @@ -0,0 +1,12 @@ +FROM golang:bullseye AS builder +WORKDIR /app +COPY . . +RUN CGO_ENABLED=0 go build -buildvcs=false -ldflags="-s -w" + +FROM linuxserver/ffmpeg:latest + +COPY --from=builder /app/go-vod . + +EXPOSE 47788 + +ENTRYPOINT ["/go-vod"] \ No newline at end of file diff --git a/go-vod/go.mod b/go-vod/go.mod new file mode 100644 index 00000000..3b63aa0f --- /dev/null +++ b/go-vod/go.mod @@ -0,0 +1,3 @@ +module github.com/pulsejet/go-vod + +go 1.16 diff --git a/go-vod/main.go b/go-vod/main.go new file mode 100644 index 00000000..d84db95c --- /dev/null +++ b/go-vod/main.go @@ -0,0 +1,48 @@ +package main + +import ( + "fmt" + "log" + "os" + + "github.com/pulsejet/go-vod/transcoder" +) + +const VERSION = "0.1.28" + +func main() { + // Build initial configuration + c := &transcoder.Config{ + VersionMonitor: false, + Version: VERSION, + Bind: ":47788", + ChunkSize: 3, + LookBehind: 3, + GoalBufferMin: 1, + GoalBufferMax: 4, + StreamIdleTime: 60, + ManagerIdleTime: 60, + } + + // Parse arguments + for _, arg := range os.Args[1:] { + if arg == "-version-monitor" { + c.VersionMonitor = true + } else if arg == "-version" { + fmt.Print("go-vod " + VERSION) + return + } else { + c.FromFile(arg) // config file + } + } + + // Auto detect ffmpeg and ffprobe + c.AutoDetect() + + // Start server + code := transcoder.NewHandler(c).Start() + + // Exit + log.Println("Exiting go-vod with status code", code) + os.Exit(code) +} diff --git a/go-vod/run.sh b/go-vod/run.sh new file mode 100755 index 00000000..0feb5d2a --- /dev/null +++ b/go-vod/run.sh @@ -0,0 +1,67 @@ +#!/bin/bash + +# This script fetches the current version of go-vod from Nextcloud +# to the working directory and runs it. If go-vod exits with a restart +# code, the script will restart it. + +# This script is intended to be run by systemd if running on bare metal. + +# Environment variables +HOST=$NEXTCLOUD_HOST +ALLOW_INSECURE=$NEXTCLOUD_ALLOW_INSECURE + +# check if host is set +if [[ -z $HOST ]]; then + echo "fatal: NEXTCLOUD_HOST is not set" + exit 1 +fi + +# check if scheme is set +if [[ ! $HOST == http://* ]] && [[ ! $HOST == https://* ]]; then + echo "fatal: NEXTCLOUD_HOST must start with http:// or https://" + exit 1 +fi + +# check if scheme is http and allow_insecure is not set +if [[ $HOST == http://* ]] && [[ -z $ALLOW_INSECURE ]]; then + echo "fatal: NEXTCLOUD_HOST is set to http:// but NEXTCLOUD_ALLOW_INSECURE is not set" + exit 1 +fi + +# build URL to fetch binary from Nextcloud +ARCH=$(uname -m) +URL="$HOST/index.php/apps/memories/static/go-vod?arch=$ARCH" + +# set the -k option in curl if allow_insecure is set +EXTRA_CURL_ARGS="" +if [[ $ALLOW_INSECURE == true ]]; then + EXTRA_CURL_ARGS="$EXTRA_CURL_ARGS -k" +fi + +# fetch binary, sleeping 10 seconds between retries +function fetch_binary { + while true; do + rm -f go-vod + curl $EXTRA_CURL_ARGS -L -f -m 10 -s -o go-vod $URL + if [[ $? == 0 ]]; then + chmod +x go-vod + echo "Fetched $URL successfully!" + break + fi + echo "Failed to fetch $URL" + echo "Are you sure the host is reachable and running Memories v6+?" + echo "Retrying in 10 seconds..." + sleep 10 + done +} + +# infinite loop +while true; do + fetch_binary + ./go-vod -version-monitor + if [[ $? != 12 ]]; then + break + fi + + sleep 3 # throttle +done \ No newline at end of file diff --git a/go-vod/transcoder/chunk.go b/go-vod/transcoder/chunk.go new file mode 100644 index 00000000..77018491 --- /dev/null +++ b/go-vod/transcoder/chunk.go @@ -0,0 +1,15 @@ +package transcoder + +type Chunk struct { + id int + done bool + notifs []chan bool +} + +func NewChunk(id int) *Chunk { + return &Chunk{ + id: id, + done: false, + notifs: make([]chan bool, 0), + } +} diff --git a/go-vod/transcoder/config.go b/go-vod/transcoder/config.go new file mode 100644 index 00000000..526ed890 --- /dev/null +++ b/go-vod/transcoder/config.go @@ -0,0 +1,111 @@ +package transcoder + +import ( + "encoding/json" + "io/ioutil" + "log" + "os" + "os/exec" +) + +type Config struct { + // Current version of go-vod + Version string + + // Is this server configured? + Configured bool + + // Restart the server if incorrect version detected + VersionMonitor bool + + // Bind address + Bind string `json:"bind"` + + // FFmpeg binary + FFmpeg string `json:"ffmpeg"` + // FFprobe binary + FFprobe string `json:"ffprobe"` + // Temp files directory + TempDir string `json:"tempdir"` + + // Size of each chunk in seconds + ChunkSize int `json:"chunkSize"` + // How many *chunks* to look behind before restarting transcoding + LookBehind int `json:"lookBehind"` + // Number of chunks in goal to restart encoding + GoalBufferMin int `json:"goalBufferMin"` + // Number of chunks in goal to stop encoding + GoalBufferMax int `json:"goalBufferMax"` + + // Number of seconds to wait before shutting down encoding + StreamIdleTime int `json:"streamIdleTime"` + // Number of seconds to wait before shutting down a client + ManagerIdleTime int `json:"managerIdleTime"` + + // Quality Factor (e.g. CRF / global_quality) + QF int `json:"qf"` + + // Hardware acceleration configuration + + // VA-API + VAAPI bool `json:"vaapi"` + VAAPILowPower bool `json:"vaapiLowPower"` + + // NVENC + NVENC bool `json:"nvenc"` + NVENCTemporalAQ bool `json:"nvencTemporalAQ"` + NVENCScale string `json:"nvencScale"` // cuda, npp + + // Use transpose workaround for streaming (VA-API) + UseTranspose bool `json:"useTranspose"` + + // Use GOP size workaround for streaming (NVENC) + UseGopSize bool `json:"useGopSize"` +} + +func (c *Config) FromFile(path string) { + // load json config + content, err := ioutil.ReadFile(path) + if err != nil { + log.Fatal("Error when opening file: ", err) + } + + err = json.Unmarshal(content, &c) + if err != nil { + log.Fatal("Error loading config file", err) + } + + // Set config as loaded + c.Configured = true + c.Print() +} + +func (c *Config) AutoDetect() { + // Auto-detect ffmpeg and ffprobe paths + if c.FFmpeg == "" || c.FFprobe == "" { + ffmpeg, err := exec.LookPath("ffmpeg") + if err != nil { + log.Fatal("Could not find ffmpeg") + } + + ffprobe, err := exec.LookPath("ffprobe") + if err != nil { + log.Fatal("Could not find ffprobe") + } + + c.FFmpeg = ffmpeg + c.FFprobe = ffprobe + } + + // Auto-choose tempdir + if c.TempDir == "" { + c.TempDir = os.TempDir() + "/go-vod" + } + + // Print updated config + c.Print() +} + +func (c *Config) Print() { + log.Printf("%+v\n", c) +} diff --git a/go-vod/transcoder/handler.go b/go-vod/transcoder/handler.go new file mode 100644 index 00000000..8c5bb085 --- /dev/null +++ b/go-vod/transcoder/handler.go @@ -0,0 +1,239 @@ +package transcoder + +import ( + "context" + "encoding/json" + "io/ioutil" + "log" + "net/http" + "os" + "strings" + "sync" + "time" +) + +type Handler struct { + c *Config + server *http.Server + managers map[string]*Manager + mutex sync.RWMutex + close chan string + exitCode int +} + +func NewHandler(c *Config) *Handler { + h := &Handler{ + c: c, + managers: make(map[string]*Manager), + close: make(chan string), + exitCode: 0, + } + + // Recreate tempdir + os.RemoveAll(c.TempDir) + os.MkdirAll(c.TempDir, 0755) + + return h +} + +func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // Check version if monitoring is enabled + if h.c.VersionMonitor && !h.versionOk(w, r) { + return + } + + url := r.URL.Path + parts := make([]string, 0) + + // log.Println("Serving", url) + + // Break url into parts + for _, part := range strings.Split(url, "/") { + if part != "" { + parts = append(parts, part) + } + } + + // Serve actual file from manager + if len(parts) < 3 { + log.Println("Invalid URL", url) + w.WriteHeader(http.StatusBadRequest) + return + } + + // Get streamid and chunk + streamid := parts[0] + path := "/" + strings.Join(parts[1:len(parts)-1], "/") + chunk := parts[len(parts)-1] + + // Check if POST request to create temp file + if r.Method == "POST" && len(parts) >= 2 && parts[1] == "create" { + var err error + path, err = h.createTempFile(w, r, parts) + if err != nil { + return + } + } + + // Check if test request + if chunk == "test" { + w.Header().Set("Content-Type", "application/json") + + // check if test file is readable + size := 0 + info, err := os.Stat(path) + if err == nil { + size = int(info.Size()) + } + + json.NewEncoder(w).Encode(map[string]interface{}{ + "version": h.c.Version, + "size": size, + }) + return + } + + // Check if configuration request + if r.Method == "POST" && chunk == "config" { + w.Header().Set("Content-Type", "application/json") + // read new config + body, err := ioutil.ReadAll(r.Body) + if err != nil { + log.Println("Error reading body", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + // Unmarshal config + if err := json.Unmarshal(body, h.c); err != nil { + log.Println("Error unmarshaling config", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + // Set config as loaded + h.c.Configured = true + + // Print loaded config + log.Printf("%+v\n", h.c) + return + } + + // Check if configured + if !h.c.Configured { + w.WriteHeader(http.StatusServiceUnavailable) + return + } + + // Check if valid + if streamid == "" || path == "" { + w.WriteHeader(http.StatusBadRequest) + return + } + + // Get existing manager or create new one + manager := h.getManager(path, streamid) + if manager == nil { + manager = h.createManager(path, streamid) + } + + // Failed to create manager + if manager == nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + + // Serve chunk if asked for + if chunk != "" && chunk != "ignore" { + manager.ServeHTTP(w, r, chunk) + } +} + +func (h *Handler) versionOk(w http.ResponseWriter, r *http.Request) bool { + expected := r.Header.Get("X-Go-Vod-Version") + if len(expected) > 0 && expected != h.c.Version { + log.Println("Version mismatch", expected, h.c.Version) + + // Try again in some time + w.WriteHeader(http.StatusServiceUnavailable) + + // Exit with status code 12 + h.exitCode = 12 + h.Close() + return false + } + + return true +} + +func (h *Handler) getManager(path string, streamid string) *Manager { + h.mutex.RLock() + defer h.mutex.RUnlock() + + m := h.managers[streamid] + if m == nil || m.path != path { + return nil + } + return m +} + +func (h *Handler) createManager(path string, streamid string) *Manager { + manager, err := NewManager(h.c, path, streamid, h.close) + if err != nil { + log.Println("Error creating manager", err) + freeIfTemp(path) + return nil + } + + h.mutex.Lock() + defer h.mutex.Unlock() + + old := h.managers[streamid] + if old != nil { + old.Destroy() + } + + h.managers[streamid] = manager + return manager +} + +func (h *Handler) removeManager(streamid string) { + h.mutex.Lock() + defer h.mutex.Unlock() + delete(h.managers, streamid) +} + +func (h *Handler) Start() int { + log.Println("Starting go-vod " + h.c.Version + " on " + h.c.Bind) + h.server = &http.Server{Addr: h.c.Bind, Handler: h} + + go func() { + err := h.server.ListenAndServe() + if err == http.ErrServerClosed { + log.Println("HTTP server closed") + } else if err != nil { + log.Fatal("Error starting server: ", err) + } + }() + + for { + id := <-h.close + if id == "" { + break + } + h.removeManager(id) + } + + // Stop server + log.Println("Shutting down HTTP server") + ctx, cancel := context.WithDeadline(context.TODO(), time.Now().Add(5*time.Second)) + defer cancel() + h.server.Shutdown(ctx) + + // Return status code + return h.exitCode +} + +func (h *Handler) Close() { + h.close <- "" +} diff --git a/go-vod/transcoder/manager.go b/go-vod/transcoder/manager.go new file mode 100644 index 00000000..7e0e2a60 --- /dev/null +++ b/go-vod/transcoder/manager.go @@ -0,0 +1,375 @@ +package transcoder + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "hash/fnv" + "log" + "math" + "net/http" + "os" + "os/exec" + "sort" + "strconv" + "strings" + "time" +) + +type Manager struct { + c *Config + + path string + tempDir string + id string + close chan string + inactive int + + probe *ProbeVideoData + numChunks int + + streams map[string]*Stream +} + +type ProbeVideoData struct { + Width int + Height int + Duration time.Duration + FrameRate int + CodecName string + BitRate int + Rotation int +} + +func NewManager(c *Config, path string, id string, close chan string) (*Manager, error) { + m := &Manager{c: c, path: path, id: id, close: close} + m.streams = make(map[string]*Stream) + + h := fnv.New32a() + h.Write([]byte(path)) + ph := fmt.Sprint(h.Sum32()) + m.tempDir = fmt.Sprintf("%s/%s-%s", m.c.TempDir, id, ph) + + // Delete temp dir if exists + os.RemoveAll(m.tempDir) + os.MkdirAll(m.tempDir, 0755) + + if err := m.ffprobe(); err != nil { + return nil, err + } + + m.numChunks = int(math.Ceil(m.probe.Duration.Seconds() / float64(c.ChunkSize))) + + // Possible streams + m.streams["480p"] = &Stream{c: c, m: m, quality: "480p", height: 480, width: 854, bitrate: 400} + m.streams["720p"] = &Stream{c: c, m: m, quality: "720p", height: 720, width: 1280, bitrate: 700} + m.streams["1080p"] = &Stream{c: c, m: m, quality: "1080p", height: 1080, width: 1920, bitrate: 1000} + m.streams["1440p"] = &Stream{c: c, m: m, quality: "1440p", height: 1440, width: 2560, bitrate: 1400} + m.streams["2160p"] = &Stream{c: c, m: m, quality: "2160p", height: 2160, width: 3840, bitrate: 3000} + + // height is our primary dimension for scaling + // using the probed size, we adjust the width of the stream + // the smaller dimemension of the output should match the height here + smDim, lgDim := m.probe.Height, m.probe.Width + if m.probe.Height > m.probe.Width { + smDim, lgDim = lgDim, smDim + } + + // Get the reference bitrate. This is the same as the current bitrate + // if the video is H.264, otherwise use double the current bitrate. + refBitrate := int(float64(m.probe.BitRate) / 2.0) + if m.probe.CodecName != CODEC_H264 { + refBitrate *= 2 + } + + // If bitrate could not be read, use 10Mbps + if refBitrate == 0 { + refBitrate = 10000000 + } + + // Get the multiplier for the reference bitrate. + // For this get the nearest stream size to the original. + origPixels := float64(m.probe.Height * m.probe.Width) + nearestPixels := float64(0) + nearestStream := "" + for key, stream := range m.streams { + streamPixels := float64(stream.height * stream.width) + if nearestPixels == 0 || math.Abs(origPixels-streamPixels) < math.Abs(origPixels-nearestPixels) { + nearestPixels = streamPixels + nearestStream = key + } + } + + // Get the bitrate multiplier. This is the ratio of the reference + // bitrate to the nearest stream bitrate, so we can scale all streams. + bitrateMultiplier := 1.0 + if nearestStream != "" { + bitrateMultiplier = float64(refBitrate) / float64(m.streams[nearestStream].bitrate) + } + + // Only keep streams that are smaller than the video + for k, stream := range m.streams { + stream.order = 0 + + // scale bitrate using the multiplier + stream.bitrate = int(math.Ceil(float64(stream.bitrate) * bitrateMultiplier)) + + // now store the width of the stream as the larger dimension + stream.width = int(math.Ceil(float64(lgDim) * float64(stream.height) / float64(smDim))) + + // remove invalid streams + if (stream.height >= smDim || stream.width >= lgDim) || // no upscaling; we're not AI + (float64(stream.bitrate) > float64(m.probe.BitRate)*0.8) || // no more than 80% of original bitrate + (stream.height%2 != 0 || stream.width%2 != 0) { // no odd dimensions + + // remove stream + delete(m.streams, k) + continue + } + } + + // Original stream + m.streams[QUALITY_MAX] = &Stream{ + c: c, m: m, + quality: QUALITY_MAX, + height: m.probe.Height, + width: m.probe.Width, + bitrate: refBitrate, + order: 1, + } + + // Start all streams + for _, stream := range m.streams { + go stream.Run() + } + + log.Printf("%s: new manager for %s", m.id, m.path) + + // Check for inactivity + go func() { + t := time.NewTicker(5 * time.Second) + defer t.Stop() + for { + <-t.C + + if m.inactive == -1 { + t.Stop() + return + } + + m.inactive++ + + // Check if any stream is active + for _, stream := range m.streams { + if stream.coder != nil { + m.inactive = 0 + break + } + } + + // Nothing done for 5 minutes + if m.inactive >= m.c.ManagerIdleTime/5 { + t.Stop() + m.Destroy() + m.close <- m.id + return + } + } + }() + + return m, nil +} + +// Destroys streams. DOES NOT emit on the close channel. +func (m *Manager) Destroy() { + log.Printf("%s: destroying manager", m.id) + m.inactive = -1 + + for _, stream := range m.streams { + stream.Stop() + } + + // Delete temp dir + os.RemoveAll(m.tempDir) + + // Delete file if temp + freeIfTemp(m.path) +} + +func (m *Manager) ServeHTTP(w http.ResponseWriter, r *http.Request, chunk string) error { + // Master list + if chunk == "index.m3u8" { + return m.ServeIndex(w, r) + } + + // Stream list + m3u8Sfx := ".m3u8" + if strings.HasSuffix(chunk, m3u8Sfx) { + quality := strings.TrimSuffix(chunk, m3u8Sfx) + if stream, ok := m.streams[quality]; ok { + return stream.ServeList(w, r) + } + } + + // Stream chunk + tsSfx := ".ts" + if strings.HasSuffix(chunk, tsSfx) { + parts := strings.Split(chunk, "-") + if len(parts) != 2 { + w.WriteHeader(http.StatusBadRequest) + return nil + } + + quality := parts[0] + chunkIdStr := strings.TrimSuffix(parts[1], tsSfx) + chunkId, err := strconv.Atoi(chunkIdStr) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return nil + } + + if stream, ok := m.streams[quality]; ok { + return stream.ServeChunk(w, chunkId) + } + } + + // Stream full video + mp4Sfx := ".mp4" + if strings.HasSuffix(chunk, mp4Sfx) { + quality := strings.TrimSuffix(chunk, mp4Sfx) + if stream, ok := m.streams[quality]; ok { + return stream.ServeFullVideo(w, r) + } + + // Fall back to original + return m.streams[QUALITY_MAX].ServeFullVideo(w, r) + } + + w.WriteHeader(http.StatusNotFound) + return nil +} + +func (m *Manager) ServeIndex(w http.ResponseWriter, r *http.Request) error { + WriteM3U8ContentType(w) + w.Write([]byte("#EXTM3U\n")) + + // get sorted streams by bitrate + streams := make([]*Stream, 0) + for _, stream := range m.streams { + streams = append(streams, stream) + } + sort.Slice(streams, func(i, j int) bool { + return streams[i].order < streams[j].order || + (streams[i].order == streams[j].order && streams[i].bitrate < streams[j].bitrate) + }) + + // Write all streams + query := GetQueryString(r) + for _, stream := range streams { + s := fmt.Sprintf("#EXT-X-STREAM-INF:BANDWIDTH=%d,RESOLUTION=%dx%d,FRAME-RATE=%d\n%s.m3u8%s\n", stream.bitrate, stream.width, stream.height, m.probe.FrameRate, stream.quality, query) + w.Write([]byte(s)) + } + return nil +} + +func (m *Manager) ffprobe() error { + args := []string{ + // Hide debug information + "-v", "error", + + // Show everything + "-show_entries", "format:stream", + "-select_streams", "v", // Video stream only, we're not interested in audio + + "-of", "json", + m.path, + } + + ctx, cancel := context.WithDeadline(context.TODO(), time.Now().Add(5*time.Second)) + defer cancel() + cmd := exec.CommandContext(ctx, m.c.FFprobe, args...) + + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + + err := cmd.Run() + if err != nil { + log.Println(stderr.String()) + return err + } + + out := struct { + Streams []struct { + Width int `json:"width"` + Height int `json:"height"` + Duration string `json:"duration"` + FrameRate string `json:"avg_frame_rate"` + CodecName string `json:"codec_name"` + BitRate string `json:"bit_rate"` + SideDataList []struct { + SideDataType string `json:"side_data_type"` + Rotation int `json:"rotation"` + } `json:"side_data_list"` + } `json:"streams"` + Format struct { + Duration string `json:"duration"` + } `json:"format"` + }{} + + if err := json.Unmarshal(stdout.Bytes(), &out); err != nil { + return err + } + + if len(out.Streams) == 0 { + return errors.New("no video streams found") + } + + var duration time.Duration + if out.Streams[0].Duration != "" { + duration, _ = time.ParseDuration(out.Streams[0].Duration + "s") + } else if out.Format.Duration != "" { + duration, _ = time.ParseDuration(out.Format.Duration + "s") + } + + // FrameRate is a fraction string + frac := strings.Split(out.Streams[0].FrameRate, "/") + if len(frac) != 2 { + frac = []string{"30", "1"} + } + num, e1 := strconv.Atoi(frac[0]) + den, e2 := strconv.Atoi(frac[1]) + if e1 != nil || e2 != nil { + num = 30 + den = 1 + } + frameRate := float64(num) / float64(den) + + // BitRate is a string + bitRate, err := strconv.Atoi(out.Streams[0].BitRate) + if err != nil { + bitRate = 5000000 + } + + // Get rotation from side data + rotation := 0 + for _, sideData := range out.Streams[0].SideDataList { + if sideData.SideDataType == "Display Matrix" { + rotation = sideData.Rotation + } + } + + m.probe = &ProbeVideoData{ + Width: out.Streams[0].Width, + Height: out.Streams[0].Height, + Duration: duration, + FrameRate: int(frameRate), + CodecName: out.Streams[0].CodecName, + BitRate: bitRate, + Rotation: rotation, + } + + return nil +} diff --git a/go-vod/transcoder/stream.go b/go-vod/transcoder/stream.go new file mode 100644 index 00000000..721f071d --- /dev/null +++ b/go-vod/transcoder/stream.go @@ -0,0 +1,704 @@ +package transcoder + +import ( + "bufio" + "fmt" + "io" + "log" + "net/http" + "os" + "os/exec" + "strconv" + "strings" + "sync" + "syscall" + "time" +) + +const ( + ENCODER_COPY = "copy" + ENCODER_X264 = "libx264" + ENCODER_VAAPI = "h264_vaapi" + ENCODER_NVENC = "h264_nvenc" + + QUALITY_MAX = "max" + CODEC_H264 = "h264" +) + +type Stream struct { + c *Config + m *Manager + quality string + order int + height int + width int + bitrate int + + goal int + + mutex sync.Mutex + chunks map[int]*Chunk + seenChunks map[int]bool // only for stdout reader + + coder *exec.Cmd + + inactive int + stop chan bool +} + +func (s *Stream) Run() { + // run every 5s + t := time.NewTicker(5 * time.Second) + defer t.Stop() + + s.stop = make(chan bool) + + for { + select { + case <-t.C: + s.mutex.Lock() + // Prune chunks + for id := range s.chunks { + if id < s.goal-s.c.GoalBufferMax { + s.pruneChunk(id) + } + } + + s.inactive++ + + // Nothing done for 2 minutes + if s.inactive >= s.c.StreamIdleTime/5 && s.coder != nil { + t.Stop() + s.clear() + } + s.mutex.Unlock() + + case <-s.stop: + t.Stop() + s.mutex.Lock() + s.clear() + s.mutex.Unlock() + return + } + } +} + +func (s *Stream) clear() { + log.Printf("%s-%s: stopping stream", s.m.id, s.quality) + + for _, chunk := range s.chunks { + // Delete files + s.pruneChunk(chunk.id) + } + + s.chunks = make(map[int]*Chunk) + s.seenChunks = make(map[int]bool) + s.goal = 0 + + if s.coder != nil { + s.coder.Process.Kill() + s.coder.Wait() + s.coder = nil + } +} + +func (s *Stream) Stop() { + select { + case s.stop <- true: + default: + } +} + +func (s *Stream) ServeList(w http.ResponseWriter, r *http.Request) error { + WriteM3U8ContentType(w) + w.Write([]byte("#EXTM3U\n")) + w.Write([]byte("#EXT-X-VERSION:4\n")) + w.Write([]byte("#EXT-X-MEDIA-SEQUENCE:0\n")) + w.Write([]byte("#EXT-X-PLAYLIST-TYPE:VOD\n")) + w.Write([]byte(fmt.Sprintf("#EXT-X-TARGETDURATION:%d\n", s.c.ChunkSize))) + + query := GetQueryString(r) + + duration := s.m.probe.Duration.Seconds() + i := 0 + for duration > 0 { + size := float64(s.c.ChunkSize) + if duration < size { + size = duration + } + + w.Write([]byte(fmt.Sprintf("#EXTINF:%.3f, nodesc\n", size))) + w.Write([]byte(fmt.Sprintf("%s-%06d.ts%s\n", s.quality, i, query))) + + duration -= float64(s.c.ChunkSize) + i++ + } + + w.Write([]byte("#EXT-X-ENDLIST\n")) + + return nil +} + +func (s *Stream) ServeChunk(w http.ResponseWriter, id int) error { + s.mutex.Lock() + defer s.mutex.Unlock() + + s.inactive = 0 + s.checkGoal(id) + + // Already have this chunk + if chunk, ok := s.chunks[id]; ok { + // Chunk is finished, just return it + if chunk.done { + s.returnChunk(w, chunk) + return nil + } + + // Still waiting on transcoder + s.waitForChunk(w, chunk) + return nil + } + + // Will have this soon enough + foundBehind := false + for i := id - 1; i > id-s.c.LookBehind && i >= 0; i-- { + if _, ok := s.chunks[i]; ok { + foundBehind = true + } + } + if foundBehind { + // Make sure the chunk exists + chunk := s.createChunk(id) + + // Wait for it + s.waitForChunk(w, chunk) + return nil + } + + // Let's start over + s.restartAtChunk(w, id) + return nil +} + +func (s *Stream) ServeFullVideo(w http.ResponseWriter, r *http.Request) error { + args := s.transcodeArgs(0, false) + + if s.m.probe.CodecName == CODEC_H264 && s.quality == QUALITY_MAX { + // try to just send the original file + http.ServeFile(w, r, s.m.path) + return nil + } + + // Output mov + args = append(args, []string{ + "-movflags", "frag_keyframe+empty_moov+faststart", + "-f", "mp4", "pipe:1", + }...) + + coder := exec.Command(s.c.FFmpeg, args...) + log.Printf("%s-%s: %s", s.m.id, s.quality, strings.Join(coder.Args[:], " ")) + + cmdStdOut, err := coder.StdoutPipe() + if err != nil { + log.Printf("FATAL: ffmpeg command stdout failed with %s\n", err) + } + + cmdStdErr, err := coder.StderrPipe() + if err != nil { + log.Printf("FATAL: ffmpeg command stdout failed with %s\n", err) + } + + err = coder.Start() + if err != nil { + log.Printf("FATAL: ffmpeg command failed with %s\n", err) + } + go s.monitorStderr(cmdStdErr) + + // Write to response + defer cmdStdOut.Close() + stdoutReader := bufio.NewReader(cmdStdOut) + + // Write mov headers + w.Header().Set("Content-Type", "video/mp4") + w.WriteHeader(http.StatusOK) + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "Server does not support Flusher!", + http.StatusInternalServerError) + return nil + } + + // Write data, flusing every 1MB + buf := make([]byte, 1024*1024) + for { + n, err := stdoutReader.Read(buf) + if err != nil { + if err == io.EOF { + break + } + log.Printf("FATAL: ffmpeg command failed with %s\n", err) + break + } + + _, err = w.Write(buf[:n]) + if err != nil { + log.Printf("%s-%s: client closed connection", s.m.id, s.quality) + log.Println(err) + break + } + flusher.Flush() + } + + // Terminate ffmpeg process + coder.Process.Kill() + coder.Wait() + + return nil +} + +func (s *Stream) createChunk(id int) *Chunk { + if c, ok := s.chunks[id]; ok { + return c + } else { + s.chunks[id] = NewChunk(id) + return s.chunks[id] + } +} + +func (s *Stream) pruneChunk(id int) { + delete(s.chunks, id) + + // Remove file + filename := s.getTsPath(id) + os.Remove(filename) +} + +func (s *Stream) returnChunk(w http.ResponseWriter, chunk *Chunk) { + // This function is called with lock, but we don't need it + s.mutex.Unlock() + defer s.mutex.Lock() + + // Read file and write to response + filename := s.getTsPath(chunk.id) + f, err := os.Open(filename) + if err != nil { + log.Println(err) + w.WriteHeader(http.StatusInternalServerError) + return + } + defer f.Close() + w.Header().Set("Content-Type", "video/MP2T") + io.Copy(w, f) +} + +func (s *Stream) waitForChunk(w http.ResponseWriter, chunk *Chunk) { + if chunk.done { + s.returnChunk(w, chunk) + return + } + + // Add our channel + notif := make(chan bool) + chunk.notifs = append(chunk.notifs, notif) + t := time.NewTimer(10 * time.Second) + coder := s.coder + + s.mutex.Unlock() + + select { + case <-notif: + t.Stop() + case <-t.C: + } + + s.mutex.Lock() + + // remove channel + for i, c := range chunk.notifs { + if c == notif { + chunk.notifs = append(chunk.notifs[:i], chunk.notifs[i+1:]...) + break + } + } + + // check for success + if chunk.done { + s.returnChunk(w, chunk) + return + } + + // Check if coder was changed + if coder != s.coder { + w.WriteHeader(http.StatusConflict) + return + } + + // Return timeout error + w.WriteHeader(http.StatusRequestTimeout) +} + +func (s *Stream) restartAtChunk(w http.ResponseWriter, id int) { + // Stop current transcoder + s.clear() + + chunk := s.createChunk(id) // create first chunk + + // Start the transcoder + s.goal = id + s.c.GoalBufferMax + s.transcode(id) + + s.waitForChunk(w, chunk) // this is also a request +} + +// Get arguments to ffmpeg +func (s *Stream) transcodeArgs(startAt float64, isHls bool) []string { + args := []string{ + "-loglevel", "warning", + } + + if startAt > 0 { + args = append(args, []string{ + "-ss", fmt.Sprintf("%.6f", startAt), + }...) + } + + // encoder selection + CV := ENCODER_X264 + + // Check whether hwaccel should be used + if s.c.VAAPI { + CV = ENCODER_VAAPI + extra := "-hwaccel vaapi -hwaccel_device /dev/dri/renderD128 -hwaccel_output_format vaapi" + args = append(args, strings.Split(extra, " ")...) + } else if s.c.NVENC { + CV = ENCODER_NVENC + extra := "-hwaccel cuda" + args = append(args, strings.Split(extra, " ")...) + } + + // Disable autorotation (see transpose comments below) + if s.c.UseTranspose { + args = append(args, []string{"-noautorotate"}...) + } + + // Input specs + args = append(args, []string{ + "-i", s.m.path, // Input file + "-copyts", // So the "-to" refers to the original TS + "-fflags", "+genpts", + }...) + + // Filters + format := "format=nv12" + scaler := "scale" + scalerArgs := make([]string, 0) + scalerArgs = append(scalerArgs, "force_original_aspect_ratio=decrease") + + if CV == ENCODER_VAAPI { + format = "format=nv12|vaapi,hwupload" + scaler = "scale_vaapi" + scalerArgs = append(scalerArgs, "format=nv12") + } else if CV == ENCODER_NVENC { + format = "format=nv12|cuda,hwupload" + scaler = fmt.Sprintf("scale_%s", s.c.NVENCScale) + + // workaround to force scale_cuda to examine all input frames + if s.c.NVENCScale == "cuda" { + scalerArgs = append(scalerArgs, "passthrough=0") + } + } + + // Scale height and width if not max quality + if s.quality != QUALITY_MAX { + maxDim := s.height + if s.width > s.height { + maxDim = s.width + } + + scalerArgs = append(scalerArgs, fmt.Sprintf("w=%d", maxDim)) + scalerArgs = append(scalerArgs, fmt.Sprintf("h=%d", maxDim)) + } + + // Apply filter + if CV != ENCODER_COPY { + filter := fmt.Sprintf("%s,%s=%s", format, scaler, strings.Join(scalerArgs, ":")) + + // Rotation is a mess: https://trac.ffmpeg.org/ticket/8329 + // 1/ -noautorotate copies the sidecar metadata to the output + // 2/ autorotation doesn't seem to work with some types of HW (at least not with VAAPI) + // 3/ autorotation doesn't work with HLS streams + // 4/ VAAPI cannot transport on AMD GPUs + // So: give the user to disable autorotation for HLS and use a manual transpose + if isHls && s.c.UseTranspose { + transposer := "transpose" + if CV == ENCODER_VAAPI { + transposer = "transpose_vaapi" + } else if CV == ENCODER_NVENC { + transposer = fmt.Sprintf("transpose_%s", s.c.NVENCScale) + } + + if transposer != "transpose_cuda" { // does not exist + if s.m.probe.Rotation == -90 { + filter = fmt.Sprintf("%s,%s=1", filter, transposer) + } else if s.m.probe.Rotation == 90 { + filter = fmt.Sprintf("%s,%s=2", filter, transposer) + } else if s.m.probe.Rotation == 180 || s.m.probe.Rotation == -180 { + filter = fmt.Sprintf("%s,%s=1,%s=1", filter, transposer, transposer) + } + } + } + + args = append(args, []string{"-vf", filter}...) + } + + // Output specs for video + args = append(args, []string{ + "-map", "0:v:0", + "-c:v", CV, + }...) + + // Device specific output args + if CV == ENCODER_VAAPI { + args = append(args, []string{"-global_quality", fmt.Sprintf("%d", s.c.QF)}...) + + if s.c.VAAPILowPower { + args = append(args, []string{"-low_power", "1"}...) + } + } else if CV == ENCODER_NVENC { + args = append(args, []string{ + "-preset", "p6", + "-tune", "ll", + "-rc", "vbr", + "-rc-lookahead", "30", + "-cq", fmt.Sprintf("%d", s.c.QF), + }...) + + if s.c.NVENCTemporalAQ { + args = append(args, []string{"-temporal-aq", "1"}...) + } + } else if CV == ENCODER_X264 { + args = append(args, []string{ + "-preset", "faster", + "-crf", fmt.Sprintf("%d", s.c.QF), + }...) + } + + // Audio output specs + args = append(args, []string{ + "-map", "0:a:0?", + "-c:a", "aac", + "-ac", "1", + }...) + + return args +} + +func (s *Stream) transcode(startId int) { + if startId > 0 { + // Start one frame before + // This ensures that the keyframes are aligned + startId-- + } + startAt := float64(startId * s.c.ChunkSize) + + args := s.transcodeArgs(startAt, true) + + // Segmenting specs + args = append(args, []string{ + "-start_number", fmt.Sprintf("%d", startId), + "-avoid_negative_ts", "disabled", + "-f", "hls", + "-hls_flags", "split_by_time", + "-hls_time", fmt.Sprintf("%d", s.c.ChunkSize), + "-hls_segment_type", "mpegts", + "-hls_segment_filename", s.getTsPath(-1), + }...) + + // Keyframe specs + if s.c.UseGopSize && s.m.probe.FrameRate > 0 { + // Fix GOP size + args = append(args, []string{ + "-g", fmt.Sprintf("%d", s.c.ChunkSize*s.m.probe.FrameRate), + "-keyint_min", fmt.Sprintf("%d", s.c.ChunkSize*s.m.probe.FrameRate), + }...) + } else { + // Force keyframes every chunk + args = append(args, []string{ + "-force_key_frames", fmt.Sprintf("expr:gte(t,n_forced*%d)", s.c.ChunkSize), + }...) + } + + // Output to stdout + args = append(args, "-") + + // Start the process + s.coder = exec.Command(s.c.FFmpeg, args...) + + // Log command, quoting the args as needed + quotedArgs := make([]string, len(s.coder.Args)) + invalidChars := strings.Join([]string{" ", "=", ":", "\"", "\\", "\n", "\t"}, "") + for i, arg := range s.coder.Args { + if strings.ContainsAny(arg, invalidChars) { + quotedArgs[i] = fmt.Sprintf("\"%s\"", arg) + } else { + quotedArgs[i] = arg + } + } + log.Printf("%s-%s: %s", s.m.id, s.quality, strings.Join(quotedArgs[:], " ")) + + cmdStdOut, err := s.coder.StdoutPipe() + if err != nil { + log.Printf("FATAL: ffmpeg command stdout failed with %s\n", err) + } + + cmdStdErr, err := s.coder.StderrPipe() + if err != nil { + log.Printf("FATAL: ffmpeg command stdout failed with %s\n", err) + } + + err = s.coder.Start() + if err != nil { + log.Printf("FATAL: ffmpeg command failed with %s\n", err) + } + + go s.monitorTranscodeOutput(cmdStdOut, startAt) + go s.monitorStderr(cmdStdErr) + go s.monitorExit() +} + +func (s *Stream) checkGoal(id int) { + goal := id + s.c.GoalBufferMin + if goal > s.goal { + s.goal = id + s.c.GoalBufferMax + + // resume encoding + if s.coder != nil { + log.Printf("%s-%s: resuming transcoding", s.m.id, s.quality) + s.coder.Process.Signal(syscall.SIGCONT) + } + } +} + +func (s *Stream) getTsPath(id int) string { + if id == -1 { + return fmt.Sprintf("%s/%s-%%06d.ts", s.m.tempDir, s.quality) + } + return fmt.Sprintf("%s/%s-%06d.ts", s.m.tempDir, s.quality, id) +} + +// Separate goroutine +func (s *Stream) monitorTranscodeOutput(cmdStdOut io.ReadCloser, startAt float64) { + s.mutex.Lock() + coder := s.coder + s.mutex.Unlock() + + defer cmdStdOut.Close() + stdoutReader := bufio.NewReader(cmdStdOut) + + for { + if s.coder != coder { + break + } + + line, err := stdoutReader.ReadBytes('\n') + if err == io.EOF { + if len(line) == 0 { + break + } + } else if err != nil { + log.Println(err) + break + } else { + line = line[:(len(line) - 1)] + } + + l := string(line) + + if strings.Contains(l, ".ts") { + // 1080p-000003.ts + idx := strings.Split(strings.Split(l, "-")[1], ".")[0] + id, err := strconv.Atoi(idx) + if err != nil { + log.Println("Error parsing chunk id") + } + + if s.seenChunks[id] { + continue + } + s.seenChunks[id] = true + + // Debug + log.Printf("%s-%s: recv %s", s.m.id, s.quality, l) + + func() { + s.mutex.Lock() + defer s.mutex.Unlock() + + // The coder has changed; do nothing + if s.coder != coder { + return + } + + // Notify everyone + chunk := s.createChunk(id) + if chunk.done { + return + } + chunk.done = true + for _, n := range chunk.notifs { + n <- true + } + + // Check goal satisfied + if id >= s.goal { + log.Printf("%s-%s: goal satisfied: %d", s.m.id, s.quality, s.goal) + s.coder.Process.Signal(syscall.SIGSTOP) + } + }() + } + } +} + +func (s *Stream) monitorStderr(cmdStdErr io.ReadCloser) { + stderrReader := bufio.NewReader(cmdStdErr) + + for { + line, err := stderrReader.ReadBytes('\n') + if err == io.EOF { + if len(line) == 0 { + break + } + } else if err != nil { + log.Println(err) + break + } else { + line = line[:(len(line) - 1)] + } + log.Println("ffmpeg-error:", string(line)) + } +} + +func (s *Stream) monitorExit() { + // Join the process + coder := s.coder + err := coder.Wait() + + // Try to get exit status + if exitError, ok := err.(*exec.ExitError); ok { + exitcode := exitError.ExitCode() + log.Printf("%s-%s: ffmpeg exited with status: %d", s.m.id, s.quality, exitcode) + + s.mutex.Lock() + defer s.mutex.Unlock() + + // If error code is >0, there was an error in transcoding + if exitcode > 0 && s.coder == coder { + // Notify all outstanding chunks + for _, chunk := range s.chunks { + for _, n := range chunk.notifs { + n <- true + } + } + } + } +} diff --git a/go-vod/transcoder/temp.go b/go-vod/transcoder/temp.go new file mode 100644 index 00000000..08c53733 --- /dev/null +++ b/go-vod/transcoder/temp.go @@ -0,0 +1,49 @@ +package transcoder + +import ( + "encoding/json" + "io/ioutil" + "log" + "net/http" + "os" + "strings" +) + +func (h *Handler) createTempFile(w http.ResponseWriter, r *http.Request, parts []string) (string, error) { + streamid := parts[0] + body, err := ioutil.ReadAll(r.Body) + if err != nil { + log.Println("Error reading body", err) + w.WriteHeader(http.StatusInternalServerError) + return "", err + } + + // Create temporary file + file, err := ioutil.TempFile(h.c.TempDir, streamid+"-govod-temp-") + if err != nil { + log.Println("Error creating temp file", err) + w.WriteHeader(http.StatusInternalServerError) + return "", err + } + defer file.Close() + + // Write data to file + if _, err := file.Write(body); err != nil { + log.Println("Error writing to temp file", err) + w.WriteHeader(http.StatusInternalServerError) + return "", err + } + + // Return full path to file in JSON + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]string{"path": file.Name()}) + + // Return path to file + return file.Name(), nil +} + +func freeIfTemp(path string) { + if strings.Contains(path, "-govod-temp-") { + os.Remove(path) + } +} diff --git a/go-vod/transcoder/util.go b/go-vod/transcoder/util.go new file mode 100644 index 00000000..c2d7940f --- /dev/null +++ b/go-vod/transcoder/util.go @@ -0,0 +1,15 @@ +package transcoder + +import "net/http" + +func GetQueryString(r *http.Request) string { + query := r.URL.Query().Encode() + if query != "" { + query = "?" + query + } + return query +} + +func WriteM3U8ContentType(w http.ResponseWriter) { + w.Header().Set("Content-Type", "application/x-mpegURL") +}