diff --git a/internal/builder/run.go b/internal/builder/run.go
index 2708abd..07a7d8e 100644
--- a/internal/builder/run.go
+++ b/internal/builder/run.go
@@ -64,12 +64,12 @@ func (r *Run) Run(ctx context.Context) error {
 		)
 	}
 
-	podsClient := clientset.CoreV1().Pods(r.KubeNamespace)
+	jobsClient := clientset.BatchV1().Jobs(r.KubeNamespace)
 
-	pod, err := util.CreateChaincodePod(
+	job, err := util.CreateChaincodeJob(
 		ctx,
 		logger,
-		podsClient,
+		jobsClient,
 		kubeObjectName,
 		r.KubeNamespace,
 		r.KubeServiceAccount,
@@ -82,11 +82,13 @@ func (r *Run) Run(ctx context.Context) error {
 	}
 
 	logger.Printf(
-		"Running chaincode ID %s in kubernetes pod %s/%s",
+		"Running chaincode ID %s with kubernetes job %s/%s",
 		chaincodeData.ChaincodeID,
-		pod.Namespace,
-		pod.Name,
+		job.Namespace,
+		job.Name,
 	)
 
-	return util.WaitForChaincodePod(ctx, logger, podsClient, pod, chaincodeData.ChaincodeID)
+	batchClient := clientset.BatchV1().RESTClient()
+
+	return util.WaitForChaincodeJob(ctx, logger, batchClient, job, chaincodeData.ChaincodeID)
 }
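For reference, the reason run.go passes `clientset.BatchV1().RESTClient()` rather than the typed jobs client: `cache.NewListWatchFromClient` takes a `cache.Getter`, which the batch group's REST client satisfies but the typed `JobInterface` does not. A minimal sketch of that wiring, assuming an in-cluster config and hypothetical job/namespace names:

```go
package main

import (
	"context"
	"fmt"

	batchv1 "k8s.io/api/batch/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
	watchtools "k8s.io/client-go/tools/watch"
)

func main() {
	config, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}

	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	// The group's REST client satisfies cache.Getter; the typed
	// JobInterface does not, which is why run.go passes the REST client
	batchClient := clientset.BatchV1().RESTClient()

	fieldSelector := fields.OneTermEqualSelector("metadata.name", "example-job")
	listWatch := cache.NewListWatchFromClient(batchClient, "jobs", "example-namespace", fieldSelector)

	// Block until the named job appears in the watch stream
	event, err := watchtools.UntilWithSync(context.Background(), listWatch, &batchv1.Job{}, nil,
		func(event watch.Event) (bool, error) {
			_, ok := event.Object.(*batchv1.Job)
			return ok, nil
		})
	if err != nil {
		panic(err)
	}

	fmt.Println(event.Type)
}
```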
diff --git a/internal/util/k8s.go b/internal/util/k8s.go
index 4aa2593..482b1df 100644
--- a/internal/util/k8s.go
+++ b/internal/util/k8s.go
@@ -14,14 +14,15 @@ import (
 	"time"
 
 	"github.com/hyperledger-labs/fabric-builder-k8s/internal/log"
+	batchv1 "k8s.io/api/batch/v1"
 	apiv1 "k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
-	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/util/rand"
 	"k8s.io/apimachinery/pkg/watch"
 	applycorev1 "k8s.io/client-go/applyconfigurations/core/v1"
 	"k8s.io/client-go/kubernetes"
+	typedBatchv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
 	v1 "k8s.io/client-go/kubernetes/typed/core/v1"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/clientcmd"
@@ -44,154 +45,113 @@ const (
 	TLSClientRootCertFile string = "/etc/hyperledger/fabric/peer.crt"
 )
 
-func waitForPod(
+func waitForJob(
 	ctx context.Context,
-	timeout time.Duration,
-	podsClient v1.PodInterface,
-	podName, namespace string,
+	client cache.Getter,
+	jobName, namespace string,
 	conditionFunc watchtools.ConditionFunc,
-) (*apiv1.PodStatus, error) {
-	fieldSelector := fields.OneTermEqualSelector("metadata.name", podName).String()
-
-	listWatch := &cache.ListWatch{
-		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
-			options.FieldSelector = fieldSelector
-
-			return podsClient.List(context.TODO(), options)
-		},
-		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
-			options.FieldSelector = fieldSelector
-
-			return podsClient.Watch(context.TODO(), options)
-		},
-	}
-
-	// TODO it might be nice to use NewListWatchFromClient instead but not sure what
-	// client to give it to avoid forbidden errors for pod list.
-	// var client kubernetes.Interface
-	// listWatch := cache.NewListWatchFromClient(client, "pods", namespace, fieldSelector)
+) (*batchv1.JobStatus, error) {
+	fieldSelector := fields.OneTermEqualSelector("metadata.name", jobName)
+	listWatch := cache.NewListWatchFromClient(client, "jobs", namespace, fieldSelector)
 
-	ctx, cancel := watchtools.ContextWithOptionalTimeout(ctx, timeout)
+	ctx, cancel := watchtools.ContextWithOptionalTimeout(ctx, 0)
 	defer cancel()
 
-	event, err := watchtools.UntilWithSync(ctx, listWatch, &apiv1.Pod{}, nil, conditionFunc)
+	event, err := watchtools.UntilWithSync(ctx, listWatch, &batchv1.Job{}, nil, conditionFunc)
 	if err != nil {
 		return nil, err
 	}
 
 	if event == nil {
-		return nil, fmt.Errorf("no events received for pod %s/%s", namespace, podName)
+		return nil, fmt.Errorf("no events received for job %s/%s", namespace, jobName)
 	}
 
-	pod, ok := event.Object.(*apiv1.Pod)
+	job, ok := event.Object.(*batchv1.Job)
 	if !ok {
-		return nil, fmt.Errorf("unexpected object while watching pod %s/%s", namespace, podName)
+		return nil, fmt.Errorf("event contained unexpected object %T while watching job %s/%s", event.Object, namespace, jobName)
 	}
 
-	return &pod.Status, nil
+	return &job.Status, nil
 }
 
-func waitForPodRunning(
+func waitForJobTermination(
 	ctx context.Context,
-	timeout time.Duration,
-	podsClient v1.PodInterface,
-	podName, namespace string,
-) (*apiv1.PodStatus, error) {
-	podRunningCondition := func(event watch.Event) (bool, error) {
-		pod, ok := event.Object.(*apiv1.Pod)
-		if !ok {
-			return false, fmt.Errorf(
-				"unexpected object while watching pod %s/%s",
-				namespace,
-				podName,
-			)
-		}
-
-		phase := pod.Status.Phase
-		if phase == apiv1.PodRunning {
-			return true, nil
-		}
-
-		return false, nil
-	}
-
-	return waitForPod(ctx, timeout, podsClient, podName, namespace, podRunningCondition)
-}
-
-func waitForPodTermination(
-	ctx context.Context,
-	timeout time.Duration,
-	podsClient v1.PodInterface,
-	podName, namespace string,
-) (*apiv1.PodStatus, error) {
-	podTerminationCondition := func(event watch.Event) (bool, error) {
+	client cache.Getter,
+	jobName, namespace string,
+) (*batchv1.JobStatus, error) {
+	jobTerminationCondition := func(event watch.Event) (bool, error) {
 		if event.Type == watch.Deleted {
 			return true, nil
 		}
 
-		pod, ok := event.Object.(*apiv1.Pod)
+		job, ok := event.Object.(*batchv1.Job)
 		if !ok {
 			return false, fmt.Errorf(
-				"unexpected object while watching pod %s/%s",
+				"event contained unexpected object %T while watching job %s/%s",
+				event.Object,
 				namespace,
-				podName,
+				jobName,
 			)
 		}
 
-		phase := pod.Status.Phase
-		if phase != apiv1.PodRunning {
-			return true, nil
+		for _, c := range job.Status.Conditions {
+			if c.Type == batchv1.JobComplete && c.Status == apiv1.ConditionTrue {
+				return true, nil
+			}
+
+			if c.Type == batchv1.JobFailed && c.Status == apiv1.ConditionTrue {
+				return true, fmt.Errorf("job %s/%s failed for reason %s: %s", namespace, jobName, c.Reason, c.Message)
+			}
 		}
 
 		return false, nil
 	}
 
-	return waitForPod(ctx, timeout, podsClient, podName, namespace, podTerminationCondition)
+	return waitForJob(ctx, client, jobName, namespace, jobTerminationCondition)
 }
 
-func WaitForChaincodePod(
+func WaitForChaincodeJob(
 	ctx context.Context,
 	logger *log.CmdLogger,
-	podsClient v1.PodInterface,
-	pod *apiv1.Pod,
+	client cache.Getter,
+	job *batchv1.Job,
 	chaincodeID string,
 ) error {
-	logger.Debugf("Waiting for pod %s/%s for chaincode ID %s", pod.Namespace, pod.Name, chaincodeID)
+	logger.Debugf("Waiting for job %s/%s to terminate for chaincode ID %s", job.Namespace, job.Name, chaincodeID)
 
-	_, err := waitForPodRunning(ctx, time.Minute, podsClient, pod.Name, pod.Namespace)
+	_, err := waitForJobTermination(ctx, client, job.Name, job.Namespace)
 	if err != nil {
 		return fmt.Errorf(
-			"error waiting for chaincode pod %s/%s for chaincode ID %s: %w",
-			pod.Namespace,
-			pod.Name,
+			"error waiting for chaincode job %s/%s to terminate for chaincode ID %s: %w",
+			job.Namespace,
+			job.Name,
 			chaincodeID,
 			err,
 		)
 	}
 
-	status, err := waitForPodTermination(ctx, 0, podsClient, pod.Name, pod.Namespace)
-	if err != nil {
-		return fmt.Errorf(
-			"error waiting for chaincode pod %s/%s to terminate for chaincode ID %s: %w",
-			pod.Namespace,
-			pod.Name,
-			chaincodeID,
-			err,
-		)
-	}
-
-	if status != nil {
-		return fmt.Errorf(
-			"chaincode pod %s/%s for chaincode ID %s terminated %s: %s",
-			pod.Namespace,
-			pod.Name,
-			chaincodeID,
-			status.Reason,
-			status.Message,
-		)
-	}
-
-	return fmt.Errorf("unexpected chaincode pod termination for chaincode ID %s", chaincodeID)
+	return fmt.Errorf(
+		"chaincode job %s/%s for chaincode ID %s terminated unexpectedly",
+		job.Namespace,
+		job.Name,
+		chaincodeID,
+	)
 }
 
 // GetKubeClientset returns a client object for a provided kubeconfig filepath
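Unlike a pod, a Job has no single phase to poll; it reports terminal state through status conditions, and the controller may emit several intermediate events before `JobComplete` or `JobFailed` flips to true. A self-contained sketch of the same condition logic (the `jobDone` helper is hypothetical, written for illustration, not part of this change):

```go
package main

import (
	"fmt"

	batchv1 "k8s.io/api/batch/v1"
	apiv1 "k8s.io/api/core/v1"
)

// jobDone reports whether a Job has reached a terminal condition, mirroring
// the logic in waitForJobTermination: complete is success, failed is an error
func jobDone(job *batchv1.Job) (bool, error) {
	for _, c := range job.Status.Conditions {
		if c.Status != apiv1.ConditionTrue {
			continue
		}

		switch c.Type {
		case batchv1.JobComplete:
			return true, nil
		case batchv1.JobFailed:
			return true, fmt.Errorf("job failed for reason %s: %s", c.Reason, c.Message)
		}
	}

	return false, nil
}

func main() {
	failed := &batchv1.Job{
		Status: batchv1.JobStatus{
			Conditions: []batchv1.JobCondition{
				{Type: batchv1.JobFailed, Status: apiv1.ConditionTrue, Reason: "BackoffLimitExceeded"},
			},
		},
	}

	done, err := jobDone(failed)
	fmt.Println(done, err) // true, with the failure reason in the error
}
```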
chaincode ID %s", job.Namespace, job.Name, chaincodeID) - _, err := waitForPodRunning(ctx, time.Minute, podsClient, pod.Name, pod.Namespace) + _, err := waitForJobTermination(ctx, client, job.Name, job.Namespace) if err != nil { return fmt.Errorf( - "error waiting for chaincode pod %s/%s for chaincode ID %s: %w", - pod.Namespace, - pod.Name, + "error waiting for chaincode job %s/%s to terminate for chaincode ID %s: %w", + job.Namespace, + job.Name, chaincodeID, err, ) } - status, err := waitForPodTermination(ctx, 0, podsClient, pod.Name, pod.Namespace) - if err != nil { - return fmt.Errorf( - "error waiting for chaincode pod %s/%s to terminate for chaincode ID %s: %w", - pod.Namespace, - pod.Name, - chaincodeID, - err, - ) - } - - if status != nil { - return fmt.Errorf( - "chaincode pod %s/%s for chaincode ID %s terminated %s: %s", - pod.Namespace, - pod.Name, - chaincodeID, - status.Reason, - status.Message, - ) - } - - return fmt.Errorf("unexpected chaincode pod termination for chaincode ID %s", chaincodeID) + return fmt.Errorf( + "chaincode job %s/%s for chaincode ID %s terminated", + job.Namespace, + job.Name, + chaincodeID, + ) } // GetKubeClientset returns a client object for a provided kubeconfig filepath @@ -226,16 +186,20 @@ func GetKubeNamespace() (string, error) { return string(namespace), nil } -func getChaincodePodObject( +func getChaincodeJobSpec( imageData *ImageJSON, - namespace, serviceAccount, podName, peerID string, + namespace, serviceAccount, jobName, peerID string, chaincodeData *ChaincodeJSON, -) *apiv1.Pod { +) *batchv1.Job { chaincodeImage := imageData.Name + "@" + imageData.Digest - return &apiv1.Pod{ + var backoffLimit int32 = 0 + var ttSecondsAfterFinished int32 = int32((5 * time.Minute) / time.Second) + + return &batchv1.Job{ ObjectMeta: metav1.ObjectMeta{ - Name: podName, + // TODO: add random suffix to jobname?! + Name: jobName + "-" + rand.String(5), Namespace: namespace, Labels: map[string]string{ "app.kubernetes.io/name": "fabric", @@ -249,70 +213,78 @@ func getChaincodePodObject( "fabric-builder-k8s-ccid": chaincodeData.ChaincodeID, }, }, - Spec: apiv1.PodSpec{ - ServiceAccountName: serviceAccount, - Containers: []apiv1.Container{ - { - Name: "main", - Image: chaincodeImage, - VolumeMounts: []apiv1.VolumeMount{ + Spec: batchv1.JobSpec{ + Template: apiv1.PodTemplateSpec{ + Spec: apiv1.PodSpec{ + // TODO explicitly name and label the pod? + // Should be able to reuse the job name if it already has a random suffix? + ServiceAccountName: serviceAccount, + Containers: []apiv1.Container{ { - Name: "certs", - MountPath: "/etc/hyperledger/fabric", - ReadOnly: true, + Name: "chaincode", + Image: chaincodeImage, + VolumeMounts: []apiv1.VolumeMount{ + { + Name: "certs", + MountPath: "/etc/hyperledger/fabric", + ReadOnly: true, + }, + }, + Env: []apiv1.EnvVar{ + { + Name: "CORE_CHAINCODE_ID_NAME", + Value: chaincodeData.ChaincodeID, + }, + { + Name: "CORE_PEER_ADDRESS", + Value: chaincodeData.PeerAddress, + }, + { + Name: "CORE_PEER_TLS_ENABLED", + Value: "true", // TODO only if there are certs? 
@@ -378,67 +350,15 @@ func ApplyChaincodeSecrets(
 	return nil
 }
 
-func deleteChaincodePod(
-	ctx context.Context,
-	logger *log.CmdLogger,
-	podsClient v1.PodInterface,
-	podName, namespace string,
-	chaincodeData *ChaincodeJSON,
-) error {
-	logger.Debugf(
-		"Deleting any existing chaincode pod for chaincode ID %s: %s/%s",
-		chaincodeData.ChaincodeID,
-		namespace,
-		podName,
-	)
-
-	err := podsClient.Delete(ctx, podName, metav1.DeleteOptions{})
-	if err != nil {
-		if errors.IsNotFound(err) {
-			logger.Debugf(
-				"No existing chaincode pod for chaincode ID %s: %s/%s",
-				chaincodeData.ChaincodeID,
-				namespace,
-				podName,
-			)
-
-			return nil
-		}
-
-		return err
-	}
-
-	logger.Debugf(
-		"Waiting for existing chaincode pod to terminate for chaincode ID %s: %s/%s",
-		chaincodeData.ChaincodeID,
-		namespace,
-		podName,
-	)
-
-	_, err = waitForPodTermination(ctx, time.Minute, podsClient, podName, namespace)
-	if err != nil {
-		return err
-	}
-
-	logger.Debugf(
-		"Existing chaincode pod deleted for chaincode ID %s: %s/%s",
-		chaincodeData.ChaincodeID,
-		namespace,
-		podName,
-	)
-
-	return nil
-}
-
-func CreateChaincodePod(
+func CreateChaincodeJob(
 	ctx context.Context,
 	logger *log.CmdLogger,
-	podsClient v1.PodInterface,
+	jobsClient typedBatchv1.JobInterface,
 	objectName, namespace, serviceAccount, peerID string,
 	chaincodeData *ChaincodeJSON,
 	imageData *ImageJSON,
-) (*apiv1.Pod, error) {
-	podDefinition := getChaincodePodObject(
+) (*batchv1.Job, error) {
+	jobDefinition := getChaincodeJobSpec(
 		imageData,
 		namespace,
 		serviceAccount,
@@ -447,28 +367,18 @@ func CreateChaincodePod(
 		chaincodeData,
 	)
 
-	err := deleteChaincodePod(ctx, logger, podsClient, objectName, namespace, chaincodeData)
-	if err != nil {
-		return nil, fmt.Errorf(
-			"unable to delete existing chaincode pod %s/%s for chaincode ID %s: %w",
-			namespace,
-			objectName,
-			chaincodeData.ChaincodeID,
-			err,
-		)
-	}
-
+	// The job name gains a random suffix in getChaincodeJobSpec; the final
+	// name is logged once the job has been created
 	logger.Debugf(
-		"Creating chaincode pod for chaincode ID %s: %s/%s",
+		"Creating chaincode job for chaincode ID %s: %s/%s",
 		chaincodeData.ChaincodeID,
 		namespace,
 		objectName,
 	)
 
-	pod, err := podsClient.Create(ctx, podDefinition, metav1.CreateOptions{})
+	job, err := jobsClient.Create(ctx, jobDefinition, metav1.CreateOptions{})
 	if err != nil {
 		return nil, fmt.Errorf(
-			"unable to create chaincode pod %s/%s for chaincode ID %s: %w",
+			"unable to create chaincode job %s/%s for chaincode ID %s: %w",
 			namespace,
 			objectName,
 			chaincodeData.ChaincodeID,
@@ -477,13 +387,13 @@ func CreateChaincodePod(
 	}
 
 	logger.Debugf(
-		"Created chaincode pod for chaincode ID %s: %s/%s",
+		"Created chaincode job for chaincode ID %s: %s/%s",
 		chaincodeData.ChaincodeID,
-		pod.Namespace,
-		pod.Name,
+		job.Namespace,
+		job.Name,
 	)
 
-	return pod, nil
+	return job, nil
 }
 
 // GetValidRfc1035LabelName returns a valid RFC 1035 label name with the format
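One payoff of taking the typed `JobInterface` as a parameter is testability: the fake clientset implements it. A hypothetical test-style sketch (the namespace and job name are made up, and this is not a test shipped with the change):

```go
package main

import (
	"context"
	"fmt"

	batchv1 "k8s.io/api/batch/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	// fake.NewSimpleClientset gives back an in-memory clientset whose
	// BatchV1().Jobs(...) satisfies the same JobInterface as the real one
	clientset := fake.NewSimpleClientset()
	jobsClient := clientset.BatchV1().Jobs("test-namespace")

	// CreateChaincodeJob could be exercised here instead; creating a bare
	// Job keeps the sketch self-contained
	_, err := jobsClient.Create(context.TODO(), &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{Name: "cc-test-job"},
	}, metav1.CreateOptions{})
	if err != nil {
		panic(err)
	}

	jobs, err := jobsClient.List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		panic(err)
	}

	fmt.Println(len(jobs.Items)) // 1
}
```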
@@ -492,6 +402,7 @@ func GetValidRfc1035LabelName(prefix, peerID string, chaincodeData *ChaincodeJSO
 	const (
 		maxRfc1035LabelLength = 63
 		labelSeparators       = 2
+		// Reserve room for the extra '-' separator and the five random
+		// characters appended to job names
+		randomSuffixLength    = 6
 	)
 
 	runHash := fnv.New64a()
@@ -509,6 +420,7 @@ func GetValidRfc1035LabelName(prefix, peerID string, chaincodeData *ChaincodeJSO
 
 	// Make sure the chaincode package label fits in the space available,
-	// taking in to account the prefix, suffix, and two '-' separators
-	maxLabelLength := maxRfc1035LabelLength - len(prefix) - len(suffix) - labelSeparators
+	// taking into account the prefix, suffix, '-' separators, and the
+	// random suffix appended to job names
+	maxLabelLength := maxRfc1035LabelLength - len(prefix) - len(suffix) - labelSeparators - randomSuffixLength
 	if maxLabelLength < len(safeLabel) {
 		safeLabel = safeLabel[:maxLabelLength]
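With the random suffix reserved, the worst-case job name is `<prefix>-<label>-<suffix>-<5 random chars>`, which must stay within the 63-character RFC 1035 limit. A quick sanity check of the arithmetic (the prefix and peer ID are hypothetical, and the hex-rendered fnv suffix is an assumption about how GetValidRfc1035LabelName builds its suffix):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

func main() {
	prefix := "hlfcc" // hypothetical prefix

	// Assumption: the suffix is the fnv 64a hash rendered as hex,
	// so it is at most 16 characters long
	runHash := fnv.New64a()
	runHash.Write([]byte("peer0.org1.example.com"))
	suffix := fmt.Sprintf("%x", runHash.Sum64())

	const (
		maxRfc1035LabelLength = 63
		labelSeparators       = 2
		randomSuffixLength    = 6 // "-" plus rand.String(5)
	)

	// Budget left for the chaincode package label itself
	maxLabelLength := maxRfc1035LabelLength - len(prefix) - len(suffix) - labelSeparators - randomSuffixLength
	fmt.Println(maxLabelLength)
}
```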