Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: audio #79

Open
wants to merge 1 commit into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions constants/ffmpeg.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@ package constants
type FILE_TYPE int

const (
Audio192K FILE_TYPE = iota
ImagePng
ImagePng FILE_TYPE = iota
Video5M
Video3M
Video1M
Video800K
Video400K
)

const (
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets keep this const in the upper one, else iota value starts at 0, so ImagePng and Audio192K will have same value

Audio192K FILE_TYPE = iota
Audio128K
)

type FFMPEG_KWARGS int

const (
Expand Down Expand Up @@ -52,6 +56,7 @@ const Scale = "scale"

var AudioFileTypeMap = map[FILE_TYPE]string{
Audio192K: "_audio192k.m4a",
Audio128K: "_audio128k.m4a",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

m4a is supported only for dash protocol

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let discuss on this

}

var ImageFileTypeMap = map[FILE_TYPE]string{
Expand All @@ -68,6 +73,7 @@ var VideoFileTypeMap = map[FILE_TYPE]string{

var AudioBitrateMap = map[FILE_TYPE]string{
Audio192K: "192k",
Audio128K: "128k",
}

var VideoBitrateMap = map[FILE_TYPE]string{
Expand Down
31 changes: 31 additions & 0 deletions controllers/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controllers

import (
"encoding/json"
"log"
"net/http"
"zestream-server/configs"
"zestream-server/types"
Expand All @@ -26,6 +27,36 @@ func (*Process) Video(c *gin.Context) {
return
}

log.Println("request", request)

_, channel, queue, ctx, _ := configs.GetRabbitMQ()

err = utils.PublishEvent(channel, queue, *ctx, jsonBytes)

if err != nil {
c.JSON(http.StatusExpectationFailed, gin.H{"error": err.Error()})
return
}

c.JSON(http.StatusCreated, gin.H{"status": "success"})
}

func (*Process) Audio(c *gin.Context) {
var request types.Audio

if err := c.ShouldBindJSON(&request); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}

jsonBytes, err := json.Marshal(request)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}

log.Println("request", request)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: printing request twice


_, channel, queue, ctx, _ := configs.GetRabbitMQ()

err = utils.PublishEvent(channel, queue, *ctx, jsonBytes)
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func main() {
conn, ch, q, _, cancel := configs.InitRabbitMQ()

if *isConsumer {
service.VideoProcessConsumer(ch, q)
service.ProcessConsumer(ch, q)
return
}

Expand Down
1 change: 1 addition & 0 deletions routes/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func Init() *gin.Engine {

// /api/v1
apiV1.POST("video/process", process.Video)
apiV1.POST("audio/process", process.Audio)
apiV1.GET("url/presigned", controllers.GetPresignedURL)

return r
Expand Down
63 changes: 51 additions & 12 deletions service/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
rmq "github.com/rabbitmq/amqp091-go"
)

func VideoProcessConsumer(ch *rmq.Channel, q *rmq.Queue) {
func ProcessConsumer(ch *rmq.Channel, q *rmq.Queue) {
log.Println("Running ZeStream as Consumer")
var forever chan struct{}

Expand All @@ -30,13 +30,13 @@ func VideoProcessConsumer(ch *rmq.Channel, q *rmq.Queue) {
utils.LogErr(err)

msgs, err := ch.Consume(
q.Name, // queue
"VideoProcessConsumer", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
q.Name, // queue
"ProcessConsumer", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)

if err != nil {
Expand All @@ -50,14 +50,27 @@ func VideoProcessConsumer(ch *rmq.Channel, q *rmq.Queue) {
guard <- 1

var video types.Video
videoErr := json.Unmarshal(d.Body, &video)
if videoErr != nil {
log.Println(videoErr)
continue
}

err := json.Unmarshal(d.Body, &video)
if video.Type == "mp4" {
go processVideo(&video, guard)
continue
}

var audio types.Audio
audioErr := json.Unmarshal(d.Body, &audio)
if err != nil {
log.Println(err)
log.Println(audioErr)
continue
}

go processVideo(&video, guard)
if audio.Type == "mp3" {
go processAudio(&audio, guard)
}
}
}()

Expand All @@ -81,7 +94,7 @@ func processVideo(video *types.Video, guard <-chan int) {
utils.LogErr(err)
}

generateDash(videoFileName, video.Watermark)
generateVideoDash(videoFileName, video.Watermark)

uploader := utils.GetUploader(constants.CloudContainerNames[constants.Dashes], video.ID)

Expand All @@ -95,3 +108,29 @@ func processVideo(video *types.Video, guard <-chan int) {

<-guard
}

func processAudio(audio *types.Audio, guard <-chan int) {
log.Println("Processing Audio: ", audio)

var fileName = audio.ID + "." + audio.Type

err := utils.Fetch(audio.Src, fileName)
if err != nil {
utils.LogErr(err)
return
}

generateAudioDash(fileName)

uploader := utils.GetUploader(constants.CloudContainerNames[constants.Dashes], audio.ID)

outputDir, err := utils.GetOutputFilePathName(fileName, "")
utils.LogErr(err)

utils.UploadToCloudStorage(uploader, outputDir)

err = os.RemoveAll(outputDir)
utils.LogErr(err)

<-guard
}
28 changes: 27 additions & 1 deletion service/dash.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
ffmpeg "github.com/u2takey/ffmpeg-go"
)

func generateDash(fileName string, watermark types.WaterMark) {
func generateVideoDash(fileName string, watermark types.WaterMark) {
targetFile, err := utils.GetDownloadFilePathName(fileName)
if err != nil {
log.Println(err)
Expand Down Expand Up @@ -45,6 +45,32 @@ func generateDash(fileName string, watermark types.WaterMark) {
utils.DeleteFile(targetFile)
}

func generateAudioDash(fileName string) {
targetFile, err := utils.GetDownloadFilePathName(fileName)
if err != nil {
log.Println(err)
}

var fileNameStripped = utils.RemoveExtensionFromFile(fileName)

outputPath, err := utils.GetOutputFilePathName(fileName, fileNameStripped)
if err != nil {
log.Println(err)
return
}

var wg sync.WaitGroup

wg.Add(len(constants.AudioFileTypeMap))

go generateAudioFiles(targetFile, outputPath, &wg)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets connect once to test


wg.Wait()

deleteVideoFiles(outputPath)
utils.DeleteFile(targetFile)
}

func generateAudioFiles(targetFile string, outputPath string, wg *sync.WaitGroup) {
for fileType, filePrefix := range constants.AudioFileTypeMap {
var outputFile = outputPath + filePrefix
Expand Down
14 changes: 11 additions & 3 deletions types/video.go → types/media.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
package types

type Media struct {
ID string `json:"id" binding:"required"`
Src string `json:"src" binding:"required,url"`
Type string `json:"type" binding:"required"`
}

type Video struct {
ID string `json:"id" binding:"required"`
Src string `json:"src" binding:"required,url"`
Type string `json:"type" binding:"required"`
Media
Watermark WaterMark `json:"watermark"`
}

type Audio struct {
Media
}

type WaterMark struct {
ID string `json:"id" binding:"required_if=Watermark 1"`
Src string `json:"src" binding:"required_if=Watermark 1"`
Expand Down
2 changes: 1 addition & 1 deletion utils/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Fetch downlods a file to downloads folder from the given url,
and names it as given fileName
*/
func Fetch(url string, fileName string) error {
log.Println("Downloading Video: ", url)
log.Println("Downloading: ", url)

newFileName, err := GetDownloadFilePathName(fileName)
if err != nil {
Expand Down
Loading