Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Add timestamps to task log plugin structure (#183)
Browse files Browse the repository at this point in the history
  • Loading branch information
wild-endeavor authored Jun 16, 2021
1 parent 6996bb8 commit b0684d9
Show file tree
Hide file tree
Showing 7 changed files with 353 additions and 61 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/aws/aws-sdk-go-v2/config v1.0.0
github.com/aws/aws-sdk-go-v2/service/athena v1.0.0
github.com/coocood/freecache v1.1.1
github.com/flyteorg/flyteidl v0.18.48
github.com/flyteorg/flyteidl v0.19.2
github.com/flyteorg/flytestdlib v0.3.13
github.com/go-logr/zapr v0.4.0 // indirect
github.com/go-test/deep v1.0.7
Expand Down
227 changes: 227 additions & 0 deletions go.sum

Large diffs are not rendered by default.

13 changes: 8 additions & 5 deletions go/tasks/logs/logging_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package logs
import (
"context"
"fmt"
"time"

"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/tasklog"

Expand Down Expand Up @@ -44,11 +45,13 @@ func GetLogsForContainerInPod(ctx context.Context, pod *v1.Pod, index uint32, na

logs, err := logPlugin.GetTaskLogs(
tasklog.Input{
PodName: pod.Name,
Namespace: pod.Namespace,
ContainerName: pod.Spec.Containers[index].Name,
ContainerID: pod.Status.ContainerStatuses[index].ContainerID,
LogName: nameSuffix,
PodName: pod.Name,
Namespace: pod.Namespace,
ContainerName: pod.Spec.Containers[index].Name,
ContainerID: pod.Status.ContainerStatuses[index].ContainerID,
LogName: nameSuffix,
PodUnixStartTime: pod.CreationTimestamp.Unix(),
PodUnixFinishTime: time.Now().Unix(),
},
)

Expand Down
14 changes: 8 additions & 6 deletions go/tasks/pluginmachinery/tasklog/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
// Input contains all available information about task's execution that a log plugin can use to construct task's
// log links.
type Input struct {
HostName string `json:"hostname"`
PodName string `json:"podName"`
Namespace string `json:"namespace"`
ContainerName string `json:"containerName"`
ContainerID string `json:"containerId"`
LogName string `json:"logName"`
HostName string `json:"hostname"`
PodName string `json:"podName"`
Namespace string `json:"namespace"`
ContainerName string `json:"containerName"`
ContainerID string `json:"containerId"`
LogName string `json:"logName"`
PodUnixStartTime int64 `json:"podUnixStartTime"`
PodUnixFinishTime int64 `json:"podUnixFinishTime"`
}

// Output contains all task logs a plugin generates for a given Input.
Expand Down
53 changes: 35 additions & 18 deletions go/tasks/pluginmachinery/tasklog/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tasklog
import (
"fmt"
"regexp"
"strconv"
"strings"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
Expand All @@ -15,6 +16,8 @@ import (
// {{ .containerId }}: The container id docker/crio generated at run time,
// {{ .logName }}: A deployment specific name where to expect the logs to be.
// {{ .hostname }}: The hostname where the pod is running and where logs reside.
// {{ .podUnixStartTime }}: The pod creation time (in unix seconds, not millis)
// {{ .podUnixFinishTime }}: Don't have a good mechanism for this yet, but approximating with time.Now for now
type TemplateLogPlugin struct {
templateUris []string
messageFormat core.TaskLog_MessageFormat
Expand All @@ -26,22 +29,26 @@ type regexValPair struct {
}

type templateRegexes struct {
PodName *regexp.Regexp
Namespace *regexp.Regexp
ContainerName *regexp.Regexp
ContainerID *regexp.Regexp
LogName *regexp.Regexp
Hostname *regexp.Regexp
PodName *regexp.Regexp
Namespace *regexp.Regexp
ContainerName *regexp.Regexp
ContainerID *regexp.Regexp
LogName *regexp.Regexp
Hostname *regexp.Regexp
PodUnixStartTime *regexp.Regexp
PodUnixFinishTime *regexp.Regexp
}

func mustInitTemplateRegexes() templateRegexes {
return templateRegexes{
PodName: mustCreateRegex("podName"),
Namespace: mustCreateRegex("namespace"),
ContainerName: mustCreateRegex("containerName"),
ContainerID: mustCreateRegex("containerID"),
LogName: mustCreateRegex("logName"),
Hostname: mustCreateRegex("hostname"),
PodName: mustCreateRegex("podName"),
Namespace: mustCreateRegex("namespace"),
ContainerName: mustCreateRegex("containerName"),
ContainerID: mustCreateRegex("containerID"),
LogName: mustCreateRegex("logName"),
Hostname: mustCreateRegex("hostname"),
PodUnixStartTime: mustCreateRegex("podUnixStartTime"),
PodUnixFinishTime: mustCreateRegex("podUnixFinishTime"),
}
}

Expand All @@ -59,13 +66,15 @@ func replaceAll(template string, values []regexValPair) string {
return template
}

func (s TemplateLogPlugin) GetTaskLog(podName, namespace, containerName, containerID, logName string) (core.TaskLog, error) {
func (s TemplateLogPlugin) GetTaskLog(podName, namespace, containerName, containerID, logName string, podUnixStartTime, podUnixFinishTime int64) (core.TaskLog, error) {
o, err := s.GetTaskLogs(Input{
LogName: logName,
Namespace: namespace,
PodName: podName,
ContainerName: containerName,
ContainerID: containerID,
LogName: logName,
Namespace: namespace,
PodName: podName,
ContainerName: containerName,
ContainerID: containerID,
PodUnixStartTime: podUnixStartTime,
PodUnixFinishTime: podUnixFinishTime,
})

if err != nil || len(o.TaskLogs) == 0 {
Expand Down Expand Up @@ -114,6 +123,14 @@ func (s TemplateLogPlugin) GetTaskLogs(input Input) (Output, error) {
regex: regexes.Hostname,
val: input.HostName,
},
{
regex: regexes.PodUnixStartTime,
val: strconv.FormatInt(input.PodUnixStartTime, 10),
},
{
regex: regexes.PodUnixFinishTime,
val: strconv.FormatInt(input.PodUnixFinishTime, 10),
},
},
),
Name: input.LogName,
Expand Down
98 changes: 70 additions & 28 deletions go/tasks/pluginmachinery/tasklog/template_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ func TestTemplateLog(t *testing.T) {
"flyteexamples-production",
"spark-kubernetes-driver",
"cri-o://abc",
"main_logs")
"main_logs",
1426349294,
1623782877,
)
assert.NoError(t, err)
assert.Equal(t, tl.GetName(), "main_logs")
assert.Equal(t, tl.GetMessageFormat(), core.TaskLog_JSON)
Expand All @@ -37,11 +40,13 @@ func Test_templateLogPlugin_Regression(t *testing.T) {
messageFormat core.TaskLog_MessageFormat
}
type args struct {
podName string
namespace string
containerName string
containerID string
logName string
podName string
namespace string
containerName string
containerID string
logName string
podUnixStartTime int64
podUnixFinishTime int64
}
tests := []struct {
name string
Expand All @@ -57,11 +62,13 @@ func Test_templateLogPlugin_Regression(t *testing.T) {
messageFormat: core.TaskLog_JSON,
},
args{
podName: "f-uuid-driver",
namespace: "flyteexamples-production",
containerName: "spark-kubernetes-driver",
containerID: "cri-o://abc",
logName: "main_logs",
podName: "f-uuid-driver",
namespace: "flyteexamples-production",
containerName: "spark-kubernetes-driver",
containerID: "cri-o://abc",
logName: "main_logs",
podUnixStartTime: 123,
podUnixFinishTime: 12345,
},
core.TaskLog{
Uri: "https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logEventViewer:group=/flyte-production/kubernetes;stream=var.log.containers.f-uuid-driver_flyteexamples-production_spark-kubernetes-driver-abc.log",
Expand All @@ -77,11 +84,13 @@ func Test_templateLogPlugin_Regression(t *testing.T) {
messageFormat: core.TaskLog_JSON,
},
args{
podName: "podName",
namespace: "flyteexamples-production",
containerName: "spark-kubernetes-driver",
containerID: "cri-o://abc",
logName: "main_logs",
podName: "podName",
namespace: "flyteexamples-production",
containerName: "spark-kubernetes-driver",
containerID: "cri-o://abc",
logName: "main_logs",
podUnixStartTime: 123,
podUnixFinishTime: 12345,
},
core.TaskLog{
Uri: "https://console.cloud.google.com/logs/viewer?project=test-gcp-project&angularJsUrl=%2Flogs%2Fviewer%3Fproject%3Dtest-gcp-project&resource=aws_ec2_instance&advancedFilter=resource.labels.pod_name%3DpodName",
Expand All @@ -97,11 +106,13 @@ func Test_templateLogPlugin_Regression(t *testing.T) {
messageFormat: core.TaskLog_JSON,
},
args{
podName: "flyteexamples-development-task-name",
namespace: "flyteexamples-development",
containerName: "ignore",
containerID: "ignore",
logName: "main_logs",
podName: "flyteexamples-development-task-name",
namespace: "flyteexamples-development",
containerName: "ignore",
containerID: "ignore",
logName: "main_logs",
podUnixStartTime: 123,
podUnixFinishTime: 12345,
},
core.TaskLog{
Uri: "https://dashboard.k8s.net/#!/log/flyteexamples-development/flyteexamples-development-task-name/pod?namespace=flyteexamples-development",
Expand All @@ -118,7 +129,7 @@ func Test_templateLogPlugin_Regression(t *testing.T) {
messageFormat: tt.fields.messageFormat,
}

got, err := s.GetTaskLog(tt.args.podName, tt.args.namespace, tt.args.containerName, tt.args.containerID, tt.args.logName)
got, err := s.GetTaskLog(tt.args.podName, tt.args.namespace, tt.args.containerName, tt.args.containerID, tt.args.logName, tt.args.podUnixStartTime, tt.args.podUnixFinishTime)
if (err != nil) != tt.wantErr {
t.Errorf("GetTaskLog() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down Expand Up @@ -154,12 +165,14 @@ func TestTemplateLogPlugin_NewTaskLog(t *testing.T) {
},
args{
input: Input{
HostName: "my-host",
PodName: "my-pod",
Namespace: "my-namespace",
ContainerName: "my-container",
ContainerID: "ignore",
LogName: "main_logs",
HostName: "my-host",
PodName: "my-pod",
Namespace: "my-namespace",
ContainerName: "my-container",
ContainerID: "ignore",
LogName: "main_logs",
PodUnixStartTime: 123,
PodUnixFinishTime: 12345,
},
},
Output{
Expand All @@ -173,6 +186,35 @@ func TestTemplateLogPlugin_NewTaskLog(t *testing.T) {
},
false,
},
{
"ddog",
fields{
templateURI: "https://app.datadoghq.com/logs?event&from_ts={{ .podUnixStartTime }}&live=true&query=pod_name%3A{{ .podName }}&to_ts={{ .podUnixFinishTime }}",
messageFormat: core.TaskLog_JSON,
},
args{
input: Input{
HostName: "my-host",
PodName: "my-pod",
Namespace: "my-namespace",
ContainerName: "my-container",
ContainerID: "ignore",
LogName: "main_logs",
PodUnixStartTime: 123,
PodUnixFinishTime: 12345,
},
},
Output{
TaskLogs: []*core.TaskLog{
{
Uri: "https://app.datadoghq.com/logs?event&from_ts=123&live=true&query=pod_name%3Amy-pod&to_ts=12345",
MessageFormat: core.TaskLog_JSON,
Name: "main_logs",
},
},
},
false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
7 changes: 4 additions & 3 deletions go/tasks/plugins/array/k8s/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,10 @@ func FetchPodStatusAndLogs(ctx context.Context, client core.KubeClient, name k8s

if logPlugin != nil {
o, err := logPlugin.GetTaskLogs(tasklog.Input{
PodName: pod.Name,
Namespace: pod.Namespace,
LogName: fmt.Sprintf(" #%d-%d", index, retryAttempt),
PodName: pod.Name,
Namespace: pod.Namespace,
LogName: fmt.Sprintf(" #%d-%d", index, retryAttempt),
PodUnixStartTime: pod.CreationTimestamp.Unix(),
})

if err != nil {
Expand Down

0 comments on commit b0684d9

Please sign in to comment.