Skip to content

Commit

Permalink
add comments
Browse files Browse the repository at this point in the history
  • Loading branch information
adwski committed May 16, 2024
1 parent e39f4e7 commit 9564338
Show file tree
Hide file tree
Showing 20 changed files with 119 additions and 40 deletions.
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Options for analysis running.
run:
go: "1.21"
go: "1.22"
# Settable parameters #
timeout: 5m
tests: true
Expand Down
4 changes: 4 additions & 0 deletions e2e/vidit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ func init() {
VIDITe2eUser = VIDITe2eUserTmpl + strconv.Itoa(int(time.Now().Unix()))
}

// TestVidit_MainFlow tests main vidit functions.
// In needs db, redis and s3 containers running.
// Only TestVidit_MainFlow could be executed individually with go test -run,
// remaining tests in this file rely on side effects of TestVidit_MainFlow.
func TestVidit_MainFlow(t *testing.T) {
// --------------------------------------------------------------------------------------
// Prepare remote config and serve it
Expand Down
2 changes: 2 additions & 0 deletions internal/api/http/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"github.com/labstack/echo/v4/middleware"
)

// GetEchoWithDefaultMiddleware returns configured echo middleware
// to be used together with http servers.
func GetEchoWithDefaultMiddleware() *echo.Echo {
e := echo.New()

Expand Down
2 changes: 2 additions & 0 deletions internal/api/model/model.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package model

// Prepared http api common responses.
var (
ResponseUnauthorized = &Response{
Error: "unauthorized",
Expand All @@ -24,6 +25,7 @@ var (
}
)

// Response is a common response struct that is used by APIs.
type Response struct {
Message string `json:"msg,omitempty"`
Error string `json:"error,omitempty"`
Expand Down
3 changes: 3 additions & 0 deletions internal/cli/cli.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Package cli contains cli tool that has
// - helpful mp4 operations like dumping and segmenting mp4 file
// - video api service token creation (which can be used later in apps config).
package cli

import (
Expand Down
3 changes: 3 additions & 0 deletions internal/event/event.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Package event defines generic event that
// sent by processor and uploader to notify videoapi
// about video object changes.
package event

const (
Expand Down
2 changes: 1 addition & 1 deletion internal/event/notificator/notificator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const (
)

// Notificator is asynchronous Video API notification service.
// It takes events and calls Video API in separate goroutine.
// It takes events and calls videoapi service-side API in separate goroutine.
//
// TODO In the future could be replaced with actual message queue.
type Notificator struct {
Expand Down
9 changes: 9 additions & 0 deletions internal/file/file.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// Package file provides helpers that are used during video file upload.
package file

import (
Expand All @@ -16,6 +17,13 @@ type Part struct {
Size uint `json:"size"`
}

// MakePartsFromFile splits file to parts. It returns slice of parts with fixed size
// (but last part most probably will have less size) and calculated sha256 checksums.
//
// []Part actually just represent points in file, and later actual bytes are
// anyway read from source file using offsets.
//
// Main purpose of []Part is that it goes directly into Video object (which is sent to videoapi).
func MakePartsFromFile(filePath string, partSize, size uint64) ([]Part, error) {
f, err := os.Open(filePath)
if err != nil {
Expand Down Expand Up @@ -45,6 +53,7 @@ func MakePartsFromFile(filePath string, partSize, size uint64) ([]Part, error) {
return parts, nil
}

// GetSize opens file and returns its size.
func GetSize(filePath string) (uint64, error) {
f, err := os.Open(filePath)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions internal/generators/generators.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// Package generators provides simple unique string generator
// that is internally based on uuid v4.
package generators

import (
Expand Down
1 change: 1 addition & 0 deletions internal/logging/logger.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// Package logging provides various zap logger initialization functions.
package logging

import (
Expand Down
8 changes: 0 additions & 8 deletions internal/media/processor/mpd.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,6 @@ func (p *Processor) generatePlaybackMeta(
}, nil
}

/*
b, err := metaCfg.StaticMPD()
if err != nil {
return nil, fmt.Errorf("cannot generate MPD: %w", err)
}
return b, nil
*/

func getMimeTypeFromMP4TrackHandlerType(handlerType string) (string, error) {
switch handlerType {
case "soun":
Expand Down
26 changes: 13 additions & 13 deletions internal/media/processor/mreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const (
maxOpenReaders = 10
)

// MediaReader is an uploaded media file parts reader.
// mediaReader is an uploaded media file parts reader.
// It abstracts multiple io.ReadSeekClosers as one,
// so it can be used together with lazy read mode in mp4ff.
//
Expand All @@ -29,19 +29,19 @@ const (
// and routes Read/Seek call to reader with a particular number.
//
// If desired amount of data exceeds current reader's boundary
// MediaReader switches to next reader in order until data is fully read.
// mediaReader switches to next reader in order until data is fully read.
//
// To save memory and reduce number of open connections MediaReader
// To save memory and reduce number of open connections mediaReader
// only keeps last 'maxOpenReaders' readers open. If amount of readers
// is more than maxOpenReaders, several least recently used readers
// are closed in order to satisfy condition. Closed reader can be
// reopened just the same as if it was never used before.
//
// If error is occurred in any of the stages, Read/Seek will always
// return last error, and MediaReader must be closed.
// return last error, and mediaReader must be closed.
//
// Current implementation is not thread-safe and should be used by only one goroutine.
type MediaReader struct {
type mediaReader struct {
traceLogger *zap.Logger
store MediaStore
readers map[uint64]io.ReadSeekCloser
Expand All @@ -54,8 +54,8 @@ type MediaReader struct {
pos uint64
}

func NewMediaReader(ms MediaStore, path string, parts uint, totalSize, partSize uint64) *MediaReader {
return &MediaReader{
func newMediaReader(ms MediaStore, path string, parts uint, totalSize, partSize uint64) *mediaReader {
return &mediaReader{
store: ms,
parts: parts,
s3ath: path,
Expand All @@ -66,7 +66,7 @@ func NewMediaReader(ms MediaStore, path string, parts uint, totalSize, partSize
}
}

func (mr *MediaReader) Read(b []byte) (int, error) {
func (mr *mediaReader) Read(b []byte) (int, error) {
if mr.traceLogger != nil {
mr.traceLogger.Debug("read call", zap.Int("len", len(b)))
}
Expand Down Expand Up @@ -157,7 +157,7 @@ func (mr *MediaReader) Read(b []byte) (int, error) {
}
}

func (mr *MediaReader) Seek(offset int64, whence int) (int64, error) {
func (mr *mediaReader) Seek(offset int64, whence int) (int64, error) {
if mr.traceLogger != nil {
mr.traceLogger.Debug("seek call",
zap.Uint64("pos", mr.pos),
Expand Down Expand Up @@ -222,7 +222,7 @@ func (mr *MediaReader) Seek(offset int64, whence int) (int64, error) {
return int64(mr.pos), nil
}

func (mr *MediaReader) Close() error {
func (mr *mediaReader) Close() error {
var err error
for _, rc := range mr.readers {
if rErr := rc.Close(); rErr != nil {
Expand All @@ -234,7 +234,7 @@ func (mr *MediaReader) Close() error {
return err
}

func (mr *MediaReader) ensureReaderIsOpen(partNum uint64) error {
func (mr *mediaReader) ensureReaderIsOpen(partNum uint64) error {
if _, ok := mr.readers[partNum]; !ok {
// spawn reader if it not exists
var artifactName = fmt.Sprintf("%s/%d", mr.s3ath, partNum)
Expand All @@ -249,7 +249,7 @@ func (mr *MediaReader) ensureReaderIsOpen(partNum uint64) error {
return nil
}

func (mr *MediaReader) recycleReader(partNum uint64) error {
func (mr *mediaReader) recycleReader(partNum uint64) error {
err := mr.readers[partNum].Close()
delete(mr.readers, partNum)
delete(mr.readersTS, partNum)
Expand All @@ -259,7 +259,7 @@ func (mr *MediaReader) recycleReader(partNum uint64) error {
return nil
}

func (mr *MediaReader) gc() {
func (mr *mediaReader) gc() {
for len(mr.readers) > maxOpenReaders {
// sweep
var (
Expand Down
9 changes: 8 additions & 1 deletion internal/media/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ type MediaStore interface {
Get(ctx context.Context, name string) (io.ReadSeekCloser, int64, error)
}

// Processor is worker-style app that polls videoapi for uploaded videos,
// processes them and notifies videoapi about results.
//
// Processing includes
// - segmentation
// - MPD generation
// Segments and static MPD are stored in MediaStore (s3).
type Processor struct {
logger *zap.Logger
notificator *notificator.Notificator
Expand Down Expand Up @@ -170,7 +177,7 @@ func (p *Processor) processVideo(ctx context.Context, v *pb.Video) ([]byte, erro
case defaultPartSize*uint64(len(v.Parts)-1) > v.Size || v.Size > defaultPartSize*uint64(len(v.Parts)):
return nil, fmt.Errorf("incorrect parts amount(%d) for video size(%d)", len(v.Parts), v.Size)
}
mr := NewMediaReader(
mr := newMediaReader(
p.st,
fmt.Sprintf("%s/%s", p.inputPathPrefix, v.Location),
uint(len(v.Parts)),
Expand Down
11 changes: 9 additions & 2 deletions internal/media/store/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,24 @@ package s3
import (
"context"
"encoding/base64"
"errors"
"fmt"
"io"
"net/http"

"github.com/adwski/vidi/internal/media/store"
"github.com/minio/minio-go/v7"
"github.com/minio/sha256-simd"
"go.uber.org/zap"
)

var ErrNotFount = errors.New("not found")

// Store is media store that uses s3 compatible storage.
// It implements simple Get/Set operations, and in addition
// it can calculate sha256 checksum of already uploaded object.
//
// TODO Seems like S3 API actually can calculate sha256 on server side,
// TODO but I couldn't get it working with minio. Need to investigate further.
type Store struct {
logger *zap.Logger
client *minio.Client
Expand Down Expand Up @@ -51,7 +58,7 @@ func (s *Store) Get(ctx context.Context, name string) (io.ReadSeekCloser, int64,
if errS != nil {
er := minio.ToErrorResponse(errS)
if er.StatusCode == http.StatusNotFound {
return nil, 0, store.ErrNotFount
return nil, 0, ErrNotFount
}
return nil, 0, fmt.Errorf("cannot get object stats: %w", errS)
}
Expand Down
5 changes: 0 additions & 5 deletions internal/media/store/store.go

This file was deleted.

5 changes: 2 additions & 3 deletions internal/media/streamer/service.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// Package streamer contains media segments streaming app.
package streamer

import (
Expand All @@ -6,8 +7,6 @@ import (
"fmt"
"strings"

"github.com/adwski/vidi/internal/media/store"

"github.com/adwski/vidi/internal/media/store/s3"
"github.com/adwski/vidi/internal/session"
sessionStore "github.com/adwski/vidi/internal/session/store"
Expand Down Expand Up @@ -125,7 +124,7 @@ func (svc *Service) handleWatch(ctx *fasthttp.RequestCtx) {
// Get segment reader
rc, size, errS3 := svc.mediaS.Get(ctx, svc.getSegmentName(sess, path))
if errS3 != nil {
if errors.Is(errS3, store.ErrNotFount) {
if errors.Is(errS3, s3.ErrNotFount) {
ctx.Error(notFoundError, fasthttp.StatusNotFound)
return
}
Expand Down
6 changes: 5 additions & 1 deletion internal/media/uploader/service.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// Package uploader contains app that handles media part uploads.
package uploader

import (
Expand Down Expand Up @@ -26,8 +27,11 @@ var (
)

// Service is a media file uploader service. It implements fasthttp handler that
// reads uploaded file and stores it in media store.
// reads uploaded part and stores it in media store.
// Every request is also checked for valid "upload"-session.
//
// After each successful part upload, uploader calculates sha256 checksum
// and asynchronously notifies videoapi.
type Service struct {
logger *zap.Logger
sessS *sessionStore.Store
Expand Down
2 changes: 2 additions & 0 deletions internal/mp4/meta/meta.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// Package meta contains playback metadata definition
// and static MPD generator.
package meta

import (
Expand Down
5 changes: 4 additions & 1 deletion internal/mp4/segmenter/segmenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

type BoxStoreFunc func(context.Context, string, mp4ff.BoxStructure, uint64) error

// Segmenter can segment progressive mp4 according to predefined segment duration.
// Segmenter segments progressive mp4 according to predefined segment duration.
// Resulting segments are passed to boxStoreFunc, and it is up to user to define how to store them.
//
// Segmentation flow:
Expand All @@ -26,6 +26,9 @@ type BoxStoreFunc func(context.Context, string, mp4ff.BoxStructure, uint64) erro
//
// This flow uses high-level functions, implemented in segmentation package.
//
// Configured segment duration should be treated like 'preference'.
// Segmenter can increase it if necessary in order to make segments with equal sizes.
//
// Tracks get new numbers since they will be in separate files.
// For example, if input file has video track with ID 0 and audio with ID 1,
// then in resulting video segments will be single track with ID 0,
Expand Down
Loading

0 comments on commit 9564338

Please sign in to comment.