-
Notifications
You must be signed in to change notification settings - Fork 86
/
server_ip_address.py
69 lines (54 loc) · 2.13 KB
/
server_ip_address.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import ujson as json
from urllib.parse import urlparse
from pyspark.sql.types import StructType, StructField, StringType, LongType
from sparkcc import CCSparkJob
class ServerIPAddressJob(CCSparkJob):
    """Collect server IP addresses from WARC response records.

    Both WARC and WAT files are allowed as input. Every response record
    contributes one ``((host, ip), 1)`` pair.
    NOTE(review): the pairs are presumably aggregated into ``cnt`` by the
    CCSparkJob base class, which is not visible in this file -- confirm.
    """

    name = "ServerIPAddresses"

    # One output row per (host, ip) key together with a count.
    output_schema = StructType([
        StructField("key", StructType([
            StructField("host", StringType(), True),
            StructField("ip", StringType(), True)]), True),
        StructField("cnt", LongType(), True)
    ])

    # Placeholders emitted when a response record lacks the IP address
    # or when no host name can be extracted from its target URI.
    response_no_ip_address = '(no IP address)'
    response_no_host = '(no host name)'

    def process_record(self, record):
        """Yield ``((host_name, ip_address), 1)`` for a response record.

        Records which are not responses (warcinfo, request, metadata)
        yield nothing. A missing or empty IP address and a missing or
        unparsable host name are replaced by the class-level placeholder
        strings, so neither element of the key is ever ``None``.
        """
        ip_address = None
        url = None

        if self.is_wat_json_record(record):
            # WAT (response) record
            record = json.loads(self.get_payload_stream(record).read())
            try:
                warc_header = record['Envelope']['WARC-Header-Metadata']
                if warc_header['WARC-Type'] != 'response':
                    # WAT request or metadata records
                    return
                if 'WARC-IP-Address' not in warc_header:
                    # WAT metadata records
                    return
                ip_address = warc_header['WARC-IP-Address']
                url = warc_header['WARC-Target-URI']
            except KeyError:
                # Deliberate best effort: a WAT record lacking one of the
                # expected keys is still counted, using the placeholders
                # for whatever could not be read.
                pass
        elif self.is_response_record(record):
            # WARC response record
            ip_address = self.get_warc_header(record, 'WARC-IP-Address')
            url = self.get_warc_header(record, 'WARC-Target-URI')
        else:
            # warcinfo, request, non-WAT metadata records
            return

        if not ip_address:
            # covers both a missing header (None) and an empty value
            ip_address = ServerIPAddressJob.response_no_ip_address

        host_name = ServerIPAddressJob.response_no_host
        if url:
            try:
                # ``hostname`` is None when the URI has no network
                # location; keep the placeholder in that case instead of
                # emitting a null host.
                host_name = urlparse(url).hostname or host_name
            except (ValueError, TypeError, AttributeError):
                # Malformed URI (e.g. unbalanced IPv6 brackets) or a
                # non-string URI value: keep the placeholder host name.
                pass

        yield (host_name, ip_address), 1
if __name__ == "__main__":
    # Script entry point: build the job and start it right away.
    ServerIPAddressJob().run()