Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add subscription status #133

Merged
merged 3 commits into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions api/v1/subscription_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,16 @@ type SubscriptionSpec struct {

// SubscriptionStatus defines the observed state of Subscription
type SubscriptionStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
// Important: Run "make" to regenerate code after modifying this file
Phase SubscriptionStatusPhase `json:"phase,omitempty"`
}

type SubscriptionStatusPhase string

const (
SubscriptionStatusPhaseActive SubscriptionStatusPhase = "Active"
SubscriptionStatusPhaseError SubscriptionStatusPhase = "Error"
)

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ spec:
type: object
status:
description: SubscriptionStatus defines the observed state of Subscription
properties:
phase:
type: string
type: object
type: object
served: true
Expand Down
25 changes: 22 additions & 3 deletions internal/controller/subscription_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ func (r *SubscriptionReconciler) Reconcile(ctx context.Context, req ctrl.Request
// on deleted requests.
return ctrl.Result{}, crclient.IgnoreNotFound(err)
}

logger.Info("Found the subscription", "subscription", subscription)
logger.Info("Found the subscription resource")

// examine DeletionTimestamp to determine if object is under deletion
if subscription.ObjectMeta.DeletionTimestamp.IsZero() {
Expand Down Expand Up @@ -95,14 +94,34 @@ func (r *SubscriptionReconciler) Reconcile(ctx context.Context, req ctrl.Request
if isPubSubAlreadyExistsError(err) {
// don't treat as error
logger.Info("PubSub subscription already exists")
subscriptionPatch := crclient.MergeFrom(subscription.DeepCopy())
subscription.Status.Phase = googlecloudpubsuboperatorv1.SubscriptionStatusPhaseActive
if err := r.Client.Status().Patch(ctx, &subscription, subscriptionPatch); err != nil {
logger.Error(err, "unable to update status")
return ctrl.Result{}, err
}
logger.Info("Subscription status has been patched to Active")
return ctrl.Result{}, nil
}

subscriptionPatch := crclient.MergeFrom(subscription.DeepCopy())
subscription.Status.Phase = googlecloudpubsuboperatorv1.SubscriptionStatusPhaseError
if err := r.Client.Status().Patch(ctx, &subscription, subscriptionPatch); err != nil {
logger.Error(err, "unable to update status")
return ctrl.Result{}, err
}
logger.Info("Subscription status has been patched to Error")
return ctrl.Result{}, err
}

logger.Info(fmt.Sprintf("Subscription created: %v", s.ID()), "subscription", subscription)

subscriptionPatch := crclient.MergeFrom(subscription.DeepCopy())
subscription.Status.Phase = googlecloudpubsuboperatorv1.SubscriptionStatusPhaseActive
if err := r.Client.Status().Patch(ctx, &subscription, subscriptionPatch); err != nil {
logger.Error(err, "unable to update status")
return ctrl.Result{}, err
}
logger.Info("Subscription status has been patched to Active")
return ctrl.Result{}, nil
}

Expand Down
45 changes: 40 additions & 5 deletions internal/controller/subscription_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,31 @@ import (
googlecloudpubsuboperatorv1 "github.com/quipper/google-cloud-pubsub-operator/api/v1"
"github.com/quipper/google-cloud-pubsub-operator/internal/pubsubtest"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
//+kubebuilder:scaffold:imports
)

var _ = Describe("Subscription controller", func() {
Context("When creating a Subscription resource", func() {
const projectID = "subscription-project"
It("Should create a Pub/Sub Subscription", func(ctx context.Context) {
const projectID = "subscription-project-1"
psClient, err := pubsubtest.NewClient(ctx, projectID, psServer)
Expect(err).ShouldNot(HaveOccurred())

By("Creating a Topic")
topicID := "my-topic"
_, err = psClient.CreateTopic(ctx, topicID)
Expect(err).ShouldNot(HaveOccurred())

By("Creating a Subscription")
topic := &googlecloudpubsuboperatorv1.Subscription{
subscription := &googlecloudpubsuboperatorv1.Subscription{
TypeMeta: metav1.TypeMeta{
APIVersion: "googlecloudpubsuboperator.quipper.github.io/v1",
Kind: "Subscription",
},
ObjectMeta: metav1.ObjectMeta{
Name: "example",
Namespace: "default",
GenerateName: "example-",
Namespace: "default",
},
Spec: googlecloudpubsuboperatorv1.SubscriptionSpec{
SubscriptionProjectID: projectID,
Expand All @@ -40,7 +42,7 @@ var _ = Describe("Subscription controller", func() {
TopicID: topicID,
},
}
Expect(k8sClient.Create(ctx, topic)).Should(Succeed())
Expect(k8sClient.Create(ctx, subscription)).Should(Succeed())

By("Checking if the Subscription exists")
Eventually(func(g Gomega) {
Expand All @@ -49,5 +51,38 @@ var _ = Describe("Subscription controller", func() {
g.Expect(subscriptionExists).Should(BeTrue())
}, 3*time.Second, 100*time.Millisecond).Should(Succeed())
})

It("Should update the status if error", func(ctx context.Context) {
const projectID = "subscription-project-2"

By("Creating a Subscription")
subscription := &googlecloudpubsuboperatorv1.Subscription{
TypeMeta: metav1.TypeMeta{
APIVersion: "googlecloudpubsuboperator.quipper.github.io/v1",
Kind: "Subscription",
},
ObjectMeta: metav1.ObjectMeta{
GenerateName: "example-",
Namespace: "default",
},
Spec: googlecloudpubsuboperatorv1.SubscriptionSpec{
SubscriptionProjectID: projectID,
SubscriptionID: "my-subscription",
// CreateSubscription API should fail because the topic does not exist.
// We don't need to explicitly inject an error.
TopicProjectID: projectID,
TopicID: "invalid-topic",
},
}
Expect(k8sClient.Create(ctx, subscription)).Should(Succeed())
subscriptionRef := types.NamespacedName{Namespace: subscription.Namespace, Name: subscription.Name}

By("Checking if the status is Error")
Eventually(func(g Gomega) {
var subscription googlecloudpubsuboperatorv1.Subscription
g.Expect(k8sClient.Get(ctx, subscriptionRef, &subscription)).Should(Succeed())
g.Expect(subscription.Status.Phase).Should(Equal(googlecloudpubsuboperatorv1.SubscriptionStatusPhaseError))
}, 3*time.Second, 100*time.Millisecond).Should(Succeed())
})
})
})
1 change: 1 addition & 0 deletions internal/controller/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ var _ = BeforeSuite(func() {

psServer = pstest.NewServer(
pubsubtest.CreateTopicErrorInjectionReactor(),
pubsubtest.CreateSubscriptionErrorInjectionReactor(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may be unnecessary

)
DeferCleanup(func() {
Expect(psServer.Close()).Should(Succeed())
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/topic_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ var _ = Describe("Topic controller", func() {
})

It("Should update the status if error", func(ctx context.Context) {
const projectID = "error-injected-project-1"
const projectID = "error-injected-project-topic-1"

By("Creating a Topic")
topic := &googlecloudpubsuboperatorv1.Topic{
Expand Down
15 changes: 15 additions & 0 deletions internal/pubsubtest/errorinjection.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,21 @@ func CreateTopicErrorInjectionReactor() pstest.ServerReactorOption {
}
}

func CreateSubscriptionErrorInjectionReactor() pstest.ServerReactorOption {
return pstest.ServerReactorOption{
FuncName: "CreateSubscription",
Reactor: errorInjectionReactorFunc(func(req interface{}) (bool, interface{}, error) {
topic, ok := req.(*pubsubpb.Topic)
if ok {
if strings.HasPrefix(topic.Name, "projects/error-injected-") {
return true, nil, status.Errorf(codes.InvalidArgument, "error injected")
}
}
return false, nil, nil
}),
}
}

type errorInjectionReactorFunc func(req interface{}) (bool, interface{}, error)

func (r errorInjectionReactorFunc) React(req interface{}) (bool, interface{}, error) {
Expand Down