Skip to content

Commit

Permalink
Merge pull request #1214 from threefoldtech/main_features_indexer
Browse files Browse the repository at this point in the history
init the features indexer
  • Loading branch information
Omarabdul3ziz authored Oct 2, 2024
2 parents 9f4d31b + 59429ea commit ebb7050
Show file tree
Hide file tree
Showing 28 changed files with 397 additions and 23 deletions.
4 changes: 4 additions & 0 deletions grid-proxy/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ db-fill: ## Fill the database with a randomly generated data
--reset \
--seed 13

db-update:
@echo "Updating node uptimes"
@psql postgresql://postgres:postgres@$(PQ_HOST):5432/tfgrid-graphql < ./internal/explorer/db/helpers.sql

db-dump: ## Load a dump of the database (Args: `p=<path/to/file.sql`)
@docker cp $(p) postgres:/dump.sql;
@docker exec $(PQ_CONTAINER) bash -c "psql -U postgres -d tfgrid-graphql < ./dump.sql"
Expand Down
14 changes: 14 additions & 0 deletions grid-proxy/cmds/proxy_server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ type flags struct {
ipv6IndexerIntervalMins uint
workloadsIndexerNumWorkers uint
workloadsIndexerIntervalMins uint
featuresIndexerNumWorkers uint
featuresIndexerIntervalMins uint
}

func main() {
Expand Down Expand Up @@ -107,6 +109,8 @@ func main() {
flag.UintVar(&f.ipv6IndexerNumWorkers, "ipv6-indexer-workers", 10, "number of workers checking on node having ipv6")
flag.UintVar(&f.workloadsIndexerIntervalMins, "workloads-indexer-interval", 60, "node workloads check interval in min")
flag.UintVar(&f.workloadsIndexerNumWorkers, "workloads-indexer-workers", 10, "number of workers checking on node workloads number")
flag.UintVar(&f.featuresIndexerIntervalMins, "features-indexer-interval", 60*24, "node features check interval in min")
flag.UintVar(&f.featuresIndexerNumWorkers, "features-indexer-workers", 10, "number of workers checking on node supported features")
flag.Parse()

// shows version and exit
Expand Down Expand Up @@ -155,6 +159,7 @@ func main() {
indexerIntervals["workloads"] = f.workloadsIndexerIntervalMins
indexerIntervals["ipv6"] = f.ipv6IndexerIntervalMins
indexerIntervals["speed"] = f.speedIndexerIntervalMins
indexerIntervals["features"] = f.featuresIndexerIntervalMins
} else {
log.Info().Msg("Indexers did not start")
}
Expand Down Expand Up @@ -224,6 +229,15 @@ func startIndexers(ctx context.Context, f flags, db db.Database, rpcRmbClient *p
f.workloadsIndexerNumWorkers,
)
wlNumIdx.Start(ctx)

featIdx := indexer.NewIndexer[types.NodeFeatures](
indexer.NewFeatureWork(f.featuresIndexerIntervalMins),
"features",
db,
rpcRmbClient,
f.featuresIndexerNumWorkers,
)
featIdx.Start(ctx)
}

func app(s *http.Server, f flags) error {
Expand Down
24 changes: 24 additions & 0 deletions grid-proxy/docs/docs.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions grid-proxy/docs/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,12 @@
"name": "node_certified",
"in": "query"
},
{
"type": "string",
"description": "filter farms with list of supported features on its nods",
"name": "node_features",
"in": "query"
},
{
"type": "string",
"description": "farm country",
Expand Down Expand Up @@ -1190,6 +1196,12 @@
"description": "get nodes with price smaller than this",
"name": "price_max",
"in": "query"
},
{
"type": "string",
"description": "filter nodes with list of supported features",
"name": "features",
"in": "query"
}
],
"responses": {
Expand Down Expand Up @@ -1829,6 +1841,12 @@
"farmingPolicyId": {
"type": "integer"
},
"features": {
"type": "array",
"items": {
"type": "string"
}
},
"gpus": {
"type": "array",
"items": {
Expand Down Expand Up @@ -2027,6 +2045,12 @@
"farmingPolicyId": {
"type": "integer"
},
"features": {
"type": "array",
"items": {
"type": "string"
}
},
"gpus": {
"type": "array",
"items": {
Expand Down
16 changes: 16 additions & 0 deletions grid-proxy/docs/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ definitions:
type: string
farmingPolicyId:
type: integer
features:
items:
type: string
type: array
gpus:
items:
$ref: '#/definitions/types.NodeGPU'
Expand Down Expand Up @@ -287,6 +291,10 @@ definitions:
type: string
farmingPolicyId:
type: integer
features:
items:
type: string
type: array
gpus:
items:
$ref: '#/definitions/types.NodeGPU'
Expand Down Expand Up @@ -748,6 +756,10 @@ paths:
in: query
name: node_certified
type: boolean
- description: filter farms with list of supported features on its nods
in: query
name: node_features
type: string
- description: farm country
in: query
name: country
Expand Down Expand Up @@ -1252,6 +1264,10 @@ paths:
in: query
name: price_max
type: string
- description: filter nodes with list of supported features
in: query
name: features
type: string
produces:
- application/json
responses:
Expand Down
9 changes: 9 additions & 0 deletions grid-proxy/internal/explorer/converters.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,14 @@ func nodeFromDBNode(info db.Node) types.Node {
GPUs: info.Gpus,
PriceUsd: math.Round(info.PriceUsd*1000) / 1000,
FarmFreeIps: info.FarmFreeIps,
Features: info.Features,
}
node.Status = nodestatus.DecideNodeStatus(node.Power, node.UpdatedAt)
node.Dedicated = info.FarmDedicated || info.NodeContractsCount == 0 || info.Renter != 0 || info.ExtraFee > 0
// have an empty list instead of null in the json response
if node.Features == nil {
node.Features = []string{}
}
return node
}

Expand Down Expand Up @@ -163,9 +168,13 @@ func nodeWithNestedCapacityFromDBNode(info db.Node) types.NodeWithNestedCapacity
GPUs: info.Gpus,
PriceUsd: math.Round(info.PriceUsd*1000) / 1000,
FarmFreeIps: info.FarmFreeIps,
Features: info.Features,
}
node.Status = nodestatus.DecideNodeStatus(node.Power, node.UpdatedAt)
node.Dedicated = info.FarmDedicated || info.NodeContractsCount == 0 || info.Renter != 0 || info.ExtraFee > 0
if node.Features == nil {
node.Features = []string{}
}
return node
}

Expand Down
24 changes: 24 additions & 0 deletions grid-proxy/internal/explorer/db/helpers.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
-- THIS IS JUST FOR DEBUGGING PURPOSES


-- by the time, updated_at gets outdated which break some functionalities
-- that depends on node status. this help update the nodes that
-- got updated in the last period to now.
CREATE OR REPLACE FUNCTION update_node_uptimes()
RETURNS void AS $$
DECLARE
last_updated_at INT;
BEGIN
-- Step 1: Get the latest uptime report's updated_at timestamp
SELECT updated_at
INTO last_updated_at
FROM node
ORDER BY updated_at DESC
LIMIT 1;

-- Step 2: Update nodes where updated_at is > last_updated_at - 39 minutes
UPDATE node
SET updated_at = CAST(EXTRACT(epoch FROM NOW()) AS INT)
WHERE updated_at > last_updated_at - 2340;
END;
$$ LANGUAGE plpgsql;
8 changes: 8 additions & 0 deletions grid-proxy/internal/explorer/db/indexer_calls.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,11 @@ func (p *PostgresDatabase) UpsertNodeWorkloads(ctx context.Context, workloads []
}
return p.gormDB.WithContext(ctx).Table("node_workloads").Clauses(conflictClause).Create(&workloads).Error
}

func (p *PostgresDatabase) UpsertNodeFeatures(ctx context.Context, features []types.NodeFeatures) error {
conflictClause := clause.OnConflict{
Columns: []clause.Column{{Name: "node_twin_id"}},
DoUpdates: clause.AssignmentColumns([]string{"features", "updated_at"}),
}
return p.gormDB.WithContext(ctx).Table("node_features").Clauses(conflictClause).Create(&features).Error
}
42 changes: 33 additions & 9 deletions grid-proxy/internal/explorer/db/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package db

import (
"context"
"encoding/json"
"fmt"
"strings"

Expand Down Expand Up @@ -132,6 +133,9 @@ func (d *PostgresDatabase) GetLastUpsertsTimestamp() (types.IndexersState, error
if res := d.gormDB.Table("node_workloads").Select("updated_at").Where("updated_at IS NOT NULL").Order("updated_at DESC").Limit(1).Scan(&report.Workloads.UpdatedAt); res.Error != nil {
return report, errors.Wrap(res.Error, "couldn't get workloads last updated_at")
}
if res := d.gormDB.Table("node_features").Select("updated_at").Where("updated_at IS NOT NULL").Order("updated_at DESC").Limit(1).Scan(&report.Features.UpdatedAt); res.Error != nil {
return report, errors.Wrap(res.Error, "couldn't get features last updated_at")
}
return report, nil
}

Expand All @@ -143,6 +147,7 @@ func (d *PostgresDatabase) Initialize() error {
&types.Speed{},
&types.HasIpv6{},
&types.NodesWorkloads{},
&types.NodeFeatures{},
); err != nil {
return errors.Wrap(err, "failed to migrate indexer tables")
}
Expand Down Expand Up @@ -363,6 +368,7 @@ func (d *PostgresDatabase) nodeTableQuery(ctx context.Context, filter types.Node
"resources_cache.gpus",
"health_report.healthy",
"node_ipv6.has_ipv6",
"node_features.features as features",
"resources_cache.bios",
"resources_cache.baseboard",
"resources_cache.memory",
Expand All @@ -380,11 +386,10 @@ func (d *PostgresDatabase) nodeTableQuery(ctx context.Context, filter types.Node
LEFT JOIN location ON node.location_id = location.id
LEFT JOIN health_report ON node.twin_id = health_report.node_twin_id
LEFT JOIN node_ipv6 ON node.twin_id = node_ipv6.node_twin_id
LEFT JOIN node_features ON node.twin_id = node_features.node_twin_id
`)

if filter.HasGPU != nil || filter.GpuDeviceName != nil ||
filter.GpuVendorName != nil || filter.GpuVendorID != nil ||
filter.GpuDeviceID != nil || filter.GpuAvailable != nil {
if filter.IsGpuFilterRequested() {
q.Joins(
`RIGHT JOIN (?) AS gpu ON gpu.node_twin_id = node.twin_id`, nodeGpuSubquery,
)
Expand All @@ -411,12 +416,9 @@ func (d *PostgresDatabase) farmTableQuery(ctx context.Context, filter types.Farm
"LEFT JOIN public_ips_cache ON public_ips_cache.farm_id = farm.farm_id",
)

if filter.NodeAvailableFor != nil || filter.NodeFreeHRU != nil ||
filter.NodeCertified != nil || filter.NodeFreeMRU != nil ||
filter.NodeFreeSRU != nil || filter.NodeHasGPU != nil ||
filter.NodeRentedBy != nil || len(filter.NodeStatus) != 0 ||
filter.NodeTotalCRU != nil || filter.Country != nil ||
filter.Region != nil || filter.NodeHasIpv6 != nil {
if filter.IsNodeFilterRequested() {
// TODO: would it be a good option to delegate here to the GetNodes?
// how this will affect the performance benchmark?
q.Joins(`RIGHT JOIN (?) AS resources_cache on resources_cache.farm_id = farm.farm_id`, nodeQuery).
Group(`
farm.id,
Expand Down Expand Up @@ -485,6 +487,17 @@ func (d *PostgresDatabase) GetFarms(ctx context.Context, filter types.FarmFilter
Where("COALESCE(has_ipv6, false) = ?", *filter.NodeHasIpv6)
}

if len(filter.NodeFeatures) != 0 {
jsonList, err := json.Marshal(filter.NodeFeatures)
if err != nil {
return nil, 0, errors.Wrap(err, "failed to marshal the features filter to json list")
}

nodeQuery = nodeQuery.
Joins("LEFT JOIN node_features ON node_features.node_twin_id = node.twin_id").
Where(`COALESCE(node_features.features, '[]') @> ?`, jsonList)
}

q := d.farmTableQuery(ctx, filter, nodeQuery)

if filter.NodeAvailableFor != nil {
Expand Down Expand Up @@ -735,6 +748,17 @@ func (d *PostgresDatabase) GetNodes(ctx context.Context, filter types.NodeFilter
if filter.PriceMax != nil {
q = q.Where(`calc_discount(resources_cache.price_usd, ?) <= ?`, limit.Balance, *filter.PriceMax)
}
if len(filter.Features) != 0 {
// The @> operator checks if all the right elements exist on the left,
// it needs a proper json object on the right hand side.
// check https://www.postgresql.org/docs/9.4/functions-json.html for jsonb operators
jsonList, err := json.Marshal(filter.Features)
if err != nil {
return nil, 0, errors.Wrap(err, "failed to marshal the features filter to json list")
}

q = q.Where(`COALESCE(node_features.features, '[]') @> ?`, jsonList)
}

// Sorting
if limit.Randomize {
Expand Down
2 changes: 2 additions & 0 deletions grid-proxy/internal/explorer/db/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Database interface {
UpsertNetworkSpeed(ctx context.Context, speeds []types.Speed) error
UpsertNodeIpv6Report(ctx context.Context, ips []types.HasIpv6) error
UpsertNodeWorkloads(ctx context.Context, workloads []types.NodesWorkloads) error
UpsertNodeFeatures(ctx context.Context, features []types.NodeFeatures) error
}

type ContractBilling types.ContractBilling
Expand Down Expand Up @@ -111,6 +112,7 @@ type Node struct {
DownloadSpeed float64
PriceUsd float64
FarmFreeIps uint
Features []string `gorm:"type:jsonb;serializer:json"`
}

// NodePower struct is the farmerbot report for node status
Expand Down
3 changes: 2 additions & 1 deletion grid-proxy/internal/explorer/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ func createReport(db DBClient, peer rmb.Client, idxIntervals map[string]uint) ty
isIndexerStale(indexers.Health.UpdatedAt, idxIntervals["health"]) ||
isIndexerStale(indexers.Ipv6.UpdatedAt, idxIntervals["ipv6"]) ||
isIndexerStale(indexers.Speed.UpdatedAt, idxIntervals["speed"]) ||
isIndexerStale(indexers.Workloads.UpdatedAt, idxIntervals["workloads"]) {
isIndexerStale(indexers.Workloads.UpdatedAt, idxIntervals["workloads"]) ||
isIndexerStale(indexers.Features.UpdatedAt, idxIntervals["features"]) {
report.TotalStateOk = false
}

Expand Down
Loading

0 comments on commit ebb7050

Please sign in to comment.