diff --git a/README.md b/README.md index b0f0e4c..a15a516 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,8 @@ Elasticsearch For Beginners: Indexing your Gmail Inbox ======================= -#### What's this all about? + +#### What's this all about? I recently looked at my Gmail inbox and noticed that I have well over 50k emails, taking up about 12GB of space but there is no good way to tell what emails take up space, who sent them to, who emails me, etc @@ -15,11 +16,11 @@ __Related tutorial:__ [Index and Search Hacker News using Elasticsearch and the Set up [Elasticsearch](http://ohardt.us/es-install) and make sure it's running at [http://localhost:9200](http://localhost:9200) -I use Python and [Tornado](https://github.com/tornadoweb/tornado/) for the scripts to import and query the data. Run `pip install tornado` to install Tornado. +I use Python and [Tornado](https://github.com/tornadoweb/tornado/) for the scripts to import and query the data. Run `pip install tornado chardet` to install Tornado and chardet. -#### Aight, where do we start? +#### Aight, where do we start? First, go [here](http://ohardt.us/download-gmail-mailbox) and download your Gmail mailbox, depending on the amount of emails you have accumulated this might take a while. @@ -100,7 +101,7 @@ for part in parts: ##### Index the data with Elasticsearch -The most simple aproach is a PUT request per item: +The most simple approach is a PUT request per item: ```python def upload_item_to_es(item): @@ -109,12 +110,12 @@ def upload_item_to_es(item): response = yield http_client.fetch(request) if not response.code in [200, 201]: print "\nfailed to add item %s" % item['message-id'] - + ``` However, Elasticsearch provides a better method for importing large chunks of data: [bulk indexing](http://ohardt.us/es-bulk-indexing) Instead of making a HTTP request per document and indexing individually, we batch them in chunks of eg. 1000 documents and then index them.
-Bulk messages are of the format: +Bulk messages are of the format: ``` cmd\n @@ -195,11 +196,9 @@ You can also quickly query for certain fields via the `q` parameter. This exampl curl "localhost:9200/gmail/email/_search?pretty&q=from:ship-confirm@amazon.com" ``` - - ##### Aggregation queries -Aggregation queries let us bucket data by a given key and count the number of messages per bucket. +Aggregation queries let us bucket data by a given key and count the number of messages per bucket. For example, number of messages grouped by recipient: ``` @@ -255,7 +254,7 @@ Result: "doc_count" : 4285 }, { "key" : "unread", "doc_count" : 510 - }, + }, ... ] } @@ -269,7 +268,7 @@ curl -s "localhost:9200/gmail/email/_search?pretty&search_type=count" -d ' "years": { "date_histogram": { "field": "date_ts", "interval": "year" -}}}} +}}}} ' ``` @@ -296,6 +295,37 @@ Result: } ``` +Write aggregation queries to work out how much you spent on Amazon/Steam: + +``` +GET _search +{ + "query": { + "match_all": {} + }, + "size": 0, + "aggs": { + "group_by_company": { + "terms": { + "field": "order_details.merchant" + }, + "aggs": { + "total_spent": { + "sum": { + "field": "order_details.order_total" + } + }, + "postage": { + "sum": { + "field": "order_details.postage" + } + } + } + } + } + } +``` + #### Todo diff --git a/src/AmazonEmailParser.py b/src/AmazonEmailParser.py new file mode 100644 index 0000000..69ea211 --- /dev/null +++ b/src/AmazonEmailParser.py @@ -0,0 +1,81 @@ +import json +import re + +class AmazonEmailParser(object): + + def __init__(self): + self.orderTotalRE = re.compile(r"(?<=Order Total:) (?:.*?)(\d+.\d+)") + self.postageRE = re.compile(r"(?<=Postage & Packing:) (?:.*?)(\d+.\d+)") + self.deliveryRE = re.compile(r"(?<=Delivery & Handling::) (?:.*?)(\d+.\d+)") + self.orderItemsRE = re.compile(r"==========\r\n\r\n") + self.costRE = re.compile(r"(\d+\.\d+)") + + def canParse(self, email): + try: + if 'auto-confirm@amazon' in email['from']: + return True + else: + return False + except: + return False + + def parse(self, email): + body = email['body'] + + if 'Order Confirmation' in body: + postage = 0 + orderTotal = 0 + + result = re.search(self.orderTotalRE, body) + + if result: + orderTotal = float(result.groups()[0]) + + result = re.search(self.postageRE, body) + + if result: + postage = float(result.groups()[0]) + else: + result = re.search(self.deliveryRE, body) + if result: + postage = float(result.groups()[0]) + + email['order_details'] = { + "order_items" : [], + "order_total" : orderTotal, + "postage" : postage, + "merchant" : "amazon" + } + + orders = re.split(self.orderItemsRE, body)[1] + orders = orders.split('\r\n\r\n') + + #Remove first and last 3 items + orders.pop(0) + orders.pop() + orders.pop() + orders.pop() + + costTotal = orderTotal + + for item in orders: + if 'Your estimated delivery date is:' in item or 'Your order will be sent to:' in item: + continue + else: + lines = item.replace('_','').split('\r\n') + if len(lines) < 4: + continue + itemName = lines[0].strip() + cost = float(re.search(self.costRE, lines[1].strip()).groups()[0]) + condition = lines[2].rpartition(':')[2].strip() + seller = lines[3].replace('Sold by', '').strip() + + email['order_details']['order_items'].append({"item":itemName, "cost":cost, "condition": condition, "seller": seller}) + costTotal -= cost + + if costTotal != 0: + print "Warning order not parsed correctly, order items may be missing, or promotion may have been applied." + print email['order_details'] + print body + + return email diff --git a/src/DelegatingEmailParser.py b/src/DelegatingEmailParser.py new file mode 100644 index 0000000..11c291d --- /dev/null +++ b/src/DelegatingEmailParser.py @@ -0,0 +1,11 @@ +class DelegatingEmailParser(object): + + def __init__(self, parsers): + self.parsers = parsers + + def parse(self, email): + for parser in self.parsers: + if parser.canParse(email): + return parser.parse(email) + + return email diff --git a/src/SteamEmailParser.py b/src/SteamEmailParser.py new file mode 100644 index 0000000..cf7aef9 --- /dev/null +++ b/src/SteamEmailParser.py @@ -0,0 +1,61 @@ +import json +import re + +class SteamEmailParser(object): + + def __init__(self): + self.orderTotalRE = re.compile(r"(?<=Total:)[ \t]+(\d+.\d+)") + self.orderItemsRE = re.compile(r"(?:\.\r\n)+") + self.costRE = re.compile(r"(\d+\.\d+)") + + def canParse(self, email): + try: + if 'noreply@steampowered.com' in email['from']: + return True + else: + return False + except: + return False + + def parse(self, email): + body = email['body'] + + if 'Thank you' in email['subject'] and 'purchase' in body: + orderTotal = 0 + + result = re.search(self.orderTotalRE, body) + + if result: + orderTotal = float(result.groups()[0]) + + email['order_details'] = { + "order_items" : [], + "order_total" : orderTotal, + "merchant" : "steam" + } + + order = re.split(self.orderItemsRE, body)[2].split('\r\n') #This parser to get order total is currently broken, gift purchases are not parsed + + costTotal = orderTotal + + costTotal = orderTotal + + for item in order: + if '-------' in item: + break + else: + if item == '' or ': ' not in item: + continue + splitResult = item.rpartition(':') + itemName = splitResult[0].strip() + cost = float(re.match(self.costRE, splitResult[2].strip()).groups()[0]) + + email['order_details']['order_items'].append({"item":itemName, "cost":cost}) + costTotal -= cost + + if costTotal != 0: + print "Warning order not parsed correctly, order items may be missing, or promotion may have been applied." + print email['order_details'] + print body + + return email diff --git a/src/index_emails.py b/src/index_emails.py index b3b4fc8..20515af 100644 --- a/src/index_emails.py +++ b/src/index_emails.py @@ -7,6 +7,11 @@ import email.utils import mailbox import email +import quopri +import chardet +from DelegatingEmailParser import DelegatingEmailParser +from AmazonEmailParser import AmazonEmailParser +from SteamEmailParser import SteamEmailParser import logging http_client = HTTPClient() @@ -19,13 +24,12 @@ def delete_index(): try: url = "%s/%s?refresh=true" % (tornado.options.options.es_url, tornado.options.options.index_name) request = HTTPRequest(url, method="DELETE", request_timeout=240) + body = {"refresh": True} response = http_client.fetch(request) logging.info('Delete index done %s' % response.body) except: pass - - def create_index(): schema = { @@ -135,6 +139,9 @@ def load_from_file(): upload_data = list() logging.info("Starting import from file %s" % tornado.options.options.infile) mbox = mailbox.UnixMailbox(open(tornado.options.options.infile, 'rb'), email.message_from_file) + + emailParser = DelegatingEmailParser([AmazonEmailParser(), SteamEmailParser()]) + for msg in mbox: count += 1 if count < tornado.options.options.skip: