How to Onboard a New Data Source in Apache Metron


Apache Metron aims to be a tool for analysts and data scientists in a cyber security team that helps them define intelligent alerts, detect threats and work on them in real time. This is the first in a series of blog posts meant to ease operations and share my experiences with Apache Metron, so it also serves as an introduction to Metron.


Technical Introduction

Apache Metron is a cyber security platform that makes heavy use of the Hadoop ecosystem to create a scalable and available solution. It utilizes Apache Storm and Apache Kafka to parse, enrich, profile, and eventually index data from telemetry sources, such as network traffic, firewall logs, or application logs, in real time. Apache Solr or Elasticsearch is used for random access searches, while Apache Hadoop HDFS is used for long-term and analytical storage. It comes with its own scripting language, “Stellar”, to query, transform and enrich data. A security operator/analyst uses the Metron Management UI to configure and manage input sources as well as the Metron Alerts UI to search, filter and group events.


Metron Alerts UI, showing a few dummy events from a Squid log.

Scope of this Post

Since virtually every data source can be used to generate events, it is natural that data from new sources will be added over time. I use this post as a small checklist to document considerations and useful commands for the “onboarding” process of a new data source. You might want to automate this process in a way that works for you. In future posts I will cover the steps in detail.

Onboard a New Data Source

  1. I need to ingest data to Kafka:
    • It’s very handy to use Apache NiFi for the ingest part. Just create a data flow consisting of two processors: a simple TCP listener to receive data and a Kafka producer to push the events further into Kafka.
    • I can also push data directly into Kafka if the architecture, the firewall and the source system allow it.
    • If there are no active components on the source system pushing data, I might want to install an instance of MiNiFi on my source system.
  2. Before I can ingest data into Kafka, I need a new Kafka topic:
    • While the Kafka topics “enrichments” and “indexing” will be used by all data sources, the parser topics are specific to a data source.
    • I create a topic with a number of partitions that corresponds to the amount of data I receive.
  3. To make the events searchable, i.e., to store the events into Apache Solr, I need to create a new Solr collection.
    • For each parser Storm topology and parser Kafka topic, there is a parser Solr collection.
    • I add a few specific fields common to all Metron Solr collections and optionally define data source specific fields in the schema.xml.
    • I create the collection with a number of shards that corresponds to the amount of data I receive.
  4. I define my parser in the Metron Management UI:
    • I click the “+” button in the bottom right corner of the Metron Management UI.
    • I configure my parser by choosing a Java class and/or defining a Grok pattern, then insert a sample and check if the parsed output is what I expect.
    • I configure the parser, add enrichments, threat intel logic and transformations.
    • I save the parser configuration and press the “Play” button next to the new parser to start it.
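The sizing rule in steps 2 and 3 can be made concrete with a back-of-the-envelope calculation. The following is only a minimal sketch, not a Metron or Kafka API: the function name, the headroom factor and the throughput figures are assumptions for illustration, and the per-partition throughput has to be measured on your own cluster.

```python
import math


def partitions_needed(expected_mb_per_s, per_partition_mb_per_s, headroom=1.5):
    """Estimate a partition count from throughput figures.

    All inputs are assumptions you have to measure or estimate yourself:
    expected_mb_per_s is the data rate of the new source,
    per_partition_mb_per_s is what one partition sustains on your cluster,
    headroom is a safety factor for peaks and growth.
    """
    if expected_mb_per_s <= 0 or per_partition_mb_per_s <= 0:
        raise ValueError("throughput values must be positive")
    return max(1, math.ceil(expected_mb_per_s * headroom / per_partition_mb_per_s))


if __name__ == "__main__":
    # Hypothetical numbers: 20 MB/s incoming, 10 MB/s per partition.
    print(partitions_needed(20, 10))
```

The same kind of estimate can serve as a first guess for the number of Solr shards, with the per-shard capacity as the measured input.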

Metron Management UI with my configured parsers. Currently, only the Squid parser is running; it produces the events shown in the first screenshot.
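To get a feeling for what "insert a sample and check the parsed output" means for a Squid log, here is a rough Python stand-in for a Grok pattern. It is only a sketch: Metron parsers are configured with Grok patterns or Java classes, not Python, and the regular expression, the field names and the sample line (with placeholder host and addresses) are my assumptions, modelled on Squid's native access log layout.

```python
import re

# Hypothetical field names; a real Metron parser configuration defines its own.
SQUID_LINE = re.compile(
    r"(?P<timestamp>\d+\.\d+)\s+"
    r"(?P<elapsed>\d+)\s+"
    r"(?P<ip_src_addr>\S+)\s+"
    r"(?P<action>[A-Z_]+)/(?P<code>\d{3})\s+"
    r"(?P<bytes>\d+)\s+"
    r"(?P<method>[A-Z]+)\s+"
    r"(?P<url>\S+)"
)

# Made-up sample line using placeholder host and addresses.
SAMPLE = (
    "1467011157.401 415 127.0.0.1 TCP_MISS/200 337891 GET "
    "http://www.example.com/index.html - DIRECT/192.0.2.10 text/html"
)


def parse_squid(line):
    """Return a dict of named fields, or None if the line does not match."""
    match = SQUID_LINE.match(line)
    if match is None:
        return None
    event = match.groupdict()
    # Convert the numeric fields so they can be filtered and aggregated.
    for key in ("elapsed", "code", "bytes"):
        event[key] = int(event[key])
    return event


if __name__ == "__main__":
    print(parse_squid(SAMPLE))
```

Checking a handful of real samples this way before starting the parser topology helps to catch patterns that silently drop fields.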



I hope this post was helpful and informative. For questions, I refer you to the documentation, future posts and the Metron mailing list, or you can post a question below.


Basics of Hadoop User Management

Hadoop is old, everyone has their own Hadoop cluster and everyone knows how to use it. It’s 2018, right? This article is just a collection of a few gotchas, dos and don’ts with respect to User Management that shouldn’t happen in 2018 anymore.


Just a few terms and definitions so that everyone is on the same page for the rest of the article. Roll your eyes and skip that section if you are an advanced user.

  • OS user = user that is provisioned on the operating system level of the nodes of a Hadoop cluster. You can check if the user exists on OS level by doing

id <username>

  • KDC = Key Distribution Center. This might be a standalone KDC implementation, such as the MIT KDC or an integrated one behind a Microsoft Active Directory.
  • Keytab = file that stores the encrypted keys (derived from the password) of a user provisioned in a KDC. It can be used with the “kinit” command line tool to authenticate without typing the password.
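If you want to run the OS user check from a script rather than by hand, the same lookup is available in Python's standard library. This is a minimal sketch for Unix-like systems; the function name is my own, and it has to be executed on every node you want to check.

```python
import pwd


def os_user_exists(username):
    """Return True if the user is known to the OS on this node.

    pwd.getpwnam goes through the system's user database lookup, so users
    provided by services such as SSSD are typically found as well.
    """
    try:
        pwd.getpwnam(username)
    except KeyError:
        return False
    return True


if __name__ == "__main__":
    # "hdfs" is just an example; use the name of the user you want to check.
    print(os_user_exists("hdfs"))
```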


  • Make sure your users are available on OS level on all nodes of the cluster, as well as in the KDC. This is important for several reasons:
    • When you run a job, the job might create staging/temporary directories in the /tmp/ directory, which are owned by the user running the job. The name of the directory is the name of the OS user, while the ownership belongs to the authenticated user. In a secure cluster the authenticated user is the user you obtained a Kerberos ticket for from the KDC.
    • For security purposes, keytabs on OS level should only be readable by the OS user who is supposed to authenticate with them.
    • When impersonation is turned on for services, e.g., Oozie using the -doas option, Hive using the property hive.server2.enable.doAs=True or Storm using its corresponding property, a job submitted by a user authenticated as a principal will run on OS level as a process owned by that user. If that user is not known to the OS, the job will fail (to start).
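The keytab rule above is easy to verify automatically. Below is a minimal sketch, assuming a POSIX file system; the function name and the example path are hypothetical, and the check only looks at permission bits, not at who owns the file.

```python
import os
import stat


def keytab_is_private(path):
    """Return True if neither group nor others have any access to the file."""
    mode = stat.S_IMODE(os.stat(path).st_mode)
    # Any permission bit for group or others makes the keytab too open.
    return mode & (stat.S_IRWXG | stat.S_IRWXO) == 0


if __name__ == "__main__":
    # Hypothetical location; adjust to where your keytabs are stored.
    keytab = "/etc/security/keytabs/myuser.keytab"
    if os.path.exists(keytab):
        print(keytab_is_private(keytab))
```

A file with mode 600 or 400 passes this check, while 640 or 644 does not.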


  • Don’t use the hdfs user to run jobs on YARN (it’s forbidden by default and don’t change that configuration). Your problem can be solved in a different way! Only use the hdfs user for administrative tasks on the command line.
  • Don’t run Hive jobs as the “hive” user. The “hive” user is the administrative user and, if at all, should only be used by the Hadoop/database administrator.
  • Or in general: Don’t use the <service name> user to do <operation> on <service name>. You saw that coming, hm?

How to Achieve Synchronisation of KDC and OS Level

(…or other user/group management systems). This is a tricky one, if you don’t want to run into a split brain situation, where one system knows one set of users and another one knows others, which may or may not overlap.

  • Automate user provisioning, e.g., by using an Ansible role that provisions a user in the KDC and on all nodes of the Hadoop cluster.
  • Use services such as SSSD (System Security Services Daemon), which integrates users and groups from user and group management services into the operating system. You won’t need to actually add them to each node, as long as SSSD is up and running.
  • Manually create OS users on all nodes and in the KDC (don’t do that, obviously ;P )
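Whichever approach you choose, a split brain situation can be detected by comparing the two user lists. The following is a minimal sketch: how you export the users from the KDC and from the OS is up to you, and the function name and the usernames are made up for illustration.

```python
def compare_user_sets(kdc_users, os_users):
    """Report users that are known to only one of the two systems."""
    kdc_users, os_users = set(kdc_users), set(os_users)
    return {
        "missing_on_os": sorted(kdc_users - os_users),
        "missing_in_kdc": sorted(os_users - kdc_users),
    }


if __name__ == "__main__":
    # Hypothetical exports from the KDC and from one cluster node.
    report = compare_user_sets(
        kdc_users=["alice", "bob", "carol"],
        os_users=["bob", "carol", "dave"],
    )
    print(report)
```

Both lists in the report should be empty on every node; anything else means the two systems have drifted apart.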


Maybe I’ll expand that list in the future 🙂

Hadoop Security Concepts

While security is quite a complex topic by itself, Hadoop security can be overwhelming. Thus, I wrote a state-of-the-art article about Hadoop Security Concepts on Hortonworks Community Connection.


Simplified Depiction of the Hadoop Security Architecture

How to Create a Data Pipeline Using Luigi

This is a simple walk-through of an example usage of Luigi. Spotify themselves provide excellent documentation online, where you can find all the bits and bytes you need to create your own pipeline script. There are also already a few blog posts about what is possible with Luigi, but, I believe, they don’t describe very well how to implement it. So, in my opinion, there is either too much information to just try it out or too little information to actually get started hands-on. I’ll also say a word about security.

Therefore, I publish a fully working example of a minimalist pipeline that you can use as a starting point, copying and pasting everything you need.

These are the questions I try to answer:

  • What is Luigi and when do I want to use it?
  • How do I set up the Luigi scheduler?
  • How do I specify a Luigi pipeline?
  • How do I schedule a Luigi pipeline?
  • Can I use Luigi with a secure Hadoop cluster?
  • What do I like about Luigi?

What is Luigi?

Luigi is a framework written in Python that makes it easy to define and execute complex pipelines in a consistent way. You can use Luigi …

  • … when your data is processed in (micro) batches rather than streamed.
  • … when you want to run jobs that depend on (many) other jobs.
  • … when you want to have nice visualizations of your pipelines to keep a good overview.
  • … when you want to integrate data into the Hadoop ecosystem.
  • … when you want to do any of the above and love Python.

Create Infrastructure

Every pipeline can actually be tested using the --local-scheduler flag on the command line. For production, however, you should use a central scheduler running on one node.

The first thing you want to do is to create a user and a group the scheduler is running as.

sudo groupadd luigi
sudo useradd -g luigi luigi

The second step is to create a Luigi config directory.

sudo mkdir /etc/luigi
sudo chown luigi:luigi /etc/luigi

You also need to install Luigi (and Python and pip) if you did not do that already.

pip install luigi

It’s now time to deploy the configuration file. Put the following file into /etc/luigi/luigi.cfg. In this example the Apache Pig home directory of a Hortonworks Hadoop cluster is specified. There are many more configuration options listed in the official documentation.



Don’t forget to create the directories for the process id file of the Luigi scheduler daemon, for the logs and for the scheduler state.

sudo mkdir /var/run/luigi
sudo mkdir /var/log/luigi
sudo mkdir /var/lib/luigi
sudo chown luigi:luigi /var/run/luigi
sudo chown luigi:luigi /var/log/luigi
sudo chown luigi:luigi /var/lib/luigi

You are now prepared to start up the scheduler daemon.

sudo su - luigi
luigid --background --port 8088 --address --pidfile /var/run/luigi/ --logdir /var/log/luigi --state-path /var/lib/luigi/luigi.state

A Simple Pipeline

We are now ready to go. Let’s specify an example pipeline that can actually be run without a Hadoop ecosystem present: it reads data from a custom file, counts the number of words and writes the output to a file called count.txt. In this example two of the most basic task types are used: luigi.ExternalTask, which requires you to implement the output method, and luigi.Task, which requires you to implement the requires, output and run methods. I added docstrings to all methods and class definitions, so the code below should speak for itself. You can also view it on Github.

import luigi


class FileInput(luigi.ExternalTask):
    """
    Define the input file for our job:
        The output method of this class defines
        the input file of the class in which FileInput is
        referenced in "requires"
    """

    # Parameter definition: input file path
    input_path = luigi.Parameter()

    def output(self):
        """
        As stated: the output method defines a path.
        If the FileInput class is referenced in a
        "requires" method of another task class, the
        file can be used with the "input" method in that
        class.
        """
        return luigi.LocalTarget(self.input_path)


class CountIt(luigi.Task):
    """
    Counts the words from the input file and saves the
    output into another file.
    """

    input_path = luigi.Parameter()

    def requires(self):
        """
        Requires the output of the previously defined class.
        Can be used as input in this class.
        """
        return FileInput(self.input_path)

    def output(self):
        """
        count.txt is the output file of the job. In a more
        close-to-reality job you would specify a parameter for
        this instead of hardcoding it.
        """
        return luigi.LocalTarget('count.txt')

    def run(self):
        """
        This method opens the input file stream, counts the
        words, opens the output file stream and writes the number.
        """
        word_count = 0
        with self.input().open('r') as ifp:
            for line in ifp:
                # split() without arguments ignores repeated
                # whitespace and the trailing newline
                word_count += len(line.split())
        with self.output().open('w') as ofp:
            ofp.write(str(word_count))


if __name__ == "__main__":
    # CountIt is the main task, so it does not need to be
    # named on the command line
    luigi.run(main_task_cls=CountIt)

Schedule the Pipeline

To test and schedule your pipeline create a file test.txt with arbitrary content.
We can now execute the pipeline manually by typing

python --input-path test.txt

Use the following if you didn’t set up and configure the central scheduler as described above:

python --input-path test.txt --local-scheduler

If you did everything right you will see that no tasks failed and a file count.txt was created that contains the count of the words of your input file.

Try running this job again. You will notice that Luigi will tell you that there already is a dependency present. Luigi detects that the count.txt is already written and will not run the job again.

Now you can easily trigger this pipeline on a daily basis by using, e.g., crontab to schedule the job to run every minute. If your input and output files have the current date in the filename’s suffix, the job will be triggered every minute, but will run successfully exactly once a day.

In a crontab you could do the following:

* * * * * python --input-path test.txt
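The "triggered every minute, but run only once a day" behaviour comes from the fact that a task whose output already exists is considered complete. Here is a minimal sketch of that idea using only the standard library. It is a stand-in to illustrate the mechanism, not Luigi code; the function names, the file name pattern and the dummy job are my assumptions.

```python
import datetime
import os
import tempfile


def dated_output_path(directory, day):
    """Build an output file name that carries the date as a suffix."""
    return os.path.join(directory, "count_{}.txt".format(day.isoformat()))


def run_if_missing(output_path, job):
    """Run `job` and write its result unless `output_path` already exists."""
    if os.path.exists(output_path):
        return False  # output is present, nothing to do
    with open(output_path, "w") as ofp:
        ofp.write(str(job()))
    return True


if __name__ == "__main__":
    workdir = tempfile.mkdtemp()
    target = dated_output_path(workdir, datetime.date.today())
    # Simulate cron firing three times on the same day: only the
    # first call actually runs the job.
    print([run_if_missing(target, lambda: 42) for _ in range(3)])
```

On the next day the date suffix changes, the output file is missing again, and the first trigger of that day runs the job.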


The cool thing about Luigi is that you basically don’t need to worry much about security. Luigi simply uses the security features of the components it interacts with. If you are, e.g., working on a secure Hadoop cluster (that is, a cluster where Kerberos authentication is enforced), the only thing you need to worry about is that you obtain a fresh Kerberos ticket before you trigger the job, given that the validity of the ticket is longer than the job needs to finish. I.e., when you schedule your pipeline with cron, make sure you do a kinit from a keytab. You can check out my answer to a related question on the Hortonworks Community Connection for more details on that.
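If you prefer to wrap both steps in a small script instead of chaining them in the crontab entry, it could look like the following. This is a minimal sketch, assuming the kinit command line tool is installed; the function names, the keytab path, the principal and the job command are hypothetical and need to be replaced by your own.

```python
import subprocess


def build_kinit_command(keytab_path, principal):
    """Build the command that obtains a ticket from a keytab without a password."""
    return ["kinit", "-kt", keytab_path, principal]


def run_with_fresh_ticket(keytab_path, principal, job_command):
    """Obtain a fresh Kerberos ticket, then run the job command.

    check=True raises an exception if kinit fails, so the job is never
    started without a valid ticket.
    """
    subprocess.run(build_kinit_command(keytab_path, principal), check=True)
    return subprocess.run(job_command, check=True)


if __name__ == "__main__":
    # Hypothetical keytab and principal; this only prints the command.
    print(build_kinit_command("/etc/security/keytabs/luigi.keytab",
                              "luigi@EXAMPLE.COM"))
```

The keytab used here must be readable by the OS user that owns the crontab.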

What do I like about Luigi?

It combines my favourite programming language and my favourite distributed ecosystem. I didn’t go into much detail on that here, but Luigi is especially great because of its rich ways to interact with Hadoop ecosystem services. Instead of a LocalTarget you would rather use HdfsTargets or Amazon S3Targets. You can define and run Pig jobs, and there is even an Apache Hive client built in.