-
Notifications
You must be signed in to change notification settings - Fork 0
/
mysd2csv.py
134 lines (108 loc) · 3.28 KB
/
mysd2csv.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import argparse
import pathlib
import re
from io import TextIOWrapper
from typing import List
def get_progress_bar(percent: float, width: int) -> str:
    """Render a fixed-width ASCII progress bar.

    Args:
        percent: Completion percentage. Values outside 0..100 are clamped.
        width: Number of character cells between the brackets. Negative
            widths are treated as 0.

    Returns:
        A string such as ``'[#####     ]'`` whose interior is always
        exactly ``width`` characters long.
    """
    width = max(width, 0)
    # Clamp first: without it a percentage above 100 draws more '#' than
    # `width`, and one below -100 pads the bar with extra spaces.
    clamped = min(max(percent, 0), 100)
    filled = int(clamped / 100 * width)
    return '[' + '#' * filled + ' ' * (width - filled) + ']'
def get_filename(
    f: TextIOWrapper,
) -> str:
    """Derive the CSV file name from the dump's ``CREATE TABLE`` line.

    Lines are pulled from ``f`` one at a time until one matches, so the
    stream is left positioned just after the matching line. Matching is
    done on the stripped, lower-cased line, which means the returned name
    is lower-case too.

    Returns:
        The table name with ``.csv`` appended.

    Raises:
        ValueError: if ``f`` is exhausted without a matching line.
    """
    table_re = re.compile(r'(create table `(.*)` \()')
    normalized_lines = (raw.strip().lower() for raw in f)
    for candidate in normalized_lines:
        found = table_re.match(candidate)
        if found is not None:
            return found.group(2) + '.csv'
    raise ValueError('Table name not found')
def get_columns(
    f: TextIOWrapper,
) -> List[str]:
    """Collect column names from the body of a ``CREATE TABLE`` statement.

    Reads lines from ``f`` while each stripped line starts with a
    backtick-quoted identifier, and stops at the first line that does not.
    That terminating line is consumed from ``f`` as well.

    Args:
        f: Iterable of lines positioned at the first column definition.

    Returns:
        The column names in definition order, with surrounding whitespace
        removed. Empty if the first line is not a column definition.
    """
    # The identifier ends at the first *unescaped* backtick. A greedy `.*`
    # would run on to the last backtick on the line, e.g. one inside a
    # COMMENT. A doubled backtick is MySQL's escape for a literal one.
    column_re = re.compile(r'`((?:``|[^`])*)`')
    columns = []
    for line in f:
        column_match = column_re.match(line.strip())
        if column_match is None:
            break
        name = column_match.group(1).replace('``', '`')
        columns.append(name.strip())
    return columns
def save_csv_from_chunk(
    line: str,
    output_file: pathlib.Path,
    null: str = '',
) -> None:
    """Append the rows of one ``INSERT`` statement to a CSV file.

    Everything up to and including the first ``(`` is discarded, a trailing
    ``;`` and the closing ``)`` are removed if present, and the remainder is
    split on ``),(`` into one output line per row. Values are written as
    they appear in the SQL text (string quotes are kept).

    Args:
        line: A full ``INSERT INTO ... VALUES (...),(...);`` statement, or
            the same text with the leading keyword part and/or the trailing
            ``);`` already removed.
        output_file: File to append to; created if missing.
        null: Replacement text for fields that are exactly ``NULL``.

    Note:
        Splitting is textual, so a string value that itself contains
        ``),(`` or ``,NULL,`` is not handled correctly.
    """
    payload = line.strip()
    payload = payload[payload.find('(') + 1:]
    # Trim the statement terminator only when it is really there. A blind
    # `[:-2]` eats two characters of the last value whenever the caller has
    # already removed ");".
    if payload.endswith(';'):
        payload = payload[:-1]
    if payload.endswith(')'):
        payload = payload[:-1]

    # Replace NULL only when it is a whole field (bounded by commas or the
    # row edges), so quoted text such as 'NULL' is left alone. A callable
    # replacement keeps backslashes in `null` (e.g. r'\N') literal.
    null_field = re.compile(r'(?<![^,])NULL(?![^,])')
    rows = [
        null_field.sub(lambda _match: null, row)
        for row in payload.split('),(')
    ]
    with open(output_file, 'a') as f:
        f.write('\n'.join(rows) + '\n')
def init_csv_file(
    filename: pathlib.Path,
    columns: List[str],
    sep: str = ',',
) -> None:
    """Create (or truncate) the CSV file and write its header row.

    Args:
        filename: Path of the CSV file; existing content is overwritten.
        columns: Column names, written in the given order.
        sep: Delimiter placed between column names.
    """
    header_row = sep.join(columns)
    filename.write_text(header_row + '\n')
def mysql_to_csv() -> None:
    """Convert a single-table mysqldump file to CSV (command-line driver).

    Command-line options:
        -I/--input-file: path to the dump (required).
        -o/--output-file: output file name; ``.csv`` is appended when
            missing. Defaults to the table name found in the dump.
        --null: text written in place of SQL ``NULL`` (default: empty).

    The CSV header comes from the dump's ``CREATE TABLE`` statement. Every
    ``INSERT INTO`` line after it is appended to the CSV while a progress
    bar is printed.

    Raises:
        SystemExit: with status 1 when no input file is given, the input
            file does not exist, or the dump cannot be parsed.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-I',
        '--input-file',
        help='Path to mysqldump',
        dest='input',
    )
    parser.add_argument(
        '-o',
        '--output-file',
        help='Output filename. Default - name from dump.',
        dest='output',
    )
    parser.add_argument(
        '--null',
        default='',
        help='How to replace NULL. Default=\'\'',
    )
    args = parser.parse_args()
    if not args.input:
        parser.print_help()
        raise SystemExit(1)

    def is_insert(raw_line: str) -> bool:
        # One predicate for both the pre-count and the main loop, so the
        # progress total can never disagree with the number of chunks
        # (which would overflow the bar or divide by zero).
        return raw_line.strip().lower().startswith('insert into')

    try:
        input_file = open(pathlib.Path(args.input), 'r')
    except FileNotFoundError:
        print(f'Wrong input file path {args.input!r}')
        raise SystemExit(1) from None

    with input_file:
        try:
            # First pass only counts the statements; rewind afterwards
            # instead of opening (and leaking) a second handle.
            num_lines = sum(1 for row in input_file if is_insert(row))
            input_file.seek(0)

            # Always scan for CREATE TABLE, even when -o is given: this
            # also advances the stream to the column definitions that
            # get_columns() reads next.
            output_filename = get_filename(f=input_file)
            if args.output:
                output_filename = args.output
                if not output_filename.endswith('.csv'):
                    output_filename += '.csv'
            output_path = pathlib.Path(output_filename)

            columns = get_columns(f=input_file)
            init_csv_file(
                filename=output_path,
                columns=columns,
            )

            c = 0
            for line in input_file:
                line = line.strip()
                if not is_insert(line):
                    continue
                # Hand over the whole statement: save_csv_from_chunk does
                # its own trimming, and trimming here as well would cut
                # into the last value.
                save_csv_from_chunk(
                    line=line,
                    output_file=output_path,
                    null=args.null,
                )
                c += 1
                percent_complete = c / num_lines * 100
                progress_bar = get_progress_bar(percent_complete, width=20)
                print(f'{progress_bar} {c}/{num_lines}', end='\r')
            # Finish the line that the '\r'-terminated progress output
            # left open.
            print()
        except Exception as e:
            print(e)
            print(f"Can't parse {args.input!r}")
            # Report the failure through the exit status as well, like the
            # other error paths above.
            raise SystemExit(1) from e
def main() -> None:
    """Script entry point: run the mysqldump-to-CSV conversion."""
    mysql_to_csv()
# Run the converter only when this file is executed as a script, so that
# importing the module has no side effects.
if __name__ == '__main__':
    main()