Apache Metron as an Example for a Real Time Data Processing Pipeline

In my previous blog post I was writing a little bit about what Apache Metron is and How to Onboard a New Data Source in Apache Metron.

Now I want to shine some light on how the ingestion pipeline architecture looks like. Since I just got started with Apache Metron myself, I hope this helps to kickstart your cyber security efforts. Rather than going too much into the details of what the components do, I’d like to provide a basic overview about which components there are.

This architecture can be generalized for all kinds of streaming use cases. The pipeline uses Apache NiFi for ingest, Apache Kafka as an event buffer, Apache Storm for stream processing, Apache Hadoop for long term storage and Apache Solr for short term random access storage. If you design your own pipeline for a different use case, you can, e.g., swap Apache Storm with frameworks such as Apache Flink or Spark Streaming (or any other frameworks out in the wild with their pros and cons). Choosing the right piece of technology strongly depends on numerous factors, I’m not going into in this article.

metron_pipeline

End to End Processing Pipeline for Apache Metron

Ingest

The most important part for Apache Metron is to get the telemetry data into an Apache Kafka topic. In the figure below you can see that there is a Kafka topic and a corresponding parser for each format. Usually, there is one Kafka topic per source type, because each source typically comes in its own special format, but it’s also possible that data of one source has multiple formats or multiple sources have the same format.

metron_pipeline_ingest_closeup.png

  • Apache NiFi is being used as the data integration tool.
  • In the figure, I added an example of a MiNiFi instance to the Squid Access Log source. In this case MiNiFi is installed on the Squid server node and acts as a log forwarder.
  • It’s also possible that sources write directly into Kafka, if they support that. In some cases this might even be a requirement due to performance constraints.

Parsing

As described in the ingest part: there is a topic for each parser format and an Apache Storm topology reading from this Kafka topic and doing the parsing. A parsed event is then written into the so-called “enrichments” topic.

metron_pipeline_parsing_closeup

  • The parsing has two purposes:
    • it brings all ingest format into a JSON format.
    • it introduces a common set of fields shared among all data sources, as well as unique fields that are special to each source.
  • Some parsers of common formats are included in the Metron project.
  • If there is no parser (that works) for your format, you can use Grok to quickly prototype and launch your parser before you write it in Java.
  • It is also possible to launch parser chains to extract information that is convoluted in different formats.
  • You can also decide to run only one topology handling multiple parsers in a so-called aggregated parser. This can be combined with parser chains.

Enrichment

The purpose of the enrichment Storm topology is to pick up events from the enrichments topic and add information from external sources. The enriched output is written to an indexing topic.

metron_pipeline_enrichment_closeup

  • A typical enrichment is a lookup in a database to convert an IP address into geo information
  • The profiler uses sliding windows to create aggregates/statistics in certain time windows, so-called profiles.
  • These profiles can be used to enrich data.
  • Metron helps you use any data in HBase to enrich your events.

Persisting

There are two Storm topologies to read from the indexing topic that persist events, the batch indexing topology and the random access indexing topology. The first utilizes an HDFSBolt to write data to HDFS. The latter one indexes data in Apache Solr.

metron_pipeline_persisting_closeup

  • There is one Solr collection per data format.
    • This way the parsed fields and definitions are kept clean and separated.
    • Also, you can authorize different users and groups to different data sources. This is even easier with the Solr Plugin for Apache Ranger.
  • HDFS is used as long term storage for analytical purposes and to use the data to create machine learning models.
  • Solr is being used for direct fast random access and search capabilities, e.g. by the Metron Alerts UI. It makes sense to store the data for only a limited amount of time for performance reasons.
  • It’s quite easy to create a new collection. I’ve described it on this github gist. I’ve added properties in the solrconfig.xml to define a “time to live” for an event in Solr, after which the event will be deleted from the collection.
  • Instead of Solr, you can use Elastic Search.

Conclusion

I hope this can be useful for somebody, either trying to implement Metron or somebody interested in how modern streaming pipelines look like in general. If you have questions, don’t hesitate to ask the experts in the Metron mailing list (user@metron.apache.org) or get support from the Hortonworks Community.

How to Write a Marker File in a Luigi “PigJobTask”

This is supposed to be a brief aid to memory on how to write marker files, when using “Luigi“, which I explained in a former blog post.

What is a Marker File?

A marker file is an empty file created with the sole purpose of signalizing to another process or application that some process is currently ongoing or finished. In the context of scheduling using Luigi, a marker file signalizes the Luigi scheduler that a certain task of a pipeline has already been finished and does not need to (re-)run anymore.

How the Common Luigi Job Rerun Logic Works

Every Luigi task has a run method. In this run method you can use any sort of (Python) code you desire. You can access the input and output streams of the Task object and use it to write data to the output stream. The principle is that a Luigi Task will not run again, if the file with the filename defined in the output target already exists. This can be either a LocalTarget (local file) or an HDFSTarget (file saved to HDFS) or any other custom target. That’s basically it.

How to Write a Marker File in a PigJobTask

Using a PigJobTask, the idea is that you run a Pig script of any complexity. You define the input and output files in your pig script. In the Luigi pipeline, you basically define the pig script location that you want to run and optionally a few other parameters depending on your Hadoop cluster configuration, but you don’t need to implement the run method anymore.

The scenario is that you do not have access to the HDFS output directory, e.g. because its the Hive warehouse directory or the Solr index directory,… or you simply can’t determine the output name of the underlying MapReduce job. So you need to “manually” create an empty file locally or in HDFS that signalizes Luigi that the job already has successfully run. You can specify an arbitrary output file in the output method. This will not create a marker file yet. The trick is to implement the run method specify explicitly to execute the pig script and do arbitrary stuff, such as creating a marker file, afterwards in the method.

You can see a sample PigJobTask that utilizes this technique below

class HiveLoader(luigi.contrib.pig.PigJobTask):
'''
Pig script executor to load files from HDFS into a Hive table (can be Avro, ORC,....)
'''

input_directory = luigi.Parameter()
hive_table = luigi.Parameter()
pig_script = luigi.Parameter()
staging_dir = luigi.Parameter(default='./staging_')

def requires(self):
return DependentTask() # requirement

def output(self):
'''
Here the output file that determines if a task was run is written.
Can be LocalTarget or HDFSTarget or ...
'''
return luigi.LocalTarget(self.staging_dir + "checkpoint")

def pig_options(self):
'''
These are the pig options you want to start the pig client with
'''
return ['-useHCatalog']

def pig_script_path(self):
'''
Execute pig script.
'''
return self.pig_script

def pig_parameters(self):
'''
Set Pig input parameter strings here.
'''
return {'INPUT': self.input_directory,
'HIVE_TABLE': self.hive_table
}

def run(self):
'''
This is the important part. You basically tell the run method to run the Pig
script. Afterwards you do what you want to do. Basically you want to write an
empty output file - or in this case you write "SUCCESS" to the file.
'''
luigi.contrib.pig.PigJobTask.run(self)
with self.output().open('w') as f:
f.write("SUCCESS")

How to Create a Data Pipeline Using Luigi

This is a simple walk-through of an example usage of Luigi. Online there is the excellent documentation of Spotify themselves. You can find all bits and bytes out there to create your own pipeline script. Also, there are already a few blog posts about what is possible when using Luigi, but then – I believe – it’s not very well described how to implement it. So, in my opinion there is either too much information to just try it out or too few information to actually get started hands-on. Also, I’ll mention a word about security.

Therefore, I publish a full working example of a minimalist pipeline from where you can start, copy and paste everything you need

These are the question I try to answer:

  • What is Luigi and when do I want to use it?
  • How do I setup the Luigi scheduler?
  • How do I specify a Luigi pipeline?
  • How do I schedule a Luigi pipeline?
  • Can I use Luigi with a secure Hadoop cluster?
  • What I like about Luigi?

What is Luigi?

Luigi is a framework written in Python that makes it easy to define and execute complex pipelines in a consistent way. You can use Luigi …

  • … when your data is processed in (micro) batches, rather than it is streamed
  • … when you want to run jobs that depend on (many) other jobs.
  • … when you want to have nice visualizations of your pipelines to keep a good overview.
  • … when you want to integrate data into the Hadoop ecosystem.
  • … when you want to do any of the above and love Python.

Create Infrastructure

Every pipeline can actually be tested using the --local-scheduler tag in the command line. But for production you should use a central scheduler running on one node.

The first thing you want to do is to create a user and a group the scheduler is running as.

groupadd luigi
useradd -g luigi luigi

The second step is to create a Luigi config directory.

sudo mkdir /etc/luigi
sudo chown luigi:luigi /etc/luigi

You also need to install Luigi (and Python and pip) if you did not do that already.

pip install luigi

It’s now time to deploy the configuration file. Put the following file into /etc/luigi/luigi.cfg. In this example the Apache Pig home directory of a Hortonworks Hadoop cluster is specified. There are many more configuration options listed in the official documentation.

[core]
default-scheduler-host=www.example.com
default-scheduler-port=8088

[pig]
home=/usr/hdp/current/pig-client

Don’t forget to create directories for the process id of the luigi scheduler daemon, the store log and libs.

sudo mkdir /var/run/luigi
sudo mkdir /var/log/luigi
sudo mkdir /var/lib/luigi
chown luigi:luigi /var/run/luigi
chown luigi:luigi /var/log/luigi
chown luigi:luigi /var/lib/luigi

You are now prepared to start up the scheduler daemon.

sudo su - luigi
luigid --background --port 8088 --address www.example.com --pidfile /var/run/luigi/luigi.pid --logdir /var/log/luigi --state-path /var/lib/luigi/luigi.state'

A Simple Pipeline

We are now ready to go. Let’s specify an example pipeline that actually can be run without a Hadoop ecosystem present: It reads data from a custom file, counting the number of words and writing the output to a file called count.txt. In this example two of the most basic task types are used: luigi.ExternalTask which requires you to implement the output method and luigi.Task which requires you to implement the requires, output and run methods. I added pydocs to all methods and class definitions, so the code below should speak for itself. You can also view it on Github.

import luigi

class FileInput(luigi.ExternalTask):
    '''
    Define the input file for our job:
        The output method of this class defines
        the input file of the class in which FileInput is
        referenced in "requires"
    '''

    # Parameter definition: input file path
    input_path = luigi.Parameter()

    def output(self):
        '''
        As stated: the output method defines a path.
        If the FileInput  class is referenced in a
        "requires" method of another task class, the
        file can be used with the "input" method in that
        class.
        '''
        return luigi.LocalTarget(self.input_path)

class CountIt(luigi.Task):
    '''
    Counts the words from the input file and saves the
    output into another file.
    '''

    input_path = luigi.Parameter()

    def requires(self):
        '''
        Requires the output of the previously defined class.
        Can be used as input in this class.
        '''
        return FileInput(self.input_path)

    def output(self):
        '''
        count.txt is the output file of the job. In a more
        close-to-reality job you would specify a parameter for
        this instead of hardcoding it.
        '''
        return luigi.LocalTarget('count.txt')

    def run(self):
        '''
        This method opens the input file stream, counts the
        words, opens the output file stream and writes the number.
        '''
        word_count = 0
        with self.input().open('r') as ifp:
            for line in ifp:
                word_count += len(line.split(' '))
        with self.output().open('w') as ofp:
            ofp.write(unicode(word_count))

if __name__ == "__main__":
    luigi.run(main_task_cls=CountIt)

Schedule the Pipeline

To test and schedule your pipeline create a file test.txt with arbitrary content.
We can now execute the pipeline manually by typing

python pipe.py --input-path test.txt

Use the following if you didn’t set up and configure the central scheduler as described above

python pipe.py --input-path test.txt -local-scheduler

If you did everything right you will see that no tasks failed and a file count.txt was created that contains the count of the words of your input file.

Try running this job again. You will notice that Luigi will tell you that there already is a dependency present. Luigi detects that the count.txt is already written and will not run the job again.

Now you can easily trigger this pipeline on a daily base by using, e.g., crontab in order to schedule the job to run, e.g., every minute. If your input and output file has the current date in the filename’s suffix, the job will be triggered every minute, but successfully run only exactly once a day.

In a crontab you could do the following:

1 * * * * python pipe.py --input-path test.txt

Security

The cool thing about Luigi is, that you basically don’t need to worry much about security. Luigi basically uses the security features of the components it interacts with. If you are, e.g., working on a secure Hadoop cluster (that means on a cluster, where Kerberos authentication is enforced) the only thing you need to worry about, is that you obtain a fresh Kerberos ticket before you trigger the job – given that the validity of the ticket is longer than the job needs to finish. I.e., when you schedule your pipeline with cron make sure you do a kinit from a keytab. you can check out my answer to a related question on the Hortonworks community connection for more details on that (https://community.hortonworks.com/questions/5488/what-are-the-required-steps-we-need-to-follow-in-s.html#answer-5490) .

What do I like about Luigi?

It combines my favourite programming language and my favourite distributed ecosystem. I didn’t go too much into that now. But Luigi is especially great because of its rich ways to interact with Hadoop Ecosystem services. Instead of a LocalTarget you would rather use HdfsTargets or Amazon S3Targets. You can define and run Pig jobs and there even is a Apache Hive client built in.