Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

init the features indexer #1214

Merged
merged 3 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions grid-proxy/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ db-fill: ## Fill the database with a randomly generated data
--reset \
--seed 13

db-update:
@echo "Updating node uptimes"
@psql postgresql://postgres:postgres@$(PQ_HOST):5432/tfgrid-graphql < ./internal/explorer/db/helpers.sql

db-dump: ## Load a dump of the database (Args: `p=<path/to/file.sql`)
@docker cp $(p) postgres:/dump.sql;
@docker exec $(PQ_CONTAINER) bash -c "psql -U postgres -d tfgrid-graphql < ./dump.sql"
Expand Down
14 changes: 14 additions & 0 deletions grid-proxy/cmds/proxy_server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ type flags struct {
ipv6IndexerIntervalMins uint
workloadsIndexerNumWorkers uint
workloadsIndexerIntervalMins uint
featuresIndexerNumWorkers uint
featuresIndexerIntervalMins uint
}

func main() {
Expand Down Expand Up @@ -107,6 +109,8 @@ func main() {
flag.UintVar(&f.ipv6IndexerNumWorkers, "ipv6-indexer-workers", 10, "number of workers checking on node having ipv6")
flag.UintVar(&f.workloadsIndexerIntervalMins, "workloads-indexer-interval", 60, "node workloads check interval in min")
flag.UintVar(&f.workloadsIndexerNumWorkers, "workloads-indexer-workers", 10, "number of workers checking on node workloads number")
flag.UintVar(&f.featuresIndexerIntervalMins, "features-indexer-interval", 60*24, "node features check interval in min")
flag.UintVar(&f.featuresIndexerNumWorkers, "features-indexer-workers", 10, "number of workers checking on node supported features")
flag.Parse()

// shows version and exit
Expand Down Expand Up @@ -155,6 +159,7 @@ func main() {
indexerIntervals["workloads"] = f.workloadsIndexerIntervalMins
indexerIntervals["ipv6"] = f.ipv6IndexerIntervalMins
indexerIntervals["speed"] = f.speedIndexerIntervalMins
indexerIntervals["features"] = f.featuresIndexerIntervalMins
} else {
log.Info().Msg("Indexers did not start")
}
Expand Down Expand Up @@ -224,6 +229,15 @@ func startIndexers(ctx context.Context, f flags, db db.Database, rpcRmbClient *p
f.workloadsIndexerNumWorkers,
)
wlNumIdx.Start(ctx)

featIdx := indexer.NewIndexer[types.NodeFeatures](
indexer.NewFeatureWork(f.featuresIndexerIntervalMins),
"features",
db,
rpcRmbClient,
f.featuresIndexerNumWorkers,
)
featIdx.Start(ctx)
}

func app(s *http.Server, f flags) error {
Expand Down
24 changes: 24 additions & 0 deletions grid-proxy/docs/docs.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions grid-proxy/docs/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,12 @@
"name": "node_certified",
"in": "query"
},
{
"type": "string",
"description": "filter farms with list of supported features on its nods",
"name": "node_features",
"in": "query"
},
{
"type": "string",
"description": "farm country",
Expand Down Expand Up @@ -1190,6 +1196,12 @@
"description": "get nodes with price smaller than this",
"name": "price_max",
"in": "query"
},
{
"type": "string",
"description": "filter nodes with list of supported features",
"name": "features",
"in": "query"
}
],
"responses": {
Expand Down Expand Up @@ -1829,6 +1841,12 @@
"farmingPolicyId": {
"type": "integer"
},
"features": {
"type": "array",
"items": {
"type": "string"
}
},
"gpus": {
"type": "array",
"items": {
Expand Down Expand Up @@ -2027,6 +2045,12 @@
"farmingPolicyId": {
"type": "integer"
},
"features": {
"type": "array",
"items": {
"type": "string"
}
},
"gpus": {
"type": "array",
"items": {
Expand Down
16 changes: 16 additions & 0 deletions grid-proxy/docs/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ definitions:
type: string
farmingPolicyId:
type: integer
features:
items:
type: string
type: array
gpus:
items:
$ref: '#/definitions/types.NodeGPU'
Expand Down Expand Up @@ -287,6 +291,10 @@ definitions:
type: string
farmingPolicyId:
type: integer
features:
items:
type: string
type: array
gpus:
items:
$ref: '#/definitions/types.NodeGPU'
Expand Down Expand Up @@ -748,6 +756,10 @@ paths:
in: query
name: node_certified
type: boolean
- description: filter farms with list of supported features on its nods
in: query
name: node_features
type: string
- description: farm country
in: query
name: country
Expand Down Expand Up @@ -1252,6 +1264,10 @@ paths:
in: query
name: price_max
type: string
- description: filter nodes with list of supported features
in: query
name: features
type: string
produces:
- application/json
responses:
Expand Down
9 changes: 9 additions & 0 deletions grid-proxy/internal/explorer/converters.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,14 @@ func nodeFromDBNode(info db.Node) types.Node {
GPUs: info.Gpus,
PriceUsd: math.Round(info.PriceUsd*1000) / 1000,
FarmFreeIps: info.FarmFreeIps,
Features: info.Features,
}
node.Status = nodestatus.DecideNodeStatus(node.Power, node.UpdatedAt)
node.Dedicated = info.FarmDedicated || info.NodeContractsCount == 0 || info.Renter != 0 || info.ExtraFee > 0
// have an empty list instead of null in the json response
if node.Features == nil {
node.Features = []string{}
}
return node
}

Expand Down Expand Up @@ -163,9 +168,13 @@ func nodeWithNestedCapacityFromDBNode(info db.Node) types.NodeWithNestedCapacity
GPUs: info.Gpus,
PriceUsd: math.Round(info.PriceUsd*1000) / 1000,
FarmFreeIps: info.FarmFreeIps,
Features: info.Features,
}
node.Status = nodestatus.DecideNodeStatus(node.Power, node.UpdatedAt)
node.Dedicated = info.FarmDedicated || info.NodeContractsCount == 0 || info.Renter != 0 || info.ExtraFee > 0
if node.Features == nil {
node.Features = []string{}
}
return node
}

Expand Down
24 changes: 24 additions & 0 deletions grid-proxy/internal/explorer/db/helpers.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
-- THIS IS JUST FOR DEBUGGING PURPOSES


-- by the time, updated_at gets outdated which break some functionalities
-- that depends on node status. this help update the nodes that
-- got updated in the last period to now.
CREATE OR REPLACE FUNCTION update_node_uptimes()
RETURNS void AS $$
DECLARE
last_updated_at INT;
BEGIN
-- Step 1: Get the latest uptime report's updated_at timestamp
SELECT updated_at
INTO last_updated_at
FROM node
ORDER BY updated_at DESC
LIMIT 1;

-- Step 2: Update nodes where updated_at is > last_updated_at - 39 minutes
UPDATE node
SET updated_at = CAST(EXTRACT(epoch FROM NOW()) AS INT)
WHERE updated_at > last_updated_at - 2340;
END;
$$ LANGUAGE plpgsql;
8 changes: 8 additions & 0 deletions grid-proxy/internal/explorer/db/indexer_calls.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,11 @@ func (p *PostgresDatabase) UpsertNodeWorkloads(ctx context.Context, workloads []
}
return p.gormDB.WithContext(ctx).Table("node_workloads").Clauses(conflictClause).Create(&workloads).Error
}

func (p *PostgresDatabase) UpsertNodeFeatures(ctx context.Context, features []types.NodeFeatures) error {
conflictClause := clause.OnConflict{
Columns: []clause.Column{{Name: "node_twin_id"}},
DoUpdates: clause.AssignmentColumns([]string{"features", "updated_at"}),
}
return p.gormDB.WithContext(ctx).Table("node_features").Clauses(conflictClause).Create(&features).Error
}
42 changes: 33 additions & 9 deletions grid-proxy/internal/explorer/db/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package db

import (
"context"
"encoding/json"
"fmt"
"strings"

Expand Down Expand Up @@ -132,6 +133,9 @@ func (d *PostgresDatabase) GetLastUpsertsTimestamp() (types.IndexersState, error
if res := d.gormDB.Table("node_workloads").Select("updated_at").Where("updated_at IS NOT NULL").Order("updated_at DESC").Limit(1).Scan(&report.Workloads.UpdatedAt); res.Error != nil {
return report, errors.Wrap(res.Error, "couldn't get workloads last updated_at")
}
if res := d.gormDB.Table("node_features").Select("updated_at").Where("updated_at IS NOT NULL").Order("updated_at DESC").Limit(1).Scan(&report.Features.UpdatedAt); res.Error != nil {
return report, errors.Wrap(res.Error, "couldn't get features last updated_at")
}
return report, nil
}

Expand All @@ -143,6 +147,7 @@ func (d *PostgresDatabase) Initialize() error {
&types.Speed{},
&types.HasIpv6{},
&types.NodesWorkloads{},
&types.NodeFeatures{},
); err != nil {
return errors.Wrap(err, "failed to migrate indexer tables")
}
Expand Down Expand Up @@ -363,6 +368,7 @@ func (d *PostgresDatabase) nodeTableQuery(ctx context.Context, filter types.Node
"resources_cache.gpus",
"health_report.healthy",
"node_ipv6.has_ipv6",
"node_features.features as features",
"resources_cache.bios",
"resources_cache.baseboard",
"resources_cache.memory",
Expand All @@ -380,11 +386,10 @@ func (d *PostgresDatabase) nodeTableQuery(ctx context.Context, filter types.Node
LEFT JOIN location ON node.location_id = location.id
LEFT JOIN health_report ON node.twin_id = health_report.node_twin_id
LEFT JOIN node_ipv6 ON node.twin_id = node_ipv6.node_twin_id
LEFT JOIN node_features ON node.twin_id = node_features.node_twin_id
`)

if filter.HasGPU != nil || filter.GpuDeviceName != nil ||
filter.GpuVendorName != nil || filter.GpuVendorID != nil ||
filter.GpuDeviceID != nil || filter.GpuAvailable != nil {
if filter.IsGpuFilterRequested() {
q.Joins(
`RIGHT JOIN (?) AS gpu ON gpu.node_twin_id = node.twin_id`, nodeGpuSubquery,
)
Expand All @@ -411,12 +416,9 @@ func (d *PostgresDatabase) farmTableQuery(ctx context.Context, filter types.Farm
"LEFT JOIN public_ips_cache ON public_ips_cache.farm_id = farm.farm_id",
)

if filter.NodeAvailableFor != nil || filter.NodeFreeHRU != nil ||
filter.NodeCertified != nil || filter.NodeFreeMRU != nil ||
filter.NodeFreeSRU != nil || filter.NodeHasGPU != nil ||
filter.NodeRentedBy != nil || len(filter.NodeStatus) != 0 ||
filter.NodeTotalCRU != nil || filter.Country != nil ||
filter.Region != nil || filter.NodeHasIpv6 != nil {
if filter.IsNodeFilterRequested() {
// TODO: would it be a good option to delegate here to the GetNodes?
// how this will affect the performance benchmark?
q.Joins(`RIGHT JOIN (?) AS resources_cache on resources_cache.farm_id = farm.farm_id`, nodeQuery).
Group(`
farm.id,
Expand Down Expand Up @@ -485,6 +487,17 @@ func (d *PostgresDatabase) GetFarms(ctx context.Context, filter types.FarmFilter
Where("COALESCE(has_ipv6, false) = ?", *filter.NodeHasIpv6)
}

if len(filter.NodeFeatures) != 0 {
jsonList, err := json.Marshal(filter.NodeFeatures)
if err != nil {
return nil, 0, errors.Wrap(err, "failed to marshal the features filter to json list")
}

nodeQuery = nodeQuery.
Joins("LEFT JOIN node_features ON node_features.node_twin_id = node.twin_id").
Where(`COALESCE(node_features.features, '[]') @> ?`, jsonList)
}

q := d.farmTableQuery(ctx, filter, nodeQuery)

if filter.NodeAvailableFor != nil {
Expand Down Expand Up @@ -735,6 +748,17 @@ func (d *PostgresDatabase) GetNodes(ctx context.Context, filter types.NodeFilter
if filter.PriceMax != nil {
q = q.Where(`calc_discount(resources_cache.price_usd, ?) <= ?`, limit.Balance, *filter.PriceMax)
}
if len(filter.Features) != 0 {
// The @> operator checks if all the right elements exist on the left,
// it needs a proper json object on the right hand side.
// check https://www.postgresql.org/docs/9.4/functions-json.html for jsonb operators
jsonList, err := json.Marshal(filter.Features)
if err != nil {
return nil, 0, errors.Wrap(err, "failed to marshal the features filter to json list")
}

q = q.Where(`COALESCE(node_features.features, '[]') @> ?`, jsonList)
}

// Sorting
if limit.Randomize {
Expand Down
2 changes: 2 additions & 0 deletions grid-proxy/internal/explorer/db/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Database interface {
UpsertNetworkSpeed(ctx context.Context, speeds []types.Speed) error
UpsertNodeIpv6Report(ctx context.Context, ips []types.HasIpv6) error
UpsertNodeWorkloads(ctx context.Context, workloads []types.NodesWorkloads) error
UpsertNodeFeatures(ctx context.Context, features []types.NodeFeatures) error
}

type ContractBilling types.ContractBilling
Expand Down Expand Up @@ -111,6 +112,7 @@ type Node struct {
DownloadSpeed float64
PriceUsd float64
FarmFreeIps uint
Features []string `gorm:"type:jsonb;serializer:json"`
}

// NodePower struct is the farmerbot report for node status
Expand Down
3 changes: 2 additions & 1 deletion grid-proxy/internal/explorer/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ func createReport(db DBClient, peer rmb.Client, idxIntervals map[string]uint) ty
isIndexerStale(indexers.Health.UpdatedAt, idxIntervals["health"]) ||
isIndexerStale(indexers.Ipv6.UpdatedAt, idxIntervals["ipv6"]) ||
isIndexerStale(indexers.Speed.UpdatedAt, idxIntervals["speed"]) ||
isIndexerStale(indexers.Workloads.UpdatedAt, idxIntervals["workloads"]) {
isIndexerStale(indexers.Workloads.UpdatedAt, idxIntervals["workloads"]) ||
isIndexerStale(indexers.Features.UpdatedAt, idxIntervals["features"]) {
report.TotalStateOk = false
}

Expand Down
Loading
Loading