diff --git a/e2e/e2e_helpers_test.go b/e2e/e2e_helpers_test.go new file mode 100644 index 0000000..422eb11 --- /dev/null +++ b/e2e/e2e_helpers_test.go @@ -0,0 +1,312 @@ +//go:build e2e +// +build e2e + +package e2e + +import ( + "net/http" + "os" + "strings" + "testing" + + "github.com/Eyevinn/dash-mpd/mpd" + common "github.com/adwski/vidi/internal/api/model" + "github.com/adwski/vidi/internal/api/user/model" + video "github.com/adwski/vidi/internal/api/video/model" + "github.com/adwski/vidi/internal/mp4" + "github.com/go-resty/resty/v2" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const ( + endpointUserLogin = "http://localhost:18081/api/user/login" + endpointUserRegister = "http://localhost:18081/api/user/register" + endpointVideo = "http://localhost:18082/api/video/user/" +) + +func userRegister(t *testing.T, user *model.UserRequest) *http.Cookie { + t.Helper() + + resp, body := makeCommonRequest(t, endpointUserRegister, user) + require.True(t, resp.IsSuccess()) + require.Empty(t, body.Error) + require.Equal(t, "registration complete", body.Message) + return getCookieWithToken(t, resp.Cookies()) +} + +func userLogin(t *testing.T, user *model.UserRequest) *http.Cookie { + t.Helper() + + resp, body := makeCommonRequest(t, endpointUserLogin, user) + require.True(t, resp.IsSuccess()) + require.Empty(t, body.Error) + require.Equal(t, "login ok", body.Message) + return getCookieWithToken(t, resp.Cookies()) +} + +func userLoginFail(t *testing.T, user *model.UserRequest) { + t.Helper() + + resp, body := makeCommonRequest(t, endpointUserLogin, user) + require.Truef(t, resp.IsError(), "user should not exist") + require.Empty(t, body.Message) + require.NotEmpty(t, body.Error) +} + +func makeCommonRequest(t *testing.T, url string, reqBody interface{}) (*resty.Response, *common.Response) { + t.Helper() + + var ( + body common.Response + ) + resp, err := resty.New().R().SetHeader("Accept", "application/json"). + SetError(&body). + SetResult(&body). + SetBody(reqBody).Post(url) + require.NoError(t, err) + return resp, &body +} + +func getCookieWithToken(t *testing.T, cookies []*http.Cookie) *http.Cookie { + t.Helper() + + var ( + userCookie *http.Cookie + ) + for _, cookie := range cookies { + if cookie.Name == "vidiSessID" { + userCookie = cookie + break + } + } + require.NotNilf(t, userCookie, "cookie should exist") + require.NotEmpty(t, userCookie.Value, "cookie should not be empty") + return userCookie +} + +func videoWatch(t *testing.T, userCookie *http.Cookie, v *video.Response) *video.WatchResponse { + t.Helper() + + var ( + errBody common.Response + watchBody video.WatchResponse + ) + resp, err := resty.New().R().SetHeader("Accept", "application/json"). + SetError(&errBody). + SetCookie(userCookie). + SetResult(&watchBody).Post(endpointVideo + v.ID + "/watch") + require.NoError(t, err) + require.True(t, resp.IsSuccess()) + require.Equal(t, http.StatusAccepted, resp.StatusCode()) + require.Empty(t, errBody.Error) + require.NotEmpty(t, watchBody.WatchURL) + + return &watchBody +} + +func videoUpload(t *testing.T, url string) { + t.Helper() + + f, errF := os.Open("../testFiles/test_seq_h264_high.mp4") + require.NoError(t, errF) + + resp, err := resty.New().R(). + SetHeader("Content-Type", "video/mp4"). + SetBody(f).Post(url) + require.NoError(t, err) + require.True(t, resp.IsSuccess()) + require.Equal(t, http.StatusNoContent, resp.StatusCode()) +} + +func videoDelete(t *testing.T, userCookie *http.Cookie, id string) { + t.Helper() + + var ( + body common.Response + ) + resp, err := resty.New().R().SetHeader("Accept", "application/json"). + SetError(&body). + SetCookie(userCookie). + SetResult(&body).Delete(endpointVideo + id) + require.NoError(t, err) + require.True(t, resp.IsSuccess()) + require.Equal(t, http.StatusOK, resp.StatusCode()) + require.Empty(t, body.Error) + require.Equal(t, "ok", body.Message) +} + +func videoGet(t *testing.T, userCookie *http.Cookie, id string) *video.Response { + t.Helper() + + var ( + videoBody video.Response + errBody common.Response + ) + resp, err := resty.New().R().SetHeader("Accept", "application/json"). + SetError(&errBody). + SetCookie(userCookie). + SetResult(&videoBody).Get(endpointVideo + id) + require.NoError(t, err) + require.True(t, resp.IsSuccess()) + require.Equal(t, http.StatusOK, resp.StatusCode()) + require.Empty(t, errBody.Error) + require.NotEmpty(t, videoBody.CreatedAt) + require.NotEmpty(t, videoBody.ID) + + return &videoBody +} + +func videoGetAll(t *testing.T, userCookie *http.Cookie) []*video.Response { + t.Helper() + + var ( + videoBody = make([]*video.Response, 0) + errBody common.Response + ) + resp, err := resty.New().R().SetHeader("Accept", "application/json"). + SetError(&errBody). + SetCookie(userCookie). + SetResult(&videoBody).Get(endpointVideo) + require.NoError(t, err) + require.True(t, resp.IsSuccess()) + require.Equal(t, http.StatusOK, resp.StatusCode()) + require.Empty(t, errBody.Error) + require.NotEmpty(t, videoBody) + + return videoBody +} + +func videoCreate(t *testing.T, userCookie *http.Cookie) *video.Response { + t.Helper() + + var ( + videoBody video.Response + errBody common.Response + ) + resp, err := resty.New().R().SetHeader("Accept", "application/json"). + SetError(&errBody). + SetCookie(userCookie). + SetResult(&videoBody).Post(endpointVideo) + require.NoError(t, err) + require.True(t, resp.IsSuccess()) + require.Empty(t, errBody.Error) + require.NotEmpty(t, videoBody.CreatedAt) + require.NotEmpty(t, videoBody.ID) + require.NotEmpty(t, videoBody.UploadURL) + + status, err := videoBody.GetStatus() + require.NoError(t, err) + require.Equal(t, video.StatusCreated, status) + + return &videoBody +} + +func videoCreateFail(t *testing.T) { + t.Helper() + + var ( + videoBody video.Response + ) + resp, err := resty.New().R().SetHeader("Accept", "application/json"). + SetResult(&videoBody).Post(endpointVideo) + require.NoError(t, err) + require.True(t, resp.IsError()) + require.Equal(t, http.StatusUnauthorized, resp.StatusCode()) +} + +func watchVideo(t *testing.T, url string) { + t.Helper() + + resp, err := resty.New().R().Get(url) + require.NoError(t, err) + require.True(t, resp.IsSuccess()) + require.NotEmpty(t, resp.Body()) + + vMpd, err := mpd.MPDFromBytes(resp.Body()) + require.NoError(t, err) + + checkStaticMPD(t, vMpd) + downloadSegments(t, url) +} + +func downloadSegments(t *testing.T, url string) { + t.Helper() + + prefixURL := strings.TrimSuffix(url, "manifest.mpd") + + r := resty.New() + + downloadSegment(t, r, prefixURL+"vide1_init.mp4", "video/mp4") + downloadSegment(t, r, prefixURL+"vide1_1.m4s", "video/iso.segment") + downloadSegment(t, r, prefixURL+"vide1_2.m4s", "video/iso.segment") + downloadSegment(t, r, prefixURL+"vide1_3.m4s", "video/iso.segment") + downloadSegment(t, r, prefixURL+"vide1_4.m4s", "video/iso.segment") + + downloadSegment(t, r, prefixURL+"soun1_init.mp4", "video/mp4") + downloadSegment(t, r, prefixURL+"soun1_1.m4s", "video/iso.segment") + downloadSegment(t, r, prefixURL+"soun1_2.m4s", "video/iso.segment") + downloadSegment(t, r, prefixURL+"soun1_3.m4s", "video/iso.segment") + downloadSegment(t, r, prefixURL+"soun1_4.m4s", "video/iso.segment") + + t.Log("all segments downloaded") +} + +func downloadSegment(t *testing.T, r *resty.Client, url, contentType string) { + t.Helper() + + resp, err := r.R().Get(url) + require.NoError(t, err) + require.True(t, resp.IsSuccess()) + require.NotEmpty(t, resp.Body()) + + assert.Equal(t, contentType, resp.Header().Get("Content-Type")) +} + +func checkStaticMPD(t *testing.T, vMpd *mpd.MPD) { + t.Helper() + + assert.Equal(t, "urn:mpeg:dash:schema:mpd:2011", vMpd.XMLNs) + assert.Equal(t, mpd.ListOfProfilesType("urn:mpeg:dash:profile:isoff-on-demand:2011"), vMpd.Profiles) + assert.Equal(t, "static", *vMpd.Type) + assert.Equal(t, 10.0, vMpd.MediaPresentationDuration.Seconds()) + + require.NotEmpty(t, vMpd.Periods) + require.NotNil(t, vMpd.Periods[0]) + require.Len(t, vMpd.Periods[0].AdaptationSets, 2) + + var ( + vide, soun *mpd.RepresentationType + ) + for _, as := range vMpd.Periods[0].AdaptationSets { + require.NotNil(t, as) + require.NotNil(t, as.SegmentTemplate) + + st := as.SegmentTemplate + assert.Equal(t, "$RepresentationID$_$Number$"+mp4.SegmentSuffix, st.Media) + assert.Equal(t, "$RepresentationID$_"+mp4.SegmentSuffixInit, st.Initialization) + assert.Equal(t, uint32(1), *st.MultipleSegmentBaseType.StartNumber) + assert.NotZero(t, *st.MultipleSegmentBaseType.Duration) + assert.NotZero(t, *st.MultipleSegmentBaseType.SegmentBaseType.Timescale) + assert.Equalf(t, 3, int(*st.MultipleSegmentBaseType.Duration / + *st.MultipleSegmentBaseType.SegmentBaseType.Timescale), + "segment duration should be equal to processor.segment_duration") + + require.Len(t, as.Representations, 1) + + switch as.Representations[0].Id { + case "vide1": + vide = as.Representations[0] + assert.Equal(t, "avc1.64001f", vide.RepresentationBaseType.Codecs) + assert.Equal(t, "video/mp4", as.RepresentationBaseType.MimeType) + case "soun1": + soun = as.Representations[0] + assert.Equal(t, "mp4a.40.2", soun.RepresentationBaseType.Codecs) + assert.Equal(t, "audio/mp4", as.RepresentationBaseType.MimeType) + } + } + require.NotNilf(t, vide, "video representation must be in MPD") + require.NotNilf(t, soun, "audio representation must be in MPD") + + t.Log("mpd is ok") +} diff --git a/e2e/e2e_test.go b/e2e/e2e_test.go index 385aa11..a70766b 100644 --- a/e2e/e2e_test.go +++ b/e2e/e2e_test.go @@ -5,131 +5,52 @@ package e2e import ( "context" - "net/http" + "github.com/adwski/vidi/internal/app/processor" + "github.com/adwski/vidi/internal/app/streamer" + "github.com/adwski/vidi/internal/app/uploader" + "github.com/adwski/vidi/internal/app/video" "os" + "sync" "testing" "time" - common "github.com/adwski/vidi/internal/api/model" - "github.com/adwski/vidi/internal/api/user/model" "github.com/adwski/vidi/internal/app/user" - "github.com/go-resty/resty/v2" - "github.com/stretchr/testify/require" ) func TestMain(m *testing.M) { var ( - done = make(chan struct{}) + wg = &sync.WaitGroup{} ctx, cancel = context.WithCancel(context.Background()) ) + wg.Add(5) go func() { user.NewApp().RunWithContextAndConfig(ctx, "userapi.yaml") - done <- struct{}{} + wg.Done() + }() + go func() { + video.NewApp().RunWithContextAndConfig(ctx, "videoapi.yaml") + wg.Done() + }() + go func() { + uploader.NewApp().RunWithContextAndConfig(ctx, "uploader.yaml") + wg.Done() + }() + go func() { + processor.NewApp().RunWithContextAndConfig(ctx, "processor.yaml") + wg.Done() + }() + go func() { + streamer.NewApp().RunWithContextAndConfig(ctx, "streamer.yaml") + wg.Done() }() - time.Sleep(time.Second) + time.Sleep(3 * time.Second) code := m.Run() cancel() - <-done + wg.Wait() defer func() { os.Exit(code) }() } - -func TestE2EUserRegistration(t *testing.T) { - //------------------------------------------------------------------------------- - // Login with not-existent user - //------------------------------------------------------------------------------- - userLoginFail(t, &model.UserRequest{ - Username: "qweqweqwe", - Password: "asdasdasd", - }) - - //------------------------------------------------------------------------------- - // Register user - //------------------------------------------------------------------------------- - cookie := userRegister(t, &model.UserRequest{ - Username: "qweqweqwe", - Password: "asdasdasd", - }) - t.Logf("user is registered, token: %v", cookie.Value) - - //------------------------------------------------------------------------------- - // Login with existent user - //------------------------------------------------------------------------------- - cookie2 := userLogin(t, &model.UserRequest{ - Username: "qweqweqwe", - Password: "asdasdasd", - }) - t.Logf("user is logged in, token: %v", cookie2.Value) - - //------------------------------------------------------------------------------- - // Login with not-existent user - //------------------------------------------------------------------------------- - userLoginFail(t, &model.UserRequest{ - Username: "qweqweqwe1", - Password: "asdasdasd2", - }) -} - -func userRegister(t *testing.T, user *model.UserRequest) *http.Cookie { - t.Helper() - - resp, body := makeCommonRequest(t, "http://localhost:18081/api/user/register", user) - require.True(t, resp.IsSuccess()) - require.Empty(t, body.Error) - require.Equal(t, "registration complete", body.Message) - return getCookieWithToken(t, resp.Cookies()) -} - -func userLogin(t *testing.T, user *model.UserRequest) *http.Cookie { - t.Helper() - - resp, body := makeCommonRequest(t, "http://localhost:18081/api/user/login", user) - require.True(t, resp.IsSuccess()) - require.Empty(t, body.Error) - require.Equal(t, "login ok", body.Message) - return getCookieWithToken(t, resp.Cookies()) -} - -func userLoginFail(t *testing.T, user *model.UserRequest) { - t.Helper() - - resp, body := makeCommonRequest(t, "http://localhost:18081/api/user/login", user) - require.Truef(t, resp.IsError(), "user should not exist") - require.Empty(t, body.Message) - require.NotEmpty(t, body.Error) -} - -func makeCommonRequest(t *testing.T, url string, reqBody interface{}) (*resty.Response, *common.Response) { - t.Helper() - - var ( - body common.Response - ) - resp, err := resty.New().R().SetHeader("Accept", "application/json"). - SetError(&body). - SetResult(&body). - SetBody(reqBody).Post(url) - require.NoError(t, err) - return resp, &body -} - -func getCookieWithToken(t *testing.T, cookies []*http.Cookie) *http.Cookie { - t.Helper() - - var ( - userCookie *http.Cookie - ) - for _, cookie := range cookies { - if cookie.Name == "vidiSessID" { - userCookie = cookie - break - } - } - require.NotNilf(t, userCookie, "cookie should exist") - require.NotEmpty(t, userCookie.Value, "cookie should not be empty") - return userCookie -} diff --git a/e2e/processor.yaml b/e2e/processor.yaml new file mode 100644 index 0000000..f4cbb1a --- /dev/null +++ b/e2e/processor.yaml @@ -0,0 +1,15 @@ +log: + level: debug +videoapi: + endpoint: http://localhost:18082/api/video + token: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1aWQiOiJzdmMtQnNwMjhpcG1TSFdkSTdTSXdQMEo0QSIsIm5hbWUiOiJub3RpZmljYXRvciIsInJvbGUiOiJzZXJ2aWNlIiwiZXhwIjoxNzM2ODgwMDk0fQ.xjMV5Z_EBGch6fYM9ZSSC3W9QKjij2S0zI8HGz8wrAY" +s3: + prefix_upload: /upload + prefix_watch: /watch + endpoint: localhost:9000 + access_key: admin + secret_key: password + bucket: vidi +processor: + segment_duration: 3s + video_check_period: 5s diff --git a/e2e/streamer.yaml b/e2e/streamer.yaml new file mode 100644 index 0000000..1104984 --- /dev/null +++ b/e2e/streamer.yaml @@ -0,0 +1,14 @@ +log: + level: debug +server: + address: ":18084" +api: + prefix: /watch +redis: + dsn: redis://localhost:6379/0 +s3: + prefix_watch: /watch + endpoint: localhost:9000 + access_key: admin + secret_key: password + bucket: vidi diff --git a/e2e/uploader.yaml b/e2e/uploader.yaml new file mode 100644 index 0000000..5672abc --- /dev/null +++ b/e2e/uploader.yaml @@ -0,0 +1,17 @@ +log: + level: debug +server: + address: ":18083" +api: + prefix: /upload +redis: + dsn: redis://localhost:6379/0 +videoapi: + endpoint: http://localhost:18082/api/video + token: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1aWQiOiJzdmMtQnNwMjhpcG1TSFdkSTdTSXdQMEo0QSIsIm5hbWUiOiJub3RpZmljYXRvciIsInJvbGUiOiJzZXJ2aWNlIiwiZXhwIjoxNzM2ODgwMDk0fQ.xjMV5Z_EBGch6fYM9ZSSC3W9QKjij2S0zI8HGz8wrAY" +s3: + prefix_upload: /upload + endpoint: localhost:9000 + access_key: admin + secret_key: password + bucket: vidi diff --git a/e2e/userapi_test.go b/e2e/userapi_test.go new file mode 100644 index 0000000..900c8f1 --- /dev/null +++ b/e2e/userapi_test.go @@ -0,0 +1,48 @@ +//go:build e2e +// +build e2e + +package e2e + +import ( + "testing" + + "github.com/adwski/vidi/internal/api/user/model" +) + +func TestUserRegistration(t *testing.T) { + //------------------------------------------------------------------------------- + // Login with not-existent user + //------------------------------------------------------------------------------- + userLoginFail(t, &model.UserRequest{ + Username: "testuser", + Password: "testpass", + }) + + //------------------------------------------------------------------------------- + // Register user + //------------------------------------------------------------------------------- + cookie := userRegister(t, &model.UserRequest{ + Username: "testuser", + Password: "testpass", + }) + t.Logf("user is registered, token: %v", cookie.Value) +} + +func TestUserLogin(t *testing.T) { + //------------------------------------------------------------------------------- + // Login with existent user + //------------------------------------------------------------------------------- + cookie2 := userLogin(t, &model.UserRequest{ + Username: "testuser", + Password: "testpass", + }) + t.Logf("user is logged in, token: %v", cookie2.Value) + + //------------------------------------------------------------------------------- + // Login with not-existent user + //------------------------------------------------------------------------------- + userLoginFail(t, &model.UserRequest{ + Username: "testuser2", + Password: "testpass2", + }) +} diff --git a/e2e/videoapi.yaml b/e2e/videoapi.yaml new file mode 100644 index 0000000..24b230a --- /dev/null +++ b/e2e/videoapi.yaml @@ -0,0 +1,14 @@ +log: + level: debug +server: + address: ":18082" +api: + prefix: /api/video +database: + dsn: postgres://videoapi:videoapi@localhost:5400/videoapi?sslmode=disable +redis: + dsn: redis://localhost:6379/0 +media: + url: + watch: http://localhost:18084/watch + upload: http://localhost:18083/upload diff --git a/e2e/videoapi_test.go b/e2e/videoapi_test.go new file mode 100644 index 0000000..4afe3ed --- /dev/null +++ b/e2e/videoapi_test.go @@ -0,0 +1,122 @@ +//go:build e2e +// +build e2e + +package e2e + +import ( + "testing" + "time" + + user "github.com/adwski/vidi/internal/api/user/model" + video "github.com/adwski/vidi/internal/api/video/model" + "github.com/stretchr/testify/require" +) + +func TestCreateAndDeleteVideo(t *testing.T) { + //------------------------------------------------------------------------------- + // Create video with no cookie + //------------------------------------------------------------------------------- + videoCreateFail(t) + + //------------------------------------------------------------------------------- + // Login with existent user + //------------------------------------------------------------------------------- + cookie := userLogin(t, &user.UserRequest{ + Username: "testuser", + Password: "testpass", + }) + t.Logf("user logged in, token: %v", cookie.Value) + + //------------------------------------------------------------------------------- + // Create video + //------------------------------------------------------------------------------- + videoResponse := videoCreate(t, cookie) + t.Logf("video created, id: %s, upload url: %v", videoResponse.ID, videoResponse.UploadURL) + + //------------------------------------------------------------------------------- + // Get video + //------------------------------------------------------------------------------- + videoResponse2 := videoGet(t, cookie, videoResponse.ID) + t.Logf("video retrieved, id: %s", videoResponse2.ID) + + require.Equal(t, videoResponse.ID, videoResponse2.ID) + require.Equal(t, videoResponse.Status, videoResponse2.Status) + require.Equal(t, videoResponse.CreatedAt, videoResponse2.CreatedAt) + + //------------------------------------------------------------------------------- + // Delete video + //------------------------------------------------------------------------------- + videoDelete(t, cookie, videoResponse.ID) + t.Logf("video deleted, id: %s", videoResponse.ID) +} + +func TestCreateAndUploadVideo(t *testing.T) { + //------------------------------------------------------------------------------- + // Login with existent user + //------------------------------------------------------------------------------- + cookie := userLogin(t, &user.UserRequest{ + Username: "testuser", + Password: "testpass", + }) + t.Logf("user logged in, token: %v", cookie.Value) + + //------------------------------------------------------------------------------- + // Create video + //------------------------------------------------------------------------------- + videoResponse := videoCreate(t, cookie) + t.Logf("video created, id: %s, upload url: %v", videoResponse.ID, videoResponse.UploadURL) + + //------------------------------------------------------------------------------- + // Upload video + //------------------------------------------------------------------------------- + videoUpload(t, videoResponse.UploadURL) + + //------------------------------------------------------------------------------- + // Wait until processed + //------------------------------------------------------------------------------- + deadline := time.After(10 * time.Second) +Loop: + for { + select { + case <-time.After(3 * time.Second): + videoResponse2 := videoGet(t, cookie, videoResponse.ID) + t.Logf("video retrieved, id: %s, status: %v", videoResponse2.ID, videoResponse2.Status) + status, err := videoResponse2.GetStatus() + require.NoError(t, err) + if status != video.StatusReady { + continue Loop + } + break Loop + + case <-deadline: + t.Errorf("video did not became ready") + break Loop + } + } + t.Log("video processed") +} + +func TestWatchVideo(t *testing.T) { + //------------------------------------------------------------------------------- + // Login with existent user + //------------------------------------------------------------------------------- + cookie := userLogin(t, &user.UserRequest{ + Username: "testuser", + Password: "testpass", + }) + t.Logf("user logged in, token: %v", cookie.Value) + + //------------------------------------------------------------------------------- + // Get videos + //------------------------------------------------------------------------------- + videosResponse := videoGetAll(t, cookie) + t.Logf("videos retrieved: %d", len(videosResponse)) + + //------------------------------------------------------------------------------- + // Get watch URL + //------------------------------------------------------------------------------- + watchResponse := videoWatch(t, cookie, videosResponse[0]) + t.Logf("watch url retrieved: %s", watchResponse.WatchURL) + + watchVideo(t, watchResponse.WatchURL) +} diff --git a/internal/api/video/client/client.go b/internal/api/video/client/client.go index fb66b2c..8c265f9 100644 --- a/internal/api/video/client/client.go +++ b/internal/api/video/client/client.go @@ -57,7 +57,7 @@ func (c *Client) GetUploadedVideos(ctx context.Context) ([]*model.Video, error) SetError(&errResponse). SetResult(&videosResponse). SetBody(&model.ListRequest{ - Status: model.VideoStatusUploaded, + Status: model.StatusUploaded, }). Post(fmt.Sprintf("%s/service/search", c.endpoint)) if err != nil { @@ -84,7 +84,7 @@ func (c *Client) UpdateVideo(videoID, status, location string) error { response, req := c.constructUpdateRequest() spew.Dump(req.Token) resp, err := req. - SetBody(&model.VideoUpdateRequest{ + SetBody(&model.UpdateRequest{ Status: status, Location: location, }). diff --git a/internal/api/video/model/model.go b/internal/api/video/model/model.go index 7938740..935f980 100644 --- a/internal/api/video/model/model.go +++ b/internal/api/video/model/model.go @@ -19,18 +19,22 @@ type Video struct { Status Status `json:"status,omitempty"` } -type VideoUpdateRequest struct { +type UpdateRequest struct { Status string `json:"status"` Location string `json:"location"` } -type VideoResponse struct { +type Response struct { ID string `json:"id"` Status string `json:"status"` CreatedAt string `json:"created_at"` UploadURL string `json:"upload_url,omitempty"` } +func (r *Response) GetStatus() (Status, error) { + return GetStatusFromName(r.Status) +} + type ListRequest struct { Status Status `json:"status"` } @@ -45,34 +49,35 @@ type WatchResponse struct { func NewVideo(id, userID string) *Video { return &Video{ - CreatedAt: time.Now(), + CreatedAt: time.Now().In(time.UTC), ID: id, UserID: userID, - Status: VideoStatusCreated, + Status: StatusCreated, } } -func (v *Video) Response() *VideoResponse { - return &VideoResponse{ +func (v *Video) Response() *Response { + return &Response{ ID: v.ID, Status: v.Status.String(), - CreatedAt: v.CreatedAt.String(), + CreatedAt: v.CreatedAt.Format(time.RFC3339), } } -func (v *Video) UploadResponse(url string) *VideoResponse { - return &VideoResponse{ - ID: v.ID, - Status: v.Status.String(), - CreatedAt: v.CreatedAt.String(), - UploadURL: url, - } +func (v *Video) UploadResponse(url string) *Response { + resp := v.Response() + resp.UploadURL = url + return resp } func (v *Video) IsReady() bool { - return v.Status == VideoStatusReady + return v.Status == StatusReady +} + +func (v *Video) IsCreated() bool { + return v.Status == StatusCreated } func (v *Video) IsErrored() bool { - return v.Status == VideoStatusError + return v.Status == StatusError } diff --git a/internal/api/video/model/status.go b/internal/api/video/model/status.go index f480c38..851fd1c 100644 --- a/internal/api/video/model/status.go +++ b/internal/api/video/model/status.go @@ -9,23 +9,23 @@ import ( // Video statuses. // TODO: May be transitive statuses are redundant? const ( - VideoStatusError Status = iota - 1 - VideoStatusCreated - VideoStatusUploading - VideoStatusUploaded - VideoStatusProcessing - VideoStatusReady + StatusError Status = iota - 1 + StatusCreated + StatusUploading + StatusUploaded + StatusProcessing + StatusReady ) type Status int var statusNames = map[Status]string{ - VideoStatusError: "error", - VideoStatusCreated: "created", - VideoStatusUploading: "uploading", - VideoStatusUploaded: "uploaded", - VideoStatusProcessing: "processing", - VideoStatusReady: "ready", + StatusError: "error", + StatusCreated: "created", + StatusUploading: "uploading", + StatusUploaded: "uploaded", + StatusProcessing: "processing", + StatusReady: "ready", } var statusFromName = make(map[string]Status) diff --git a/internal/api/video/service.go b/internal/api/video/service.go index 78787d1..8511899 100644 --- a/internal/api/video/service.go +++ b/internal/api/video/service.go @@ -24,6 +24,7 @@ import ( type Store interface { Create(ctx context.Context, vi *model.Video) error Get(ctx context.Context, id string, userID string) (*model.Video, error) + GetAll(ctx context.Context, userID string) ([]*model.Video, error) Delete(ctx context.Context, id string, userID string) error GetListByStatus(ctx context.Context, status model.Status) ([]*model.Video, error) @@ -92,6 +93,7 @@ func NewService(cfg *ServiceConfig) (*Service, error) { userAPI := api.Group("/user") userAPI.Use(authenticator.MiddlewareUser()) userAPI.GET("/:id", svc.getVideo) + userAPI.GET("/", svc.getVideos) userAPI.POST("/", svc.createVideo) userAPI.POST("/:id/watch", svc.watchVideo) userAPI.DELETE("/:id", svc.deleteVideo) diff --git a/internal/api/video/serviceapi.go b/internal/api/video/serviceapi.go index 9328226..19dd376 100644 --- a/internal/api/video/serviceapi.go +++ b/internal/api/video/serviceapi.go @@ -86,7 +86,7 @@ func (svc *Service) updateVideo(c echo.Context) error { } func getVideoFromUpdateRequest(c echo.Context) (*model.Video, error) { - var req model.VideoUpdateRequest + var req model.UpdateRequest if err := c.Bind(&req); err != nil { return nil, errors.New("invalid params") } diff --git a/internal/api/video/store/store.go b/internal/api/video/store/store.go index 8080284..9ee360f 100644 --- a/internal/api/video/store/store.go +++ b/internal/api/video/store/store.go @@ -60,6 +60,30 @@ func (s *Store) Get(ctx context.Context, id, userID string) (*model.Video, error return vi, nil } +func (s *Store) GetAll(ctx context.Context, userID string) ([]*model.Video, error) { + query := `select id, location, status, created_at from videos where user_id = $1` + rows, err := s.Pool().Query(ctx, query, userID) + if err != nil { + err = model.ErrNotFound + return nil, err + } + videos, errR := pgx.CollectRows(rows, func(row pgx.CollectableRow) (*model.Video, error) { + var vi model.Video + vi.UserID = userID + if errS := row.Scan(&vi.ID, &vi.Location, &vi.Status, &vi.CreatedAt); errS != nil { + return nil, fmt.Errorf("error while scanning row: %w", errS) + } + return &vi, nil + }) + if errR != nil { + return nil, fmt.Errorf("error while collecting rows: %w", errR) + } + if len(videos) == 0 { + return nil, model.ErrNotFound + } + return videos, nil +} + func (s *Store) Delete(ctx context.Context, id, userID string) error { query := `delete from videos where id = $1 and user_id = $2` tag, err := s.Pool().Exec(ctx, query, id, userID) diff --git a/internal/api/video/userapi.go b/internal/api/video/userapi.go index 16c7865..0d82152 100644 --- a/internal/api/video/userapi.go +++ b/internal/api/video/userapi.go @@ -42,6 +42,22 @@ func (svc *Service) getVideo(c echo.Context) error { return svc.erroredResponse(c, errV) } +func (svc *Service) getVideos(c echo.Context) error { + u, err, ok := svc.getUserSession(c) + if !ok { + return err + } + videos, errV := svc.s.GetAll(c.Request().Context(), u.ID) + if errV != nil { + return svc.erroredResponse(c, errV) + } + resp := make([]*model.Response, 0, len(videos)) + for _, v := range videos { + resp = append(resp, v.Response()) + } + return c.JSON(http.StatusOK, resp) +} + func (svc *Service) watchVideo(c echo.Context) error { u, err, ok := svc.getUserSession(c) if !ok { diff --git a/internal/media/processor/processor.go b/internal/media/processor/processor.go index d5abb43..b9dc708 100644 --- a/internal/media/processor/processor.go +++ b/internal/media/processor/processor.go @@ -10,7 +10,7 @@ import ( "time" "github.com/adwski/vidi/internal/api/video/client" - "github.com/adwski/vidi/internal/api/video/model" + video "github.com/adwski/vidi/internal/api/video/model" "github.com/adwski/vidi/internal/event" "github.com/adwski/vidi/internal/event/notificator" "go.uber.org/zap" @@ -95,38 +95,38 @@ func (p *Processor) checkAndProcessVideos(ctx context.Context) { p.logger.Info("got videos for processing", zap.Int("count", len(videos))) - for _, video := range videos { + for _, v := range videos { p.notificator.Send(&event.Event{ - Video: model.Video{ - ID: video.ID, - Status: model.VideoStatusProcessing, + Video: video.Video{ + ID: v.ID, + Status: video.StatusProcessing, }, Kind: event.KindUpdateStatus, }) - if err = p.processVideo(ctx, video); err != nil { + if err = p.processVideo(ctx, v); err != nil { // TODO In the future we should distinguish between errors caused by video content // and any other error. For example i/o errors are related to other failures // and in such cases video processing could be retried later. (So we need retry mechanism). p.notificator.Send(&event.Event{ - Video: model.Video{ - ID: video.ID, - Status: model.VideoStatusError, + Video: video.Video{ + ID: v.ID, + Status: video.StatusError, }, Kind: event.KindUpdateStatus, }) p.logger.Error("error while processing video", - zap.String("id", video.ID), - zap.String("location", video.Location), + zap.String("id", v.ID), + zap.String("location", v.Location), zap.Error(err)) continue } p.logger.Debug("video processed successfully", - zap.String("id", video.ID), - zap.String("location", video.Location)) + zap.String("id", v.ID), + zap.String("location", v.Location)) p.notificator.Send(&event.Event{ - Video: model.Video{ - ID: video.ID, - Status: model.VideoStatusReady, + Video: video.Video{ + ID: v.ID, + Status: video.StatusReady, }, Kind: event.KindUpdateStatus, }) @@ -134,8 +134,8 @@ func (p *Processor) checkAndProcessVideos(ctx context.Context) { p.logger.Debug("processing done") } -func (p *Processor) processVideo(ctx context.Context, video *model.Video) error { - fullInputPath := fmt.Sprintf("%s/%s/%s", p.inputPathPrefix, video.Location, defaultMediaStoreArtifactName) +func (p *Processor) processVideo(ctx context.Context, v *video.Video) error { + fullInputPath := fmt.Sprintf("%s/%s/%s", p.inputPathPrefix, v.Location, defaultMediaStoreArtifactName) rc, _, err := p.st.Get(ctx, fullInputPath) if err != nil { return fmt.Errorf("cannot get input file: %w", err) @@ -145,7 +145,7 @@ func (p *Processor) processVideo(ctx context.Context, video *model.Video) error p.logger.Error("error closing storage reader", zap.Error(errC)) } }() - outLocation := fmt.Sprintf("%s/%s", p.outputPathPrefix, video.Location) + outLocation := fmt.Sprintf("%s/%s", p.outputPathPrefix, v.Location) if err = p.ProcessFileFromReader(ctx, rc, outLocation); err != nil { return fmt.Errorf("error processing file: %w", err) } diff --git a/internal/media/uploader/service.go b/internal/media/uploader/service.go index e6bbda9..07e430b 100644 --- a/internal/media/uploader/service.go +++ b/internal/media/uploader/service.go @@ -8,7 +8,7 @@ import ( "io" "strings" - "github.com/adwski/vidi/internal/api/video/model" + video "github.com/adwski/vidi/internal/api/video/model" "github.com/adwski/vidi/internal/event" "github.com/adwski/vidi/internal/event/notificator" "github.com/adwski/vidi/internal/media/store/s3" @@ -125,9 +125,9 @@ func (svc *Service) handleUpload(ctx *fasthttp.RequestCtx) { // Proceed with upload // -------------------------------------------------- svc.notificator.Send(&event.Event{ // send uploading event - Video: model.Video{ + Video: video.Video{ ID: sess.VideoID, - Status: model.VideoStatusUploading, + Status: video.StatusUploading, }, Kind: event.KindUpdateStatus, }) @@ -188,9 +188,9 @@ func (svc *Service) postProcess(sess *session.Session) { // TODO Should we also send error status event in case of upload error // or allow user to retry upload? svc.notificator.Send(&event.Event{ - Video: model.Video{ + Video: video.Video{ ID: sess.VideoID, - Status: model.VideoStatusUploaded, + Status: video.StatusUploaded, Location: sess.ID, }, Kind: event.KindUpdateStatusAndLocation, diff --git a/internal/mp4/dumper.go b/internal/mp4/dumper.go index 5a50d57..38bcd41 100644 --- a/internal/mp4/dumper.go +++ b/internal/mp4/dumper.go @@ -2,6 +2,8 @@ package mp4 import ( "fmt" + "io" + "os" "time" mp4ff "github.com/Eyevinn/mp4ff/mp4" @@ -19,26 +21,30 @@ func Dump(path string, segDuration time.Duration) { if segDuration < defaultSegmentDuration { segmentDuration = defaultSegmentDuration } + + dump(os.Stdout, path, segmentDuration) +} +func dump(w io.Writer, path string, segmentDuration time.Duration) { mF, err := mp4ff.ReadMP4File(path) if err != nil { fmt.Printf("cannot open mp4 file: %v\n", err) return } - fmt.Printf("ftyp: %s\n", mF.Ftyp) - fmt.Printf("segmented: %v\n", mF.IsFragmented()) + printW(w, "ftyp: %s\n", mF.Ftyp) + printW(w, "segmented: %v\n", mF.IsFragmented()) vTrack, timescale, totalDuration, errV := segmentation.GetFirstVideoTrackParams(mF) if errV != nil { fmt.Printf("cannot get first video track: %v\n", errV) return } - fmt.Printf("timescale: %d units per second\n", timescale) - fmt.Printf("duration: %v\n", time.Duration(totalDuration/uint64(timescale))*time.Second) - fmt.Println("\nsegmentation info:") + printW(w, "timescale: %d units per second\n", timescale) + printW(w, "duration: %v\n", time.Duration(totalDuration/uint64(timescale))*time.Second) + printW(w, "\nsegmentation info:\n") segmentPoints, errSP := segmentation.MakePoints(vTrack, timescale, segmentDuration) - fmt.Printf("segment points with %v duration (err: %v): %v\n", segmentDuration, errSP, segmentPoints) + printW(w, "segment points with %v duration (err: %v): %v\n", segmentDuration, errSP, segmentPoints) if errSP != nil { return } @@ -48,13 +54,13 @@ func Dump(path string, segDuration time.Duration) { audioTrack bool ) for _, track := range mF.Moov.Traks { - fmt.Printf("TrackID: %v, type: %v, sampleCount: %v\n", + printW(w, "TrackID: %v, type: %v, sampleCount: %v\n", track.Tkhd.TrackID, track.Mdia.Hdlr.HandlerType, track.Mdia.Minf.Stbl.Stts.SampleCount) codec, errC := meta.NewCodecFromSTSD(track.Mdia.Minf.Stbl.Stsd) - fmt.Printf("Codec info: %v (err: %v)\n", codec, errC) + printW(w, "Codec info: %v (err: %v)\n", codec, errC) if errC == nil { switch track.Mdia.Hdlr.HandlerType { case "vide": @@ -65,12 +71,19 @@ func Dump(path string, segDuration time.Duration) { } sI, errSI := segmentation.MakeIntervals(timescale, segmentPoints, track) - fmt.Printf("Segment intervals (err: %v): %v\n", errSI, sI) + printW(w, "Segment intervals (err: %v): %v\n", errSI, sI) } if videoTrack && audioTrack { - fmt.Println("\nCodecs are supported!") + printW(w, "\nCodecs are supported!\n") } else { - fmt.Println("\nSome codec is not yet supported!") + printW(w, "\nSome codec is not yet supported!\n") + } +} + +func printW(w io.Writer, format string, a ...any) { + _, err := fmt.Fprintf(w, format, a...) + if err != nil { + panic(err) } } diff --git a/internal/mp4/dumper_test.go b/internal/mp4/dumper_test.go new file mode 100644 index 0000000..099b8d5 --- /dev/null +++ b/internal/mp4/dumper_test.go @@ -0,0 +1,25 @@ +package mp4 + +import ( + "bytes" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestDump(t *testing.T) { + buf := bytes.NewBuffer(make([]byte, 0, 10000)) + + dump(buf, "../../testfiles/test_seq_h264_high.mp4", time.Second) + + require.NotZero(t, buf.Len()) + str := buf.String() + + assert.Contains(t, str, "timescale: 15360 units per second") + assert.Contains(t, str, "duration: 10s") + assert.Contains(t, str, "TrackID: 1, type: vide, sampleCount: [300]") + assert.Contains(t, str, "TrackID: 2, type: soun, sampleCount: [472]") + assert.Contains(t, str, "Codecs are supported!") +}