-
Notifications
You must be signed in to change notification settings - Fork 0
/
s3toelasticsearch.rb
70 lines (53 loc) · 1.49 KB
/
s3toelasticsearch.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
require 'aws-sdk'
require 'elasticsearch'
require 'json'
credentials = JSON.parse(File.read './config/s3-credentials-server.json')
AWS.config(
access_key_id: credentials['accessKey'],
secret_access_key: credentials['secretKey'],
region: 'us-east-1'
)
es = Elasticsearch::Client.new log: true
def es_index client, key, body
client.index index: 'cryptogem', type: 'locator', id: key, body: body
end
queue = AWS::SQS.new.queues.named 'crytogem-locator'
while true
queue_message = queue.receive_message
if queue_message
puts 'found message'
puts queue_message.body
message = JSON.parse queue_message.body
def s3
AWS::S3.new.buckets['cryptogem.locator']
end
def get_from_s3 key
s3.objects[key].read
end
records = message['Records'].map { |record|
begin
content = get_from_s3 record['s3']['object']['key']
metadata = content.split('$$').first
rescue AWS::S3::Errors::AccessDenied
queue_message.delete
metadata = ''
end
if !metadata.empty?
metadata = JSON.parse metadata
{
'time' => record['eventTime'],
'ip' => record['requestParameters']['sourceIPAddress'],
'key' => record['s3']['object']['key'],
'eTag' => record['s3']['object']['eTag'],
}.merge metadata
else
false
end
}.select{ |gem| gem }.each { |gem|
es_index es, gem['key'], gem
}
queue_message.delete
puts records
end
sleep 1
end