Practical Real-time Data Processing and Analytics
上QQ阅读APP看书,第一时间看更新

Fluentd

Fluentd is another tool to process log files. There are three components of Fluentd, the same as in Logstash, which are input, filter, and output. There are multiple input and output plugins are available as per the needs of your use case. Here, we will demonstrate a similar example to those seen previously, that is, reading from the log file and pushing it into Kafka.

Download Fluentd from https://www.fluentd.org/download. As we are using Ubuntu, select Debian installation. Download td-agent_2.3.4-0_amd64.deb and install it using Software Center in Ubuntu.

Once it is installed on the system, validate it using the following command:

    sudo td-agent --dry-run

The following output will be generated and certify that everything is good:

    2017-02-25 16:19:49 +0530 [info]: reading config file path="/etc/td-agent/td-agent.conf"
    2017-02-25 16:19:49 +0530 [info]: starting fluentd-0.12.31 as dry run mode
    2017-02-25 16:19:49 +0530 [info]: gem 'fluent-mixin-config-placeholders' version '0.4.0'
    2017-02-25 16:19:49 +0530 [info]: gem 'fluent-mixin-plaintextformatter' version '0.2.6'
    2017-02-25 16:19:49 +0530 [info]: gem 'fluent-plugin-kafka' version '0.5.3'
    2017-02-25 16:19:49 +0530 [info]: gem 'fluent-plugin-kafka' version '0.4.1'
    2017-02-25 16:19:49 +0530 [info]: gem 'fluent-plugin-mongo' version '0.7.16'
    2017-02-25 16:19:49 +0530 [info]: gem 'fluent-plugin-rewrite-tag-filter' version '1.5.5'
    2017-02-25 16:19:49 +0530 [info]: gem 'fluent-plugin-s3' version '0.8.0'
    2017-02-25 16:19:49 +0530 [info]: gem 'fluent-plugin-scribe' version '0.10.14'
    2017-02-25 16:19:49 +0530 [info]: gem 'fluent-plugin-td' version '0.10.29'
    2017-02-25 16:19:49 +0530 [info]: gem 'fluent-plugin-td-monitoring' version '0.2.2'
    2017-02-25 16:19:49 +0530 [info]: gem 'fluent-plugin-webhdfs' version '0.4.2'
    2017-02-25 16:19:49 +0530 [info]: gem 'fluentd' version '0.12.31'
    2017-02-25 16:19:49 +0530 [info]: adding match pattern="td.*.*" type="tdlog"
    2017-02-25 16:19:49 +0530 [info]: adding match pattern="debug.**" type="stdout"
    2017-02-25 16:19:49 +0530 [info]: adding source type="forward"
    2017-02-25 16:19:49 +0530 [info]: adding source type="http"
    2017-02-25 16:19:49 +0530 [info]: adding source type="debug_agent"
    2017-02-25 16:19:49 +0530 [info]: using configuration file: <ROOT>
      <match td.*.*>
        @type tdlog
        apikey xxxxxx
        auto_create_table 
        buffer_type file
        buffer_path /var/log/td-agent/buffer/td
        buffer_chunk_limit 33554432
        <secondary>
          @type file
          path /var/log/td-agent/failed_records
          buffer_path /var/log/td-agent/failed_records.*
        </secondary>
      </match>
      <match debug.**>
        @type stdout
      </match>
      <source>
        @type forward
      </source>
      <source>
        @type http
        port 8888
      </source>
      <source>
        @type debug_agent
        bind 127.0.0.1
        port 24230
      </source>
    </ROOT>
  

In Fluentd, to create a Pipeline, you need to write a configuration file, which is readable by Fluentd, and process the Pipeline. The default location of the Fluentd configuration file is /etc/td-agent/td-agent.conf. The following is a configuration file which reads from the log file and pushes each event into Kafka topic:

    <source>
      @type tail
      path /home/ubuntu/demo/files/test
      pos_file /home/ubuntu/demo/fluentd/test.log.pos
      tag fluentd.example
      format none
    </source>
    <match *.**>
      @type kafka_buffered
      brokers localhost:9092
      default_topic fluentd-example
      max_send_retries 1
    </match>
  

In the configuration file, there are two directives defined previously, out of six available directives. The Source directive is where all data comes from. @type tells us which type of input plugin is being used. Here, we are using tail, which will tail the log file. This is good for the use case where the input log file is running a log file in which events/logs are getting appended at the end of the file. It is same as the tail -f operation in Linux. There are multiple parameters of the tail input plugin. Path is the absolute path of the log file. Pos_file is the file which will keep track of last read position of the input file. Tag is the tag of the event. If you want to define the input format like CSV, or apply regex, then use format. As this parameter is mandatory, we used none which will use the input text as is.

The Match directive tells Fluentd what to do with the input, *.The ** pattern is telling us that whatever is coming in through the log files, just push it into the Kafka topic. If you want to use a different topic for error and information logs, then define the pattern as error or info and tag the input as the same. Brokers is a host and port where Kafka broker is running on a system. Default-topic is the topic name where you want to push the events. If you want to retry after message failure, then set max_send_reties to one or more.

Replace the previous configuration in /etc/td-agent/td-agent.conf.

Now, create topic on Kafka, as follows:

    /bin/kafka-topics.sh --create --topic fluentd-example --zookeeper localhost:2181 --partitions 1 -replication-factor 1 

Start the Fluentd agent, as follows:

    sudo td-agent

Start Kafka consumer, as follows:

    /bin/kafka-console-consumer.sh --topic fluentd-example --bootstrap-server localhost:9092

Once the process is started without any exceptions, then start adding statements in /home/ubuntu/demo/files/test as shown in the following screenshot:

The output on Kafka will be as shown in the following screenshot: