
Fluentd
Fluentd is another tool for processing log files. Like Logstash, Fluentd has three components: input, filter, and output. Multiple input and output plugins are available to suit the needs of your use case. Here, we will demonstrate an example similar to the previous ones, that is, reading from a log file and pushing the events into Kafka.
Download Fluentd from https://www.fluentd.org/download. As we are using Ubuntu, select the Debian installation. Download td-agent_2.3.4-0_amd64.deb and install it using the Software Center in Ubuntu.
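If you prefer the terminal over the Software Center, the same package can be installed with dpkg; this is a minimal sketch, assuming the .deb file downloaded above sits in the current directory:
sudo dpkg -i td-agent_2.3.4-0_amd64.deb
# if dpkg reports missing dependencies, let apt resolve them
sudo apt-get install -f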
Once it is installed on the system, validate it using the following command:
sudo td-agent --dry-run
The following output will be generated, confirming that everything is in order:
2017-02-25 16:19:49 +0530 [info]: reading config file path="/etc/td-agent/td-agent.conf"
2017-02-25 16:19:49 +0530 [info]: starting fluentd-0.12.31 as dry run mode
2017-02-25 16:19:49 +0530 [info]: gem 'fluent-mixin-config-placeholders' version '0.4.0'
2017-02-25 16:19:49 +0530 [info]: gem 'fluent-mixin-plaintextformatter' version '0.2.6'
2017-02-25 16:19:49 +0530 [info]: gem 'fluent-plugin-kafka' version '0.5.3'
2017-02-25 16:19:49 +0530 [info]: gem 'fluent-plugin-kafka' version '0.4.1'
2017-02-25 16:19:49 +0530 [info]: gem 'fluent-plugin-mongo' version '0.7.16'
2017-02-25 16:19:49 +0530 [info]: gem 'fluent-plugin-rewrite-tag-filter' version '1.5.5'
2017-02-25 16:19:49 +0530 [info]: gem 'fluent-plugin-s3' version '0.8.0'
2017-02-25 16:19:49 +0530 [info]: gem 'fluent-plugin-scribe' version '0.10.14'
2017-02-25 16:19:49 +0530 [info]: gem 'fluent-plugin-td' version '0.10.29'
2017-02-25 16:19:49 +0530 [info]: gem 'fluent-plugin-td-monitoring' version '0.2.2'
2017-02-25 16:19:49 +0530 [info]: gem 'fluent-plugin-webhdfs' version '0.4.2'
2017-02-25 16:19:49 +0530 [info]: gem 'fluentd' version '0.12.31'
2017-02-25 16:19:49 +0530 [info]: adding match pattern="td.*.*" type="tdlog"
2017-02-25 16:19:49 +0530 [info]: adding match pattern="debug.**" type="stdout"
2017-02-25 16:19:49 +0530 [info]: adding source type="forward"
2017-02-25 16:19:49 +0530 [info]: adding source type="http"
2017-02-25 16:19:49 +0530 [info]: adding source type="debug_agent"
2017-02-25 16:19:49 +0530 [info]: using configuration file: <ROOT>
  <match td.*.*>
    @type tdlog
    apikey xxxxxx
    auto_create_table
    buffer_type file
    buffer_path /var/log/td-agent/buffer/td
    buffer_chunk_limit 33554432
    <secondary>
      @type file
      path /var/log/td-agent/failed_records
      buffer_path /var/log/td-agent/failed_records.*
    </secondary>
  </match>
  <match debug.**>
    @type stdout
  </match>
  <source>
    @type forward
  </source>
  <source>
    @type http
    port 8888
  </source>
  <source>
    @type debug_agent
    bind 127.0.0.1
    port 24230
  </source>
</ROOT>
In Fluentd, to create a pipeline, you write a configuration file that Fluentd reads and runs as the pipeline. The default location of the Fluentd configuration file is /etc/td-agent/td-agent.conf. The following configuration reads from the log file and pushes each event into a Kafka topic:
<source>
  @type tail
  path /home/ubuntu/demo/files/test
  pos_file /home/ubuntu/demo/fluentd/test.log.pos
  tag fluentd.example
  format none
</source>

<match *.**>
  @type kafka_buffered
  brokers localhost:9092
  default_topic fluentd-example
  max_send_retries 1
</match>
The configuration file defines two of the six available directives. The source directive is where all the data comes from, and @type tells Fluentd which input plugin is being used. Here, we use tail, which tails the log file. This suits the use case where the input is a running log file to which events/logs are continually appended at the end, much like the tail -f operation in Linux. The tail input plugin takes several parameters: path is the absolute path of the log file; pos_file is the file that keeps track of the last read position in the input file; tag is the tag assigned to each event. If you want to parse the input, for example as CSV or with a regex, use format. As this parameter is mandatory, we used none, which passes the input text through as is.
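To illustrate the format parameter, the same source block could parse Apache access logs with the built-in apache2 parser instead of passing raw text through. This is only a sketch; the input path /var/log/apache2/access.log and the tag are assumptions for the example:
<source>
  @type tail
  path /var/log/apache2/access.log                    # hypothetical Apache access log
  pos_file /home/ubuntu/demo/fluentd/access.log.pos
  tag apache.access
  format apache2                                      # parse each line into host, method, path, code, and so on
</source>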
The match directive tells Fluentd what to do with the input. The *.** pattern says that whatever comes in through the log files should be pushed to the Kafka topic. If you want to use different topics for error and information logs, then define the match patterns as error or info and tag the input accordingly, as sketched after this paragraph. brokers is the host and port where the Kafka broker is running. default_topic is the name of the topic to which you want to push the events. If you want retries after a message failure, set max_send_retries to one or more.
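A minimal sketch of that tag-based routing, assuming the inputs are tagged app.error and app.info (the tags and topic names here are illustrative, not part of the earlier configuration):
<match app.error>
  @type kafka_buffered
  brokers localhost:9092
  default_topic error-logs      # illustrative topic for error events
  max_send_retries 1
</match>

<match app.info>
  @type kafka_buffered
  brokers localhost:9092
  default_topic info-logs       # illustrative topic for informational events
  max_send_retries 1
</match>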
Replace the contents of /etc/td-agent/td-agent.conf with the previous configuration.
Now, create the topic on Kafka, as follows:
/bin/kafka-topics.sh --create --topic fluentd-example --zookeeper localhost:2181 --partitions 1 --replication-factor 1
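To confirm that the topic was created, you can describe it (assuming the same Kafka installation layout as above):
/bin/kafka-topics.sh --describe --topic fluentd-example --zookeeper localhost:2181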
Start the Fluentd agent, as follows:
sudo td-agent
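If you installed the .deb package, td-agent can also run as a background service, and its log file is a useful place to check that the tail and Kafka plugins started cleanly. The paths below are the package defaults:
sudo /etc/init.d/td-agent start           # alternative: run td-agent as a service
tail -f /var/log/td-agent/td-agent.log    # watch for plugin start-up or connection errors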
Start the Kafka consumer, as follows:
/bin/kafka-console-consumer.sh --topic fluentd-example --bootstrap-server localhost:9092
Once the process has started without any exceptions, start appending statements to /home/ubuntu/demo/files/test.
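Any lines appended to the file will do; for example, a couple of echo commands are enough to generate test events (the sample text is arbitrary):
echo "hello from fluentd" >> /home/ubuntu/demo/files/test
echo "another test event" >> /home/ubuntu/demo/files/test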

The appended lines will appear as events on the Kafka consumer console.
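With format none and the kafka_buffered output's default JSON serialization, each appended line should arrive wrapped in a message field, roughly as follows (the exact output depends on your plugin versions):
{"message":"hello from fluentd"}
{"message":"another test event"}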
