Fluentd
1.0
1.0
  • Introduction
  • Overview
    • Life of a Fluentd event
    • Support
    • FAQ
    • Logo
    • fluent-package v5 vs td-agent v4
  • Installation
    • Before Installation
    • Install fluent-package
      • RPM Package (Red Hat Linux)
      • DEB Package (Debian/Ubuntu)
      • .dmg Package (macOS)
      • .msi Installer (Windows)
    • Install calyptia-fluentd
      • RPM Package (Red Hat Linux)
      • DEB Package (Debian/Ubuntu)
      • .dmg Package (macOS)
      • .msi Installer (Windows)
    • Install by Ruby Gem
    • Install from Source
    • Post Installation Guide
    • Obsolete Installation
      • Treasure Agent v4 (EOL) Installation
        • Install by RPM Package v4 (Red Hat Linux)
        • Install by DEB Package v4 (Debian/Ubuntu)
        • Install by .dmg Package v4 (macOS)
        • Install by .msi Installer v4 (Windows)
      • Treasure Agent v3 (EOL) Installation
        • Install by RPM Package v3 (Red Hat Linux)
        • Install by DEB Package v3 (Debian/Ubuntu)
        • Install by .dmg Package v3 (macOS)
        • Install by .msi Installer v3 (Windows)
  • Configuration
    • Config File Syntax
    • Config File Syntax (YAML)
    • Routing Examples
    • Config: Common Parameters
    • Config: Parse Section
    • Config: Buffer Section
    • Config: Format Section
    • Config: Extract Section
    • Config: Inject Section
    • Config: Transport Section
    • Config: Storage Section
    • Config: Service Discovery Section
  • Deployment
    • System Configuration
    • Logging
    • Signals
    • RPC
    • High Availability Config
    • Performance Tuning
    • Multi Process Workers
    • Failure Scenarios
    • Plugin Management
    • Trouble Shooting
    • Fluentd UI
    • Linux Capability
    • Command Line Option
    • Source Only Mode
    • Zero-downtime restart
  • Container Deployment
    • Docker Image
    • Docker Logging Driver
    • Docker Compose
    • Kubernetes
  • Monitoring Fluentd
    • Overview
    • Monitoring by Prometheus
    • Monitoring by REST API
  • Input Plugins
    • tail
    • forward
    • udp
    • tcp
    • unix
    • http
    • syslog
    • exec
    • sample
    • monitor_agent
    • windows_eventlog
  • Output Plugins
    • file
    • forward
    • http
    • exec
    • exec_filter
    • secondary_file
    • copy
    • relabel
    • roundrobin
    • stdout
    • null
    • s3
    • kafka
    • elasticsearch
    • opensearch
    • mongo
    • mongo_replset
    • rewrite_tag_filter
    • webhdfs
    • buffer
  • Filter Plugins
    • record_transformer
    • grep
    • parser
    • geoip
    • stdout
  • Parser Plugins
    • regexp
    • apache2
    • apache_error
    • nginx
    • syslog
    • ltsv
    • csv
    • tsv
    • json
    • msgpack
    • multiline
    • none
  • Formatter Plugins
    • out_file
    • json
    • ltsv
    • csv
    • msgpack
    • hash
    • single_value
    • stdout
    • tsv
  • Buffer Plugins
    • memory
    • file
    • file_single
  • Storage Plugins
    • local
  • Service Discovery Plugins
    • static
    • file
    • srv
  • Metrics Plugins
    • local
  • How-to Guides
    • Stream Analytics with Materialize
    • Send Apache Logs to S3
    • Send Apache Logs to Minio
    • Send Apache Logs to Mongodb
    • Send Syslog Data to Graylog
    • Send Syslog Data to InfluxDB
    • Send Syslog Data to Sematext
    • Data Analytics with Treasure Data
    • Data Collection with Hadoop (HDFS)
    • Simple Stream Processing with Fluentd
    • Stream Processing with Norikra
    • Stream Processing with Kinesis
    • Free Alternative To Splunk
    • Email Alerting like Splunk
    • How to Parse Syslog Messages
    • Cloud Data Logging with Raspberry Pi
  • Language Bindings
    • Java
    • Ruby
    • Python
    • Perl
    • PHP
    • Nodejs
    • Scala
  • Plugin Development
    • How to Write Input Plugin
    • How to Write Base Plugin
    • How to Write Buffer Plugin
    • How to Write Filter Plugin
    • How to Write Formatter Plugin
    • How to Write Output Plugin
    • How to Write Parser Plugin
    • How to Write Storage Plugin
    • How to Write Service Discovery Plugin
    • How to Write Tests for Plugin
    • Configuration Parameter Types
    • Upgrade Plugin from v0.12
  • Plugin Helper API
    • Plugin Helper: Child Process
    • Plugin Helper: Compat Parameters
    • Plugin Helper: Event Emitter
    • Plugin Helper: Event Loop
    • Plugin Helper: Extract
    • Plugin Helper: Formatter
    • Plugin Helper: Inject
    • Plugin Helper: Parser
    • Plugin Helper: Record Accessor
    • Plugin Helper: Server
    • Plugin Helper: Socket
    • Plugin Helper: Storage
    • Plugin Helper: Thread
    • Plugin Helper: Timer
    • Plugin Helper: Http Server
    • Plugin Helper: Service Discovery
  • Troubleshooting Guide
  • Appendix
    • Update from v0.12 to v1
    • td-agent v2 vs v3 vs v4
Powered by GitBook
On this page
  • Methods
  • Writing Tests
  • Overview of Tests

Was this helpful?

  1. Plugin Development

How to Write Filter Plugin

PreviousHow to Write Buffer PluginNextHow to Write Formatter Plugin

Last updated 3 months ago

Was this helpful?

This section shows how to write a custom filter plugin in addition to the core . The plugin filenames, starting with filter_ prefix, are registered as filter plugins.

See for more details on the common APIs of all the plugins.

Here is the implementation of the most basic filter that passes through all the events as-is:

require 'fluent/plugin/filter'

module Fluent::Plugin
  class PassThruFilter < Filter
    # Register this filter as "passthru"
    Fluent::Plugin.register_filter('passthru', self)

    # config_param works like other plugins

    def configure(conf)
      super
      # Do the usual configuration here
    end

    # def start
    #   super
    #   # Override this method if anything needed as startup.
    # end

    # def shutdown
    #   # Override this method to use it to free up resources, etc.
    #   super
    # end

    def filter(tag, time, record)
      # Since our example is a pass-through filter, it does nothing and just
      # returns the record as-is.
      # If returns nil, that records are ignored.
      record
    end
  end
end

Methods

A filter plugin overrides the one of filter/filter_with_time/filter_stream method.

#filter(tag, time, record)

This method implements the filtering logic.

  • tag: is a String,

  • time is a Fluent::EventTime or an Integer; and,

  • record is a Hash with String keys.

The return value of this method should be a Hash of modified record, or nil. If it is nil, the event will be discarded.

# example
def filter(tag, time, record)
  # process record
  record['fluentd_tag'] = tag
  record
end

#filter_with_time(tag, time, record)

This method implements the filtering logic with time update. Event time will be replaced with the return value.

  • tag: is a String,

  • time is a Fluent::EventTime or an Integer; and,

  • record is a Hash with String keys.

The return value of this method should be two element array, [new_time, new_record] , or nil. If it is nil, the event will be discarded.

# example
def filter_with_time(tag, time, record)
  new_time = get_time_from_record(record)
  new_record = update_record(tag, record)
  return new_time, new_record  # this is same with return [new_time, new_record]
end

#filter_stream(tag, es)

This method implements the event stream based filtering logic. If you hard to implement the logic with filter, e.g. need to handle multiple records in one processing, use this method.

  • tag: is a String,

The return value of this method should be MultiEventStream. If it is nil, the event will be discarded.

# example
def filter_stream(tag, es)
  new_es = Fluent::MultiEventStream.new
  es.each { |time, record|
    new_time = process_time(tag, time, record)
    new_record = process_record(tag, time, record)
    new_es.add(time, record)
  }
  new_es
end

Writing Tests

Fluentd filter plugin has one or some points to be tested. Others (parsing configurations, controlling buffers, retries, flushes and many others) are controlled by Fluentd core.

Fluentd also provides test driver for plugins. You can write tests for your own plugins very easily:

# test/plugin/test_filter_your_own.rb

require 'test/unit'
require 'fluent/plugin/test/driver/filter'

# your own plugin
require 'fluent/plugin/filter_your_own'

class YourOwnFilterTest < Test::Unit::TestCase
  def setup
    Fluent::Test.setup # this is required to setup router and others
  end

  # default configuration for tests
  CONFIG = %[
    param1 value1
    param2 value2
  ]

  def create_driver(conf = CONFIG)
    Fluent::Test::Driver::Filter.new(Fluent::Plugin::YourOwnFilter).configure(conf)
  end

  def filter(config, messages)
    d = create_driver(config)
    d.run(default_tag: 'input.access') do
      messages.each do |message|
        d.feed(message)
      end
    end
    d.filtered_records
  end

  sub_test_case 'configured with invalid configuration' do
    test 'empty configuration' do
      assert_raise(Fluent::ConfigError) do
         create_driver('')
      end
    end

    test 'param1 should reject too short string' do
      conf = %[
        param1 a
      ]
      assert_raise(Fluent::ConfigError) do
         create_driver(conf)
      end
    end
    # ...
  end

  sub_test_case 'plugin will add some fields' do
    test 'add hostname to record' do
      conf = CONFIG
      messages = [
        { 'message' => 'This is test message' }
      ]
      expected = [
        { 'message' => 'This is test message', 'hostname' => 'example.com' }
      ]
      filtered_records = filter(conf, messages)
      assert_equal(expected, filtered_records)
    end
    # ...
  end
  # ...
end

Overview of Tests

Testing for the filter plugins is mainly for:

  • Validation of configuration parameters (i.e. #configure)

  • Validation of the filtered records

To make testing easy, the plugin test driver provides a dummy router, a logger and general functionality to override the system, parser and other relevant configurations.

The lifecycle of the plugin and its test driver is:

  1. Instantiate the test driver which then instantiates the plugin

  2. Configure plugin

  3. Register conditions to stop/break running tests

  4. Run test code (provided as a block for d.run)

  5. Assert results of tests using data provided by the driver

At the start of Step # 4, the test driver calls the startup methods of the plugin e.g. #start and at the end #stop, #shutdown, etc. It can be skipped by optional arguments of #run.

For:

  • configuration tests, repeat steps # 1-2

  • full feature tests, repeat steps # 1-5

es is a Fluent::EventStream classes. See

For more details, see .

If this article is incorrect or outdated, or omits critical information, please . is an open-source project under . All components are available under the Apache 2 License.

ones
Plugin Base Class API
EventStream code
Testing API for Plugins
let us know
Fluentd
Cloud Native Computing Foundation (CNCF)