From 95643380e9f96a5fd1de816530dd9d7edb0bb2f3 Mon Sep 17 00:00:00 2001 From: Artem Dvoretskii Date: Thu, 16 May 2024 20:06:21 +0300 Subject: [PATCH] add comments --- .golangci.yml | 2 +- e2e/vidit_test.go | 4 ++ internal/api/http/middleware.go | 2 + internal/api/model/model.go | 2 + internal/cli/cli.go | 3 ++ internal/event/event.go | 3 ++ internal/event/notificator/notificator.go | 2 +- internal/file/file.go | 9 ++++ internal/generators/generators.go | 2 + internal/logging/logger.go | 1 + internal/media/processor/mpd.go | 8 ---- internal/media/processor/mreader.go | 26 ++++++------ internal/media/processor/processor.go | 9 +++- internal/media/store/s3/s3.go | 11 ++++- internal/media/store/store.go | 5 --- internal/media/streamer/service.go | 5 +-- internal/media/uploader/service.go | 6 ++- internal/mp4/meta/meta.go | 2 + internal/mp4/segmenter/segmenter.go | 5 ++- internal/tool/tool.go | 52 +++++++++++++++++++++-- 20 files changed, 119 insertions(+), 40 deletions(-) delete mode 100644 internal/media/store/store.go diff --git a/.golangci.yml b/.golangci.yml index 7c27691..2d0821f 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,6 +1,6 @@ # Options for analysis running. run: - go: "1.21" + go: "1.22" # Settable parameters # timeout: 5m tests: true diff --git a/e2e/vidit_test.go b/e2e/vidit_test.go index 790926a..7978a3a 100644 --- a/e2e/vidit_test.go +++ b/e2e/vidit_test.go @@ -43,6 +43,10 @@ func init() { VIDITe2eUser = VIDITe2eUserTmpl + strconv.Itoa(int(time.Now().Unix())) } +// TestVidit_MainFlow tests main vidit functions. +// In needs db, redis and s3 containers running. +// Only TestVidit_MainFlow could be executed individually with go test -run, +// remaining tests in this file rely on side effects of TestVidit_MainFlow. func TestVidit_MainFlow(t *testing.T) { // -------------------------------------------------------------------------------------- // Prepare remote config and serve it diff --git a/internal/api/http/middleware.go b/internal/api/http/middleware.go index 61b47bb..2f0bcc6 100644 --- a/internal/api/http/middleware.go +++ b/internal/api/http/middleware.go @@ -6,6 +6,8 @@ import ( "github.com/labstack/echo/v4/middleware" ) +// GetEchoWithDefaultMiddleware returns configured echo middleware +// to be used together with http servers. func GetEchoWithDefaultMiddleware() *echo.Echo { e := echo.New() diff --git a/internal/api/model/model.go b/internal/api/model/model.go index 49f9ce3..39cb10f 100644 --- a/internal/api/model/model.go +++ b/internal/api/model/model.go @@ -1,5 +1,6 @@ package model +// Prepared http api common responses. var ( ResponseUnauthorized = &Response{ Error: "unauthorized", @@ -24,6 +25,7 @@ var ( } ) +// Response is a common response struct that is used by APIs. type Response struct { Message string `json:"msg,omitempty"` Error string `json:"error,omitempty"` diff --git a/internal/cli/cli.go b/internal/cli/cli.go index 7c6dfc1..1f42cde 100644 --- a/internal/cli/cli.go +++ b/internal/cli/cli.go @@ -1,3 +1,6 @@ +// Package cli contains cli tool that has +// - helpful mp4 operations like dumping and segmenting mp4 file +// - video api service token creation (which can be used later in apps config). package cli import ( diff --git a/internal/event/event.go b/internal/event/event.go index 89b693f..f91ccdf 100644 --- a/internal/event/event.go +++ b/internal/event/event.go @@ -1,3 +1,6 @@ +// Package event defines generic event that +// sent by processor and uploader to notify videoapi +// about video object changes. package event const ( diff --git a/internal/event/notificator/notificator.go b/internal/event/notificator/notificator.go index aa2fc0e..3797743 100644 --- a/internal/event/notificator/notificator.go +++ b/internal/event/notificator/notificator.go @@ -19,7 +19,7 @@ const ( ) // Notificator is asynchronous Video API notification service. -// It takes events and calls Video API in separate goroutine. +// It takes events and calls videoapi service-side API in separate goroutine. // // TODO In the future could be replaced with actual message queue. type Notificator struct { diff --git a/internal/file/file.go b/internal/file/file.go index 4be6a5b..b9d5e6f 100644 --- a/internal/file/file.go +++ b/internal/file/file.go @@ -1,3 +1,4 @@ +// Package file provides helpers that are used during video file upload. package file import ( @@ -16,6 +17,13 @@ type Part struct { Size uint `json:"size"` } +// MakePartsFromFile splits file to parts. It returns slice of parts with fixed size +// (but last part most probably will have less size) and calculated sha256 checksums. +// +// []Part actually just represent points in file, and later actual bytes are +// anyway read from source file using offsets. +// +// Main purpose of []Part is that it goes directly into Video object (which is sent to videoapi). func MakePartsFromFile(filePath string, partSize, size uint64) ([]Part, error) { f, err := os.Open(filePath) if err != nil { @@ -45,6 +53,7 @@ func MakePartsFromFile(filePath string, partSize, size uint64) ([]Part, error) { return parts, nil } +// GetSize opens file and returns its size. func GetSize(filePath string) (uint64, error) { f, err := os.Open(filePath) if err != nil { diff --git a/internal/generators/generators.go b/internal/generators/generators.go index 5369bf6..9be7e1f 100644 --- a/internal/generators/generators.go +++ b/internal/generators/generators.go @@ -1,3 +1,5 @@ +// Package generators provides simple unique string generator +// that is internally based on uuid v4. package generators import ( diff --git a/internal/logging/logger.go b/internal/logging/logger.go index db09841..ecace8c 100644 --- a/internal/logging/logger.go +++ b/internal/logging/logger.go @@ -1,3 +1,4 @@ +// Package logging provides various zap logger initialization functions. package logging import ( diff --git a/internal/media/processor/mpd.go b/internal/media/processor/mpd.go index cf18df9..05f35b9 100644 --- a/internal/media/processor/mpd.go +++ b/internal/media/processor/mpd.go @@ -56,14 +56,6 @@ func (p *Processor) generatePlaybackMeta( }, nil } -/* - b, err := metaCfg.StaticMPD() - if err != nil { - return nil, fmt.Errorf("cannot generate MPD: %w", err) - } - return b, nil -*/ - func getMimeTypeFromMP4TrackHandlerType(handlerType string) (string, error) { switch handlerType { case "soun": diff --git a/internal/media/processor/mreader.go b/internal/media/processor/mreader.go index b2573eb..9e7af61 100644 --- a/internal/media/processor/mreader.go +++ b/internal/media/processor/mreader.go @@ -14,7 +14,7 @@ const ( maxOpenReaders = 10 ) -// MediaReader is an uploaded media file parts reader. +// mediaReader is an uploaded media file parts reader. // It abstracts multiple io.ReadSeekClosers as one, // so it can be used together with lazy read mode in mp4ff. // @@ -29,19 +29,19 @@ const ( // and routes Read/Seek call to reader with a particular number. // // If desired amount of data exceeds current reader's boundary -// MediaReader switches to next reader in order until data is fully read. +// mediaReader switches to next reader in order until data is fully read. // -// To save memory and reduce number of open connections MediaReader +// To save memory and reduce number of open connections mediaReader // only keeps last 'maxOpenReaders' readers open. If amount of readers // is more than maxOpenReaders, several least recently used readers // are closed in order to satisfy condition. Closed reader can be // reopened just the same as if it was never used before. // // If error is occurred in any of the stages, Read/Seek will always -// return last error, and MediaReader must be closed. +// return last error, and mediaReader must be closed. // // Current implementation is not thread-safe and should be used by only one goroutine. -type MediaReader struct { +type mediaReader struct { traceLogger *zap.Logger store MediaStore readers map[uint64]io.ReadSeekCloser @@ -54,8 +54,8 @@ type MediaReader struct { pos uint64 } -func NewMediaReader(ms MediaStore, path string, parts uint, totalSize, partSize uint64) *MediaReader { - return &MediaReader{ +func newMediaReader(ms MediaStore, path string, parts uint, totalSize, partSize uint64) *mediaReader { + return &mediaReader{ store: ms, parts: parts, s3ath: path, @@ -66,7 +66,7 @@ func NewMediaReader(ms MediaStore, path string, parts uint, totalSize, partSize } } -func (mr *MediaReader) Read(b []byte) (int, error) { +func (mr *mediaReader) Read(b []byte) (int, error) { if mr.traceLogger != nil { mr.traceLogger.Debug("read call", zap.Int("len", len(b))) } @@ -157,7 +157,7 @@ func (mr *MediaReader) Read(b []byte) (int, error) { } } -func (mr *MediaReader) Seek(offset int64, whence int) (int64, error) { +func (mr *mediaReader) Seek(offset int64, whence int) (int64, error) { if mr.traceLogger != nil { mr.traceLogger.Debug("seek call", zap.Uint64("pos", mr.pos), @@ -222,7 +222,7 @@ func (mr *MediaReader) Seek(offset int64, whence int) (int64, error) { return int64(mr.pos), nil } -func (mr *MediaReader) Close() error { +func (mr *mediaReader) Close() error { var err error for _, rc := range mr.readers { if rErr := rc.Close(); rErr != nil { @@ -234,7 +234,7 @@ func (mr *MediaReader) Close() error { return err } -func (mr *MediaReader) ensureReaderIsOpen(partNum uint64) error { +func (mr *mediaReader) ensureReaderIsOpen(partNum uint64) error { if _, ok := mr.readers[partNum]; !ok { // spawn reader if it not exists var artifactName = fmt.Sprintf("%s/%d", mr.s3ath, partNum) @@ -249,7 +249,7 @@ func (mr *MediaReader) ensureReaderIsOpen(partNum uint64) error { return nil } -func (mr *MediaReader) recycleReader(partNum uint64) error { +func (mr *mediaReader) recycleReader(partNum uint64) error { err := mr.readers[partNum].Close() delete(mr.readers, partNum) delete(mr.readersTS, partNum) @@ -259,7 +259,7 @@ func (mr *MediaReader) recycleReader(partNum uint64) error { return nil } -func (mr *MediaReader) gc() { +func (mr *mediaReader) gc() { for len(mr.readers) > maxOpenReaders { // sweep var ( diff --git a/internal/media/processor/processor.go b/internal/media/processor/processor.go index bd193a0..03b551f 100644 --- a/internal/media/processor/processor.go +++ b/internal/media/processor/processor.go @@ -32,6 +32,13 @@ type MediaStore interface { Get(ctx context.Context, name string) (io.ReadSeekCloser, int64, error) } +// Processor is worker-style app that polls videoapi for uploaded videos, +// processes them and notifies videoapi about results. +// +// Processing includes +// - segmentation +// - MPD generation +// Segments and static MPD are stored in MediaStore (s3). type Processor struct { logger *zap.Logger notificator *notificator.Notificator @@ -170,7 +177,7 @@ func (p *Processor) processVideo(ctx context.Context, v *pb.Video) ([]byte, erro case defaultPartSize*uint64(len(v.Parts)-1) > v.Size || v.Size > defaultPartSize*uint64(len(v.Parts)): return nil, fmt.Errorf("incorrect parts amount(%d) for video size(%d)", len(v.Parts), v.Size) } - mr := NewMediaReader( + mr := newMediaReader( p.st, fmt.Sprintf("%s/%s", p.inputPathPrefix, v.Location), uint(len(v.Parts)), diff --git a/internal/media/store/s3/s3.go b/internal/media/store/s3/s3.go index b4c4769..49d4b35 100644 --- a/internal/media/store/s3/s3.go +++ b/internal/media/store/s3/s3.go @@ -3,17 +3,24 @@ package s3 import ( "context" "encoding/base64" + "errors" "fmt" "io" "net/http" - "github.com/adwski/vidi/internal/media/store" "github.com/minio/minio-go/v7" "github.com/minio/sha256-simd" "go.uber.org/zap" ) +var ErrNotFount = errors.New("not found") + // Store is media store that uses s3 compatible storage. +// It implements simple Get/Set operations, and in addition +// it can calculate sha256 checksum of already uploaded object. +// +// TODO Seems like S3 API actually can calculate sha256 on server side, +// TODO but I couldn't get it working with minio. Need to investigate further. type Store struct { logger *zap.Logger client *minio.Client @@ -51,7 +58,7 @@ func (s *Store) Get(ctx context.Context, name string) (io.ReadSeekCloser, int64, if errS != nil { er := minio.ToErrorResponse(errS) if er.StatusCode == http.StatusNotFound { - return nil, 0, store.ErrNotFount + return nil, 0, ErrNotFount } return nil, 0, fmt.Errorf("cannot get object stats: %w", errS) } diff --git a/internal/media/store/store.go b/internal/media/store/store.go deleted file mode 100644 index f0d20a7..0000000 --- a/internal/media/store/store.go +++ /dev/null @@ -1,5 +0,0 @@ -package store - -import "errors" - -var ErrNotFount = errors.New("not found") diff --git a/internal/media/streamer/service.go b/internal/media/streamer/service.go index 1fdd3b4..d0b6b8a 100644 --- a/internal/media/streamer/service.go +++ b/internal/media/streamer/service.go @@ -1,3 +1,4 @@ +// Package streamer contains media segments streaming app. package streamer import ( @@ -6,8 +7,6 @@ import ( "fmt" "strings" - "github.com/adwski/vidi/internal/media/store" - "github.com/adwski/vidi/internal/media/store/s3" "github.com/adwski/vidi/internal/session" sessionStore "github.com/adwski/vidi/internal/session/store" @@ -125,7 +124,7 @@ func (svc *Service) handleWatch(ctx *fasthttp.RequestCtx) { // Get segment reader rc, size, errS3 := svc.mediaS.Get(ctx, svc.getSegmentName(sess, path)) if errS3 != nil { - if errors.Is(errS3, store.ErrNotFount) { + if errors.Is(errS3, s3.ErrNotFount) { ctx.Error(notFoundError, fasthttp.StatusNotFound) return } diff --git a/internal/media/uploader/service.go b/internal/media/uploader/service.go index c300f55..5d831a8 100644 --- a/internal/media/uploader/service.go +++ b/internal/media/uploader/service.go @@ -1,3 +1,4 @@ +// Package uploader contains app that handles media part uploads. package uploader import ( @@ -26,8 +27,11 @@ var ( ) // Service is a media file uploader service. It implements fasthttp handler that -// reads uploaded file and stores it in media store. +// reads uploaded part and stores it in media store. // Every request is also checked for valid "upload"-session. +// +// After each successful part upload, uploader calculates sha256 checksum +// and asynchronously notifies videoapi. type Service struct { logger *zap.Logger sessS *sessionStore.Store diff --git a/internal/mp4/meta/meta.go b/internal/mp4/meta/meta.go index cc34b8b..905d080 100644 --- a/internal/mp4/meta/meta.go +++ b/internal/mp4/meta/meta.go @@ -1,3 +1,5 @@ +// Package meta contains playback metadata definition +// and static MPD generator. package meta import ( diff --git a/internal/mp4/segmenter/segmenter.go b/internal/mp4/segmenter/segmenter.go index 9ba0ffc..6e93770 100644 --- a/internal/mp4/segmenter/segmenter.go +++ b/internal/mp4/segmenter/segmenter.go @@ -15,7 +15,7 @@ import ( type BoxStoreFunc func(context.Context, string, mp4ff.BoxStructure, uint64) error -// Segmenter can segment progressive mp4 according to predefined segment duration. +// Segmenter segments progressive mp4 according to predefined segment duration. // Resulting segments are passed to boxStoreFunc, and it is up to user to define how to store them. // // Segmentation flow: @@ -26,6 +26,9 @@ type BoxStoreFunc func(context.Context, string, mp4ff.BoxStructure, uint64) erro // // This flow uses high-level functions, implemented in segmentation package. // +// Configured segment duration should be treated like 'preference'. +// Segmenter can increase it if necessary in order to make segments with equal sizes. +// // Tracks get new numbers since they will be in separate files. // For example, if input file has video track with ID 0 and audio with ID 1, // then in resulting video segments will be single track with ID 0, diff --git a/internal/tool/tool.go b/internal/tool/tool.go index 480afb3..c0d75ca 100644 --- a/internal/tool/tool.go +++ b/internal/tool/tool.go @@ -1,3 +1,40 @@ +// Package tool contains TUI client that can be used together with ViDi. +// It supports most of user-operations (excluding video watch). +// +// The tool (called 'vidit') is build using bubbletea ecosystem and +// has corresponding design patterns. +// +// Vidit consists of bubbletea program and number of screens, each of them +// responsible for displaying particular data and interacting +// with user about this data if necessary (e.g. menu screen, upload screen). +// +// At each particular time only one screen is active, and this is +// determined by vidit's state and user input (e.g. Vidit has no initial +// config and user should provide it). +// +// Bubbletea message flow is redirected to the active screen which is +// responsible for reacting to messages and rendering info using +// same bubbletea Update()-View() pattern. +// +// Vidit keeps internal state in order to track active user, +// login info, upload info, endpoint configuration. This allows +// to auto login during startup and resume interrupted upload. +// +// Each screen internally also has its own state which in most cases +// CANNOT BE REVERTED (main example is huh forms), which is why +// when vidit changes screens it instantiates new one and not reuses +// exising. +// +// During message processing screen may send outerControl message +// back to vidit Update() method. And this means vidit should decide +// what to do next (usual case is that user finished interacting with +// current screen, and it should be changed to something else). +// +// Vidit also has 'world-event' channel that is 'external world to active screen' +// communication channel. This channel is used when screen should track something +// happening asynchronously (independent from screen itself). This is mainly +// used for upload progress. +// //nolint:godot // false positives package tool @@ -25,7 +62,11 @@ var ErrAlreadyStarted = errors.New("already started") type ( // Tool is a vidit tui client side tool. - Tool struct { + // It has clients to interact with APIs, + // screens to display info to user and + // react to user input, and state that is + // persisted between restarts. + Tool struct { // a.k.a 'Vidit' userapi *userapi.Client videoapi videoapi.UsersideapiClient httpC *resty.Client @@ -84,6 +125,8 @@ func New() (*Tool, error) { } // NewWithConfig creates ViDi tui tool instance using specified config. +// Logger is instantiated with file output with hardcoded debug level. +// It could also run early initialization if specified (mainly used by tests). func NewWithConfig(cfg Config) (*Tool, error) { dir, err := initStateDir(cfg.EnforceHomeDir) if err != nil { @@ -257,6 +300,7 @@ func (t *Tool) Update(msg tea.Msg) (tea.Model, tea.Cmd) { t.resumingUpload = true t.mainFlowScreen = mainFlowScreenUpload // Spawn resume upload goroutine, it will produce world events. + // TODO Should add context (at the moment not so obvious where to take it from) go t.resumeUploadFileNotify(t.state.activeUserUnsafe().CurrentUpload) default: @@ -377,7 +421,7 @@ const ( type ( // screen is responsible for rendering set of elements - // for particular purpose, i.e 'main menu screen' or 'new user screen'. + // for a particular purpose, i.e 'main menu screen' or 'new user screen'. screen interface { init() tea.Cmd update(msg tea.Msg) (tea.Cmd, *outerControl) @@ -385,10 +429,10 @@ type ( name() string } - // outerControl is control structure returned by screen. + // outerControl is a control structure returned by screen. // It contains data necessary to continue screen cycle. outerControl struct { - data interface{} + data interface{} // TODO may be just 'outerControl any' ? } )