-
Notifications
You must be signed in to change notification settings - Fork 4
/
recovery.py
executable file
·450 lines (351 loc) · 14.4 KB
/
recovery.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
#!/usr/bin/env python3
#
# To use this, install with:
#
# pip install --editable .
#
# That will create the command "psbt_recover" in your path... or just use "./psbt_recover ..." here
#
#
import click, sys, os, pdb, struct, io, json, re, time
from psbt import BasicPSBT, BasicPSBTInput, BasicPSBTOutput
from pprint import pformat, pprint
from binascii import b2a_hex as _b2a_hex
from binascii import a2b_hex
from io import BytesIO
from collections import namedtuple
from base64 import b64encode, b64decode
from pycoin.coins.bitcoin.Tx import Tx, TxOut, TxIn
from pycoin.networks.registry import network_for_netcode
#from pycoin.coins.bitcoin.TxOut import TxOut
#from pycoin.coins.bitcoin.TxIn import TxIn
#from pycoin.ui import standard_tx_out_script => network.contract.for_address
#from pycoin.encoding import b2a_hashed_base58, hash160
from pycoin.encoding.hexbytes import b2h_rev, b2h, h2b, h2b_rev
from pycoin.contrib.segwit_addr import encode as bech32_encode
from pycoin.key.BIP32Node import BIP32Node
from pycoin.convention import tx_fee
import urllib.request
# Hex-encode bytes and return the result as str (binascii's version returns bytes).
b2a_hex = lambda a: str(_b2a_hex(a), 'ascii')
#xfp2hex = lambda a: b2a_hex(a[::-1]).upper()
# pycoin network object registered under netcode "BTC"; used below to parse
# xpubs and to build output scripts / candidate addresses.
BTC = network_for_netcode("BTC")
# When true, explora() queries the testnet API. Set from the --testnet flag in
# cli() and switched on by recovery() when it sees a 'tb1' address.
TESTNET = False
def explora(*parts, is_json=True):
    """Fetch one resource from the blockstream.info HTTP API.

    The positional *parts* are joined with '/' to form the path below
    ``api/``; the ``testnet/`` prefix is used when the module-level TESTNET
    flag is set.  Returns the decoded JSON document when *is_json* is true,
    otherwise the raw response bytes.
    """
    root = 'https://blockstream.info/' + ('testnet/' if TESTNET else '')
    url = f'{root}api/' + '/'.join(parts)

    # short pause before every request (presumably to go easy on the public server)
    time.sleep(0.1)

    with urllib.request.urlopen(url) as response:
        if is_json:
            return json.load(response)
        return response.read()
def str2ipath(s):
    """Yield the numeric path components of text path *s*, for BIP174.

    A leading 'm' and empty segments (trailing or duplicated slashes) are
    skipped.  A component ending in ', p or h is hardened and is yielded with
    bit 31 set; any other component is yielded as a plain integer.

    Raises AssertionError if a component's index is outside 0 .. 2**31-1, or
    if a hardened marker has no digits; int() raises ValueError for
    non-numeric text.
    """
    for part in s.split('/'):
        if part == 'm' or not part:
            # master marker, or trailing/duplicated slashes
            continue

        if part[-1] in "'ph":
            assert len(part) >= 2, part
            idx = int(part[:-1])
            # Range-check the index BEFORE setting the hardened bit; without
            # this, "2147483648'" would silently come out the same as "0'"
            # and negative values would pass straight through.
            assert 0 <= idx < 0x80000000, part
            yield idx | 0x80000000
        else:
            idx = int(part)
            assert 0 <= idx < 0x80000000, idx
            yield idx
def xfp2str(xfp):
    """Return integer fingerprint *xfp* as 8 upper-case hex digits.

    The fingerprint is really a 4-byte string rather than a number, so it is
    packed as a big-endian uint32 and those bytes are hex-encoded (showing it
    as '0x%08x' was the wrong endian).
    """
    raw = struct.pack('>I', xfp)
    return raw.hex().upper()
def str2xfp(xfp):
    """Parse a fingerprint given as hex text into its integer form.

    The text is hex-decoded and the resulting 4 bytes are read as a
    big-endian uint32 (the inverse of xfp2str).
    """
    (value,) = struct.unpack('>I', a2b_hex(xfp))
    return value
# Import-time sanity check: xfp2str() and str2xfp() must round-trip.
assert str2xfp(xfp2str(0x1234)) == 0x1234
def str2path(xfp, s):
    """Build the binary fingerprint + path value needed for BIP-174.

    The result is *xfp* packed as a big-endian uint32, followed by each
    component of text path *s* (see str2ipath) packed as a little-endian
    uint32.
    """
    indexes = tuple(str2ipath(s))
    fingerprint = struct.pack('>I', xfp)

    # XXX not sure about endian here, but this worked when I needed it
    return fingerprint + struct.pack('<%dI' % len(indexes), *indexes)
def calc_pubkey(xpubs, path):
    """Derive the public key (SEC bytes) for *path* from a map of xpubs.

    xpubs maps a path (text, such as "m/84'/0'/0'") to an xpub.  The key
    looked up is 'm/' plus every component of *path* that ends in "'"
    (or just 'm' when there are none); the remaining components are then
    derived one at a time from that xpub.

    Raises AssertionError if *path* does not start with 'm/', if the needed
    xpub is not in the map, or if the leftover part of the path has exactly
    one component.
    """
    assert path[0:2] == 'm/'

    segments = path.split('/')

    # NOTE(review): this collects hardened components wherever they appear;
    # presumably they always form a leading prefix -- confirm.
    hardened = [seg for seg in segments if seg[-1] == "'"]
    if hardened:
        want = 'm/' + '/'.join(hardened)
        hard_depth = len(hardened)
    else:
        want = 'm'
        hard_depth = 0

    assert want in xpubs, f"Need: {want} to build pubkey of {path}"

    node = BTC.parse.bip32(xpubs[want])

    # whatever follows the components already covered by the xpub
    remaining = [seg for seg in segments if seg != 'm'][hard_depth:]
    if remaining:
        assert len(remaining) >= 2      # 0/1 typical
    else:
        assert want == path

    for step in remaining:
        node = node.subkey_for_path(step)

    return node.sec()
def build_psbt(ctx, xfp, addrs, pubkey=None, xpubs=None, redeem=None):
    """Find the UTXOs of *addrs* and write an unsigned PSBT spending all of them.

    ctx: click context; ctx.obj supplies 'payout_address', 'output_psbt'
        (a writable binary file object) and 'force_fee' (satoshis, 0 = calculate).
    xfp: master fingerprint as an integer; zero is used (with a warning) when
        it is missing.
    addrs: sequence of (derivation path, address) pairs to look up.
    pubkey: 33-byte public key; only allowed when there is a single address.
        Without it, each key is derived with calc_pubkey() from *xpubs*.
    xpubs: map of path => xpub, in the form calc_pubkey() takes.
    redeem: optional redeem script stored on every input.

    Raises AssertionError on inconsistent arguments, when no UTXO is found,
    or when the fee is larger than the total being spent.
    """
    payout_address = ctx.obj['payout_address']
    out_psbt = ctx.obj['output_psbt']
    force_fee = ctx.obj['force_fee']

    if pubkey:
        assert len(addrs) == 1  # can only be single addr in that case
        assert len(pubkey) == 33

    if not xfp:
        print("Warning: Using XFP value of zero. Non standard PSBT will result.")

    spending = []
    total = 0
    psbt = BasicPSBT()

    for path, addr in addrs:
        print(f"addr: {path} => {addr} ... ", end='')

        utxos = explora('address', addr, 'utxo')
        if not utxos:
            print('nada')
            continue

        # Work out the key for THIS path in a separate name.  Assigning the
        # result back to the `pubkey` parameter would make the first derived
        # key stick, and every later address would be recorded with the
        # wrong public key.
        path_pubkey = pubkey or calc_pubkey(xpubs, path)
        key_origin = str2path(xfp or 0, path)

        here = 0
        for u in utxos:
            here += u['value']
            spending.append(TxIn(h2b_rev(u['txid']), u['vout']))

            pin = BasicPSBTInput(idx=len(psbt.inputs))
            psbt.inputs.append(pin)
            pin.bip32_paths[path_pubkey] = key_origin

            # fetch the funding transaction that created this UTXO
            td = explora('tx', u['txid'], 'hex', is_json=False)
            funding = Tx.from_hex(td.decode('ascii'))

            # XXX need to know if input was segwit or not
            if 0:
                # witness utxo -- preferred
                with BytesIO() as b:
                    funding.txs_out[u['vout']].stream(b)
                    pin.witness_utxo = b.getvalue()
            else:
                # full funding txn -- semi-tested
                with BytesIO() as b:
                    funding.stream(b)
                    pin.utxo = b.getvalue()

            if redeem:
                pin.redeem_script = redeem

        print('%.8f BTC' % (here / 1E8))
        total += here

        if len(spending) > 15:
            print("Reached practical limit on # of inputs. "
                  "You'll need to repeat this process again later.")
            break

    assert total, "Sorry! Didn't find any UTXO"

    print("Found total: %.8f BTC" % (total / 1E8))

    if payout_address:
        print("Planning to send to: %s" % payout_address)
        dest_scr = BTC.contract.for_address(payout_address)
        txn = Tx(2, spending, [TxOut(total, dest_scr)])
    else:
        print("Output section of PSBT will be empty. Change downstream")
        txn = Tx(2, spending, [])

    fee = force_fee or tx_fee.recommended_fee_for_tx(txn)
    print("Using miner's fee: %.8f BTC" % (fee / 1E8))

    # placeholder, single output that isn't change
    # NOTE(review): this is appended even when the transaction has no outputs
    # (no payout address) -- confirm downstream tools accept that.
    pout = BasicPSBTOutput(idx=0)
    psbt.outputs.append(pout)

    if txn.txs_out:
        # fail with a clear message rather than produce a negative output value
        assert fee <= total, f"Fee of {fee} sats is more than the {total} sats found"
        txn.txs_out[0].coin_value -= fee

    # write txn into PSBT
    with BytesIO() as b:
        txn.stream(b)
        psbt.txn = b.getvalue()

    out_psbt.write(psbt.as_bytes())

    print("PSBT to be signed:\n\n\t" + out_psbt.name, end='\n\n')
@click.group()
@click.option('-p', '--payout_address', type=str, default=None, metavar="1bitcoinaddr")
@click.option('-o', '--output_psbt', type=click.File('wb'), default="out.psbt")
@click.option('-t', '--testnet', help="Assume testnet3 addresses", is_flag=True, default=False)
@click.option('-f', '--fee', type=int, help="Force network fee (default: calc)", metavar='SATOSHIS', default=0)
@click.pass_context
def cli(ctx, payout_address, output_psbt, testnet, fee):
    # Top-level command group: stash the shared options in ctx.obj for the
    # subcommands and record the testnet choice in the module-level flag.
    # (Kept as comments on purpose: a docstring here would become the
    # group's --help text.)
    global TESTNET

    ctx.ensure_object(dict)
    ctx.obj.update(
        payout_address=payout_address,
        output_psbt=output_psbt,
        force_fee=fee,
    )

    TESTNET = testnet
@cli.command('desc', help="Use a descriptor path")
@click.argument('descriptor', type=str, metavar='FULL-DESCRIPTOR')
@click.argument('address', type=str, metavar='Address')
@click.option('--xfp', '--fingerprint', help="Provide XFP value, otherwise some checks will be skipped", default=None)
@click.option('--xpub', help="Optional XPUB at hardened depth", default=None)
@click.option('--depth', help="Depth of xpub given", type=int, default=None)
@click.pass_context
def descriptor(ctx, descriptor, address, xfp, xpub, depth):
    """Build a PSBT for the single address described by an output descriptor.

    descriptor: text such as "sh(wpkh([e0000002/84'/0'/0'/0/9]022c...))#..."
        holding the key origin (fingerprint + path) and a 33-byte pubkey in hex.
    address: the address whose UTXOs will be spent.
    xfp: optional fingerprint (hex text); checked against the descriptor's.
    xpub / depth: optional xpub and the number of leading path components it
        already covers; when given, the pubkey and the address are
        re-derived and verified.

    Exits with status 1 when the xpub is given without a depth or when the
    address cannot be confirmed; raises AssertionError on mismatched
    fingerprint or pubkey.
    """
    # `depth is None`, not `not depth`: an explicit --depth 0 is a real value
    if xpub and depth is None:
        print("need depth if xpub given")
        sys.exit(1)

    # XXX could not find quick python lib to read miniscript
    # - not checking checksum TODO
    m = re.match(r"(.*)\(\[([a-f0-9/']*)\]([a-f0-9]{66})", descriptor)
    if not m:
        print("descriptor fail")
        return

    # ex = "sh(wpkh([e0000002/84'/0'/0'/0/9]022c...43434))#v90hljj9"
    mode = m.group(1)               # sh(wpkh
    mode = mode.replace('(', '/').replace(')', '').upper()
    deriv = m.group(2)              # e0000002/84'/0'/0'/0/9
    expect_pubkey = m.group(3)      # 022c...34

    parts = deriv.split('/')
    if xfp:
        assert parts[0].lower() == xfp.lower(), f'wrong xfp? got={parts[0]} expected={xfp}'
    else:
        # expect 8 hex digits
        xfp = parts[0]
        assert len(xfp) == 8

    xfp = str2xfp(xfp)
    path = '/'.join(parts[1:])

    addr_fmt = None
    # Must exist on BOTH branches below: it used to be assigned only when an
    # xpub was supplied, so running without --xpub died with NameError at
    # the build_psbt() call.
    p2sh_segwit_script = None

    if xpub:
        wallet = BTC.parse.bip32(xpub)
        sub = '/'.join(parts[1+depth:])
        ph = '/'.join(["_'"] * depth)
        print(f"Assuming: m/{ph}/{sub} is path")
        node = wallet.subkey_for_path(sub)

        pubkey = node.sec()
        assert b2a_hex(pubkey) == expect_pubkey

        # walk the outputs pycoin can produce for this key, looking for ours
        fails = []
        for pc_name, guess_addr, *junk in BTC.output_for_public_pair(node.public_pair()):
            if pc_name == 'p2sh_segwit_script':
                # NOTE(review): kept (and later stored on the input) whatever
                # address format ends up matching -- confirm that is intended.
                p2sh_segwit_script = a2b_hex(guess_addr)
                continue

            if guess_addr == address:
                addr_fmt = pc_name
                print(f"Address Format: {addr_fmt} vs {mode} must be right")

            fails.append(guess_addr)

        if not addr_fmt:
            print("Can't confirm address based on xpub + path")
            print("tried: " + ' '.join(fails))
            print(f"none match: {address}")
            sys.exit(1)
    else:
        # nothing to verify against; trust the pubkey from the descriptor
        pubkey = a2b_hex(expect_pubkey)

    addrs = [(path, address)]

    build_psbt(ctx, xfp, addrs, pubkey=pubkey, redeem=p2sh_segwit_script)
@cli.command('public', help="Walk contents of public.txt from Coldcard")
@click.argument('public_txt', type=click.File('rt'))
@click.option('--xfp', '--fingerprint', help="Provide XFP value, otherwise discovered from file", default=None)
@click.option('--gap', help="Widen search by searching /[0/1]/0...gap", default=None, type=int)
@click.option('--xpub', 'single_xpub', help="Limit work to single xpub", default=None)
@click.option('--dump_addrs', help="Dump addrs and paths we will check (and stop)", default=None)
@click.pass_context
def recovery(ctx, public_txt, xfp=None, gap=None, single_xpub=None, dump_addrs=None):
    """Collect addresses from a public.txt file and build a PSBT spending them.

    public_txt: open text file with "path => address" and "path => xpub" lines.
    xfp: fingerprint as hex text (or already an integer); when missing it is
        taken from the "master key fingerprint" line of the file.
    gap: with single_xpub, ignore the listed addresses and derive
        <xpub path>/{0,1}/0..gap-1 instead.
    single_xpub: only use addresses listed under this xpub.
    dump_addrs: file name; write the paths/addresses to it and stop.

    Exits with status 1 when gap is used without single_xpub or when no
    address is found; raises AssertionError when the xpub is not in the file
    or the first listed address cannot be reproduced from it.
    """
    global TESTNET

    # The command-line option arrives as hex text, but everything downstream
    # (xfp2str, str2path) packs the fingerprint as an integer.
    if xfp and isinstance(xfp, str):
        xfp = str2xfp(xfp)

    # Match lines like:
    #       m/0'/0'/0' => n3ieqYKgVR8oB2zsHVX1Pr7Zc31pP3C7ZJ
    #       m/0/2 => mh7finD8ctq159hbRzAeevSuFBJ1NQjoH2
    # and also
    #       m => tpubD6NzVbkrYhZ4XzL5Dhayo67Gorv1YMS7j8pRUvVMd5odC2LBPLAygka9p7748JtSq82FNGPppFEz5xxZUdasBRCqJqXvUHq6xpnsMcYJzeh
    pat_dest = re.compile(r"(m[0-9'/]*)\s+=>\s+(\w+)")

    # match pubkeys, including SLIP132 confusion
    pat_pk = re.compile(r"(\wpub\w{100,140})")

    if gap and not single_xpub:
        print("Must specify xpub if gap feature to be used")
        sys.exit(1)

    addrs = []
    xpubs = {}
    last_xpub = None

    for ln in public_txt:
        m = pat_dest.search(ln)
        if m:
            path, addr = m.group(1), m.group(2)

            xp = pat_pk.search(addr)
            if xp:
                xp = xp.group(1)
                if path not in xpubs:
                    xpubs[path] = xp
                    last_xpub = xp
                elif xpubs[path] != xp:
                    if xp[0] in 'vVuUzyZY':
                        # slip-132 junk
                        pass
                    else:
                        print(f'Conflict for {path} xpub:\n {xp}\n {xpubs[path]}')
            else:
                assert path[0:2] == 'm/'

                # addresses belong to the most recent xpub line seen
                if single_xpub and last_xpub != single_xpub:
                    continue

                addrs.append((path, addr))

                if addr.startswith('tb1') and not TESTNET:
                    print("Looks like TESTNET addresses; switching.")
                    TESTNET = True

        if not xfp:
            if 'master key fingerprint: 0x' in ln:
                # pre 2.1.0 firmware w/ LE32 value
                # NOTE(review): used as-is although the rest of this file
                # treats the integer as big-endian -- confirm.
                xfp = int(ln.split(': ')[1], 16)
            elif 'master key fingerprint: ' in ln:
                # after 2.1.0 firmware w/ BE32 value
                xfp, = struct.unpack('>I', a2b_hex(ln.split(': ')[1].strip()))

    if xfp:
        print("Fingerprint is: " + xfp2str(xfp))

    if single_xpub:
        assert single_xpub in xpubs.values(), "Specific xpub not found: " + repr(xpubs)
        the_path = [p for p, xp in xpubs.items() if xp == single_xpub][0]
        the_path += '/{change}/{index}'

        if gap:
            print(f"Will use deriv path: {the_path}")

            # the first listed address is needed to learn the address format
            if not addrs:
                print("No addresses found!")
                sys.exit(1)

            wallet = BTC.parse.bip32(single_xpub)
            expect_addr = addrs[0][1]       # for .../0/0
            addrs = []
            addr_fmt = None

            for ch in range(2):
                for idx in range(gap):
                    p = the_path.format(change=ch, index=idx)
                    node = wallet.subkey(ch).subkey(idx)
                    candidates = {name: value for name, value, *_ in
                                  BTC.output_for_public_pair(node.public_pair())}

                    if not addr_fmt:
                        # the format is learned from the very first key only
                        assert idx == 0 and ch == 0
                        addr_fmt = next((k for k, v in candidates.items()
                                         if v == expect_addr), None)
                        if addr_fmt is None:
                            # (this path used to reference an undefined name
                            # and fail with NameError instead)
                            raise AssertionError("Could not find 0/0 addr in public?!")
                        print(f"Address format will be: {addr_fmt}")

                    addr = candidates[addr_fmt]
                    if idx == 0 and ch == 0:
                        assert addr == expect_addr

                    addrs.append((p, addr))
    else:
        print("Found %d xpubs: %s" % (len(xpubs), ' '.join(xpubs)))

    if not addrs:
        print("No addresses found!")
        sys.exit(1)

    if dump_addrs:
        with open(dump_addrs, 'wt') as fd:
            for p, a in addrs:
                fd.write(f'{p} => {a}\n')
        print(f'Wrote: {dump_addrs}')
        sys.exit(0)

    print(f"Found {len(addrs)} addresses: from {addrs[0][0]} to {addrs[-1][0]}")
    print("Checking for balances.\n")

    if 0:
        # verify we have enough data (disabled)
        trouble = 0
        for path, addr in addrs:
            try:
                calc_pubkey(xpubs, path)
            except AssertionError as exc:
                print(str(exc))
                trouble += 1

        if trouble:
            sys.exit(1)

    build_psbt(ctx, xfp, addrs, xpubs=xpubs)
# Script entry point: hand control to the click command group.
if __name__ == "__main__":
    cli()
# EOF