Skip to content

Commit

Permalink
Add subscription status
Browse files Browse the repository at this point in the history
  • Loading branch information
int128 committed Aug 31, 2023
1 parent bc6069e commit e74a8ca
Show file tree
Hide file tree
Showing 9 changed files with 95 additions and 13 deletions.
10 changes: 8 additions & 2 deletions api/v1/subscription_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,16 @@ type SubscriptionSpec struct {

// SubscriptionStatus defines the observed state of Subscription
type SubscriptionStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
// Important: Run "make" to regenerate code after modifying this file
Phase SubscriptionStatusPhase `json:"phase,omitempty"`
}

type SubscriptionStatusPhase string

const (
SubscriptionStatusPhaseActive SubscriptionStatusPhase = "Active"
SubscriptionStatusPhaseError SubscriptionStatusPhase = "Error"
)

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.12.0
controller-gen.kubebuilder.io/version: v0.11.3
creationTimestamp: null
name: subscriptions.googlecloudpubsuboperator.quipper.github.io
spec:
group: googlecloudpubsuboperator.quipper.github.io
Expand Down Expand Up @@ -49,6 +50,9 @@ spec:
type: object
status:
description: SubscriptionStatus defines the observed state of Subscription
properties:
phase:
type: string
type: object
type: object
served: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.12.0
controller-gen.kubebuilder.io/version: v0.11.3
creationTimestamp: null
name: topics.googlecloudpubsuboperator.quipper.github.io
spec:
group: googlecloudpubsuboperator.quipper.github.io
Expand Down
1 change: 1 addition & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
creationTimestamp: null
name: manager-role
rules:
- apiGroups:
Expand Down
25 changes: 22 additions & 3 deletions internal/controller/subscription_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ func (r *SubscriptionReconciler) Reconcile(ctx context.Context, req ctrl.Request
// on deleted requests.
return ctrl.Result{}, crclient.IgnoreNotFound(err)
}

logger.Info("Found the subscription", "subscription", subscription)
logger.Info("Found the subscription resource")

// examine DeletionTimestamp to determine if object is under deletion
if subscription.ObjectMeta.DeletionTimestamp.IsZero() {
Expand Down Expand Up @@ -95,14 +94,34 @@ func (r *SubscriptionReconciler) Reconcile(ctx context.Context, req ctrl.Request
if isPubSubAlreadyExistsError(err) {
// don't treat as error
logger.Info("PubSub subscription already exists")
subscriptionPatch := crclient.MergeFrom(subscription.DeepCopy())
subscription.Status.Phase = googlecloudpubsuboperatorv1.SubscriptionStatusPhaseActive
if err := r.Client.Status().Patch(ctx, &subscription, subscriptionPatch); err != nil {
logger.Error(err, "unable to update status")
return ctrl.Result{}, err
}
logger.Info("Subscription status has been patched to Active")
return ctrl.Result{}, nil
}

subscriptionPatch := crclient.MergeFrom(subscription.DeepCopy())
subscription.Status.Phase = googlecloudpubsuboperatorv1.SubscriptionStatusPhaseError
if err := r.Client.Status().Patch(ctx, &subscription, subscriptionPatch); err != nil {
logger.Error(err, "unable to update status")
return ctrl.Result{}, err
}
logger.Info("Subscription status has been patched to Error")
return ctrl.Result{}, err
}

logger.Info(fmt.Sprintf("Subscription created: %v", s.ID()), "subscription", subscription)

subscriptionPatch := crclient.MergeFrom(subscription.DeepCopy())
subscription.Status.Phase = googlecloudpubsuboperatorv1.SubscriptionStatusPhaseActive
if err := r.Client.Status().Patch(ctx, &subscription, subscriptionPatch); err != nil {
logger.Error(err, "unable to update status")
return ctrl.Result{}, err
}
logger.Info("Subscription status has been patched to Active")
return ctrl.Result{}, nil
}

Expand Down
45 changes: 40 additions & 5 deletions internal/controller/subscription_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,31 @@ import (
googlecloudpubsuboperatorv1 "github.com/quipper/google-cloud-pubsub-operator/api/v1"
"github.com/quipper/google-cloud-pubsub-operator/internal/pubsubtest"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
//+kubebuilder:scaffold:imports
)

var _ = Describe("Subscription controller", func() {
Context("When creating a Subscription resource", func() {
const projectID = "subscription-project"
It("Should create a Pub/Sub Subscription", func(ctx context.Context) {
const projectID = "subscription-project-1"
psClient, err := pubsubtest.NewClient(ctx, projectID, psServer)
Expect(err).ShouldNot(HaveOccurred())

By("Creating a Topic")
topicID := "my-topic"
_, err = psClient.CreateTopic(ctx, topicID)
Expect(err).ShouldNot(HaveOccurred())

By("Creating a Subscription")
topic := &googlecloudpubsuboperatorv1.Subscription{
subscription := &googlecloudpubsuboperatorv1.Subscription{
TypeMeta: metav1.TypeMeta{
APIVersion: "googlecloudpubsuboperator.quipper.github.io/v1",
Kind: "Subscription",
},
ObjectMeta: metav1.ObjectMeta{
Name: "example",
Namespace: "default",
GenerateName: "example-",
Namespace: "default",
},
Spec: googlecloudpubsuboperatorv1.SubscriptionSpec{
SubscriptionProjectID: projectID,
Expand All @@ -40,7 +42,7 @@ var _ = Describe("Subscription controller", func() {
TopicID: topicID,
},
}
Expect(k8sClient.Create(ctx, topic)).Should(Succeed())
Expect(k8sClient.Create(ctx, subscription)).Should(Succeed())

By("Checking if the Subscription exists")
Eventually(func(g Gomega) {
Expand All @@ -49,5 +51,38 @@ var _ = Describe("Subscription controller", func() {
g.Expect(subscriptionExists).Should(BeTrue())
}, 3*time.Second, 100*time.Millisecond).Should(Succeed())
})

It("Should update the status if error", func(ctx context.Context) {
const projectID = "subscription-project-2"

By("Creating a Subscription")
subscription := &googlecloudpubsuboperatorv1.Subscription{
TypeMeta: metav1.TypeMeta{
APIVersion: "googlecloudpubsuboperator.quipper.github.io/v1",
Kind: "Subscription",
},
ObjectMeta: metav1.ObjectMeta{
GenerateName: "example-",
Namespace: "default",
},
Spec: googlecloudpubsuboperatorv1.SubscriptionSpec{
SubscriptionProjectID: projectID,
SubscriptionID: "my-subscription",
// CreateSubscription API should fail because the topic does not exist.
// We don't need to explicitly inject an error.
TopicProjectID: projectID,
TopicID: "invalid-topic",
},
}
Expect(k8sClient.Create(ctx, subscription)).Should(Succeed())
subscriptionRef := types.NamespacedName{Namespace: subscription.Namespace, Name: subscription.Name}

By("Checking if the status is Error")
Eventually(func(g Gomega) {
var subscription googlecloudpubsuboperatorv1.Subscription
g.Expect(k8sClient.Get(ctx, subscriptionRef, &subscription)).Should(Succeed())
g.Expect(subscription.Status.Phase).Should(Equal(googlecloudpubsuboperatorv1.SubscriptionStatusPhaseError))
}, 3*time.Second, 100*time.Millisecond).Should(Succeed())
})
})
})
1 change: 1 addition & 0 deletions internal/controller/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ var _ = BeforeSuite(func() {

psServer = pstest.NewServer(
pubsubtest.CreateTopicErrorInjectionReactor(),
pubsubtest.CreateSubscriptionErrorInjectionReactor(),
)
DeferCleanup(func() {
Expect(psServer.Close()).Should(Succeed())
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/topic_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ var _ = Describe("Topic controller", func() {
})

It("Should update the status if error", func(ctx context.Context) {
const projectID = "error-injected-project-1"
const projectID = "error-injected-project-topic-1"

By("Creating a Topic")
topic := &googlecloudpubsuboperatorv1.Topic{
Expand Down
15 changes: 15 additions & 0 deletions internal/pubsubtest/errorinjection.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,21 @@ func CreateTopicErrorInjectionReactor() pstest.ServerReactorOption {
}
}

func CreateSubscriptionErrorInjectionReactor() pstest.ServerReactorOption {
return pstest.ServerReactorOption{
FuncName: "CreateSubscription",
Reactor: errorInjectionReactorFunc(func(req interface{}) (bool, interface{}, error) {
topic, ok := req.(*pubsubpb.Topic)
if ok {
if strings.HasPrefix(topic.Name, "projects/error-injected-") {
return true, nil, status.Errorf(codes.InvalidArgument, "error injected")
}
}
return false, nil, nil
}),
}
}

type errorInjectionReactorFunc func(req interface{}) (bool, interface{}, error)

func (r errorInjectionReactorFunc) React(req interface{}) (bool, interface{}, error) {
Expand Down

0 comments on commit e74a8ca

Please sign in to comment.