From d7929ca0dabec733c701016419f6bf56eb044ee9 Mon Sep 17 00:00:00 2001 From: Anindita Ghosh <88458927+AnieeG@users.noreply.github.com> Date: Mon, 9 Sep 2024 07:54:12 -0700 Subject: [PATCH] Enabling Job-Distributor container module (#1123) --- lib/docker/test_env/job_distributor/jd.go | 193 ++++++++++++++++++ .../test_env/job_distributor/jd_test.go | 54 +++++ lib/docker/test_env/postgres.go | 4 +- lib/go.mod | 2 +- wasp/profile.go | 2 +- 5 files changed, 251 insertions(+), 4 deletions(-) create mode 100644 lib/docker/test_env/job_distributor/jd.go create mode 100644 lib/docker/test_env/job_distributor/jd_test.go diff --git a/lib/docker/test_env/job_distributor/jd.go b/lib/docker/test_env/job_distributor/jd.go new file mode 100644 index 000000000..c2638ae52 --- /dev/null +++ b/lib/docker/test_env/job_distributor/jd.go @@ -0,0 +1,193 @@ +package job_distributor + +import ( + "fmt" + "testing" + "time" + + "github.com/google/uuid" + "github.com/pkg/errors" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + tc "github.com/testcontainers/testcontainers-go" + tcwait "github.com/testcontainers/testcontainers-go/wait" + + "github.com/smartcontractkit/chainlink-testing-framework/lib/docker" + "github.com/smartcontractkit/chainlink-testing-framework/lib/docker/test_env" + "github.com/smartcontractkit/chainlink-testing-framework/lib/logging" + "github.com/smartcontractkit/chainlink-testing-framework/lib/utils/testcontext" +) + +const ( + JDContainerName string = "job-distributor" + DEAFULTJDContainerPort string = "42242" + DEFAULTCSAKeyEncryptionKey string = "!PASsword000!" + DEAFULTWSRPCContainerPort string = "8080" +) + +type Option = func(j *Component) + +type Component struct { + test_env.EnvComponent + Grpc string + Wsrpc string + InternalGRPC string + InternalWSRPC string + l zerolog.Logger + t *testing.T + dbConnection string + containerPort string + wsrpcPort string + csaKeyEncryptionKey string +} + +func (j *Component) startOrRestartContainer(withReuse bool) error { + req := j.getContainerRequest() + l := logging.GetTestContainersGoTestLogger(j.t) + c, err := docker.StartContainerWithRetry(j.l, tc.GenericContainerRequest{ + ContainerRequest: *req, + Started: true, + Reuse: withReuse, + Logger: l, + }) + if err != nil { + return err + } + j.Container = c + ctx := testcontext.Get(j.t) + host, err := test_env.GetHost(ctx, c) + if err != nil { + return errors.Wrapf(err, "cannot get host for container %s", j.ContainerName) + } + + p, err := c.MappedPort(ctx, test_env.NatPort(j.containerPort)) + if err != nil { + return errors.Wrapf(err, "cannot get container mapped port for container %s", j.ContainerName) + } + j.Grpc = fmt.Sprintf("%s:%s", host, p.Port()) + + p, err = c.MappedPort(ctx, test_env.NatPort(j.wsrpcPort)) + if err != nil { + return errors.Wrapf(err, "cannot get wsrpc mapped port for container %s", j.ContainerName) + } + j.Wsrpc = fmt.Sprintf("%s:%s", host, p.Port()) + j.InternalGRPC = fmt.Sprintf("%s:%s", j.ContainerName, j.containerPort) + + j.InternalWSRPC = fmt.Sprintf("%s:%s", j.ContainerName, j.wsrpcPort) + j.l.Info(). + Str("containerName", j.ContainerName). + Str("grpcURI", j.Grpc). + Str("wsrpcURI", j.Wsrpc). + Str("internalGRPC", j.InternalGRPC). + Str("internalWSRPC", j.InternalWSRPC). + Msg("Started Job Distributor container") + + return nil +} + +func (j *Component) getContainerRequest() *tc.ContainerRequest { + return &tc.ContainerRequest{ + Name: j.ContainerName, + Image: fmt.Sprintf("%s:%s", j.ContainerImage, j.ContainerVersion), + ExposedPorts: []string{ + test_env.NatPortFormat(j.containerPort), + test_env.NatPortFormat(j.wsrpcPort), + }, + Env: map[string]string{ + "DATABASE_URL": j.dbConnection, + "PORT": j.containerPort, + "NODE_RPC_PORT": j.wsrpcPort, + "CSA_KEY_ENCRYPTION_SECRET": j.csaKeyEncryptionKey, + }, + Networks: j.Networks, + WaitingFor: tcwait.ForAll( + tcwait.ForListeningPort(test_env.NatPort(j.containerPort)), + tcwait.ForListeningPort(test_env.NatPort(j.wsrpcPort)), + ), + LifecycleHooks: []tc.ContainerLifecycleHooks{ + { + PostStarts: j.PostStartsHooks, + PostStops: j.PostStopsHooks, + }, + }, + } +} + +func (j *Component) StartContainer() error { + return j.startOrRestartContainer(false) +} + +func (j *Component) RestartContainer() error { + return j.startOrRestartContainer(true) +} + +func New(networks []string, opts ...Option) *Component { + id, _ := uuid.NewRandom() + j := &Component{ + EnvComponent: test_env.EnvComponent{ + ContainerName: fmt.Sprintf("%s-%s", JDContainerName, id.String()[0:8]), + Networks: networks, + StartupTimeout: 2 * time.Minute, + }, + containerPort: DEAFULTJDContainerPort, + wsrpcPort: DEAFULTWSRPCContainerPort, + csaKeyEncryptionKey: DEFAULTCSAKeyEncryptionKey, + l: log.Logger, + } + j.SetDefaultHooks() + for _, opt := range opts { + opt(j) + } + return j +} + +func WithTestInstance(t *testing.T) Option { + return func(j *Component) { + j.l = logging.GetTestLogger(t) + j.t = t + } +} + +func WithContainerPort(port string) Option { + return func(j *Component) { + j.containerPort = port + } +} + +func WithWSRPCContainerPort(port string) Option { + return func(j *Component) { + j.wsrpcPort = port + } +} + +func WithDBURL(db string) Option { + return func(j *Component) { + if db != "" { + j.dbConnection = db + } + } +} + +func WithContainerName(name string) Option { + return func(j *Component) { + j.ContainerName = name + } +} + +func WithImage(image string) Option { + return func(j *Component) { + j.ContainerImage = image + } +} + +func WithVersion(version string) Option { + return func(j *Component) { + j.ContainerVersion = version + } +} + +func WithCSAKeyEncryptionKey(key string) Option { + return func(j *Component) { + j.csaKeyEncryptionKey = key + } +} diff --git a/lib/docker/test_env/job_distributor/jd_test.go b/lib/docker/test_env/job_distributor/jd_test.go new file mode 100644 index 000000000..8f6b8515e --- /dev/null +++ b/lib/docker/test_env/job_distributor/jd_test.go @@ -0,0 +1,54 @@ +package job_distributor + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + "github.com/smartcontractkit/chainlink-testing-framework/lib/docker" + "github.com/smartcontractkit/chainlink-testing-framework/lib/docker/test_env" + "github.com/smartcontractkit/chainlink-testing-framework/lib/logging" +) + +func TestJDSpinUp(t *testing.T) { + t.Skipf("TODO enable this when jd image is available in ci") + l := logging.GetTestLogger(t) + network, err := docker.CreateNetwork(l) + require.NoError(t, err) + + // create a postgres first + pg, err := test_env.NewPostgresDb( + []string{network.Name}, + test_env.WithPostgresDbName("jd-db"), + test_env.WithPostgresImageVersion("14.1")) + require.NoError(t, err) + err = pg.StartContainer() + require.NoError(t, err) + + jd := New([]string{network.Name}, + //TODO: replace with actual image + WithImage("localhost:5001/jd"), + WithVersion("latest"), + WithDBURL(pg.InternalURL.String()), + ) + + err = jd.StartContainer() + require.NoError(t, err) + // create a jd connection + _, err = newConnection(jd.Grpc) + require.NoError(t, err) +} + +func newConnection(target string) (*grpc.ClientConn, error) { + var opts []grpc.DialOption + opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) + conn, err := grpc.NewClient(target, opts...) + if err != nil { + return nil, fmt.Errorf("Failed to connect to service at %s. Err: %w", target, err) + } + + return conn, nil +} diff --git a/lib/docker/test_env/postgres.go b/lib/docker/test_env/postgres.go index 882ca2c0f..e2f1f6ad9 100644 --- a/lib/docker/test_env/postgres.go +++ b/lib/docker/test_env/postgres.go @@ -158,13 +158,13 @@ func (pg *PostgresDb) startOrRestartContainer(withReuse bool) error { internalUrl, err := url.Parse(fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=disable", pg.User, pg.Password, pg.ContainerName, "5432", pg.DbName)) if err != nil { - return fmt.Errorf("error parsing mercury db internal url: %w", err) + return errors.Wrap(err, "error parsing db internal url") } pg.InternalURL = internalUrl externalUrl, err := url.Parse(fmt.Sprintf("postgres://%s:%s@127.0.0.1:%s/%s?sslmode=disable", pg.User, pg.Password, externalPort.Port(), pg.DbName)) if err != nil { - return fmt.Errorf("error parsing mercury db external url: %w", err) + return errors.Wrap(err, "error parsing db external url") } pg.ExternalURL = externalUrl diff --git a/lib/go.mod b/lib/go.mod index 03efc1bc0..2e78a07d2 100644 --- a/lib/go.mod +++ b/lib/go.mod @@ -305,7 +305,7 @@ require ( gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 // indirect - google.golang.org/grpc v1.65.0 // indirect + google.golang.org/grpc v1.65.0 google.golang.org/protobuf v1.34.2 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/wasp/profile.go b/wasp/profile.go index 5b2fd15e6..114eeae4e 100644 --- a/wasp/profile.go +++ b/wasp/profile.go @@ -84,7 +84,7 @@ func (m *Profile) printDashboardLink() { if err != nil { log.Warn().Msgf("could not get dashboard link: %s", err) } else { - log.Info().Msgf("Dashboard URL: " + url) + log.Info().Msgf("Dashboard URL: %s", url) } }