Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add THP credential manager #3862

Merged
merged 4 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions common/protob/messages-thp.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
syntax = "proto2";
package hw.trezor.messages.thp;

// Sugar for easier handling in Java
option java_package = "com.satoshilabs.trezor.lib.protobuf";
option java_outer_classname = "TrezorMessageThp";

M1nd3r marked this conversation as resolved.
Show resolved Hide resolved
option (include_in_bitcoin_only) = true;

import "messages.proto";

/**
* Only for internal use.
* @embed
*/
message ThpCredentialMetadata {
option (internal_only) = true;
optional string host_name = 1; // Human-readable host name
}

/**
* Only for internal use.
* @embed
*/
message ThpPairingCredential {
option (internal_only) = true;
optional ThpCredentialMetadata cred_metadata = 1; // Credential metadata
optional bytes mac = 2; // Message authentication code generated by the Trezor
}

/**
* Only for internal use.
* @embed
*/
message ThpAuthenticatedCredentialData {
option (internal_only) = true;
optional bytes host_static_pubkey = 1; // Host's static public key used in the handshake
optional ThpCredentialMetadata cred_metadata = 2; // Credential metadata
}
5 changes: 3 additions & 2 deletions common/protob/messages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ extend google.protobuf.EnumOptions {

/** Options for tagging message types */
extend google.protobuf.MessageOptions {
optional bool experimental_message = 52001; // indicate that a message is intended for development and beta testing only and its definition may change at any time
optional uint32 wire_type = 52002; // override wire type specified in the MessageType enum
optional bool experimental_message = 52001; // indicate that a message is intended for development and beta testing only and its definition may change at any time
optional uint32 wire_type = 52002; // override wire type specified in the MessageType enum
optional bool internal_only = 52003; // indicate that a message is intended for internal use only and should not be transmitted via the wire
M1nd3r marked this conversation as resolved.
Show resolved Hide resolved
}

/** Options for tagging field types */
Expand Down
11 changes: 10 additions & 1 deletion core/SConscript.firmware
Original file line number Diff line number Diff line change
Expand Up @@ -473,9 +473,15 @@ env.Replace(
#

PROTO_SOURCES_DIR = '../../../common/protob/'

exclude_list = [PROTO_SOURCES_DIR + 'messages-bootloader.proto']
if not THP:
exclude_list.append(PROTO_SOURCES_DIR + 'messages-thp.proto')

PROTO_SOURCES = Glob(PROTO_SOURCES_DIR + '*.proto',
exclude=[PROTO_SOURCES_DIR + 'messages-bootloader.proto']
exclude=exclude_list
)

qstr_protobuf = env.Command(
target=[
'genhdr/qstrdefs.protobuf.h',
Expand Down Expand Up @@ -652,6 +658,9 @@ if FROZEN:
SOURCE_PY.extend(Glob(SOURCE_PY_DIR + 'apps/tezos/*.py'))
SOURCE_PY.extend(Glob(SOURCE_PY_DIR + 'trezor/enums/Tezos*.py'))

if THP:
SOURCE_PY.extend(Glob(SOURCE_PY_DIR + 'apps/thp/*.py'))

SOURCE_PY.extend(Glob(SOURCE_PY_DIR + 'apps/zcash/*.py'))

SOURCE_PY.extend(Glob(SOURCE_PY_DIR + 'apps/webauthn/*.py'))
Expand Down
11 changes: 10 additions & 1 deletion core/SConscript.unix
Original file line number Diff line number Diff line change
Expand Up @@ -551,9 +551,15 @@ env.Replace(
#

PROTO_SOURCES_DIR = '../../../common/protob/'

exclude_list = [PROTO_SOURCES_DIR + 'messages-bootloader.proto']
if not THP:
exclude_list.append(PROTO_SOURCES_DIR + 'messages-thp.proto')

PROTO_SOURCES = Glob(PROTO_SOURCES_DIR + '*.proto',
exclude=[PROTO_SOURCES_DIR + 'messages-bootloader.proto']
exclude=exclude_list
)

qstr_protobuf = env.Command(
target=[
'genhdr/qstrdefs.protobuf.h',
Expand Down Expand Up @@ -733,6 +739,9 @@ if FROZEN:
SOURCE_PY.extend(Glob(SOURCE_PY_DIR + 'apps/tezos/*.py'))
SOURCE_PY.extend(Glob(SOURCE_PY_DIR + 'trezor/enums/Tezos*.py'))

if THP:
SOURCE_PY.extend(Glob(SOURCE_PY_DIR + 'apps/thp/*.py'))

SOURCE_PY.extend(Glob(SOURCE_PY_DIR + 'apps/zcash/*.py'))

SOURCE_PY.extend(Glob(SOURCE_PY_DIR + 'apps/webauthn/*.py'))
Expand Down
7 changes: 6 additions & 1 deletion core/src/all_modules.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 9 additions & 3 deletions core/src/all_modules.py.mako
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ def make_import_name(pyfile):

imports = [make_import_name(f) for f in pyfiles]

imports_common = [import_name for import_name in imports if not any(a in import_name.lower() for a in ALTCOINS)]
imports_altcoin = [import_name for import_name in imports if import_name not in imports_common]
imports_thp = [import_name for import_name in imports if ".thp" in import_name.lower()]
imports_common = [import_name for import_name in imports if (not any(a in import_name.lower() for a in ALTCOINS) and import_name not in imports_thp)]
imports_altcoin = [import_name for import_name in imports if import_name not in imports_common and import_name not in imports_thp]

%>\
from trezor.utils import halt
Expand Down Expand Up @@ -79,12 +80,17 @@ ${import_name}
import ${import_name}
% endfor

if utils.USE_THP:
M1nd3r marked this conversation as resolved.
Show resolved Hide resolved
% for import_name in imports_thp:
${import_name}
import ${import_name}
% endfor

if not utils.BITCOIN_ONLY:
% for import_name in imports_altcoin:
${import_name}
import ${import_name}
% endfor

# generate full alphabet
<%
ALPHABET = "abcdefghijklmnopqrstuvwxyz"
Expand Down
Empty file added core/src/apps/thp/__init__.py
Empty file.
90 changes: 90 additions & 0 deletions core/src/apps/thp/credential_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
from typing import TYPE_CHECKING

from trezor import protobuf
from trezor.crypto import hmac
from trezor.messages import (
ThpAuthenticatedCredentialData,
ThpCredentialMetadata,
ThpPairingCredential,
)
from trezor.wire import wrap_protobuf_load

if TYPE_CHECKING:
from apps.common.paths import Slip21Path


def derive_cred_auth_key() -> bytes:
"""
Derive current credential authentication mac-ing key from device secret.
"""
from storage.device import get_cred_auth_key_counter, get_device_secret

from apps.common.seed import Slip21Node

# Derive the key using SLIP-21 https://github.com/satoshilabs/slips/blob/master/slip-0021.md,
# the derivation path is m/"Credential authentication key"/(counter 4-byte BE)

thp_secret = get_device_secret()
label = b"Credential authentication key"
counter = get_cred_auth_key_counter()
path: Slip21Path = [label, counter]

symmetric_key_node: Slip21Node = Slip21Node(thp_secret)
symmetric_key_node.derive_path(path)
cred_auth_key = symmetric_key_node.key()

return cred_auth_key


def invalidate_cred_auth_key() -> None:
from storage.device import increment_cred_auth_key_counter

increment_cred_auth_key_counter()


def issue_credential(
host_static_pubkey: bytes,
credential_metadata: ThpCredentialMetadata,
) -> bytes:
"""
Issue a pairing credential binded to the provided host static public key
and credential metadata.
"""
cred_auth_key = derive_cred_auth_key()
proto_msg = ThpAuthenticatedCredentialData(
host_static_pubkey=host_static_pubkey,
cred_metadata=credential_metadata,
)
authenticated_credential_data = _encode_message_into_new_buffer(proto_msg)
mac = hmac(hmac.SHA256, cred_auth_key, authenticated_credential_data).digest()
mmilata marked this conversation as resolved.
Show resolved Hide resolved

proto_msg = ThpPairingCredential(cred_metadata=credential_metadata, mac=mac)
credential_raw = _encode_message_into_new_buffer(proto_msg)
return credential_raw


def validate_credential(
encoded_pairing_credential_message: bytes,
host_static_pubkey: bytes,
) -> bool:
"""
Validate a pairing credential binded to the provided host static public key.
"""
cred_auth_key = derive_cred_auth_key()
expected_type = protobuf.type_for_name("ThpPairingCredential")
credential = wrap_protobuf_load(encoded_pairing_credential_message, expected_type)
mmilata marked this conversation as resolved.
Show resolved Hide resolved
assert ThpPairingCredential.is_type_of(credential)
proto_msg = ThpAuthenticatedCredentialData(
host_static_pubkey=host_static_pubkey,
cred_metadata=credential.cred_metadata,
)
authenticated_credential_data = _encode_message_into_new_buffer(proto_msg)
mac = hmac(hmac.SHA256, cred_auth_key, authenticated_credential_data).digest()
return mac == credential.mac


def _encode_message_into_new_buffer(msg: protobuf.MessageType) -> bytes:
msg_len = protobuf.encoded_length(msg)
new_buffer = bytearray(msg_len)
protobuf.encode(new_buffer, msg)
return new_buffer
46 changes: 46 additions & 0 deletions core/src/trezor/messages.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

79 changes: 79 additions & 0 deletions core/tests/test_apps.thp.credential_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from common import *
from trezor import config, utils
from trezor import log

if utils.USE_THP:
from apps.thp import credential_manager
from trezor.messages import ThpCredentialMetadata

def _issue_credential(host_name: str, host_static_pubkey: bytes) -> bytes:
metadata = ThpCredentialMetadata(host_name=host_name)
return credential_manager.issue_credential(host_static_pubkey, metadata)

def _dummy_log(name: str, msg: str, *args):
pass

log.debug = _dummy_log
DUMMY_KEY_1 = b"\x00\x00"
DUMMY_KEY_2 = b"\xff\xff"
HOST_NAME_1 = "host_name"
HOST_NAME_2 = "different host_name"


@unittest.skipUnless(utils.USE_THP, "only needed for THP")
class TestTrezorHostProtocolCredentialManager(unittest.TestCase):
def setUp(self):
config.init()
config.wipe()

def test_derive_cred_auth_key(self):
key1 = credential_manager.derive_cred_auth_key()
key2 = credential_manager.derive_cred_auth_key()
self.assertEqual(len(key1), 32)
self.assertEqual(key1, key2)

def test_invalidate_cred_auth_key(self):
key1 = credential_manager.derive_cred_auth_key()
credential_manager.invalidate_cred_auth_key()
key2 = credential_manager.derive_cred_auth_key()
self.assertNotEqual(key1, key2)

def test_credentials(self):

cred_1 = _issue_credential(HOST_NAME_1, DUMMY_KEY_1)
cred_2 = _issue_credential(HOST_NAME_1, DUMMY_KEY_1)
self.assertEqual(cred_1, cred_2)

cred_3 = _issue_credential(HOST_NAME_2, DUMMY_KEY_1)
self.assertNotEqual(cred_1, cred_3)

self.assertTrue(credential_manager.validate_credential(cred_1, DUMMY_KEY_1))
self.assertTrue(credential_manager.validate_credential(cred_3, DUMMY_KEY_1))
self.assertFalse(credential_manager.validate_credential(cred_1, DUMMY_KEY_2))

credential_manager.invalidate_cred_auth_key()
cred_4 = _issue_credential(HOST_NAME_1, DUMMY_KEY_1)
self.assertNotEqual(cred_1, cred_4)
self.assertFalse(credential_manager.validate_credential(cred_1, DUMMY_KEY_1))
self.assertFalse(credential_manager.validate_credential(cred_3, DUMMY_KEY_1))
self.assertTrue(credential_manager.validate_credential(cred_4, DUMMY_KEY_1))

def test_protobuf_encoding(self):
"""
If the protobuf encoding of credentials changes in the future, this
test should be able to catch it.

When the test fails, it might be necessary to create custom parser
of credentials to ensure that credentials remain valid after FW update.
"""
expected = b"\x0a\x0b\x0a\x09\x68\x6f\x73\x74\x5f\x6e\x61\x6d\x65\x12\x20\xf4\x44\x86\x2d\x00\x23\x1d\x02\xf3\x20\xbb\x58\xed\x13\x8f\xc6\x84\x9b\x6b\x73\x7a\x33\x25\xc4\x71\x79\x3b\x45\x15\xe4\x76\x67"

# Use hard-coded bytes as a "credential auth key" when issuing a credential
credential_manager.derive_cred_auth_key = lambda: b"\xBE\xEF"

credential = _issue_credential(HOST_NAME_1, DUMMY_KEY_1)
self.assertEqual(credential, expected)


if __name__ == "__main__":
unittest.main()
Loading
Loading