diff --git a/fluentd/Dockerfile b/fluentd/Dockerfile index 2bd21ac8c..a368f4cfc 100644 --- a/fluentd/Dockerfile +++ b/fluentd/Dockerfile @@ -48,6 +48,7 @@ RUN yum-config-manager --enable rhel-7-server-ose-3.7-rpms && \ ADD configs.d/ /etc/fluent/configs.d/ ADD filter_k8s_meta_for_mux_client.rb /etc/fluent/plugin/ +ADD out_syslog_buffered.rb out_syslog.rb /etc/fluent/plugin/ ADD run.sh generate_throttle_configs.rb generate_syslog_config.rb ${HOME}/ COPY lib/*.gem /tmp/ RUN cd /tmp/ && scl enable ${SCL_VERSION} -- \ diff --git a/fluentd/Dockerfile.centos7 b/fluentd/Dockerfile.centos7 index 684622b25..7748780f7 100644 --- a/fluentd/Dockerfile.centos7 +++ b/fluentd/Dockerfile.centos7 @@ -53,6 +53,7 @@ RUN curl -L -s https://github.com/ViaQ/fluent-plugin-elasticsearch/releases/down ADD configs.d/ /etc/fluent/configs.d/ ADD filter_k8s_meta_for_mux_client.rb /etc/fluent/plugin/ +ADD out_syslog_buffered.rb out_syslog.rb /etc/fluent/plugin/ ADD run.sh generate_throttle_configs.rb generate_syslog_config.rb ${HOME}/ COPY lib/fluent-plugin-viaq_docker_audit_log_parser-${FLUENTD_AUDIT_LOG_PARSER_VERSION}.gem /tmp/ RUN scl enable ${SCL_VERSION} -- \ diff --git a/fluentd/out_syslog.rb b/fluentd/out_syslog.rb new file mode 100644 index 000000000..391da4e2a --- /dev/null +++ b/fluentd/out_syslog.rb @@ -0,0 +1,107 @@ +require 'fluent/mixin/config_placeholders' +module Fluent + + class SyslogOutput < Fluent::Output + # First, register the plugin. NAME is the name of this plugin + # and identifies the plugin in the configuration file. + Fluent::Plugin.register_output('syslog', self) + + # This method is called before starting. 
+
+    # Plugin parameters. `remote_syslog` is mandatory (checked in #configure);
+    # every other parameter falls back to the default declared here.
+    # NOTE(review): 25 is an unusual default port for syslog - confirm before
+    # relying on it.
+    config_param :remote_syslog, :string, :default => nil
+    config_param :port, :integer, :default => 25
+    config_param :hostname, :string, :default => ""
+    config_param :remove_tag_prefix, :string, :default => nil
+    config_param :tag_key, :string, :default => nil
+    config_param :facility, :string, :default => 'user'
+    config_param :severity, :string, :default => 'debug'
+    config_param :use_record, :string, :default => nil
+    config_param :payload_key, :string, :default => 'message'
+
+
+    # Loads the libraries this plugin needs. 'time' is required as well
+    # because #emit calls Time.parse.
+    def initialize
+      super
+      require 'socket'
+      require 'time'
+      require 'syslog_protocol'
+    end
+
+    # Validates the configuration and prepares the UDP socket and the packet
+    # template that #emit reuses for every record.
+    #
+    # Raises Fluent::ConfigError when `remote_syslog` is not configured.
+    def configure(conf)
+      super
+      raise Fluent::ConfigError, "remote syslog required" unless conf['remote_syslog']
+
+      @socket = UDPSocket.new
+      @packet = SyslogProtocol::Packet.new
+      if (prefix = conf['remove_tag_prefix'])
+        # \A anchors at the start of the string (not of every line).
+        @remove_tag_prefix = Regexp.new('\A' + Regexp.escape(prefix))
+      end
+      # Fall back to the config_param defaults instead of overwriting them
+      # with nil when the key is absent from the configuration.
+      # (@facilty keeps its historical spelling because #emit reads it.)
+      @facilty = conf['facility'] || @facility
+      @severity = conf['severity'] || @severity
+      @use_record = conf['use_record']
+      @payload_key = conf['payload_key'] || "message"
+    end
+
+
+    # This method is called when starting.
+    def start
+      super
+    end
+
+    # This method is called when shutting down.
+    def shutdown
+      super
+    end
+
+    # This method is called when an event reaches Fluentd.
+    # 'es' is a Fluent::EventStream object that includes multiple events.
+    # You can use 'es.each {|time,record| ... }' to retrieve events.
+    # 'chain' is an object that manages transactions. Call 'chain.next' at
+    # appropriate points and rollback if it raises an exception.
+    def emit(tag, es, chain)
+      tag = tag.sub(@remove_tag_prefix, '') if @remove_tag_prefix
+      chain.next
+      es.each do |_event_time, record|
+        @packet.hostname = hostname
+        # Per-record facility/severity are honoured only when use_record is set.
+        @packet.facility = (@use_record && record['facility']) || @facilty
+        @packet.severity = (@use_record && record['severity']) || @severity
+        # The event time handed over by Fluentd is not used: the timestamp is
+        # the record's own 'time' field, or the current time when it is absent.
+        # `Time` resolves to Fluent::Time, defined at the bottom of this file.
+        @packet.time = record['time'] ? Time.parse(record['time']) : Time.now
+        @packet.tag = syslog_tag(tag, record)
+        # Send a copy so the shared template keeps no per-record content.
+        packet = @packet.dup
+        packet.content = record[@payload_key]
+        @socket.send(packet.assemble, 0, @remote_syslog, @port)
+      end
+    end
+
+    private
+
+    # Returns the syslog tag for a record: the value stored under tag_key with
+    # square brackets removed, or the Fluentd tag when tag_key is unset or the
+    # value cannot be used. Both are trimmed to 32 chars for syslog_protocol
+    # gem compatibility.
+    def syslog_tag(tag, record)
+      return tag[0..31] unless @tag_key
+      record[@tag_key][0..31].gsub(/[\[\]]/,'')
+    rescue StandardError
+      # Best effort: a missing or unusable value must not abort the stream.
+      tag[0..31]
+    end
+  end
+
+  # Fluent-scoped Time with a helper for viewing a time in another zone.
+  # The superclass is written as ::Time so that it always means the core
+  # class, even if Fluent::Time has already been defined when this file loads.
+  class Time < ::Time
+    # Returns a copy of the receiver expressed in the given time zone
+    # (a TZ name, 'UTC' by default). ENV['TZ'] is changed only for the
+    # duration of the conversion and is restored even if it fails.
+    def timezone(timezone = 'UTC')
+      old = ENV['TZ']
+      utc = self.dup.utc
+      ENV['TZ'] = timezone
+      utc.localtime
+    ensure
+      ENV['TZ'] = old
+    end
+  end
+
+end
diff --git a/fluentd/out_syslog_buffered.rb b/fluentd/out_syslog_buffered.rb new file mode 100644 index 000000000..9044f8538 --- /dev/null +++ b/fluentd/out_syslog_buffered.rb @@ -0,0 +1,150 @@
+require 'fluent/mixin/config_placeholders'
+module Fluent
+  class SyslogBufferedOutput < Fluent::BufferedOutput
+    # First, register the plugin. NAME is the name of this plugin
+    # and identifies the plugin in the configuration file.
+    Fluent::Plugin.register_output('syslog_buffered', self)
+
+    # This method is called before starting.
+
+    # Plugin parameters. A `remote_syslog` entry must be present in the
+    # configuration (checked in #configure); the other parameters fall back
+    # to the defaults declared here.
+    # NOTE(review): 25 is an unusual default port for syslog - confirm before
+    # relying on it.
+    config_param :remote_syslog, :string, :default => ""
+    config_param :port, :integer, :default => 25
+    config_param :hostname, :string, :default => ""
+    config_param :remove_tag_prefix, :string, :default => nil
+    config_param :tag_key, :string, :default => nil
+    config_param :facility, :string, :default => 'user'
+    config_param :severity, :string, :default => 'debug'
+    config_param :use_record, :string, :default => nil
+    config_param :payload_key, :string, :default => 'message'
+
+
+    # Loads the libraries this plugin needs. 'time' is required as well
+    # because the send path calls Time.parse.
+    def initialize
+      super
+      require 'socket'
+      require 'time'
+      require 'syslog_protocol'
+      require 'timeout'
+    end
+
+    # Validates the configuration, opens the TCP connection and prepares the
+    # packet template reused for every record. A failed connection is not
+    # fatal here: @socket stays nil and the send path tries to reconnect.
+    #
+    # Raises Fluent::ConfigError when `remote_syslog` is not configured.
+    def configure(conf)
+      super
+      raise Fluent::ConfigError, "remote syslog required" unless conf['remote_syslog']
+
+      # Use the parsed parameters so the declared default port applies when
+      # `port` is absent from the configuration.
+      @socket = create_tcp_socket(@remote_syslog, @port)
+      @packet = SyslogProtocol::Packet.new
+      if (prefix = conf['remove_tag_prefix'])
+        # \A anchors at the start of the string (not of every line).
+        @remove_tag_prefix = Regexp.new('\A' + Regexp.escape(prefix))
+      end
+      # Fall back to the config_param defaults instead of overwriting them
+      # with nil when the key is absent from the configuration.
+      # (@facilty keeps its historical spelling because the send path reads it.)
+      @facilty = conf['facility'] || @facility
+      @severity = conf['severity'] || @severity
+      @use_record = conf['use_record']
+      @payload_key = conf['payload_key'] || "message"
+    end
+
+    # Serializes one event for the buffer chunk.
+    def format(tag, time, record)
+      [tag, time, record].to_msgpack
+    end
+
+    # Opens a TCP connection to host:port and gives it a 1 second send timeout.
+    #
+    # The connection attempt is limited to 10 seconds overall; while the
+    # network is unreachable it is retried within that limit. Returns the
+    # connected socket, or nil (after logging a warning) when the connection
+    # could not be established.
+    def create_tcp_socket(host, port)
+      socket = nil
+      begin
+        Timeout.timeout(10) do
+          begin
+            # Assign the outer variable: the socket opened here is the one
+            # returned, so only a single connection is ever made.
+            socket = TCPSocket.new(host, port)
+          rescue Errno::ENETUNREACH
+            sleep 0.1 # brief pause instead of a busy loop; bounded by the timeout
+            retry
+          end
+        end
+        # struct timeval { seconds, microseconds } as two native longs.
+        send_timeout = [1, 0].pack("l_2")
+        socket.setsockopt Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, send_timeout
+      rescue SocketError, Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EPIPE, Timeout::Error => e
+        log.warn "out:syslog: failed to open tcp socket #{host}:#{port} :#{e}"
+        socket.close if socket
+        socket = nil
+      end
+      socket
+    end
+
+    # This method is called when starting.
+    def start
+      super
+    end
+
+    # This method is called when shutting down.
+    def shutdown
+      super
+      # Release the TCP connection once the base class has shut down.
+      discard_socket
+    end
+
+
+    # Sends every event of a buffer chunk to the remote syslog server.
+    # An exception raised while sending propagates to the caller.
+    def write(chunk)
+      chunk.msgpack_each {|(tag,time,record)|
+        send_to_syslog(tag, time, record)
+      }
+    end
+
+    # Builds a syslog packet for one record and writes it, newline-terminated,
+    # to the TCP connection, reconnecting first when no connection is cached.
+    #
+    # The `time` argument is not used: the timestamp is the record's own
+    # 'time' field, or the current time when that field is absent.
+    #
+    # Raises RuntimeError when the connection cannot be re-established and
+    # re-raises any socket error hit while writing.
+    def send_to_syslog(tag, time, record)
+      tag = tag.sub(@remove_tag_prefix, '') if @remove_tag_prefix
+      @packet.hostname = hostname
+      # Per-record facility/severity are honoured only when use_record is set.
+      @packet.facility = (@use_record && record['facility']) || @facilty
+      @packet.severity = (@use_record && record['severity']) || @severity
+      # `Time` resolves to Fluent::Time, defined at the bottom of this file.
+      @packet.time = record['time'] ? Time.parse(record['time']) : Time.now
+      @packet.tag = syslog_tag(tag, record)
+      # Send a copy so the shared template keeps no per-record content.
+      packet = @packet.dup
+      packet.content = record[@payload_key]
+
+      @socket ||= create_tcp_socket(@remote_syslog, @port)
+      unless @socket
+        log.warn "out:syslog: Socket connection couldn't be reestablished"
+        # No exception is active here, so raise one with an explicit message.
+        raise "out:syslog: Socket connection couldn't be reestablished"
+      end
+
+      begin
+        @socket.write packet.assemble + "\n"
+        @socket.flush
+      rescue SocketError, SystemCallError, IOError, Timeout::Error => e
+        log.warn "out:syslog: connection error by #{@remote_syslog}:#{@port} :#{e}"
+        # Drop the connection so the next attempt opens a fresh one.
+        discard_socket
+        raise
+      end
+    end
+
+    private
+
+    # Returns the syslog tag for a record: the value stored under tag_key with
+    # square brackets removed, or the Fluentd tag when tag_key is unset or the
+    # value cannot be used. Both are trimmed to 32 chars for syslog_protocol
+    # gem compatibility.
+    def syslog_tag(tag, record)
+      return tag[0..31] unless @tag_key
+      record[@tag_key][0..31].gsub(/[\[\]]/,'')
+    rescue StandardError
+      # Best effort: a missing or unusable value must not abort the chunk.
+      tag[0..31]
+    end
+
+    # Closes the cached connection, if any, and forgets it.
+    def discard_socket
+      @socket.close if @socket
+    rescue IOError, SystemCallError
+      # The connection is already unusable; there is nothing left to release.
+    ensure
+      @socket = nil
+    end
+
+
+  end
+
+  # Fluent-scoped Time with a helper for viewing a time in another zone.
+  # The explicit ::Time superclass keeps Time.now / Time.parse available
+  # inside this module even when this file is the first to define Fluent::Time.
+  class Time < ::Time
+    # Returns a copy of the receiver expressed in the given time zone
+    # (a TZ name, 'UTC' by default). ENV['TZ'] is changed only for the
+    # duration of the conversion and is restored even if it fails.
+    def timezone(timezone = 'UTC')
+      old = ENV['TZ']
+      utc = self.dup.utc
+      ENV['TZ'] = timezone
+      utc.localtime
+    ensure
+      ENV['TZ'] = old
+    end
+  end
+end
+
diff --git a/fluentd/run.sh b/fluentd/run.sh index 2e99354ac..87672301f 100644 --- a/fluentd/run.sh +++ b/fluentd/run.sh @@ -238,7 +238,8 @@ if [[ "${USE_REMOTE_SYSLOG:-}" = "true" ]] ; then # The symlink is a workaround for https://github.com/openshift/origin-aggregated-logging/issues/604 found= for file in
/usr/share/gems/gems/fluent-plugin-remote-syslog-*/lib/fluentd/plugin/*.rb ; do - if [ -f "$file" ] ; then + bname=$(basename $file) + if [ ! -e "/etc/fluent/plugin/$bname" -a -f "$file" ] ; then ln -s $file /etc/fluent/plugin/ found=true fi @@ -246,7 +247,8 @@ if [[ "${USE_REMOTE_SYSLOG:-}" = "true" ]] ; then if [ -z "${found:-}" ] ; then # not found in rpm location - look in alternate location for file in /opt/app-root/src/gems/fluent-plugin-remote-syslog*/lib/fluentd/plugin/*.rb ; do - if [ -f "$file" ] ; then + bname=$(basename $file) + if [ ! -e "/etc/fluent/plugin/$bname" -a -f "$file" ] ; then ln -s $file /etc/fluent/plugin/ fi done diff --git a/test/remote-syslog.sh b/test/remote-syslog.sh index c8b3066b9..29cd625c0 100755 --- a/test/remote-syslog.sh +++ b/test/remote-syslog.sh @@ -51,7 +51,6 @@ os::log::debug "$( oc set env daemonset/logging-fluentd USE_REMOTE_SYSLOG=true R os::log::debug "$( oc label node --all logging-infra-fluentd=true --overwrite=true )" os::cmd::try_until_text "oc get pods -l component=fluentd" "^logging-fluentd-.* Running " - fpod=$( get_running_pod fluentd ) os::cmd::try_until_failure "oc exec $fpod find /etc/fluent/configs.d/dynamic/output-remote-syslog.conf" @@ -69,5 +68,56 @@ fpod=$( get_running_pod fluentd ) os::cmd::try_until_text "oc exec $fpod grep '' /etc/fluent/configs.d/dynamic/output-remote-syslog.conf | wc -l" '^2$' +os::log::info Test 4, making sure tag_key=message does not cause remote-syslog plugin crash + +os::log::debug "$( oc label node --all logging-infra-fluentd- )" +os::cmd::try_until_text "oc get daemonset logging-fluentd -o jsonpath='{ .status.numberReady }'" "0" $FLUENTD_WAIT_TIME + +os::log::debug "$( oc set env daemonset/logging-fluentd USE_REMOTE_SYSLOG=true REMOTE_SYSLOG_HOST=127.0.0.1 REMOTE_SYSLOG_TAG_KEY=message)" +os::log::debug "$( oc label node --all logging-infra-fluentd=true --overwrite=true )" +os::cmd::try_until_text "oc get pods -l component=fluentd" "^logging-fluentd-.* Running " + +fpod=$( 
get_running_pod fluentd ) +os::cmd::try_until_success "oc exec $fpod find /etc/fluent/configs.d/dynamic/output-remote-syslog.conf" +os::cmd::expect_success "oc exec $fpod grep 'tag_key message' /etc/fluent/configs.d/dynamic/output-remote-syslog.conf" +os::cmd::expect_success_and_not_text "oc logs $fpod" "nil:NilClass" + +extra_rsyslog_artifacts=$ARTIFACT_DIR/rsyslog-artifacts.txt +if [ -n "${DEBUG:-}" ] ; then + echo Test 4, making sure tag_key=message does not cause remote-syslog plugin crash > $extra_rsyslog_artifacts + echo "$( date --rfc-3339=ns )" "Enabled REMOTE_SYSLOG: USE_REMOTE_SYSLOG=true, REMOTE_SYSLOG_HOST=127.0.0.1, REMOTE_SYSLOG_HOST2=127.0.0.1, REMOTE_SYSLOG_TAG_KEY=message" >> $extra_rsyslog_artifacts + + oc logs $fpod >> $extra_rsyslog_artifacts 2>&1 + + echo "output-remote-syslog.conf: " >> $extra_rsyslog_artifacts + + oc exec $fpod -- cat /etc/fluent/configs.d/dynamic/output-remote-syslog.conf >> $extra_rsyslog_artifacts +fi + +os::log::info Test 5, making sure tag_key=bogus does not cause remote-syslog plugin crash + +os::log::debug "$( oc label node --all logging-infra-fluentd- )" +os::cmd::try_until_text "oc get daemonset logging-fluentd -o jsonpath='{ .status.numberReady }'" "0" $FLUENTD_WAIT_TIME + +os::log::debug "$( oc set env daemonset/logging-fluentd USE_REMOTE_SYSLOG=true REMOTE_SYSLOG_HOST=127.0.0.1 REMOTE_SYSLOG_TAG_KEY=bogus)" +os::log::debug "$( oc label node --all logging-infra-fluentd=true --overwrite=true )" +os::cmd::try_until_text "oc get pods -l component=fluentd" "^logging-fluentd-.* Running " + +fpod=$( get_running_pod fluentd ) +os::cmd::try_until_success "oc exec $fpod find /etc/fluent/configs.d/dynamic/output-remote-syslog.conf" +os::cmd::expect_success "oc exec $fpod grep 'tag_key bogus' /etc/fluent/configs.d/dynamic/output-remote-syslog.conf" +os::cmd::expect_success_and_not_text "oc logs $fpod" "nil:NilClass" + +if [ -n "${DEBUG:-}" ] ; then + echo Test Test 5, making sure tag_key=bogus does not cause remote-syslog 
plugin crash >> $extra_rsyslog_artifacts + echo "$( date --rfc-3339=ns )" "Enabled REMOTE_SYSLOG: USE_REMOTE_SYSLOG=true, REMOTE_SYSLOG_HOST=127.0.0.1, REMOTE_SYSLOG_HOST2=127.0.0.1, REMOTE_SYSLOG_TAG_KEY=bogus" >> $extra_rsyslog_artifacts + + oc logs $fpod >> $extra_rsyslog_artifacts 2>&1 + + echo "output-remote-syslog.conf: " >> $extra_rsyslog_artifacts + + oc exec $fpod -- cat /etc/fluent/configs.d/dynamic/output-remote-syslog.conf >> $extra_rsyslog_artifacts +fi + reset_fluentd_daemonset os::test::junit::reconcile_output