diff --git a/Makefile b/Makefile index 88d3f9f..2f6ff5e 100644 --- a/Makefile +++ b/Makefile @@ -34,7 +34,8 @@ deploy-dev-environment: deploy-samples: deploy kubectl wait --for=condition=Established=True \ crds/subscriptions.hoptimator.linkedin.com \ - crds/kafkatopics.hoptimator.linkedin.com + crds/kafkatopics.hoptimator.linkedin.com \ + crds/sqljobs.hoptimator.linkedin.com kubectl apply -f ./deploy/samples deploy-config: diff --git a/deploy/rbac.yaml b/deploy/rbac.yaml index bb3785b..023a87e 100644 --- a/deploy/rbac.yaml +++ b/deploy/rbac.yaml @@ -5,10 +5,10 @@ metadata: name: hoptimator-operator rules: - apiGroups: ["hoptimator.linkedin.com"] - resources: ["acls", "kafkatopics", "subscriptions"] + resources: ["acls", "kafkatopics", "subscriptions", "sqljobs"] verbs: ["get", "watch", "list", "update", "create"] - apiGroups: ["hoptimator.linkedin.com"] - resources: ["kafkatopics/status", "subscriptions/status", "acls/status"] + resources: ["kafkatopics/status", "subscriptions/status", "acls/status", "sqljobs/status"] verbs: ["get", "patch"] - apiGroups: ["flink.apache.org"] resources: ["flinkdeployments"] diff --git a/deploy/samples/sqljobs.yaml b/deploy/samples/sqljobs.yaml new file mode 100644 index 0000000..6b57f1f --- /dev/null +++ b/deploy/samples/sqljobs.yaml @@ -0,0 +1,10 @@ +apiVersion: hoptimator.linkedin.com/v1alpha1 +kind: SqlJob +metadata: + name: hello-world +spec: + dialect: Flink + executionMode: Streaming + sql: + - create table bh (text varchar) with ('connector' = 'blackhole'); + - insert into bh values ('hello world'); diff --git a/deploy/sqljobs.crd.yaml b/deploy/sqljobs.crd.yaml new file mode 100644 index 0000000..0425354 --- /dev/null +++ b/deploy/sqljobs.crd.yaml @@ -0,0 +1,87 @@ +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: sqljobs.hoptimator.linkedin.com +spec: + group: hoptimator.linkedin.com + names: + kind: SqlJob + listKind: SqlJobList + plural: sqljobs + singular: sqljob + shortNames: + - sql + - sj + preserveUnknownFields: false + scope: Namespaced + versions: + - name: v1alpha1 + served: true + storage: true + schema: + openAPIV3Schema: + description: Hoptimator generic SQL job + type: object + properties: + apiVersion: + type: string + kind: + type: string + metadata: + type: object + spec: + description: SQL job spec + type: object + properties: + sql: + description: SQL script the job should run. + type: array + items: + type: string + dialect: + description: Flink, etc. + type: string + enum: + - Flink + default: Flink + executionMode: + description: Streaming or Batch. + type: string + enum: + - Streaming + - Batch + default: Streaming + required: + - sql + status: + description: Filled in by the operator. + type: object + properties: + ready: + description: Whether the SqlJob is running or completed. + type: boolean + failed: + description: Whether the SqlJob has failed. + type: boolean + message: + description: Error or success message, for information only. + type: string + sql: + description: The SQL being implemented by this SqlJob. + type: string + subresources: + status: {} + additionalPrinterColumns: + - name: DIALECT + type: string + description: SQL dialect. + jsonPath: .spec.dialect + - name: MODE + type: string + description: Execution mode. + jsonPath: .spec.executionMode + - name: STATUS + type: string + description: Job status. + jsonPath: .status.message + diff --git a/hoptimator-flink-adapter/build.gradle b/hoptimator-flink-adapter/build.gradle index 16752fc..19ea302 100644 --- a/hoptimator-flink-adapter/build.gradle +++ b/hoptimator-flink-adapter/build.gradle @@ -6,6 +6,7 @@ plugins { dependencies { implementation project(':hoptimator-catalog') + implementation project(':hoptimator-models') implementation project(':hoptimator-operator') implementation libs.kubernetesClient implementation libs.kubernetesExtendedClient diff --git a/hoptimator-flink-adapter/src/main/java/com/linkedin/hoptimator/catalog/flink/FlinkStreamingSqlJob.java b/hoptimator-flink-adapter/src/main/java/com/linkedin/hoptimator/catalog/flink/FlinkStreamingSqlJob.java new file mode 100644 index 0000000..92cc0d3 --- /dev/null +++ b/hoptimator-flink-adapter/src/main/java/com/linkedin/hoptimator/catalog/flink/FlinkStreamingSqlJob.java @@ -0,0 +1,13 @@ +package com.linkedin.hoptimator.catalog.flink; + +import com.linkedin.hoptimator.catalog.Resource; + +public class FlinkStreamingSqlJob extends Resource { + + public FlinkStreamingSqlJob(String namespace, String name, String sql) { + super("FlinkStreamingSqlJob"); + export("namespace", namespace); + export("name", name); + export("sql", sql); + } +} diff --git a/hoptimator-flink-adapter/src/main/java/com/linkedin/hoptimator/operator/flink/FlinkControllerProvider.java b/hoptimator-flink-adapter/src/main/java/com/linkedin/hoptimator/operator/flink/FlinkControllerProvider.java index 0cda8e7..bac1c55 100644 --- a/hoptimator-flink-adapter/src/main/java/com/linkedin/hoptimator/operator/flink/FlinkControllerProvider.java +++ b/hoptimator-flink-adapter/src/main/java/com/linkedin/hoptimator/operator/flink/FlinkControllerProvider.java @@ -1,9 +1,13 @@ package com.linkedin.hoptimator.operator.flink; +import com.linkedin.hoptimator.models.V1alpha1SqlJob; +import com.linkedin.hoptimator.models.V1alpha1SqlJobList; import com.linkedin.hoptimator.operator.ControllerProvider; import com.linkedin.hoptimator.operator.Operator; import io.kubernetes.client.extended.controller.Controller; +import io.kubernetes.client.extended.controller.builder.ControllerBuilder; +import io.kubernetes.client.extended.controller.reconciler.Reconciler; import java.util.Collection; import java.util.Collections; @@ -16,7 +20,17 @@ public Collection controllers(Operator operator) { operator.registerApi("FlinkDeployment", "flinkdeployment", "flinkdeployments", "flink.apache.org", "v1beta1"); - // We don't need a controller - return Collections.emptyList(); + operator.registerApi("SqlJob", "sqljob", "sqljobs", + "hoptimator.linkedin.com", "v1alpha1", V1alpha1SqlJob.class, V1alpha1SqlJobList.class); + + Reconciler reconciler = new FlinkStreamingSqlJobReconciler(operator); + Controller controller = ControllerBuilder.defaultBuilder(operator.informerFactory()) + .withReconciler(reconciler) + .withName("flink-streaming-sql-job-controller") + .withWorkerCount(1) + .watch(x -> ControllerBuilder.controllerWatchBuilder(V1alpha1SqlJob.class, x).build()) + .build(); + + return Collections.singleton(controller); } } diff --git a/hoptimator-flink-adapter/src/main/java/com/linkedin/hoptimator/operator/flink/FlinkStreamingSqlJobReconciler.java b/hoptimator-flink-adapter/src/main/java/com/linkedin/hoptimator/operator/flink/FlinkStreamingSqlJobReconciler.java new file mode 100644 index 0000000..052201a --- /dev/null +++ b/hoptimator-flink-adapter/src/main/java/com/linkedin/hoptimator/operator/flink/FlinkStreamingSqlJobReconciler.java @@ -0,0 +1,109 @@ +package com.linkedin.hoptimator.operator.flink; + +import com.linkedin.hoptimator.catalog.Resource; +import com.linkedin.hoptimator.catalog.flink.FlinkStreamingSqlJob; +import com.linkedin.hoptimator.operator.Operator; +import com.linkedin.hoptimator.models.V1alpha1SqlJob; +import com.linkedin.hoptimator.models.V1alpha1SqlJobSpec.DialectEnum; +import com.linkedin.hoptimator.models.V1alpha1SqlJobSpec.ExecutionModeEnum; +import com.linkedin.hoptimator.models.V1alpha1SqlJobStatus; + +import io.kubernetes.client.extended.controller.reconciler.Reconciler; +import io.kubernetes.client.extended.controller.reconciler.Request; +import io.kubernetes.client.extended.controller.reconciler.Result; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * Manifests streaming SqlJobs as Flink jobs. + * + */ +public class FlinkStreamingSqlJobReconciler implements Reconciler { + private final static Logger log = LoggerFactory.getLogger(FlinkStreamingSqlJobReconciler.class); + private final static String SQLJOB = "hoptimator.linkedin.com/v1alpha1/SqlJob"; + + private final Operator operator; + + public FlinkStreamingSqlJobReconciler(Operator operator) { + this.operator = operator; + } + + @Override + public Result reconcile(Request request) { + log.info("Reconciling request {}", request); + String name = request.getName(); + String namespace = request.getNamespace(); + Result result = new Result(true, operator.pendingRetryDuration()); + + try { + V1alpha1SqlJob object = operator.fetch(SQLJOB, namespace, name); + + if (object == null) { + log.info("Object {}/{} deleted. Skipping."); + return new Result(false); + } + + V1alpha1SqlJobStatus status = object.getStatus(); + if (status == null) { + status = new V1alpha1SqlJobStatus(); + object.setStatus(status); + } + + List sql = object.getSpec().getSql(); + String script = sql.stream().collect(Collectors.joining(";\n")); + + DialectEnum dialect = object.getSpec().getDialect(); + if (!DialectEnum.FLINK.equals(dialect)) { + log.info("Not Flink SQL. Skipping."); + return new Result(false); + } + + ExecutionModeEnum mode = object.getSpec().getExecutionMode(); + if (!ExecutionModeEnum.STREAMING.equals(mode)) { + log.info("Not a streaming job. Skipping."); + return new Result(false); + } + + Resource.TemplateFactory templateFactory = new Resource.SimpleTemplateFactory(Resource.Environment.EMPTY); + Resource sqlJob = new FlinkStreamingSqlJob(namespace, name, script); + boolean allReady = true; + boolean anyFailed = false; + for (String yaml : sqlJob.render(templateFactory)) { + operator.apply(yaml, object); + if (!operator.isReady(yaml)) { + allReady = false; + } + if (operator.isFailed(yaml)) { + anyFailed = true; + allReady = false; + } + } + + object.getStatus().setReady(allReady); + object.getStatus().setFailed(anyFailed); + + if (allReady) { + object.getStatus().setMessage("Ready."); + result = new Result(false); // done + } + if (anyFailed) { + object.getStatus().setMessage("Failed."); + result = new Result(false); // done + } + + operator.apiFor(SQLJOB).updateStatus(object, x -> object.getStatus()) + .onFailure((x, y) -> log.error("Failed to update status of SqlJob {}: {}.", name, y.getMessage())) + .throwsApiException(); + + } catch (Exception e) { + log.error("Encountered exception while reconciling Flink streaming SqlJob {}/{}", namespace, name, e); + return new Result(true, operator.failureRetryDuration()); + } + return result; + } +} + diff --git a/hoptimator-flink-adapter/src/main/resources/FlinkStreamingSqlJob.yaml.template b/hoptimator-flink-adapter/src/main/resources/FlinkStreamingSqlJob.yaml.template new file mode 100644 index 0000000..e72a8a0 --- /dev/null +++ b/hoptimator-flink-adapter/src/main/resources/FlinkStreamingSqlJob.yaml.template @@ -0,0 +1,29 @@ +apiVersion: flink.apache.org/v1beta1 +kind: FlinkDeployment +metadata: + namespace: {{namespace}} + name: {{name}}-flink-job +spec: + image: docker.io/library/hoptimator-flink-runner + imagePullPolicy: Never + flinkVersion: v1_16 + flinkConfiguration: + taskmanager.numberOfTaskSlots: "1" + serviceAccount: flink + jobManager: + resource: + memory: "2048m" + cpu: 0.1 + taskManager: + resource: + memory: "2048m" + cpu: 0.1 + job: + entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner + args: + - {{sql}} + jarURI: local:///opt/hoptimator-flink-runner.jar + parallelism: 1 + upgradeMode: stateless + state: running + diff --git a/hoptimator-models/generate-models.sh b/hoptimator-models/generate-models.sh index 473697f..ced58d5 100755 --- a/hoptimator-models/generate-models.sh +++ b/hoptimator-models/generate-models.sh @@ -13,4 +13,5 @@ docker run \ -u "$(pwd)/deploy/acls.crd.yaml" \ -u "$(pwd)/deploy/kafkatopics.crd.yaml" \ -u "$(pwd)/deploy/subscriptions.crd.yaml" \ + -u "$(pwd)/deploy/sqljobs.crd.yaml" \ && echo "done." diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Acl.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Acl.java index 9df4d51..50b1e0a 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Acl.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Acl.java @@ -31,7 +31,7 @@ * Access control rule (colloquially, an Acl) */ @ApiModel(description = "Access control rule (colloquially, an Acl)") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-07-26T20:21:01.735Z[Etc/UTC]") public class V1alpha1Acl implements io.kubernetes.client.common.KubernetesObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclList.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclList.java index 8334649..5c88d08 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclList.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclList.java @@ -32,7 +32,7 @@ * AclList is a list of Acl */ @ApiModel(description = "AclList is a list of Acl") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-07-26T20:21:01.735Z[Etc/UTC]") public class V1alpha1AclList implements io.kubernetes.client.common.KubernetesListObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpec.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpec.java index 9ee42fc..fd1ddaf 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpec.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpec.java @@ -29,7 +29,7 @@ * A set of related ACL rules. */ @ApiModel(description = "A set of related ACL rules.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-07-26T20:21:01.735Z[Etc/UTC]") public class V1alpha1AclSpec { /** * The resource access method. diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpecResource.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpecResource.java index 522ed41..f89782a 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpecResource.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpecResource.java @@ -28,7 +28,7 @@ * The resource being controlled. */ @ApiModel(description = "The resource being controlled.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-07-26T20:21:01.735Z[Etc/UTC]") public class V1alpha1AclSpecResource { public static final String SERIALIZED_NAME_KIND = "kind"; @SerializedName(SERIALIZED_NAME_KIND) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclStatus.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclStatus.java index 3d083b8..8b22465 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclStatus.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclStatus.java @@ -28,7 +28,7 @@ * Status, as set by the operator. */ @ApiModel(description = "Status, as set by the operator.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-07-26T20:21:01.735Z[Etc/UTC]") public class V1alpha1AclStatus { public static final String SERIALIZED_NAME_MESSAGE = "message"; @SerializedName(SERIALIZED_NAME_MESSAGE) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopic.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopic.java index 61a5b24..bd70046 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopic.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopic.java @@ -31,7 +31,7 @@ * Kafka Topic */ @ApiModel(description = "Kafka Topic") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-07-26T20:21:01.735Z[Etc/UTC]") public class V1alpha1KafkaTopic implements io.kubernetes.client.common.KubernetesObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicList.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicList.java index 0af5478..448761c 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicList.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicList.java @@ -32,7 +32,7 @@ * KafkaTopicList is a list of KafkaTopic */ @ApiModel(description = "KafkaTopicList is a list of KafkaTopic") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-07-26T20:21:01.735Z[Etc/UTC]") public class V1alpha1KafkaTopicList implements io.kubernetes.client.common.KubernetesListObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpec.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpec.java index 74f2b5c..d81d7cc 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpec.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpec.java @@ -33,7 +33,7 @@ * Desired Kafka topic configuration. */ @ApiModel(description = "Desired Kafka topic configuration.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-07-26T20:21:01.735Z[Etc/UTC]") public class V1alpha1KafkaTopicSpec { public static final String SERIALIZED_NAME_CLIENT_CONFIGS = "clientConfigs"; @SerializedName(SERIALIZED_NAME_CLIENT_CONFIGS) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecClientConfigs.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecClientConfigs.java index 5081585..e58e490 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecClientConfigs.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecClientConfigs.java @@ -28,7 +28,7 @@ /** * V1alpha1KafkaTopicSpecClientConfigs */ -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-07-26T20:21:01.735Z[Etc/UTC]") public class V1alpha1KafkaTopicSpecClientConfigs { public static final String SERIALIZED_NAME_CONFIG_MAP_REF = "configMapRef"; @SerializedName(SERIALIZED_NAME_CONFIG_MAP_REF) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecConfigMapRef.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecConfigMapRef.java index 7c8b924..2be1faa 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecConfigMapRef.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecConfigMapRef.java @@ -28,7 +28,7 @@ * Reference to a ConfigMap to use for AdminClient configuration. */ @ApiModel(description = "Reference to a ConfigMap to use for AdminClient configuration.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-07-26T20:21:01.735Z[Etc/UTC]") public class V1alpha1KafkaTopicSpecConfigMapRef { public static final String SERIALIZED_NAME_NAME = "name"; @SerializedName(SERIALIZED_NAME_NAME) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicStatus.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicStatus.java index f29d602..0544422 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicStatus.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicStatus.java @@ -28,7 +28,7 @@ * Current state of the topic. */ @ApiModel(description = "Current state of the topic.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-07-26T20:21:01.735Z[Etc/UTC]") public class V1alpha1KafkaTopicStatus { public static final String SERIALIZED_NAME_MESSAGE = "message"; @SerializedName(SERIALIZED_NAME_MESSAGE) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SqlJob.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SqlJob.java new file mode 100644 index 0000000..db8a84c --- /dev/null +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SqlJob.java @@ -0,0 +1,219 @@ +/* + * Kubernetes + * No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator) + * + * The version of the OpenAPI document: v1.21.1 + * + * + * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech). + * https://openapi-generator.tech + * Do not edit the class manually. + */ + + +package com.linkedin.hoptimator.models; + +import java.util.Objects; +import java.util.Arrays; +import com.google.gson.TypeAdapter; +import com.google.gson.annotations.JsonAdapter; +import com.google.gson.annotations.SerializedName; +import com.google.gson.stream.JsonReader; +import com.google.gson.stream.JsonWriter; +import com.linkedin.hoptimator.models.V1alpha1SqlJobSpec; +import com.linkedin.hoptimator.models.V1alpha1SqlJobStatus; +import io.kubernetes.client.openapi.models.V1ObjectMeta; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import java.io.IOException; + +/** + * Hoptimator generic SQL job + */ +@ApiModel(description = "Hoptimator generic SQL job") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-07-26T20:21:01.735Z[Etc/UTC]") +public class V1alpha1SqlJob implements io.kubernetes.client.common.KubernetesObject { + public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; + @SerializedName(SERIALIZED_NAME_API_VERSION) + private String apiVersion; + + public static final String SERIALIZED_NAME_KIND = "kind"; + @SerializedName(SERIALIZED_NAME_KIND) + private String kind; + + public static final String SERIALIZED_NAME_METADATA = "metadata"; + @SerializedName(SERIALIZED_NAME_METADATA) + private V1ObjectMeta metadata = null; + + public static final String SERIALIZED_NAME_SPEC = "spec"; + @SerializedName(SERIALIZED_NAME_SPEC) + private V1alpha1SqlJobSpec spec; + + public static final String SERIALIZED_NAME_STATUS = "status"; + @SerializedName(SERIALIZED_NAME_STATUS) + private V1alpha1SqlJobStatus status; + + + public V1alpha1SqlJob apiVersion(String apiVersion) { + + this.apiVersion = apiVersion; + return this; + } + + /** + * APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + * @return apiVersion + **/ + @javax.annotation.Nullable + @ApiModelProperty(value = "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources") + + public String getApiVersion() { + return apiVersion; + } + + + public void setApiVersion(String apiVersion) { + this.apiVersion = apiVersion; + } + + + public V1alpha1SqlJob kind(String kind) { + + this.kind = kind; + return this; + } + + /** + * Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + * @return kind + **/ + @javax.annotation.Nullable + @ApiModelProperty(value = "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds") + + public String getKind() { + return kind; + } + + + public void setKind(String kind) { + this.kind = kind; + } + + + public V1alpha1SqlJob metadata(V1ObjectMeta metadata) { + + this.metadata = metadata; + return this; + } + + /** + * Get metadata + * @return metadata + **/ + @javax.annotation.Nullable + @ApiModelProperty(value = "") + + public V1ObjectMeta getMetadata() { + return metadata; + } + + + public void setMetadata(V1ObjectMeta metadata) { + this.metadata = metadata; + } + + + public V1alpha1SqlJob spec(V1alpha1SqlJobSpec spec) { + + this.spec = spec; + return this; + } + + /** + * Get spec + * @return spec + **/ + @javax.annotation.Nullable + @ApiModelProperty(value = "") + + public V1alpha1SqlJobSpec getSpec() { + return spec; + } + + + public void setSpec(V1alpha1SqlJobSpec spec) { + this.spec = spec; + } + + + public V1alpha1SqlJob status(V1alpha1SqlJobStatus status) { + + this.status = status; + return this; + } + + /** + * Get status + * @return status + **/ + @javax.annotation.Nullable + @ApiModelProperty(value = "") + + public V1alpha1SqlJobStatus getStatus() { + return status; + } + + + public void setStatus(V1alpha1SqlJobStatus status) { + this.status = status; + } + + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + V1alpha1SqlJob v1alpha1SqlJob = (V1alpha1SqlJob) o; + return Objects.equals(this.apiVersion, v1alpha1SqlJob.apiVersion) && + Objects.equals(this.kind, v1alpha1SqlJob.kind) && + Objects.equals(this.metadata, v1alpha1SqlJob.metadata) && + Objects.equals(this.spec, v1alpha1SqlJob.spec) && + Objects.equals(this.status, v1alpha1SqlJob.status); + } + + @Override + public int hashCode() { + return Objects.hash(apiVersion, kind, metadata, spec, status); + } + + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("class V1alpha1SqlJob {\n"); + sb.append(" apiVersion: ").append(toIndentedString(apiVersion)).append("\n"); + sb.append(" kind: ").append(toIndentedString(kind)).append("\n"); + sb.append(" metadata: ").append(toIndentedString(metadata)).append("\n"); + sb.append(" spec: ").append(toIndentedString(spec)).append("\n"); + sb.append(" status: ").append(toIndentedString(status)).append("\n"); + sb.append("}"); + return sb.toString(); + } + + /** + * Convert the given object to string with each line indented by 4 spaces + * (except the first line). + */ + private String toIndentedString(Object o) { + if (o == null) { + return "null"; + } + return o.toString().replace("\n", "\n "); + } + +} + diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SqlJobList.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SqlJobList.java new file mode 100644 index 0000000..1d2cb56 --- /dev/null +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SqlJobList.java @@ -0,0 +1,195 @@ +/* + * Kubernetes + * No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator) + * + * The version of the OpenAPI document: v1.21.1 + * + * + * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech). + * https://openapi-generator.tech + * Do not edit the class manually. + */ + + +package com.linkedin.hoptimator.models; + +import java.util.Objects; +import java.util.Arrays; +import com.google.gson.TypeAdapter; +import com.google.gson.annotations.JsonAdapter; +import com.google.gson.annotations.SerializedName; +import com.google.gson.stream.JsonReader; +import com.google.gson.stream.JsonWriter; +import com.linkedin.hoptimator.models.V1alpha1SqlJob; +import io.kubernetes.client.openapi.models.V1ListMeta; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * SqlJobList is a list of SqlJob + */ +@ApiModel(description = "SqlJobList is a list of SqlJob") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-07-26T20:21:01.735Z[Etc/UTC]") +public class V1alpha1SqlJobList implements io.kubernetes.client.common.KubernetesListObject { + public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; + @SerializedName(SERIALIZED_NAME_API_VERSION) + private String apiVersion; + + public static final String SERIALIZED_NAME_ITEMS = "items"; + @SerializedName(SERIALIZED_NAME_ITEMS) + private List items = new ArrayList<>(); + + public static final String SERIALIZED_NAME_KIND = "kind"; + @SerializedName(SERIALIZED_NAME_KIND) + private String kind; + + public static final String SERIALIZED_NAME_METADATA = "metadata"; + @SerializedName(SERIALIZED_NAME_METADATA) + private V1ListMeta metadata = null; + + + public V1alpha1SqlJobList apiVersion(String apiVersion) { + + this.apiVersion = apiVersion; + return this; + } + + /** + * APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + * @return apiVersion + **/ + @javax.annotation.Nullable + @ApiModelProperty(value = "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources") + + public String getApiVersion() { + return apiVersion; + } + + + public void setApiVersion(String apiVersion) { + this.apiVersion = apiVersion; + } + + + public V1alpha1SqlJobList items(List items) { + + this.items = items; + return this; + } + + public V1alpha1SqlJobList addItemsItem(V1alpha1SqlJob itemsItem) { + this.items.add(itemsItem); + return this; + } + + /** + * List of sqljobs. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md + * @return items + **/ + @ApiModelProperty(required = true, value = "List of sqljobs. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md") + + public List getItems() { + return items; + } + + + public void setItems(List items) { + this.items = items; + } + + + public V1alpha1SqlJobList kind(String kind) { + + this.kind = kind; + return this; + } + + /** + * Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + * @return kind + **/ + @javax.annotation.Nullable + @ApiModelProperty(value = "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds") + + public String getKind() { + return kind; + } + + + public void setKind(String kind) { + this.kind = kind; + } + + + public V1alpha1SqlJobList metadata(V1ListMeta metadata) { + + this.metadata = metadata; + return this; + } + + /** + * Get metadata + * @return metadata + **/ + @javax.annotation.Nullable + @ApiModelProperty(value = "") + + public V1ListMeta getMetadata() { + return metadata; + } + + + public void setMetadata(V1ListMeta metadata) { + this.metadata = metadata; + } + + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + V1alpha1SqlJobList v1alpha1SqlJobList = (V1alpha1SqlJobList) o; + return Objects.equals(this.apiVersion, v1alpha1SqlJobList.apiVersion) && + Objects.equals(this.items, v1alpha1SqlJobList.items) && + Objects.equals(this.kind, v1alpha1SqlJobList.kind) && + Objects.equals(this.metadata, v1alpha1SqlJobList.metadata); + } + + @Override + public int hashCode() { + return Objects.hash(apiVersion, items, kind, metadata); + } + + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("class V1alpha1SqlJobList {\n"); + sb.append(" apiVersion: ").append(toIndentedString(apiVersion)).append("\n"); + sb.append(" items: ").append(toIndentedString(items)).append("\n"); + sb.append(" kind: ").append(toIndentedString(kind)).append("\n"); + sb.append(" metadata: ").append(toIndentedString(metadata)).append("\n"); + sb.append("}"); + return sb.toString(); + } + + /** + * Convert the given object to string with each line indented by 4 spaces + * (except the first line). + */ + private String toIndentedString(Object o) { + if (o == null) { + return "null"; + } + return o.toString().replace("\n", "\n "); + } + +} + diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SqlJobSpec.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SqlJobSpec.java new file mode 100644 index 0000000..40192a8 --- /dev/null +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SqlJobSpec.java @@ -0,0 +1,256 @@ +/* + * Kubernetes + * No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator) + * + * The version of the OpenAPI document: v1.21.1 + * + * + * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech). + * https://openapi-generator.tech + * Do not edit the class manually. + */ + + +package com.linkedin.hoptimator.models; + +import java.util.Objects; +import java.util.Arrays; +import com.google.gson.TypeAdapter; +import com.google.gson.annotations.JsonAdapter; +import com.google.gson.annotations.SerializedName; +import com.google.gson.stream.JsonReader; +import com.google.gson.stream.JsonWriter; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * SQL job spec + */ +@ApiModel(description = "SQL job spec") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-07-26T20:21:01.735Z[Etc/UTC]") +public class V1alpha1SqlJobSpec { + /** + * Flink, etc. + */ + @JsonAdapter(DialectEnum.Adapter.class) + public enum DialectEnum { + FLINK("Flink"); + + private String value; + + DialectEnum(String value) { + this.value = value; + } + + public String getValue() { + return value; + } + + @Override + public String toString() { + return String.valueOf(value); + } + + public static DialectEnum fromValue(String value) { + for (DialectEnum b : DialectEnum.values()) { + if (b.value.equals(value)) { + return b; + } + } + throw new IllegalArgumentException("Unexpected value '" + value + "'"); + } + + public static class Adapter extends TypeAdapter { + @Override + public void write(final JsonWriter jsonWriter, final DialectEnum enumeration) throws IOException { + jsonWriter.value(enumeration.getValue()); + } + + @Override + public DialectEnum read(final JsonReader jsonReader) throws IOException { + String value = jsonReader.nextString(); + return DialectEnum.fromValue(value); + } + } + } + + public static final String SERIALIZED_NAME_DIALECT = "dialect"; + @SerializedName(SERIALIZED_NAME_DIALECT) + private DialectEnum dialect; + + /** + * Streaming or Batch. + */ + @JsonAdapter(ExecutionModeEnum.Adapter.class) + public enum ExecutionModeEnum { + STREAMING("Streaming"), + + BATCH("Batch"); + + private String value; + + ExecutionModeEnum(String value) { + this.value = value; + } + + public String getValue() { + return value; + } + + @Override + public String toString() { + return String.valueOf(value); + } + + public static ExecutionModeEnum fromValue(String value) { + for (ExecutionModeEnum b : ExecutionModeEnum.values()) { + if (b.value.equals(value)) { + return b; + } + } + throw new IllegalArgumentException("Unexpected value '" + value + "'"); + } + + public static class Adapter extends TypeAdapter { + @Override + public void write(final JsonWriter jsonWriter, final ExecutionModeEnum enumeration) throws IOException { + jsonWriter.value(enumeration.getValue()); + } + + @Override + public ExecutionModeEnum read(final JsonReader jsonReader) throws IOException { + String value = jsonReader.nextString(); + return ExecutionModeEnum.fromValue(value); + } + } + } + + public static final String SERIALIZED_NAME_EXECUTION_MODE = "executionMode"; + @SerializedName(SERIALIZED_NAME_EXECUTION_MODE) + private ExecutionModeEnum executionMode; + + public static final String SERIALIZED_NAME_SQL = "sql"; + @SerializedName(SERIALIZED_NAME_SQL) + private List sql = new ArrayList<>(); + + + public V1alpha1SqlJobSpec dialect(DialectEnum dialect) { + + this.dialect = dialect; + return this; + } + + /** + * Flink, etc. + * @return dialect + **/ + @javax.annotation.Nullable + @ApiModelProperty(value = "Flink, etc.") + + public DialectEnum getDialect() { + return dialect; + } + + + public void setDialect(DialectEnum dialect) { + this.dialect = dialect; + } + + + public V1alpha1SqlJobSpec executionMode(ExecutionModeEnum executionMode) { + + this.executionMode = executionMode; + return this; + } + + /** + * Streaming or Batch. + * @return executionMode + **/ + @javax.annotation.Nullable + @ApiModelProperty(value = "Streaming or Batch.") + + public ExecutionModeEnum getExecutionMode() { + return executionMode; + } + + + public void setExecutionMode(ExecutionModeEnum executionMode) { + this.executionMode = executionMode; + } + + + public V1alpha1SqlJobSpec sql(List sql) { + + this.sql = sql; + return this; + } + + public V1alpha1SqlJobSpec addSqlItem(String sqlItem) { + this.sql.add(sqlItem); + return this; + } + + /** + * SQL script the job should run. + * @return sql + **/ + @ApiModelProperty(required = true, value = "SQL script the job should run.") + + public List getSql() { + return sql; + } + + + public void setSql(List sql) { + this.sql = sql; + } + + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + V1alpha1SqlJobSpec v1alpha1SqlJobSpec = (V1alpha1SqlJobSpec) o; + return Objects.equals(this.dialect, v1alpha1SqlJobSpec.dialect) && + Objects.equals(this.executionMode, v1alpha1SqlJobSpec.executionMode) && + Objects.equals(this.sql, v1alpha1SqlJobSpec.sql); + } + + @Override + public int hashCode() { + return Objects.hash(dialect, executionMode, sql); + } + + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("class V1alpha1SqlJobSpec {\n"); + sb.append(" dialect: ").append(toIndentedString(dialect)).append("\n"); + sb.append(" executionMode: ").append(toIndentedString(executionMode)).append("\n"); + sb.append(" sql: ").append(toIndentedString(sql)).append("\n"); + sb.append("}"); + return sb.toString(); + } + + /** + * Convert the given object to string with each line indented by 4 spaces + * (except the first line). + */ + private String toIndentedString(Object o) { + if (o == null) { + return "null"; + } + return o.toString().replace("\n", "\n "); + } + +} + diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SqlJobStatus.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SqlJobStatus.java new file mode 100644 index 0000000..54a2c01 --- /dev/null +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SqlJobStatus.java @@ -0,0 +1,187 @@ +/* + * Kubernetes + * No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator) + * + * The version of the OpenAPI document: v1.21.1 + * + * + * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech). + * https://openapi-generator.tech + * Do not edit the class manually. + */ + + +package com.linkedin.hoptimator.models; + +import java.util.Objects; +import java.util.Arrays; +import com.google.gson.TypeAdapter; +import com.google.gson.annotations.JsonAdapter; +import com.google.gson.annotations.SerializedName; +import com.google.gson.stream.JsonReader; +import com.google.gson.stream.JsonWriter; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import java.io.IOException; + +/** + * Filled in by the operator. + */ +@ApiModel(description = "Filled in by the operator.") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-07-26T20:21:01.735Z[Etc/UTC]") +public class V1alpha1SqlJobStatus { + public static final String SERIALIZED_NAME_FAILED = "failed"; + @SerializedName(SERIALIZED_NAME_FAILED) + private Boolean failed; + + public static final String SERIALIZED_NAME_MESSAGE = "message"; + @SerializedName(SERIALIZED_NAME_MESSAGE) + private String message; + + public static final String SERIALIZED_NAME_READY = "ready"; + @SerializedName(SERIALIZED_NAME_READY) + private Boolean ready; + + public static final String SERIALIZED_NAME_SQL = "sql"; + @SerializedName(SERIALIZED_NAME_SQL) + private String sql; + + + public V1alpha1SqlJobStatus failed(Boolean failed) { + + this.failed = failed; + return this; + } + + /** + * Whether the SqlJob has failed. + * @return failed + **/ + @javax.annotation.Nullable + @ApiModelProperty(value = "Whether the SqlJob has failed.") + + public Boolean getFailed() { + return failed; + } + + + public void setFailed(Boolean failed) { + this.failed = failed; + } + + + public V1alpha1SqlJobStatus message(String message) { + + this.message = message; + return this; + } + + /** + * Error or success message, for information only. + * @return message + **/ + @javax.annotation.Nullable + @ApiModelProperty(value = "Error or success message, for information only.") + + public String getMessage() { + return message; + } + + + public void setMessage(String message) { + this.message = message; + } + + + public V1alpha1SqlJobStatus ready(Boolean ready) { + + this.ready = ready; + return this; + } + + /** + * Whether the SqlJob is running or completed. + * @return ready + **/ + @javax.annotation.Nullable + @ApiModelProperty(value = "Whether the SqlJob is running or completed.") + + public Boolean getReady() { + return ready; + } + + + public void setReady(Boolean ready) { + this.ready = ready; + } + + + public V1alpha1SqlJobStatus sql(String sql) { + + this.sql = sql; + return this; + } + + /** + * The SQL being implemented by this SqlJob. + * @return sql + **/ + @javax.annotation.Nullable + @ApiModelProperty(value = "The SQL being implemented by this SqlJob.") + + public String getSql() { + return sql; + } + + + public void setSql(String sql) { + this.sql = sql; + } + + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + V1alpha1SqlJobStatus v1alpha1SqlJobStatus = (V1alpha1SqlJobStatus) o; + return Objects.equals(this.failed, v1alpha1SqlJobStatus.failed) && + Objects.equals(this.message, v1alpha1SqlJobStatus.message) && + Objects.equals(this.ready, v1alpha1SqlJobStatus.ready) && + Objects.equals(this.sql, v1alpha1SqlJobStatus.sql); + } + + @Override + public int hashCode() { + return Objects.hash(failed, message, ready, sql); + } + + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("class V1alpha1SqlJobStatus {\n"); + sb.append(" failed: ").append(toIndentedString(failed)).append("\n"); + sb.append(" message: ").append(toIndentedString(message)).append("\n"); + sb.append(" ready: ").append(toIndentedString(ready)).append("\n"); + sb.append(" sql: ").append(toIndentedString(sql)).append("\n"); + sb.append("}"); + return sb.toString(); + } + + /** + * Convert the given object to string with each line indented by 4 spaces + * (except the first line). + */ + private String toIndentedString(Object o) { + if (o == null) { + return "null"; + } + return o.toString().replace("\n", "\n "); + } + +} + diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Subscription.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Subscription.java index 4da7849..57f1888 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Subscription.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Subscription.java @@ -31,7 +31,7 @@ * Hoptimator Subscription */ @ApiModel(description = "Hoptimator Subscription") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-07-26T20:21:01.735Z[Etc/UTC]") public class V1alpha1Subscription implements io.kubernetes.client.common.KubernetesObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionList.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionList.java index 2d5f0eb..432ab25 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionList.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionList.java @@ -32,7 +32,7 @@ * SubscriptionList is a list of Subscription */ @ApiModel(description = "SubscriptionList is a list of Subscription") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-07-26T20:21:01.735Z[Etc/UTC]") public class V1alpha1SubscriptionList implements io.kubernetes.client.common.KubernetesListObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionSpec.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionSpec.java index d55e49f..451480d 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionSpec.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionSpec.java @@ -31,7 +31,7 @@ * Subscription spec */ @ApiModel(description = "Subscription spec") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-07-26T20:21:01.735Z[Etc/UTC]") public class V1alpha1SubscriptionSpec { public static final String SERIALIZED_NAME_DATABASE = "database"; @SerializedName(SERIALIZED_NAME_DATABASE) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionStatus.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionStatus.java index 8ed4ef4..b06e0ee 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionStatus.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionStatus.java @@ -32,7 +32,7 @@ * Filled in by the operator. */ @ApiModel(description = "Filled in by the operator.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-07-26T20:21:01.735Z[Etc/UTC]") public class V1alpha1SubscriptionStatus { public static final String SERIALIZED_NAME_ATTRIBUTES = "attributes"; @SerializedName(SERIALIZED_NAME_ATTRIBUTES) diff --git a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/Operator.java b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/Operator.java index 27fbd96..ae73927 100644 --- a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/Operator.java +++ b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/Operator.java @@ -1,6 +1,8 @@ package com.linkedin.hoptimator.operator; import io.kubernetes.client.openapi.ApiClient; +import io.kubernetes.client.openapi.ApiException; +import io.kubernetes.client.openapi.models.V1OwnerReference; import io.kubernetes.client.informer.SharedInformerFactory; import io.kubernetes.client.informer.SharedIndexInformer; import io.kubernetes.client.informer.cache.Indexer; @@ -134,6 +136,150 @@ public Duration resyncPeriod() { return Duration.ofMinutes(10); } + public void apply(String yaml, KubernetesObject owner) throws ApiException { + V1OwnerReference ownerReference = new V1OwnerReference() + .kind(owner.getKind()) + .name(owner.getMetadata().getName()) + .apiVersion(owner.getApiVersion()) + .uid(owner.getMetadata().getUid()); + DynamicKubernetesObject obj = Dynamics.newFromYaml(yaml); + if (obj.getMetadata().getNamespace() == null) { + obj.getMetadata().setNamespace(owner.getMetadata().getNamespace()); + } + if (obj.getMetadata().getName() == null) { + throw new IllegalArgumentException("Object has no name."); + } + String namespace = obj.getMetadata().getNamespace(); + String name = obj.getMetadata().getName(); + KubernetesApiResponse existing = apiFor(obj).get(namespace, name); + if (existing.isSuccess()) { + String resourceVersion = existing.getObject().getMetadata().getResourceVersion(); + log.info("Updating existing downstream resource {}/{} {} as \n{}", + namespace, name, resourceVersion, yaml); + List owners = existing.getObject().getMetadata().getOwnerReferences(); + if (owners == null) { + owners = new ArrayList<>(); + } + if (owners.stream().anyMatch(x -> x.getUid().equals(ownerReference.getUid()))) { + log.info("Existing downstream resource {}/{} is already owned by {}/{}.", + namespace, name, ownerReference.getKind(), ownerReference.getName()); + } else { + log.info("Existing downstream resource {}/{} will be owned by {}/{} and {} others.", + namespace, name, ownerReference.getKind(), ownerReference.getName(), owners.size()); + owners.add(ownerReference); + } + obj.setMetadata(obj.getMetadata().ownerReferences(owners).resourceVersion(resourceVersion)); + apiFor(obj).update(obj) + .onFailure((x, y) -> log.error("Error updating downstream resource {}/{}: {}.", namespace, name, y.getMessage())) + .throwsApiException(); + } else { + log.info("Creating downstream resource {}/{} as \n{}", namespace, name, yaml); + obj.setMetadata(obj.getMetadata().addOwnerReferencesItem(ownerReference)); + apiFor(obj).create(obj) + .onFailure((x, y) -> log.error("Error creating downstream resource {}/{}: {}.", namespace, name, y.getMessage())) + .throwsApiException(); + } + } + + public boolean isReady(String yaml) { + DynamicKubernetesObject obj = Dynamics.newFromYaml(yaml); + String namespace = obj.getMetadata().getNamespace(); + String name = obj.getMetadata().getName(); + String kind = obj.getKind(); + try { + KubernetesApiResponse existing = apiFor(obj).get(namespace, name); + existing.onFailure((code, status) -> log.warn("Failed to fetch {}/{}: {}.", kind, name, status.getMessage())); + if (!existing.isSuccess()) { + return false; + } + if (isReady(existing.getObject())) { + log.info("{}/{} is ready.", kind, name); + return true; + } else { + log.info("{}/{} is NOT ready.", kind, name); + return false; + } + } catch (Exception e) { + return false; + } + } + + public static boolean isReady(DynamicKubernetesObject obj) { + // We make a best effort to guess the status of the dynamic object. By default, it's ready. + if (obj == null || obj.getRaw() == null) { + return false; + } + try { + return obj.getRaw().get("status").getAsJsonObject().get("ready").getAsBoolean(); + } catch (Exception e) { + log.debug("Exception looking for .status.ready. Swallowing.", e); + } + try { + return obj.getRaw().get("status").getAsJsonObject().get("state").getAsString() + .matches("(?i)READY|RUNNING|FINISHED"); + } catch (Exception e) { + log.debug("Exception looking for .status.state. Swallowing.", e); + } + try { + return obj.getRaw().get("status").getAsJsonObject().get("jobStatus").getAsJsonObject() + .get("state").getAsString().matches("(?i)READY|RUNNING|FINISHED"); + } catch (Exception e) { + log.debug("Exception looking for .status.jobStatus.state. Swallowing.", e); + } + // TODO: Look for common Conditions + log.warn("Resource {}/{}/{} considered ready by default.", obj.getMetadata().getNamespace(), + obj.getKind(), obj.getMetadata().getName()); + return true; + } + + public boolean isFailed(String yaml) { + DynamicKubernetesObject obj = Dynamics.newFromYaml(yaml); + String namespace = obj.getMetadata().getNamespace(); + String name = obj.getMetadata().getName(); + String kind = obj.getKind(); + try { + KubernetesApiResponse existing = apiFor(obj).get(namespace, name); + existing.onFailure((code, status) -> log.warn("Failed to fetch {}/{}: {}.", kind, name, status.getMessage())); + if (!existing.isSuccess()) { + return false; + } + if (isFailed(existing.getObject())) { + log.info("{}/{} is FAILED.", kind, name); + return true; + } else { + return false; + } + } catch (Exception e) { + return false; + } + } + + public static boolean isFailed(DynamicKubernetesObject obj) { + // We make a best effort to guess the status of the dynamic object. By default, it's not failed. + if (obj == null || obj.getRaw() == null) { + return false; + } + try { + return obj.getRaw().get("status").getAsJsonObject().get("failed").getAsBoolean(); + } catch (Exception e) { + log.debug("Exception looking for .status.ready. Swallowing.", e); + } + try { + return obj.getRaw().get("status").getAsJsonObject().get("state").getAsString() + .matches("(?i)FAILED|ERROR"); + } catch (Exception e) { + log.debug("Exception looking for .status.state. Swallowing.", e); + } + try { + return obj.getRaw().get("status").getAsJsonObject().get("jobStatus").getAsJsonObject() + .get("state").getAsString().matches("(?i)FAILED|ERROR"); + } catch (Exception e) { + log.debug("Exception looking for .status.jobStatus.state. Swallowing.", e); + } + // TODO: Look for common Conditions + return false; + } + public static class ApiInfo { private final String kind; private final String singular; diff --git a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java index 1131fc1..7f54cae 100644 --- a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java +++ b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java @@ -11,6 +11,7 @@ import com.linkedin.hoptimator.planner.Pipeline; import com.linkedin.hoptimator.planner.PipelineRel; +import io.kubernetes.client.common.KubernetesObject; import io.kubernetes.client.extended.controller.Controller; import io.kubernetes.client.extended.controller.builder.ControllerBuilder; import io.kubernetes.client.extended.controller.reconciler.Reconciler; @@ -161,7 +162,7 @@ public Result reconcile(Request request) { } else if (status.getReady() == null && status.getResources() != null) { // Phase 2 log.info("Deploying pipeline for {}/{}...", kind, name); - + boolean deployed = status.getResources().stream() .allMatch(x -> apply(x, object)); @@ -176,7 +177,7 @@ public Result reconcile(Request request) { log.info("Checking status of pipeline for {}/{}...", kind, name); boolean ready = status.getResources().stream() - .allMatch(x -> checkStatus(x)); + .allMatch(x -> operator.isReady(x)); if (ready) { status.setReady(true); @@ -206,6 +207,15 @@ public Result reconcile(Request request) { return result; } + private boolean apply(String yaml, KubernetesObject owner) { + try { + operator.apply(yaml, owner); + } catch (Exception e) { + return false; + } + return true; + } + private Pipeline pipeline(V1alpha1Subscription object) throws Exception { String name = object.getMetadata().getName(); String sql = object.getSpec().getSql(); @@ -222,106 +232,6 @@ private Pipeline pipeline(V1alpha1Subscription object) throws Exception { return impl.pipeline(sink); } - private boolean apply(String yaml, V1alpha1Subscription owner) { - V1OwnerReference ownerReference = new V1OwnerReference(); - ownerReference.kind(owner.getKind()); - ownerReference.name(owner.getMetadata().getName()); - ownerReference.apiVersion(owner.getApiVersion()); - ownerReference.uid(owner.getMetadata().getUid()); - - DynamicKubernetesObject obj = Dynamics.newFromYaml(yaml); - String namespace = obj.getMetadata().getNamespace(); - if (namespace == null) { - namespace = owner.getMetadata().getNamespace(); - obj.getMetadata().setNamespace(namespace); - } - String name = obj.getMetadata().getName(); - KubernetesApiResponse existing = operator.apiFor(obj).get(namespace, name); - if (existing.isSuccess()) { - String resourceVersion = existing.getObject().getMetadata().getResourceVersion(); - log.info("Updating existing downstream resource {}/{} {} as \n{}", - namespace, name, resourceVersion, yaml); - List owners = existing.getObject().getMetadata().getOwnerReferences(); - if (owners == null) { - owners = new ArrayList<>(); - } - if (owners.stream().anyMatch(x -> x.getUid().equals(ownerReference.getUid()))) { - log.info("Existing downstream resource {}/{} is already owned by {}/{}.", - namespace, name, ownerReference.getKind(), ownerReference.getName()); - } else { - log.info("Existing downstream resource {}/{} will be owned by {}/{} and {} others.", - namespace, name, ownerReference.getKind(), ownerReference.getName(), owners.size()); - owners.add(ownerReference); - } - obj.setMetadata(obj.getMetadata().ownerReferences(owners).resourceVersion(resourceVersion)); - KubernetesApiResponse response = operator.apiFor(obj).update(obj); - if (!response.isSuccess()) { - log.error("Error updating downstream resource {}/{}: {}.", namespace, name, response.getStatus().getMessage()); - return false; - } - } else { - log.info("Creating downstream resource {}/{} as \n{}", namespace, name, yaml); - obj.setMetadata(obj.getMetadata().addOwnerReferencesItem(ownerReference)); - KubernetesApiResponse response = operator.apiFor(obj).create(obj); - if (!response.isSuccess()) { - log.error("Error creating downstream resource {}/{}: {}.", namespace, name, response.getStatus().getMessage()); - return false; - } - } - return true; - } - - private boolean checkStatus(String yaml) { - DynamicKubernetesObject obj = Dynamics.newFromYaml(yaml); - String namespace = obj.getMetadata().getNamespace(); - String name = obj.getMetadata().getName(); - String kind = obj.getKind(); - try { - KubernetesApiResponse existing = operator.apiFor(obj).get(namespace, name); - existing.onFailure((code, status) -> log.warn("Failed to fetch {}/{}: {}.", kind, name, status.getMessage())); - if (!existing.isSuccess()) { - return false; - } - if (isReady(existing.getObject())) { - log.info("{}/{} is ready.", kind, name); - return true; - } else { - log.info("{}/{} is NOT ready.", kind, name); - return false; - } - } catch (Exception e) { - return false; - } - } - - private static boolean isReady(DynamicKubernetesObject obj) { - // We make a best effort to guess the status of the dynamic object. By default, it's ready. - if (obj == null || obj.getRaw() == null) { - return false; - } - try { - return obj.getRaw().get("status").getAsJsonObject().get("ready").getAsBoolean(); - } catch (Exception e) { - log.debug("Exception looking for .status.ready. Swallowing.", e); - } - try { - return obj.getRaw().get("status").getAsJsonObject().get("state").getAsString() - .matches("(?i)READY|RUNNING|FINISHED"); - } catch (Exception e) { - log.debug("Exception looking for .status.state. Swallowing.", e); - } - try { - return obj.getRaw().get("status").getAsJsonObject().get("jobStatus").getAsJsonObject() - .get("state").getAsString().matches("(?i)READY|RUNNING|FINISHED"); - } catch (Exception e) { - log.debug("Exception looking for .status.jobStatus.state. Swallowing.", e); - } - // TODO: Look for common Conditions - log.warn("Resource {}/{}/{} considered ready by default.", obj.getMetadata().getNamespace(), - obj.getKind(), obj.getMetadata().getName()); - return true; - } - // Whether status has diverged from spec (i.e. we need to re-plan the pipeline) private static boolean diverged(V1alpha1SubscriptionSpec spec, V1alpha1SubscriptionStatus status) { return status.getSql() == null || !status.getSql().equals(spec.getSql())