[HWORKS-681] Flink job apis for hopsworks python library #160

Closed
wants to merge 16 commits into from

Conversation

davitbzh
Contributor

No description provided.

davitbzh requested a review from robzor92 on August 13, 2023, 20:42
        self._project_name = project_name
        self._job_api = job_api.JobsApi(project_id, project_name)

    def get_configuration(self):

Collaborator

Missing docs
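
A minimal sketch of the kind of docstring this comment asks for, assuming the `# Returns` section style that keras-autodoc picks up elsewhere in the library. The `_FakeJobsApi` stub is hypothetical and only there so the example runs without a Hopsworks backend; the wording of the docstring is a suggestion, not the text that was eventually merged.

```python
class FlinkClusterApi:
    """Trimmed-down stand-in for the class under review."""

    def __init__(self, job_api):
        self._job_api = job_api

    def get_configuration(self):
        """Get the default configuration for a Flink cluster.

        # Returns
            `dict`: Default configuration for the Flink job type,
            as returned by the jobs API.
        """
        return self._job_api.get_configuration("FLINK")


class _FakeJobsApi:
    """Hypothetical stub that echoes the requested job type."""

    def get_configuration(self, job_type):
        return {"type": job_type}


config = FlinkClusterApi(_FakeJobsApi()).get_configuration()
```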

    def get_configuration(self):
        return self._job_api.get_configuration("FLINK")

    def setup_cluster(self, name, config=None):

Collaborator

Missing docs

        return self._job_api.get_configuration("FLINK")

    def setup_cluster(self, name, config=None):
        if self._job_api.exists(name):

Collaborator

This should also validate that the retrieved job is a FLINK job
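
One way the check could look, as a rough sketch. It assumes the retrieved job exposes its type as a `job_type` attribute and that the jobs API offers `get_job` and `create_job` next to `exists`; `StubJob` and `StubJobsApi` are hypothetical stand-ins, and `setup_cluster` is written as a free function here only to keep the example self-contained.

```python
class StubJob:
    """Hypothetical job object; the job_type attribute is an assumption."""

    def __init__(self, name, job_type):
        self.name = name
        self.job_type = job_type


class StubJobsApi:
    """Hypothetical in-memory replacement for the jobs API."""

    def __init__(self, jobs):
        self._jobs = {job.name: job for job in jobs}

    def exists(self, name):
        return name in self._jobs

    def get_job(self, name):
        return self._jobs[name]

    def create_job(self, name, config):
        job = StubJob(name, "FLINK")
        self._jobs[name] = job
        return job


def setup_cluster(job_api, name, config=None):
    """Return the existing Flink job, or create one if the name is unused."""
    if job_api.exists(name):
        job = job_api.get_job(name)
        # Reject jobs that share the name but are not Flink jobs.
        if job.job_type != "FLINK":
            raise ValueError(
                f"Job '{name}' already exists but has type {job.job_type}, not FLINK"
            )
        return job
    return job_api.create_job(name, config)


api = StubJobsApi([StubJob("myFlinkCluster", "FLINK"), StubJob("etl", "SPARK")])
cluster = setup_cluster(api, "myFlinkCluster")
```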

python/hopsworks/core/flink_cluster_api.py (two outdated threads, resolved)
            self._project_name,
        )

    def get_jobs(self, execution):

Collaborator

Missing docs

        polling_time = 0
        while polling_time < start_time_out:
            execution = self._execution_api._get(self, execution.id)
            if execution.state == "RUNNING":

Collaborator

We should have an else statement here to handle the case where the job failed while starting up. Otherwise the user will wait 10 minutes and then get the exception thrown on line 43.
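
A sketch of the fail-fast polling loop this comment describes. The set of failure states is an assumption (the real list depends on the Hopsworks backend), and `wait_until_running`, `get_state`, `poll_interval` and the injectable `sleep` are hypothetical names introduced so the loop can run without a live cluster.

```python
import time

# Assumed terminal failure states; adjust to whatever the backend reports.
FAILED_STATES = {"FAILED", "KILLED", "INITIALIZATION_FAILED"}


def wait_until_running(get_state, start_time_out=600, poll_interval=1, sleep=time.sleep):
    """Poll get_state() until it returns RUNNING, failing fast on a terminal state."""
    polling_time = 0
    while polling_time < start_time_out:
        state = get_state()
        if state == "RUNNING":
            return state
        elif state in FAILED_STATES:
            # Raise immediately instead of waiting for the full timeout.
            raise RuntimeError(f"Execution reached state {state} while starting up")
        sleep(poll_interval)
        polling_time += poll_interval
    raise TimeoutError(f"Execution did not reach RUNNING within {start_time_out} seconds")


# Usage with a scripted sequence of states instead of a live cluster.
states = iter(["INITIALIZING", "ACCEPTED", "RUNNING"])
final_state = wait_until_running(lambda: next(states), sleep=lambda _: None)
```

With the extra branch, a job that dies during startup surfaces as an error on the next poll rather than after the whole `start_time_out` has elapsed.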

davitbzh requested a review from robzor92 on August 22, 2023, 08:35
        )
        return response["files"]

    def upload_jar(self, execution, jar_file):

Collaborator

Since we can now do FlinkExecution.upload_jar, can we make this and any other function being called from FlinkCluster/FlinkExecution private?
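
Roughly what is meant, as a sketch: the API-level helper gets a leading underscore and the user-facing method on the execution object delegates to it. Both classes below are trimmed-down, hypothetical stand-ins, and the returned string only simulates the upload.

```python
class FlinkClusterApi:
    """Stand-in for the API class; the helper is private by convention."""

    def _upload_jar(self, execution, jar_file):
        # The real method would call the REST endpoint; this only simulates it.
        return f"uploaded {jar_file} for execution {execution.id}"


class FlinkExecution:
    """Stand-in for the user-facing execution object."""

    def __init__(self, id, flink_cluster_api):
        self.id = id
        self._flink_cluster_api = flink_cluster_api

    def upload_jar(self, jar_file):
        """Public entry point; delegates to the private API helper."""
        return self._flink_cluster_api._upload_jar(self, jar_file)


execution = FlinkExecution(7, FlinkClusterApi())
result = execution.upload_jar("job.jar")
```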

python/hopsworks/core/flink_cluster_api.py (outdated thread, resolved)
davitbzh requested a review from robzor92 on August 22, 2023, 21:36
python/hopsworks/flink_execution.py (outdated thread, resolved)
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")

# get all executions (this will return an empty list if no execution is running on this Flink cluster)
executions = flink_job.get_executions()

Collaborator

I don't think this snippet works, flink_job is not defined
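
A possible fix, assuming the intent was to call `get_executions()` on the `flink_cluster` object that the snippet does define. The two stub classes are hypothetical and exist only so the corrected snippet runs standalone; in the real docstring `flink_cluster_api` would come from the project.

```python
class StubFlinkCluster:
    """Hypothetical stand-in for the cluster object returned by get_cluster."""

    def __init__(self, name):
        self.name = name

    def get_executions(self):
        # No execution is running on this stub cluster.
        return []


class StubFlinkClusterApi:
    """Hypothetical stand-in for the Flink cluster API."""

    def get_cluster(self, name):
        return StubFlinkCluster(name)


flink_cluster_api = StubFlinkClusterApi()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")

# Call get_executions on the variable that is actually defined.
executions = flink_cluster.get_executions()
```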

flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")

# get all executions (this will return an empty list if no execution is running on this Flink cluster)
executions = flink_job.get_executions()

Collaborator

Same problem, flink_job not defined

flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")

# get all executions (this will return an empty list if no execution is running on this Flink cluster)
executions = flink_job.get_executions()

Collaborator

flink_job not defined

flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")

# get all executions (this will return an empty list if no execution is running on this Flink cluster)
executions = flink_job.get_executions()

Collaborator

flink_job not defined

flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")

# get all executions (this will return an empty list if no execution is running on this Flink cluster)
executions = flink_job.get_executions()

Collaborator

flink_job not defined

flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")

# get all executions (this will return an empty list if no execution is running on this Flink cluster)
executions = flink_job.get_executions()

Collaborator

flink_job not defined

flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")

# get all executions (this will return an empty list if no execution is running on this Flink cluster)
executions = flink_job.get_executions()

Collaborator

flink_job not defined

flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")

# get all executions (this will return an empty list if no execution is running on this Flink cluster)
executions = flink_job.get_executions()

Collaborator

flink_job not defined


        return self._flink_cluster_api._get_job(self, job_id)

    def stop_execution(self):

Collaborator

This function does the same thing as stop()? I think we should just keep the stop() function and remove the stop_execution() one

Collaborator

robzor92 left a comment

We need to update this file to include the documentation for Flink:

https://github.com/logicalclocks/hopsworks-api/blob/main/auto_doc.py

Collaborator

robzor92 left a comment

You need to change the mkdocs.yml file to include the documentation. For example, for jobs we do it here:

- Jobs: generated/api/jobs.md

robzor92 closed this on Sep 7, 2023