Big Data and Stream Processing 101 – Part 4: How to Do a Simple Key-Value Enrichment in Apache NiFi

Demo Video

In my previous blog post I published a video that showed how to query a database table. As the next step, I want to show how to enrich that data with data from an external API.

Again: enjoy and don’t forget to ask questions or give me feedback!

Configuration

  • This is how the LookupRecord processor is configured:
  • This is how the RESTLookupService (Pokemon Lookup Service) is configured:
  • This is the expected input after the QueryDatabaseTableRecord before the LookupRecord processor:
  • This is an expected output example after the LookupRecord processor:
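
Conceptually, the flow above performs a key-value enrichment: take a key from each record, ask a lookup service for the matching value, and write that value back into the record. The following is only a minimal Python sketch of that idea, not NiFi code. The in-memory FAKE_API dict stands in for the REST lookup service, and the field name "pokemon" as well as the sample values are hypothetical. Splitting the result into matched and unmatched records is meant to resemble LookupRecord's matched/unmatched routing.

```python
from typing import Callable, Dict, Iterable, List, Optional, Tuple

# Hypothetical stand-in for the REST lookup service: in the demo the values
# come from an external API, here they come from an in-memory dict.
FAKE_API: Dict[str, str] = {"1": "bulbasaur", "2": "ivysaur"}


def lookup(key: str) -> Optional[str]:
    """Return the value for key, or None when the service has no match."""
    return FAKE_API.get(key)


def enrich(
    records: Iterable[dict],
    key_field: str,
    result_field: str,
    lookup_fn: Callable[[str], Optional[str]],
) -> Tuple[List[dict], List[dict]]:
    """Copy each record and add result_field when the lookup matches."""
    matched, unmatched = [], []
    for record in records:
        value = lookup_fn(str(record[key_field]))
        if value is None:
            unmatched.append(dict(record))
        else:
            matched.append({**record, result_field: value})
    return matched, unmatched


rows = [{"id": 1, "name": "stefan"}, {"id": 99, "name": "nobody"}]
matched, unmatched = enrich(rows, "id", "pokemon", lookup)
print(matched)
print(unmatched)
```

Swapping lookup for a function that calls a real API keeps the rest of the sketch unchanged, which is roughly the separation NiFi makes between the processor and its lookup service.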

Version Information

This demo was created with Apache NiFi 1.10.

Links

Big Data and Stream Processing 101 – Part 1

Big Data and Stream Processing 101 – Part 2

Big Data and Stream Processing 101 – Part 3

Big Data and Stream Processing 101 – Part 3: How to Connect to a RDBMS Using Apache NiFi

Demo Video

Apache NiFi can be used to connect to (almost) any system, process data and put it into (almost) any (other) system.

In this video, I show specifically how to connect to a relational database, in this case MySQL, and walk you through the most basic options you have.

Enjoy and don’t forget to ask questions or give me feedback!

Configuration

Here are the screenshots of the NiFi components used for reference:

  • QueryDatabaseTableRecord
  • MySQL Connection Pool

This is the simple database schema I used for the MySQL example table:

CREATE DATABASE company;
USE company;
CREATE TABLE people(id int, name varchar(20));
INSERT INTO people (id, name) VALUES (1,'stefan');
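
If you want to play with this table without a MySQL server, here is a small sketch that uses Python's built-in sqlite3 module as a stand-in (an assumption for illustration; the demo itself uses MySQL). It also mimics what QueryDatabaseTableRecord does when a maximum-value column is configured: it remembers the largest value seen so far and only fetches newer rows on the next run. Whether the video configures such a column is not stated here, and the function name fetch_new_rows and the second row are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # stand-in for the MySQL database
conn.execute("CREATE TABLE people(id int, name varchar(20))")
conn.execute("INSERT INTO people (id, name) VALUES (1, 'stefan')")


def fetch_new_rows(connection, last_max_id):
    """Return rows with an id above last_max_id plus the new maximum id."""
    rows = connection.execute(
        "SELECT id, name FROM people WHERE id > ? ORDER BY id", (last_max_id,)
    ).fetchall()
    new_max = rows[-1][0] if rows else last_max_id
    return rows, new_max


# The first run starts without state and sees the existing row.
first, state = fetch_new_rows(conn, 0)

# A row added later is the only one returned by the second run.
conn.execute("INSERT INTO people (id, name) VALUES (2, 'ash')")
second, state = fetch_new_rows(conn, state)

print(first, second, state)
```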

Version Information

This demo was recorded on November 9, 2019 using the recently published version of Apache NiFi 1.10.

Links

Big Data and Stream Processing 101 – Part 1

Big Data and Stream Processing 101 – Part 2

Apache Metron Architecture

In one of my previous articles I wrote about Apache Metron as an Example for a Real-Time Streaming Pipeline. Since then, I’ve refined the figure I’ve used to explain the architecture. In this article, I just briefly explain the updated part of the figure and add a video of myself talking about Apache Metron at the Openslava conference in Bratislava using those updated figures in my slides.

Enrichment


I added a few more details to the enrichment part of the figure:

  • The enrichment Storm topology can use external data sources on-boarded into HBase as well as the Model as a Service (MaaS) capability.
  • The arrow from the enrichments Kafka topic is not entirely correct. It is meant to show that data sources arriving in real time can be stored in HBase as an enrichment source. Strictly speaking, the arrow should run to HBase directly from the parser topology.
  • Huge data sets can be fairly easily batch loaded into HBase as an enrichment source.
  • The profiler is a Storm topology that saves data of certain (user-defined) entities in a time series to HBase. From there it can be used as an enrichment for any future events as aggregates over time.
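
To illustrate the last point, here is a rough Python sketch of the profiler idea: count events per entity and time period, then attach the aggregate of an earlier period to a new event. This is not Metron code. A plain dict stands in for HBase, and the 15-minute period, the function names and the field name events_previous_period are assumptions made up for this example.

```python
from collections import defaultdict

PERIOD_MS = 15 * 60 * 1000  # hypothetical period length of 15 minutes


def build_profile(events, entity_field, period_ms=PERIOD_MS):
    """Count events per (entity, period start) bucket."""
    profile = defaultdict(int)
    for event in events:
        period_start = event["timestamp"] // period_ms * period_ms
        profile[(event[entity_field], period_start)] += 1
    return dict(profile)


def enrich_with_profile(event, profile, entity_field, period_ms=PERIOD_MS):
    """Attach the entity's count from the previous period to the event."""
    previous = event["timestamp"] // period_ms * period_ms - period_ms
    count = profile.get((event[entity_field], previous), 0)
    return {**event, "events_previous_period": count}


history = [
    {"ip_src_addr": "10.0.0.1", "timestamp": 1_000},
    {"ip_src_addr": "10.0.0.1", "timestamp": 2_000},
    {"ip_src_addr": "10.0.0.2", "timestamp": 3_000},
]
profile = build_profile(history, "ip_src_addr")

new_event = {"ip_src_addr": "10.0.0.1", "timestamp": PERIOD_MS + 500}
enriched = enrich_with_profile(new_event, profile, "ip_src_addr")
print(enriched)
```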

Open Source Cyber Security with Apache Metron @ Openslava2018

How to Create a New Parser for Apache Metron

This blog entry goes through the process of a Cyber Platform Operator creating a new parser for Apache Metron and everything you need to consider to make this process as smooth as possible. This can also be seen as a checklist or to-do list when you are creating a new parser.

Assumption: You know what Metron is, the data source is fully onboarded on your platform and the parser config is the only thing that’s missing. Here are the things you need to consider to onboard a new source.

In general, this article walks you through 3 phases:

  • Check if you can re-use an existing parser. If so, you’re done, although the testing part of phase 2 still applies.
  • Build and test a prototype. Grok is your friend.
  • Write your parser in Java.

Phase 1: Check if you can use an existing parser

  • Get a sample set of your source to test with. The more diverse you expect the formats of the same source to be, the bigger your sample size should be. 20 samples should be enough to start with.
  • Check the format of the string.
    • If it is in JSON format, use the JSON parser!
    • If it’s a comma separated line, use the CSV parser!
    • Or generally: If it’s in the format of any of the included Metron parsers, use that parser: CEF, Lancope, PaloAltoFirewall, Sourcefire, Logstash, FireEye, Asa, Snort, JSONMap, Ise, GrokWebSphere, Bro, ...
    • If it’s something else, use the Grok parser!
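
The format check can be roughed out in a few lines of Python before you even open the Management UI. The sketch below is only a crude heuristic under my own assumptions: the function name suggest_parser is made up, it only distinguishes JSON, CSV and "everything else", and a log line that happens to contain commas can fool the CSV test. It does not know about the other included parsers.

```python
import csv
import json


def _is_json_object(line):
    """Return True when the line parses as a JSON object."""
    try:
        return isinstance(json.loads(line), dict)
    except ValueError:
        return False


def suggest_parser(samples):
    """Suggest JSON, CSV or Grok based on all non-empty sample lines."""
    lines = [sample.strip() for sample in samples if sample.strip()]
    if not lines:
        raise ValueError("need at least one non-empty sample")
    if all(_is_json_object(line) for line in lines):
        return "JSON"
    # Treat the source as CSV only when every line has the same number of
    # comma-separated fields and there is more than one field.
    field_counts = {len(next(csv.reader([line]))) for line in lines}
    if len(field_counts) == 1 and field_counts.pop() > 1:
        return "CSV"
    return "Grok"


print(suggest_parser(['{"a": 1}', '{"b": 2}']))
print(suggest_parser(["1,stefan,10.0.0.1", "2,ash,10.0.0.2"]))
print(suggest_parser(["Jan 1 host sshd[1]: Accepted", "Jan 1 host sshd[2]: Failed, retry"]))
```

Running it over the whole sample set instead of a single line matters, because one odd line is often what makes a source unsuitable for the CSV parser.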

Phase 2: Build and test a (Grok) prototype

In the rest of the article I assume that you can’t re-use one of the included parsers and therefore want to create your own custom one using the Grok parser. However, the test setup described below can be used for any kind of parser.

  • Use http://grokdebug.herokuapp.com/ to test one of your samples. Start by adding %{GREEDYDATA:message}, then continuously add more precise parsing statements and check that the pattern still compiles. If you’re new to Grok, start here: https://logz.io/blog/logstash-grok/.
  • Test all of your samples in the app to check if your Grok statement is general enough.
  • You also might want to append %{GREEDYDATA:suffix}(\n|\r|\r\n)?+ to catch any kind of additional data and to filter out a newline and optional carriage-return characters at the end of a line. Whether you need this depends on how diverse or clean your data source is.
  • Configure and validate the parser in Metron Management UI using “Grok” as parser type and paste the grok statement in the field “Grok Statement”.
    • Attention: don’t forget to define the timestampField, the timeFields and the dateFormat. If you don’t specify those values, the parser validation will fail with an “error_type”: “parser_invalid”. The field configured as the timestampField will be converted into a timestamp parsed based on the inputs from the dateFormat field. Use the Joda time date format documented here.
    • When the datetime is correctly parsed, double-check that the calculated timestamp matches the input time. This online epochconverter comes in handy.
    • Note: to consolidate your view of the data across many sources, make sure you name the source ip address “ip_src_addr”, your destination ip address “ip_dst_addr”, your source port “ip_src_port” and your destination port “ip_dst_port”.
    • Note: In general, every parser, not only the Grok parser, has its own required and default parameters. Read the parser docs to make sure you configure the parser correctly. Below is an example of what the parserConfig part of your parser configuration file should look like. You configure this part in the Metron Management UI:
metron_managementui_parser.png
  • Double check:
    • If Grok statements are stored in the configured HDFS path: /apps/metron/patterns/mycustomparser
    • If the Zookeeper configuration is up to date: bin/zkCli.sh -server <zookeeper-quorum> get /metron/topology/parsers/mycustomparser. Specifically look for the parserConfig part shown below.
{
  ...
  "parserClassName": "org.apache.metron.parsers.GrokParser",
  "parserConfig": {
    "grokPath": "/apps/metron/patterns/mycustomparser",
    "patternLabel": "MYCUSTOMPARSER",
    "timestampField": "datetime",
    "timeFields": ["datetime"],
    "dateFormat": "yyyy-MM-dd HH-mm-ss",
    "timezone": "UTC"
  },
  ...
}
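
If you prefer to double-check the timestamp offline instead of using an online converter, a few lines of Python do the job. This sketch assumes the dateFormat and timezone from the config above. The strptime format "%Y-%m-%d %H-%M-%S" is my Python translation of the Joda pattern "yyyy-MM-dd HH-mm-ss", and the sample value is made up.

```python
from datetime import datetime, timezone

# Python equivalent of the Joda pattern "yyyy-MM-dd HH-mm-ss" (assumption:
# the pattern and the UTC timezone are taken from the config above).
PY_FORMAT = "%Y-%m-%d %H-%M-%S"


def to_epoch_millis(value, fmt=PY_FORMAT):
    """Parse a datetime string as UTC and return epoch milliseconds."""
    parsed = datetime.strptime(value, fmt).replace(tzinfo=timezone.utc)
    return int(parsed.timestamp() * 1000)


def from_epoch_millis(millis):
    """Convert epoch milliseconds back to a UTC datetime for comparison."""
    return datetime.fromtimestamp(millis / 1000, tz=timezone.utc)


sample = "2019-11-09 12-30-00"  # made-up sample value
millis = to_epoch_millis(sample)
print(millis, from_epoch_millis(millis).isoformat())
```

Compare the printed value with the timestamp field of the parsed message. If the two differ by a whole number of hours, the timezone setting is the first thing to check.
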
metron_parser_test_setup.png
Metron parser test setup: 1. Consume from the parser topic (assuming you have already ingested your sample). 2. Control the flow rate to release only 1 event per 5 seconds (or whichever speed you like). 3. Write back into the parser topic and check that the event is processed correctly.
  • Ingest the messages into the Kafka topic using your NiFi test setup and check if they are successfully persisted in your desired collection.
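
The iterative Grok prototyping from this phase can also be mimicked locally. Under the hood, a Grok expression is a regular expression with named groups, so the sketch below translates a handful of %{NAME:field} references into Python regex groups. Everything in it is an assumption for illustration: the pattern definitions are simplified stand-ins for the real Grok patterns, DATETIME is a made-up pattern matching the dateFormat used above, the sample line is invented, and the line-ending group is a plain, non-possessive variant of the suffix mentioned earlier.

```python
import re

# Simplified stand-ins for a few Grok patterns. The real definitions are more
# thorough (for example, IP also covers IPv6). DATETIME is hypothetical.
PATTERNS = {
    "GREEDYDATA": r".*",
    "INT": r"[+-]?\d+",
    "IP": r"\d{1,3}(?:\.\d{1,3}){3}",
    "WORD": r"\w+",
    "DATETIME": r"\d{4}-\d{2}-\d{2} \d{2}-\d{2}-\d{2}",
}


def grok_to_regex(expression):
    """Replace each %{NAME:field} reference with a named regex group."""
    def replace(match):
        name, field = match.group(1), match.group(2)
        return f"(?P<{field}>{PATTERNS[name]})"
    return re.compile(re.sub(r"%\{(\w+):(\w+)\}", replace, expression))


LINE = "2019-11-09 12-30-00 ALLOW 10.0.0.1 51234 192.168.1.5 443 extra text\n"

# Step 1: start with a catch-all statement.
first_try = grok_to_regex("%{GREEDYDATA:message}")

# Step 2: refine it, using the field names recommended above.
refined = grok_to_regex(
    "%{DATETIME:datetime} %{WORD:action} "
    "%{IP:ip_src_addr} %{INT:ip_src_port} "
    "%{IP:ip_dst_addr} %{INT:ip_dst_port}"
    "%{GREEDYDATA:suffix}"
    r"(?:\r\n|\n|\r)?"
)

fields = refined.fullmatch(LINE).groupdict()
print(first_try.match(LINE).group("message"))
print(fields)
```

Looping the refined expression over all of your samples and printing the lines where fullmatch returns None gives a quick indication of whether the statement is general enough.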

Phase 3: Make your Metron parser production ready

Once your custom parser is up and running and you have improved it iteratively, you will want something more stable and faster than Grok statements. However, nothing is for free: you need to get your hands dirty in Java. Fortunately, it’s not a lot of dirt, and it’s quite easy to write your own parser by extending the BasicParser class.

  • Check out this part of the documentation to get a walkthrough: 3rd party parsers
  • In this part of the documentation you’ll learn to:
    • Get to know which dependencies you need.
    • Implement the parse method of your custom parser class, which extends the BasicParser class.
    • Build the jar and deploy it in the extra-parser directory.
    • Restart the Metron REST service to pick up the new parser from your jar file.
    • Add your parser in the Metron Management UI by choosing your parser type.
    • Configure and start your parser.
  • Stop your interim Grok parser and start your custom Java parser.
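
Before writing the Java class, it can help to pin down what the hand-written parser has to produce. The following is a Python stand-in, not Metron code, and it rests on assumptions you should verify against the parser docs: that the parse method receives the raw message as bytes and returns a list of flat field maps, and that each message needs at least the original_string and timestamp fields to be considered valid. The line layout and the action field are invented for this example.

```python
from datetime import datetime, timezone

# Assumption: these two fields are required for a message to be valid.
REQUIRED_FIELDS = ("original_string", "timestamp")


def parse(raw_message):
    """Split one space-separated raw message into a list of field maps."""
    original = raw_message.decode("utf-8").rstrip("\r\n")
    date, time, action, src, src_port, dst, dst_port = original.split(" ", 6)
    parsed = datetime.strptime(f"{date} {time}", "%Y-%m-%d %H-%M-%S")
    millis = int(parsed.replace(tzinfo=timezone.utc).timestamp() * 1000)
    return [{
        "original_string": original,
        "timestamp": millis,
        "action": action,  # hypothetical field
        "ip_src_addr": src,
        "ip_src_port": int(src_port),
        "ip_dst_addr": dst,
        "ip_dst_port": int(dst_port),
    }]


def validate(message):
    """Check that the required fields are present."""
    return all(field in message for field in REQUIRED_FIELDS)


result = parse(b"2019-11-09 12-30-00 ALLOW 10.0.0.1 51234 192.168.1.5 443\n")
print(result[0], validate(result[0]))
```

Splitting on fixed separators instead of evaluating a regular expression is the kind of simplification that tends to make a dedicated parser faster than a Grok statement, provided the format really is that regular.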