Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: GROW-1241-health-command-single-provider-arg #1475

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
6 changes: 5 additions & 1 deletion protocol/monitoring/alerting.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,11 @@ func (al *Alerting) ProvidersAlerts(healthResults *HealthResults) {
latestBlock := healthResults.LatestBlocks[specId]
if latestBlock > data.Block {
gap := latestBlock - data.Block
timeGap := time.Duration(gap*healthResults.Specs[specId].AverageBlockTime) * time.Millisecond
specHealthResult, ok := healthResults.Specs[specId]
if !ok {
utils.LavaFormatFatal("Invalid specid - missing in healthResults", nil, utils.Attribute{Key: "specId", Value: specId})
}
timeGap := time.Duration(gap*specHealthResult.AverageBlockTime) * time.Millisecond
if timeGap > al.allowedTimeGapVsReference {
attrs = append(attrs, AlertAttribute{entity: provider, data: fmt.Sprintf("block gap: %s/%s", utils.StrValue(data.Block), utils.StrValue(latestBlock))})
}
Expand Down
112 changes: 107 additions & 5 deletions protocol/monitoring/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ func RunHealth(ctx context.Context,
consumerEndpoints []*HealthRPCEndpoint,
referenceEndpoints []*HealthRPCEndpoint,
prometheusListenAddr string,
resultsPostGUID string,
singleProviderSpecsInterfacesData map[string][]string,
) (*HealthResults, error) {
specQuerier := spectypes.NewQueryClient(clientCtx)
healthResults := &HealthResults{
Expand All @@ -89,6 +91,8 @@ func RunHealth(ctx context.Context,
UnhealthyProviders: map[LavaEntity]string{},
UnhealthyConsumers: map[LavaEntity]string{},
Specs: map[string]*spectypes.Spec{},
ResultsPostGUID: resultsPostGUID,
ProviderAddresses: providerAddresses,
}
currentBlock := int64(0)
for i := 0; i < BasicQueryRetries; i++ {
Expand Down Expand Up @@ -125,6 +129,24 @@ func RunHealth(ctx context.Context,
getAllProviders = true
}

// we can limit the specs we are checking if those where given as arguments
var lookupSpecsFromArg []string

if singleProviderSpecsInterfacesData != nil {
lookupSpecsFromArg = make([]string, 0, len(singleProviderSpecsInterfacesData))
for k := range singleProviderSpecsInterfacesData {
k = strings.ToUpper(strings.TrimSpace(k))
if len(k) > 2 {
lookupSpecsFromArg = append(lookupSpecsFromArg, k)
}
}
if len(lookupSpecsFromArg) == 0 {
lookupSpecsFromArg = nil
}
} else {
lookupSpecsFromArg = nil
}

errCh := make(chan error, 1)

// get a list of all necessary specs for the test
Expand All @@ -150,6 +172,19 @@ func RunHealth(ctx context.Context,
for _, specInfo := range specsResp.ChainInfoList {
healthResults.setSpec(&spectypes.Spec{Index: specInfo.ChainID})
}
} else if len(singleProviderSpecsInterfacesData) > 0 && len(providerAddresses) > 1 {
for _, providerAddress := range providerAddresses {
for spec, apiInterfaces := range singleProviderSpecsInterfacesData {
healthResults.setSpec(&spectypes.Spec{Index: spec})
for _, apiInterface := range apiInterfaces {
healthResults.SetProviderData(LavaEntity{
Address: providerAddress,
SpecId: spec,
ApiInterface: apiInterface,
}, ReplyData{})
}
}
}
mikecot marked this conversation as resolved.
Show resolved Hide resolved
} else {
var wgproviders sync.WaitGroup
wgproviders.Add(len(providerAddresses))
Expand All @@ -163,11 +198,13 @@ func RunHealth(ctx context.Context,
Delegator: providerAddress,
WithPending: false,
})

cancel()
if err != nil || response == nil {
time.Sleep(QuerySleepTime)
continue
}

delegations := response.GetDelegations()
for _, delegation := range delegations {
if delegation.Provider == providerAddress {
Expand Down Expand Up @@ -267,10 +304,24 @@ func RunHealth(ctx context.Context,
}

for _, providerEntry := range response.StakeEntry {
if len(providerAddresses) > 0 {
found := false
for _, address := range providerAddresses {
if address == providerEntry.Address {
found = true
break
}
}
if !found {
continue
}
}

providerKey := LavaEntity{
Address: providerEntry.Address,
SpecId: specId,
}

apiInterfaces := chainIdToApiInterfaces[specId]
// just to check if this is a provider we need to check we need one of the apiInterfaces
if len(apiInterfaces) == 0 {
Expand Down Expand Up @@ -303,11 +354,44 @@ func RunHealth(ctx context.Context,
}
}
}
// get provider stake entries
for specId := range healthResults.getSpecs() {
go processSpecProviders(specId)
// get provider stake entries for each spec or only for the ones given as arguments
if lookupSpecsFromArg != nil {
for specId := range healthResults.getSpecs() {
for _, arg := range lookupSpecsFromArg {
if arg == strings.ToUpper(specId) {
processSpecProviders(specId)
break
}
}
}
} else {
for specId := range healthResults.getSpecs() {
go processSpecProviders(specId)
}
}

wgspecs.Wait()

// check for mismtaches in the pairings query and the arguments
// This flow can be triggered with the following command:
// lavap test health health_all_providers.yml --node https://public-rpc.lavanet.xyz:443 --single-provider-address lava@1czgrha7ys2698xve2gz4xaccteplrzx8s9fh7e --post-results-guid 6IJN3OroilsAB030rXIeh3PeJbRpp5Wy --run-once-and-exit --post-results-skip-spec --single-provider-specs-interfaces-data '{"ARB1": ["grpc"] }' --log_level debug --post-results-address http://localhost:6510
if len(providerAddresses) > 0 && len(singleProviderSpecsInterfacesData) > 0 {
for _, address := range providerAddresses {
for specId, apiInterfaces := range singleProviderSpecsInterfacesData {
for _, apiInterface := range apiInterfaces {
lookupKey := LavaEntity{
Address: address,
SpecId: specId,
ApiInterface: apiInterface,
}
if _, ok := stakeEntries[lookupKey]; !ok {
healthResults.SetUnhealthyProvider(lookupKey, "no pairings found")
}
}
}
}
}
mikecot marked this conversation as resolved.
Show resolved Hide resolved

if len(errCh) > 0 {
return nil, utils.LavaFormatWarning("[-] processing providers entries", <-errCh)
}
Expand Down Expand Up @@ -543,11 +627,23 @@ func CheckProviders(ctx context.Context, clientCtx client.Context, healthResults
for _, endpoint := range providerEntry.Endpoints {
checkOneProvider := func(endpoint epochstoragetypes.Endpoint, apiInterface string, addon string, providerEntry epochstoragetypes.StakeEntry) (time.Duration, string, int64, error) {
cswp := lavasession.ConsumerSessionsWithProvider{}
relayerClientPt, conn, err := cswp.ConnectRawClientWithTimeout(ctx, endpoint.IPPORT)
if err != nil {

var relayerClientPt *pairingtypes.RelayerClient
var conn *grpc.ClientConn
var err error

for i := 0; i < 3; i++ {
relayerClientPt, conn, err = cswp.ConnectRawClientWithTimeout(ctx, endpoint.IPPORT)
if err == nil {
break
}
utils.LavaFormatDebug("failed connecting to provider endpoint", utils.LogAttr("error", err), utils.Attribute{Key: "apiInterface", Value: apiInterface}, utils.Attribute{Key: "addon", Value: addon}, utils.Attribute{Key: "chainID", Value: providerEntry.Chain}, utils.Attribute{Key: "network address", Value: endpoint.IPPORT})
}

if err != nil {
return 0, "", 0, err
}

defer conn.Close()
relayerClient := *relayerClientPt
guid := uint64(rand.Int63())
Expand Down Expand Up @@ -588,17 +684,23 @@ func CheckProviders(ctx context.Context, clientCtx client.Context, healthResults
}
return relayLatency, versions, probeResp.GetLatestBlock(), nil
}

endpointServices := endpoint.GetSupportedServices()
if len(endpointServices) == 0 {
utils.LavaFormatWarning("endpoint has no supported services", nil, utils.Attribute{Key: "endpoint", Value: endpoint})
}

for _, endpointService := range endpointServices {
providerKey := LavaEntity{
Address: providerEntry.Address,
SpecId: providerEntry.Chain,
ApiInterface: endpointService.ApiInterface,
}

probeLatency, version, latestBlockFromProbe, err := checkOneProvider(endpoint, endpointService.ApiInterface, endpointService.Addon, providerEntry)

utils.LavaFormatDebug("[+] checked provider", utils.LogAttr("endpoint", endpoint), utils.LogAttr("apiInterface", endpointService.ApiInterface), utils.LogAttr("addon", endpointService.Addon), utils.LogAttr("providerEntry", providerEntry))

if err != nil {
errMsg := prettifyProviderError(err)
healthResults.SetUnhealthyProvider(providerKey, errMsg)
Expand Down
117 changes: 83 additions & 34 deletions protocol/monitoring/health_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,34 +24,39 @@ import (
)

const (
allowedBlockTimeDefaultLag = 30 * time.Second
intervalDefaultDuration = 0 * time.Second
defaultCUPercentageThreshold = 0.2
defaultSubscriptionLeftDays = 10
defaultMaxProviderLatency = 200 * time.Millisecond
defaultAlertSuppressionInterval = 6 * time.Hour
DefaultSuppressionCountThreshold = 3
DisableAlertLogging = "disable-alert-logging"
maxProviderLatencyFlagName = "max-provider-latency"
subscriptionLeftTimeFlagName = "subscription-days-left-alert"
providerAddressesFlagName = "provider_addresses"
subscriptionAddressesFlagName = "subscription_addresses"
intervalFlagName = "interval"
consumerEndpointPropertyName = "consumer_endpoints"
referenceEndpointPropertyName = "reference_endpoints"
allowedBlockTimeLagFlagName = "allowed_time_lag"
queryRetriesFlagName = "query-retries"
alertingWebHookFlagName = "alert-webhook-url"
identifierFlagName = "identifier"
percentageCUFlagName = "cu-percent-threshold"
alertSuppressionIntervalFlagName = "alert-suppression-interval"
disableAlertSuppressionFlagName = "disable-alert-suppression"
SuppressionCountThresholdFlagName = "suppression-alert-count-threshold"
resultsPostAddressFlagName = "post-results-address"
AllProvidersFlagName = "all-providers"
AllProvidersMarker = "all"
ConsumerGrpcTLSFlagName = "consumer-grpc-tls"
allowInsecureConsumerDialingFlagName = "allow-insecure-consumer-dialing"
allowedBlockTimeDefaultLag = 30 * time.Second
intervalDefaultDuration = 0 * time.Second
defaultCUPercentageThreshold = 0.2
defaultSubscriptionLeftDays = 10
defaultMaxProviderLatency = 200 * time.Millisecond
defaultAlertSuppressionInterval = 6 * time.Hour
DefaultSuppressionCountThreshold = 3
DisableAlertLogging = "disable-alert-logging"
maxProviderLatencyFlagName = "max-provider-latency"
subscriptionLeftTimeFlagName = "subscription-days-left-alert"
providerAddressesFlagName = "provider_addresses"
subscriptionAddressesFlagName = "subscription_addresses"
intervalFlagName = "interval"
consumerEndpointPropertyName = "consumer_endpoints"
referenceEndpointPropertyName = "reference_endpoints"
allowedBlockTimeLagFlagName = "allowed_time_lag"
queryRetriesFlagName = "query-retries"
alertingWebHookFlagName = "alert-webhook-url"
identifierFlagName = "identifier"
percentageCUFlagName = "cu-percent-threshold"
alertSuppressionIntervalFlagName = "alert-suppression-interval"
disableAlertSuppressionFlagName = "disable-alert-suppression"
SuppressionCountThresholdFlagName = "suppression-alert-count-threshold"
resultsPostAddressFlagName = "post-results-address"
resultsPostGUIDFlagName = "post-results-guid"
resultsPostSkipSepcFlagName = "post-results-skip-spec"
AllProvidersFlagName = "all-providers"
AllProvidersMarker = "all"
ConsumerGrpcTLSFlagName = "consumer-grpc-tls"
allowInsecureConsumerDialingFlagName = "allow-insecure-consumer-dialing"
singleProviderAddressFlagName = "single-provider-address"
singleProviderSpecsInterfacesDataFlagName = "single-provider-specs-interfaces-data"
runOnceAndExitFlagName = "run-once-and-exit"
)

func ParseEndpoints(keyName string, viper_endpoints *viper.Viper) (endpoints []*HealthRPCEndpoint, err error) {
Expand Down Expand Up @@ -142,8 +147,31 @@ reference_endpoints:
prometheusListenAddr := viper.GetString(metrics.MetricsListenFlagName)
providerAddresses := viper.GetStringSlice(providerAddressesFlagName)
allProviders := viper.GetBool(AllProvidersFlagName)
if allProviders {
singleProvider := viper.GetString(singleProviderAddressFlagName)
if singleProvider != "" {
providerAddresses = []string{singleProvider}
utils.LavaFormatInfo("Health probe provider addresses set to a single provider address", utils.Attribute{Key: "provider", Value: singleProvider})
} else if allProviders {
providerAddresses = []string{AllProvidersMarker}
utils.LavaFormatInfo("Health probe provider addresses set to all")
}
singleProviderSpecsInterfacesRawData := viper.GetString(singleProviderSpecsInterfacesDataFlagName)
var singleProviderSpecsInterfacesData map[string][]string
if singleProviderSpecsInterfacesRawData != "" {
if singleProvider == "" {
utils.LavaFormatFatal("single provider address and single provider specs interfaces data must be set together", nil)
}
err := json.Unmarshal([]byte(singleProviderSpecsInterfacesRawData), &singleProviderSpecsInterfacesData)
if err != nil {
utils.LavaFormatFatal("Failed to parse singleProviderSpecsInterfacesDataFlagName as JSON", err)
}
if len(singleProviderSpecsInterfacesData) == 0 {
utils.LavaFormatFatal("singleProviderSpecsInterfacesData is empty", nil)
}
}
runOnceAndExit := viper.GetBool(runOnceAndExitFlagName)
if runOnceAndExit {
utils.LavaFormatInfo("Run once and exit flag set")
}
subscriptionAddresses := viper.GetStringSlice(subscriptionAddressesFlagName)
keyName := consumerEndpointPropertyName
Expand All @@ -167,6 +195,8 @@ reference_endpoints:
SuppressionCounterThreshold: viper.GetUint64(SuppressionCountThresholdFlagName),
}
resultsPostAddress := viper.GetString(resultsPostAddressFlagName)
resultsPostGUID := viper.GetString(resultsPostGUIDFlagName)
resultsPostSkipSepc := viper.GetBool(resultsPostSkipSepcFlagName)

alerting := NewAlerting(alertingOptions)
RunHealthCheck := func(ctx context.Context,
Expand All @@ -178,24 +208,36 @@ reference_endpoints:
prometheusListenAddr string,
) {
utils.LavaFormatInfo("[+] starting health run")
healthResult, err := RunHealth(ctx, clientCtx, subscriptionAddresses, providerAddresses, consumerEndpoints, referenceEndpoints, prometheusListenAddr)
healthResult, err := RunHealth(ctx, clientCtx, subscriptionAddresses, providerAddresses, consumerEndpoints, referenceEndpoints, prometheusListenAddr, resultsPostGUID, singleProviderSpecsInterfacesData)
if err != nil {
utils.LavaFormatError("[-] invalid health run", err)
if runOnceAndExit {
os.Exit(0)
}
healthMetrics.SetFailedRun(identifier)
} else {
if resultsPostAddress != "" {
if resultsPostSkipSepc {
healthResult.Specs = nil
}
jsonData, err := json.Marshal(healthResult)
if err == nil {
if err != nil {
utils.LavaFormatError("[-] failed marshaling results", err)
} else {
resp, err := http.Post(resultsPostAddress, "application/json", bytes.NewBuffer(jsonData))
if err != nil {
utils.LavaFormatError("[-] failed posting health results", err, utils.LogAttr("address", resultsPostAddress))
} else {
defer resp.Body.Close()
}
defer resp.Body.Close()
} else {
utils.LavaFormatError("[-] failed marshaling results", err)
}
}

utils.LavaFormatInfo("[+] completed health run")
if runOnceAndExit {
os.Exit(0)
}

healthMetrics.SetLatestBlockData(identifier, healthResult.FormatForLatestBlock())
alerting.CheckHealthResults(healthResult)
activeAlerts, unhealthy, healthy := alerting.ActiveAlerts()
Expand Down Expand Up @@ -241,12 +283,19 @@ reference_endpoints:
cmdTestHealth.Flags().String(alertingWebHookFlagName, "", "a url to post an alert to")
cmdTestHealth.Flags().String(metrics.MetricsListenFlagName, metrics.DisabledFlagOption, "the address to expose prometheus metrics (such as localhost:7779)")
cmdTestHealth.Flags().String(resultsPostAddressFlagName, "", "the address to send the raw results to")
cmdTestHealth.Flags().String(resultsPostGUIDFlagName, "", "a guid marker to add to the results posted to the results post address")
cmdTestHealth.Flags().Bool(resultsPostSkipSepcFlagName, false, "enable to send the results without the specs to the results post address")
cmdTestHealth.Flags().Duration(intervalFlagName, intervalDefaultDuration, "the interval duration for the health check, (defaults to 0s) if 0 runs once")
cmdTestHealth.Flags().Duration(allowedBlockTimeLagFlagName, allowedBlockTimeDefaultLag, "the amount of time one rpc can be behind the most advanced one")
cmdTestHealth.Flags().Uint64Var(&QueryRetries, queryRetriesFlagName, QueryRetries, "set the amount of max queries to send every health run to consumers and references")
cmdTestHealth.Flags().Bool(AllProvidersFlagName, false, "a flag to overwrite the provider addresses with all the currently staked providers")
cmdTestHealth.Flags().Bool(ConsumerGrpcTLSFlagName, true, "use tls configuration for grpc connections to your consumer")
cmdTestHealth.Flags().Bool(allowInsecureConsumerDialingFlagName, false, "used to test grpc, to allow insecure (self signed cert).")
cmdTestHealth.Flags().String(singleProviderAddressFlagName, "", "single provider address in bach32 to override config settings")
cmdTestHealth.Flags().String(singleProviderSpecsInterfacesDataFlagName, "", "a json of spec:[interfaces...] to make the single provider query faster")

cmdTestHealth.Flags().Bool(runOnceAndExitFlagName, false, "exit after first run.")

viper.BindPFlag(queryRetriesFlagName, cmdTestHealth.Flags().Lookup(queryRetriesFlagName)) // bind the flag
flags.AddQueryFlagsToCmd(cmdTestHealth)
common.AddRollingLogConfig(cmdTestHealth)
Expand Down
2 changes: 2 additions & 0 deletions protocol/monitoring/health_results.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ type HealthResults struct {
UnhealthyProviders map[LavaEntity]string `json:"unhealthyProviders,omitempty"`
UnhealthyConsumers map[LavaEntity]string `json:"unhealthyConsumers,omitempty"`
Specs map[string]*spectypes.Spec `json:"specs,omitempty"`
ResultsPostGUID string `json:"resultsGUID,omitempty"`
ProviderAddresses []string `json:"providerAddresses,omitempty"`
Lock sync.RWMutex `json:"-"`
}

Expand Down
Loading