diff --git a/api/v1/subscription_types.go b/api/v1/subscription_types.go index 79b4e6a..8b4d40f 100644 --- a/api/v1/subscription_types.go +++ b/api/v1/subscription_types.go @@ -23,15 +23,19 @@ import ( // SubscriptionSpec defines the desired state of Subscription type SubscriptionSpec struct { // subscription ID + //+kubebuilder:validation:XValidation:message="Immutable field",rule="self == oldSelf" SubscriptionID string `json:"subscriptionID,omitempty"` // project ID of subscription + //+kubebuilder:validation:XValidation:message="Immutable field",rule="self == oldSelf" SubscriptionProjectID string `json:"subscriptionProjectID,omitempty"` // topic ID + //+kubebuilder:validation:XValidation:message="Immutable field",rule="self == oldSelf" TopicID string `json:"topicID,omitempty"` // project ID of topic + //+kubebuilder:validation:XValidation:message="Immutable field",rule="self == oldSelf" TopicProjectID string `json:"topicProjectID,omitempty"` } diff --git a/api/v1/topic_types.go b/api/v1/topic_types.go index a88bc79..da5528d 100644 --- a/api/v1/topic_types.go +++ b/api/v1/topic_types.go @@ -23,9 +23,11 @@ import ( // TopicSpec defines the desired state of Topic type TopicSpec struct { // ID of project + //+kubebuilder:validation:XValidation:message="Immutable field",rule="self == oldSelf" ProjectID string `json:"projectID,omitempty"` // ID of topic + //+kubebuilder:validation:XValidation:message="Immutable field",rule="self == oldSelf" TopicID string `json:"topicID,omitempty"` } diff --git a/cmd/main.go b/cmd/main.go index a434bfb..f816937 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -21,6 +21,8 @@ import ( "os" "cloud.google.com/go/pubsub" + "k8s.io/utils/clock" + // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) // to ensure that exec-entrypoint and run can make use of them. _ "k8s.io/client-go/plugin/pkg/client/auth" @@ -106,6 +108,7 @@ func main() { Scheme: mgr.GetScheme(), NewClient: pubsub.NewClient, Recorder: mgr.GetEventRecorderFor("subscription-controller"), + Clock: clock.RealClock{}, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Subscription") os.Exit(1) diff --git a/config/crd/bases/googlecloudpubsuboperator.quipper.github.io_subscriptions.yaml b/config/crd/bases/googlecloudpubsuboperator.quipper.github.io_subscriptions.yaml index 8d80d6a..0dbc4b0 100644 --- a/config/crd/bases/googlecloudpubsuboperator.quipper.github.io_subscriptions.yaml +++ b/config/crd/bases/googlecloudpubsuboperator.quipper.github.io_subscriptions.yaml @@ -37,15 +37,27 @@ spec: subscriptionID: description: subscription ID type: string + x-kubernetes-validations: + - message: Immutable field + rule: self == oldSelf subscriptionProjectID: description: project ID of subscription type: string + x-kubernetes-validations: + - message: Immutable field + rule: self == oldSelf topicID: description: topic ID type: string + x-kubernetes-validations: + - message: Immutable field + rule: self == oldSelf topicProjectID: description: project ID of topic type: string + x-kubernetes-validations: + - message: Immutable field + rule: self == oldSelf type: object status: description: SubscriptionStatus defines the observed state of Subscription diff --git a/config/crd/bases/googlecloudpubsuboperator.quipper.github.io_topics.yaml b/config/crd/bases/googlecloudpubsuboperator.quipper.github.io_topics.yaml index 07e4fa6..d698ab7 100644 --- a/config/crd/bases/googlecloudpubsuboperator.quipper.github.io_topics.yaml +++ b/config/crd/bases/googlecloudpubsuboperator.quipper.github.io_topics.yaml @@ -37,9 +37,15 @@ spec: projectID: description: ID of project type: string + x-kubernetes-validations: + - message: Immutable field + rule: self == oldSelf topicID: description: ID of topic type: string + x-kubernetes-validations: + - message: Immutable field + rule: self == oldSelf type: object status: description: TopicStatus defines the observed state of Topic diff --git a/internal/controller/subscription_controller.go b/internal/controller/subscription_controller.go index 43db8b8..6f9a6cd 100644 --- a/internal/controller/subscription_controller.go +++ b/internal/controller/subscription_controller.go @@ -25,6 +25,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/record" + "k8s.io/utils/clock" ctrl "sigs.k8s.io/controller-runtime" crclient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -35,12 +36,15 @@ import ( const subscriptionFinalizerName = "subscription.googlecloudpubsuboperator.quipper.github.io/finalizer" +const subscriptionCreationWarningDeadline = 10 * time.Minute + // SubscriptionReconciler reconciles a Subscription object type SubscriptionReconciler struct { crclient.Client Scheme *runtime.Scheme NewClient newPubSubClientFunc Recorder record.EventRecorder + Clock clock.PassiveClock } //+kubebuilder:rbac:groups=googlecloudpubsuboperator.quipper.github.io,resources=subscriptions,verbs=get;list;watch;create;update;patch;delete @@ -108,8 +112,13 @@ func (r *SubscriptionReconciler) Reconcile(ctx context.Context, req ctrl.Request return ctrl.Result{}, nil } - r.Recorder.Event(&subscription, corev1.EventTypeWarning, "SubscriptionCreateError", - fmt.Sprintf("Failed to create Subscription in Pub/Sub: %s", err)) + if r.Clock.Since(subscription.CreationTimestamp.Time) > subscriptionCreationWarningDeadline { + r.Recorder.Event(&subscription, corev1.EventTypeWarning, "SubscriptionCreateErrorDeadlineExceeded", + fmt.Sprintf("Failed to create Subscription for %s in Pub/Sub: %s", subscriptionCreationWarningDeadline, err)) + } else { + r.Recorder.Event(&subscription, corev1.EventTypeNormal, "SubscriptionCreateError", + fmt.Sprintf("Failed to create Subscription in Pub/Sub: %s", err)) + } subscriptionPatch := crclient.MergeFrom(subscription.DeepCopy()) subscription.Status.Phase = googlecloudpubsuboperatorv1.SubscriptionStatusPhaseError if err := r.Client.Status().Patch(ctx, &subscription, subscriptionPatch); err != nil { diff --git a/internal/controller/suite_test.go b/internal/controller/suite_test.go index ce1308e..2eb1acb 100644 --- a/internal/controller/suite_test.go +++ b/internal/controller/suite_test.go @@ -20,6 +20,7 @@ import ( "context" "path/filepath" "testing" + "time" "cloud.google.com/go/pubsub" "cloud.google.com/go/pubsub/pstest" @@ -29,6 +30,7 @@ import ( "google.golang.org/api/option" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" + clocktesting "k8s.io/utils/clock/testing" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/envtest" @@ -113,6 +115,8 @@ var _ = BeforeSuite(func() { Scheme: k8sManager.GetScheme(), NewClient: newClient, Recorder: k8sManager.GetEventRecorderFor("subscription-controller"), + // TODO: how to change the time in test code? + Clock: clocktesting.NewFakePassiveClock(time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC)), }).SetupWithManager(k8sManager) Expect(err).ToNot(HaveOccurred())