Skip to content

Commit

Permalink
Merge branch 'devel' of github.com:alinelena/dvc_x into devel
Browse files Browse the repository at this point in the history
  • Loading branch information
paskino committed Mar 31, 2021
2 parents 7f80cb5 + ecdde8a commit 07d167b
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 78 deletions.
7 changes: 5 additions & 2 deletions dvc_x/dvc_x.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,11 @@ def local_home_dir(self):

return os.path.expanduser("~")

def get_file(self,filename):
self.sftp.get(filename, filename)
def get_file(self,filename, localdir=None):
if localdir is None:
self.sftp.get(filename, filename)
else:
self.sftp.get(filename, os.path.join(os.path.abspath(localdir), filename))

def remove_file(self,filename):
self.sftp.remove(filename)
Expand Down
24 changes: 19 additions & 5 deletions dvc_x/ui/RemoteFileDialog.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,10 @@ def loadIntoTableWidget(self, data):
self.tableWidget.setColumnCount(len(data[0]))
for i, v in enumerate(data):
for j, w in enumerate(v):
self.tableWidget.setItem(i, j, QtWidgets.QTableWidgetItem(str(w)))
item = QtWidgets.QTableWidgetItem(str(w))
if j == 1:
item.setToolTip(str(w))
self.tableWidget.setItem(i, j, item)

self.tableWidget.setHorizontalHeaderLabels(['Type', 'Name'])

Expand All @@ -277,8 +280,11 @@ def updateTypeColumn(self):
username = self.conn.username
private_key = self.conn.private_key
remote_os = self.conn.remote_os
worker = Worker(self.asyncStatRemotePath, path, i, self.tableWidget,
host, port, username, private_key, remote_os)
worker = Worker(self.asyncStatRemotePath, path=path, row=i,
table=self.tableWidget, host=host, port=port,
username=username, private_key=private_key,
remote_os=remote_os
)
self.threadpool.start(worker)

def isFile(self, path):
Expand Down Expand Up @@ -320,10 +326,18 @@ def isDir(self, path):
msg.exec()
return

def asyncStatRemotePath(self, path, row, table, host, port, username, private_key, remote_os,
progress_callback, message_callback):
def asyncStatRemotePath(self, **kwargs):#path, row, table, host, port, username, private_key, remote_os):
'''asynchronously stat remote files for their type and adds info to the tablewidget'''
# TODO logfile should be created and deleted
path = kwargs.get('path')
row = kwargs.get('row')
table = kwargs.get('table')
host = kwargs.get('host')
port = kwargs.get('port')
username = kwargs.get('username')
private_key = kwargs.get('private_key')
remote_os = kwargs.get('remote_os')

logfile = 'logfile.log'
conn = drx.DVCRem(logfile=logfile,
port=port, host=host, username=username,
Expand Down
188 changes: 117 additions & 71 deletions tests/run_dvc_gui.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,19 @@ def openRunRemoteDialog(self):

logfile = os.path.join(os.getcwd(), "RemoteFileDialog.log")
# logfile = os.path.abspath("C:/Users/ofn77899/Documents/Projects/CCPi/GitHub/PythonWorkRemote/dvc_x/RemoteFileDialogue.log")
dialogue = DVCRunDialog( parent=self,
if self.run_control.job_id is not None:

dialogue = DVCSLURMProgressDialog( parent=self,
title="Run Remote Monitor",
connection_details=self.connection_details
connection_details=self.connection_details,
job_ids = [self.run_control.job_id]
)
dialogue.Ok.clicked.connect(dialogue.close)
dialogue.Ok.clicked.connect(dialogue.close)

dialogue.exec()
dialogue.exec()
else:
pass

def runRemote(self):
if hasattr(self, 'connection_details'):
username = self.connection_details['username']
Expand All @@ -123,13 +129,17 @@ def runRemote(self):
else:
print ("No connection details")

def cancel_job(self, job_id):
pass


def updateStatusBar(self, status):
if isinstance(status, str):
msg = status
elif isinstance(status, tuple):
msg = "Job {}: {}".format(*status)
if status[1] in ['PENDING', 'RUNNING', 'FINISHED']:
self.job_id = status[0]
else:
msg = 'Some update, type status {}'.format(type(status))
self.statusBar().showMessage(msg)
Expand All @@ -138,9 +148,14 @@ def processFinished(self):
msg = ( self.run_control.job_id, "FINISHED" )
self.updateStatusBar(msg)

class DVCRunDialog(QtWidgets.QDialog):
def __init__(self, parent, title, connection_details):
class DVCSLURMProgressDialog(QtWidgets.QDialog):
def __init__(self, parent, title, connection_details, job_ids):
QtWidgets.QDialog.__init__(self, parent)

self._job_ids = None
self.job_ids = job_ids
self.parent_app = parent

bb = QtWidgets.QDialogButtonBox(QtWidgets.QDialogButtonBox.Ok
| QtWidgets.QDialogButtonBox.Apply
| QtWidgets.QDialogButtonBox.Abort)
Expand All @@ -153,10 +168,15 @@ def __init__(self, parent, title, connection_details):
lambda: self.cancel_job()
)
bb.button(QtWidgets.QDialogButtonBox.Abort).setText("Cancel Job")
# let's use this button to connect to a running job
bb.button(QtWidgets.QDialogButtonBox.Apply).clicked.connect(
lambda: self.run_job()
lambda: self.monitor_job()
)
bb.button(QtWidgets.QDialogButtonBox.Apply).setText("Submit Job")
# bb.button(QtWidgets.QDialogButtonBox.Apply).clicked.connect(
# lambda: self.run_job()
# )

bb.button(QtWidgets.QDialogButtonBox.Apply).setText("Monitor Job")


# select logfile
Expand All @@ -165,8 +185,18 @@ def __init__(self, parent, title, connection_details):
# self.addWidget()

# add widgets
jobidl = QtWidgets.QLabel(parent=self)
jobidl.setText("Job id: ")
# jobidl = QtWidgets.QLabel(parent=self)
# jobidl.setText("Job id: ")

# combo box to select the job id
formWidget = UIFormFactory.getQWidget(parent=self)
combo = QtWidgets.QComboBox(formWidget.groupBox)
for job in self.job_ids:
combo.addItem(str(job))
formWidget.addWidget(combo,'Job id:', 'job_selector')

status = QtWidgets.QLabel(formWidget.groupBox)
formWidget.addWidget(status, 'Status:', 'status')

# progress bar
prog_bar = QtWidgets.QProgressBar(parent=self)
Expand All @@ -180,7 +210,8 @@ def __init__(self, parent, title, connection_details):
cat.setMinimumWidth(560)

vert_layout = QtWidgets.QVBoxLayout(self)
vert_layout.addWidget(jobidl)
# vert_layout.addWidget(jobidl)
vert_layout.addWidget(formWidget)
vert_layout.addWidget(prog_bar)
vert_layout.addWidget(cat)
# finally
Expand All @@ -192,20 +223,39 @@ def __init__(self, parent, title, connection_details):

# save references
self.widgets = {'layout': vert_layout,
'buttonBox': bb, 'textEdit': cat, 'jobid': jobidl,
'progressBar': prog_bar}

# store a reference
self.threadpool = QtCore.QThreadPool()
'buttonBox': bb, 'textEdit': cat,
# 'jobid': jobidl,
'progressBar': prog_bar, 'formWidget': formWidget, 'job_selector': combo}


self.connection_details = connection_details

# store a reference
# self.threadpool = QtCore.QThreadPool()
# # create a RemoteRunControl
# self.runner = RemoteRunControl()
# self.runner.connection_details = connection_details
# folder=dpath.abspath("/work3/cse/dvc/test-edo")
# logfile = dpath.join(folder, "remotedvc.out")
# self.runner.dvclog_fname = logfile

# create a RemoteRunControl
self.runner = RemoteRunControl()
self.runner.connection_details = connection_details
folder=dpath.abspath("/work3/cse/dvc/test-edo")
logfile = dpath.join(folder, "remotedvc.out")
self.runner.dvclog_fname = logfile
@property
def job_ids(self):
return self._job_ids
@job_ids.setter
def job_ids(self, value):
self._job_ids = list(value)

def monitor_job(self):
print ("should know which job this is! {}".format(self.job_ids))
jobid = self.widgets['job_selector'].currentText()
run_control = self.parent_app.run_control
run_control.signals.message.connect(self.appendText)
run_control.signals.progress.connect(self.update_progress)
run_control.signals.finished.connect(lambda: self.reset_interface() )
run_control.signals.status.connect(self.update_status)
# run_control.internalsignals.cancelled(self.on_cancelled)


@property
def Ok(self):
Expand All @@ -219,74 +269,54 @@ def Apply(self):
def Abort(self):
return self.widgets['buttonBox'].button(QtWidgets.QDialogButtonBox.Abort)



def run_job(self):
if hasattr(self, 'connection_details'):
username = self.connection_details['username']
port = self.connection_details['server_port']
host = self.connection_details['server_name']
private_key = self.connection_details['private_key']
folder=dpath.abspath("/work3/cse/dvc/test-edo")
logfile = dpath.join(folder, "remotedvc.out")
print ("logfile", logfile)
self.dvcWorker = Worker(self.runner.run_dvc_worker,
host=host, username=username, port=port,
private_key=private_key, logfile=logfile, update_delay=10)
# connect signal/slots
self.dvcWorker.signals.message.connect(self.appendText)
self.dvcWorker.signals.progress.connect(self.update_progress)
self.dvcWorker.signals.finished.connect(lambda: self.reset_interface() )
self.dvcWorker.signals.status.connect(self.update_status)


self.threadpool.start(self.dvcWorker)

self.Apply.setEnabled(False)
self.Apply.setText("Queueing")
else:
print ("No connection details")
def reset_interface(self):
self.Apply.setEnabled(True)
self.Apply.setText("Submit job")

#
import re
m = re.search("^Job ([0-9]*)", self.widgets['jobid'].text())
jid = ''
if m is not None:
jid = m.group(0)
self.widgets['jobid'].setText('{}: Finished'.format(jid))
# check if the job had been cancelled
run_control = self.parent_app.run_control
selected_job = self.widgets['job_selector'].currentText()
status = run_control.jobs[selected_job]
self.widgets['formWidget'].widgets['status_field'].setText(status)

def cancel_job(self):
print ("Should cancel running job")
self.widgets['jobid'].setText("Canceling job")
self.runner.cancel_job()
self.widgets['jobid'].setText("Job cancelled")
# self.statusBar().showMessage("Canceling job")
selected_job = self.widgets['job_selector'].currentText()
run_control = self.parent_app.run_control
run_control.cancel_job(selected_job)
# self.statusBar().showMessage("Job cancelled")
self.widgets['progressBar'].setValue(0)

def appendText(self, txt):
print ("DVCSLURMProgressDialog appendText")
self.widgets['textEdit'].append(txt)

def update_progress(self, value):
print ("DVCSLURMProgressDialog update_progress")
self.widgets['progressBar'].setValue(value)

def update_status(self, update):

self.widgets['jobid'].setText("Job {}: {}".format(*update))
print ("DVCSLURMProgressDialog update_status")
self.widgets['formWidget'].widgets['status_field'].setText(update[1])
# self.statusBar().showMessage("Job {}: {}".format(*update))
if update[1] == b'PENDING':
self.widgets['progressBar'].setMaximum(0)
self.widgets['progressBar'].setMinimum(0)
self.widgets['progressBar'].setValue(0)
else:
self.widgets['progressBar'].setMaximum(100)

def on_cancelled(self):
#self.
jobid = self.widgets['job_selector'].currentText()





import functools

class RemoteRunControlSignals(QtCore.QObject):
status = QtCore.Signal(str)
status = QtCore.Signal(tuple)
job_id = QtCore.Signal(int)

class RemoteRunControl(object):
Expand All @@ -301,14 +331,23 @@ def __init__(self, parent=None, connection_details=None,
self.conn = None
self._jobid = None
self._job_status = None
self.jobs = {}

self.internalsignals = RemoteRunControlSignals()
self.internalsignals.job_id.connect(self.job_id)
self.internalsignals.status.connect(self.job_status)
self.internalsignals.job_id.connect(self.set_job_id)
self.internalsignals.status.connect(self.set_job_status)

self.threadpool = QtCore.QThreadPool()
self.dvcWorker = None


def set_job_id(self, value):
self.job_id = value
self.jobs[value] = None
# attach finished signal
# self.dvcWorker.signals.finished.connect(lambda: self.job_finished())
def set_job_status(self, value):
self.job_status = value[1]
self.jobs[value[0]] = value[1]

def create_job(self):
if not self.check_configuration():
Expand All @@ -322,7 +361,8 @@ def create_job(self):
host=host, username=username, port=port,
private_key=private_key, logfile=self.dvclog_fname,
update_delay=10)
# signal/slots should be connected from outside

# other signal/slots should be connected from outside

@property
def signals(self):
Expand Down Expand Up @@ -385,7 +425,7 @@ def job_status(self, value):
if self.job_id is not None:
self._job_status = value

@pysnooper.snoop()

def check_configuration(self):
def f (a,x,y):
return x in a and y
Expand Down Expand Up @@ -507,6 +547,7 @@ def run_dvc_worker(self, **kwargs):
i+=1
# widgets['jobid'].setText("Job id: {} {}".format(jobid, str(status)))
status_callback.emit((jobid, status.decode('utf-8')))
self.internalsignals.status.emit((jobid, status.decode('utf-8')))
if status == b'PENDING':
print("job is queueing")
# message_callback.emit("Job {} queueing".format(jobid))
Expand Down Expand Up @@ -558,18 +599,23 @@ def progress(line):
# output_filename\t{1}/small_grid\t### base name for output files

a.logout()
self.internalsignals.status.emit((jobid, 'FINISHED'))


def cancel_job(self):
def cancel_job(self, job_id):
host = self.connection_details['server_name']
username = self.connection_details['username']
port = self.connection_details['server_port']
private_key = self.connection_details['private_key']

a=drx.DVCRem(host=host,username=username,port=22,private_key=private_key)
a.login(passphrase=False)
a.job_cancel(self.job_id)
self.internalsignals.status.emit((job_id, "CANCELLING"))
a.job_cancel(job_id)
self.internalsignals.status.emit((job_id, "CANCELLED"))
a.logout()



def pytail(self, connection, logfile, start_at):

Expand Down

0 comments on commit 07d167b

Please sign in to comment.