diff --git a/cmd/cli/devstack/devstack.go b/cmd/cli/devstack/devstack.go index 2dac65d703..8f74b9c88a 100644 --- a/cmd/cli/devstack/devstack.go +++ b/cmd/cli/devstack/devstack.go @@ -10,6 +10,7 @@ import ( "k8s.io/kubectl/pkg/util/i18n" "github.com/bacalhau-project/bacalhau/cmd/util/flags/configflags" + "github.com/bacalhau-project/bacalhau/pkg/analytics" "github.com/bacalhau-project/bacalhau/pkg/config/types" "github.com/bacalhau-project/bacalhau/pkg/config_legacy" baccrypto "github.com/bacalhau-project/bacalhau/pkg/lib/crypto" @@ -191,7 +192,7 @@ func runDevstack(cmd *cobra.Command, cfg types.Bacalhau, fsr *repo.FsRepo, ODs * return err } - requesterConfig, err := serve.GetRequesterConfig(cfg, true) + requesterConfig, err := serve.GetRequesterConfig(cfg, true, &analytics.NoopRecorder{}) if err != nil { return err } diff --git a/cmd/cli/serve/serve.go b/cmd/cli/serve/serve.go index 9b10c5547a..402e9d3fc3 100644 --- a/cmd/cli/serve/serve.go +++ b/cmd/cli/serve/serve.go @@ -17,6 +17,7 @@ import ( "github.com/bacalhau-project/bacalhau/cmd/util" "github.com/bacalhau-project/bacalhau/cmd/util/flags/configflags" + "github.com/bacalhau-project/bacalhau/pkg/analytics" "github.com/bacalhau-project/bacalhau/pkg/config" "github.com/bacalhau-project/bacalhau/pkg/config/types" "github.com/bacalhau-project/bacalhau/pkg/lib/crypto" @@ -123,12 +124,29 @@ func serve(cmd *cobra.Command, cfg types.Bacalhau, fsRepo *repo.FsRepo) error { return err } + installationID, err := fsRepo.ReadInstallationID() + if err != nil { + return err + } + computeConfig, err := GetComputeConfig(ctx, cfg, isComputeNode) if err != nil { return errors.Wrapf(err, "failed to configure compute node") } - requesterConfig, err := GetRequesterConfig(cfg, isRequesterNode) + var recorder analytics.Recorder + if cfg.DisableAnalytics { + recorder = &analytics.NoopRecorder{} + } else { + recorder, err = analytics.New(ctx, analytics.WithNodeNodeID(nodeName), + analytics.WithInstallationID(installationID), 
analytics.WithNodeType(isRequesterNode, isComputeNode)) + if err != nil { + // if we fail here for some reason, do so silently and use the noop recorder. + recorder = &analytics.NoopRecorder{} + } + } + + requesterConfig, err := GetRequesterConfig(cfg, isRequesterNode, recorder) if err != nil { return errors.Wrapf(err, "failed to configure requester node") } diff --git a/cmd/cli/serve/util.go b/cmd/cli/serve/util.go index 644f5200f7..f0be514a89 100644 --- a/cmd/cli/serve/util.go +++ b/cmd/cli/serve/util.go @@ -7,6 +7,7 @@ import ( pkgerrors "github.com/pkg/errors" + "github.com/bacalhau-project/bacalhau/pkg/analytics" "github.com/bacalhau-project/bacalhau/pkg/bidstrategy/semantic" "github.com/bacalhau-project/bacalhau/pkg/compute/capacity/system" "github.com/bacalhau-project/bacalhau/pkg/compute/store" @@ -86,7 +87,7 @@ func GetComputeConfig( return node.NewComputeConfigWith(executionsPath, params) } -func GetRequesterConfig(cfg types.Bacalhau, createJobStore bool) (node.RequesterConfig, error) { +func GetRequesterConfig(cfg types.Bacalhau, createJobStore bool, recorder analytics.Recorder) (node.RequesterConfig, error) { var err error var jobStore jobstore.Store if createJobStore { @@ -94,7 +95,7 @@ func GetRequesterConfig(cfg types.Bacalhau, createJobStore bool) (node.Requester if err != nil { return node.RequesterConfig{}, err } - jobStore, err = boltjobstore.NewBoltJobStore(jobStoreDBPath) + jobStore, err = boltjobstore.NewBoltJobStore(jobStoreDBPath, boltjobstore.WithRecorder(recorder)) if err != nil { return node.RequesterConfig{}, pkgerrors.Wrapf(err, "failed to create job store") } diff --git a/go.mod b/go.mod index b91c561a46..86c14a5713 100644 --- a/go.mod +++ b/go.mod @@ -61,16 +61,20 @@ require ( github.com/vincent-petithory/dataurl v1.0.0 go.etcd.io/bbolt v1.3.8 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 - go.opentelemetry.io/otel v1.28.0 + go.opentelemetry.io/otel v1.29.0 + 
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.5.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.28.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.28.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.28.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.28.0 - go.opentelemetry.io/otel/metric v1.28.0 - go.opentelemetry.io/otel/sdk v1.28.0 + go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.5.0 + go.opentelemetry.io/otel/log v0.5.0 + go.opentelemetry.io/otel/metric v1.29.0 + go.opentelemetry.io/otel/sdk v1.29.0 + go.opentelemetry.io/otel/sdk/log v0.5.0 go.opentelemetry.io/otel/sdk/metric v1.28.0 - go.opentelemetry.io/otel/trace v1.28.0 + go.opentelemetry.io/otel/trace v1.29.0 go.ptx.dk/multierrgroup v0.0.3 go.uber.org/mock v0.4.0 go.uber.org/zap v1.27.0 @@ -151,8 +155,8 @@ require ( go.uber.org/multierr v1.11.0 // indirect golang.org/x/mod v0.17.0 // indirect gonum.org/v1/gonum v0.14.0 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20240808171019-573a1156607a // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240808171019-573a1156607a // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20240822170219-fc7c04adadcd // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd // indirect k8s.io/cli-runtime v0.29.0 // indirect ) @@ -187,7 +191,7 @@ require ( github.com/golang/protobuf v1.5.4 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/google/gopacket v1.1.19 // indirect - github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect github.com/hashicorp/golang-lru v1.0.2 // indirect @@ -267,7 +271,7 @@ require ( go.uber.org/atomic v1.11.0 go4.org 
v0.0.0-20230225012048-214862532bf5 // indirect golang.org/x/net v0.28.0 // indirect - golang.org/x/oauth2 v0.21.0 // indirect + golang.org/x/oauth2 v0.22.0 // indirect golang.org/x/sync v0.8.0 golang.org/x/sys v0.24.0 // indirect golang.org/x/term v0.23.0 diff --git a/go.sum b/go.sum index 019e0d8f6a..a8fcf6af1b 100644 --- a/go.sum +++ b/go.sum @@ -669,8 +669,8 @@ github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/ github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3/go.mod h1:o//XUCC/F+yRGJoPO/VU0GSB0f8Nhgmxx0VIRUvaC0w= github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 h1:bkypFPDjIYGfCYD5mRBvpqxfYX1YCS1PXdKYWi8FsN0= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0/go.mod h1:P+Lt/0by1T8bfcF3z737NnSbmxQAppXMRziHUxPOC8k= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 h1:asbCHRVmodnJTuQ3qamDwqVOIjwqUPTYmYuemVOx+Ys= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0/go.mod h1:ggCgvZ2r7uOoQjOyu2Y1NhHmEPPzzuhWgcza5M1Ji1I= github.com/hamba/avro/v2 v2.17.2/go.mod h1:Q9YK+qxAhtVrNqOhwlZTATLgLA8qxG2vtvkhK8fJ7Jo= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= @@ -1268,8 +1268,10 @@ go.opentelemetry.io/otel v1.21.0/go.mod h1:QZzNPQPm1zLX4gZK4cMi+71eaorMSGT3A4znn go.opentelemetry.io/otel v1.22.0/go.mod h1:eoV4iAi3Ea8LkAEI9+GFT44O6T/D0GWAVFyZVCC6pMI= go.opentelemetry.io/otel v1.23.0/go.mod h1:YCycw9ZeKhcJFrb34iVSkyT0iczq/zYDtZYFufObyB0= go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo= -go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo= -go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4= +go.opentelemetry.io/otel v1.29.0 
h1:PdomN/Al4q/lN6iBJEN3AwPvUiHPMlt93c8bqTG5Llw= +go.opentelemetry.io/otel v1.29.0/go.mod h1:N/WtXPs1CNCUEx+Agz5uouwCba+i+bJGFicT8SR4NP8= +go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.5.0 h1:iWyFL+atC9S1e6MFDLNUZieyKTmsrvsDzuozUDbFg8E= +go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.5.0/go.mod h1:0Ur7rPCJmkHksYcBywsFXnKBG3pqGl4TGltZ+T3qhSA= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.28.0 h1:U2guen0GhqH8o/G2un8f/aG/y++OuW6MyCo6hT9prXk= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.28.0/go.mod h1:yeGZANgEcpdx/WK0IvvRFC+2oLiMS2u4L/0Rj2M2Qr0= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.28.0 h1:aLmmtjRke7LPDQ3lvpFz+kNEH43faFhzW7v8BFIEydg= @@ -1280,29 +1282,35 @@ go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.28.0 h1:R3X6Z go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.28.0/go.mod h1:QWFXnDavXWwMx2EEcZsf3yxgEKAqsxQ+Syjp+seyInw= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.28.0 h1:j9+03ymgYhPKmeXGk5Zu+cIZOlVzd9Zv7QIiyItjFBU= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.28.0/go.mod h1:Y5+XiUG4Emn1hTfciPzGPJaSI+RpDts6BnCIir0SLqk= +go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.5.0 h1:ThVXnEsdwNcxdBO+r96ci1xbF+PgNjwlk457VNuJODo= +go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.5.0/go.mod h1:rHWcSmC4q2h3gje/yOq6sAOaq8+UHxN/Ru3BbmDXOfY= go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.21.0 h1:VhlEQAPp9R1ktYfrPk5SOryw1e9LDDTZCbIPFrho0ec= go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.21.0/go.mod h1:kB3ufRbfU+CQ4MlUcqtW8Z7YEOBeK2DJ6CmR5rYYF3E= go.opentelemetry.io/otel/exporters/zipkin v1.21.0 h1:D+Gv6lSfrFBWmQYyxKjDd0Zuld9SRXpIrEsKZvE4DO4= go.opentelemetry.io/otel/exporters/zipkin v1.21.0/go.mod h1:83oMKR6DzmHisFOW3I+yIMGZUTjxiWaiBI8M8+TU5zE= +go.opentelemetry.io/otel/log v0.5.0 h1:x1Pr6Y3gnXgl1iFBwtGy1W/mnzENoK0w0ZoaeOI3i30= +go.opentelemetry.io/otel/log 
v0.5.0/go.mod h1:NU/ozXeGuOR5/mjCRXYbTC00NFJ3NYuraV/7O78F0rE= go.opentelemetry.io/otel/metric v1.21.0/go.mod h1:o1p3CA8nNHW8j5yuQLdc1eeqEaPfzug24uvsyIEJRWM= go.opentelemetry.io/otel/metric v1.22.0/go.mod h1:evJGjVpZv0mQ5QBRJoBF64yMuOf4xCWdXjK8pzFvliY= go.opentelemetry.io/otel/metric v1.23.0/go.mod h1:MqUW2X2a6Q8RN96E2/nqNoT+z9BSms20Jb7Bbp+HiTo= go.opentelemetry.io/otel/metric v1.24.0/go.mod h1:VYhLe1rFfxuTXLgj4CBiyz+9WYBA8pNGJgDcSFRKBco= -go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6bOeuA5Q= -go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s= +go.opentelemetry.io/otel/metric v1.29.0 h1:vPf/HFWTNkPu1aYeIsc98l4ktOQaL6LeSoeV2g+8YLc= +go.opentelemetry.io/otel/metric v1.29.0/go.mod h1:auu/QWieFVWx+DmQOUMgj0F8LHWdgalxXqvp7BII/W8= go.opentelemetry.io/otel/sdk v1.21.0/go.mod h1:Nna6Yv7PWTdgJHVRD9hIYywQBRx7pbox6nwBnZIxl/E= go.opentelemetry.io/otel/sdk v1.22.0/go.mod h1:iu7luyVGYovrRpe2fmj3CVKouQNdTOkxtLzPvPz1DOc= go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg= -go.opentelemetry.io/otel/sdk v1.28.0 h1:b9d7hIry8yZsgtbmM0DKyPWMMUMlK9NEKuIG4aBqWyE= -go.opentelemetry.io/otel/sdk v1.28.0/go.mod h1:oYj7ClPUA7Iw3m+r7GeEjz0qckQRJK2B8zjcZEfu7Pg= +go.opentelemetry.io/otel/sdk v1.29.0 h1:vkqKjk7gwhS8VaWb0POZKmIEDimRCMsopNYnriHyryo= +go.opentelemetry.io/otel/sdk v1.29.0/go.mod h1:pM8Dx5WKnvxLCb+8lG1PRNIDxu9g9b9g59Qr7hfAAok= +go.opentelemetry.io/otel/sdk/log v0.5.0 h1:A+9lSjlZGxkQOr7QSBJcuyyYBw79CufQ69saiJLey7o= +go.opentelemetry.io/otel/sdk/log v0.5.0/go.mod h1:zjxIW7sw1IHolZL2KlSAtrUi8JHttoeiQy43Yl3WuVQ= go.opentelemetry.io/otel/sdk/metric v1.28.0 h1:OkuaKgKrgAbYrrY0t92c+cC+2F6hsFNnCQArXCKlg08= go.opentelemetry.io/otel/sdk/metric v1.28.0/go.mod h1:cWPjykihLAPvXKi4iZc1dpER3Jdq2Z0YLse3moQUCpg= go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ= go.opentelemetry.io/otel/trace v1.22.0/go.mod 
h1:RbbHXVqKES9QhzZq/fE5UnOSILqRt40a21sPw2He1xo= go.opentelemetry.io/otel/trace v1.23.0/go.mod h1:GSGTbIClEsuZrGIzoEHqsVfxgn5UkggkflQwDScNUsk= go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU= -go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g= -go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI= +go.opentelemetry.io/otel/trace v1.29.0 h1:J/8ZNK4XgR7a21DZUAsbF8pZ5Jcw1VhACmnYt39JTi4= +go.opentelemetry.io/otel/trace v1.29.0/go.mod h1:eHl3w0sp3paPkYstJOmAimxhiFXPg+MMTlEh3nsQgWQ= go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM= go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0= go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8= @@ -1529,8 +1537,9 @@ golang.org/x/oauth2 v0.17.0/go.mod h1:OzPDGQiuQMguemayvdylqddI7qcD9lnSDb+1FiwQ5H golang.org/x/oauth2 v0.18.0/go.mod h1:Wf7knwG0MPoWIMMBgFlEaSUDaKskp0dCfrlJRJXbBi8= golang.org/x/oauth2 v0.19.0/go.mod h1:vYi7skDa1x015PmRRYZ7+s1cWyPgrPiSYRe4rnsexc8= golang.org/x/oauth2 v0.20.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= -golang.org/x/oauth2 v0.21.0 h1:tsimM75w1tF/uws5rbeHzIWxEqElMehnc+iW793zsZs= golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= +golang.org/x/oauth2 v0.22.0 h1:BzDx2FehcG7jJwgWLELCdmLuxk2i+x9UDpSiss2u0ZA= +golang.org/x/oauth2 v0.22.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -1897,8 +1906,8 @@ google.golang.org/genproto/googleapis/api v0.0.0-20240604185151-ef581f913117/go. 
google.golang.org/genproto/googleapis/api v0.0.0-20240610135401-a8a62080eff3/go.mod h1:kdrSS/OiLkPrNUpzD4aHgCq2rVuC/YRxok32HXZ4vRE= google.golang.org/genproto/googleapis/api v0.0.0-20240617180043-68d350f18fd4/go.mod h1:px9SlOOZBg1wM1zdnr8jEL4CNGUBZ+ZKYtNPApNQc4c= google.golang.org/genproto/googleapis/api v0.0.0-20240624140628-dc46fd24d27d/go.mod h1:Od4k8V1LQSizPRUK4OzZ7TBE/20k+jPczUDAEyvn69Y= -google.golang.org/genproto/googleapis/api v0.0.0-20240808171019-573a1156607a h1:KyUe15n7B1YCu+kMmPtlXxgkLQbp+Dw0tCRZf9Sd+CE= -google.golang.org/genproto/googleapis/api v0.0.0-20240808171019-573a1156607a/go.mod h1:4+X6GvPs+25wZKbQq9qyAXrwIRExv7w0Ea6MgZLZiDM= +google.golang.org/genproto/googleapis/api v0.0.0-20240822170219-fc7c04adadcd h1:BBOTEWLuuEGQy9n1y9MhVJ9Qt0BDu21X8qZs71/uPZo= +google.golang.org/genproto/googleapis/api v0.0.0-20240822170219-fc7c04adadcd/go.mod h1:fO8wJzT2zbQbAjbIoos1285VfEIYKDDY+Dt+WpTkh6g= google.golang.org/genproto/googleapis/bytestream v0.0.0-20230530153820-e85fd2cbaebc/go.mod h1:ylj+BE99M198VPbBh6A8d9n3w8fChvyLK3wwBOjXBFA= google.golang.org/genproto/googleapis/bytestream v0.0.0-20231030173426-d783a09b4405/go.mod h1:GRUCuLdzVqZte8+Dl/D4N25yLzcGqqWaYkeVOwulFqw= google.golang.org/genproto/googleapis/bytestream v0.0.0-20231212172506-995d672761c0/go.mod h1:guYXGPwC6jwxgWKW5Y405fKWOFNwlvUlUnzyp9i0uqo= @@ -1952,8 +1961,8 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20240604185151-ef581f913117/go. 
google.golang.org/genproto/googleapis/rpc v0.0.0-20240610135401-a8a62080eff3/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0= google.golang.org/genproto/googleapis/rpc v0.0.0-20240617180043-68d350f18fd4/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY= google.golang.org/genproto/googleapis/rpc v0.0.0-20240624140628-dc46fd24d27d/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240808171019-573a1156607a h1:EKiZZXueP9/T68B8Nl0GAx9cjbQnCId0yP3qPMgaaHs= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240808171019-573a1156607a/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd h1:6TEm2ZxXoQmFWFlt1vNxvVOa1Q0dXFQD1m/rYjXmS0E= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY= google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg= google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw= diff --git a/pkg/analytics/analytics.go b/pkg/analytics/analytics.go new file mode 100644 index 0000000000..2717ecf0e2 --- /dev/null +++ b/pkg/analytics/analytics.go @@ -0,0 +1,116 @@ +package analytics + +import ( + "context" + "fmt" + "time" + + "github.com/rs/zerolog/log" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc" + "go.opentelemetry.io/otel/exporters/stdout/stdoutlog" + otellog "go.opentelemetry.io/otel/log" + sdklog "go.opentelemetry.io/otel/sdk/log" + "go.opentelemetry.io/otel/sdk/resource" + semconv "go.opentelemetry.io/otel/semconv/v1.26.0" + + "github.com/bacalhau-project/bacalhau/pkg/models" +) + +type Recorder interface { + EmitEvent(ctx context.Context, event EventType, properties ...otellog.KeyValue) + EmitJobEvent(ctx 
context.Context, event EventType, j models.Job) + Stop(ctx context.Context) error +} + +var _ Recorder = (*LogRecorder)(nil) + +type LogRecorder struct { + provider *sdklog.LoggerProvider +} + +func New(ctx context.Context, opts ...Option) (*LogRecorder, error) { + config := &Config{ + otlpEndpoint: "localhost:4317", // Default endpoint + attributes: make([]attribute.KeyValue, 0), + } + // Apply options + for _, opt := range opts { + if err := opt(config); err != nil { + return nil, fmt.Errorf("failed to apply option: %w", err) + } + } + + // Create the file exporter + // TODO before merging we'll need to disable this + stdoutExporter, err := stdoutlog.New() + if err != nil { + return nil, fmt.Errorf("failed to create stdout exporter: %w", err) + } + + exporter, err := otlploggrpc.New(ctx, + otlploggrpc.WithEndpoint(config.otlpEndpoint), otlploggrpc.WithInsecure(), + ) + if err != nil { + return nil, fmt.Errorf("failed to create OTLP exporter: %w", err) + } + + // Create a new resource with auto-detected host information + res, err := resource.New(ctx, + resource.WithOS(), + resource.WithSchemaURL(semconv.SchemaURL), + resource.WithAttributes(config.attributes...), + ) + if err != nil { + return nil, fmt.Errorf("failed to create resource: %w", err) + } + + loggerProvider := sdklog.NewLoggerProvider( + sdklog.WithResource(res), + sdklog.WithProcessor(sdklog.NewBatchProcessor(exporter)), + sdklog.WithProcessor(sdklog.NewBatchProcessor(stdoutExporter)), + ) + + return &LogRecorder{ + provider: loggerProvider, + }, nil +} + +func (a *LogRecorder) Stop(ctx context.Context) error { + defer func() { + if err := a.provider.Shutdown(ctx); err != nil { + log.Warn().Err(err).Msg("failed to shutdown analytics") + } + }() + if err := a.provider.ForceFlush(ctx); err != nil { + return fmt.Errorf("failed to flush analytics: %w", err) + } + return nil +} + +type EventType string + +const ( + JobComplete EventType = "job_complete" +) + +const ( + EventKey = "event" + PropertiesKey = 
"properties" +) + +func (a *LogRecorder) EmitEvent(ctx context.Context, event EventType, properties ...otellog.KeyValue) { + record := otellog.Record{} + record.SetTimestamp(time.Now().UTC()) + record.AddAttributes( + otellog.String(EventKey, string(event)), + otellog.Map(PropertiesKey, properties...), + ) + a.provider.Logger("bacalhau-analytics").Emit(ctx, record) +} + +func (a *LogRecorder) EmitJobEvent(ctx context.Context, event EventType, j models.Job) { + jobAttributes := makeJobAttributes(j) + taskAttributes := makeTaskAttributes(j.Task()) + a.EmitEvent(ctx, event, append(jobAttributes, taskAttributes...)...) +} diff --git a/pkg/analytics/config.go b/pkg/analytics/config.go new file mode 100644 index 0000000000..b55e829e73 --- /dev/null +++ b/pkg/analytics/config.go @@ -0,0 +1,56 @@ +package analytics + +import ( + "go.opentelemetry.io/otel/attribute" +) + +const ( + InstallationIDKey = "installation_id" + InstanceIDKey = "instance_id" + NodeIDKey = "node_id" + NodeTypeKey = "node_type" +) + +type Config struct { + attributes []attribute.KeyValue + otlpEndpoint string +} + +// Option is a functional option for configuring the LogRecorder instance +type Option func(*Config) error + +func WithNodeNodeID(id string) Option { + return func(c *Config) error { + c.attributes = append(c.attributes, attribute.String(NodeIDKey, id)) + return nil + } +} + +func WithNodeType(isRequester, isCompute bool) Option { + return func(c *Config) error { + var typ string + if isRequester && isCompute { + typ = "hybrid" + } else if isRequester { + typ = "orchestrator" + } else if isCompute { + typ = "compute" + } + c.attributes = append(c.attributes, attribute.String(NodeTypeKey, typ)) + return nil + } +} + +func WithInstallationID(id string) Option { + return func(c *Config) error { + c.attributes = append(c.attributes, attribute.String(InstallationIDKey, id)) + return nil + } +} + +func WithInstanceID(id string) Option { + return func(c *Config) error { + c.attributes = 
append(c.attributes, attribute.String(InstanceIDKey, id)) + return nil + } +} diff --git a/pkg/analytics/noop.go b/pkg/analytics/noop.go new file mode 100644 index 0000000000..92579b908d --- /dev/null +++ b/pkg/analytics/noop.go @@ -0,0 +1,25 @@ +package analytics + +import ( + "context" + + otellog "go.opentelemetry.io/otel/log" + + "github.com/bacalhau-project/bacalhau/pkg/models" +) + +var _ Recorder = (*NoopRecorder)(nil) + +type NoopRecorder struct{} + +func (n *NoopRecorder) Stop(ctx context.Context) error { + return nil +} + +func (n *NoopRecorder) EmitEvent(ctx context.Context, event EventType, properties ...otellog.KeyValue) { + return +} + +func (n *NoopRecorder) EmitJobEvent(ctx context.Context, event EventType, j models.Job) { + return +} diff --git a/pkg/analytics/util.go b/pkg/analytics/util.go new file mode 100644 index 0000000000..efba35f1e9 --- /dev/null +++ b/pkg/analytics/util.go @@ -0,0 +1,48 @@ +package analytics + +import ( + otellog "go.opentelemetry.io/otel/log" + + "github.com/bacalhau-project/bacalhau/pkg/models" +) + +func makeJobAttributes(j models.Job) []otellog.KeyValue { + return []otellog.KeyValue{ + otellog.String("id", j.ID), + otellog.String("name", j.Name), + otellog.String("namespace", j.Namespace), + otellog.String("type", j.Type), + otellog.Int("count", j.Count), + // TODO(forrest): consider collecting constraints, meta, and labels + otellog.String("state", j.State.StateType.String()), + otellog.String("state_message", j.State.Message), + otellog.Int64("version", int64(j.Version)), + otellog.Int64("revision", int64(j.Revision)), + otellog.Int64("create_time", j.CreateTime), + otellog.Int64("modified_time", j.ModifyTime), + } +} + +func makeTaskAttributes(t *models.Task) []otellog.KeyValue { + inputTypes := make([]otellog.Value, len(t.InputSources)) + for i, s := range t.InputSources { + inputTypes[i] = otellog.StringValue(s.Source.Type) + } + + return []otellog.KeyValue{ + otellog.String("task_name", t.Name), + 
otellog.String("task_engine", t.Engine.Type), + otellog.String("task_publisher", t.Publisher.Type), + otellog.Slice("task_inputs", inputTypes...), + otellog.Int("task_env_count", len(t.Env)), + otellog.Int("task_meta_count", len(t.Meta)), + otellog.String("task_cpu", t.ResourcesConfig.CPU), + otellog.String("task_memory", t.ResourcesConfig.Memory), + otellog.String("task_disk", t.ResourcesConfig.Disk), + otellog.String("task_gpu", t.ResourcesConfig.GPU), + otellog.String("task_network_type", t.Network.Type.String()), + otellog.Int64("task_timeout_execution", t.Timeouts.ExecutionTimeout), + otellog.Int64("task_timeout_queue", t.Timeouts.QueueTimeout), + otellog.Int64("task_timeout_total", t.Timeouts.TotalTimeout), + } +} diff --git a/pkg/config/types/bacalhau.go b/pkg/config/types/bacalhau.go index a173e64fae..00cde5721e 100644 --- a/pkg/config/types/bacalhau.go +++ b/pkg/config/types/bacalhau.go @@ -23,6 +23,7 @@ type Bacalhau struct { Logging Logging `yaml:"Logging,omitempty"` UpdateConfig UpdateConfig `yaml:"UpdateConfig,omitempty"` FeatureFlags FeatureFlags `yaml:"FeatureFlags,omitempty"` + DisableAnalytics bool `yaml:"DisableAnalytics,omitempty"` } type API struct { diff --git a/pkg/config/types/generated_constants.go b/pkg/config/types/generated_constants.go index 34ac738d8e..ce5dda4a77 100644 --- a/pkg/config/types/generated_constants.go +++ b/pkg/config/types/generated_constants.go @@ -121,3 +121,4 @@ const LoggingModeKey = "logging.mode" const LoggingLogDebugInfoIntervalKey = "logging.logdebuginfointerval" const UpdateConfigIntervalKey = "updateconfig.interval" const FeatureFlagsExecTranslationKey = "featureflags.exectranslation" +const DisableAnalyticsKey = "disableanalytics" diff --git a/pkg/config/types/generated_descriptions.go b/pkg/config/types/generated_descriptions.go index 00ad539fe5..9054d6e700 100644 --- a/pkg/config/types/generated_descriptions.go +++ b/pkg/config/types/generated_descriptions.go @@ -4,8 +4,8 @@ package types // ConfigDescriptions maps
configuration paths to their descriptions var ConfigDescriptions = map[string]string{ - APIHostKey: "Host specifies the hostname or IP address for cluster communication.", - APIPortKey: "Port specifies the port number for cluster communication.", + APIHostKey: "Host specifies the hostname or IP address on which the API server listens or the client connects.", + APIPortKey: "Port specifies the port number on which the API server listens or the client connects.", APITLSCertFileKey: "CertFile specifies the path to the TLS certificate file.", APITLSKeyFileKey: "KeyFile specifies the path to the TLS private key file.", APITLSCAFileKey: "CAFile specifies the path to the Certificate Authority file.", @@ -19,9 +19,9 @@ var ConfigDescriptions = map[string]string{ NameProviderKey: "NameProvider specifies the method used to generate names for the node. One of: hostname, aws, gcp, uuid, puuid.", DataDirKey: "DataDir specifies a location on disk where the bacalhau node will maintain state.", StrictVersionMatchKey: "StrictVersionMatch indicates whether to enforce strict version matching.", - OrchestratorEnabledKey: "Enabled indicates whether the orchestrator node is active and available for job submission.", - OrchestratorHostKey: "Host specifies the hostname or IP address for cluster communication.", - OrchestratorPortKey: "Port specifies the port number for cluster communication.", + OrchestratorEnabledKey: "Enabled indicates whether the Web UI is enabled.", + OrchestratorHostKey: "Host specifies the hostname or IP address on which the API server listens or the client connects.", + OrchestratorPortKey: "Port specifies the port number on which the API server listens or the client connects.", OrchestratorAdvertiseKey: "Advertise specifies the address to advertise to other cluster members.", OrchestratorAuthSecretKey: "AuthSecret key specifies the key used by compute nodes to connect to an orchestrator.", OrchestratorTLSCertFileKey: "CertFile specifies the path to the TLS 
certificate file.", @@ -33,8 +33,8 @@ var ConfigDescriptions = map[string]string{ OrchestratorTLSAutoCertKey: "AutoCert specifies the domain for automatic certificate generation.", OrchestratorTLSAutoCertCachePathKey: "AutoCertCachePath specifies the directory to cache auto-generated certificates.", OrchestratorClusterNameKey: "Name specifies the unique identifier for this orchestrator cluster.", - OrchestratorClusterHostKey: "Host specifies the hostname or IP address for cluster communication.", - OrchestratorClusterPortKey: "Port specifies the port number for cluster communication.", + OrchestratorClusterHostKey: "Host specifies the hostname or IP address on which the API server listens or the client connects.", + OrchestratorClusterPortKey: "Port specifies the port number on which the API server listens or the client connects.", OrchestratorClusterAdvertiseKey: "Advertise specifies the address to advertise to other cluster members.", OrchestratorClusterPeersKey: "Peers is a list of other cluster members to connect to on startup.", OrchestratorNodeManagerDisconnectTimeoutKey: "DisconnectTimeout specifies how long to wait before considering a node disconnected.", @@ -44,7 +44,7 @@ var ConfigDescriptions = map[string]string{ OrchestratorSchedulerHousekeepingTimeoutKey: "HousekeepingTimeout specifies the maximum time allowed for a single housekeeping run.", OrchestratorEvaluationBrokerVisibilityTimeoutKey: "VisibilityTimeout specifies how long an evaluation can be claimed before it's returned to the queue.", OrchestratorEvaluationBrokerMaxRetryCountKey: "ReadTimeout specifies the maximum number of attempts for reading from a storage.", - ComputeEnabledKey: "Enabled indicates whether the orchestrator node is active and available for job submission.", + ComputeEnabledKey: "Enabled indicates whether the Web UI is enabled.", ComputeOrchestratorsKey: "Orchestrators specifies a list of orchestrator endpoints that this compute node connects to.", ComputeTLSCertFileKey: 
"CertFile specifies the path to the TLS certificate file.", ComputeTLSKeyFileKey: "KeyFile specifies the path to the TLS private key file.", @@ -58,62 +58,62 @@ var ConfigDescriptions = map[string]string{ ComputeHeartbeatResourceUpdateIntervalKey: "ResourceUpdateInterval specifies the time between updates of resource information to the orchestrator.", ComputeHeartbeatIntervalKey: "Interval specifies the time between update checks, when set to 0 update checks are not performed.", ComputeLabelsKey: "Labels are key-value pairs used to describe and categorize the compute node.", - ComputeAllocatedCapacityCPUKey: "CPU specifies the amount of CPU a compute node allocates for running jobs. It can be expressed as a percentage (e.g., \"85%\") or a Kubernetes resource string (e.g., \"100m\").", - ComputeAllocatedCapacityMemoryKey: "Memory specifies the amount of Memory a compute node allocates for running jobs. It can be expressed as a percentage (e.g., \"85%\") or a Kubernetes resource string (e.g., \"1Gi\").", - ComputeAllocatedCapacityDiskKey: "Disk specifies the amount of Disk space a compute node allocates for running jobs. It can be expressed as a percentage (e.g., \"85%\") or a Kubernetes resource string (e.g., \"10Gi\").", - ComputeAllocatedCapacityGPUKey: "GPU specifies the amount of GPU a compute node allocates for running jobs. It can be expressed as a percentage (e.g., \"85%\") or a Kubernetes resource string (e.g., \"1\"). Note: When using percentages, the result is always rounded up to the nearest whole GPU.", + ComputeAllocatedCapacityCPUKey: "CPU specifies the default amount of CPU allocated to a task. It uses Kubernetes resource string format (e.g., \"100m\" for 0.1 CPU cores). This value is used when the task hasn't explicitly set its CPU requirement.", + ComputeAllocatedCapacityMemoryKey: "Memory specifies the default amount of memory allocated to a task. It uses Kubernetes resource string format (e.g., \"256Mi\" for 256 mebibytes). 
This value is used when the task hasn't explicitly set its memory requirement.", + ComputeAllocatedCapacityDiskKey: "Disk specifies the default amount of disk space allocated to a task. It uses Kubernetes resource string format (e.g., \"1Gi\" for 1 gibibyte). This value is used when the task hasn't explicitly set its disk space requirement.", + ComputeAllocatedCapacityGPUKey: "GPU specifies the default number of GPUs allocated to a task. It uses Kubernetes resource string format (e.g., \"1\" for 1 GPU). This value is used when the task hasn't explicitly set its GPU requirement.", ComputeAllowListedLocalPathsKey: "AllowListedLocalPaths specifies a list of local file system paths that the compute node is allowed to access.", - WebUIEnabledKey: "Enabled indicates whether the orchestrator node is active and available for job submission.", + WebUIEnabledKey: "Enabled indicates whether the Web UI is enabled.", WebUIListenKey: "Listen specifies the address and port on which the Web UI listens.", - InputSourcesDisabledKey: "Disabled specifies a list of storages that are disabled.", + InputSourcesDisabledKey: "Disabled is a list of input sources that are disabled.", InputSourcesReadTimeoutKey: "ReadTimeout specifies the maximum time allowed for reading from a storage.", InputSourcesMaxRetryCountKey: "ReadTimeout specifies the maximum number of attempts for reading from a storage.", - InputSourcesTypesIPFSEndpointKey: "Endpoint specifies the endpoint URL for the S3 input source.", - InputSourcesTypesS3EndpointKey: "Endpoint specifies the endpoint URL for the S3 input source.", + InputSourcesTypesIPFSEndpointKey: "Endpoint specifies the multi-address to connect to for IPFS. e.g /ip4/127.0.0.1/tcp/5001", + InputSourcesTypesS3EndpointKey: "Endpoint specifies the endpoint URL for the S3 input source. e.g https://s3.us-west-2.amazonaws.com", InputSourcesTypesS3AccessKeyKey: "AccessKey specifies the access key for the S3 input source.", InputSourcesTypesS3SecretKeyKey: "SecretKey specifies the secret key for the S3 input source.", - PublishersDisabledKey: "Disabled specifies a list of storages that are disabled.", - PublishersTypesIPFSEndpointKey: "Endpoint specifies the endpoint URL for the S3 input source.", + PublishersDisabledKey: "Disabled is a list of publishers that are disabled.", + PublishersTypesIPFSEndpointKey: "Endpoint specifies the multi-address to connect to for IPFS. e.g /ip4/127.0.0.1/tcp/5001", PublishersTypesS3PreSignedURLDisabledKey: "PreSignedURLDisabled specifies whether pre-signed URLs are enabled for the S3 provider.", PublishersTypesS3PreSignedURLExpirationKey: "PreSignedURLExpiration specifies the duration before a pre-signed URL expires.", PublishersTypesLocalAddressKey: "Address specifies the endpoint the publisher serves on.", - PublishersTypesLocalPortKey: "Port specifies the port number for cluster communication.", + PublishersTypesLocalPortKey: "Port specifies the port number on which the API server listens or the client connects.", PublishersTypesLocalDirectoryKey: "Directory specifies a path to location on disk where content is served from.", - EnginesDisabledKey: "Disabled specifies a list of storages that are disabled.", + EnginesDisabledKey: "Disabled is a list of engines that are disabled.", EnginesTypesDockerManifestCacheSizeKey: "Size specifies the size of the Docker manifest cache.", EnginesTypesDockerManifestCacheTTLKey: "TTL specifies the time-to-live duration for cache entries.", EnginesTypesDockerManifestCacheRefreshKey: "Refresh specifies the refresh interval for cache entries.", - ResultDownloadersDisabledKey: "Disabled specifies a list of storages that are disabled.", + ResultDownloadersDisabledKey: "Disabled is a list of downloaders that are disabled.", ResultDownloadersTimeoutKey: "Timeout specifies the maximum time 
allowed for a download operation.", - ResultDownloadersTypesIPFSEndpointKey: "Endpoint specifies the endpoint URL for the S3 input source.", + ResultDownloadersTypesIPFSEndpointKey: "Endpoint specifies the multi-address to connect to for IPFS. e.g /ip4/127.0.0.1/tcp/5001", JobDefaultsBatchPriorityKey: "Priority specifies the default priority allocated to a service or daemon job. This value is used when the job hasn't explicitly set its priority requirement.", - JobDefaultsBatchTaskResourcesCPUKey: "CPU specifies the amount of CPU a compute node allocates for running jobs. It can be expressed as a percentage (e.g., \"85%\") or a Kubernetes resource string (e.g., \"100m\").", - JobDefaultsBatchTaskResourcesMemoryKey: "Memory specifies the amount of Memory a compute node allocates for running jobs. It can be expressed as a percentage (e.g., \"85%\") or a Kubernetes resource string (e.g., \"1Gi\").", - JobDefaultsBatchTaskResourcesDiskKey: "Disk specifies the amount of Disk space a compute node allocates for running jobs. It can be expressed as a percentage (e.g., \"85%\") or a Kubernetes resource string (e.g., \"10Gi\").", - JobDefaultsBatchTaskResourcesGPUKey: "GPU specifies the amount of GPU a compute node allocates for running jobs. It can be expressed as a percentage (e.g., \"85%\") or a Kubernetes resource string (e.g., \"1\"). Note: When using percentages, the result is always rounded up to the nearest whole GPU.", + JobDefaultsBatchTaskResourcesCPUKey: "CPU specifies the default amount of CPU allocated to a task. It uses Kubernetes resource string format (e.g., \"100m\" for 0.1 CPU cores). This value is used when the task hasn't explicitly set its CPU requirement.", + JobDefaultsBatchTaskResourcesMemoryKey: "Memory specifies the default amount of memory allocated to a task. It uses Kubernetes resource string format (e.g., \"256Mi\" for 256 mebibytes). 
This value is used when the task hasn't explicitly set its memory requirement.", + JobDefaultsBatchTaskResourcesDiskKey: "Disk specifies the default amount of disk space allocated to a task. It uses Kubernetes resource string format (e.g., \"1Gi\" for 1 gibibyte). This value is used when the task hasn't explicitly set its disk space requirement.", + JobDefaultsBatchTaskResourcesGPUKey: "GPU specifies the default number of GPUs allocated to a task. It uses Kubernetes resource string format (e.g., \"1\" for 1 GPU). This value is used when the task hasn't explicitly set its GPU requirement.", JobDefaultsBatchTaskPublisherConfigTypeKey: "No description available", JobDefaultsBatchTaskPublisherConfigParamsKey: "No description available", JobDefaultsBatchTaskTimeoutsTotalTimeoutKey: "TotalTimeout is the maximum total time allowed for a task", JobDefaultsBatchTaskTimeoutsExecutionTimeoutKey: "ExecutionTimeout is the maximum time allowed for task execution", JobDefaultsOpsPriorityKey: "Priority specifies the default priority allocated to a service or daemon job. This value is used when the job hasn't explicitly set its priority requirement.", - JobDefaultsOpsTaskResourcesCPUKey: "CPU specifies the amount of CPU a compute node allocates for running jobs. It can be expressed as a percentage (e.g., \"85%\") or a Kubernetes resource string (e.g., \"100m\").", - JobDefaultsOpsTaskResourcesMemoryKey: "Memory specifies the amount of Memory a compute node allocates for running jobs. It can be expressed as a percentage (e.g., \"85%\") or a Kubernetes resource string (e.g., \"1Gi\").", - JobDefaultsOpsTaskResourcesDiskKey: "Disk specifies the amount of Disk space a compute node allocates for running jobs. It can be expressed as a percentage (e.g., \"85%\") or a Kubernetes resource string (e.g., \"10Gi\").", - JobDefaultsOpsTaskResourcesGPUKey: "GPU specifies the amount of GPU a compute node allocates for running jobs. 
It can be expressed as a percentage (e.g., \"85%\") or a Kubernetes resource string (e.g., \"1\"). Note: When using percentages, the result is always rounded up to the nearest whole GPU.", + JobDefaultsOpsTaskResourcesCPUKey: "CPU specifies the default amount of CPU allocated to a task. It uses Kubernetes resource string format (e.g., \"100m\" for 0.1 CPU cores). This value is used when the task hasn't explicitly set its CPU requirement.", + JobDefaultsOpsTaskResourcesMemoryKey: "Memory specifies the default amount of memory allocated to a task. It uses Kubernetes resource string format (e.g., \"256Mi\" for 256 mebibytes). This value is used when the task hasn't explicitly set its memory requirement.", + JobDefaultsOpsTaskResourcesDiskKey: "Disk specifies the default amount of disk space allocated to a task. It uses Kubernetes resource string format (e.g., \"1Gi\" for 1 gibibyte). This value is used when the task hasn't explicitly set its disk space requirement.", + JobDefaultsOpsTaskResourcesGPUKey: "GPU specifies the default number of GPUs allocated to a task. It uses Kubernetes resource string format (e.g., \"1\" for 1 GPU). This value is used when the task hasn't explicitly set its GPU requirement.", JobDefaultsOpsTaskPublisherConfigTypeKey: "No description available", JobDefaultsOpsTaskPublisherConfigParamsKey: "No description available", JobDefaultsOpsTaskTimeoutsTotalTimeoutKey: "TotalTimeout is the maximum total time allowed for a task", JobDefaultsOpsTaskTimeoutsExecutionTimeoutKey: "ExecutionTimeout is the maximum time allowed for task execution", JobDefaultsDaemonPriorityKey: "Priority specifies the default priority allocated to a service or daemon job. This value is used when the job hasn't explicitly set its priority requirement.", - JobDefaultsDaemonTaskResourcesCPUKey: "CPU specifies the amount of CPU a compute node allocates for running jobs. 
It can be expressed as a percentage (e.g., \"85%\") or a Kubernetes resource string (e.g., \"100m\").", - JobDefaultsDaemonTaskResourcesMemoryKey: "Memory specifies the amount of Memory a compute node allocates for running jobs. It can be expressed as a percentage (e.g., \"85%\") or a Kubernetes resource string (e.g., \"1Gi\").", - JobDefaultsDaemonTaskResourcesDiskKey: "Disk specifies the amount of Disk space a compute node allocates for running jobs. It can be expressed as a percentage (e.g., \"85%\") or a Kubernetes resource string (e.g., \"10Gi\").", - JobDefaultsDaemonTaskResourcesGPUKey: "GPU specifies the amount of GPU a compute node allocates for running jobs. It can be expressed as a percentage (e.g., \"85%\") or a Kubernetes resource string (e.g., \"1\"). Note: When using percentages, the result is always rounded up to the nearest whole GPU.", + JobDefaultsDaemonTaskResourcesCPUKey: "CPU specifies the default amount of CPU allocated to a task. It uses Kubernetes resource string format (e.g., \"100m\" for 0.1 CPU cores). This value is used when the task hasn't explicitly set its CPU requirement.", + JobDefaultsDaemonTaskResourcesMemoryKey: "Memory specifies the default amount of memory allocated to a task. It uses Kubernetes resource string format (e.g., \"256Mi\" for 256 mebibytes). This value is used when the task hasn't explicitly set its memory requirement.", + JobDefaultsDaemonTaskResourcesDiskKey: "Disk specifies the default amount of disk space allocated to a task. It uses Kubernetes resource string format (e.g., \"1Gi\" for 1 gibibyte). This value is used when the task hasn't explicitly set its disk space requirement.", + JobDefaultsDaemonTaskResourcesGPUKey: "GPU specifies the default number of GPUs allocated to a task. It uses Kubernetes resource string format (e.g., \"1\" for 1 GPU). 
This value is used when the task hasn't explicitly set its GPU requirement.", JobDefaultsServicePriorityKey: "Priority specifies the default priority allocated to a service or daemon job. This value is used when the job hasn't explicitly set its priority requirement.", - JobDefaultsServiceTaskResourcesCPUKey: "CPU specifies the amount of CPU a compute node allocates for running jobs. It can be expressed as a percentage (e.g., \"85%\") or a Kubernetes resource string (e.g., \"100m\").", - JobDefaultsServiceTaskResourcesMemoryKey: "Memory specifies the amount of Memory a compute node allocates for running jobs. It can be expressed as a percentage (e.g., \"85%\") or a Kubernetes resource string (e.g., \"1Gi\").", - JobDefaultsServiceTaskResourcesDiskKey: "Disk specifies the amount of Disk space a compute node allocates for running jobs. It can be expressed as a percentage (e.g., \"85%\") or a Kubernetes resource string (e.g., \"10Gi\").", - JobDefaultsServiceTaskResourcesGPUKey: "GPU specifies the amount of GPU a compute node allocates for running jobs. It can be expressed as a percentage (e.g., \"85%\") or a Kubernetes resource string (e.g., \"1\"). Note: When using percentages, the result is always rounded up to the nearest whole GPU.", + JobDefaultsServiceTaskResourcesCPUKey: "CPU specifies the default amount of CPU allocated to a task. It uses Kubernetes resource string format (e.g., \"100m\" for 0.1 CPU cores). This value is used when the task hasn't explicitly set its CPU requirement.", + JobDefaultsServiceTaskResourcesMemoryKey: "Memory specifies the default amount of memory allocated to a task. It uses Kubernetes resource string format (e.g., \"256Mi\" for 256 mebibytes). This value is used when the task hasn't explicitly set its memory requirement.", + JobDefaultsServiceTaskResourcesDiskKey: "Disk specifies the default amount of disk space allocated to a task. It uses Kubernetes resource string format (e.g., \"1Gi\" for 1 gibibyte). 
This value is used when the task hasn't explicitly set its disk space requirement.", + JobDefaultsServiceTaskResourcesGPUKey: "GPU specifies the default number of GPUs allocated to a task. It uses Kubernetes resource string format (e.g., \"1\" for 1 GPU). This value is used when the task hasn't explicitly set its GPU requirement.", JobAdmissionControlRejectStatelessJobsKey: "RejectStatelessJobs indicates whether to reject stateless jobs, i.e. jobs without inputs.", JobAdmissionControlAcceptNetworkedJobsKey: "AcceptNetworkedJobs indicates whether to accept jobs that require network access.", JobAdmissionControlProbeHTTPKey: "ProbeHTTP specifies the HTTP endpoint for probing job submission.", @@ -123,4 +123,5 @@ var ConfigDescriptions = map[string]string{ LoggingLogDebugInfoIntervalKey: "LogDebugInfoInterval specifies the interval for logging debug information.", UpdateConfigIntervalKey: "Interval specifies the time between update checks, when set to 0 update checks are not performed.", FeatureFlagsExecTranslationKey: "ExecTranslation enables the execution translation feature.", + DisableAnalyticsKey: "No description available", } diff --git a/pkg/jobstore/boltdb/store.go b/pkg/jobstore/boltdb/store.go index 18d6b3f297..5b86c38e84 100644 --- a/pkg/jobstore/boltdb/store.go +++ b/pkg/jobstore/boltdb/store.go @@ -15,6 +15,7 @@ import ( bolt "go.etcd.io/bbolt" "k8s.io/apimachinery/pkg/labels" + "github.com/bacalhau-project/bacalhau/pkg/analytics" "github.com/bacalhau-project/bacalhau/pkg/bacerrors" "github.com/bacalhau-project/bacalhau/pkg/jobstore" "github.com/bacalhau-project/bacalhau/pkg/lib/marshaller" @@ -45,6 +46,7 @@ type BoltJobStore struct { clock clock.Clock marshaller marshaller.Marshaller watchersManager *jobstore.WatchersManager + recorder analytics.Recorder inProgressIndex *Index namespacesIndex *Index @@ -61,6 +63,12 @@ func WithClock(clock clock.Clock) Option { } } +func WithRecorder(r analytics.Recorder) Option { + return func(store *BoltJobStore) { + 
store.recorder = r + } +} + // NewBoltJobStore creates a new job store where data is held in buckets, // and indexed by special [Index] instances, also backed by buckets. // Data is currently structured as followed @@ -93,6 +101,7 @@ func NewBoltJobStore(dbPath string, options ...Option) (*BoltJobStore, error) { clock: clock.New(), marshaller: marshaller.NewJSONMarshaller(), watchersManager: jobstore.NewWatchersManager(), + recorder: &analytics.NoopRecorder{}, } for _, opt := range options { @@ -1000,6 +1009,7 @@ func (b *BoltJobStore) updateJobState(tx *bolt.Tx, request jobstore.UpdateJobSta } if job.IsTerminal() { + b.recorder.EmitJobEvent(context.TODO(), analytics.JobComplete, job) // Remove the job from the in progress index, first checking for legacy items // and then removing the composite. Once we are confident no legacy items // are left in the old index we can stick to just the composite