diff --git a/api/v1/subscription_types.go b/api/v1/subscription_types.go index 7731a4b..79b4e6a 100644 --- a/api/v1/subscription_types.go +++ b/api/v1/subscription_types.go @@ -37,10 +37,16 @@ type SubscriptionSpec struct { // SubscriptionStatus defines the observed state of Subscription type SubscriptionStatus struct { - // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster - // Important: Run "make" to regenerate code after modifying this file + Phase SubscriptionStatusPhase `json:"phase,omitempty"` } +type SubscriptionStatusPhase string + +const ( + SubscriptionStatusPhaseActive SubscriptionStatusPhase = "Active" + SubscriptionStatusPhaseError SubscriptionStatusPhase = "Error" +) + //+kubebuilder:object:root=true //+kubebuilder:subresource:status diff --git a/config/crd/bases/googlecloudpubsuboperator.quipper.github.io_subscriptions.yaml b/config/crd/bases/googlecloudpubsuboperator.quipper.github.io_subscriptions.yaml index 20f9ecf..8d80d6a 100644 --- a/config/crd/bases/googlecloudpubsuboperator.quipper.github.io_subscriptions.yaml +++ b/config/crd/bases/googlecloudpubsuboperator.quipper.github.io_subscriptions.yaml @@ -49,6 +49,9 @@ spec: type: object status: description: SubscriptionStatus defines the observed state of Subscription + properties: + phase: + type: string type: object type: object served: true diff --git a/internal/controller/subscription_controller.go b/internal/controller/subscription_controller.go index b6bab0a..131705a 100644 --- a/internal/controller/subscription_controller.go +++ b/internal/controller/subscription_controller.go @@ -55,8 +55,7 @@ func (r *SubscriptionReconciler) Reconcile(ctx context.Context, req ctrl.Request // on deleted requests. return ctrl.Result{}, crclient.IgnoreNotFound(err) } - - logger.Info("Found the subscription", "subscription", subscription) + logger.Info("Found the subscription resource") // examine DeletionTimestamp to determine if object is under deletion if subscription.ObjectMeta.DeletionTimestamp.IsZero() { @@ -95,14 +94,34 @@ func (r *SubscriptionReconciler) Reconcile(ctx context.Context, req ctrl.Request if isPubSubAlreadyExistsError(err) { // don't treat as error logger.Info("PubSub subscription already exists") + subscriptionPatch := crclient.MergeFrom(subscription.DeepCopy()) + subscription.Status.Phase = googlecloudpubsuboperatorv1.SubscriptionStatusPhaseActive + if err := r.Client.Status().Patch(ctx, &subscription, subscriptionPatch); err != nil { + logger.Error(err, "unable to update status") + return ctrl.Result{}, err + } + logger.Info("Subscription status has been patched to Active") return ctrl.Result{}, nil } + subscriptionPatch := crclient.MergeFrom(subscription.DeepCopy()) + subscription.Status.Phase = googlecloudpubsuboperatorv1.SubscriptionStatusPhaseError + if err := r.Client.Status().Patch(ctx, &subscription, subscriptionPatch); err != nil { + logger.Error(err, "unable to update status") + return ctrl.Result{}, err + } + logger.Info("Subscription status has been patched to Error") return ctrl.Result{}, err } - logger.Info(fmt.Sprintf("Subscription created: %v", s.ID()), "subscription", subscription) + subscriptionPatch := crclient.MergeFrom(subscription.DeepCopy()) + subscription.Status.Phase = googlecloudpubsuboperatorv1.SubscriptionStatusPhaseActive + if err := r.Client.Status().Patch(ctx, &subscription, subscriptionPatch); err != nil { + logger.Error(err, "unable to update status") + return ctrl.Result{}, err + } + logger.Info("Subscription status has been patched to Active") return ctrl.Result{}, nil } diff --git a/internal/controller/subscription_controller_test.go b/internal/controller/subscription_controller_test.go index b11745b..5ac10b0 100644 --- a/internal/controller/subscription_controller_test.go +++ b/internal/controller/subscription_controller_test.go @@ -9,29 +9,31 @@ import ( googlecloudpubsuboperatorv1 "github.com/quipper/google-cloud-pubsub-operator/api/v1" "github.com/quipper/google-cloud-pubsub-operator/internal/pubsubtest" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" //+kubebuilder:scaffold:imports ) var _ = Describe("Subscription controller", func() { Context("When creating a Subscription resource", func() { - const projectID = "subscription-project" It("Should create a Pub/Sub Subscription", func(ctx context.Context) { + const projectID = "subscription-project-1" psClient, err := pubsubtest.NewClient(ctx, projectID, psServer) Expect(err).ShouldNot(HaveOccurred()) + By("Creating a Topic") topicID := "my-topic" _, err = psClient.CreateTopic(ctx, topicID) Expect(err).ShouldNot(HaveOccurred()) By("Creating a Subscription") - topic := &googlecloudpubsuboperatorv1.Subscription{ + subscription := &googlecloudpubsuboperatorv1.Subscription{ TypeMeta: metav1.TypeMeta{ APIVersion: "googlecloudpubsuboperator.quipper.github.io/v1", Kind: "Subscription", }, ObjectMeta: metav1.ObjectMeta{ - Name: "example", - Namespace: "default", + GenerateName: "example-", + Namespace: "default", }, Spec: googlecloudpubsuboperatorv1.SubscriptionSpec{ SubscriptionProjectID: projectID, @@ -40,7 +42,7 @@ var _ = Describe("Subscription controller", func() { TopicID: topicID, }, } - Expect(k8sClient.Create(ctx, topic)).Should(Succeed()) + Expect(k8sClient.Create(ctx, subscription)).Should(Succeed()) By("Checking if the Subscription exists") Eventually(func(g Gomega) { @@ -49,5 +51,38 @@ var _ = Describe("Subscription controller", func() { g.Expect(subscriptionExists).Should(BeTrue()) }, 3*time.Second, 100*time.Millisecond).Should(Succeed()) }) + + It("Should update the status if error", func(ctx context.Context) { + const projectID = "subscription-project-2" + + By("Creating a Subscription") + subscription := &googlecloudpubsuboperatorv1.Subscription{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "googlecloudpubsuboperator.quipper.github.io/v1", + Kind: "Subscription", + }, + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "example-", + Namespace: "default", + }, + Spec: googlecloudpubsuboperatorv1.SubscriptionSpec{ + SubscriptionProjectID: projectID, + SubscriptionID: "my-subscription", + // CreateSubscription API should fail because the topic does not exist. + // We don't need to explicitly inject an error. + TopicProjectID: projectID, + TopicID: "invalid-topic", + }, + } + Expect(k8sClient.Create(ctx, subscription)).Should(Succeed()) + subscriptionRef := types.NamespacedName{Namespace: subscription.Namespace, Name: subscription.Name} + + By("Checking if the status is Error") + Eventually(func(g Gomega) { + var subscription googlecloudpubsuboperatorv1.Subscription + g.Expect(k8sClient.Get(ctx, subscriptionRef, &subscription)).Should(Succeed()) + g.Expect(subscription.Status.Phase).Should(Equal(googlecloudpubsuboperatorv1.SubscriptionStatusPhaseError)) + }, 3*time.Second, 100*time.Millisecond).Should(Succeed()) + }) }) }) diff --git a/internal/controller/topic_controller_test.go b/internal/controller/topic_controller_test.go index 7c4e3d9..766086f 100644 --- a/internal/controller/topic_controller_test.go +++ b/internal/controller/topic_controller_test.go @@ -54,7 +54,7 @@ var _ = Describe("Topic controller", func() { }) It("Should update the status if error", func(ctx context.Context) { - const projectID = "error-injected-project-1" + const projectID = "error-injected-project-topic-1" By("Creating a Topic") topic := &googlecloudpubsuboperatorv1.Topic{