Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

[WIP] Out of core plugin #311

Draft
wants to merge 47 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
e44b90e
Add fastapi plugin
pingsutw Jan 21, 2023
9bf6784
Add dummy plugin
pingsutw Jan 24, 2023
2aff416
nit
pingsutw Jan 24, 2023
e0361d2
test
pingsutw Feb 16, 2023
c34859b
test
pingsutw Feb 16, 2023
e25bdea
test
pingsutw Feb 17, 2023
23c4b89
test
pingsutw Feb 17, 2023
14c8d1e
wip
pingsutw Feb 17, 2023
d78e1b8
wip
pingsutw Feb 17, 2023
0a6cc94
wip
pingsutw Feb 17, 2023
fc8a0ac
wip
pingsutw Feb 17, 2023
71163cb
wip
pingsutw Feb 17, 2023
c96b6d1
wip
pingsutw Feb 17, 2023
3b23bd4
wip
pingsutw Feb 17, 2023
cffd3fa
wip
pingsutw Feb 17, 2023
54593b6
wip
pingsutw Feb 17, 2023
212dd17
wip
pingsutw Feb 17, 2023
0499cfd
wip
pingsutw Feb 18, 2023
640233f
wip
pingsutw Feb 18, 2023
a1a2132
wip
pingsutw Feb 18, 2023
eb06a38
wip
pingsutw Feb 18, 2023
a2881f3
grpc plugin
pingsutw Feb 24, 2023
4342e5f
updated idl version
pingsutw Feb 24, 2023
bc11ffc
merged master
pingsutw Feb 24, 2023
5f4b1ff
wip
pingsutw Feb 24, 2023
8e569f3
wip
pingsutw Feb 24, 2023
2f2dd00
wip
pingsutw Feb 24, 2023
994e67b
wip
pingsutw Feb 24, 2023
2d491c8
wip
pingsutw Feb 24, 2023
f6f20ac
wip
pingsutw Feb 24, 2023
3db2afb
wip
pingsutw Feb 25, 2023
16f97aa
add grpc plugin
pingsutw Feb 27, 2023
f7bf1f5
nit
pingsutw Feb 28, 2023
ec42cf9
nit
pingsutw Feb 28, 2023
5c6957c
nit
pingsutw Feb 28, 2023
0efb30a
wip
pingsutw Feb 28, 2023
eb80e5b
wip
pingsutw Feb 28, 2023
a62cae8
wip
pingsutw Feb 28, 2023
0025427
wip
pingsutw Feb 28, 2023
d326926
wip
pingsutw Feb 28, 2023
5184e62
wip
pingsutw Feb 28, 2023
762fd94
wip
pingsutw Feb 28, 2023
8b9429d
wip
pingsutw Feb 28, 2023
d54e691
wip
pingsutw Feb 28, 2023
145422f
wip
pingsutw Feb 28, 2023
c185a89
wip
pingsutw Mar 6, 2023
1c20b26
wip
pingsutw Mar 6, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ require (
github.com/google/gofuzz v1.2.0 // indirect
github.com/googleapis/gax-go/v2 v2.3.0 // indirect
github.com/googleapis/go-type-adapters v1.0.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
Expand Down Expand Up @@ -134,3 +135,5 @@ require (
)

replace github.com/aws/amazon-sagemaker-operator-for-k8s => github.com/aws/amazon-sagemaker-operator-for-k8s v1.0.1-0.20210303003444-0fb33b1fd49d

replace github.com/flyteorg/flyteidl => github.com/flyteorg/flyteidl v1.3.9-0.20230224194627-a1df35060476
5 changes: 3 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,8 @@ github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQL
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w=
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
github.com/flyteorg/flyteidl v1.3.6 h1:PI846AdnrQZ84pxRVAzA3WGihv+xXmjQHO91nj/kV9g=
github.com/flyteorg/flyteidl v1.3.6/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM=
github.com/flyteorg/flyteidl v1.3.9-0.20230224194627-a1df35060476 h1:mA3Ry5YjNu5BqjnCTbA+lFRTRFjGKEMDALRhLTtBuuU=
github.com/flyteorg/flyteidl v1.3.9-0.20230224194627-a1df35060476/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM=
github.com/flyteorg/flytestdlib v1.0.15 h1:kv9jDQmytbE84caY+pkZN8trJU2ouSAmESzpTEhfTt0=
github.com/flyteorg/flytestdlib v1.0.15/go.mod h1:ghw/cjY0sEWIIbyCtcJnL/Gt7ZS7gf9SUi0CCPhbz3s=
github.com/flyteorg/stow v0.3.6 h1:jt50ciM14qhKBaIrB+ppXXY+SXB59FNREFgTJqCyqIk=
Expand Down Expand Up @@ -443,6 +443,7 @@ github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:Fecb
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
Expand Down
6 changes: 5 additions & 1 deletion go/tasks/pluginmachinery/internal/webapi/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func (c CorePlugin) GetProperties() core.PluginProperties {
}

func (c CorePlugin) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (core.Transition, error) {
c.metrics.NumberOfTasks.Inc(ctx)
incomingState, err := c.unmarshalState(ctx, tCtx.PluginStateReader())
if err != nil {
return core.UnknownTransition, err
Expand Down Expand Up @@ -96,7 +97,10 @@ func (c CorePlugin) Handle(ctx context.Context, tCtx core.TaskExecutionContext)
if err := tCtx.PluginStateWriter().Put(pluginStateVersion, nextState); err != nil {
return core.UnknownTransition, err
}

c.metrics.NumberOfTasks.Dec(ctx)
logger.Infof(ctx, "number of requests [%v]", c.metrics.NumberOfTasks)
// logger.Infof(ctx, "request latency [%v]", time.Since(start).Round(time.Microsecond).String())
logger.Infof(ctx, "phaseInfo [%v]", phaseInfo)
return core.DoTransitionType(core.TransitionTypeBarrier, phaseInfo), nil
}

Expand Down
2 changes: 2 additions & 0 deletions go/tasks/pluginmachinery/internal/webapi/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type Metrics struct {
ResourceWaitTime prometheus.Summary
SucceededUnmarshalState labeled.StopWatch
FailedUnmarshalState labeled.Counter
NumberOfTasks labeled.Gauge
}

var (
Expand All @@ -40,5 +41,6 @@ func newMetrics(scope promutils.Scope) Metrics {
time.Millisecond, scope),
FailedUnmarshalState: labeled.NewCounter("unmarshal_state_failed",
"Failed to unmarshal state", scope, labeled.EmitUnlabeledMetric),
NumberOfTasks: labeled.NewGauge("number_of_tasks", "number of running tasks", scope, labeled.EmitUnlabeledMetric),
}
}
1 change: 1 addition & 0 deletions go/tasks/pluginmachinery/webapi/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type TaskExecutionContext interface {

type GetContext interface {
ResourceMeta() ResourceMeta
Resource() Resource
}

type DeleteContext interface {
Expand Down
62 changes: 62 additions & 0 deletions go/tasks/plugins/webapi/dummy/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package dummy

import (
"time"

pluginsConfig "github.com/flyteorg/flyteplugins/go/tasks/config"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/webapi"
"github.com/flyteorg/flytestdlib/config"
)

var (
defaultConfig = Config{
WebAPI: webapi.PluginConfig{
ResourceQuotas: map[core.ResourceNamespace]int{
"default": 1000,
},
ReadRateLimiter: webapi.RateLimiterConfig{
Burst: 100,
QPS: 10,
},
WriteRateLimiter: webapi.RateLimiterConfig{
Burst: 100,
QPS: 10,
},
Caching: webapi.CachingConfig{
Size: 500000,
ResyncInterval: config.Duration{Duration: 30 * time.Second},
Workers: 10,
MaxSystemFailures: 5,
},
ResourceMeta: nil,
},
ResourceConstraints: core.ResourceConstraintsSpec{
ProjectScopeResourceConstraint: &core.ResourceConstraint{
Value: 100,
},
NamespaceScopeResourceConstraint: &core.ResourceConstraint{
Value: 50,
},
},
}

configSection = pluginsConfig.MustRegisterSubSection("dummy", &defaultConfig)
)

// Config is config for 'databricks' plugin
type Config struct {
// WebAPI defines config for the base WebAPI plugin
WebAPI webapi.PluginConfig `json:"webApi" pflag:",Defines config for the base WebAPI plugin."`

// ResourceConstraints defines resource constraints on how many executions to be created per project/overall at any given time
ResourceConstraints core.ResourceConstraintsSpec `json:"resourceConstraints" pflag:"-,Defines resource constraints on how many executions to be created per project/overall at any given time."`
}

func GetConfig() *Config {
return configSection.GetConfig().(*Config)
}

func SetConfig(cfg *Config) error {
return configSection.SetConfig(cfg)
}
132 changes: 132 additions & 0 deletions go/tasks/plugins/webapi/dummy/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package dummy

import (
"context"
"encoding/gob"
flyteIdlCore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/ioutils"
"math/rand"
"net/http"

pluginsCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flytestdlib/promutils"

"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/webapi"
)

// for mocking/testing purposes, and we'll override this method
type HTTPClient interface {
Do(req *http.Request) (*http.Response, error)
}

type Plugin struct {
metricScope promutils.Scope
cfg *Config
client HTTPClient
}

type ResourceWrapper struct {
StatusCode int
JobID string
Message string
}

type ResourceMetaWrapper struct {
RunID string
Token string
}

func (p Plugin) GetConfig() webapi.PluginConfig {
return GetConfig().WebAPI
}

func (p Plugin) ResourceRequirements(_ context.Context, _ webapi.TaskExecutionContextReader) (
namespace core.ResourceNamespace, constraints core.ResourceConstraintsSpec, err error) {

// Resource requirements are assumed to be the same.
return "default", p.cfg.ResourceConstraints, nil
}

func (p Plugin) Create(ctx context.Context, taskCtx webapi.TaskExecutionContextReader) (webapi.ResourceMeta,
webapi.Resource, error) {
_, err := taskCtx.TaskReader().Read(ctx)
if err != nil {
return nil, nil, err
}

return &ResourceMetaWrapper{RunID: "runID", Token: "token"},
&ResourceWrapper{StatusCode: 200}, nil
}

func (p Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest webapi.Resource, err error) {
return &ResourceWrapper{
StatusCode: 200,
JobID: "jobID",
}, nil
}

func (p Plugin) Delete(ctx context.Context, taskCtx webapi.DeleteContext) error {
return nil
}

func (p Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phase core.PhaseInfo, err error) {
x := rand.Intn(100)
if x < 50 {
err := writeOutput(ctx, taskCtx, "s3://bucket/key")
if err != nil {
return core.PhaseInfo{}, err
}
return pluginsCore.PhaseInfoSuccess(&core.TaskInfo{}), nil
}
return core.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, &core.TaskInfo{}), nil
}

func writeOutput(ctx context.Context, tCtx webapi.StatusContext, OutputLocation string) error {
_, err := tCtx.TaskReader().Read(ctx)
if err != nil {
return err
}

return tCtx.OutputWriter().Put(ctx, ioutils.NewInMemoryOutputReader(
&flyteIdlCore.LiteralMap{
Literals: map[string]*flyteIdlCore.Literal{
"results": {
Value: &flyteIdlCore.Literal_Scalar{
Scalar: &flyteIdlCore.Scalar{
Value: &flyteIdlCore.Scalar_StructuredDataset{
StructuredDataset: &flyteIdlCore.StructuredDataset{
Uri: OutputLocation,
Metadata: &flyteIdlCore.StructuredDatasetMetadata{
StructuredDatasetType: &flyteIdlCore.StructuredDatasetType{Format: ""},
},
},
},
},
},
},
},
}, nil, nil))
}

func newDummyTaskPlugin() webapi.PluginEntry {
return webapi.PluginEntry{
ID: "dummy",
SupportedTaskTypes: []core.TaskType{"bigquery_query_job_task", "snowflake", "spark"},
PluginLoader: func(ctx context.Context, iCtx webapi.PluginSetupContext) (webapi.AsyncPlugin, error) {
return &Plugin{
metricScope: iCtx.MetricsScope(),
cfg: GetConfig(),
client: &http.Client{},
}, nil
},
}
}

func init() {
gob.Register(ResourceMetaWrapper{})
gob.Register(ResourceWrapper{})

pluginmachinery.PluginRegistry().RegisterRemotePlugin(newDummyTaskPlugin())
}
70 changes: 70 additions & 0 deletions go/tasks/plugins/webapi/fastapi/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package fastapi

import (
"time"

pluginsConfig "github.com/flyteorg/flyteplugins/go/tasks/config"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/webapi"
"github.com/flyteorg/flytestdlib/config"
)

var (
tokenKey = "FLYTE_FAST_API_TOKEN" // nolint: gosec

defaultConfig = Config{
WebAPI: webapi.PluginConfig{
ResourceQuotas: map[core.ResourceNamespace]int{
"default": 1000,
},
ReadRateLimiter: webapi.RateLimiterConfig{
Burst: 100,
QPS: 10,
},
WriteRateLimiter: webapi.RateLimiterConfig{
Burst: 100,
QPS: 10,
},
Caching: webapi.CachingConfig{
Size: 500000,
ResyncInterval: config.Duration{Duration: 30 * time.Second},
Workers: 10,
MaxSystemFailures: 5,
},
ResourceMeta: nil,
},
ResourceConstraints: core.ResourceConstraintsSpec{
ProjectScopeResourceConstraint: &core.ResourceConstraint{
Value: 100,
},
NamespaceScopeResourceConstraint: &core.ResourceConstraint{
Value: 50,
},
},
TokenKey: tokenKey,
}

configSection = pluginsConfig.MustRegisterSubSection("fastapi", &defaultConfig)
)

// Config is config for 'databricks' plugin
type Config struct {
// WebAPI defines config for the base WebAPI plugin
WebAPI webapi.PluginConfig `json:"webApi" pflag:",Defines config for the base WebAPI plugin."`

// ResourceConstraints defines resource constraints on how many executions to be created per project/overall at any given time
ResourceConstraints core.ResourceConstraintsSpec `json:"resourceConstraints" pflag:"-,Defines resource constraints on how many executions to be created per project/overall at any given time."`

TokenKey string `json:"fastApiTokenKey" pflag:",Name of the key where to find Fast API access token in the secret manager."`

// fastAPIEndpoint overrides fastapi server endpoint, only for testing
fastAPIEndpoint string
}

func GetConfig() *Config {
return configSection.GetConfig().(*Config)
}

func SetConfig(cfg *Config) error {
return configSection.SetConfig(cfg)
}
Loading