Fluentd
1.0
1.0
  • Introduction
  • Overview
    • Life of a Fluentd event
    • Support
    • FAQ
    • Logo
    • fluent-package v5 vs td-agent v4
  • Installation
    • Before Installation
    • Install fluent-package
      • RPM Package (Red Hat Linux)
      • DEB Package (Debian/Ubuntu)
      • .dmg Package (macOS)
      • .msi Installer (Windows)
    • Install calyptia-fluentd
      • RPM Package (Red Hat Linux)
      • DEB Package (Debian/Ubuntu)
      • .dmg Package (macOS)
      • .msi Installer (Windows)
    • Install by Ruby Gem
    • Install from Source
    • Post Installation Guide
    • Obsolete Installation
      • Treasure Agent v4 (EOL) Installation
        • Install by RPM Package v4 (Red Hat Linux)
        • Install by DEB Package v4 (Debian/Ubuntu)
        • Install by .dmg Package v4 (macOS)
        • Install by .msi Installer v4 (Windows)
      • Treasure Agent v3 (EOL) Installation
        • Install by RPM Package v3 (Red Hat Linux)
        • Install by DEB Package v3 (Debian/Ubuntu)
        • Install by .dmg Package v3 (macOS)
        • Install by .msi Installer v3 (Windows)
  • Configuration
    • Config File Syntax
    • Config File Syntax (YAML)
    • Routing Examples
    • Config: Common Parameters
    • Config: Parse Section
    • Config: Buffer Section
    • Config: Format Section
    • Config: Extract Section
    • Config: Inject Section
    • Config: Transport Section
    • Config: Storage Section
    • Config: Service Discovery Section
  • Deployment
    • System Configuration
    • Logging
    • Signals
    • RPC
    • High Availability Config
    • Performance Tuning
    • Multi Process Workers
    • Failure Scenarios
    • Plugin Management
    • Trouble Shooting
    • Fluentd UI
    • Linux Capability
    • Command Line Option
    • Source Only Mode
    • Zero-downtime restart
  • Container Deployment
    • Docker Image
    • Docker Logging Driver
    • Docker Compose
    • Kubernetes
  • Monitoring Fluentd
    • Overview
    • Monitoring by Prometheus
    • Monitoring by REST API
  • Input Plugins
    • tail
    • forward
    • udp
    • tcp
    • unix
    • http
    • syslog
    • exec
    • sample
    • monitor_agent
    • windows_eventlog
  • Output Plugins
    • file
    • forward
    • http
    • exec
    • exec_filter
    • secondary_file
    • copy
    • relabel
    • roundrobin
    • stdout
    • null
    • s3
    • kafka
    • elasticsearch
    • opensearch
    • mongo
    • mongo_replset
    • rewrite_tag_filter
    • webhdfs
    • buffer
  • Filter Plugins
    • record_transformer
    • grep
    • parser
    • geoip
    • stdout
  • Parser Plugins
    • regexp
    • apache2
    • apache_error
    • nginx
    • syslog
    • ltsv
    • csv
    • tsv
    • json
    • msgpack
    • multiline
    • none
  • Formatter Plugins
    • out_file
    • json
    • ltsv
    • csv
    • msgpack
    • hash
    • single_value
    • stdout
    • tsv
  • Buffer Plugins
    • memory
    • file
    • file_single
  • Storage Plugins
    • local
  • Service Discovery Plugins
    • static
    • file
    • srv
  • Metrics Plugins
    • local
  • How-to Guides
    • Stream Analytics with Materialize
    • Send Apache Logs to S3
    • Send Apache Logs to Minio
    • Send Apache Logs to Mongodb
    • Send Syslog Data to Graylog
    • Send Syslog Data to InfluxDB
    • Send Syslog Data to Sematext
    • Data Analytics with Treasure Data
    • Data Collection with Hadoop (HDFS)
    • Simple Stream Processing with Fluentd
    • Stream Processing with Norikra
    • Stream Processing with Kinesis
    • Free Alternative To Splunk
    • Email Alerting like Splunk
    • How to Parse Syslog Messages
    • Cloud Data Logging with Raspberry Pi
  • Language Bindings
    • Java
    • Ruby
    • Python
    • Perl
    • PHP
    • Nodejs
    • Scala
  • Plugin Development
    • How to Write Input Plugin
    • How to Write Base Plugin
    • How to Write Buffer Plugin
    • How to Write Filter Plugin
    • How to Write Formatter Plugin
    • How to Write Output Plugin
    • How to Write Parser Plugin
    • How to Write Storage Plugin
    • How to Write Service Discovery Plugin
    • How to Write Tests for Plugin
    • Configuration Parameter Types
    • Upgrade Plugin from v0.12
  • Plugin Helper API
    • Plugin Helper: Child Process
    • Plugin Helper: Compat Parameters
    • Plugin Helper: Event Emitter
    • Plugin Helper: Event Loop
    • Plugin Helper: Extract
    • Plugin Helper: Formatter
    • Plugin Helper: Inject
    • Plugin Helper: Parser
    • Plugin Helper: Record Accessor
    • Plugin Helper: Server
    • Plugin Helper: Socket
    • Plugin Helper: Storage
    • Plugin Helper: Thread
    • Plugin Helper: Timer
    • Plugin Helper: Http Server
    • Plugin Helper: Service Discovery
  • Troubleshooting Guide
  • Appendix
    • Update from v0.12 to v1
    • td-agent v2 vs v3 vs v4
Powered by GitBook
On this page
  • Record Format
  • Methods
  • zero_downtime_restart_ready?
  • Writing Tests
  • Overview of Tests

Was this helpful?

  1. Plugin Development

How to Write Input Plugin

PreviousPlugin DevelopmentNextHow to Write Base Plugin

Last updated 4 months ago

Was this helpful?

Extend Fluent::Plugin::Input class and implement its methods.

See for details on the common APIs for all plugin types.

In most cases, input plugins start timers, threads, or network servers to listen on ports in #start method and then call router.emit in the callbacks of timers, threads or network servers to emit events.

Example:

require 'fluent/plugin/input'

module Fluent::Plugin
  class SomeInput < Input
    # First, register the plugin. 'NAME' is the name of this plugin
    # and identifies the plugin in the configuration file.
    Fluent::Plugin.register_input('NAME', self)

    # `config_param` defines a parameter.
    # You can refer to a parameter like an instance variable e.g. @port.
    # `:default` means that the parameter is optional.
    config_param :port, :integer, default: 8888

    # `configure` is called before `start`.
    # 'conf' is a `Hash` that includes the configuration parameters.
    # If the configuration is invalid, raise `Fluent::ConfigError`.
    def configure(conf)
      super

      # The configured 'port' is referred by `@port` or instance method `#port`.
      if @port < 1024
        raise Fluent::ConfigError, "well-known ports cannot be used."
      end

      # You can also refer to raw parameter via `conf[name]`.
      @port = conf['port']
      # ...
    end

    # `start` is called when starting and after `configure` is successfully completed.
    # Open sockets or files and create threads here.
    def start
      super

      # Startup code goes here!
    end

    # `shutdown` is called while closing down.
    def shutdown
      # Shutdown code goes here!

      super
    end
  end
end

To submit events, use router.emit(tag, time, record) method, where:

  • tag is a String,

  • time is the Fluent::EventTime or Integer as Unix time; and,

  • record is a Hash object.

Example:

tag = "myapp.access"
time = Fluent::Engine.now
record = {"message"=>"body"}
router.emit(tag, time, record)

To submit multiple events, use router.emit_stream(tag, es) method, where:

  • tag is a String; and,

  • es is a MultiEventStream object.

Example:

es = MultiEventStream.new
records.each do |record|
  es.add(time, record)
end
router.emit_stream(tag, es)

Record Format

Fluentd plugins assume the record is in JSON format so the key should be the String, not Symbol. If you emit a record with a key as Symbol, it may cause a problem.

Example:

# Good
router.emit(tag, time, {'foo' => 'bar'})

# Bad
router.emit(tag, time, {:foo => 'bar'})

Methods

zero_downtime_restart_ready?

def zero_downtime_restart_ready?
  true
end

To do this, the following condition must be met:

  • This plugin can run in parallel with another Fluentd.

This is because there is a period when the old process and the new process run in parallel during a zero-downtime restart.

After addressing the following considerations and ensuring there are no issues, override this method. Then, the plugin will succeed with zero-downtime restart.

  • Handling Files

    • When handling files, there is a possibility of conflict.

    • Basically, input plugins that handle files should not support Zero-downtime restart.

  • Handling Sockets

    • When handling sockets on your own, be careful to avoid conflicts.

Writing Tests

Fluentd input plugin has one or more points to be tested. Others aspects (parsing configurations, controlling buffers, retries, flushes, etc.) are controlled by the Fluentd core.

Fluentd also provides the test drivers for plugins. You can write tests for your own plugins very easily:

# test/plugin/test_in_your_own.rb

require 'test/unit'
require 'fluent/test'
require 'fluent/test/driver/input'

# Your own plugin
require 'fluent/plugin/in_your_own'

class YourOwnInputTest < Test::Unit::TestCase
  def setup
    # This line is required to set up router, and other required components.
    Fluent::Test.setup
  end

  # Default configuration for tests
  CONFIG = %[
    param1 value1
    param2 value2
  ]

  def create_driver(conf = CONFIG)
    Fluent::Test::Driver::Input.new(Fluent::Plugin::YourOwnInput).configure(conf)
  end

  sub_test_case 'configured with invalid configurations' do
    test 'param1 should reject too short string' do
      assert_raise Fluent::ConfigError do
        create_driver(%[
          param1 a
        ])
      end
    end

    test 'param2 is set correctly' do
      d = create_driver
      assert_equal 'value2', d.instance.param2
    end
    # ...
  end

  sub_test_case 'plugin will emit some events' do
    test 'test expects plugin emits events 4 times' do
      d = create_driver

      # This method blocks until the input plugin emits events 4 times
      # or 10 seconds lapse.
      d.run(expect_emits: 4, timeout: 10)

      # An array of `[tag, time, record]`
      events = d.events

      assert_equal 'expected_tag', events[0][0]
      # ...
    end
  end
  # ...
end

Overview of Tests

Testing for input plugins is mainly for:

  • Validation of configuration (i.e. #configure)

  • Validation of the emitted events

To make testing easy, the test driver provides a dummy router, a logger and the functionality to override system and parser configurations, etc.

The lifecycle of plugin and test driver is:

  1. Instantiate plugin driver which then instantiates the plugin

  2. Configure plugin

  3. Register conditions to stop/break running tests

  4. Run test code (provided as a block for d.run)

  5. Assert results of tests by data provided by the driver

Test driver calls methods for plugin lifecycle at the beginning of Step # 4 (i.e. #start) and at the end (i.e. #stop, #shutdown, etc.). It can be skipped by optional arguments of #run.

For:

  • configuration tests, repeat steps # 1-2

  • full feature tests, repeat steps # 1-5

To support , you can override this method to return true.

A socket provided as a shared socket by is shared between the old and new processes. So, such a plugin can support Zero-downtime restart.

See for details.

If this article is incorrect or outdated, or omits critical information, please . is an open-source project under . All components are available under the Apache 2 License.

Plugin Base Class API
Zero-downtime restart
server plugin helper
Testing API for Plugins
let us know
Fluentd
Cloud Native Computing Foundation (CNCF)