forked from vincentzlt/textprep
-
Notifications
You must be signed in to change notification settings - Fork 0
/
vocab.py
156 lines (120 loc) · 4.4 KB
/
vocab.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
import os
import argparse as ap
import fileinput as fi
import json
import collections as cl
from tqdm import tqdm
import re
import itertools as it
# Directory holding this script; the bundled data files below are resolved
# relative to it.
CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
# Marker prepended (repeatedly, if needed) to a decomposition string that is
# already taken, so that every character/word keeps a unique decomposition.
DUP = '〾'
# The twelve structural description characters; RE_IDCs matches the same set.
IDCs = '⿰⿱⿲⿳⿴⿵⿶⿷⿸⿹⿺⿻'
# Decomposition tables read by _get_char2ideos(). vocab() loads IDS_FNAME and
# CIRCLE_FNAME for every level and adds SINGLE_FNAME only for 'stroke'.
IDS_FNAME = os.path.join(CURRENT_DIR, 'cjkvi-ids', 'ids.txt')
CIRCLE_FNAME = os.path.join(CURRENT_DIR, 'data', 'circle_char.txt')
SINGLE_FNAME = os.path.join(CURRENT_DIR, 'data', 'single_char.txt')
# Matches one '[...]' group containing no nested '['; _get_d() deletes these
# from each decomposition (presumably source/region tags in ids.txt - verify).
RE_squarebrackets = re.compile(r'\[[^[]*\]')
# Matches any single structural description character; vocab() strips them
# from the decompositions when args.idc is false.
RE_IDCs = re.compile(r'[⿰⿱⿲⿳⿴⿵⿶⿷⿸⿹⿺⿻]')
def _str2bool(v):
    """Convert a yes/no style command-line string to a bool.

    The comparison is case-insensitive. Accepted spellings are
    yes/true/t/y/1 for True and no/false/f/n/0 for False.

    Raises:
        argparse.ArgumentTypeError: if *v* is none of the accepted spellings.
    """
    token = v.lower()
    if token in ('yes', 'true', 't', 'y', '1'):
        return True
    if token in ('no', 'false', 'f', 'n', '0'):
        return False
    raise ap.ArgumentTypeError('Boolean value expected.')
def _get_char2ideos(fnames):
    """Load character -> decomposition mappings from the given table files.

    Each non-comment line is whitespace-separated. Lines starting with 'U'
    carry a leading code-point column that is dropped; the next field is the
    character and the remaining fields are its alternative decompositions.
    One alternative per character is selected by ``_get_d`` so that no two
    characters share a decomposition string. If a character appears more
    than once, the last occurrence wins.

    Args:
        fnames: iterable of paths to UTF-8 encoded table files.

    Returns:
        dict mapping each character to its chosen decomposition string.
    """
    char2ideos = {}
    ideos_set = set()
    for fname in fnames:
        # The bundled tables contain CJK text; read them as UTF-8 regardless
        # of the platform default, and close each file deterministically.
        with open(fname, encoding='utf-8') as fin:
            for line in fin:
                if line.startswith('#'):
                    continue
                fields = line.split()
                if line.startswith('U'):
                    # Drop the leading code-point column (e.g. 'U+4E00').
                    fields = fields[1:]
                if len(fields) < 2:
                    # Blank line, or a character without any decomposition:
                    # there is nothing to map, so leave the character as-is.
                    continue
                c, *ds = fields
                char2ideos[c], ideos_set = _get_d(ds, ideos_set)
    return char2ideos
def _get_d(ds, ideos_set):
    """Pick a decomposition from *ds* that is not yet in *ideos_set*.

    Bracketed groups (``[...]``) are removed from every alternative first.
    The first alternative, in the order given, that is still unused is
    chosen, so the result does not depend on set iteration order. If all
    alternatives are taken, every alternative is prefixed with ``DUP`` and
    the search repeats until an unused string is found.

    Args:
        ds: non-empty iterable of alternative decomposition strings.
        ideos_set: set of decompositions already assigned; the chosen one is
            added to it in place.

    Returns:
        ``(decomposition, ideos_set)`` - the chosen string and the same set
        object that was passed in.

    Raises:
        ValueError: if *ds* is empty (there is nothing to choose from).
    """
    candidates = [RE_squarebrackets.sub('', d) for d in ds]
    if not candidates:
        # Without this guard the DUP-prefixing loop below would never end.
        raise ValueError('at least one decomposition is required')
    while True:
        for candidate in candidates:
            if candidate not in ideos_set:
                ideos_set.add(candidate)
                return candidate, ideos_set
        # Every alternative is taken: lengthen them all and try again. This
        # terminates because ideos_set is finite and the strings keep growing.
        candidates = [DUP + candidate for candidate in candidates]
def _recursive_decomp(char2ideos):
    """Expand every decomposition in *char2ideos* to a fixed point, in place.

    For each entry, every character of the decomposition that is itself a key
    of *char2ideos* is replaced by its current decomposition, repeatedly,
    until the string stops changing. Entries are processed in dict order and
    written back immediately, so later entries see the already expanded
    values of earlier ones. A result that collides with an earlier result is
    prefixed with ``DUP`` until it is unique.

    Returns None; *char2ideos* is modified in place.
    """

    def expand(text):
        # One substitution pass over the characters of *text*.
        return ''.join([char2ideos.get(ch, ch) for ch in text])

    taken = set()
    for char in char2ideos:
        current = char2ideos[char]
        expanded = expand(current)
        while expanded != current:
            current, expanded = expanded, expand(expanded)
        while current in taken:
            current = DUP + current
        char2ideos[char] = current
        taken.add(current)
def _word_decomp(w, char2ideos, decomp_set):
    """Decompose word *w* character by character into a unique string.

    Characters missing from *char2ideos* are kept unchanged. If the joined
    result is already in *decomp_set*, ``DUP`` is prepended until it is not.
    The final string is added to *decomp_set* in place.

    Returns:
        ``(decomposition, decomp_set)`` - the unique string and the same set
        object that was passed in.
    """
    pieces = [char2ideos.get(ch, ch) for ch in w]
    unique = ''.join(pieces)
    while unique in decomp_set:
        unique = DUP + unique
    decomp_set.add(unique)
    return unique, decomp_set
def _vocab2ideos(vocab, char2ideos):
    """Map every word in *vocab* to a unique decomposition string.

    Words are processed in iteration order; uniqueness across words is
    enforced by ``_word_decomp`` through a shared set of used strings.

    Returns:
        dict mapping each word to its decomposition.
    """
    used = set()
    word2decomp = {}
    for word in vocab:
        word2decomp[word], used = _word_decomp(word, char2ideos, used)
    return word2decomp
def _chunks(l, n):
    """Yield consecutive slices of l, each holding at most n items.

    The last slice is shorter when len(l) is not a multiple of n.
    """
    yield from (l[start:start + n] for start in range(0, len(l), n))
def _iter_input_lines(fnames):
    """Yield every line of each file in *fnames*, closing each file after use."""
    for fname in fnames:
        # errors='replace' keeps undecodable bytes from aborting the count.
        with open(fname, errors='replace') as fin:
            yield from fin


def vocab(args):
    """Build (or reuse) a word-count vocab and write its decompositions.

    If ``args.vocab`` exists and is non-empty it is loaded as JSON; otherwise
    whitespace-separated tokens are counted over ``args.input`` and the
    counts are saved to ``args.vocab``. Every word is then mapped to a unique
    decomposition string and the mapping is written as JSON to
    ``args.vocab_decomp``.

    Args:
        args: namespace with the attributes
            input: list of input file names,
            vocab: path of the vocab JSON file (read if present, else written),
            vocab_decomp: path of the output decomposition JSON file,
            level: 'ideo_raw', 'ideo_finest' or 'stroke',
            idc: whether to keep structural description characters.

    Raises:
        ValueError: if ``args.level`` starts with neither 'ideo' nor 'stroke'.
    """
    # Validate the level before touching any file, so a bad value fails fast
    # instead of surfacing later as an unbound local variable.
    if args.level.startswith('ideo'):
        IDS_fnames = [IDS_FNAME, CIRCLE_FNAME]
    elif args.level.startswith('stroke'):
        IDS_fnames = [IDS_FNAME, CIRCLE_FNAME, SINGLE_FNAME]
    else:
        raise ValueError('unsupported level: {!r}'.format(args.level))

    # NOTE: the vocab/decomp files keep the platform default encoding, as
    # before, so they stay readable by whatever else consumes them.
    if os.path.exists(args.vocab) and os.path.getsize(args.vocab):
        with open(args.vocab) as fin:
            counts = cl.Counter(json.loads(fin.read()))
    else:
        counts = cl.Counter(
            w for line in tqdm(_iter_input_lines(args.input))
            for w in line.split())
        with open(args.vocab, 'wt') as fout:
            fout.write(json.dumps(counts, indent=4, ensure_ascii=False))

    char2ideos = _get_char2ideos(IDS_fnames)

    if not args.idc:
        # Drop the structural description characters from every decomposition.
        char2ideos = {c: RE_IDCs.sub('', d) for c, d in char2ideos.items()}

    if args.level in ['ideo_finest', 'stroke']:
        _recursive_decomp(char2ideos)

    vocab2ideos = _vocab2ideos(counts, char2ideos)

    # Sanity checks: decompositions are unique and every word was mapped.
    assert (len(vocab2ideos) == len(set(vocab2ideos.values())))
    assert (len(counts) == len(vocab2ideos))

    js = json.dumps(vocab2ideos, indent=4, ensure_ascii=False)
    with open(args.vocab_decomp, 'wt') as fout:
        fout.write(js)
def _build_vocab_parser():
    """Create the command-line parser for this script.

    Positional arguments are the input file names, the vocab file name and
    the vocab_decomp file name; ``--level`` and ``--idc`` are optional.
    """
    parser = ap.ArgumentParser()
    parser.add_argument('input', nargs='*', help='input fnames.')
    parser.add_argument('vocab', help='output vocab fname.')
    parser.add_argument('vocab_decomp', help='output vocab_decomp fname.')
    parser.add_argument(
        '--level',
        default='ideo_raw',
        choices=['ideo_raw', 'ideo_finest', 'stroke'],
        help='to what level should the decomposition be.')
    parser.add_argument(
        '--idc',
        default=True,
        type=_str2bool,
        help='whether to include structual IDCs in the decomp. (yes/no)')
    return parser


if __name__ == "__main__":
    # Parse the command line, echo the parsed arguments, then run.
    args = _build_vocab_parser().parse_args()
    print(args)
    vocab(args)