From aaeea38d0210c448928f906bfbdd29a297ba7369 Mon Sep 17 00:00:00 2001 From: Le Zhang Date: Tue, 10 Sep 2024 13:27:20 -0400 Subject: [PATCH] Issue open-horizon#4146 - Bug: Searching business policies will stop for all orgs if one org issues an error; need to have remaining orgs still searched Signed-off-by: Le Zhang --- agreementbot/node_search.go | 48 +++++++++++++++++++++++++++++++------ config/config.go | 9 ++++++- config/constants.go | 3 +++ 3 files changed, 52 insertions(+), 8 deletions(-) diff --git a/agreementbot/node_search.go b/agreementbot/node_search.go index c4a868453..01006fc49 100644 --- a/agreementbot/node_search.go +++ b/agreementbot/node_search.go @@ -27,7 +27,9 @@ type NodeSearch struct { ec exchange.ExchangeContext msgs chan events.Message // Outgoing internal event messages are placed here. nextScanIntervalS uint64 // The interval between scans when there are changes in the system. It allows the system to process existing work before injecting new agreements. + errRecanIntervalS uint64 // The interval between scans if error occured during last scan fullRescanIntervalS uint64 // The interval between scans when there are NOT changes in the system. This is a safety net in case changes are missed. + lastSearchHasErr bool // Indicate there is an error happened during prvious round of node search lastSearchComplete bool lastSearchTime uint64 searchThread chan bool @@ -44,7 +46,9 @@ type NodeSearch struct { func NewNodeSearch() *NodeSearch { ns := &NodeSearch{ nextScanIntervalS: 0, + errRecanIntervalS: 0, fullRescanIntervalS: 0, + lastSearchHasErr: false, lastSearchComplete: true, lastSearchTime: 0, searchThread: make(chan bool, 10), @@ -64,6 +68,7 @@ func (n *NodeSearch) Init(db persistence.AgbotDatabase, pm *policy.PolicyManager n.msgs = msgs n.ec = ec n.nextScanIntervalS = cfg.AgreementBot.NewContractIntervalS + n.errRecanIntervalS = cfg.GetAgbotErrReschanInterval() n.fullRescanIntervalS = cfg.GetAgbotFullRescan() n.batchSize = cfg.GetAgbotAgreementBatchSize() n.activeDeviceTimeoutS = cfg.AgreementBot.ActiveDeviceTimeoutS @@ -111,6 +116,26 @@ func (n *NodeSearch) IsRescanNeeded() bool { return n.rescanNeeded } +// Indicate there is a error happened during last scan +func (n *NodeSearch) setLastSearchHasErr() { + n.rescanLock.Lock() + defer n.rescanLock.Unlock() + n.lastSearchHasErr = true +} + +func (n *NodeSearch) UnsetLastSearchHasErr() { + n.rescanLock.Lock() + defer n.rescanLock.Unlock() + n.lastSearchHasErr = false +} + +// Check if there is an error happened during last node scan. This function is thread safe. +func (n *NodeSearch) LastSearchHasErr() bool { + n.rescanLock.Lock() + defer n.rescanLock.Unlock() + return n.lastSearchHasErr +} + // This is the main driving function in this object. It will initiate a node scan if needed, using an exiting search session or obtain a new one if needed. // The actual processing of a node scan for all policies and patterns is actually performed on a sub-thread. This function also also handles updating // itself if a previous scan has completed since the last time this method was called. @@ -136,7 +161,7 @@ func (n *NodeSearch) Scan() { go n.findAndMakeAgreements() } - // If changes in the system have occurred such that a rescan is needed, start a scan now. + // If changes in the system have occurred such that a rescan is needed, start a scan now. nextScanIntervalS is set to 1 by default if n.lastSearchComplete && n.IsRescanNeeded() && ((uint64(time.Now().Unix()) - n.lastSearchTime) >= uint64(n.nextScanIntervalS)) { n.lastSearchTime = uint64(time.Now().Unix()) glog.V(3).Infof(AWlogString("Polling Exchange")) @@ -145,6 +170,14 @@ func (n *NodeSearch) Scan() { go n.findAndMakeAgreements() } + // If error happens and rescan is needed, start a scan + if n.lastSearchComplete && n.LastSearchHasErr() && ((uint64(time.Now().Unix()) - n.lastSearchTime) >= uint64(n.errRecanIntervalS)) { + n.lastSearchTime = uint64(time.Now().Unix()) + glog.V(3).Infof(AWlogString("Polling Exchange (recover)")) + n.lastSearchComplete = false + n.UnsetLastSearchHasErr() + go n.findAndMakeAgreements() + } } // Go through all the patterns and deployment polices and make agreements. This function runs on a sub-thread of the agbot @@ -155,6 +188,10 @@ func (n *NodeSearch) findAndMakeAgreements() { glog.Errorf(AWlogString(fmt.Sprintf("unable to dump search session records, error: %v", err))) } + if n.LastSearchHasErr() { + n.UnsetLastSearchHasErr() + } + // Errors encountered during the search will cause the next set of searches to be performed with the same changedSince // time and the same search session. searchError := false @@ -201,7 +238,7 @@ func (n *NodeSearch) findAndMakeAgreements() { _, polName := cutil.SplitOrgSpecUrl(consumerPolicy.Header.Name) // Get the hash of the business policy entry - pBE_hash := string(pBE.Hash) + pBE_hash := org + string(pBE.Hash) //Check to see if we have already searched this policy... makes sure we check other policies before circling back and repeating re-searching ones we already did _, ok := n.completedSearches[pBE_hash] // Use the hash since the policy may get updated which would change the hash but not the name. Do want to search changed/new policies @@ -210,6 +247,7 @@ func (n *NodeSearch) findAndMakeAgreements() { if lastPage, err := n.searchNodesAndMakeAgreements(&consumerPolicy, org, polName, pBE.Updated); err != nil { // Dont move the changed since time forward since there was an error. searchError = true + glog.Errorf(AWlogString(fmt.Sprintf("received error searching for nodes under org %v: %v", org, err))) break } else if !lastPage { // The search returned a large number of results that need to be processed. Let the system work on them @@ -229,14 +267,11 @@ func (n *NodeSearch) findAndMakeAgreements() { } } - if searchError { - break - } } // Done scanning all nodes across all policies, and no errors were encountered. if searchError { - n.SetRescanNeeded() + n.setLastSearchHasErr() } if doClearSearchedMap { @@ -507,7 +542,6 @@ func (n *NodeSearch) searchExchange(pol *policy.Policy, polOrg string, polName s for { glog.V(3).Infof(AWlogString(fmt.Sprintf("searching %v with %v", pol.Header.Name, ser))) - // Invoke the exchange and return the device list or any hard errors that occur. resp, err := exchange.GetHTTPAgbotPolicyNodeSearchHandler(n.ec)(&ser, polOrg, polName) if err != nil { diff --git a/config/config.go b/config/config.go index 8ebd80967..af8b9f8bb 100644 --- a/config/config.go +++ b/config/config.go @@ -126,6 +126,7 @@ type AGConfig struct { AgreementQueueSize uint64 // The agreement bot work queue max size. MessageQueueScale float64 // Scaling factor applied to the AgreementQueueSize when determining how deep to keep the queues. QueueHistorySize int // The number of statistics records to retain in the prioritized queue history. + ErrRescanS uint64 // The number of seconds between rescan if error occurs from last rescan FullRescanS uint64 // The number of seconds between policy scans when there have been no changes reported by the exchange. MaxExchangeChanges int // The maximum number of exchange changes to request on a given call the exchange /changes API. RetryLookBackWindow uint64 // The time window (in seconds) used by the agbot to look backward in time for node changes when node agreements are retried. @@ -222,6 +223,10 @@ func (c *HorizonConfig) GetAgbotQueueHistorySize() int { return c.AgreementBot.QueueHistorySize } +func (c *HorizonConfig) GetAgbotErrReschanInterval() uint64 { + return c.AgreementBot.ErrRescanS +} + func (c *HorizonConfig) GetAgbotFullRescan() uint64 { return c.AgreementBot.FullRescanS } @@ -397,6 +402,7 @@ func Read(file string) (*HorizonConfig, error) { AgreementQueueSize: AgbotAgreementQueueSize_DEFAULT, MessageQueueScale: AgbotMessageQueueScale_DEFAULT, QueueHistorySize: AgbotQueueHistorySize_DEFAULT, + ErrRescanS: AgbotErrRescan_DEFAULT, FullRescanS: AgbotFullRescan_DEFAULT, MaxExchangeChanges: AgbotMaxChanges_DEFAULT, RetryLookBackWindow: AgbotRetryLookBackWindow_DEFAULT, @@ -585,6 +591,7 @@ func (agc *AGConfig) String() string { ", MessageQueueScale: %v"+ ", QueueHistorySize: %v"+ ", FullRescanS: %v"+ + ", ErrRescanS: %v"+ ", MaxExchangeChanges: %v"+ ", RetryLookBackWindow: %v"+ ", PolicySearchOrder: %v"+ @@ -596,7 +603,7 @@ func (agc *AGConfig) String() string { mask, agc.DVPrefix, agc.ActiveDeviceTimeoutS, agc.ExchangeMessageTTL, agc.MessageKeyPath, mask, agc.APIListen, agc.SecureAPIListenHost, agc.SecureAPIListenPort, agc.SecureAPIServerCert, agc.SecureAPIServerKey, agc.PurgeArchivedAgreementHours, agc.CheckUpdatedPolicyS, agc.CSSURL, agc.CSSSSLCert, agc.CSSDestinationBatchSize, agc.AgreementBatchSize, - agc.AgreementQueueSize, agc.MessageQueueScale, agc.QueueHistorySize, agc.FullRescanS, agc.MaxExchangeChanges, + agc.AgreementQueueSize, agc.MessageQueueScale, agc.QueueHistorySize, agc.FullRescanS, agc.ErrRescanS, agc.MaxExchangeChanges, agc.RetryLookBackWindow, agc.PolicySearchOrder, agc.Vault) } diff --git a/config/constants.go b/config/constants.go index 52a4fd516..eee3c5f2e 100644 --- a/config/constants.go +++ b/config/constants.go @@ -111,6 +111,9 @@ const AgbotMessageQueueScale_DEFAULT = 33.0 // The default number of prioritized queue history records to keep before aging out the old ones. const AgbotQueueHistorySize_DEFAULT = 30 +// The default full rescan interval if error happens during the node search +const AgbotErrRescan_DEFAULT = 15 + // The default full rescan interval const AgbotFullRescan_DEFAULT = 600