Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix to the panic due to the send on a closed channel #2804

Closed
wants to merge 11 commits into from
31 changes: 26 additions & 5 deletions ste/jobStatusManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,28 @@ func (jm *jobMgr) statusMgrClosed() bool {

/* These functions should not fail */
func (jm *jobMgr) SendJobPartCreatedMsg(msg JobPartCreatedMsg) {
jm.jstm.partCreated <- msg
if msg.IsFinalPart {
// Inform statusManager that this is all parts we've
close(jm.jstm.partCreated)
defer func() {
if recErr := recover(); recErr != nil {
jm.Log(common.LogError, "Cannot send message on closed channel")
}
}()
if jm.jstm.partCreated != nil { // Sends not allowed if channel is closed
jm.jstm.partCreated <- msg

if msg.IsFinalPart {
// Inform statusManager that this is all parts we've
close(jm.jstm.partCreated)
jm.jstm.partCreated = nil
}
}
}

func (jm *jobMgr) SendXferDoneMsg(msg xferDoneMsg) {
defer func() {
if recErr := recover(); recErr != nil {
jm.Log(common.LogError, "Cannot send message on channel")
}
}()
jm.jstm.xferDone <- msg
}

Expand Down Expand Up @@ -155,7 +169,13 @@ func (jm *jobMgr) handleStatusUpdateMessage() {
case <-jstm.listReq:
/* Display stats */
js.Timestamp = time.Now().UTC()
jstm.respChan <- *js
if jstm.respChan != nil {
select {
case jstm.respChan <- *js: // Send on the channel
default:
jm.Log(common.LogError, "Cannot send message on respChan")
}
}

// Reset the lists so that they don't keep accumulating and take up excessive memory
// There is no need to keep sending the same items over and over again
Expand All @@ -168,6 +188,7 @@ func (jm *jobMgr) handleStatusUpdateMessage() {
close(jstm.listReq)
jstm.listReq = nil
jstm.respChan = nil
wonwuakpa-msft marked this conversation as resolved.
Show resolved Hide resolved
jstm.statusMgrDone = nil
return
}
}
Expand Down
7 changes: 4 additions & 3 deletions ste/mgr-JobMgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,8 +424,8 @@ type AddJobPartArgs struct {

// These clients are valid if this fits the FromTo. i.e if
// we're uploading
SrcClient *common.ServiceClient
DstClient *common.ServiceClient
SrcClient *common.ServiceClient
DstClient *common.ServiceClient
SrcIsOAuth bool // true if source is authenticated via token

ScheduleTransfers bool
Expand All @@ -446,7 +446,7 @@ func (jm *jobMgr) AddJobPart2(args *AddJobPartArgs) IJobPartMgr {
cacheLimiter: jm.cacheLimiter,
fileCountLimiter: jm.fileCountLimiter,
closeOnCompletion: args.CompletionChan,
srcIsOAuth: args.SrcIsOAuth,
srcIsOAuth: args.SrcIsOAuth,
}
// If an existing plan MMF was supplied, re use it. Otherwise, init a new one.
if args.ExistingPlanMMF == nil {
Expand Down Expand Up @@ -716,6 +716,7 @@ func (jm *jobMgr) reportJobPartDoneHandler() {
if shouldComplete {
// Inform StatusManager that all parts are done.
close(jm.jstm.xferDone)

// Wait for all XferDone messages to be processed by statusManager. Front end
// depends on JobStatus to determine if we've to quit job. Setting it here without
// draining XferDone will make it report incorrect statistics.
Expand Down
Loading