Skip to content

Commit

Permalink
Merge pull request #37 from jcantrill/log2770_viaq
Browse files Browse the repository at this point in the history
LOG-2770: Optimize viaq data plugin
  • Loading branch information
openshift-merge-robot authored Jul 20, 2022
2 parents 5bb1c5a + ae4e8b6 commit 67403da
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 209 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ class ViaqDataModelFilter < Filter
desc 'Enable functionality to flatten kubernetes.labels'
config_param :enable_flatten_labels, :bool, default: false

desc 'Enable functionality to add openshift normalizations'
config_param :enable_openshift_model, :bool, default: true

desc 'Enable functionality to prune empty fields from record'
config_param :enable_prune_empty_fields, :bool, default: true

desc 'Enable functionality to prune kubernetes.labels and remove the set except for exclusions'
config_param :enable_prune_labels, :bool, default: false

Expand Down Expand Up @@ -262,11 +268,8 @@ def configure(conf)
@formatter_cache = {}
@formatter_cache_nomatch = {}
end
begin
@docker_hostname = File.open('/etc/docker-hostname') { |f| f.readline }.rstrip
rescue
@docker_hostname = ENV['NODE_NAME'] || nil
end

@node_name = ENV['NODE_NAME'] || nil
@ipaddr4 = ENV['IPADDR4'] || '127.0.0.1'
@ipaddr6 = nil

Expand All @@ -278,6 +281,22 @@ def configure(conf)

configure_elasticsearch_index_names

@chain = []

@chain << lambda {|tag,time,record| check_for_match_and_format(tag, time, record)} if @formatters.length > 0
@chain << lambda {|tag,time,record| add_pipeline_metadata(tag, time, record)} if @enable_openshift_model
if @undefined_to_string || @use_undefined || @undefined_dot_replace_char || (@undefined_max_num_fields > NUM_FIELDS_UNLIMITED)
@chain << lambda {|tag,time,record| handle_undefined_fields(tag, time, record)}
end
@chain << lambda {|tag,time,record| add_openshift_data(record)} if @enable_openshift_model
@chain << lambda {|tag,time,record| prune_empty_fields(record)} if @enable_prune_empty_fields
@chain << lambda {|tag,time,record| rename_time_field(record)} if (@rename_time || @rename_time_if_missing)
@chain << lambda {|tag,time,record| flatten_labels(record)} if @enable_flatten_labels
@chain << lambda {|tag,time,record| prune_labels(record, @prune_labels_exclusions)} if @enable_prune_labels
@chain << lambda {|tag,time,record| add_elasticsearch_index_name_field(tag, time, record)} unless @elasticsearch_index_names.empty?

log.info "Configured #{@chain.length} handlers for viaq_data_model"

end

def start
Expand Down Expand Up @@ -316,37 +335,27 @@ def process_sys_var_log_fields(tag, time, record, fmtr = nil)
end
record['time'] = timeobj.utc.to_datetime.rfc3339(6)
end
if record['host'].eql?('localhost') && @docker_hostname
record['hostname'] = @docker_hostname
if record['host'].eql?('localhost') && @node_name
record['hostname'] = @node_name
else
record['hostname'] = record['host']
end
end

def process_k8s_json_file_fields(tag, time, record, fmtr = nil)
  # Normalize a record read from a kubernetes json-file log:
  # default 'message' from 'log', normalize the level field, and pick a
  # hostname for the record, preferring the kubernetes metadata host.
  # NOTE(review): the diff interleaved the pre-change version here (docker_hostname
  # fallback and an inline time-conversion block); this is the post-change body.
  # TODO remove this line once parser changes merge. assume 'message' is the default
  record['message'] = record['log'] if record['message'].nil?
  normalize_level!(record)
  if record.key?('kubernetes') && record['kubernetes'].respond_to?(:key?) && \
     (k8shost = record['kubernetes']['host'])
    # prefer the hostname reported by the kubernetes metadata
    record['hostname'] = k8shost
  elsif @node_name
    # fall back to the node name taken from the NODE_NAME env var in configure
    record['hostname'] = @node_name
  end
  transform_eventrouter(tag, record, fmtr)
end

def check_for_match_and_format(tag, time, record)
return unless @formatters
return if @formatter_cache_nomatch[tag]
fmtr = @formatter_cache[tag]
unless fmtr
Expand All @@ -360,10 +369,6 @@ def check_for_match_and_format(tag, time, record)
end
fmtr.fmtr_func.call(tag, time, record, fmtr)

if record[@dest_time_name].nil? && record['time'].nil?
record['time'] = Time.at(time).utc.to_datetime.rfc3339(6)
end

if fmtr.fmtr_remove_keys
fmtr.fmtr_remove_keys.each{|k| record.delete(k)}
end
Expand Down Expand Up @@ -396,83 +401,66 @@ def transform_eventrouter(tag, record, fmtr)
((record['pipeline_metadata'] ||= {})[@pipeline_type.to_s] ||= {})['original_raw_message'] = record['message']
end
record['message'] = record["kubernetes"]["event"].delete("message")
record['time'] = record["kubernetes"]["event"]["metadata"].delete("creationTimestamp")
record[@dest_time_name] = record["kubernetes"]["event"]["metadata"].delete("creationTimestamp")
end
end

def handle_undefined_fields(tag, time, record)
  # Handle every top-level field that is not in @keep_fields: serialize them
  # all into one JSON string field when over the configured limit, otherwise
  # optionally move them under @undefined_name, replace dots in their keys,
  # and/or stringify non-string values.
  # tag/time are unused but kept so this method matches the @chain call shape.
  # NOTE(review): the outer @undefined_to_string/... guard from the old version
  # moved to configure (the handler is only chained when the guard holds).
  # undefined contains all of the fields not in keep_fields
  undefined_keys = record.keys - @keep_fields.keys
  return if undefined_keys.empty?
  if @undefined_max_num_fields > NUM_FIELDS_UNLIMITED && undefined_keys.length > @undefined_max_num_fields
    # too many undefined fields -- collapse them all into one JSON string field
    undefined = {}
    undefined_keys.each { |k| undefined[k] = record.delete(k) }
    record[@undefined_name] = JSON.dump(undefined)
  else
    if @use_undefined
      # move undefined fields under the @undefined_name sub-hash
      record[@undefined_name] = {}
      modify_hsh = record[@undefined_name]
    else
      # normalize the undefined fields in place
      modify_hsh = record
    end
    undefined_keys.each do |k|
      modify_hsh[k] = record.delete(k) if @use_undefined
      if @undefined_dot_replace_char && k.index('.')
        # replace dots in the key (dotted keys are not indexable downstream)
        newk = k.gsub('.', @undefined_dot_replace_char)
        modify_hsh[newk] = modify_hsh.delete(k)
        k = newk
      end
      if @undefined_to_string && !modify_hsh[k].is_a?(String)
        modify_hsh[k] = JSON.dump(modify_hsh[k])
      end
    end
  end
end

def filter(tag, time, record)
if ENV['CDM_DEBUG']
unless tag == ENV['CDM_DEBUG_IGNORE_TAG']
log.error("input #{time} #{tag} #{record}")
def rename_time_field(record)
  # Move the value stored under @src_time_name to @dest_time_name.
  # When @rename_time_if_missing is set, an existing destination value wins,
  # but the source field is removed from the record either way.
  return unless @rename_time || @rename_time_if_missing
  return unless record.key?(@src_time_name)
  val = record.delete(@src_time_name)
  return if @rename_time_if_missing && record.key?(@dest_time_name)
  record[@dest_time_name] = val
end

check_for_match_and_format(tag, time, record)
add_pipeline_metadata(tag, time, record)
handle_undefined_fields(tag, time, record)
add_openshift_data(record)
def prune_empty_fields(record)
  # NOTE(review): the diff interleaved pre-change lines here (a log.warn that
  # referenced tag/time, which are not parameters of this method, and the
  # time-rename logic that now lives in rename_time_field); this keeps only
  # the post-change single-purpose body.
  # remove the field from record if it is not in the list of fields to keep and
  # it is empty
  record.delete_if { |k, v| !@keep_empty_fields_hash.key?(k) && (v.nil? || isempty(delempty(v)) || isempty(v)) }
end

flatten_labels(record) if @enable_flatten_labels
prune_labels(record, @prune_labels_exclusions) if @enable_prune_labels
def filter(tag, time, record)
  # Fluentd filter entry point: apply each configured handler lambda (built
  # once in configure, in a fixed order) to the record, then return it.
  # NOTE(review): the diff interleaved the pre-change elasticsearch/CDM_DEBUG
  # branches here (unbalanced if/unless without matching ends); this is the
  # coherent post-change chain-based body.
  @chain.each { |handler| handler.call(tag, time, record) }
  record
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,13 @@ def process_journal_fields(tag, time, record, fmtr)
break
end
end
if record['time'].nil? # e.g. already has @timestamp
timeobj = Time.at(time)
if timeobj > Time.now
timeobj = Time.new((timeobj.year - 1), timeobj.month, timeobj.day, timeobj.hour, timeobj.min, timeobj.sec, timeobj.utc_offset)
end
record['time'] = timeobj.utc.to_datetime.rfc3339(6)
end
case fmtr.type
when :sys_journal
record['message'] = record['MESSAGE']
Expand Down
Loading

0 comments on commit 67403da

Please sign in to comment.