Skip to content

Commit

Permalink
client session: 1. Start失败时,外层可以不调用Dispose 2. flv pull session 增加Star…
Browse files Browse the repository at this point in the history
…t, WithOnReadFlvTag函数替代Pull函数
  • Loading branch information
q191201771 committed Jun 25, 2024
1 parent b1dcb8e commit b5de3f9
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 8 deletions.
7 changes: 4 additions & 3 deletions app/demo/analyseflv/analyseflv.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,12 @@ func main() {
}()

if strings.HasPrefix(in, "http") || strings.HasPrefix(in, "https") {
session := httpflv.NewPullSession()
// TODO(chef): [refactor] 统一 PullSession 和 FilePump 的回调格式 202211
err := session.Pull(in, func(tag httpflv.Tag) {
session := httpflv.NewPullSession().WithOnReadFlvTag(func(tag httpflv.Tag) {
handleTags(tag)
})

// TODO(chef): [refactor] 统一 PullSession 和 FilePump 的回调格式 202211
err := session.Start(in)
nazalog.Assert(nil, err)

// 临时测试一下主动关闭client session
Expand Down
51 changes: 46 additions & 5 deletions pkg/httpflv/client_pull_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ type PullSession struct {
conn connection.Connection
sessionStat base.BasicSessionStat

onReadFlvTag OnReadFlvTag

urlCtx base.UrlContext

disposeOnce sync.Once
Expand All @@ -67,15 +69,26 @@ func NewPullSession(modOptions ...ModPullSessionOption) *PullSession {
// OnReadFlvTag @param tag: 底层保证回调上来的Raw数据长度是完整的(但是不会分析Raw内部的编码数据)
type OnReadFlvTag func(tag Tag)

// Pull 阻塞直到和对端完成拉流前,握手部分的工作,或者发生错误。
// WithOnReadFlvTag
//
// @param onReadFlvTag 读取到 flv tag 数据时回调。回调结束后,PullSession 不会再使用这块 <tag> 数据。
func (session *PullSession) WithOnReadFlvTag(onReadFlvTag OnReadFlvTag) *PullSession {
session.onReadFlvTag = onReadFlvTag
return session
}

// Start 阻塞直到和对端完成拉流前,握手部分的工作,或者发生错误。
//
// 注意,握手指的是发送完HTTP Request,不包含接收任何数据,因为有的httpflv服务端,如果流不存在不会发送任何内容,此时我们也应该认为是握手完成了。
//
// @param rawUrl 支持如下两种格式(当然,关键点是对端支持):
// 1. `http://{domain}/{app_name}/{stream_name}.flv`
// 2. `http://{ip}/{domain}/{app_name}/{stream_name}.flv`
//
// @param onReadFlvTag 读取到 flv tag 数据时回调。回调结束后,PullSession 不会再使用这块 <tag> 数据。
func (session *PullSession) Start(rawUrl string) error {
return session.Pull(rawUrl, session.onReadFlvTag)
}

// Pull deprecated. use Start instead.
func (session *PullSession) Pull(rawUrl string, onReadFlvTag OnReadFlvTag) error {
Log.Debugf("[%s] pull. url=%s", session.UniqueKey(), rawUrl)

Expand All @@ -89,7 +102,12 @@ func (session *PullSession) Pull(rawUrl string, onReadFlvTag OnReadFlvTag) error
ctx, cancel = context.WithTimeout(context.Background(), time.Duration(session.option.PullTimeoutMs)*time.Millisecond)
}
defer cancel()
return session.pullContext(ctx, rawUrl, onReadFlvTag)

err := session.pullContext(ctx, rawUrl, onReadFlvTag)
if err != nil {
_ = session.dispose(err)
}
return err
}

// ---------------------------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -160,6 +178,27 @@ func (session *PullSession) IsAlive() (readAlive, writeAlive bool) {

// ---------------------------------------------------------------------------------------------------------------------

func (session *PullSession) pull(rawUrl string) error {
Log.Debugf("[%s] pull. url=%s", session.UniqueKey(), rawUrl)

var (
ctx context.Context
cancel context.CancelFunc
)
if session.option.PullTimeoutMs == 0 {
ctx, cancel = context.WithCancel(context.Background())
} else {
ctx, cancel = context.WithTimeout(context.Background(), time.Duration(session.option.PullTimeoutMs)*time.Millisecond)
}
defer cancel()

err := session.pullContext(ctx, rawUrl, session.onReadFlvTag)
if err != nil {
_ = session.dispose(err)
}
return err
}

func (session *PullSession) pullContext(ctx context.Context, rawUrl string, onReadFlvTag OnReadFlvTag) error {
errChan := make(chan error, 1)
url := rawUrl
Expand Down Expand Up @@ -311,7 +350,9 @@ func (session *PullSession) runReadLoop(onReadFlvTag OnReadFlvTag) {
if err != nil {
return
}
onReadFlvTag(tag)
if onReadFlvTag != nil {
onReadFlvTag(tag)
}
}
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/rtmp/client_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ func (s *ClientSession) Do(rawUrl string) error {
}

err := s.doContext(ctx)
if err != nil {
_ = s.dispose(err)
}

return err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/rtsp/client_pull_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func (session *PullSession) WithOnDescribeResponse(onDescribeResponse func()) *P
func (session *PullSession) Pull(rawUrl string) error {
Log.Debugf("[%s] pull. url=%s", session.UniqueKey(), rawUrl)
if err := session.cmdSession.Do(rawUrl); err != nil {
_ = session.dispose(err)
return err
}

Expand Down
1 change: 1 addition & 0 deletions pkg/rtsp/client_push_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func (session *PushSession) Push(rawUrl string, sdpCtx sdp.LogicContext) error {
session.cmdSession.InitWithSdp(sdpCtx)
session.baseOutSession.InitWithSdp(sdpCtx)
if err := session.cmdSession.Do(rawUrl); err != nil {
_ = session.dispose(err)
return err
}

Expand Down

0 comments on commit b5de3f9

Please sign in to comment.