Skip to content

Commit

Permalink
Merge pull request #39 from sssata/add-updater
Browse files Browse the repository at this point in the history
Add firmware updater, other bug fixes
  • Loading branch information
sssata committed Oct 15, 2023
2 parents 1e23eb5 + 8494325 commit bb26f30
Show file tree
Hide file tree
Showing 7 changed files with 267 additions and 82 deletions.
Binary file added APP/binaries/firmware.uf2
Binary file not shown.
Binary file removed APP/binaries/flux_arduino.ino.bin
Binary file not shown.
305 changes: 233 additions & 72 deletions APP/firmware_updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,40 +8,232 @@
import logging
import traceback
import pathlib
import threading
import os
import shutil
import traceback

from fluxpad_interface import Fluxpad

BOSSAC_PATH = (pathlib.Path(__file__).parent / "tools" / "bossac.exe").resolve()
# BOSSAC_PATH = (pathlib.Path(__file__).parent / "tools" / "bossac.exe").resolve()

BUFFER_SIZE = 128 * 1024

class SameFileError(OSError):
"""Raised when source and destination are the same file."""


def copy_with_callback(
src, dest, callback=None, buffer_size=BUFFER_SIZE
):
""" Copy file with a callback.
callback, if provided, must be a callable and will be
called after ever buffer_size bytes are copied.
Args:
src: source file, must exist
dest: destination path; if an existing directory,
file will be copied to the directory;
if it is not a directory, assumed to be destination filename
callback: callable to call after every buffer_size bytes are copied
callback will called as callback(bytes_copied since last callback, total bytes copied, total bytes in source file)
buffer_size: how many bytes to copy before each call to the callback, default = 4Mb
Returns:
Full path to destination file
Raises:
FileNotFoundError if src doesn't exist
SameFileError if src and dest are the same file
Note: Does not copy extended attributes, resource forks or other metadata.
"""

srcfile = pathlib.Path(src)
destpath = pathlib.Path(dest)

if not srcfile.is_file():
raise FileNotFoundError(f"src file `{src}` doesn't exist")

destfile = destpath / srcfile.name if destpath.is_dir() else destpath

if destfile.exists() and srcfile.samefile(destfile):
raise SameFileError(
f"source file `{src}` and destinaton file `{dest}` are the same file."
)

if callback is not None and not callable(callback):
raise ValueError("callback is not callable")

size = os.stat(src).st_size
with open(srcfile, "rb") as fsrc:
with open(destfile, "wb") as fdest:
_copyfileobj(
fsrc, fdest, callback=callback, total=size, length=buffer_size
)
return str(destfile)


def _copyfileobj(fsrc, fdest, callback, total, length):
""" copy from fsrc to fdest
Args:
fsrc: filehandle to source file
fdest: filehandle to destination file
callback: callable callback that will be called after every length bytes copied
total: total bytes in source file (will be passed to callback)
length: how many bytes to copy at once (between calls to callback)
"""
copied = 0
while True:
buf = fsrc.read(length)
if not buf:
break
fdest.write(buf)
copied += len(buf)
if callback is not None:
callback(len(buf), copied, total)

class FirmwareUploadProgress:
# Object to hold upload progress info

def __init__(self):
self.lock = threading.RLock()
self.progress_percent = 0.0
self.current_step = "None"
self.is_done = False
self.error_string = ""
self.update_event = threading.Event()


def upload_firmware_threaded(port: serial.Serial, bin_path: pathlib.Path):
progress = FirmwareUploadProgress()
fw_upload_thread = threading.Thread(target=_upload_firmware, args=(progress, port, bin_path), name="fwupdatethread", daemon=True)
fw_upload_thread.start()
return progress

def _upload_firmware(progress: FirmwareUploadProgress, port: serial.Serial, bin_path: pathlib.Path):

def copy_progress_callback(buf_size, copied, total):
with progress.lock:
progress.progress_percent = 30 + 70 * copied / total
progress.update_event.set()
print(progress.progress_percent)

try:
with progress.lock:
progress.current_step = "Resetting to bootloader"
progress.progress_percent = 10
progress.update_event.set()
_reset_port(port)

with progress.lock:
progress.current_step = "Waiting for enumeration"
progress.progress_percent = 20
progress.update_event.set()
rpi_drive = _listen_for_rpi_drive()

with progress.lock:
progress.current_step = "Uploading firmware"
progress.progress_percent = 30
progress.update_event.set()

dest_file = rpi_drive / bin_path.name
copy_with_callback(bin_path, dest_file, copy_progress_callback)

with progress.lock:
progress.current_step = "Done"
progress.is_done = True
progress.progress_percent = 100
progress.update_event.set()

except Exception:
with progress.lock:
progress.error_string = traceback.format_exc()
progress.is_done = True
progress.update_event.set()


def _listen_for_rpi_drive(timeout_s: float = 5, period_s: float = 0.3):

start_time_s = time.monotonic()
while start_time_s - time.monotonic() < timeout_s :
for drive in _list_available_drives():
print(drive)
if _has_info_uf2_file(drive):
return drive

time.sleep(period_s)


def _list_available_drives():
"""
Detects and returns a list of available drives on the system.
This function finds available drives on Unix/Linux/MacOS in common mount points like '/media' and '/mnt',
and on Windows, by iterating through drive letters from 'A' to 'Z'.
Returns:
list of pathlib.Path objects: A list of pathlib.Path objects representing available drives.
"""
drives = []
if os.name == 'posix': # Unix/Linux/MacOS
# On Unix-based systems, drives are typically mounted in /media or /mnt
mounts = ['/media', '/mnt']
for mount in mounts:
mount_path = pathlib.Path(mount)
if mount_path.is_dir():
drives.extend([entry for entry in mount_path.iterdir() if entry.is_dir() and not entry.name.startswith('.')])
elif os.name == 'nt': # Windows
# On Windows, you can list drives by iterating from 'A' to 'Z'
drives = [pathlib.Path(f'{chr(d)}:') for d in range(65, 91) if pathlib.Path(f'{chr(d)}:').is_dir()]
return drives


def _has_info_uf2_file(directory_path: pathlib.Path):
"""
Checks if the given pathlib directory contains a file named 'INFO_UF2.TXT'.
Parameters:
directory_path (pathlib.Path): The path to the directory to be checked.
Returns:
bool: True if 'INFO_UF2.TXT' file exists in the directory, False otherwise.
"""
info_uf2_file_path = directory_path / 'INFO_UF2.TXT'
return info_uf2_file_path.is_file()

def _reset_port(port: serial.Serial):
"""Resets given port by opening and closing port at 1200 baud"""
port.close()
time.sleep(0.1)
# assert not port.is_open
port.baudrate = 1200
port.open()
time.sleep(0.5)
port.close()
time.sleep(0.2)


class FirmwareUpdateFrame(ttk.Labelframe):

def __init__(self, master, *args, **kwargs):
def __init__(self, master, firmware_dir: pathlib.Path, *args, **kwargs):
super().__init__(master, *args, **kwargs)

# Firmware Update elements
self.update_frame = ttk.Labelframe(self, text="Firmware Update")
self.update_frame.grid(row=1, column=0, padx=5, pady=5, sticky="W")
# self.update_frame = ttk.Labelframe(self, text="Firmware Update")
self.configure(text="Firmware Update")
# self.update_frame.grid(row=1, column=0, padx=5, pady=5, sticky="W")

self.label_progress = ttk.Label(self, text="FW Update Progress")
self.label_progress.grid(row=1, column=0, padx=5, pady=5, sticky="W")

self.progressbar_update = ttk.Progressbar(self, mode="determinate", orient="horizontal", length=200, maximum=100)
self.progressbar_update.grid(row=2, column=0, padx=5, pady=5, sticky="W")

self.btn_update = ttk.Button(
self.update_frame, text="Update", command=self.upload_firmware_callback
self, text="Update", command=self.upload_firmware_callback
)
self.btn_update["state"] = "disabled"
self.btn_update.grid(row=1, column=0, padx=5, pady=5, sticky="W")
self.fetch_releases_button = ttk.Button(
self.update_frame,
text="Fetch Releases",
# command=self.fetch_firmware_callback,
)
self.fetch_releases_button.grid(row=0, column=1)
self.firmware_combobox = ttk.Combobox(
self.update_frame,
# textvariable=self.selected_firmware_release,
width=10
)
self.firmware_combobox.grid(row=0, column=0)
self.btn_update.grid(row=3, column=0, padx=5, pady=5, sticky="W")

self.fluxpad: Optional[Fluxpad] = None
self.stop_listener_callback = None

self.fw_bin_path = (firmware_dir / "firmware.uf2").resolve()

def set_fluxpad(self, fluxpad: Fluxpad):
def set_fluxpad(self, fluxpad: Optional[Fluxpad]):
self.fluxpad = fluxpad

def disable_update(self):
Expand All @@ -50,62 +242,31 @@ def disable_update(self):
def enable_update(self):
self.btn_update.state(["!disabled"])

def set_stop_listener_callback(self, callback):
self.stop_listener_callback = callback

def upload_firmware_callback(self):

try:
self.stop_listener_callback()
assert self.fluxpad is not None
bin_path = pathlib.Path(pathlib.Path(__file__).parent / "binaries" / "flux_arduino.ino.bin").resolve()
upload_firmware(self.fluxpad.port, str(bin_path))
progress = upload_firmware_threaded(self.fluxpad.port, self.fw_bin_path)
while progress.update_event.wait(timeout=10):
print("waited")
with progress.lock:
self.label_progress.configure(text=progress.current_step)
self.progressbar_update.configure(value=progress.progress_percent)
progress.update_event.clear()
self.progressbar_update.update()
self.label_progress.update()
if progress.is_done:
print("isdone")
break
if progress.error_string:
raise Exception(progress.error_string)

self.label_progress.configure(text="Done")
self.progressbar_update.configure(value=0)

except Exception:
logging.error(f"Failed to upload firmware", exc_info=True)
messagebox.showerror("Exception", f"Failed to load image\n\n{traceback.format_exc()}")

def upload_firmware(port: serial.Serial, bin_path: str):

_reset_port(port)
_bossac_upload(port.name, str(BOSSAC_PATH), bin_path)


def _reset_port(port: serial.Serial):
"""Resets given port by opening and closing port twice within 500ms at 1200 baud"""
port.close()
time.sleep(2)
assert not port.is_open
port.baudrate = 1200
port.open()
time.sleep(0.05)
port.close()
time.sleep(0.05)
port.open()
time.sleep(0.05)
port.close()
# Wait for reset and enumeration
time.sleep(2)

def _bossac_upload(port: str, bossac_path: str, bin_path: str):
"""Run Bossac to upload given bin file to given serial port"""

args = (
f"{bossac_path}",
# f"--info",
"-p", # Select port
f"{port}",
"-U",
"true",
"-i", # INFO
"-e", # Erase
"-w", # Write
"-v", # Verify
f"{bin_path}",
"-R",
)
print(args)

popen = subprocess.run(
args=args, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
output = popen.stdout.decode("ASCII")
err = popen.stderr.decode("ASCII")
logging.info(output)
logging.info(err)
messagebox.showerror("Exception", f"Failed to update firmware\n\n{traceback.format_exc()}")
Loading

0 comments on commit bb26f30

Please sign in to comment.