Commit

Use bucket based on placeholder
Signed-off-by: rmontenegroo <[email protected]>
rmontenegroo committed Dec 21, 2020
1 parent 26ce74e commit b5151b4
Showing 4 changed files with 79 additions and 25 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -11,3 +11,5 @@ vendor
.ruby-version

test/tmp/

docker/
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
1.5.0
1.5.0
92 changes: 72 additions & 20 deletions lib/fluent/plugin/out_s3.rb
@@ -87,6 +87,8 @@ def initialize
config_param :aws_iam_retries, :integer, default: nil, deprecated: "Use 'instance_profile_credentials' instead"
desc "S3 bucket name"
config_param :s3_bucket, :string
desc "Set bucket name fallback if fails fetching from placeholders"
config_param :s3_bucket_fallback, :string, :default => nil
desc "S3 region name"
config_param :s3_region, :string, default: ENV["AWS_REGION"] || "us-east-1"
desc "Use 's3_region' instead"
@@ -249,11 +251,14 @@ def start

s3_client = Aws::S3::Client.new(options)
@s3 = Aws::S3::Resource.new(client: s3_client)
@bucket = @s3.bucket(@s3_bucket)

check_apikeys if @check_apikey_on_start
ensure_bucket if @check_bucket
ensure_bucket_lifecycle
@tag_placeholders = get_placeholders_tag(@s3_bucket)
@keys_placeholders = get_placeholders_keys(@s3_bucket)
@time_placeholders = ext_get_placeholders_time(@s3_bucket)

if @tag_placeholders.empty? && @keys_placeholders.empty? && @time_placeholders.empty?
@bucket = create_bucket(@s3_bucket)
end

super
end
@@ -263,7 +268,28 @@ def format(tag, time, record)
@formatter.format(tag, time, r)
end

def create_bucket(name)
bucket = @s3.bucket(name)
check_apikeys(bucket) if @check_apikey_on_start
ensure_bucket(bucket) if @check_bucket
ensure_bucket_lifecycle(bucket)
bucket
end

def use_fallback(placeholder)
if !@s3_bucket_fallback
raise "It was not possible to extract '#{placeholder}' placeholder from chunk and @s3_bucket_fallback is not set."
end
log.warn "Using @s3_bucket_fallback ('#{@s3_bucket_fallback}') as a fallback bucket name."
@s3_bucket_fallback
end

def ext_get_placeholders_time(str)
output = [ "%S", "%M", "%H", "%d", "%m", "%Y" ].select { |tp| str.include? tp }
end

def write(chunk)

i = 0
metadata = chunk.metadata
previous_path = nil
@@ -273,6 +299,31 @@ def write(chunk)
@time_slice_with_tz.call(metadata.timekey)
end

bucket = @bucket ? @bucket : nil

if (!bucket) && (!@tag_placeholders.empty?)
if (!chunk.metadata.tag) || ((@tag_placeholders.max + 1) > chunk.metadata.tag.split('.').length)
bucket = create_bucket(use_fallback("tag"))
end
end

if !bucket
@keys_placeholders.each do |placeholder|
if (!chunk.metadata.variables) || (!chunk.metadata.variables.keys.include?(placeholder.to_sym))
bucket = create_bucket(use_fallback(placeholder))
break
end
end
end

if (!bucket) && (!chunk.metadata.timekey) && @time_placeholders
bucket = create_bucket(use_fallback("time"))
end

if !bucket
bucket = create_bucket(extract_placeholders(@s3_bucket, chunk))
end

if @check_object
begin
@values_for_s3_object_chunk[chunk.unique_id] ||= {
@@ -304,7 +355,7 @@ def write(chunk)

i += 1
previous_path = s3path
end while @bucket.object(s3path).exists?
end while bucket.object(s3path).exists?
else
if @localtime
hms_slicer = Time.now.strftime("%H%M%S")
@@ -362,18 +413,19 @@ def write(chunk)
put_options[:metadata][k] = extract_placeholders(v, chunk).gsub(%r(%{[^}]+}), {"%{index}" => sprintf(@index_format, i - 1)})
end
end
@bucket.object(s3path).put(put_options)
bucket.object(s3path).put(put_options)

@values_for_s3_object_chunk.delete(chunk.unique_id)

if @warn_for_delay
if Time.at(chunk.metadata.timekey) < Time.now - @warn_for_delay
log.warn "out_s3: delayed events were put to s3://#{@s3_bucket}/#{s3path}"
log.warn "out_s3: delayed events were put to s3://#{bucket.name}/#{s3path}"
end
end
ensure
tmp.close(true) rescue nil
end

end

private
@@ -399,34 +451,34 @@ def timekey_to_timeformat(timekey)
end
end

def ensure_bucket
if !@bucket.exists?
def ensure_bucket(bucket)
if !bucket.exists?
if @auto_create_bucket
log.info "Creating bucket #{@s3_bucket} on #{@s3_endpoint}"
@s3.create_bucket(bucket: @s3_bucket)
log.info "Creating bucket #{bucket.name} on #{@s3_endpoint}"
@s3.create_bucket(bucket: bucket.name)
else
raise "The specified bucket does not exist: bucket = #{@s3_bucket}"
raise "The specified bucket does not exist: bucket = #{bucket.name}"
end
end
end

def ensure_bucket_lifecycle
def ensure_bucket_lifecycle(bucket)
unless @bucket_lifecycle_rules.empty?
old_rules = get_bucket_lifecycle_rules
old_rules = get_bucket_lifecycle_rules(bucket)
new_rules = @bucket_lifecycle_rules.sort_by { |rule| rule.id }.map do |rule|
{ id: rule.id, expiration: { days: rule.expiration_days }, prefix: rule.prefix, status: "Enabled" }
end

unless old_rules == new_rules
log.info "Configuring bucket lifecycle rules for #{@s3_bucket} on #{@s3_endpoint}"
@bucket.lifecycle_configuration.put({ lifecycle_configuration: { rules: new_rules } })
log.info "Configuring bucket lifecycle rules for #{bucket.name} on #{@s3_endpoint}"
bucket.lifecycle_configuration.put({ lifecycle_configuration: { rules: new_rules } })
end
end
end

def get_bucket_lifecycle_rules
def get_bucket_lifecycle_rules(bucket)
begin
@bucket.lifecycle_configuration.rules.sort_by { |rule| rule[:id] }.map do |rule|
bucket.lifecycle_configuration.rules.sort_by { |rule| rule[:id] }.map do |rule|
{ id: rule[:id], expiration: { days: rule[:expiration][:days] }, prefix: rule[:prefix], status: rule[:status] }
end
rescue Aws::S3::Errors::NoSuchLifecycleConfiguration
@@ -461,8 +513,8 @@ def check_s3_path_safety(conf)
end
end

def check_apikeys
@bucket.objects(prefix: @path, :max_keys => 1).first
def check_apikeys(bucket)
bucket.objects(prefix: @path, :max_keys => 1).first
rescue Aws::S3::Errors::NoSuchBucket
# ignore NoSuchBucket Error because ensure_bucket checks it.
rescue => e
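
Note: with this change s3_bucket may contain Fluentd placeholders, and s3_bucket_fallback names the bucket used when a chunk lacks the metadata needed to resolve them. A minimal illustrative configuration follows; the bucket names, tag layout, and paths are hypothetical and not part of the patch.

    <match app.**>
      @type s3
      # resolved per chunk from the tag's second part and the chunk time
      s3_bucket logs-${tag[1]}-%Y%m
      # used when a chunk is missing the tag, time, or chunk-key metadata
      s3_bucket_fallback logs-default
      s3_region us-east-1
      path backups/
      <buffer tag,time>
        @type file
        path /var/log/fluent/s3
        timekey 3600
      </buffer>
    </match>

Because the bucket template uses ${tag[1]} and %Y%m, chunks must be keyed by tag and time (hence "tag,time" in the buffer section); a chunk missing either is written to logs-default instead of raising an error.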
8 changes: 4 additions & 4 deletions test/test_out_s3.rb
@@ -44,10 +44,10 @@ def write(chunk)

private

def ensure_bucket
def ensure_bucket(bucket)
end

def check_apikeys
def check_apikeys(bucket)
end
end.configure(conf)
end
@@ -287,7 +287,7 @@ def write(chunk)

private

def check_apikeys
def check_apikeys(bucket)
end
end.configure(conf)
end
@@ -427,7 +427,7 @@ def setup_mocks(exists_return = false)
mock(Aws::S3::Client).new(anything).at_least(0) { @s3_client }
@s3_resource = mock(Aws::S3::Resource.new(client: @s3_client))
mock(Aws::S3::Resource).new(client: @s3_client) { @s3_resource }
@s3_bucket = mock(Aws::S3::Bucket.new(name: "test",
@s3_bucket = mock(Aws::S3::Bucket.new(name: "test_bucket",
client: @s3_client))
@s3_bucket.exists? { exists_return }
@s3_object = mock(Aws::S3::Object.new(bucket_name: "test_bucket",
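
For reference, the per-chunk bucket resolution order that write() now follows can be summarized as the standalone Ruby sketch below. The method and variable names are illustrative only; the plugin itself relies on the placeholder sets precomputed in start() and on Fluentd's extract_placeholders.

    # Decide which bucket name a chunk should use, falling back whenever the
    # chunk is missing the metadata a placeholder needs.
    def resolve_bucket(template, fallback, tag:, variables:, timekey:)
      time_formats = ["%S", "%M", "%H", "%d", "%m", "%Y"].select { |f| template.include?(f) }
      tag_indexes  = template.scan(/\$\{tag\[(\d+)\]\}/).flatten.map(&:to_i)
      key_names    = template.scan(/\$\{([^}\[]+)\}/).flatten - ["tag"]

      # 1. tag placeholders need a tag with enough dot-separated parts
      return fallback if !tag_indexes.empty? && (tag.nil? || tag.split(".").length <= tag_indexes.max)
      # 2. ${key} placeholders need that key among the chunk variables
      return fallback if key_names.any? { |k| variables.nil? || !variables.key?(k.to_sym) }
      # 3. strftime placeholders need a timekey
      return fallback if !time_formats.empty? && timekey.nil?

      template # the plugin expands it here via extract_placeholders(template, chunk)
    end

    resolve_bucket("logs-${tag[1]}-%Y%m", "logs-default",
                   tag: nil, variables: nil, timekey: nil)
    # => "logs-default"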
