Skip to content

Commit

Permalink
update stop logic in plugin_wrapper_v1
Browse files Browse the repository at this point in the history
  • Loading branch information
Assassin718 committed Sep 24, 2024
1 parent b675236 commit e0ad752
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 104 deletions.
109 changes: 56 additions & 53 deletions pluginmanager/plugin_wrapper_metric_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,6 @@ func (p *MetricWrapperV1) AddRawLogWithContext(log *protocol.Log, ctx map[string
}

func (p *MetricWrapperV1) runPushNativeProcessQueueInternal() {
p.eventCached = make([]*protocol.LogEvent, 0, p.MaxCachedSize+1)
p.tagCached = make([]map[string]string, 0, p.MaxCachedSize+1)
p.ctxCached = make([]map[string]interface{}, 0, p.MaxCachedSize+1)
p.timer = time.NewTimer(p.PushNativeTimeout)
var event *pipeline.LogEventWithContext
isValidToPushNativeProcessQueue := true

Expand All @@ -183,61 +179,71 @@ func (p *MetricWrapperV1) runPushNativeProcessQueueInternal() {
p.eventCached = append(p.eventCached, event.LogEvent)
p.tagCached = append(p.tagCached, event.Tags)
p.ctxCached = append(p.ctxCached, event.Context)
if len(p.eventCached) < p.MaxCachedSize {
continue
if len(p.eventCached) >= p.MaxCachedSize {
isValidToPushNativeProcessQueue = p.pushNativeProcessQueue(5)
}
isValidToPushNativeProcessQueue = p.pushNativeProcessQueue(5)
case <-p.ShutdownCachedChan:
for len(p.LogsCachedChan) > 0 {
<-p.LogsCachedChan
for event = range p.LogsCachedChan {
p.eventCached = append(p.eventCached, event.LogEvent)
p.tagCached = append(p.tagCached, event.Tags)
p.ctxCached = append(p.ctxCached, event.Context)
}
endTime := time.Now().Add(time.Duration(30) * time.Second)
for {
if time.Now().After(endTime) || (len(p.eventCached) == 0 && len(p.pbBuffer) == 0) {
break
}
p.pushNativeProcessQueue(1)
time.Sleep(time.Duration(10) * time.Millisecond)
}
p.timer.Stop()
close(p.LogsCachedChan)
close(p.ShutdownCachedChan)
return
}
} else {
select {
case <-p.timer.C:
isValidToPushNativeProcessQueue = p.pushNativeProcessQueue(5)
case <-p.ShutdownCachedChan:
for len(p.LogsCachedChan) > 0 {
<-p.LogsCachedChan
for event = range p.LogsCachedChan {
p.eventCached = append(p.eventCached, event.LogEvent)
p.tagCached = append(p.tagCached, event.Tags)
p.ctxCached = append(p.ctxCached, event.Context)
}
endTime := time.Now().Add(time.Duration(30) * time.Second)
for {
if time.Now().After(endTime) || (len(p.eventCached) == 0 && len(p.pbBuffer) == 0) {
break
}
p.pushNativeProcessQueue(1)
time.Sleep(time.Duration(10) * time.Millisecond)
}
p.timer.Stop()
close(p.LogsCachedChan)
close(p.ShutdownCachedChan)
return
}
}
}

}

func (p *MetricWrapperV1) pushNativeProcessQueue(retryCnt int) bool {
if len(p.eventCached) == 0 {
if len(p.pbBuffer) == 0 && len(p.eventCached) == 0 {
return true
}

// create pipelineEventGroup and marshal to pbBuffer
pushSize := 0
if len(p.pbBuffer) == 0 {
// create pipelineEventGroup and marshal to pbBuffer
tag := p.tagCached[0]
ctx := p.ctxCached[0]
i := 1
for ; i < len(p.eventCached); i++ {
pushSize := 1
for ; pushSize < len(p.eventCached); pushSize++ {
same := true
for k, v := range p.tagCached[i] {
for k, v := range p.tagCached[pushSize] {
if tag[k] != v {
same = false
break
}
}
if !same || !reflect.DeepEqual(ctx, p.ctxCached[i]) {
if !same || !reflect.DeepEqual(ctx, p.ctxCached[pushSize]) {
break
}
}
pushSize = i
group, _ := helper.CreatePipelineEventGroupV1(p.eventCached[:pushSize], p.Tags, tag, ctx)
pbSize := group.Size()
if cap(p.pbBuffer) < pbSize {
Expand All @@ -249,37 +255,44 @@ func (p *MetricWrapperV1) pushNativeProcessQueue(retryCnt int) bool {
}
n, _ := group.MarshalTo(p.pbBuffer)
p.pbBuffer = p.pbBuffer[:n]

// clear eventCached, tagCached and ctxCached
for i := 0; i < pushSize; i++ {
helper.LogEventPool.Put(p.eventCached[i])
}
for i := pushSize; i < len(p.eventCached); i++ {
p.eventCached[i-pushSize] = p.eventCached[i]
p.tagCached[i-pushSize] = p.tagCached[i]
p.ctxCached[i-pushSize] = p.ctxCached[i]
}
p.eventCached = p.eventCached[:len(p.eventCached)-pushSize]
p.tagCached = p.tagCached[:len(p.tagCached)-pushSize]
p.ctxCached = p.ctxCached[:len(p.ctxCached)-pushSize]
}

// try to pushNativeProcessQueue
rst := 0
rst := -1
switch p.Input.GetMode() {
case pipeline.PUSH:
if retryCnt <= 0 {
for {
if logtail.IsValidToProcess(p.Config.ConfigName) {
if rst = logtail.PushQueue(p.Config.ConfigName, p.pbBuffer); rst == 0 {
break
}
}
time.Sleep(time.Duration(10) * time.Millisecond)
i := 0
for {
if retryCnt > 0 && i >= retryCnt {
break
}
} else {
for i := 0; i < retryCnt; i++ {
if logtail.IsValidToProcess(p.Config.ConfigName) {
if rst = logtail.PushQueue(p.Config.ConfigName, p.pbBuffer); rst == 0 {
break
}
if logtail.IsValidToProcess(p.Config.ConfigName) {
if rst = logtail.PushQueue(p.Config.ConfigName, p.pbBuffer); rst == 0 {
break
}
time.Sleep(time.Duration(10) * time.Millisecond)
}
time.Sleep(time.Duration(10) * time.Millisecond)
}
case pipeline.PULL:
logtail.PushQueue(p.Config.ConfigName, p.pbBuffer)
rst = 0
default:
}

// clear buffer and reset timer
// clear pbBuffer and reset timer
if !p.timer.Stop() {
select {
case <-p.timer.C:
Expand All @@ -290,16 +303,6 @@ func (p *MetricWrapperV1) pushNativeProcessQueue(retryCnt int) bool {
if rst != 0 {
return false
}
if pushSize != 0 {
i := 0
for ; i < pushSize; i++ {
helper.LogEventPool.Put(p.eventCached[i])
}
for ; i < len(p.eventCached); i++ {
p.eventCached[i-pushSize] = p.eventCached[i]
}
p.eventCached = p.eventCached[:len(p.eventCached)-pushSize]
}
p.pbBuffer = p.pbBuffer[:0]
return true
}
108 changes: 57 additions & 51 deletions pluginmanager/plugin_wrapper_service_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ func (p *ServiceWrapperV1) Run(cc *pipeline.AsyncControl) {
if p.Config.GlobalConfig.GoInputToNativeProcessor {
p.LogsCachedChan = make(chan *pipeline.LogEventWithContext, 10)
p.ShutdownCachedChan = make(chan struct{})
p.eventCached = make([]*protocol.LogEvent, 0, p.MaxCachedSize+1)
p.tagCached = make([]map[string]string, 0, p.MaxCachedSize+1)
p.ctxCached = make([]map[string]interface{}, 0, p.MaxCachedSize+1)
p.eventCached = make([]*protocol.LogEvent, 0, p.MaxCachedSize+10)
p.tagCached = make([]map[string]string, 0, p.MaxCachedSize+10)
p.ctxCached = make([]map[string]interface{}, 0, p.MaxCachedSize+10)
p.timer = time.NewTimer(p.PushNativeTimeout)
go p.runPushNativeProcessQueueInternal()
}
Expand Down Expand Up @@ -175,12 +175,22 @@ func (p *ServiceWrapperV1) runPushNativeProcessQueueInternal() {
p.eventCached = append(p.eventCached, event.LogEvent)
p.tagCached = append(p.tagCached, event.Tags)
p.ctxCached = append(p.ctxCached, event.Context)
if len(p.eventCached) < p.MaxCachedSize {
continue
if len(p.eventCached) >= p.MaxCachedSize {
isValidToPushNativeProcessQueue = p.pushNativeProcessQueue(5)
}
isValidToPushNativeProcessQueue = p.pushNativeProcessQueue(5)
case <-p.ShutdownCachedChan:
for event = range p.LogsCachedChan {
p.eventCached = append(p.eventCached, event.LogEvent)
p.tagCached = append(p.tagCached, event.Tags)
p.ctxCached = append(p.ctxCached, event.Context)
}
endTime := time.Now().Add(time.Duration(30) * time.Second)
for {
if time.Now().After(endTime) || (len(p.eventCached) == 0 && len(p.pbBuffer) == 0) {
break
}
p.pushNativeProcessQueue(1)
time.Sleep(time.Duration(10) * time.Millisecond)
}
return
}
Expand All @@ -190,38 +200,46 @@ func (p *ServiceWrapperV1) runPushNativeProcessQueueInternal() {
isValidToPushNativeProcessQueue = p.pushNativeProcessQueue(5)
case <-p.ShutdownCachedChan:
for event = range p.LogsCachedChan {
p.eventCached = append(p.eventCached, event.LogEvent)
p.tagCached = append(p.tagCached, event.Tags)
p.ctxCached = append(p.ctxCached, event.Context)
}
endTime := time.Now().Add(time.Duration(30) * time.Second)
for {
if time.Now().After(endTime) || (len(p.eventCached) == 0 && len(p.pbBuffer) == 0) {
break
}
p.pushNativeProcessQueue(1)
time.Sleep(time.Duration(10) * time.Millisecond)
}
return
}
}
}

}

func (p *ServiceWrapperV1) pushNativeProcessQueue(retryCnt int) bool {
if len(p.eventCached) == 0 {
if len(p.pbBuffer) == 0 && len(p.eventCached) == 0 {
return true
}

// create pipelineEventGroup and marshal to pbBuffer
pushSize := 0
if len(p.pbBuffer) == 0 {
// create pipelineEventGroup and marshal to pbBuffer
tag := p.tagCached[0]
ctx := p.ctxCached[0]
i := 1
for ; i < len(p.eventCached); i++ {
pushSize := 1
for ; pushSize < len(p.eventCached); pushSize++ {
same := true
for k, v := range p.tagCached[i] {
for k, v := range p.tagCached[pushSize] {
if tag[k] != v {
same = false
break
}
}
if !same || !reflect.DeepEqual(ctx, p.ctxCached[i]) {
if !same || !reflect.DeepEqual(ctx, p.ctxCached[pushSize]) {
break
}
}
pushSize = i
group, _ := helper.CreatePipelineEventGroupV1(p.eventCached[:pushSize], p.Tags, tag, ctx)
pbSize := group.Size()
if cap(p.pbBuffer) < pbSize {
Expand All @@ -233,42 +251,44 @@ func (p *ServiceWrapperV1) pushNativeProcessQueue(retryCnt int) bool {
}
n, _ := group.MarshalTo(p.pbBuffer)
p.pbBuffer = p.pbBuffer[:n]

// clear eventCached, tagCached and ctxCached
for i := 0; i < pushSize; i++ {
helper.LogEventPool.Put(p.eventCached[i])
}
for i := pushSize; i < len(p.eventCached); i++ {
p.eventCached[i-pushSize] = p.eventCached[i]
p.tagCached[i-pushSize] = p.tagCached[i]
p.ctxCached[i-pushSize] = p.ctxCached[i]
}
p.eventCached = p.eventCached[:len(p.eventCached)-pushSize]
p.tagCached = p.tagCached[:len(p.tagCached)-pushSize]
p.ctxCached = p.ctxCached[:len(p.ctxCached)-pushSize]
}

// try to pushNativeProcessQueue
rst := 0
rst := -1
switch p.Input.GetMode() {
case pipeline.PUSH:
if retryCnt <= 0 {
for {
if logtail.IsValidToProcess(p.Config.ConfigName) {
if rst = logtail.PushQueue(p.Config.ConfigName, p.pbBuffer); rst == 0 {
break
}
}
time.Sleep(time.Duration(10) * time.Millisecond)
i := 0
for {
if retryCnt > 0 && i >= retryCnt {
break
}
} else {
for i := 0; i < retryCnt; i++ {
if logtail.IsValidToProcess(p.Config.ConfigName) {
if rst = logtail.PushQueue(p.Config.ConfigName, p.pbBuffer); rst == 0 {
break
}
if logtail.IsValidToProcess(p.Config.ConfigName) {
if rst = logtail.PushQueue(p.Config.ConfigName, p.pbBuffer); rst == 0 {
break
}
time.Sleep(time.Duration(10) * time.Millisecond)
}
time.Sleep(time.Duration(10) * time.Millisecond)
}
// if logtail.IsValidToProcess(p.Config.ConfigName) {
// rst = logtail.PushQueue(p.Config.ConfigName, p.pbBuffer)
// } else {
// rst = -1
// }
case pipeline.PULL:
logtail.PushQueue(p.Config.ConfigName, p.pbBuffer)
rst = 0
default:
}

// clear buffer and reset timer
// clear pbBuffer and reset timer
if !p.timer.Stop() {
select {
case <-p.timer.C:
Expand All @@ -279,20 +299,6 @@ func (p *ServiceWrapperV1) pushNativeProcessQueue(retryCnt int) bool {
if rst != 0 {
return false
}
if pushSize != 0 {
i := 0
for ; i < pushSize; i++ {
helper.LogEventPool.Put(p.eventCached[i])
}
for ; i < len(p.eventCached); i++ {
p.eventCached[i-pushSize] = p.eventCached[i]
p.tagCached[i-pushSize] = p.tagCached[i]
p.ctxCached[i-pushSize] = p.ctxCached[i]
}
p.eventCached = p.eventCached[:len(p.eventCached)-pushSize]
p.tagCached = p.tagCached[:len(p.tagCached)-pushSize]
p.ctxCached = p.ctxCached[:len(p.ctxCached)-pushSize]
}
p.pbBuffer = p.pbBuffer[:0]
return true
}

0 comments on commit e0ad752

Please sign in to comment.