Skip to content

Commit

Permalink
whitelist containers for which logs should be extracted (#5612)
Browse files Browse the repository at this point in the history
  • Loading branch information
dejanzele authored Jun 27, 2024
1 parent e249f5d commit f994ef3
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 50 deletions.
2 changes: 2 additions & 0 deletions cmd/api-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,7 @@ func main() {
logsStream,
features,
cfg.TestkubeDefaultStorageClassName,
cfg.WhitelistedContainers,
)
if err != nil {
exitOnError("Creating executor client", err)
Expand Down Expand Up @@ -509,6 +510,7 @@ func main() {
logsStream,
features,
cfg.TestkubeDefaultStorageClassName,
cfg.WhitelistedContainers,
)
if err != nil {
exitOnError("Creating container executor", err)
Expand Down
62 changes: 32 additions & 30 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,36 +7,38 @@ import (
)

type Config struct {
APIServerPort string `envconfig:"APISERVER_PORT" default:"8088"`
APIServerConfig string `envconfig:"APISERVER_CONFIG" default:""`
APIServerFullname string `envconfig:"APISERVER_FULLNAME" default:"testkube-api-server"`
APIMongoDSN string `envconfig:"API_MONGO_DSN" default:"mongodb://localhost:27017"`
APIMongoAllowTLS bool `envconfig:"API_MONGO_ALLOW_TLS" default:"false"`
APIMongoSSLCert string `envconfig:"API_MONGO_SSL_CERT" default:""`
APIMongoSSLCAFileKey string `envconfig:"API_MONGO_SSL_CA_FILE_KEY" default:"sslCertificateAuthorityFile"`
APIMongoSSLClientFileKey string `envconfig:"API_MONGO_SSL_CLIENT_FILE_KEY" default:"sslClientCertificateKeyFile"`
APIMongoSSLClientFilePass string `envconfig:"API_MONGO_SSL_CLIENT_FILE_PASS_KEY" default:"sslClientCertificateKeyFilePassword"`
APIMongoAllowDiskUse bool `envconfig:"API_MONGO_ALLOW_DISK_USE" default:"false"`
APIMongoDB string `envconfig:"API_MONGO_DB" default:"testkube"`
APIMongoDBType string `envconfig:"API_MONGO_DB_TYPE" default:"mongo"`
SlackToken string `envconfig:"SLACK_TOKEN" default:""`
SlackConfig string `envconfig:"SLACK_CONFIG" default:""`
SlackTemplate string `envconfig:"SLACK_TEMPLATE" default:""`
StorageEndpoint string `envconfig:"STORAGE_ENDPOINT" default:"localhost:9000"`
StorageBucket string `envconfig:"STORAGE_BUCKET" default:"testkube-logs"`
StorageExpiration int `envconfig:"STORAGE_EXPIRATION"`
StorageAccessKeyID string `envconfig:"STORAGE_ACCESSKEYID" default:""`
StorageSecretAccessKey string `envconfig:"STORAGE_SECRETACCESSKEY" default:""`
StorageRegion string `envconfig:"STORAGE_REGION" default:""`
StorageToken string `envconfig:"STORAGE_TOKEN" default:""`
StorageSSL bool `envconfig:"STORAGE_SSL" default:"false"`
StorageSkipVerify bool `envconfig:"STORAGE_SKIP_VERIFY" default:"false"`
StorageCertFile string `envconfig:"STORAGE_CERT_FILE" default:""`
StorageKeyFile string `envconfig:"STORAGE_KEY_FILE" default:""`
StorageCAFile string `envconfig:"STORAGE_CA_FILE" default:""`
ScrapperEnabled bool `envconfig:"SCRAPPERENABLED" default:"false"`
LogsBucket string `envconfig:"LOGS_BUCKET" default:""`
LogsStorage string `envconfig:"LOGS_STORAGE" default:""`
APIServerPort string `envconfig:"APISERVER_PORT" default:"8088"`
APIServerConfig string `envconfig:"APISERVER_CONFIG" default:""`
APIServerFullname string `envconfig:"APISERVER_FULLNAME" default:"testkube-api-server"`
APIMongoDSN string `envconfig:"API_MONGO_DSN" default:"mongodb://localhost:27017"`
APIMongoAllowTLS bool `envconfig:"API_MONGO_ALLOW_TLS" default:"false"`
APIMongoSSLCert string `envconfig:"API_MONGO_SSL_CERT" default:""`
APIMongoSSLCAFileKey string `envconfig:"API_MONGO_SSL_CA_FILE_KEY" default:"sslCertificateAuthorityFile"`
APIMongoSSLClientFileKey string `envconfig:"API_MONGO_SSL_CLIENT_FILE_KEY" default:"sslClientCertificateKeyFile"`
APIMongoSSLClientFilePass string `envconfig:"API_MONGO_SSL_CLIENT_FILE_PASS_KEY" default:"sslClientCertificateKeyFilePassword"`
APIMongoAllowDiskUse bool `envconfig:"API_MONGO_ALLOW_DISK_USE" default:"false"`
APIMongoDB string `envconfig:"API_MONGO_DB" default:"testkube"`
APIMongoDBType string `envconfig:"API_MONGO_DB_TYPE" default:"mongo"`
SlackToken string `envconfig:"SLACK_TOKEN" default:""`
SlackConfig string `envconfig:"SLACK_CONFIG" default:""`
SlackTemplate string `envconfig:"SLACK_TEMPLATE" default:""`
StorageEndpoint string `envconfig:"STORAGE_ENDPOINT" default:"localhost:9000"`
StorageBucket string `envconfig:"STORAGE_BUCKET" default:"testkube-logs"`
StorageExpiration int `envconfig:"STORAGE_EXPIRATION"`
StorageAccessKeyID string `envconfig:"STORAGE_ACCESSKEYID" default:""`
StorageSecretAccessKey string `envconfig:"STORAGE_SECRETACCESSKEY" default:""`
StorageRegion string `envconfig:"STORAGE_REGION" default:""`
StorageToken string `envconfig:"STORAGE_TOKEN" default:""`
StorageSSL bool `envconfig:"STORAGE_SSL" default:"false"`
StorageSkipVerify bool `envconfig:"STORAGE_SKIP_VERIFY" default:"false"`
StorageCertFile string `envconfig:"STORAGE_CERT_FILE" default:""`
StorageKeyFile string `envconfig:"STORAGE_KEY_FILE" default:""`
StorageCAFile string `envconfig:"STORAGE_CA_FILE" default:""`
ScrapperEnabled bool `envconfig:"SCRAPPERENABLED" default:"false"`
LogsBucket string `envconfig:"LOGS_BUCKET" default:""`
LogsStorage string `envconfig:"LOGS_STORAGE" default:""`
// WhitelistedContainers is a list of containers from which logs should be collected.
WhitelistedContainers []string `envconfig:"WHITELISTED_CONTAINERS" default:"init,logs,scraper"`
NatsEmbedded bool `envconfig:"NATS_EMBEDDED" default:"false"`
NatsEmbeddedStoreDir string `envconfig:"NATS_EMBEDDED_STORE_DIR" default:"/app/nats"`
NatsURI string `envconfig:"NATS_URI" default:"nats://localhost:4222"`
Expand Down
32 changes: 22 additions & 10 deletions pkg/executor/client/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func NewJobExecutor(
logsStream logsclient.Stream,
features featureflags.FeatureFlags,
defaultStorageClassName string,
whitelistedContainers []string,
) (client *JobExecutor, err error) {
if serviceAccountNames == nil {
serviceAccountNames = make(map[string]string)
Expand Down Expand Up @@ -128,6 +129,7 @@ func NewJobExecutor(
logsStream: logsStream,
features: features,
defaultStorageClassName: defaultStorageClassName,
whitelistedContainers: whitelistedContainers,
}, nil
}

Expand Down Expand Up @@ -160,6 +162,8 @@ type JobExecutor struct {
logsStream logsclient.Stream
features featureflags.FeatureFlags
defaultStorageClassName string
// whitelistedContainers is a list of containers from which logs are allowed to be streamed.
whitelistedContainers []string
}

type JobOptions struct {
Expand Down Expand Up @@ -413,9 +417,9 @@ func (c *JobExecutor) updateResultsFromPod(ctx context.Context, pod corev1.Pod,
}
l.Debug("poll immediate end")

c.streamLog(ctx, execution.Id, events.NewLog("analyzing test results and artfacts"))
c.streamLog(ctx, execution.Id, events.NewLog("analyzing test results and artifacts"))

logs, err := executor.GetPodLogs(ctx, c.ClientSet, execution.TestNamespace, pod)
logs, err := executor.GetPodLogs(ctx, c.ClientSet, execution.TestNamespace, pod, execution.Id, c.whitelistedContainers)
if err != nil {
l.Errorw("get pod logs error", "error", err)
c.streamLog(ctx, execution.Id, events.NewErrorLog(err))
Expand Down Expand Up @@ -667,12 +671,12 @@ func (c *JobExecutor) TailJobLogs(ctx context.Context, id, namespace string, log

case corev1.PodRunning:
l.Debug("tailing pod logs: immediately")
return c.TailPodLogs(ctx, pod, logs)
return c.TailPodLogs(ctx, id, pod, logs)

case corev1.PodFailed:
err := errors.Errorf("can't get pod logs, pod failed: %s/%s", pod.Namespace, pod.Name)
l.Errorw(err.Error())
return c.GetLastLogLineError(ctx, pod)
return c.GetLastLogLineError(ctx, pod, id)

default:
l.Debugw("tailing job logs: waiting for pod to be ready")
Expand All @@ -682,15 +686,17 @@ func (c *JobExecutor) TailJobLogs(ctx context.Context, id, namespace string, log
}

l.Debug("tailing pod logs")
return c.TailPodLogs(ctx, pod, logs)
return c.TailPodLogs(ctx, id, pod, logs)
}
}
}

return
}

func (c *JobExecutor) TailPodLogs(ctx context.Context, pod corev1.Pod, logs chan []byte) (err error) {
// TailPodLogs returns logs from all containers in pod in parallel.
// id parameter corresponds to the test execution id, and test pod containers are prefixed by it.
func (c *JobExecutor) TailPodLogs(ctx context.Context, id string, pod corev1.Pod, logs chan []byte) (err error) {
var containers []string
for _, container := range pod.Spec.InitContainers {
containers = append(containers, container.Name)
Expand All @@ -706,6 +712,10 @@ func (c *JobExecutor) TailPodLogs(ctx context.Context, pod corev1.Pod, logs chan
wg.Add(len(containers))

for _, container := range containers {
if !executor.IsWhitelistedContainer(container, id, c.whitelistedContainers) {
wg.Done()
continue
}
go func(container string) {
defer wg.Done()

Expand Down Expand Up @@ -751,15 +761,17 @@ func (c *JobExecutor) TailPodLogs(ctx context.Context, pod corev1.Pod, logs chan
}

// GetPodLogError returns last line as error
func (c *JobExecutor) GetPodLogError(ctx context.Context, pod corev1.Pod) (logsBytes []byte, err error) {
// id parameter corresponds to the test execution id, and test pod containers are prefixed by it.
func (c *JobExecutor) GetPodLogError(ctx context.Context, pod corev1.Pod, id string) (logsBytes []byte, err error) {
// error line should be last one
return executor.GetPodLogs(ctx, c.ClientSet, pod.Namespace, pod, 1)
return executor.GetPodLogs(ctx, c.ClientSet, pod.Namespace, pod, id, c.whitelistedContainers, 1)
}

// GetLastLogLineError return error if last line is failed
func (c *JobExecutor) GetLastLogLineError(ctx context.Context, pod corev1.Pod) error {
// id parameter corresponds to the test execution id, and test pod containers are prefixed by it.
func (c *JobExecutor) GetLastLogLineError(ctx context.Context, pod corev1.Pod, id string) error {
l := c.Log.With("pod", pod.Name, "namespace", pod.Namespace)
errorLog, err := c.GetPodLogError(ctx, pod)
errorLog, err := c.GetPodLogError(ctx, pod, id)
if err != nil {
l.Errorw("getPodLogs error", "error", err, "pod", pod)
return errors.Errorf("getPodLogs error: %v", err)
Expand Down
21 changes: 20 additions & 1 deletion pkg/executor/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ func GetJobPods(ctx context.Context, podsClient tcorev1.PodInterface, jobName st
}

// GetPodLogs returns pod logs bytes
func GetPodLogs(ctx context.Context, c kubernetes.Interface, namespace string, pod corev1.Pod, logLinesCount ...int64) (logs []byte, err error) {
func GetPodLogs(ctx context.Context, c kubernetes.Interface, namespace string, pod corev1.Pod, id string, whitelistedContainers []string, logLinesCount ...int64) (logs []byte, err error) {
var count int64 = defaultLogLinesCount
if len(logLinesCount) > 0 {
count = logLinesCount[0]
Expand All @@ -388,6 +388,9 @@ func GetPodLogs(ctx context.Context, c kubernetes.Interface, namespace string, p
}

for _, container := range containers {
if !IsWhitelistedContainer(container, id, whitelistedContainers) {
continue
}
containerLogs, err := GetContainerLogs(ctx, c, &pod, container, namespace, &count)
if err != nil {
if errors.Is(err, ErrPodInitializing) {
Expand Down Expand Up @@ -618,3 +621,19 @@ func GetPodEventsSummary(ctx context.Context, client kubernetes.Interface, pod *

return message, nil
}

func IsWhitelistedContainer(containerName string, id string, whitelistedContainers []string) bool {
if containerName == id {
return true
}
withID := func(name string) string {
return fmt.Sprintf("%s-%s", id, name)
}
for _, whitelistedContainer := range whitelistedContainers {
if containerName == withID(whitelistedContainer) {
return true
}
}

return false
}
42 changes: 38 additions & 4 deletions pkg/executor/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package executor

import (
"context"
"fmt"
"reflect"
"testing"

Expand Down Expand Up @@ -99,12 +100,12 @@ func TestGetPodLogs(t *testing.T) {
Spec: corev1.PodSpec{
InitContainers: []corev1.Container{
{
Name: "init_container",
Name: "1234-init",
},
},
Containers: []corev1.Container{
{
Name: "first_container",
Name: "1234",
},
{
Name: "second_container",
Expand All @@ -113,13 +114,14 @@ func TestGetPodLogs(t *testing.T) {
},
},
},
wantLogs: []byte("fake logsfake logsfake logs"),
wantLogs: []byte("fake logsfake logs"),
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotLogs, err := GetPodLogs(context.Background(), tt.args.c, tt.args.namespace, tt.args.pod, tt.args.logLinesCount...)
whitelistedContainers := []string{"logs", "init", "scraper"}
gotLogs, err := GetPodLogs(context.Background(), tt.args.c, tt.args.namespace, tt.args.pod, "1234", whitelistedContainers, tt.args.logLinesCount...)
if (err != nil) != tt.wantErr {
t.Errorf("GetPodLogs() error = %v, wantErr %v", err, tt.wantErr)
return
Expand All @@ -130,3 +132,35 @@ func TestGetPodLogs(t *testing.T) {
})
}
}

func TestIsWhitelistedContainer(t *testing.T) {
t.Parallel()

tests := []struct {
containerName string
id string
expected bool
}{
{"mycontainer", "mycontainer", true},
{"mycontainer-init", "mycontainer", true},
{"mycontainer-scraper", "mycontainer", true},
{"mycontainer-logs", "mycontainer", true},
{"anothercontainer", "mycontainer", false},
{"istio-init", "mycontainer", false},
{"istio-proxy", "mycontainer", false},
{"scraper-mycontainer", "mycontainer", false},
{"logs-mycontainer", "mycontainer", false},
{"", "mycontainer", false},
{"mycontainer", "", false},
}

for _, tt := range tests {
t.Run(fmt.Sprintf("containerName: %s, id: %s", tt.containerName, tt.id), func(t *testing.T) {
whitelisted := []string{"logs", "init", "scraper"}
result := IsWhitelistedContainer(tt.containerName, tt.id, whitelisted)
if result != tt.expected {
t.Errorf("expected %v, got %v", tt.expected, result)
}
})
}
}
8 changes: 6 additions & 2 deletions pkg/executor/containerexecutor/containerexecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func NewContainerExecutor(
logsStream logsclient.Stream,
features featureflags.FeatureFlags,
defaultStorageClassName string,
whitelistedContainers []string,
) (client *ContainerExecutor, err error) {
clientSet, err := k8sclient.ConnectToK8s()
if err != nil {
Expand Down Expand Up @@ -119,6 +120,7 @@ func NewContainerExecutor(
logsStream: logsStream,
features: features,
defaultStorageClassName: defaultStorageClassName,
whitelistedContainers: whitelistedContainers,
}, nil
}

Expand Down Expand Up @@ -152,6 +154,8 @@ type ContainerExecutor struct {
logsStream logsclient.Stream
features featureflags.FeatureFlags
defaultStorageClassName string
// whitelistedContainers is a list of containers from which logs are allowed to be streamed.
whitelistedContainers []string
}

type JobOptions struct {
Expand Down Expand Up @@ -455,7 +459,7 @@ func (c *ContainerExecutor) updateResultsFromPod(
execution.ExecutionResult.Error()
}

scraperLogs, err = executor.GetPodLogs(ctx, c.clientSet, execution.TestNamespace, *latestScraperPod)
scraperLogs, err = executor.GetPodLogs(ctx, c.clientSet, execution.TestNamespace, *latestScraperPod, execution.Id, c.whitelistedContainers)
if err != nil {
l.Errorw("get scraper pod logs error", "error", err)
}
Expand All @@ -474,7 +478,7 @@ func (c *ContainerExecutor) updateResultsFromPod(
}
}

executorLogs, err := executor.GetPodLogs(ctx, c.clientSet, execution.TestNamespace, *latestExecutorPod)
executorLogs, err := executor.GetPodLogs(ctx, c.clientSet, execution.TestNamespace, *latestExecutorPod, execution.Id, c.whitelistedContainers)
if err != nil {
l.Errorw("get executor pod logs error", "error", err)
}
Expand Down
10 changes: 7 additions & 3 deletions pkg/executor/containerexecutor/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (c *ContainerExecutor) TailJobLogs(ctx context.Context, id, namespace strin

case corev1.PodRunning:
l.Debug("tailing pod logs: immediately")
return c.TailPodLogs(namespace, pod, logs)
return c.TailPodLogs(id, namespace, pod, logs)

case corev1.PodFailed:
err := fmt.Errorf("can't get pod logs, pod failed: %s/%s", pod.Namespace, pod.Name)
Expand All @@ -48,14 +48,14 @@ func (c *ContainerExecutor) TailJobLogs(ctx context.Context, id, namespace strin
}

l.Debug("tailing pod logs")
return c.TailPodLogs(namespace, pod, logs)
return c.TailPodLogs(id, namespace, pod, logs)
}
}
}
return
}

func (c *ContainerExecutor) TailPodLogs(namespace string, pod corev1.Pod, logs chan []byte) (err error) {
func (c *ContainerExecutor) TailPodLogs(id, namespace string, pod corev1.Pod, logs chan []byte) (err error) {
var containers []string
for _, container := range pod.Spec.InitContainers {
containers = append(containers, container.Name)
Expand All @@ -73,6 +73,10 @@ func (c *ContainerExecutor) TailPodLogs(namespace string, pod corev1.Pod, logs c
ctx := context.Background()

for _, container := range containers {
if !executor.IsWhitelistedContainer(container, id, c.whitelistedContainers) {
wg.Done()
continue
}
go func(container string) {
defer wg.Done()
podLogOptions := corev1.PodLogOptions{
Expand Down

0 comments on commit f994ef3

Please sign in to comment.