diff --git a/adbutils/_device.py b/adbutils/_device.py index edebd44..9746572 100644 --- a/adbutils/_device.py +++ b/adbutils/_device.py @@ -46,7 +46,7 @@ from ._proto import * from ._proto import StrOrPathLike, AppInfo from ._utils import (APKReader, ReadProgress, StopEvent, adb_path, - get_free_port, humanize, list2cmdline) + get_free_port, humanize, list2cmdline, append_path) from ._version import __version__ from .errors import AdbError, AdbInstallError @@ -585,7 +585,23 @@ def read_text(self, path: str, encoding: str = 'utf-8') -> str: """ read content of a file """ return self.read_bytes(path).decode(encoding=encoding) - def pull(self, src: str, dst: typing.Union[str, pathlib.Path]) -> int: + def pull(self, src: str, dst: typing.Union[str, pathlib.Path], exist_ok: bool = False) -> int: + """ + Pull file or directory from device:src to local:dst + + Returns: + total file size pulled + """ + src_file_info = self.stat(src) + is_src_file = src_file_info.mode & stat.S_IFREG != 0 + + if is_src_file: + return self.pull_file(src, dst) + else: + return self.pull_dir(src, dst, exist_ok) + + + def pull_file(self, src: str, dst: typing.Union[str, pathlib.Path]) -> int: """ Pull file from device:src to local:dst @@ -600,6 +616,54 @@ def pull(self, src: str, dst: typing.Union[str, pathlib.Path]) -> int: f.write(chunk) size += len(chunk) return size + + def pull_dir(self, src: str, dst: typing.Union[str, pathlib.Path], exist_ok: bool = True) -> int: + """Pull directory from device:src into local:dst + + Returns: + total files size pulled + """ + + def rec_pull_contents(src: str, dst: typing.Union[str, pathlib.Path], exist_ok: bool = True) -> int: + s = 0 + items = list(self.iter_directory(src)) + + items = list(filter( + lambda i: i.path != '.' and i.path != '..', + items + )) + + dirs = list( + filter( + lambda f: stat.S_IFDIR & f.mode != 0, + items + )) + files = list( + filter( + lambda f: stat.S_IFREG & f.mode != 0, + items + )) + + for dir in dirs: + new_src:str = append_path(src, dir.path) + new_dst:pathlib.Path = pathlib.Path(append_path(dst, dir.path)) + os.makedirs(new_dst, exist_ok=exist_ok) + s += rec_pull_contents(new_src, new_dst ,exist_ok=exist_ok) + + for file in files: + new_src:str = append_path(src, file.path) + new_dst:str = append_path(dst, file.path) + s += self.pull_file(new_src, new_dst) + + return s + + + if isinstance(dst, str): + dst = pathlib.Path(dst) + os.makedirs(dst, exist_ok=exist_ok) + + return rec_pull_contents(src, dst, exist_ok=exist_ok) + class AbstractScreenRecord: @@ -1226,6 +1290,10 @@ def remove(self, path: str): """ rm device file """ self.shell(["rm", path]) + def remove_dir(self, path: str): + """ rm -r directory""" + self.shell(["rm", "-r", path]) + def __get_screenrecord_impl(self) -> AbstractScreenRecord: if self._record_client: return self._record_client diff --git a/adbutils/_utils.py b/adbutils/_utils.py index fba791e..d551511 100644 --- a/adbutils/_utils.py +++ b/adbutils/_utils.py @@ -10,6 +10,8 @@ import time import typing import zipfile +import typing +import pathlib import whichcraft from apkutils2.axml.axmlparser import AXML @@ -18,6 +20,13 @@ MB = 1024 * 1024 + +def append_path(base: typing.Union[str, pathlib.Path], addition: str) -> str: + if isinstance(base, pathlib.Path): + return (base / addition).as_posix() + else: + return base + '/' + addition if base[-1] != '/' else base + addition + def humanize(n: int) -> str: return '%.1f MB' % (float(n) / MB) diff --git a/tests/conftest.py b/tests/conftest.py index b606dd4..4ee53db 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -15,3 +15,40 @@ def device_tmp_path(device: AdbDevice): tmp_path = "/data/local/tmp/Hi-世界.txt" yield tmp_path device.remove(tmp_path) + +@pytest.fixture +def device_tmp_dir_path(device: AdbDevice): + tmp_dir_path = "/sdcard/test_d" + yield tmp_dir_path + device.remove_dir(tmp_dir_path) + +@pytest.fixture +def local_src_in_dir(tmpdir): + + tmpdir.join('1.txt').write('1\n') + tmpdir.join('2.txt').write('2\n') + tmpdir.join('3.txt').write('3\n') + + a = tmpdir.mkdir('a') + a.join('a1.txt').write('a1\n') + + aa = a.mkdir('aa') + aa.join('aa1.txt').write('aa1\n') + + ab = a.mkdir('ab') + ab.join('ab1.txt').write('ab1\n') + ab.join('ab2.txt').write('ab2\n') + + b = tmpdir.mkdir('b') + b.join('b1.txt').write('b1\n') + + c = tmpdir.mkdir('c') + ca = c.mkdir('ca') + ca.join('ca1.txt').write('ca1\n') + + caa = ca.mkdir('caa') + caa.join('caa1.txt').write('caa1\n') + + cb = c.mkdir('cb') + + yield tmpdir diff --git a/tests/test_device.py b/tests/test_device.py index 562eb03..47db9c0 100644 --- a/tests/test_device.py +++ b/tests/test_device.py @@ -4,10 +4,12 @@ 效果生效与否不好判定(例如屏幕亮暗),部分用例仅作冒烟测试 """ +import os import io import pathlib import re import time +import filecmp import pytest @@ -132,6 +134,29 @@ def test_sync_pull_push(device: AdbDevice, device_tmp_path, tmp_path: pathlib.Pa data += chunk assert b"Hello Android" == data +def test_sync_pull_file_push(device: AdbDevice, device_tmp_path, tmp_path: pathlib.Path): + src = io.BytesIO(b"Hello 1") + device.sync.push(src, device_tmp_path) + assert b"Hello 1" == device.sync.read_bytes(device_tmp_path) + + device.sync.push(b"Hello 12", device_tmp_path) + assert "Hello 12" == device.sync.read_text(device_tmp_path) + + target_path = tmp_path / "hi.txt" + target_path.write_text("Hello Android") + dst_path = tmp_path / "dst.txt" + dst_path.unlink(missing_ok=True) + + device.sync.push(target_path, device_tmp_path) + assert "Hello Android" == device.sync.read_text(device_tmp_path) + device.sync.pull_file(device_tmp_path, dst_path) + assert "Hello Android" == dst_path.read_text(encoding="utf-8") + + data = b"" + for chunk in device.sync.iter_content(device_tmp_path): + data += chunk + assert b"Hello Android" == data + def test_screenshot(device: AdbDevice): im = device.screenshot() @@ -193,4 +218,68 @@ def test_logcat(device: AdbDevice, tmp_path: pathlib.Path): time.sleep(.1) logcat.stop() assert logcat_path.exists() - assert re.compile(r"I/TAG.*hello").search(logcat_path.read_text(encoding="utf-8")) \ No newline at end of file + assert re.compile(r"I/TAG.*hello").search(logcat_path.read_text(encoding="utf-8")) + + +# todo: make independent of already present stuff on the phone +def test_pull_push_dirs( + device: AdbDevice, + device_tmp_dir_path:str, + local_src_in_dir:pathlib.Path, + tmp_path:pathlib.Path, + ): + + def are_dir_trees_equal(dir1, dir2): + """ + Compare two directories recursively. Files in each directory are + assumed to be equal if their names and contents are equal. + + NB: retreived from: https://stackoverflow.com/a/6681395 + + @param dir1: First directory path + @param dir2: Second directory path + + @return: True if the directory trees are the same and + there were no errors while accessing the directories or files, + False otherwise. + """ + + dirs_cmp = filecmp.dircmp(dir1, dir2) + if len(dirs_cmp.left_only)>0 or len(dirs_cmp.right_only)>0 or \ + len(dirs_cmp.funny_files)>0: + return False + (_, mismatch, errors) = filecmp.cmpfiles( + dir1, dir2, dirs_cmp.common_files, shallow=False) + if len(mismatch)>0 or len(errors)>0: + return False + for common_dir in dirs_cmp.common_dirs: + new_dir1 = os.path.join(dir1, common_dir) + new_dir2 = os.path.join(dir2, common_dir) + if not are_dir_trees_equal(new_dir1, new_dir2): + return False + return True + + local_src_out_dir1 = tmp_path / 'dir1' + local_src_out_dir2 = tmp_path / 'dir2' + + device.push(local_src_in_dir, device_tmp_dir_path) + + device.sync.pull_dir(device_tmp_dir_path, local_src_out_dir1) + + assert local_src_out_dir1.exists() + assert local_src_out_dir1.is_dir() + + are_dir_trees_equal(local_src_in_dir, local_src_out_dir1) + + + device.sync.pull(device_tmp_dir_path, local_src_out_dir2) + + assert local_src_out_dir2.exists() + assert local_src_out_dir2.is_dir() + + are_dir_trees_equal(local_src_in_dir, local_src_out_dir2) + + + + +