-
Notifications
You must be signed in to change notification settings - Fork 0
/
emlv
executable file
·266 lines (215 loc) · 8.89 KB
/
emlv
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
import os
import sys
import traceback
import locale
import gettext
from gemlv.sysutils import warnx
try:
import better_exchook
except ImportError:
pass
else:
better_exchook.install()
import email
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email.mime.message import MIMEMessage
from gemlv.contenttypestring import ContentTypeString
import gemlv.email
import re
from tempfile import mkstemp
from tempfile import mkdtemp
from mimetypes import guess_extension
import magic
import argparse
import time
try:
    import setproctitle
except ImportError:
    warnx("No module named setproctitle. Degrading.")

    class setproctitle(object):
        """Do-nothing stand-in bound to the name of the missing module.

        It lets ``setproctitle.setproctitle(...)`` be called without
        failing when the real module could not be imported.
        """

        @classmethod
        def setproctitle(cls, *args, **kwargs):
            """Accept any arguments and ignore them."""
            return None
import xdg.BaseDirectory
from gemlv.utils import AddressLine
from gemlv.utils import getaddresslines
from gemlv.utils import decode_mimetext
from gemlv.utils import decode_readably
from gemlv.mimetext import MimeDecoded
from gemlv.mimetext import MimeEncoded
import gemlv.addressbook
from gemlv.constants import *
import gemlv.mime as mime
from gemlv.textutils import human_size
from gemlv.textutils import wildcardmatch
from gemlv.pythonutils import SubstractableList
def output_write(s):
    """Write *s* to the module-level ``output`` stream.

    Unicode text is encoded as UTF-8 first; any other value is passed
    through ``str()`` before being written.
    """
    data = s.encode('utf-8') if isinstance(s, unicode) else s
    output.write(str(data))
# Visible spelling of every control character that td_esc() rewrites.
_TD_ESCAPES = {'\t': '\\t', '\n': '\\n', '\r': '\\r'}


def td_esc_cb(m):
    """Return the backslash-escaped spelling of the character matched by *m*.

    An explicit table is used instead of ``repr()``: under Python 2 the
    repr of a unicode match is ``u'\\t'``, which leaked a stray ``u`` into
    the output once the quotes were stripped.
    """
    char = m.group(0)
    try:
        return _TD_ESCAPES[char]
    except KeyError:
        # Not reachable through td_esc(); kept so that the callback still
        # answers for any other pattern it may be combined with.
        return repr(char).replace('\'', '')


def td_esc(s):
    """Escape TAB, LF and CR in *s* so it fits into one tab-delimited cell.

    Each of those characters is replaced by its two-character backslash
    form (``\\t``, ``\\n``, ``\\r``); everything else is left untouched.
    """
    return re.sub(r'[\t\n\r]', td_esc_cb, s)
def print_mime_part_td_header():
    """Write the column-title line of the MIME part table to the output."""
    column_titles = "INDEX MIME_TYPE SIZE FILENAME DESCRIPTION\n"
    output_write(column_titles)
def list_mime_parts(eml, input_file_index):
    """Write one tab-delimited table row per MIME part found under *eml*.

    *input_file_index* is the index string of the input file; it becomes
    the first component of every part index in the INDEX column.
    """
    for part_idx, part in walk_mime_parts(eml, input_file_index):
        size = human_size(part.size_decoded_approx)
        row = [
            part_idx,
            part.content_type,
            size,
            td_esc(part.filename),
            td_esc(part.header['Content-Description'].decoded),
        ]
        output_write('\t'.join(row) + '\n')
def display_headers(eml, extra_headers=None):
    """Write the headers of *eml* that the user is interested in.

    The header name patterns are those in ``CommonUserInterestedHeaders``
    followed by the entries of *extra_headers* that are not already among
    them. For each pattern, in that order, every header of *eml* whose raw
    name matches the wildcard pattern is written as ``Name: value``.

    *extra_headers* is an optional list of additional header name
    patterns; ``None`` means no extra patterns.
    """
    if extra_headers is None:
        extra_headers = []
    # Work on a copy: extending CommonUserInterestedHeaders itself would
    # permanently alter the shared constant for the rest of the program.
    headers = list(CommonUserInterestedHeaders)
    headers.extend(SubstractableList(extra_headers) - CommonUserInterestedHeaders)
    for hname_patt in headers:
        for header in eml.headers.items:
            if wildcardmatch(header.raw_name, hname_patt):
                # Indent continuation lines with a TAB so that a multi-line
                # value stays visually attached to its header name.
                hval = header.value.decoded.strip().replace('\n', '\n\t')
                output_write(header.raw_name + ': ' + hval + '\n')
class DecodeIfTTY(object):
    """Apply a decoder to data only when the output stream is a terminal."""

    def __init__(self, output, decoder):
        """Remember the stream to probe and the callable to decode with."""
        self.output, self.decoder = output, decoder

    def decode(self, data):
        """Return *data* decoded if the output is a TTY, else unchanged."""
        if not self.output.isatty():
            return data
        return self.decoder(data)
def display(eml):
    """Write the content of the message part *eml* to the output.

    A multipart part is shown as its preamble and epilogue separated by a
    newline; any other part is shown as its decoded payload. The text goes
    through ``decode_readably`` only when the output stream is a terminal.
    """
    def readable(data):
        return decode_readably(data, eml)

    decoder = DecodeIfTTY(output, readable)
    if not eml.is_multipart():
        output_write(decoder.decode(eml.payload_decoded))
        return
    preamble = decoder.decode(eml.preamble)
    epilogue = decoder.decode(eml.epilogue)
    output_write("%s\n%s" % (preamble, epilogue))
def load_email(filepath):
    """Parse the file at *filepath* and return it wrapped in ``gemlv.email.Email``.

    The file is closed as soon as parsing is done, instead of leaving the
    handle open until it happens to be garbage collected.
    """
    with open(filepath, 'r') as email_file:
        message = email.message_from_file(email_file)
    return gemlv.email.Email(message)
def walk_mime_parts(eml, idx):
    """Yield ``(index, part)`` for *eml* and all of its sub-parts, depth first.

    *idx* is the index string of *eml* itself. The index of a child is the
    index of its parent followed by a dot and the child's position among
    its siblings, counted from zero.
    """
    pending = [(idx, eml)]
    while pending:
        label, node = pending.pop()
        yield (label, node)
        children = [
            ('%s.%d' % (label, position), child)
            for position, child in enumerate(node.parts)
        ]
        # Pushed in reverse so that the first child is popped, and thus
        # visited, first.
        pending.extend(reversed(children))
# Directory that is searched for per-MIME-type body filter executables.
default_filter_prefix = '~/.local/bin/emlv-filter/'

# Command line interface. The description is emitted verbatim
# (RawDescriptionHelpFormatter), so its line breaks are kept in --help.
argparser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter)
# The hint in the last line names the option exactly as it is registered
# below ("--body-filter"); it used to advertise "--body_filter", a spelling
# that argparse does not accept.
argparser.description = """Display RFC-822 email file(s) headers, content, and attachments, list MIME parts, extract attachments from it.
Without --list or --extract option, i.e. read-mode, display the first text/plain part as content.
It searches executables named <MAIN> and <MAIN>_<SUB> in """+default_filter_prefix+""" to filter content with MIME type <MAIN>/<SUB> through them.
You can disable this by "--body-filter=-" parameter."""

# --list, --extract and --save select the operating mode and exclude each other.
argp_mxg1 = argparser.add_mutually_exclusive_group()
argp_mxg1.add_argument(
    '--list', '-l', action='store_true',
    help="List all MIME parts in email files given in FILE.")
argp_mxg1.add_argument(
    '--extract', '-x', action='append', metavar='INDEX',
    help="Extract MIME part specified by INDEX to stdout. May be specified multiple times. See --list to get the indices of each part of a MIME-multipart file.")
argp_mxg1.add_argument(
    '--save', '-s', action='append', metavar='INDEX',
    help="Save MIME part specified by INDEX to file. May be specified multiple times.")

argparser.add_argument(
    '--html', action='store_true',
    help="Take the first text/html part as content, not text/plain. Option --body-filter is also recommended here.")
argparser.add_argument(
    '--header', '-H', action='append', default=[], metavar='HEADER',
    help="Extra headers to show in read-mode besides "+', '.join(CommonUserInterestedHeaders)+". Wildcards are supported.")
argparser.add_argument(
    '--header-filter', '-fh', metavar='COMMAND',
    help="Filter the headers through this command.")
argparser.add_argument(
    '--body-filter', '-fb', metavar='COMMAND',
    help="Filter the message body through this command.")
argparser.add_argument(
    '--attachments-filter', '-fa', metavar='COMMAND',
    help="Filter the table of attachments through this command (in read-mode).")
argparser.add_argument(
    'FILE', nargs='*',
    help="Process the given FILEs as raw RFC-822 emails. Read STDIN if FILE is not given.")

option = argparser.parse_args(args=sys.argv[1:])
# A filter given as a lone '-' is stored as the empty string.
for filter_option_name in ('header_filter', 'body_filter', 'attachments_filter'):
    if getattr(option, filter_option_name) == '-':
        setattr(option, filter_option_name, '')

if option.save and option.body_filter:
    raise Exception("Filter options can not be combined with --save.")

# "read primary part" mode: none of --list, --extract, --save was requested.
read_mode = not (option.list or option.extract or option.save)
if not read_mode and (option.header_filter or option.attachments_filter):
    raise Exception("Filter options (header, attachments) are meaningless here.")

# MIME type of the part that counts as the content of a message.
primary_content_type = MIMETYPE_HTML if option.html else MIMETYPE_TEXT

# Without FILE arguments the message is read from standard input.
if not option.FILE:
    option.FILE.append('/dev/stdin')

# In read-mode the primary part of every input file is what gets extracted.
if read_mode:
    option.extract = ['%d.primary' % file_number for file_number in range(len(option.FILE))]
# Main program: either list the MIME parts of every input file, or show /
# extract / save the requested parts one by one.
#
# `output` is the module-level stream that output_write() and display()
# write to. It is temporarily rebound to a pipe (or, with --save, to a file)
# and set back to sys.stdout after each section.
#
# NOTE(review): the nesting of this block was reconstructed from statement
# order because the reviewed copy had lost its indentation -- confirm it
# against the upstream file before merging.
output = sys.stdout
if option.list:
    # --list: one table for all input files, each file indexed by its position.
    print_mime_part_td_header()
    for idx, filename in enumerate(option.FILE):
        eml = load_email(filename)
        list_mime_parts(eml, str(idx))
else:
    # Values returned by close() on every filter pipe that was opened.
    filter_status_codes = []
    if option.save:
        option.extract = option.save
    for idx in option.extract:
        # An index reads "<file>[.<part>[.<part>...]]" or "<file>.primary".
        indices = idx.split('.')
        input_file_index = int(indices[0])
        eml = load_email(option.FILE[input_file_index])
        part = eml
        if len(indices) == 2 and indices[1] == 'primary':
            # Resolve "primary" to the first part, in walking order, whose
            # content type equals primary_content_type. When no part matches,
            # the list stays empty and the whole message is used.
            mime_part_indices = []
            for part_idx, sub_part in walk_mime_parts(part, str(input_file_index)):
                if sub_part.content_type == primary_content_type:
                    mime_part_indices = part_idx.split('.')[1:]
                    break
        else:
            mime_part_indices = indices[1:]
        # Descend from the message root to the addressed part.
        for part_idx in map(int, mime_part_indices):
            part = part.parts[part_idx]
        if read_mode:
            # Headers section, optionally piped through --header-filter.
            if option.header_filter:
                output.flush()
                output = os.popen(option.header_filter, 'w')
            display_headers(eml, extra_headers=option.header)
            if output != sys.stdout:
                output.flush()
                filter_status = output.close()
                filter_status_codes.append(filter_status)
                output = sys.stdout
            # Empty line between the headers and the body.
            output_write("\n")
            output.flush()
        if option.save:
            attachment_filename = part.filename
            # Replace the directory separator with U+2044 FRACTION SLASH, so
            # the name taken from the message cannot address another directory.
            output_filename = attachment_filename.replace(os.path.sep, u'⁄')
            # Parts without a file name get one generated from their index.
            if not output_filename: output_filename = 'attachment-'+idx.replace('.', '-')+'.dat'
            output = open(output_filename, 'w')
            output.write(part.payload_decoded)
            output.flush()
            output.close()
            sys.stderr.write("emlv: saved: "+output_filename+"\n")
            output = sys.stdout
        else:
            if option.body_filter:
                # An explicit --body-filter command takes precedence.
                output = os.popen(option.body_filter, 'w')
            elif read_mode and option.body_filter != '':
                # No explicit filter and filtering not disabled with '-':
                # try the executables named after the part's content type
                # under default_filter_prefix, and use the first that exists.
                for exename in part.content_type.main, part.content_type.replace('/', '_'):
                    exepath = os.path.expanduser(os.path.join(default_filter_prefix, exename))
                    if os.path.exists(exepath):
                        # Pass the charset parameter of the part's HDR_CT
                        # header to the filter through the environment.
                        os.environ['CONTENT_TYPE_CHARSET'] = str(part.header[HDR_CT].param['charset'].decoded)
                        output = os.popen(exepath, 'w')
                        break
            display(part)
            if output != sys.stdout:
                output.flush()
                filter_status = output.close()
                filter_status_codes.append(filter_status)
                output = sys.stdout
        if read_mode:
            output_write('\n') # in case the body did not end with a newline
            output_write('\n') # separate the content from the table of attachments
            # Table of MIME parts, optionally piped through --attachments-filter.
            if option.attachments_filter:
                output.flush()
                output = os.popen(option.attachments_filter, 'w')
            print_mime_part_td_header()
            list_mime_parts(eml, str(input_file_index))
            if output != sys.stdout:
                output.flush()
                filter_status = output.close()
                filter_status_codes.append(filter_status)
                output = sys.stdout
            output.flush()
    # Entries that are None are dropped; the numerically largest remaining
    # status decides the exit code of this program.
    filter_status_codes = [code for code in filter_status_codes if code is not None]
    if filter_status_codes:
        highest_status_code = max(filter_status_codes)
        if os.WIFSIGNALED(highest_status_code):
            # Filter terminated by a signal: exit with 128 + signal number.
            sys.exit(128 + os.WTERMSIG(highest_status_code))
        else:
            sys.exit(os.WEXITSTATUS(highest_status_code))