Log analysis with Kinesis – Lambda – ElasticSearch – Kibana pipeline

ELK (ElasticSearch – LogStash – Kibana) has been my favourite stack for managing and analysing server logs for years. Recently, when deploying a log management module for a small company in an Amazon Web Services environment, I found that AWS Kinesis can help me achieve the same goal with much less server management and configuration effort. The only server the company needs to manage is a t2.micro instance, which runs Kibana and an Nginx reverse proxy. The other parts of the stack run entirely on AWS services. I’ll describe what I used in this post.

An overview

I built a module to monitor Nginx log files on several EC2 instances which run a web application behind a load balancer. The diagram below shows how it works. In short, it includes these components:

  • An AWS ElasticSearch Service instance which stores the log data
  • A Kibana instance which allows users to analyse and visualise the ES data
  • A Kinesis Firehose stream which delivers transformed log data to ElasticSearch and backs up the raw log data to S3 storage
  • A Lambda function which transforms log lines into a JSON structure
  • An S3 bucket which stores the raw log data
  • Kinesis agents running on the EC2 instances, which read the Nginx access logs and post them to the stream
[Diagram: Kinesis – Lambda – ElasticSearch – Kibana pipeline]

Setting up components

Nginx log format and the AWS Kinesis agents

The Nginx instances were set up to use this format for their access logs:

log_format  main  '$time_local $remote_addr $remote_user $request_method $request_uri '
                  '$server_addr $server_port $host '
                  '"$http_user_agent" "$http_referer" '
                  '$status $bytes_sent $request_time '
                  '"$http_x_forwarded_for" $http_x_forwarded_port $http_x_forwarded_proto';

The access log files were configured to be rotated daily.
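
With this log format, one (entirely hypothetical) access log line would look like this:

12/Mar/2018:10:15:32 +0700 10.0.1.25 - GET /index.html 10.0.0.12 80 example.com "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/64.0.3282.186 Safari/537.36" "-" 200 3700 0.004 "203.0.113.7" 443 https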

AWS Kinesis agents were installed following this guide. The filePattern value is the location of the Nginx access log, and deliveryStream is the name of the Firehose stream, which will be created later.
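
A minimal /etc/aws-kinesis/agent.json for this setup might look like the sketch below; the log path and stream name here are placeholders to be replaced with your own values:

{
  "flows": [
    {
      "filePattern": "/var/log/nginx/access.log*",
      "deliveryStream": "access-log-stream"
    }
  ]
}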

ElasticSearch and Kibana

Amazon provides a good solution for those who don’t want to manage a complicated ElasticSearch cluster: AWS ElasticSearch Service. There are several articles on the Internet comparing different deployment approaches for an ElasticSearch cluster. While self-hosted instances require a huge maintenance workload, dedicated cloud ElasticSearch services are more expensive than AWS ES. After creating an ES domain, we have two URLs: one for the ElasticSearch RESTful API and one for the Kibana interface.

I planned to store access log events in one index per month. Each index would be named “access-log-YYYY-MM”; the timestamp part is formatted automatically by Kinesis. I also created an index template on ElasticSearch (registered with a PUT to the _template endpoint) to define the data types of the fields in the events. The structure of each event is decided by the Lambda transformation function, which will be addressed later in this post. The final template is as follows:

{
  "template": "access-log-*",
  "mappings": {
    "log": {
      "properties": {
        "geoip": {
          "dynamic": true,
          "properties": {
            "location": {
              "type": "geo_point"
            }
          }
        },
        "method": {
          "type": "keyword"
        },
        "bytesSent": {
          "type": "integer"
        },
        "originalIP": {
          "type": "ip"
        },
        "forwardedProtocol": {
          "type": "keyword"
        },
        "requestTime": {
          "type": "half_float"
        },
        "port": {
          "type": "keyword"
        },
        "clientIP": {
          "type": "ip"
        },
        "browser": {
          "dynamic": true,
          "properties": {
            "os": {
              "dynamic": true,
              "properties": {
                "family": {
                  "type": "keyword"
                }
              }
            },
            "family": {
              "type": "keyword"
            },
            "device": {
              "dynamic": true,
              "properties": {
                "family": {
                  "type": "keyword"
                }
              }
            }
          }
        },
        "host": {
          "type": "keyword"
        },
        "serverIP": {
          "type": "ip"
        },
        "device": {
          "dynamic": true,
          "properties": {
            "type": {
              "type": "keyword"
            }
          }
        },
        "forwardedPort": {
          "type": "keyword"
        },
        "status": {
          "type": "keyword"
        }
      }
    }
  }
}


Kinesis Firehose stream

To deliver logs from Kinesis agent to ES, I created a Kinesis Firehose stream. These are some important notes:

  • The stream name is used in the agent.json file of the Kinesis agent on each EC2 instance.
  • The index name should match the prefix in the ES index template. Here, I chose “access-log” as the index name.
  • I set the index rotation to monthly (OneMonth) because I only want to keep the logs of the last 30 days. Old indices are deleted to free up space in the ES domain.
  • I chose “all documents” as the S3 backup mode because I want to archive all raw log lines.
  • To transform a log line into a JSON structure, I enabled the “data transformation” feature and chose a Lambda function as the transformer. The relevant destination settings are sketched below.
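
For reference, these choices map to the following fields of the Firehose CreateDeliveryStream API. This is only a sketch of the relevant parts; the ARNs and the function name are placeholders:

{
  "ElasticsearchDestinationConfiguration": {
    "RoleARN": "arn:aws:iam::<account-id>:role/<firehose-role>",
    "DomainARN": "arn:aws:es:<region>:<account-id>:domain/<es-domain>",
    "IndexName": "access-log",
    "TypeName": "log",
    "IndexRotationPeriod": "OneMonth",
    "S3BackupMode": "AllDocuments",
    "ProcessingConfiguration": {
      "Enabled": true,
      "Processors": [
        {
          "Type": "Lambda",
          "Parameters": [
            {
              "ParameterName": "LambdaArn",
              "ParameterValue": "arn:aws:lambda:<region>:<account-id>:function:<transform-function>"
            }
          ]
        }
      ]
    }
  }
}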

After the Kinesis Firehose stream was created, I had a data pipeline with the following steps:

  1. Log lines are collected by the Kinesis agents on the EC2 instances.
  2. Log lines are submitted to the Firehose stream; each line becomes an event with a unique ID and its data set to the original log line.
  3. When the total data of the events in the stream grows larger than the S3 buffer size, the buffered events are written to the S3 bucket.
  4. When the total data of the events in the stream grows larger than the ES buffer size, the configured Lambda function is invoked. All current events in the buffer are passed to the function handler as a collection of items. Each item has a recordId and a data field, which is the base64-encoded log line. The results of the Lambda function, also base64-encoded, replace the input events in the stream and are forwarded to the next steps (the input and output shapes are sketched after this list).
  5. At this step, the event data is decoded into a JSON structure and inserted into the ES index as a batch insertion.
  6. Users use Kibana to query, analyse and visualise the data in ES.
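
For reference, the transformation event that Firehose passes to the Lambda handler, and the response the handler must return, are shaped roughly like this (the record IDs and base64 payloads are illustrative):

Input:

{
  "invocationId": "<invocation-id>",
  "deliveryStreamArn": "arn:aws:firehose:<region>:<account-id>:deliverystream/<stream>",
  "region": "<region>",
  "records": [
    {
      "recordId": "49546986683135544286507457936321625675...",
      "approximateArrivalTimestamp": 1520845200000,
      "data": "MTIvTWFyLzIwMTg6MTA6MTU6MzIgKzA3MDAgLi4u"
    }
  ]
}

Output:

{
  "records": [
    {
      "recordId": "49546986683135544286507457936321625675...",
      "result": "Ok",
      "data": "eyJtZXNzYWdlIjoiLi4uIn0="
    }
  ]
}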

Lambda transformation function

This component plays an important role in the pipeline. The transformation function converts the unstructured data (lines of text) into semi-structured data (JSON objects). It enriches each event with extra data, such as geoIP information derived from the client IP, and information about the client OS, device and browser. It also casts the strings of specific fields to numbers. For this, I needed the support of some third-party NodeJS libraries, so I created a deployment package for NodeJS 6, uploaded it to an S3 bucket and used it as the Lambda function code. This is the body of my Lambda function:

'use strict';
console.log('Loading function');

let patterns = require('node-grok').loadDefaultSync();
let geoip = require('geoip-lite');
let device = require('device');
let countries = require('countryjs');
let moment = require('moment');

let grokPattern = "%{HTTPDATE:timestamp} %{IP:clientIP} %{NOTSPACE:username} %{WORD:method} %{NOTSPACE:uri} %{IP:serverIP} %{NUMBER:port} %{NOTSPACE:host} %{QUOTEDSTRING:userAgent} %{NOTSPACE:referer} %{NUMBER:status} %{NUMBER:bytesSent} %{NUMBER:requestTime} %{QUOTEDSTRING:forwardedFor} (?<forwardedPort>-|%{NUMBER}) (?<forwardedProtocol>-|%{NOTSPACE})";
let pattern = patterns.createPattern(grokPattern);

/**
 * @see http://docs.aws.amazon.com/elasticloadbalancing/latest/classic/x-forwarded-headers.html#x-forwarded-for
 * @param strVal
 * @return string|null
 */
const getForwardedIp = (strVal) => {
    let ips = strVal.split(",").map((x) => x.trim());
    let originalIP = ips[ips.length - 1];

    if (originalIP === "-") {
        return null;
    } else {
        return originalIP;
    }
};

let transform = (event, context, callback) => {
    let success = 0; // Number of valid entries found
    let failure = 0; // Number of invalid entries found

    /* Process the list of records and transform them */
    const output = event.records.map((record) => {
        const entry = (Buffer.from(record.data, 'base64')).toString('utf8');
        const match = pattern.parseSync(entry);

        let result = {
            message: entry,
            tags: []
        };
        if (match) {
            /* Prepare JSON version from Apache log data */

            result = Object.assign(result, match);
            result.tags.push('grok_parsed_success');

            // trim the double quotes from the user agent, referer and forwarded for
            result.userAgent = result.userAgent.substr(1, result.userAgent.length - 2);
            result.referer = result.referer.substr(1, result.referer.length - 2);
            result.forwardedFor = result.forwardedFor.substr(1, result.forwardedFor.length - 2);

            // convert strings to numbers
            result.bytesSent = +result.bytesSent;
            result.requestTime = +result.requestTime;

            // get the original IP
            let forwardedIp = getForwardedIp(result.forwardedFor);
            if (!forwardedIp) {
                result.originalIP = result.clientIP;
            } else {
                result.originalIP = forwardedIp;
            }

            // geoip lookup
            let resolvedIp = geoip.lookup(result.originalIP);
            result.geoip = resolvedIp;
            if (resolvedIp) {
                result.tags.push('geoip_looked_up_success');
                result.geoip.countryName = countries.name(result.geoip.country);

                // the GeoJSON format has the longitude before the latitude in the array, we must change
                // geoip.ll into geoip.location object
                result.geoip.location = {
                    lat: result.geoip.ll[0],
                    lon: result.geoip.ll[1]
                };
                delete result.geoip.ll;
            } else {
                result.tags.push('geoip_looked_up_fail');
            }

            // useragent parser
            result.device = device(result.userAgent, {parseUserAgent: true});
            result.browser = result.device.parser.useragent;
            delete result.device.parser;

            // add @timestamp field
            result["@timestamp"] = moment(result.timestamp, 'DD/MMM/YYYY:HH:mm:ss Z', true).toISOString();
            delete result.timestamp;

            const payload = (Buffer.from(JSON.stringify(result), 'utf8')).toString('base64');
            success++;
            return {
                recordId: record.recordId,
                result: 'Ok',
                data: payload,
            };
        } else {
            result.tags.push('grok_parsed_fail');
            /* Failed event, notify the error and leave the record intact */
            failure++;
            return {
                recordId: record.recordId,
                result: 'ProcessingFailed',
                data: record.data,
            };
        }
    });
    console.log(`Processing completed.  Successful records ${success}, Failed records ${failure}.`);
    callback(null, {records: output});
};

exports.handler = transform;

For each event in the input collection, I use the node-grok package to parse the log line, the same way I used to do with the awesome grok library of LogStash. The grok pattern was defined based on the Nginx log format. Fields enclosed in double quotes are processed to strip those quotes out. After that, the original IP address is derived from the client IP and the content of the X-Forwarded-For HTTP header. That IP is looked up with the geoip-lite library, with the help of the countryjs library to get the country name from the ISO 3166-1 alpha-2 code. The user-agent string is parsed with the device library to get information about the client OS, device and browser. The @timestamp field of each event is created by parsing the Nginx $time_local format. At the end, the event data is base64-encoded.
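
To make the output concrete, here is roughly what the transformed event for the hypothetical log line shown earlier would look like. All values are illustrative, and the exact geoip, device and browser fields depend on the libraries’ databases:

{
  "message": "12/Mar/2018:10:15:32 +0700 10.0.1.25 - GET /index.html ...",
  "tags": ["grok_parsed_success", "geoip_looked_up_success"],
  "@timestamp": "2018-03-12T03:15:32.000Z",
  "clientIP": "10.0.1.25",
  "username": "-",
  "method": "GET",
  "uri": "/index.html",
  "serverIP": "10.0.0.12",
  "port": "80",
  "host": "example.com",
  "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) ...",
  "referer": "-",
  "status": "200",
  "bytesSent": 3700,
  "requestTime": 0.004,
  "forwardedFor": "203.0.113.7",
  "forwardedPort": "443",
  "forwardedProtocol": "https",
  "originalIP": "203.0.113.7",
  "geoip": {
    "country": "US",
    "countryName": "United States",
    "location": { "lat": 37.751, "lon": -97.822 }
  },
  "device": { "type": "desktop" },
  "browser": {
    "family": "Chrome",
    "os": { "family": "Windows" },
    "device": { "family": "Other" }
  }
}

Note that status stays a string (mapped as keyword in the template), while bytesSent and requestTime are cast to numbers by the function.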

Conclusion

After running the above stack for a few days, I see that events are delayed about 5 minutes before appearing on the Kibana dashboard. Another caveat is that the Kinesis agent does not run on Windows instances. To visualise geographical data on the Kibana tile map, I needed to set up an additional Kibana instance, because the Kibana version packaged with AWS ES is modified and does not work with the map layers. I also hid the AWS ES domain from public access and let users reach the Kibana interface via an Nginx reverse proxy. Compared to my other approach with ElasticSearch, LogStash, Kibana and Filebeat, the stack described in this post reaches the same level of high availability with less configuration and maintenance effort. The Kinesis Firehose stream seems to be a more sophisticated way to process this type of data.