Not all logs are of equal importance. Some require real-time analytics, others simply need to be stored long term so that they can be analyzed if needed. In this tutorial, I will show three different methods by which you can “fork” a single application’s stream of logs into multiple streams which can be parsed, filtered, and sent independently. In the process, we will learn about several open source projects: Fluentd, Fluent Bit, the Fluent Logger Golang, and a new project I created for this use case.

Modern applications produce logs with multiple types of information: debug information, error messages, stack traces, and more. Generally, applications are configured to write logs to a single stream which is sent to a single destination for storage and analysis. This is suboptimal because each log type may be formatted differently; more importantly, a single analytics platform is unlikely to be optimal for all types. Engineers want powerful search capabilities and real-time alerting for logs containing errors or stack traces. These capabilities come at a cost, and a simpler set of capabilities may be sufficient for the debug information in other logs. Ideally, one could send each log type to a different destination based on the type of analysis that it requires.

Moreover, even if all logs are sent to a single destination, each log type may benefit from individual parsing, filtering, and processing. This tutorial explores multiple methods which can achieve these outcomes.

In this post, we’ll discuss three different methods you can use to split a single application’s log output into multiple streams which can be processed separately:

  1. Fluent Bit stream processing
  2. Fluentd’s rewrite tag filter
  3. The Fluent Logger libraries

Each method will only work if certain requirements are met; I will clearly list these so that you can determine if the method will work for your use case.

This tutorial will not cover ingesting logs into Fluentd and Fluent Bit; it is agnostic to your deployment method. To learn about the basics of using Fluentd and Fluent Bit with AWS, I recommend starting with each project's official documentation.

A simple use case: sending error, info, and debug logs to separate CloudWatch log streams

To make each example concrete, we’ll solve a very simple use case. We want to send all of our logs to a single CloudWatch log group, but we want to separate the logs into different log streams based on their log level. This will allow them to be searched on separately.

This example is based on a real use case that I received from a customer. There’s a lot more you could do with these techniques; the ability to fork a stream of logs at the source is powerful. For example, you could send all logs to one system for long-term storage, but fork off a copy of the error logs and send them exclusively to an alerting system. Or maybe you want all logs to go to one destination, but you want to filter out debug messages that match certain expressions. This tutorial is a starting point for solving many advanced use cases.

We’ll use a simple Golang program I created as our app. It uses the popular logrus logger, and outputs log messages as JSON that look like the following:

{"level":"info","msg":"Got a request","path":"/","requestID":"45234523","time":"2019-11-08T20:36:11Z"}
{"level":"warning","msg":"Access denied","path":"/tardis","requestID":"546745643","time":"2019-11-08T20:36:11Z","user":"TheMaster"}
{"level":"debug","msg":"Admin access","path":"/tardis","requestID":"546745643","time":"2019-11-08T20:36:11Z","user":"TheDoctor"}

As you can see, these logs note their log level in a clear field in the JSON. This will make it easy to split the logs by their log level. This approach to logging is called structured logging; the log messages are designed to be machine-readable so that they can be easily queried and processed.

Finally, we are making one assumption about the tag given to these logs: Fluentd and Fluent Bit apply rules to logs based on their tag. In each example, we will assume that the tag for the logs from the application is prefixed with "app". If you are using FireLens for Amazon ECS, this will occur if you name your container "app".
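For example, with FireLens the relevant ECS task-definition fragment might look like the following (a trimmed sketch; the image names are placeholders, not values from this tutorial):

```json
{
  "containerDefinitions": [
    {
      "name": "app",
      "image": "my-app:latest",
      "logConfiguration": {
        "logDriver": "awsfirelens"
      }
    },
    {
      "name": "log_router",
      "image": "amazon/aws-for-fluent-bit:latest",
      "firelensConfiguration": {
        "type": "fluentbit"
      }
    }
  ]
}
```

Because the application container is named app, FireLens tags its logs with the prefix app.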

Fluent Bit stream processing

Requirements:

  • Use Fluent Bit in your log pipeline.
  • Logs are formatted as JSON (or some format that you can parse to JSON in Fluent Bit) with fields that you can easily query.

One of the coolest features of Fluent Bit is that you can run SQL queries on logs as it processes them. You can accomplish a lot with stream processing, including solving our example use case.

Fluent Bit internal log processing pipeline: log inputs come first, then the parser, filter, and buffer stages; finally, logs are routed to the log outputs. The stream processor can fork off logs right before they reach the outputs and send the results of queries back to the input stage.

The diagram above shows the Fluent Bit internal log processing architecture. As you can see, logs can be ingested, parsed, and filtered before they reach the stream processor. Stream queries can then be run on the logs, and the results of your queries can be re-ingested into the log pipeline.

Before we can run a stream query on our logs, we will need to parse them as JSON so that we can access the log level field. If you use Amazon EKS or Amazon ECS to deploy your application, logs ingested into Fluent Bit will initially look something like the following:

{
    "log": "{\"level\":\"info\",\"msg\":\"Got a request\",\"path\":\"/\",\"requestID\":\"45234523\",\"time\":\"2019-11-08T20:36:11Z\"}"
}

The JSON log message emitted by the application is escaped. To split the stream on the log level, we will need to parse this escaped JSON. Add the following configuration snippet to your fluent-bit.conf configuration file.

[SERVICE]
    Parsers_File /parser.conf

[FILTER]
    Name parser
    Match *
    Key_Name log
    Parser json
    Reserve_Data True

Both the amazon/aws-for-fluent-bit and fluent/fluent-bit images include a built-in parsers.conf with a JSON parser. However, I found that the time format used by my logs was not compatible with that parser, so I wrote my own. You can see all files needed to build the custom Fluent Bit image for this example at this GitHub repository.
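For reference, a minimal parser.conf with a JSON parser looks roughly like this; the Time_Format shown is an assumption based on the example timestamps above (e.g. 2019-11-08T20:36:11Z), not necessarily the exact file from the repository:

```
[PARSER]
    Name        json
    Format      json
    Time_Key    time
    Time_Format %Y-%m-%dT%H:%M:%S%z
```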

After parsing, the logs in Fluent Bit’s internal log pipeline will be formatted as a nice JSON object (technically Fluent Bit internally uses msgpack, a serialization format for data that is similar to JSON):

{
  "level": "info",
  "msg": "Got a request",
  "path": "/",
  "requestID": "45234523",
  "time": "2019-11-08T20:36:11Z"
}

We can now run stream queries on the logs. Stream queries are stored in a separate configuration file, which we will call stream_processing.conf.

[STREAM_TASK]
    Name   debug_logs
    Exec   CREATE STREAM debug WITH (tag='logs.debug') AS SELECT * from TAG:'app*' WHERE level = 'debug';

This query will create a new stream of logs containing only the debug logs. To understand how it works, let’s go through each part of the query.

CREATE STREAM debug WITH (tag='logs.debug')

We will create a new ‘stream’ of logs with the tag logs.debug. This stream of logs will enter the Fluent Bit internal log pipeline at the input stage, and can be parsed and filtered a second time. We can then create Fluent Bit output definitions which match the new logs.debug tag and send these logs to a destination.

AS SELECT * from TAG:'app*'

The new stream is created by selecting logs whose tags start with app. Replace app* with a pattern that matches your application’s logs.

WHERE level = 'debug';

The new log stream contains logs from the source stream which have a field named level whose value is debug.

In order to create a new stream for each log level, we’ll need multiple stream queries. The full stream configuration file can be found on Github. The stream file must be referenced in the main configuration file:

[SERVICE]
    Streams_File stream_processing.conf

Recall that our end goal was to send our logs to one CloudWatch log group with separate CloudWatch Log streams for each log level. This is easy with the Fluent Bit plugin for CloudWatch; the log stream name can be a prefix plus the log tag.

[OUTPUT]
    Name cloudwatch
    Match   logs.*
    region us-east-1
    log_group_name streams-example
    log_stream_prefix log-level-
    auto_create_group true

With this output, we will have log streams named as follows:

  • log-level-logs.debug for the debug logs
  • log-level-logs.info for the info logs
  • log-level-logs.warning for the warn logs
  • log-level-logs.error for the error logs
  • log-level-logs.fatal for the fatal logs

Note that the output matches the tag pattern logs.*. This will match all of the logs we processed with our stream queries, but it will not match any of the original logs (which had tags with the prefix app). This is intentional. Stream queries do not fork a log stream – they copy it. Each of our stream queries created a copy of a subset of the logs. If we instead allowed our output to match any tag (*), then two copies of each log message would be at our destination: one from the original stream with tag app* and one from the new streams with tag logs.*. You can see the full Fluent Bit configuration file for this example on GitHub.

Things to keep in mind

  • Stream queries copy logs. The toy example shown in this section worked because all of our logs have a level field, and its value is one of five strings (debug, info, warning, error, and fatal). This made it possible to create stream queries which together matched all of the logs. If this were not the case, we would run into a problem: logs which do not match any stream query would remain in the original heterogeneous log stream, which is discarded and not sent to a destination. If this is the case in your setup, read the Fluent Bit stream processing documentation to see if you can write queries which will identify all of your logs. If you cannot, consider opening (or up-voting) an issue on the fluent/fluent-bit repository to support additional conditions in the WHERE clause of stream queries.
  • Do not write circular stream queries. Do not write stream queries which will match their own results. In our example, the stream queries ingested logs whose tags matched app* and produced logs whose tags match logs*. If instead our select statement matched all logs (SELECT * from TAG:'*'), the query would become circular, matching its own results. This will cause Fluent Bit to freeze without an error message and stop processing logs.
  • This only works if your logs can be parsed into easily query-able fields. Fluent Bit supports a limited number of conditions which can be used in the WHERE clause. To use this method, you may need to write a custom parser for your logs to turn them into a format which can be queried.

Fluentd rewrite tag filter

Requirements:

  • Use Fluentd in your log pipeline.

Fluentd’s rewrite tag filter plugin has functionality that partly overlaps with Fluent Bit’s stream queries. We can use it to achieve our example use case.

<match app**>
  @type rewrite_tag_filter
  <rule>
    key log
    pattern /debug/
    tag logs.debug
  </rule>
</match>

This configuration accomplishes the same goal as the Fluent Bit stream query for debug logs. Note that we did not need to parse our logs as JSON first, because the Fluentd filter can match logs using a regular expression. If we first parsed our logs as JSON, the configuration would look like the following:

<match app**>
  @type rewrite_tag_filter
  <rule>
    key level
    pattern /debug/
    tag logs.debug
  </rule>
</match>

Fluentd’s rewrite tag filter has one key advantage over Fluent Bit’s stream queries for this use case: it forks logs instead of copying them. The Fluentd configuration shown above will take all debug logs from our original stream and change their tag. This is convenient because it means that we do not have to worry about having “left-over” logs that do not match any of the filters. After the filters are applied, the original log stream will only contain unmatched logs.

You can see the full Fluentd configuration here; notice that rather than creating additional sections with the filter, you create a single section with rules for all log types.
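That single match section looks roughly like the following sketch, assuming the parsed level field and the five logrus levels (adjust the patterns to your own logs):

```
<match app**>
  @type rewrite_tag_filter
  <rule>
    key level
    pattern /debug/
    tag logs.debug
  </rule>
  <rule>
    key level
    pattern /info/
    tag logs.info
  </rule>
  <rule>
    key level
    pattern /warning/
    tag logs.warning
  </rule>
  <rule>
    key level
    pattern /error/
    tag logs.error
  </rule>
  <rule>
    key level
    pattern /fatal/
    tag logs.fatal
  </rule>
</match>
```

Rules are evaluated in order, and each log is re-emitted with the first tag whose pattern matches.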

Fluent Logger libraries

Requirements:

  • Be willing to make code changes to your application.

The Fluent Logger libraries allow you to write logs directly to Fluentd or Fluent Bit. There are libraries available for many popular languages including Go, Java, Python, and NodeJS.

From my conversations with AWS customers, I’ve learned that some write custom logging libraries for their applications. If that is the case for your application, or if you are writing a new application from scratch, this option may be ideal.

Below, you can see an annotated example usage of the Fluent Logger for Golang.

// instantiate a struct which can send logs to Fluentd/Fluent Bit
fluentLogger, err := fluent.New(fluent.Config{})
if err != nil {
    log.Fatal(err)
}

// you can tag each log as necessary
tag := "app.logs"

// Send arbitrary data as a map
data := map[string]string{
    "foo": "bar",
}

// Send to Fluent instance
err = fluentLogger.Post(tag, data)

If you are using FireLens, ECS injects the environment variables FLUENT_HOST and FLUENT_PORT, which allow you to connect to the TCP port at which your log router is listening. You can use these environment variables to configure the logger:

port, err := strconv.Atoi(os.Getenv("FLUENT_PORT"))
if err != nil {
	// process error
}
config := fluent.Config{
	FluentPort: port,
	FluentHost: os.Getenv("FLUENT_HOST"),
}

A custom library for this example

To solve our use case, I have created a generic library which wraps the Fluent Logger for Golang and can be used as the output stream for the Logrus logger:

// configure logrus to output as JSON
logrus.SetFormatter(&logrus.JSONFormatter{})
logrus.SetLevel(logrus.DebugLevel)

// create a FluentWriter instance
fluentLogger, err := logger.NewFluentWriter(fluent.Config{}, "app", []string{"level"})
if err != nil {
    fmt.Println(err)
    os.Exit(1)
}

// Set logrus to use it
logrus.SetOutput(fluentLogger)

// Now use logrus as normal!
logrus.WithFields(logrus.Fields{
    "path": "/",
    "requestID": "45234523",
}).Info("Got a request")

How does this work? The Logrus logger can write to any io.Writer; my library exposes an io.Writer that writes logs to Fluentd/Fluent Bit.

logger.NewFluentWriter(fluent.Config{}, "app", []string{"level"})

See the project README for a full explanation of its functionality. Above, we see the call to the constructor for my library. The first argument is the Fluent Logger Golang config object which was shown in the beginning of this section. The second argument is a prefix for the tags given to logs emitted by this writer. The third argument is a list of keys in the log messages whose values will be the suffix of the tag. This library relies on the fact that the logs produced by Logrus will be JSON formatted. Thus, it will find the level key in each log message and append this to the tag prefix to construct the final tag. In practice, logs will be emitted with tags as follows:

  • app.debug for the debug logs
  • app.info for the info logs
  • app.warning for the warn logs
  • app.error for the error logs
  • app.fatal for the fatal logs

Logs which do not have a level field or which cannot be parsed as JSON will simply be given the tag “app”. These tags can be sent to different CloudWatch log streams within a single log group using either Fluentd or Fluent Bit, as shown in the previous sections.
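To make the mechanism concrete, here is a minimal sketch of the tag-construction step such a writer performs. The tagFor helper is hypothetical, written for illustration only (it is not the library’s actual API); the real writer would then Post each record to Fluentd/Fluent Bit under the computed tag.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// tagFor parses a log line as JSON, looks up each configured key (here,
// "level"), and appends its value to the tag prefix. Lines that are not
// valid JSON, or that lack the key, fall back to the bare prefix.
func tagFor(prefix string, keys []string, line []byte) string {
	var fields map[string]interface{}
	if err := json.Unmarshal(line, &fields); err != nil {
		return prefix
	}
	tag := prefix
	for _, k := range keys {
		if v, ok := fields[k].(string); ok {
			tag = tag + "." + v
		}
	}
	return tag
}

func main() {
	fmt.Println(tagFor("app", []string{"level"}, []byte(`{"level":"info","msg":"Got a request"}`))) // app.info
	fmt.Println(tagFor("app", []string{"level"}, []byte("not json")))                               // app
}
```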

You can see the full application code for this example in the project repository.

Things to keep in mind

  • I consider this approach to be experimental. I have only tested it with toy examples. If you choose this method, I suggest using the library I have created as a starting point; based on your knowledge of your own logs and use case, you may be able to write something which is ideal for your situation. While this example achieves the same result as the previous examples, the power of the Fluent Logger libraries is hypothetically greater: because the log tagging is done in your code, you have full control, and you can solve use cases which would not be amenable to the other methods.
  • You must also process the standard out and error streams for your application. If the Fluent Logger libraries fail to send your logs, you need a fallback. I have written my library so that it prints a log to standard out if it fails to send to Fluentd/Fluent Bit.

Conclusion

Which approach is best?

In this post you learned three methods which allow you to “fork” a single application’s logs:

  1. Fluent Bit stream processing
  2. Fluentd’s rewrite tag filter
  3. The Fluent Logger libraries

Your choice will depend on the specifics of your own use case; one important consideration is the resource utilization incurred by each approach. To help you decide, I ran three tasks on Amazon ECS using FireLens. Each used the example application code from the introduction with the configuration for each method. Since the Fluent Logger example processes logs within the app code, while the first two examples offload it to the log router, I measured the total memory and CPU usage of the task (app + log router). For the third example, I used Fluent Bit as the log router, since it is generally more efficient.

A graph of CPU utilization of the three examples. The rewrite tag filter example uses the most CPU, then the stream processing example, then the Fluent Logger library example.

A graph of memory utilization of the three examples. The rewrite tag filter example uses the most memory, and the Fluent Logger library and stream processing examples are tied with very low memory usage.

  • fluent-logger is the Fluent Logger Golang Example
  • stream-logger is the Fluent Bit Streams Example
  • rewrite-tag is the Fluentd Rewrite Tag Filter Example

These tests were run on a c5.9xlarge Amazon EC2 instance; each task was given one virtual CPU and two GB of memory. Obviously, the results are somewhat specific to my example. It’s interesting, however, that fluent-logger has the lowest resource usage, meaning that performing this processing within your application code is the most efficient option. Beyond that, the main takeaway is that Fluentd’s resource usage is significantly higher than Fluent Bit’s, which will not come as a surprise to anyone who has used both of them.

Solving your own use case

As I stated in the introduction, the goal of this tutorial was to learn techniques for forking a single stream of logs so that each sub-stream can be processed independently. Sending to different CloudWatch log streams based on the log level was merely a single example meant to demonstrate each approach.

Dive into the Fluentd and Fluent Bit documentation to understand what is possible and figure out how to solve your own use case. For Fluentd, their routing examples and copy plugin may be useful.

What do you think?

The techniques you learned in this post will allow you to solve advanced and niche use cases. I am curious to learn the specifics of your use cases. If you found this tutorial useful, and have a use case for which its techniques are applicable, please comment below.

Is there something I missed? If you are a Fluentd or Fluent Bit power user and you think I’ve missed something, please comment as well.

Finally, if you find the open source Golang project I created useful, please comment on the usage issue in its repository. Contributions and ideas for improvements are appreciated. Furthermore, if you liked the approach but do not use Golang, please consider making any code you write open source so that others can benefit. Similar libraries could be created for other languages.

from AWS Open Source Blog: https://aws.amazon.com/blogs/opensource/splitting-application-logs-multiple-streams-fluent/