From 47a580dd77f796490435faf5a6a3f6c63cb3d5ad Mon Sep 17 00:00:00 2001 From: h4sh Date: Sat, 23 Sep 2023 23:05:21 +1000 Subject: [PATCH] test improvements --- test/conftest.py | 65 ++++++++---- test/pytest.ini | 2 + test/test_sshfs.py | 246 ++++++++++++++++++++++++++++----------------- test/util.py | 73 +++++++++----- 4 files changed, 247 insertions(+), 139 deletions(-) diff --git a/test/conftest.py b/test/conftest.py index 70cd0c62..9416dde4 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -3,24 +3,28 @@ import time import re -# If a test fails, wait a moment before retrieving the captured -# stdout/stderr. When using a server process, this makes sure that we capture -# any potential output of the server that comes *after* a test has failed. For -# example, if a request handler raises an exception, the server first signals an -# error to FUSE (causing the test to fail), and then logs the exception. Without -# the extra delay, the exception will go into nowhere. -@pytest.mark.hookwrapper +# If a test fails, wait a moment before retrieving the captured stdout/stderr. +# When using a server process, this makes sure that we capture any potential +# output of the server that comes *after* a test has failed. For example, if a +# request handler raises an exception, the server first signals an error to +# FUSE (causing the test to fail), and then logs the exception. Without the +# extra delay, the exception will go into nowhere. + + +@pytest.hookimpl(hookwrapper=True) def pytest_pyfunc_call(pyfuncitem): outcome = yield failed = outcome.excinfo is not None if failed: time.sleep(1) + @pytest.fixture() def pass_capfd(request, capfd): - '''Provide capfd object to UnitTest instances''' + """Provide capfd object to UnitTest instances""" request.instance.capfd = capfd + def check_test_output(capfd): (stdout, stderr) = capfd.readouterr() @@ -31,39 +35,57 @@ def check_test_output(capfd): # Strip out false positives for (pattern, flags, count) in capfd.false_positives: cp = re.compile(pattern, flags) - (stdout, cnt) = cp.subn('', stdout, count=count) + (stdout, cnt) = cp.subn("", stdout, count=count) if count == 0 or count - cnt > 0: - stderr = cp.sub('', stderr, count=count - cnt) + stderr = cp.sub("", stderr, count=count - cnt) - patterns = [ r'\b{}\b'.format(x) for x in - ('exception', 'error', 'warning', 'fatal', 'traceback', - 'fault', 'crash(?:ed)?', 'abort(?:ed)', - 'uninitiali[zs]ed') ] - patterns += ['^==[0-9]+== '] + patterns = [ + r"\b{}\b".format(x) + for x in ( + "exception", + "error", + "warning", + "fatal", + "traceback", + "fault", + "crash(?:ed)?", + "abort(?:ed)", + "uninitiali[zs]ed", + ) + ] + patterns += ["^==[0-9]+== "] for pattern in patterns: cp = re.compile(pattern, re.IGNORECASE | re.MULTILINE) hit = cp.search(stderr) if hit: - raise AssertionError('Suspicious output to stderr (matched "%s")' % hit.group(0)) + raise AssertionError( + 'Suspicious output to stderr (matched "%s")' % hit.group(0) + ) hit = cp.search(stdout) if hit: - raise AssertionError('Suspicious output to stdout (matched "%s")' % hit.group(0)) + raise AssertionError( + 'Suspicious output to stdout (matched "%s")' % hit.group(0) + ) + def register_output(self, pattern, count=1, flags=re.MULTILINE): - '''Register *pattern* as false positive for output checking + """Register *pattern* as false positive for output checking This prevents the test from failing because the output otherwise appears suspicious. - ''' + """ self.false_positives.append((pattern, flags, count)) + # This is a terrible hack that allows us to access the fixtures from the # pytest_runtest_call hook. Among a lot of other hidden assumptions, it probably # relies on tests running sequential (i.e., don't dare to use e.g. the xdist # plugin) current_capfd = None -@pytest.yield_fixture(autouse=True) + + +@pytest.fixture(autouse=True) def save_cap_fixtures(request, capfd): global current_capfd capfd.false_positives = [] @@ -71,7 +93,7 @@ def save_cap_fixtures(request, capfd): # Monkeypatch in a function to register false positives type(capfd).register_output = register_output - if request.config.getoption('capture') == 'no': + if request.config.getoption("capture") == "no": capfd = None current_capfd = capfd bak = current_capfd @@ -82,6 +104,7 @@ def save_cap_fixtures(request, capfd): assert bak is current_capfd current_capfd = None + @pytest.hookimpl(trylast=True) def pytest_runtest_call(item): capfd = current_capfd diff --git a/test/pytest.ini b/test/pytest.ini index 95161546..95edd52c 100644 --- a/test/pytest.ini +++ b/test/pytest.ini @@ -1,2 +1,4 @@ [pytest] addopts = --verbose --assert=rewrite --tb=native -x -r a + +markers = uses_fuse: Mark to indicate that FUSE is available on the system running the test diff --git a/test/test_sshfs.py b/test/test_sshfs.py index 1724555d..d0f47ab4 100755 --- a/test/test_sshfs.py +++ b/test/test_sshfs.py @@ -1,8 +1,9 @@ #!/usr/bin/env python3 -if __name__ == '__main__': +if __name__ == "__main__": import pytest import sys + sys.exit(pytest.main([__file__] + sys.argv[1:])) import subprocess @@ -13,31 +14,60 @@ import shutil import filecmp import errno -from contextlib import contextmanager from tempfile import NamedTemporaryFile -from util import (wait_for_mount, umount, cleanup, base_cmdline, - basename, fuse_test_marker, safe_sleep) +from util import ( + wait_for_mount, + umount, + cleanup, + base_cmdline, + basename, + fuse_test_marker, + safe_sleep, + os_create, + os_open, +) from os.path import join as pjoin TEST_FILE = __file__ pytestmark = fuse_test_marker() -with open(TEST_FILE, 'rb') as fh: +with open(TEST_FILE, "rb") as fh: TEST_DATA = fh.read() -def name_generator(__ctr=[0]): - __ctr[0] += 1 - return 'testfile_%d' % __ctr[0] -@pytest.mark.parametrize("debug", (False, True)) -@pytest.mark.parametrize("cache_timeout", (0,1)) -@pytest.mark.parametrize("sync_rd", (True, False)) -@pytest.mark.parametrize("multiconn", (True,False)) -def test_sshfs(tmpdir, debug, cache_timeout, sync_rd, multiconn, capfd): +def name_generator(__ctr=[0]) -> str: + """Generate a fresh filename on each call""" + + __ctr[0] += 1 + return f"testfile_{__ctr[0]}" + + +@pytest.mark.parametrize( + "debug", + [pytest.param(False, id="debug=false"), pytest.param(True, id="debug=true")], +) +@pytest.mark.parametrize( + "cache_timeout", + [pytest.param(0, id="cache_timeout=0"), pytest.param(1, id="cache_timeout=1")], +) +@pytest.mark.parametrize( + "sync_rd", + [pytest.param(True, id="sync_rd=true"), pytest.param(False, id="sync_rd=false")], +) +@pytest.mark.parametrize( + "multiconn", + [ + pytest.param(True, id="multiconn=true"), + pytest.param(False, id="multiconn=false"), + ], +) +def test_sshfs( + tmpdir, debug: bool, cache_timeout: int, sync_rd: bool, multiconn: bool, capfd +) -> None: # Avoid false positives from debug messages - #if debug: + # if debug: # capfd.register_output(r'^ unique: [0-9]+, error: -[0-9]+ .+$', # count=0) @@ -46,45 +76,60 @@ def test_sshfs(tmpdir, debug, cache_timeout, sync_rd, multiconn, capfd): # Test if we can ssh into localhost without password try: - res = subprocess.call(['ssh', '-o', 'KbdInteractiveAuthentication=no', - '-o', 'ChallengeResponseAuthentication=no', - '-o', 'PasswordAuthentication=no', - 'localhost', '--', 'true'], stdin=subprocess.DEVNULL, - timeout=10) + res = subprocess.call( + [ + "ssh", + "-o", + "StrictHostKeyChecking=no", + "-o", + "KbdInteractiveAuthentication=no", + "-o", + "ChallengeResponseAuthentication=no", + "-o", + "PasswordAuthentication=no", + "localhost", + "--", + "true", + ], + stdin=subprocess.DEVNULL, + timeout=10, + ) except subprocess.TimeoutExpired: res = 1 if res != 0: - pytest.fail('Unable to ssh into localhost without password prompt.') + pytest.fail("Unable to ssh into localhost without password prompt.") - mnt_dir = str(tmpdir.mkdir('mnt')) - src_dir = str(tmpdir.mkdir('src')) + mnt_dir = str(tmpdir.mkdir("mnt")) + src_dir = str(tmpdir.mkdir("src")) - cmdline = base_cmdline + [ pjoin(basename, 'sshfs'), - '-f', 'localhost:' + src_dir, mnt_dir ] + cmdline = base_cmdline + [ + pjoin(basename, "sshfs"), + "-f", + f"localhost:{src_dir}", + mnt_dir, + ] if debug: - cmdline += [ '-o', 'sshfs_debug' ] + cmdline += ["-o", "sshfs_debug"] if sync_rd: - cmdline += [ '-o', 'sync_readdir' ] + cmdline += ["-o", "sync_readdir"] # SSHFS Cache if cache_timeout == 0: - cmdline += [ '-o', 'dir_cache=no' ] + cmdline += ["-o", "dir_cache=no"] else: - cmdline += [ '-o', 'dcache_timeout=%d' % cache_timeout, - '-o', 'dir_cache=yes' ] + cmdline += ["-o", f"dcache_timeout={cache_timeout}", "-o", "dir_cache=yes"] # FUSE Cache - cmdline += [ '-o', 'entry_timeout=0', - '-o', 'attr_timeout=0' ] + cmdline += ["-o", "entry_timeout=0", "-o", "attr_timeout=0"] if multiconn: - cmdline += [ '-o', 'max_conns=3' ] + cmdline += ["-o", "max_conns=3"] - new_env = dict(os.environ) # copy, don't modify + new_env = dict(os.environ) # copy, don't modify # Abort on warnings from glib - new_env['G_DEBUG'] = 'fatal-warnings' + new_env["G_DEBUG"] = "fatal-warnings" mount_process = subprocess.Popen(cmdline, env=new_env) try: @@ -114,30 +159,20 @@ def test_sshfs(tmpdir, debug, cache_timeout, sync_rd, multiconn, capfd): tst_truncate_path(mnt_dir) tst_truncate_fd(mnt_dir) tst_open_unlink(mnt_dir) - except: + except Exception as exc: cleanup(mount_process, mnt_dir) - raise + raise exc else: umount(mount_process, mnt_dir) -@contextmanager -def os_open(name, flags): - fd = os.open(name, flags) - try: - yield fd - finally: - os.close(fd) - -def os_create(name): - os.close(os.open(name, os.O_CREAT | os.O_RDWR)) def tst_unlink(src_dir, mnt_dir, cache_timeout): name = name_generator() fullname = mnt_dir + "/" + name - with open(pjoin(src_dir, name), 'wb') as fh: - fh.write(b'hello') + with open(pjoin(src_dir, name), "wb") as fh: + fh.write(b"hello") if cache_timeout: - safe_sleep(cache_timeout+1) + safe_sleep(cache_timeout + 1) assert name in os.listdir(mnt_dir) os.unlink(fullname) with pytest.raises(OSError) as exc_info: @@ -146,22 +181,24 @@ def tst_unlink(src_dir, mnt_dir, cache_timeout): assert name not in os.listdir(mnt_dir) assert name not in os.listdir(src_dir) + def tst_mkdir(mnt_dir): dirname = name_generator() fullname = mnt_dir + "/" + dirname os.mkdir(fullname) fstat = os.stat(fullname) assert stat.S_ISDIR(fstat.st_mode) - assert os.listdir(fullname) == [] - assert fstat.st_nlink in (1,2) + assert os.listdir(fullname) == [] + assert fstat.st_nlink in (1, 2) assert dirname in os.listdir(mnt_dir) + def tst_rmdir(src_dir, mnt_dir, cache_timeout): name = name_generator() fullname = mnt_dir + "/" + name os.mkdir(pjoin(src_dir, name)) if cache_timeout: - safe_sleep(cache_timeout+1) + safe_sleep(cache_timeout + 1) assert name in os.listdir(mnt_dir) os.rmdir(fullname) with pytest.raises(OSError) as exc_info: @@ -170,6 +207,7 @@ def tst_rmdir(src_dir, mnt_dir, cache_timeout): assert name not in os.listdir(mnt_dir) assert name not in os.listdir(src_dir) + def tst_symlink(mnt_dir): linkname = name_generator() fullname = mnt_dir + "/" + linkname @@ -180,6 +218,7 @@ def tst_symlink(mnt_dir): assert fstat.st_nlink == 1 assert linkname in os.listdir(mnt_dir) + def tst_create(mnt_dir): name = name_generator() fullname = pjoin(mnt_dir, name) @@ -197,6 +236,7 @@ def tst_create(mnt_dir): assert fstat.st_nlink == 1 assert fstat.st_size == 0 + def tst_chown(mnt_dir): filename = pjoin(mnt_dir, name_generator()) os.mkdir(filename) @@ -216,37 +256,38 @@ def tst_chown(mnt_dir): assert fstat.st_uid == uid_new assert fstat.st_gid == gid_new + def tst_open_read(src_dir, mnt_dir): name = name_generator() - with open(pjoin(src_dir, name), 'wb') as fh_out, \ - open(TEST_FILE, 'rb') as fh_in: + with open(pjoin(src_dir, name), "wb") as fh_out, open(TEST_FILE, "rb") as fh_in: shutil.copyfileobj(fh_in, fh_out) assert filecmp.cmp(pjoin(mnt_dir, name), TEST_FILE, False) + def tst_open_write(src_dir, mnt_dir): name = name_generator() - fd = os.open(pjoin(src_dir, name), - os.O_CREAT | os.O_RDWR) + fd = os.open(pjoin(src_dir, name), os.O_CREAT | os.O_RDWR) os.close(fd) fullname = pjoin(mnt_dir, name) - with open(fullname, 'wb') as fh_out, \ - open(TEST_FILE, 'rb') as fh_in: + with open(fullname, "wb") as fh_out, open(TEST_FILE, "rb") as fh_in: shutil.copyfileobj(fh_in, fh_out) assert filecmp.cmp(fullname, TEST_FILE, False) + def tst_append(src_dir, mnt_dir): name = name_generator() os_create(pjoin(src_dir, name)) fullname = pjoin(mnt_dir, name) with os_open(fullname, os.O_WRONLY) as fd: - os.write(fd, b'foo\n') - with os_open(fullname, os.O_WRONLY|os.O_APPEND) as fd: - os.write(fd, b'bar\n') + os.write(fd, b"foo\n") + with os_open(fullname, os.O_WRONLY | os.O_APPEND) as fd: + os.write(fd, b"bar\n") + + with open(fullname, "rb") as fh: + assert fh.read() == b"foo\nbar\n" - with open(fullname, 'rb') as fh: - assert fh.read() == b'foo\nbar\n' def tst_seek(src_dir, mnt_dir): name = name_generator() @@ -254,20 +295,21 @@ def tst_seek(src_dir, mnt_dir): fullname = pjoin(mnt_dir, name) with os_open(fullname, os.O_WRONLY) as fd: os.lseek(fd, 1, os.SEEK_SET) - os.write(fd, b'foobar\n') + os.write(fd, b"foobar\n") with os_open(fullname, os.O_WRONLY) as fd: os.lseek(fd, 4, os.SEEK_SET) - os.write(fd, b'com') + os.write(fd, b"com") + + with open(fullname, "rb") as fh: + assert fh.read() == b"\0foocom\n" - with open(fullname, 'rb') as fh: - assert fh.read() == b'\0foocom\n' def tst_open_unlink(mnt_dir): name = pjoin(mnt_dir, name_generator()) - data1 = b'foo' - data2 = b'bar' + data1 = b"foo" + data2 = b"bar" fullname = pjoin(mnt_dir, name) - with open(fullname, 'wb+', buffering=0) as fh: + with open(fullname, "wb+", buffering=0) as fh: fh.write(data1) os.unlink(fullname) with pytest.raises(OSError) as exc_info: @@ -276,11 +318,13 @@ def tst_open_unlink(mnt_dir): assert name not in os.listdir(mnt_dir) fh.write(data2) fh.seek(0) - assert fh.read() == data1+data2 + assert fh.read() == data1 + data2 + def tst_statvfs(mnt_dir): os.statvfs(mnt_dir) + def tst_link(mnt_dir, cache_timeout): name1 = pjoin(mnt_dir, name_generator()) name2 = pjoin(mnt_dir, name_generator()) @@ -302,8 +346,16 @@ def tst_link(mnt_dir, cache_timeout): fstat1 = os.lstat(name1) fstat2 = os.lstat(name2) - for attr in ('st_mode', 'st_dev', 'st_uid', 'st_gid', - 'st_size', 'st_atime', 'st_mtime', 'st_ctime'): + for attr in ( + "st_mode", + "st_dev", + "st_uid", + "st_gid", + "st_size", + "st_atime", + "st_mtime", + "st_ctime", + ): assert getattr(fstat1, attr) == getattr(fstat2, attr) assert os.path.basename(name2) in os.listdir(mnt_dir) assert filecmp.cmp(name1, name2, False) @@ -316,6 +368,7 @@ def tst_link(mnt_dir, cache_timeout): os.unlink(name1) + def tst_readdir(src_dir, mnt_dir): newdir = name_generator() src_newdir = pjoin(src_dir, newdir) @@ -331,7 +384,7 @@ def tst_readdir(src_dir, mnt_dir): listdir_is = os.listdir(mnt_newdir) listdir_is.sort() - listdir_should = [ os.path.basename(file_), os.path.basename(subdir) ] + listdir_should = [os.path.basename(file_), os.path.basename(subdir)] listdir_should.sort() assert listdir_is == listdir_should @@ -340,11 +393,12 @@ def tst_readdir(src_dir, mnt_dir): os.rmdir(subdir) os.rmdir(src_newdir) + def tst_truncate_path(mnt_dir): assert len(TEST_DATA) > 1024 filename = pjoin(mnt_dir, name_generator()) - with open(filename, 'wb') as fh: + with open(filename, "wb") as fh: fh.write(TEST_DATA) fstat = os.stat(filename) @@ -354,21 +408,22 @@ def tst_truncate_path(mnt_dir): # Add zeros at the end os.truncate(filename, size + 1024) assert os.stat(filename).st_size == size + 1024 - with open(filename, 'rb') as fh: + with open(filename, "rb") as fh: assert fh.read(size) == TEST_DATA - assert fh.read(1025) == b'\0' * 1024 + assert fh.read(1025) == b"\0" * 1024 # Truncate data os.truncate(filename, size - 1024) assert os.stat(filename).st_size == size - 1024 - with open(filename, 'rb') as fh: - assert fh.read(size) == TEST_DATA[:size-1024] + with open(filename, "rb") as fh: + assert fh.read(size) == TEST_DATA[: size - 1024] os.unlink(filename) + def tst_truncate_fd(mnt_dir): assert len(TEST_DATA) > 1024 - with NamedTemporaryFile('w+b', 0, dir=mnt_dir) as fh: + with NamedTemporaryFile("w+b", 0, dir=mnt_dir) as fh: fd = fh.fileno() fh.write(TEST_DATA) fstat = os.fstat(fd) @@ -380,13 +435,14 @@ def tst_truncate_fd(mnt_dir): assert os.fstat(fd).st_size == size + 1024 fh.seek(0) assert fh.read(size) == TEST_DATA - assert fh.read(1025) == b'\0' * 1024 + assert fh.read(1025) == b"\0" * 1024 # Truncate data os.ftruncate(fd, size - 1024) assert os.fstat(fd).st_size == size - 1024 fh.seek(0) - assert fh.read(size) == TEST_DATA[:size-1024] + assert fh.read(size) == TEST_DATA[: size - 1024] + def tst_utimens(mnt_dir, tol=0): filename = pjoin(mnt_dir, name_generator()) @@ -395,20 +451,21 @@ def tst_utimens(mnt_dir, tol=0): atime = fstat.st_atime + 42.28 mtime = fstat.st_mtime - 42.23 - if sys.version_info < (3,3): + if sys.version_info < (3, 3): os.utime(filename, (atime, mtime)) else: - atime_ns = fstat.st_atime_ns + int(42.28*1e9) - mtime_ns = fstat.st_mtime_ns - int(42.23*1e9) + atime_ns = fstat.st_atime_ns + int(42.28 * 1e9) + mtime_ns = fstat.st_mtime_ns - int(42.23 * 1e9) os.utime(filename, None, ns=(atime_ns, mtime_ns)) fstat = os.lstat(filename) assert abs(fstat.st_atime - atime) < tol assert abs(fstat.st_mtime - mtime) < tol - if sys.version_info >= (3,3): - assert abs(fstat.st_atime_ns - atime_ns) < tol*1e9 - assert abs(fstat.st_mtime_ns - mtime_ns) < tol*1e9 + if sys.version_info >= (3, 3): + assert abs(fstat.st_atime_ns - atime_ns) < tol * 1e9 + assert abs(fstat.st_mtime_ns - mtime_ns) < tol * 1e9 + def tst_utimens_now(mnt_dir): fullname = pjoin(mnt_dir, name_generator()) @@ -422,17 +479,18 @@ def tst_utimens_now(mnt_dir): assert fstat.st_atime != 0 assert fstat.st_mtime != 0 + def tst_passthrough(src_dir, mnt_dir, cache_timeout): name = name_generator() src_name = pjoin(src_dir, name) mnt_name = pjoin(src_dir, name) assert name not in os.listdir(src_dir) assert name not in os.listdir(mnt_dir) - with open(src_name, 'w') as fh: - fh.write('Hello, world') + with open(src_name, "w") as fh: + fh.write("Hello, world") assert name in os.listdir(src_dir) if cache_timeout: - safe_sleep(cache_timeout+1) + safe_sleep(cache_timeout + 1) assert name in os.listdir(mnt_dir) assert os.stat(src_name) == os.stat(mnt_name) @@ -441,10 +499,10 @@ def tst_passthrough(src_dir, mnt_dir, cache_timeout): mnt_name = pjoin(src_dir, name) assert name not in os.listdir(src_dir) assert name not in os.listdir(mnt_dir) - with open(mnt_name, 'w') as fh: - fh.write('Hello, world') + with open(mnt_name, "w") as fh: + fh.write("Hello, world") assert name in os.listdir(src_dir) if cache_timeout: - safe_sleep(cache_timeout+1) + safe_sleep(cache_timeout + 1) assert name in os.listdir(mnt_dir) assert os.stat(src_name) == os.stat(mnt_name) diff --git a/test/util.py b/test/util.py index 261a1c68..ce443899 100644 --- a/test/util.py +++ b/test/util.py @@ -5,25 +5,42 @@ import stat import time from os.path import join as pjoin +from contextlib import contextmanager -basename = pjoin(os.path.dirname(__file__), '..') +basename = pjoin(os.path.dirname(__file__), "..") -def wait_for_mount(mount_process, mnt_dir, - test_fn=os.path.ismount): + +def os_create(name): + os.close(os.open(name, os.O_CREAT | os.O_RDWR)) + + +@contextmanager +def os_open(name, flags): + fd = os.open(name, flags) + try: + yield fd + finally: + os.close(fd) + + +def wait_for_mount(mount_process, mnt_dir, test_fn=os.path.ismount): elapsed = 0 while elapsed < 30: if test_fn(mnt_dir): return True if mount_process.poll() is not None: - pytest.fail('file system process terminated prematurely') + pytest.fail("file system process terminated prematurely") time.sleep(0.1) elapsed += 0.1 pytest.fail("mountpoint failed to come up") + def cleanup(mount_process, mnt_dir): - subprocess.call(['fusermount', '-z', '-u', mnt_dir], - stdout=subprocess.DEVNULL, - stderr=subprocess.STDOUT) + subprocess.call( + ["fusermount", "-z", "-u", mnt_dir], + stdout=subprocess.DEVNULL, + stderr=subprocess.STDOUT, + ) mount_process.terminate() try: mount_process.wait(1) @@ -32,7 +49,7 @@ def cleanup(mount_process, mnt_dir): def umount(mount_process, mnt_dir): - subprocess.check_call(['fusermount3', '-z', '-u', mnt_dir ]) + subprocess.check_call(["fusermount3", "-z", "-u", mnt_dir]) assert not os.path.ismount(mnt_dir) # Give mount process a little while to terminate. Popen.wait(timeout) @@ -43,18 +60,19 @@ def umount(mount_process, mnt_dir): if code is not None: if code == 0: return - pytest.fail('file system process terminated with code %s' % (code,)) + pytest.fail(f"file system process terminated with code {code}") time.sleep(0.1) elapsed += 0.1 - pytest.fail('mount process did not terminate') + pytest.fail("mount process did not terminate") + def safe_sleep(secs): - '''Like time.sleep(), but sleep for at least *secs* + """Like time.sleep(), but sleep for at least *secs* `time.sleep` may sleep less than the given period if a signal is received. This function ensures that we sleep for at least the desired time. - ''' + """ now = time.time() end = now + secs @@ -62,24 +80,27 @@ def safe_sleep(secs): time.sleep(end - now) now = time.time() + def fuse_test_marker(): - '''Return a pytest.marker that indicates FUSE availability + """Return a pytest.marker that indicates FUSE availability If system/user/environment does not support FUSE, return a `pytest.mark.skip` object with more details. If FUSE is supported, return `pytest.mark.uses_fuse()`. - ''' + """ - skip = lambda x: pytest.mark.skip(reason=x) + def skip(reason: str): + return pytest.mark.skip(reason=reason) - with subprocess.Popen(['which', 'fusermount'], stdout=subprocess.PIPE, - universal_newlines=True) as which: + with subprocess.Popen( + ["which", "fusermount"], stdout=subprocess.PIPE, universal_newlines=True + ) as which: fusermount_path = which.communicate()[0].strip() if not fusermount_path or which.returncode != 0: return skip("Can't find fusermount executable") - if not os.path.exists('/dev/fuse'): + if not os.path.exists("/dev/fuse"): return skip("FUSE kernel module does not seem to be loaded") if os.getuid() == 0: @@ -87,20 +108,24 @@ def fuse_test_marker(): mode = os.stat(fusermount_path).st_mode if mode & stat.S_ISUID == 0: - return skip('fusermount executable not setuid, and we are not root.') + return skip("fusermount executable not setuid, and we are not root.") try: - fd = os.open('/dev/fuse', os.O_RDWR) + fd = os.open("/dev/fuse", os.O_RDWR) except OSError as exc: - return skip('Unable to open /dev/fuse: %s' % exc.strerror) + return skip(f"Unable to open /dev/fuse: {exc.strerror}") else: os.close(fd) return pytest.mark.uses_fuse() + # Use valgrind if requested -if os.environ.get('TEST_WITH_VALGRIND', 'no').lower().strip() \ - not in ('no', 'false', '0'): - base_cmdline = [ 'valgrind', '-q', '--' ] +if os.environ.get("TEST_WITH_VALGRIND", "no").lower().strip() not in ( + "no", + "false", + "0", +): + base_cmdline = ["valgrind", "-q", "--"] else: base_cmdline = []