Skip to content

Commit

Permalink
Gather attributes of downstream resources (#61)
Browse files Browse the repository at this point in the history
  • Loading branch information
ryannedolan committed Feb 9, 2024
1 parent 1755ea7 commit 927057e
Show file tree
Hide file tree
Showing 18 changed files with 223 additions and 21 deletions.
17 changes: 16 additions & 1 deletion deploy/subscriptions.crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,23 @@ spec:
type: object
additionalProperties:
type: string
attributes:
description: Physical attributes of the job and sink/output table.
type: object
additionalProperties:
type: string
resources:
description: The YAML generated to implement this pipeline.
description: The yaml generated to implement this pipeline.
type: array
items:
type: string
jobResources:
description: The yaml generated to implement the job.
type: array
items:
type: string
downstreamResources:
description: The yaml generated to implement the sink/output table.
type: array
items:
type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.linkedin.hoptimator.operator.Operator;
import com.linkedin.hoptimator.operator.ConfigAssembler;
import com.linkedin.hoptimator.models.V1alpha1KafkaTopic;
import com.linkedin.hoptimator.models.V1alpha1KafkaTopicStatus;

import io.kubernetes.client.extended.controller.reconciler.Reconciler;
import io.kubernetes.client.extended.controller.reconciler.Request;
Expand Down Expand Up @@ -52,6 +53,10 @@ public Result reconcile(Request request) {
return new Result(false);
}

if (object.getStatus() == null) {
object.setStatus(new V1alpha1KafkaTopicStatus());
}

String topicName = object.getSpec().getTopicName();
Integer desiredPartitions = object.getSpec().getNumPartitions();
Integer desiredReplicationFactor = object.getSpec().getReplicationFactor();
Expand All @@ -72,22 +77,29 @@ public Result reconcile(Request request) {

log.info("Found existing topic {}", topicName);
int actualPartitions = topicDescription.partitions().size();
object.getStatus().setNumPartitions(actualPartitions);
if (desiredPartitions != null && desiredPartitions > actualPartitions) {
log.info("Desired partitions {} > actual partitions {}. Creating additional partitions.",
desiredPartitions, actualPartitions);
admin.createPartitions(Collections.singletonMap(topicName, NewPartitions.increaseTo(desiredPartitions))).all().get();
object.getStatus().setNumPartitions(desiredPartitions);
}
} catch(ExecutionException e) {
if (e.getCause() instanceof UnknownTopicOrPartitionException ) {
log.info("No existing topic {}. Will create it.", topicName);
admin.createTopics(Collections.singleton(new NewTopic(topicName, Optional.ofNullable(desiredPartitions),
Optional.ofNullable(desiredReplicationFactor).map(x -> x.shortValue())))).all().get();
object.getStatus().setNumPartitions(desiredPartitions);
} else {
throw e;
}
} finally {
admin.close();
}

operator.apiFor(KAFKATOPIC).updateStatus(object, x -> object.getStatus())
.onFailure((x, y) -> log.error("Failed to update status of KafkaTopic {}/{}: {}.", namespace, name,
y.getMessage()));
} catch (Exception e) {
log.error("Encountered exception while reconciling KafkaTopic {}/{}", namespace, name, e);
return new Result(true, operator.failureRetryDuration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* Access control rule (colloquially, an Acl)
*/
@ApiModel(description = "Access control rule (colloquially, an Acl)")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
public class V1alpha1Acl implements io.kubernetes.client.common.KubernetesObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* AclList is a list of Acl
*/
@ApiModel(description = "AclList is a list of Acl")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
public class V1alpha1AclList implements io.kubernetes.client.common.KubernetesListObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* A set of related ACL rules.
*/
@ApiModel(description = "A set of related ACL rules.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
public class V1alpha1AclSpec {
/**
* The resource access method.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* The resource being controlled.
*/
@ApiModel(description = "The resource being controlled.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
public class V1alpha1AclSpecResource {
public static final String SERIALIZED_NAME_KIND = "kind";
@SerializedName(SERIALIZED_NAME_KIND)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* Status, as set by the operator.
*/
@ApiModel(description = "Status, as set by the operator.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
public class V1alpha1AclStatus {
public static final String SERIALIZED_NAME_MESSAGE = "message";
@SerializedName(SERIALIZED_NAME_MESSAGE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* Kafka Topic
*/
@ApiModel(description = "Kafka Topic")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
public class V1alpha1KafkaTopic implements io.kubernetes.client.common.KubernetesObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* KafkaTopicList is a list of KafkaTopic
*/
@ApiModel(description = "KafkaTopicList is a list of KafkaTopic")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
public class V1alpha1KafkaTopicList implements io.kubernetes.client.common.KubernetesListObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
* Desired Kafka topic configuration.
*/
@ApiModel(description = "Desired Kafka topic configuration.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
public class V1alpha1KafkaTopicSpec {
public static final String SERIALIZED_NAME_CLIENT_CONFIGS = "clientConfigs";
@SerializedName(SERIALIZED_NAME_CLIENT_CONFIGS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
/**
* V1alpha1KafkaTopicSpecClientConfigs
*/
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
public class V1alpha1KafkaTopicSpecClientConfigs {
public static final String SERIALIZED_NAME_CONFIG_MAP_REF = "configMapRef";
@SerializedName(SERIALIZED_NAME_CONFIG_MAP_REF)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* Reference to a ConfigMap to use for AdminClient configuration.
*/
@ApiModel(description = "Reference to a ConfigMap to use for AdminClient configuration.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
public class V1alpha1KafkaTopicSpecConfigMapRef {
public static final String SERIALIZED_NAME_NAME = "name";
@SerializedName(SERIALIZED_NAME_NAME)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* Current state of the topic.
*/
@ApiModel(description = "Current state of the topic.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
public class V1alpha1KafkaTopicStatus {
public static final String SERIALIZED_NAME_MESSAGE = "message";
@SerializedName(SERIALIZED_NAME_MESSAGE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* Hoptimator Subscription
*/
@ApiModel(description = "Hoptimator Subscription")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
public class V1alpha1Subscription implements io.kubernetes.client.common.KubernetesObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* SubscriptionList is a list of Subscription
*/
@ApiModel(description = "SubscriptionList is a list of Subscription")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
public class V1alpha1SubscriptionList implements io.kubernetes.client.common.KubernetesListObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* Subscription spec
*/
@ApiModel(description = "Subscription spec")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
public class V1alpha1SubscriptionSpec {
public static final String SERIALIZED_NAME_DATABASE = "database";
@SerializedName(SERIALIZED_NAME_DATABASE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,16 @@
* Filled in by the operator.
*/
@ApiModel(description = "Filled in by the operator.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
public class V1alpha1SubscriptionStatus {
public static final String SERIALIZED_NAME_ATTRIBUTES = "attributes";
@SerializedName(SERIALIZED_NAME_ATTRIBUTES)
private Map<String, String> attributes = null;

public static final String SERIALIZED_NAME_DOWNSTREAM_RESOURCES = "downstreamResources";
@SerializedName(SERIALIZED_NAME_DOWNSTREAM_RESOURCES)
private List<String> downstreamResources = null;

public static final String SERIALIZED_NAME_FAILED = "failed";
@SerializedName(SERIALIZED_NAME_FAILED)
private Boolean failed;
Expand All @@ -42,6 +50,10 @@ public class V1alpha1SubscriptionStatus {
@SerializedName(SERIALIZED_NAME_HINTS)
private Map<String, String> hints = null;

public static final String SERIALIZED_NAME_JOB_RESOURCES = "jobResources";
@SerializedName(SERIALIZED_NAME_JOB_RESOURCES)
private List<String> jobResources = null;

public static final String SERIALIZED_NAME_MESSAGE = "message";
@SerializedName(SERIALIZED_NAME_MESSAGE)
private String message;
Expand All @@ -59,6 +71,68 @@ public class V1alpha1SubscriptionStatus {
private String sql;


public V1alpha1SubscriptionStatus attributes(Map<String, String> attributes) {

this.attributes = attributes;
return this;
}

public V1alpha1SubscriptionStatus putAttributesItem(String key, String attributesItem) {
if (this.attributes == null) {
this.attributes = new HashMap<>();
}
this.attributes.put(key, attributesItem);
return this;
}

/**
* Physical attributes of the job and sink/output table.
* @return attributes
**/
@javax.annotation.Nullable
@ApiModelProperty(value = "Physical attributes of the job and sink/output table.")

public Map<String, String> getAttributes() {
return attributes;
}


public void setAttributes(Map<String, String> attributes) {
this.attributes = attributes;
}


public V1alpha1SubscriptionStatus downstreamResources(List<String> downstreamResources) {

this.downstreamResources = downstreamResources;
return this;
}

public V1alpha1SubscriptionStatus addDownstreamResourcesItem(String downstreamResourcesItem) {
if (this.downstreamResources == null) {
this.downstreamResources = new ArrayList<>();
}
this.downstreamResources.add(downstreamResourcesItem);
return this;
}

/**
* The yaml generated to implement the sink/output table.
* @return downstreamResources
**/
@javax.annotation.Nullable
@ApiModelProperty(value = "The yaml generated to implement the sink/output table.")

public List<String> getDownstreamResources() {
return downstreamResources;
}


public void setDownstreamResources(List<String> downstreamResources) {
this.downstreamResources = downstreamResources;
}


public V1alpha1SubscriptionStatus failed(Boolean failed) {

this.failed = failed;
Expand Down Expand Up @@ -113,6 +187,37 @@ public void setHints(Map<String, String> hints) {
}


public V1alpha1SubscriptionStatus jobResources(List<String> jobResources) {

this.jobResources = jobResources;
return this;
}

public V1alpha1SubscriptionStatus addJobResourcesItem(String jobResourcesItem) {
if (this.jobResources == null) {
this.jobResources = new ArrayList<>();
}
this.jobResources.add(jobResourcesItem);
return this;
}

/**
* The yaml generated to implement the job.
* @return jobResources
**/
@javax.annotation.Nullable
@ApiModelProperty(value = "The yaml generated to implement the job.")

public List<String> getJobResources() {
return jobResources;
}


public void setJobResources(List<String> jobResources) {
this.jobResources = jobResources;
}


public V1alpha1SubscriptionStatus message(String message) {

this.message = message;
Expand Down Expand Up @@ -174,11 +279,11 @@ public V1alpha1SubscriptionStatus addResourcesItem(String resourcesItem) {
}

/**
* The YAML generated to implement this pipeline.
* The yaml generated to implement this pipeline.
* @return resources
**/
@javax.annotation.Nullable
@ApiModelProperty(value = "The YAML generated to implement this pipeline.")
@ApiModelProperty(value = "The yaml generated to implement this pipeline.")

public List<String> getResources() {
return resources;
Expand Down Expand Up @@ -222,8 +327,11 @@ public boolean equals(Object o) {
return false;
}
V1alpha1SubscriptionStatus v1alpha1SubscriptionStatus = (V1alpha1SubscriptionStatus) o;
return Objects.equals(this.failed, v1alpha1SubscriptionStatus.failed) &&
return Objects.equals(this.attributes, v1alpha1SubscriptionStatus.attributes) &&
Objects.equals(this.downstreamResources, v1alpha1SubscriptionStatus.downstreamResources) &&
Objects.equals(this.failed, v1alpha1SubscriptionStatus.failed) &&
Objects.equals(this.hints, v1alpha1SubscriptionStatus.hints) &&
Objects.equals(this.jobResources, v1alpha1SubscriptionStatus.jobResources) &&
Objects.equals(this.message, v1alpha1SubscriptionStatus.message) &&
Objects.equals(this.ready, v1alpha1SubscriptionStatus.ready) &&
Objects.equals(this.resources, v1alpha1SubscriptionStatus.resources) &&
Expand All @@ -232,16 +340,19 @@ public boolean equals(Object o) {

@Override
public int hashCode() {
return Objects.hash(failed, hints, message, ready, resources, sql);
return Objects.hash(attributes, downstreamResources, failed, hints, jobResources, message, ready, resources, sql);
}


@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("class V1alpha1SubscriptionStatus {\n");
sb.append(" attributes: ").append(toIndentedString(attributes)).append("\n");
sb.append(" downstreamResources: ").append(toIndentedString(downstreamResources)).append("\n");
sb.append(" failed: ").append(toIndentedString(failed)).append("\n");
sb.append(" hints: ").append(toIndentedString(hints)).append("\n");
sb.append(" jobResources: ").append(toIndentedString(jobResources)).append("\n");
sb.append(" message: ").append(toIndentedString(message)).append("\n");
sb.append(" ready: ").append(toIndentedString(ready)).append("\n");
sb.append(" resources: ").append(toIndentedString(resources)).append("\n");
Expand Down
Loading

0 comments on commit 927057e

Please sign in to comment.