commit 8b36733e366d1375dde944ad524f3b73813caaee Author: Varun Patil Date: Thu Nov 10 03:24:33 2022 -0800 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..5543a277 --- /dev/null +++ b/.gitignore @@ -0,0 +1,22 @@ +# If you prefer the allow list template instead of the deny list, see community template: +# https://github.com/github/gitignore/blob/main/community/Golang/Go.AllowList.gitignore +# +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib +go-vod + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Dependency directories (remove the comment below to include it) +# vendor/ + +# Go workspace file +go.work diff --git a/go.mod b/go.mod new file mode 100644 index 00000000..3b63aa0f --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module github.com/pulsejet/go-vod + +go 1.16 diff --git a/main.go b/main.go new file mode 100644 index 00000000..7869b255 --- /dev/null +++ b/main.go @@ -0,0 +1,100 @@ +package main + +import ( + "log" + "net/http" + "strings" + "sync" +) + +type Handler struct { + managers map[string]*Manager + mutex sync.RWMutex + close chan string +} + +func NewHandler() *Handler { + h := &Handler{managers: make(map[string]*Manager), close: make(chan string)} + go h.watchClose() + return h +} + +func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + url := r.URL.Path + parts := make([]string, 0) + + for _, part := range strings.Split(url, "/") { + if part != "" { + parts = append(parts, part) + } + } + + if len(parts) != 3 { + log.Println("Invalid URL", url, len(parts)) + w.WriteHeader(http.StatusBadRequest) + return + } + + path := parts[0] + streamid := parts[1] + chunk := parts[2] + + if streamid == "" || chunk == "" || path == "" { + w.WriteHeader(http.StatusBadRequest) + return + } + + manager := h.getManager(streamid) + if manager == nil { + manager = h.createManager(path, streamid) + } + manager.ServeHTTP(w, r, chunk) +} + +func (h *Handler) getManager(streamid string) *Manager { + h.mutex.RLock() + defer h.mutex.RUnlock() + return h.managers[streamid] +} + +func (h *Handler) createManager(path string, streamid string) *Manager { + h.mutex.Lock() + defer h.mutex.Unlock() + manager := NewManager(path, streamid, h.close) + h.managers[streamid] = manager + return manager +} + +func (h *Handler) removeManager(streamid string) { + h.mutex.Lock() + defer h.mutex.Unlock() + delete(h.managers, streamid) +} + +func (h *Handler) watchClose() { + for { + id := <-h.close + if id == "" { + return + } + + log.Println("Closing stream", id) + h.removeManager(id) + } +} + +func (h *Handler) Close() { + h.close <- "" +} + +func main() { + log.Println("Starting VOD server") + + h := NewHandler() + + http.Handle("/", h) + http.ListenAndServe(":47788", nil) + + log.Println("Exiting VOD server") + h.Close() +} diff --git a/manager.go b/manager.go new file mode 100644 index 00000000..7ea4bf84 --- /dev/null +++ b/manager.go @@ -0,0 +1,25 @@ +package main + +import ( + "fmt" + "net/http" +) + +type Manager struct { + path string + id string + close chan string +} + +func NewManager(path string, id string, close chan string) *Manager { + m := &Manager{path: path, id: id, close: close} + return m +} + +func (m *Manager) ServeHTTP(w http.ResponseWriter, r *http.Request, chunk string) { + fmt.Println("Manager.ServeHTTP", m.id, chunk) + w.Write([]byte("Hello, world!")) + w.Write([]byte(chunk)) + w.Write([]byte(m.id)) + +}