Skip to content

Commit

Permalink
Merge pull request #7 from Vilez0/encryption_support
Browse files Browse the repository at this point in the history
filesystem repairing encryption support
  • Loading branch information
sulincix authored Jun 25, 2024
2 parents 4b31758 + 93f836a commit dec17c4
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 66 deletions.
148 changes: 83 additions & 65 deletions src/Main.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,29 +265,46 @@ def pre():
self.update_status_page(_("Detecting Partitions"), "content-loading-symbolic", _(
"We're scanning your system to locate available partitions."), False, False)

partitions = self.list_partitions()
if len(partitions) == 0:
self.partitions = self.list_partitions()
if len(self.partitions) == 0:
self.update_status_page(_("Unable to Detect Partitions"), "dialog-error-symbolic", _(
"We couldn't find any partitions on your system. This could indicate a problem with your disk or partition table. Please double-check your disk connections and configuration."), True, True)
return

partition_names = [part.name for part in partitions]
partition_os = [part.operating_system for part in partitions]
partition_names = [part.name for part in self.partitions]
partition_os = [part.operating_system for part in self.partitions]

self.repair_page = self.new_page_listbox(
_("Choose Partition for Filesystem Repair"), partition_names, partition_os, after_userdata)

def after_userdata(widget, userdata):
partition_for_repair = self.repair_page.listbox.get_selected_row().get_title()
self.update_status_page(_("Repairing Filesystem on {}").format(partition_for_repair), "content-loading-symbolic", _(
def after_userdata(widget=None, userdata=None, partition=None):
if partition == None:
selected = self.repair_page.listbox.get_selected_row().get_title()
part = next((x for x in self.partitions if x.name == selected), None)
return process_partition(after_userdata, part)
else:
part = partition

self.update_status_page(_("Repairing Filesystem on {}").format(part.path), "content-loading-symbolic", _(
"We're currently repairing the filesystem on the selected partition. This process may take some time, depending on the size and severity of the issues found. Please be patient while we work to restore the partition's functionality."), False, False)
self.vte_command(
"env disk={} check-filesystem".format(partition_for_repair), post)
"env disk={} check-filesystem".format(part.name), post)

def post():
self.update_status_page(_("Filesystem Repair Successful"), "emblem-ok-symbolic", _(
"The filesystem has been successfully repaired. Your data should now be accessible without any issues."), True, True)

def process_partition(pending_func, part):
if part.is_luks:
self.unlock_luks(part, process_partition, pending_func)
return None
if part.is_lvm:
self.mount_lvm(part, process_partition, pending_func)
return None

if pending_func != None:
Thread(target=pending_func, args=(None, None, part)).start()

self.row_init_func(pre)

def on_row_reset_config_activated(self, widget):
Expand Down Expand Up @@ -424,40 +441,43 @@ def run_command(self, command: str, return_exitcode=False):

def get_rootfs(self, widget, pending_func):
def pre():
if not hasattr(self, 'rootfs') or self.rootfs == None:
self.rootfs_list = self.detect_rootfs()
if self.rootfs_list == None or len(self.rootfs_list) == 0:
self.update_status_page(_("Root Filesystem Missing"), "dialog-error-symbolic", _(
"We couldn't locate the root filesystem on your system. This could be due to a disk failure, misconfiguration, or other issues. Please ensure that your disk is properly connected and configured."), True, True)
return None
elif len(self.rootfs_list) > 1:
partition_names = [part.name for part in self.rootfs_list]
partition_os = [
part.operating_system for part in self.rootfs_list]
self.rootfs_page = self.new_page_listbox(
_("Select a root filesystem"), partition_names, partition_os, post, pending_func)
return None
self.rootfs = self.rootfs_list[0]

if self.rootfs.is_luks:
self.unlock_luks(self.rootfs, pending_func)
return
if self.rootfs.is_lvm:
self.mount_lvm(self.rootfs, pending_func)
return
return self.rootfs
if hasattr(self, 'rootfs') and self.rootfs != None:
return self.rootfs
self.rootfs_list = self.detect_rootfs()
if self.rootfs_list == None or len(self.rootfs_list) == 0:
self.update_status_page(_("Root Filesystem Missing"), "dialog-error-symbolic", _(
"We couldn't locate the root filesystem on your system. This could be due to a disk failure, misconfiguration, or other issues. Please ensure that your disk is properly connected and configured."), True, True)
return None
elif len(self.rootfs_list) > 1:
partition_names = [part.name for part in self.rootfs_list]
partition_os = [
part.operating_system for part in self.rootfs_list]
self.rootfs_page = self.new_page_listbox(
_("Select a root filesystem"), partition_names, partition_os, post, pending_func)
return None
process_partition(pending_func, self.rootfs_list[0])
return None

def post(widget, pending_func):
selected = self.rootfs_page.listbox.get_selected_row().get_title()
self.rootfs = next(
rootfs = next(
(x for x in self.rootfs_list if x.name == selected), None)
if self.rootfs.is_luks:
self.unlock_luks(self.rootfs, pending_func)
return
if self.rootfs.is_lvm:
self.mount_lvm(self.rootfs, pending_func)
return
process_partition(pending_func, rootfs)

def process_partition(pending_func, part):

if part.is_luks:
self.unlock_luks(part, process_partition, pending_func)
return None
if part.is_lvm:
self.mount_lvm(part, process_partition, pending_func)
return None

part = self.check_if_rootfs(part)
if part.is_rootfs:
self.rootfs = part
else:
self.rootfs = None
self.update_status_page(_("Root Filesystem Chosen"), "emblem-ok-symbolic", _(
"You've selected the root filesystem for further action."), False, False)
if pending_func != None:
Expand Down Expand Up @@ -595,14 +615,14 @@ def detect_btrfs_rootfs_subvolume(self, mountdir):
return subvol, is_rootfs
return None, False

def unlock_luks(self, luks_part, pending_func= None):
def unlock_luks(self, part, handler_func=None, pending_func= None):
def pre():
self.luks_page = self.new_page_input(
_("Enter LUKS Password"), after_userdata, (luks_part, pending_func))
_("Enter LUKS Password"), after_userdata, (part, handler_func, pending_func))

def after_userdata(widget, userdata):
password = self.luks_page.entry.get_text()
part, pending_func = userdata
part, handler_func, pending_func = userdata

self.update_status_page(_("Unlocking Encrypted Device"), "content-loading-symbolic", _(
"We're unlocking the encrypted device to access the data. This process may take a moment. Please wait while we unlock the device."), False, False)
Expand All @@ -627,26 +647,23 @@ def after_userdata(widget, userdata):
part.path = "/dev/mapper/luks-{}".format(part.name)
part.name = "/mapper/luks-{}".format(part.name)
part.fstype = self.run_command('lsblk -no FSTYPE {}'.format(part.path)).strip()
part.is_luks = False
if part.fstype == "LVM2_member":
part.is_lvm = True
self.mount_lvm(part, pending_func)
self.mount_lvm(part, handler_func, pending_func)
return
if part.fstype == "crypto_LUKS":
return

part = self.check_if_rootfs(part)
if part.is_rootfs:
self.rootfs = part
else:
self.rootfs = None

if pending_func != None:
if handler_func != None:
Thread(target=handler_func, args=(pending_func, part)).start()
elif pending_func != None:
Thread(target=pending_func).start()
pre()

def mount_lvm(self, lvm_part, pending_func):
def mount_lvm(self, part, handler_func=None, pending_func=None):
def pre():
vg_names = self.run_command("pvs -o vg_name --noheadings --select pv_name={}".format(lvm_part.path)).split("\n")
vg_names = self.run_command(
"pvs -o vg_name --noheadings --select pv_name={}".format(part.path)).split("\n")
if len(vg_names) == 0:
self.update_status_page(_("No Volume Groups Detected"), "dialog-error-symbolic", _(
"We couldn't find any volume groups on your system. This could indicate an issue with your LVM configuration. Please ensure that your LVM setup is correct."), True, True)
Expand All @@ -655,12 +672,12 @@ def pre():
for vg in vg_names:
vg = vg.strip()
self.vg_page = self.new_page_listbox(
_("Select a Volume Group"), vg_names, None, after_userdata, (lvm_part, pending_func))
_("Select a Volume Group"), vg_names, None, after_userdata, (part, handler_func, pending_func))
return None
after_userdata(None, (lvm_part, pending_func), vg_names[0])
after_userdata(None, (part, handler_func, pending_func), vg_names[0])

def after_userdata(widget, userdata, vg_name=None):
lvm_part, pending_func = userdata
part, handler_func, pending_func = userdata
if vg_name == None:
vg_name = self.vg_page.listbox.get_selected_row().get_title()
self.run_command("vgchange -ay {}".format(vg_name))
Expand All @@ -678,26 +695,24 @@ def after_userdata(widget, userdata, vg_name=None):
return None
elif len(LV_NAMES) > 1:
self.lv_page = self.new_page_listbox(
_("Select a Logical Volume"), LV_NAMES, None, after_lvm_selection, (vg_name, lvm_part, pending_func))
_("Select a Logical Volume"), LV_NAMES, None, after_lvm_selection, (vg_name, part, handler_func, pending_func))
return None

after_lvm_selection(None, (vg_name, lvm_part, pending_func), LV_NAMES[0])
after_lvm_selection(None, (vg_name, part, handler_func, pending_func), LV_NAMES[0])

def after_lvm_selection(widget, userdata, lv_name=None):
vg_name, lvm_part, pending_func = userdata
vg_name, part, handler_func, pending_func = userdata
if lv_name == None:
lv_name = self.lv_page.listbox.get_selected_row().get_title()

lvm_part.path = "/dev/{}/{}".format(vg_name, lv_name)
lvm_part.name = "{}/{}".format(vg_name, lv_name)
part.path = "/dev/{}/{}".format(vg_name, lv_name)
part.name = "{}/{}".format(vg_name, lv_name)
part.is_lvm = False

lvm_part = self.check_if_rootfs(lvm_part)
if lvm_part.is_rootfs:
self.rootfs = lvm_part
else:
self.rootfs = None

if pending_func != None:
if handler_func != None:
Thread(target=handler_func, args=(pending_func, part)).start()
elif pending_func != None:
Thread(target=pending_func).start()
Thread(target=pre).start()

Expand All @@ -724,6 +739,9 @@ def list_partitions(self):
if output == None:
continue
partition.__setattr__(x.lower(), output.strip())
partition.is_luks = (partition.fstype == "crypto_LUKS")
partition.is_lvm = (partition.fstype == "LVM2_member")

partitions.append(partition)

partitions = self.get_operating_system(partitions)
Expand Down
2 changes: 1 addition & 1 deletion src/scripts/check-filesystem
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ while [[ ! -b /dev/$disk ]] ; do
echo -n "partition >>> "
read disk
done
fs=$(blkid | grep /dev/$disk | sed "s/.*TYPE=\"//g;s/\".*//g")
fs="$(lsblk -rno FSTYPE /dev/$disk | head -n 1)"

source /usr/bin/pardus-open-luks-lvm

Expand Down

0 comments on commit dec17c4

Please sign in to comment.