diff --git a/internal/controller/topic_controller.go b/internal/controller/topic_controller.go index 7c2123a..a22fb9c 100644 --- a/internal/controller/topic_controller.go +++ b/internal/controller/topic_controller.go @@ -57,7 +57,7 @@ func (r *TopicReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl // on deleted requests. return ctrl.Result{}, client.IgnoreNotFound(err) } - logger.Info("Found the topic", "topic", topic) + logger.Info("Found Topic resource") // examine DeletionTimestamp to determine if object is under deletion if topic.ObjectMeta.DeletionTimestamp.IsZero() { @@ -94,34 +94,36 @@ func (r *TopicReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl topicPatch := crclient.MergeFrom(topic.DeepCopy()) topic.Status.Phase = "Creating" if err := r.Client.Status().Patch(ctx, &topic, topicPatch); err != nil { - logger.Error(err, "unable to update Tunnel status") + logger.Error(err, "unable to update status") return ctrl.Result{}, err } + logger.Info("Topic status has been patched to Creating") t, err := r.createTopic(ctx, topic.Spec.ProjectID, topic.Spec.TopicID) if err != nil { if gs, ok := gRPCStatusFromError(err); ok && gs.Code() == codes.AlreadyExists { // don't treat as error - logger.Info("PubSub topic already exists") + logger.Info("Topic already exists in Cloud Pub/Sub") return ctrl.Result{}, nil } topicPatch := crclient.MergeFrom(topic.DeepCopy()) topic.Status.Phase = "Error" if err := r.Client.Status().Patch(ctx, &topic, topicPatch); err != nil { - logger.Error(err, "unable to update Tunnel status") + logger.Error(err, "unable to update status") return ctrl.Result{}, err } return ctrl.Result{}, err } + logger.Info("Created topic into Cloud Pub/Sub", "id", t.ID()) - logger.Info(fmt.Sprintf("Topic created: %v", t.ID()), "topic", topic) topicPatch = crclient.MergeFrom(topic.DeepCopy()) topic.Status.Phase = "Active" // TODO: extract const if err := r.Client.Status().Patch(ctx, &topic, topicPatch); err != nil { - logger.Error(err, "unable to update Tunnel status") + logger.Error(err, "unable to update status") return ctrl.Result{}, err } + logger.Info("Topic status has been patched to Active") return ctrl.Result{}, nil } diff --git a/internal/controller/topic_controller_test.go b/internal/controller/topic_controller_test.go index 33a2f65..cdd113c 100644 --- a/internal/controller/topic_controller_test.go +++ b/internal/controller/topic_controller_test.go @@ -57,6 +57,13 @@ var _ = Describe("Topic controller", func() { g.Expect(err).ShouldNot(HaveOccurred()) g.Expect(topicExists).Should(BeTrue()) }, 3*time.Second, 100*time.Millisecond).Should(Succeed()) + + By("Checking if the status is Active") + Eventually(func(g Gomega) { + var topic googlecloudpubsuboperatorv1.Topic + g.Expect(k8sClient.Get(ctx, types.NamespacedName{Namespace: "default", Name: "example"}, &topic)).Should(Succeed()) + g.Expect(topic.Status.Phase).Should(Equal("Active")) + }, 3*time.Second, 100*time.Millisecond).Should(Succeed()) }) }) })