Skip to content

Commit

Permalink
Replace more cascading if with match.
Browse files Browse the repository at this point in the history
Another CL to cleanup a few conditionals with a cleaner `match` block.

PiperOrigin-RevId: 558238534
  • Loading branch information
kenjitoyama authored and copybara-github committed Aug 18, 2023
1 parent d4a30e9 commit c279593
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 145 deletions.
259 changes: 139 additions & 120 deletions android_env/components/adb_call_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,24 +261,25 @@ def _install_apk(
location_type = install_apk.WhichOneof('location')
logging.info('location_type: %s', location_type)

if location_type == 'filesystem':
fpath = install_apk.filesystem.path
if not os.path.exists(fpath):
response.status = adb_pb2.AdbResponse.Status.INTERNAL_ERROR
response.error_message = f'Could not find local_apk_path: {fpath}'
match location_type:
case 'filesystem':
fpath = install_apk.filesystem.path
if not os.path.exists(fpath):
response.status = adb_pb2.AdbResponse.Status.INTERNAL_ERROR
response.error_message = f'Could not find local_apk_path: {fpath}'
return response
case 'blob':
with tempfile.NamedTemporaryFile(
dir=self._tmp_dir, suffix='.apk', delete=False
) as f:
fpath = f.name
f.write(install_apk.blob.contents)
case _:
response.status = adb_pb2.AdbResponse.Status.FAILED_PRECONDITION
response.error_message = (
f'Unsupported `install_apk.location` type: {location_type}'
)
return response
elif location_type == 'blob':
with tempfile.NamedTemporaryFile(
dir=self._tmp_dir, suffix='.apk', delete=False
) as f:
fpath = f.name
f.write(install_apk.blob.contents)
else:
response.status = adb_pb2.AdbResponse.Status.FAILED_PRECONDITION
response.error_message = (
f'Unsupported `install_apk.location` type: {location_type}'
)
return response

response, _ = self._execute_command(
['install', '-r', '-t', '-g', fpath], timeout=timeout
Expand Down Expand Up @@ -647,64 +648,78 @@ def _handle_settings(
namespace = adb_pb2.AdbRequest.SettingsRequest.Namespace.Name(
request.name_space).lower()

verb = request.WhichOneof('verb')
if verb == 'get':
get = request.get
if not get.key:
response.status = adb_pb2.AdbResponse.Status.FAILED_PRECONDITION
response.error_message = (
f'Empty SettingsRequest.get.key. Got: {request}.')
return response
response, command_output = self._execute_command(
['shell', 'settings', 'get', namespace, get.key], timeout=timeout)
response.settings.output = command_output
elif verb == 'put':
put = request.put
if not put.key or not put.value:
response.status = adb_pb2.AdbResponse.Status.FAILED_PRECONDITION
response.error_message = (
f'Empty SettingsRequest.put key or value. Got: {request}.')
return response
response, command_output = self._execute_command(
['shell', 'settings', 'put', namespace, put.key, put.value],
timeout=timeout)
response.settings.output = command_output
elif verb == 'delete_key':
delete = request.delete_key
if not delete.key:
match request.WhichOneof('verb'):
case 'get':
get = request.get
if not get.key:
response.status = adb_pb2.AdbResponse.Status.FAILED_PRECONDITION
response.error_message = (
f'Empty SettingsRequest.get.key. Got: {request}.'
)
return response
response, command_output = self._execute_command(
['shell', 'settings', 'get', namespace, get.key], timeout=timeout
)
response.settings.output = command_output
case 'put':
put = request.put
if not put.key or not put.value:
response.status = adb_pb2.AdbResponse.Status.FAILED_PRECONDITION
response.error_message = (
f'Empty SettingsRequest.put key or value. Got: {request}.'
)
return response
response, command_output = self._execute_command(
['shell', 'settings', 'put', namespace, put.key, put.value],
timeout=timeout,
)
response.settings.output = command_output
case 'delete_key':
delete = request.delete_key
if not delete.key:
response.status = adb_pb2.AdbResponse.Status.FAILED_PRECONDITION
response.error_message = (
f'Empty SettingsRequest.delete_key.key. Got: {request}.'
)
return response
response, command_output = self._execute_command(
['shell', 'settings', 'delete', namespace, delete.key],
timeout=timeout,
)
response.settings.output = command_output
case 'reset':
reset = request.reset
# At least one of `package_name` or `mode` should be given.
if (
not reset.package_name
and reset.mode
== adb_pb2.AdbRequest.SettingsRequest.Reset.Mode.UNKNOWN
):
response.status = adb_pb2.AdbResponse.Status.FAILED_PRECONDITION
response.error_message = (
'At least one of SettingsRequest.reset package_name or mode'
f' should be given. Got: {request}.'
)
return response

mode = adb_pb2.AdbRequest.SettingsRequest.Reset.Mode.Name(
reset.mode
).lower()
arg = reset.package_name or mode
response, command_output = self._execute_command(
['shell', 'settings', 'reset', namespace, arg], timeout=timeout
)
response.settings.output = command_output
case 'list':
response, command_output = self._execute_command(
['shell', 'settings', 'list', namespace], timeout=timeout
)
response.settings.output = command_output
case _:
response.status = adb_pb2.AdbResponse.Status.FAILED_PRECONDITION
response.error_message = (
f'Empty SettingsRequest.delete_key.key. Got: {request}.')
return response
response, command_output = self._execute_command(
['shell', 'settings', 'delete', namespace, delete.key],
timeout=timeout)
response.settings.output = command_output
elif verb == 'reset':
reset = request.reset
# At least one of `package_name` or `mode` should be given.
if (not reset.package_name and
reset.mode == adb_pb2.AdbRequest.SettingsRequest.Reset.Mode.UNKNOWN):
response.status = adb_pb2.AdbResponse.Status.FAILED_PRECONDITION
response.error_message = (
'At least one of SettingsRequest.reset package_name or mode should '
f'be given. Got: {request}.')
return response

mode = adb_pb2.AdbRequest.SettingsRequest.Reset.Mode.Name(
reset.mode).lower()
arg = reset.package_name or mode
response, command_output = self._execute_command(
['shell', 'settings', 'reset', namespace, arg], timeout=timeout)
response.settings.output = command_output
elif verb == 'list':
response, command_output = self._execute_command(
['shell', 'settings', 'list', namespace], timeout=timeout)
response.settings.output = command_output
else:
response.status = adb_pb2.AdbResponse.Status.FAILED_PRECONDITION
response.error_message = (
f'Unknown SettingsRequest.verb. Got: {request}.')
f'Unknown SettingsRequest.verb. Got: {request}.'
)

return response

Expand Down Expand Up @@ -744,54 +759,58 @@ def _handle_package_manager(
request = request.package_manager
response = adb_pb2.AdbResponse()

verb = request.WhichOneof('verb')
if verb == 'list':
what = request.list.WhichOneof('what')
response, output = self._execute_command(['shell', 'pm', 'list', what],
timeout=timeout)

if output:
items = output.decode('utf-8').split()
# Remove prefix for each item.
prefix = {
'features': 'feature:',
'libraries': 'library:',
'packages': 'package:',
}[what]
items = [x[len(prefix):] for x in items if x.startswith(prefix)]
response.package_manager.list.items.extend(items)
response.package_manager.output = output
elif verb == 'clear':
package_name = request.clear.package_name
if not package_name:
response.status = adb_pb2.AdbResponse.Status.FAILED_PRECONDITION
response.error_message = (
f'Empty PackageManagerRequest.clear.package_name. Got: {request}.')
return response

args = ['shell', 'pm', 'clear', package_name]
if request.clear.user_id:
args.insert(3, '-f')
args.insert(4, request.clear.user_id)
response, response.package_manager.output = self._execute_command(
args, timeout=timeout)
elif verb == 'grant':
grant = request.grant
if not grant.package_name:
response.status = adb_pb2.AdbResponse.Status.FAILED_PRECONDITION
response.error_message = ('`grant.package_name` cannot be empty.')
return response

if not grant.permissions:
response.status = adb_pb2.AdbResponse.Status.FAILED_PRECONDITION
response.error_message = ('`grant.permissions` cannot be empty.')
return response

for permission in grant.permissions:
logging.info('Granting permission: %r', permission)
match request.WhichOneof('verb'):
case 'list':
what = request.list.WhichOneof('what')
response, output = self._execute_command(
['shell', 'pm', 'list', what], timeout=timeout
)

if output:
items = output.decode('utf-8').split()
# Remove prefix for each item.
prefix = {
'features': 'feature:',
'libraries': 'library:',
'packages': 'package:',
}[what]
items = [x[len(prefix) :] for x in items if x.startswith(prefix)]
response.package_manager.list.items.extend(items)
response.package_manager.output = output
case 'clear':
package_name = request.clear.package_name
if not package_name:
response.status = adb_pb2.AdbResponse.Status.FAILED_PRECONDITION
response.error_message = (
f'Empty PackageManagerRequest.clear.package_name. Got: {request}.'
)
return response

args = ['shell', 'pm', 'clear', package_name]
if request.clear.user_id:
args.insert(3, '-f')
args.insert(4, request.clear.user_id)
response, response.package_manager.output = self._execute_command(
['shell', 'pm', 'grant', grant.package_name, permission],
timeout=timeout)
args, timeout=timeout
)
case 'grant':
grant = request.grant
if not grant.package_name:
response.status = adb_pb2.AdbResponse.Status.FAILED_PRECONDITION
response.error_message = '`grant.package_name` cannot be empty.'
return response

if not grant.permissions:
response.status = adb_pb2.AdbResponse.Status.FAILED_PRECONDITION
response.error_message = '`grant.permissions` cannot be empty.'
return response

for permission in grant.permissions:
logging.info('Granting permission: %r', permission)
response, response.package_manager.output = self._execute_command(
['shell', 'pm', 'grant', grant.package_name, permission],
timeout=timeout,
)

return response

Expand Down
32 changes: 18 additions & 14 deletions android_env/components/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,20 +387,24 @@ def _send_action_to_simulator(self, action: dict[str, np.ndarray]) -> None:
"""

try:
# If the action is a TOUCH or LIFT, send a touch event to the simulator.
if (action['action_type'] == action_type_lib.ActionType.TOUCH or
action['action_type'] == action_type_lib.ActionType.LIFT):
prepared_action = self._prepare_touch_action(action)
self._simulator.send_touch(prepared_action)
# If the action is a key event, send a key event to the simulator.
elif action['action_type'] == action_type_lib.ActionType.KEYDOWN:
self._simulator.send_key(
action['keycode'].item(0), event_type='keydown'
)
elif action['action_type'] == action_type_lib.ActionType.KEYUP:
self._simulator.send_key(action['keycode'].item(0), event_type='keyup')
elif action['action_type'] == action_type_lib.ActionType.KEYPRESS:
self._simulator.send_key(action['keycode'].item(0), event_type='keypress')
match action['action_type']:
# If the action is a TOUCH or LIFT, send a touch event to the simulator.
case action_type_lib.ActionType.TOUCH | action_type_lib.ActionType.LIFT:
prepared_action = self._prepare_touch_action(action)
self._simulator.send_touch(prepared_action)
# If the action is a key event, send a key event to the simulator.
case action_type_lib.ActionType.KEYDOWN:
self._simulator.send_key(
action['keycode'].item(0), event_type='keydown'
)
case action_type_lib.ActionType.KEYUP:
self._simulator.send_key(
action['keycode'].item(0), event_type='keyup'
)
case action_type_lib.ActionType.KEYPRESS:
self._simulator.send_key(
action['keycode'].item(0), event_type='keypress'
)
except (socket.error, errors.SendActionError):
logging.exception('Unable to execute action. Restarting simulator.')
self._stats['relaunch_count_execute_action'] += 1
Expand Down
25 changes: 14 additions & 11 deletions android_env/components/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,20 @@ def transpose_pixels(frame: np.ndarray) -> np.ndarray:

def orient_pixels(frame: np.ndarray, orientation: int) -> np.ndarray:
"""Rotates screen pixels according to the given orientation."""
if orientation == 0: # PORTRAIT_90
return frame
elif orientation == 1: # LANDSCAPE_90
return np.rot90(frame, k=3, axes=(0, 1))
elif orientation == 2: # PORTRAIT_180
return np.rot90(frame, k=2, axes=(0, 1))
elif orientation == 3: # LANDSCAPE_270
return np.rot90(frame, k=1, axes=(0, 1))
else:
raise ValueError(
'Orientation must be an integer in [0, 3] but is %r' % orientation)

match orientation:
case 0: # PORTRAIT_90
return frame
case 1: # LANDSCAPE_90
return np.rot90(frame, k=3, axes=(0, 1))
case 2: # PORTRAIT_180
return np.rot90(frame, k=2, axes=(0, 1))
case 3: # LANDSCAPE_270
return np.rot90(frame, k=1, axes=(0, 1))
case _:
raise ValueError(
'Orientation must be an integer in [0, 3] but is %r' % orientation
)


def convert_int_to_float(data: np.ndarray,
Expand Down

0 comments on commit c279593

Please sign in to comment.