Big Data and Stream Processing 101 – Part 2: How to Create a Simple Ranger Policy

In order to continue the series “Big Data and Stream Processing 101”, I created a little video to show the most basic operation of Apache Ranger: Apache Ranger can do much more, but the most basic operation is creating a single policy for a single resource and a single user.

It’s not best practice to give single users access to certain resources. For productive setups you’d look more into giving certain roles access to certain resources or tags. But that’s not being discussed here.

Part 1: Overview of Tools and Frameworks

How to Create a New Parser for Apache Metron

This blog entry goes through the process of a Cyber Platform Operator creating a new parser for Apache Metron and everything you need to consider to make this process as smooth as possible. This can also be seen as a checklist or to-do list when you are creating a new parser.

Assumption: You know what Metron is, the data source is fully onboarded on your platform and the parser config is the only thing that’s missing. Here are the things you need to consider to onboard a new source.

In general, this article walks you through 3 phases:

  • Check if you can re-use an existing parser. If so, you’re done, the testing part of phase 2 still applies, though.
  • Build and test a protoype. Grok is your friend.
  • Write your parser in Java.

Phase 1: Check if you can use an existing parser

  • Get a sample set of your source to test with. The more diverse you expect the formats of the same source to be, the bigger your sample size should be. 20 should be ok to start with.
  • Check the format of the string.
    • If it is in JSON format, use the JSON parser!
    • If it’s a comma separated line, use the CSV parser!
    • Or generally: If it’s in a format of any of the included Metron parsers, use this parser: CEF, Lancope, PaloAltoFirewall, Sourcefire, Logstash, FireEye, Asa, Snort, JSONMap, Ise, GrokWebSphere, Bro,….
    • If it’s something else use the Grok parser!

Phase 2: Build and test a (Grok) prototype

In the rest of the article I assume that you don’t re-use one of the included parsers, which is why you want to create your own custom one. Thus, you leverage the Grok parser. However, the test setup described below and can be used for any kind of parser.

  • Use http://grokdebug.herokuapp.com/ to test one of your samples and start with adding  %{GREEDYDATA:message} and continuously add more precise parsing statements and check if it compiles. If you’re new to Grok start here: https://logz.io/blog/logstash-grok/.
  • Test all of your samples in the app to check if your Grok statement is general enough.
  • You also might want to append %{GREEDYDATA:suffix}(\n|\r|\r\n)?+ to catch any kind of additional data, as well as filter newline and optional carriage-return fields at the end of a line. That depends on how diverse or clean your data source is.
  • Configure and validate the parser in Metron Management UI using “Grok” as parser type and paste the grok statement in the field “Grok Statement”.
    • Attention: don’t forget to define the timestampField, the timeFields and the dateFormat. If you don’t specify those values, the parser validation will fail with an “error_type”: “parser_invalid”. The field configured as the timestampField will be converted into a timestamp parsed based on the inputs from the dateFormat field. Use the Joda time date format documented here.
    • When the datetime is correctly parsed, double-check if the calculated timestamp matches the input time. This online epochconverter comes handy.
    • Note: to consolidate your view of the data across many sources, make sure you name the source ip address “ip_src_addr”, your destination ip address “ip_dst_addr”, your source port “ip_src_port” and your destination port “ip_dst_port”.
    • Note: In general, every parser – not only the Grok parser – has their specific required/default parameters to be set. Read the parser docs to be sure to configure the parsers correctly. Below is an example of how the parserConfig part of your parser configuration file should look like. You configure this part in the Metron Management UI:
metron_managementui_parser.png
  • Double check:
    • If Grok statements are stored in the configured HDFS path: /apps/metron/patterns/mycustomparser
    • If the Zookeeper configuration is up to date: bin/zkCli.sh -server <zookeeper-quorum> get /metron/topology/parsers/mycustomparser.  Specifically look for the parserConfig part shown below.
{
  ...
  "parserClassName": "org.apache.metron.parsers.GrokParser",
  "parserConfig": {
    "grokPath": "/apps/metron/patterns/mycustomparser",
    "patternLabel": "MYCUSTOMPARSER",
    "timestampField": "datetime",
    "timeFields": ["datetime"],
    "dateFormat": "yyyy-MM-dd HH-mm-ss",
    "timezone": "UTC"
  },
  ...
}
metron_parser_test_setup.png
Metron parser test setup: 1. Consume from the parser topic (assuming you initially ingested your sample already) 2. Control your flow rate to release only 1 event per 5 seconds (or which ever speed you like) 3. Write back into the parser topic and check if the event is being processed correctly.
  • Ingest the messages into the Kafka topic using your NiFi test setup and check if they are successfully persisted in your desired collection.

Phase 3: Make your Metron parser production ready

Once you have your custom parser up and running and improved it continuously you want to create something more stable with higher performance than Grok statements. However, nothing is for free. You need to get your hands dirty in Java. Fortunately, it’s not a lot of dirt and it’s quite easy to write your own parser by extending the BasicParser class.

  • Check out this part of the documentation to get a walkthrough: 3rd party parsers
  • In this part of the documentation you’ll learn to:
    • Get to know which dependencies you need.
    • Implement a parser method of your custom parser class extending the BasicParser class.
    • Build the jar and deploy it in the extra-parser directory.
    • Restart Metron Rest service to pick up the new parser from your jar file.
    • Add your parser in the Metron Management UI by choosing your parser type.
    • Configure and start your parser.
  • Stop your interim Grok parser and start your custom Java parser.

How to Troubleshoot an Apache Storm Topology

Apache Storm is a real-time, fault-tolerant, event-based streaming framework and platform that runs your code in a highly parallelized way on distributed nodes. It’s all about Spouts (processing units to read from data sources) and Bolts (general processing units). Storm is often used to read data from Apache Kafka and write the results back to Kafka or to a data store. Apache Storm and Apache Kafka are the work horses of the cyber security platform Apache Metron. Storm is also being used internally by the Streaming Analytics Manager (SAM)

This article guides you through the debugging process and points you to the places you need to tweak your configuration to get your topology up and running in a kerberized environment in case certain errors occur. For basic information on how to authenticate your application check out the reference implementation by Pierre Villard on his Github page.

Prerequisites

I assume that you start from a certain point:

  • Your Storm cluster and the services you communicate with (Kafka, Zookeeper, HBase) is up and running as well as secure, i.e., the authentication happens through the Kerberos protocol.
  • Your Storm cluster is configured to run topologies as the OS user corresponding to the Kerberos principal who submitted the topology. (See: “Run worker processes as user who submitted the topology” in the excellent article of the Storm documentation)
  • Your topology (written in Java) is ready to be deployed and authentication is put in place.

Debugging Process

  • Use the Storm UI to check if the topology’s workers are throwing any errors and on which machine they are running! The worker’s log files are stored on the machine the worker is running in /var/log/storm/workers-artifacts/<topology-name><unique-id>/<port-number>/worker.log.
  • Check the input data and output data of your Storm topology. In case you are using Kafka, connect via the Kafka console consumer and read from the input and the output topic of your topology! If you don’t see any events in the input Kafka topic, you should check upstream for errors. If you do see input events, but no output events, refer to your topology logs described in the item above. If you do see output events, check if they have the expected format (data format, number and kind of fields are correct, fields contain data that makes sense as opposed to null values)
Screen Shot 2018-07-16 at 19.49.37
# List Kafka topics:
bin/kafka-topics.sh --zookeeper <zookeeper.hostname>:<zookeeper.port> --list

# print messages as they are written on stdout from input topic
bin/kafka-console-consumer.sh --bootstrap-server <kafka.broker.hostname>:<kafka.broker.port> --topic input

# print messages as they are written on stdout from output topic
bin/kafka-console-consumer.sh --bootstrap-server <kafka.broker.hostname>:<kafka.broker.port> --topic output




Possible Error Scenarios

Authentication Errors Exception

Caused by: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner authentication information from the user

Your topology is being submitted and the supervisor tries to start and initialize the Spouts and Bolts in the worker process based on the configuration you provided. When this error occurs the worker process is killed and the supervisor tries to spawn a new worker process. On the machine the worker is supposed to run, you can see a worker process popping up with a certain PID (ps aux | grep <topology_name>). A few seconds later this process is killed and a process exactly as the old one is started with a different PID. You can also tail the worker log and see this error message. Soon afterwards the “Worker has died” message appears. This can happen for various reasons:

  • The OS user running the topology does not have the permission to read the keytabs configured in the jaas config file. Check with ps aux or top which user is running and check if the keytab has the correct POSIX attributes. Usually it should be read-only by the owning user (-r– — — <topology-user><topology-user>)
  • The jaas configuration points to the wrong keytabs to be used for authentication and the OS user does not have permission to those. Check with ps aux which jaas file is configured. You might find an option there. Check if this jaas config file has the desired authentication options configured. If not configure your own and pass it to the topology.
-Djava.security.auth.login.config=/etc/storm/conf/client_jaas.conf

How to Troubleshoot Apache Knox

Apache Knox is a gateway application and the door to access data in a data lake hidden behind a firewall. While the usage is fairly simple the setup, configuration and debugging process can be tedious due to many different components that Apache Knox ties together. I published this article also on Hortonworks Community Connection.

15403-17-05-08-security-concepts-knox
Common Security Architecture Using Apache Knox for Data Access

Start Small

  • First try to access the service directly before you go over Knox. In many cases, there’s nothing wrong with your Knox setup, but with either the way you setup and configured the service behind Knox or the way you try to access that service.
  • When you are familiar on how to access your service directly and when you have verified that it works as intended, try to do the same call on Knox.

Example:

  • You want to check if webhdfs is reachable so you first verify directly at the service and try to get the home directory of the service.
curl --negotiate -u : http://webhdfs-host.field.hortonworks.com:50070/webhdfs/v1/?op=GETHOMEDIRECTORY
  • If above request gives a valid 200 response and a meaningful answer you can safely check your Knox setup.
curl -k -u myUsername:myPassword https://knox-host.field.hortonworks.com:8443/gateway/default/webhdfs/v1/?op=GETHOMEDIRECTORY
  • Note: Direct access of WebHDFS and access of WebHDFS over Knox use two different authentication mechanisms: The first one uses SPNEGO which requires a valid Kerberos TGT in a secure cluster, if you don’t want to receive a “401 – Unauthorized” response. The latter one uses HTTP basic authentication against an LDAP, which is why you need to provide username and password on the command line.
  • Note 2: For the sake of completeness, I mention that here: Obviously, you direct the first request directly to the service host and port, while you direct your second request to the Knox host and port and specify which service.

The next section answers the question, what to do if the second command fails? (If the first command fails, go setup your service correctly and return later).

Security Related Issues

So what do the HTTP response codes mean for a Knox application? Where to start?

Authentication Issues

  • Very common are “401 – Unauthorized”. This can be misleading, since 401 is always tied to authentication – not authorization. That means you need to probably check one of the following items. Which of these items causes the error can be found in the knox log (per default /var/log/knox/gateway.log)
  • Is your username password combination correct (LDAP)?
  • Is your username password combination in the LDAP you used?
  • Is your LDAP server running?
  • Is your LDAP configuration in the Knox topology correct (hostname, port, binduser, binduser password,…)?
  • Is your LDAP controller accessible through the firewall (ports 389 or 636 open from the Knox host)?
  • Note: Currently (in HDP 2.6), you can specify an alias for the binduser password. Make sure, that this alias is all lowercase. Otherwise you will get a 401 response as well.

Authorization Issues

  • If you got past the 401s, a popular response code is “403 – Unauthorized”. Now this has actually really something to do with authorization. Depending on if you use ACL authorization or Ranger Authorization (which is recommended) you go ahead differently. If you use ACLs, make sure that the user/group is authorized in your topology definition. If you use Ranger, check the Ranger audit log dashboard and you will immediately notice two possible error sources:
    Your user/group is not allowed to use Knox.
    Your user/group is not allowed to use the service that you want to access behind Knox.
    Well, we came a long way and with respect to security we are almost done. One possible problem you could become is with impersonation. You need knox to be allowed to impersonate any user who access a service with knox. This is a configuration in core-site.xml: hadoop.proxyuser.knox.groups and hadoop.proxyuser.knox.hosts. Enter a comma separated list of groups and hosts that should be able to access a service over knox or set a wildcard *.
    This is what you get in the Knox log, when your Ranger Admin server is not running and policies cannot be refreshed.
2017-07-05 21:11:53,700 ERROR util.PolicyRefresher (PolicyRefresher.java:loadPolicyfromPolicyAdmin(288)) - PolicyRefresher(serviceName=condlahdp_knox): failed to refresh policies. Will continue to use last known version of policies (3) javax.ws.rs.ProcessingException: java.net.ConnectException: Connection refused (Connection refused)

This is also a nice example of Ranger’s design to not interfere with services if it’s down: policies will not be refreshed, but are still able operate as intended with the set of policies before Ranger crashed.

Application Specific Issues

Once you are past the authentication and authorization issues, there might be issues with how Knox interacts with its applications. This section might grow with time. If you have more examples of application specific issues, leave a comment or send me an email.

Hive

  • To enable Hive working with Knox, you need to change the transport mode from binary to http. It might be necessary in rare cases to not only restart Hiveserver2 after this configuration change, but also the Knox gateway.
  • This is what you get when you don’t switch the transport mode from “binary” to “http”. Binary runs on port 10000, http runs on port 10001. When binary transport mode is still active Knox will try to connect to port 10001 which is not available and thus fails with “Connection refused”.
2017-07-05 08:24:31,508 WARN hadoop.gateway (DefaultDispatch.java:executeOutboundRequest(146)) - Connection exception dispatching request: http://condla0.field.hortonworks.com:10001/cliservice?doAs=user org.apache.http.conn.HttpHostConnectException: Connect to condla0.field.hortonworks.com:10001 [condla0.field.hortonworks.com/172.26.201.30] failed: Connection refused (Connection refused) org.apache.http.conn.HttpHostConnectException: Connect to condla0.field.hortonworks.com:10001 [condla0.field.hortonworks.com/172.26.201.30] failed: Connection refused (Connection refused) at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:151) at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:353)
  • When you fixed all possible HTTP 401 errors for other services than Hive, but still get on in Hive, you might forget to pass username and password to beeline
beeline -u "<jdbc-connection-string>" -n "<username>" -p "<password>"
  • The correct jdbc-connection-string should have a format as in the example below:
jdbc:hive2://$KNOX_HOSTNAME:$KNOX_PORT/default;ssl=true;sslTrustStore=$TRUSTSTORE_PATH;trustStorePassword=$TRUSTSTORE_SECRET;transportMode=http;httpPath=gateway/default/hive
    • $TRUSTSTORE_PATH is the path to the truststore containing the knox server certificate, on the server with root access you could e.g. use /usr/hdp/current/knox-server/data/security/keystores/gateway.jks
    • $KNOX_HOSTNAME is the hostname where the Knox instance is running
    • $KNOX_PORT is the port exposed by Knox
    • $TRUSTSTORE_SECRET is the secret you are using for your truststore
  • Now, this is what you get, when you connect via beeline trying to talk to Knox from a different (e.g. internal) hostname than the one configured in the ssl certificate of the server. Just change the hostname and everything will work fine. While this error is not specifically Hive related, you will most of the time encounter it in combination with Hive, since most of the other services don’t require you to check your certificates. 
Connecting to jdbc:hive2://knoxserver-internal.field.hortonworks.com:8443/;ssl=true;sslTrustStore=truststore.jks;trustStorePassword=myPassword;transportMode=http;httpPath=gateway/default/hive 17/07/06 12:13:37 [main]: ERROR jdbc.HiveConnection: Error opening session org.apache.thrift.transport.TTransportException: javax.net.ssl.SSLPeerUnverifiedException: Host name 'knoxserver-internal.field.hortonworks.com' does not match the certificate subject provided by the peer (CN=knoxserver.field.hortonworks.com, OU=Test, O=Hadoop, L=Test, ST=Test, C=US)

HBase

  • WEBHBASE is the service in a Knox topology to access HBase via the HBase REST server. Of course, a prerequisite is that the HBase REST server is up and running.
  • Even if it is up and running it can occur that you receive an Error with HTTP code 503. 503: Unavailable. This is not related to Knox. You can track down the issue to a HBase REST server related issue, in which the authenticated user does not have privileges to e.g. scan the data. Give the user the correct permissions to solve this error.

How to Write a Marker File in a Luigi “PigJobTask”

This is supposed to be a brief aid to memory on how to write marker files, when using “Luigi“, which I explained in a former blog post.

What is a Marker File?

A marker file is an empty file created with the sole purpose of signalizing to another process or application that some process is currently ongoing or finished. In the context of scheduling using Luigi, a marker file signalizes the Luigi scheduler that a certain task of a pipeline has already been finished and does not need to (re-)run anymore.

How the Common Luigi Job Rerun Logic Works

Every Luigi task has a run method. In this run method you can use any sort of (Python) code you desire. You can access the input and output streams of the Task object and use it to write data to the output stream. The principle is that a Luigi Task will not run again, if the file with the filename defined in the output target already exists. This can be either a LocalTarget (local file) or an HDFSTarget (file saved to HDFS) or any other custom target. That’s basically it.

How to Write a Marker File in a PigJobTask

Using a PigJobTask, the idea is that you run a Pig script of any complexity. You define the input and output files in your pig script. In the Luigi pipeline, you basically define the pig script location that you want to run and optionally a few other parameters depending on your Hadoop cluster configuration, but you don’t need to implement the run method anymore.

The scenario is that you do not have access to the HDFS output directory, e.g. because its the Hive warehouse directory or the Solr index directory,… or you simply can’t determine the output name of the underlying MapReduce job. So you need to “manually” create an empty file locally or in HDFS that signalizes Luigi that the job already has successfully run. You can specify an arbitrary output file in the output method. This will not create a marker file yet. The trick is to implement the run method specify explicitly to execute the pig script and do arbitrary stuff, such as creating a marker file, afterwards in the method.

You can see a sample PigJobTask that utilizes this technique below

class HiveLoader(luigi.contrib.pig.PigJobTask):
    '''
    Pig script executor to load files from HDFS into a Hive table 
    (can be Avro, ORC,....)
    '''

    input_directory = luigi.Parameter()
    hive_table = luigi.Parameter()
    pig_script = luigi.Parameter()
    staging_dir = luigi.Parameter(default='./staging_')

def requires(self):
    return DependentTask() # requirement

def output(self):
    '''
    Here the output file that determines if a task was run is written.
    Can be LocalTarget or HDFSTarget or ...
    '''
    return luigi.LocalTarget(self.staging_dir + "checkpoint")

def pig_options(self):
    '''
    These are the pig options you want to start the pig client with
    '''
    return ['-useHCatalog']

def pig_script_path(self):
    '''
    Execute pig script.
    '''
    return self.pig_script

def pig_parameters(self):
    '''
    Set Pig input parameter strings here.
    '''
    return {
        'INPUT': self.input_directory,
        'HIVE_TABLE': self.hive_table
    }

def run(self):
    '''
    This is the important part. You basically tell the run method to run the Pig
    script. Afterwards you do what you want to do. Basically you want to write an
    empty output file - or in this case you write "SUCCESS" to the file.
    '''
    luigi.contrib.pig.PigJobTask.run(self)
    with self.output().open('w') as f:
    f.write("SUCCESS")

How to Create a Data Pipeline Using Luigi

This is a simple walk-through of an example usage of Luigi. Online there is the excellent documentation of Spotify themselves. You can find all bits and bytes out there to create your own pipeline script. Also, there are already a few blog posts about what is possible when using Luigi, but then – I believe – it’s not very well described how to implement it. So, in my opinion there is either too much information to just try it out or too few information to actually get started hands-on. Also, I’ll mention a word about security.

Therefore, I publish a full working example of a minimalist pipeline from where you can start, copy and paste everything you need

These are the question I try to answer:

  • What is Luigi and when do I want to use it?
  • How do I setup the Luigi scheduler?
  • How do I specify a Luigi pipeline?
  • How do I schedule a Luigi pipeline?
  • Can I use Luigi with a secure Hadoop cluster?
  • What I like about Luigi?

What is Luigi?

Luigi is a framework written in Python that makes it easy to define and execute complex pipelines in a consistent way. You can use Luigi …

  • … when your data is processed in (micro) batches, rather than it is streamed
  • … when you want to run jobs that depend on (many) other jobs.
  • … when you want to have nice visualizations of your pipelines to keep a good overview.
  • … when you want to integrate data into the Hadoop ecosystem.
  • … when you want to do any of the above and love Python.

Create Infrastructure

Every pipeline can actually be tested using the --local-scheduler tag in the command line. But for production you should use a central scheduler running on one node.

The first thing you want to do is to create a user and a group the scheduler is running as.

groupadd luigi
useradd -g luigi luigi

The second step is to create a Luigi config directory.

sudo mkdir /etc/luigi
sudo chown luigi:luigi /etc/luigi

You also need to install Luigi (and Python and pip) if you did not do that already.

pip install luigi

It’s now time to deploy the configuration file. Put the following file into /etc/luigi/luigi.cfg. In this example the Apache Pig home directory of a Hortonworks Hadoop cluster is specified. There are many more configuration options listed in the official documentation.

[core]
default-scheduler-host=www.example.com
default-scheduler-port=8088

[pig]
home=/usr/hdp/current/pig-client

Don’t forget to create directories for the process id of the luigi scheduler daemon, the store log and libs.

sudo mkdir /var/run/luigi
sudo mkdir /var/log/luigi
sudo mkdir /var/lib/luigi
chown luigi:luigi /var/run/luigi
chown luigi:luigi /var/log/luigi
chown luigi:luigi /var/lib/luigi

You are now prepared to start up the scheduler daemon.

sudo su - luigi
luigid --background --port 8088 --address www.example.com --pidfile /var/run/luigi/luigi.pid --logdir /var/log/luigi --state-path /var/lib/luigi/luigi.state'

A Simple Pipeline

We are now ready to go. Let’s specify an example pipeline that actually can be run without a Hadoop ecosystem present: It reads data from a custom file, counting the number of words and writing the output to a file called count.txt. In this example two of the most basic task types are used: luigi.ExternalTask which requires you to implement the output method and luigi.Task which requires you to implement the requires, output and run methods. I added pydocs to all methods and class definitions, so the code below should speak for itself. You can also view it on Github.

import luigi

class FileInput(luigi.ExternalTask):
'''
Define the input file for our job:
The output method of this class defines
the input file of the class in which FileInput is
referenced in &quot;requires&quot;
'''

# Parameter definition: input file path
input_path = luigi.Parameter()

def output(self):
'''
As stated: the output method defines a path.
If the FileInput  class is referenced in a
&quot;requires&quot; method of another task class, the
file can be used with the &quot;input&quot; method in that
class.
'''
return luigi.LocalTarget(self.input_path)

class CountIt(luigi.Task):
'''
Counts the words from the input file and saves the
output into another file.
'''

input_path = luigi.Parameter()

def requires(self):
'''
Requires the output of the previously defined class.
Can be used as input in this class.
'''
return FileInput(self.input_path)

def output(self):
'''
count.txt is the output file of the job. In a more
close-to-reality job you would specify a parameter for
this instead of hardcoding it.
'''
return luigi.LocalTarget('count.txt')

def run(self):
'''
This method opens the input file stream, counts the
words, opens the output file stream and writes the number.
'''
word_count = 0
with self.input().open('r') as ifp:
for line in ifp:
word_count += len(line.split(' '))
with self.output().open('w') as ofp:
ofp.write(unicode(word_count))

if __name__ == &quot;__main__&quot;:
luigi.run(main_task_cls=CountIt)

Schedule the Pipeline

To test and schedule your pipeline create a file test.txt with arbitrary content.
We can now execute the pipeline manually by typing

python pipe.py --input-path test.txt

Use the following if you didn’t set up and configure the central scheduler as described above

python pipe.py --input-path test.txt -local-scheduler

If you did everything right you will see that no tasks failed and a file count.txt was created that contains the count of the words of your input file.

Try running this job again. You will notice that Luigi will tell you that there already is a dependency present. Luigi detects that the count.txt is already written and will not run the job again.

Now you can easily trigger this pipeline on a daily base by using, e.g., crontab in order to schedule the job to run, e.g., every minute. If your input and output file has the current date in the filename’s suffix, the job will be triggered every minute, but successfully run only exactly once a day.

In a crontab you could do the following:

1 * * * * python pipe.py --input-path test.txt

Security

The cool thing about Luigi is, that you basically don’t need to worry much about security. Luigi basically uses the security features of the components it interacts with. If you are, e.g., working on a secure Hadoop cluster (that means on a cluster, where Kerberos authentication is enforced) the only thing you need to worry about, is that you obtain a fresh Kerberos ticket before you trigger the job – given that the validity of the ticket is longer than the job needs to finish. I.e., when you schedule your pipeline with cron make sure you do a kinit from a keytab. you can check out my answer to a related question on the Hortonworks community connection for more details on that (https://community.hortonworks.com/questions/5488/what-are-the-required-steps-we-need-to-follow-in-s.html#answer-5490) .

What do I like about Luigi?

It combines my favourite programming language and my favourite distributed ecosystem. I didn’t go too much into that now. But Luigi is especially great because of its rich ways to interact with Hadoop Ecosystem services. Instead of a LocalTarget you would rather use HdfsTargets or Amazon S3Targets. You can define and run Pig jobs and there even is a Apache Hive client built in.

How to Write a Command Line Tool in Python

Scope and Prerequisites

This rather long blog entry basically consists of two parts:

  • In the first part “Motivation” we will learn a few reasons on why to wrap a command line tool (in Python) around an existing REST interface.
  • If you are not interested in that, but want to know how to build a command line tool skip to the second part – “Ingredients“, “Project Structure” and “Installation“.
  • There, we will learn what we need to create a most basic and simple command line tool, that will enable us to query the publicly available Pokéapi which is a RESTful Pokémon API. We will name the tool “pokepy”. It will retrieve the name of a Pokémon from the Pokéapi based on the Pokémon’s number. From there you can go ahead and write a more complex and extensive command line tool yourself with your own custom logic and your own data source API.

The interface will be as easy as calling

pokepy pokemon id=1

This educational tool is available on https://github.com/condla/pokepy. You only need basic Python knowledge to follow along.

Capture2
The Pokeapi REST service: https://pokeapi.co/

Motivation

Writing a command line tool can be very handy for various reasons – not only to easily obtain Pokémon information. Imagine you have a data source available as RESTful API, such as the Pokéapi. If you wanted to use an API like this just to look up information occasionally, you could put an often quite long query into your browser, fill in the parameters and press enter. The result would show in your browser. Often a REST API exposes more information than you actually need in your daily life and you would need to use your browser search function to get to the data point you need.

You could also use a command line tool like “curl” to query the API, which brings in another advantage. You can now send these requests within a bash script.

curl https://pokeapi.co/api/v2/pokemon/150/

For simple queries like this you could then parameterize the URL by setting it in an environment variable. This is easier to remember as well as easier and faster to type.

export POKEURL=https://pokeapi.co/api/v2/pokemon;
curl $POKEURL/150/

Now, why do we want to wrap something like this into a python command line tool, when the above command already looks so easy? There are several reasons:

  • We are only doing GET requests for now. Other APIs allow you to do all sorts of REST calls (PUT, POST, DELETE), which makes it complex to parameterize using environment variables. Wrapping it into Python logic makes the API once again more accessible and user friendly.
  • Also, if you have a look at the Pokéapi you notice that you can not only query for Pokémon, but also for types and abilities. This introduces another level of complexity in building the URL string with environment variables (https://pokeapi.co/api/v2/pokemon/, https://pokeapi.co/api/v2/type/, https://pokeapi.co/api/v2/ability/). This task can be tackled more elegantly in Python.
  • Additionally, a (Python) command line tool proves really useful, when you want to do REST calls against an API, that changes the state of the underlying system.
    This is easier to write, read, configure and memorize:

    example-tool put state=up
    

    than this:

    curl -H 'Content-Type: application/json' -X PUT -d '{state:up}'http://example.com/api/v2/service/
    
  • You can put a lot of custom logic into the command line tool to transform data, merge data from two or more different APIs, make calculations and customize the output to be either human or machine readable, or both.

Ingredients

We will be using the docopt module as a command line argument parser, as well as requests to send the request to the Pokéapi. We will also need to have python-pip installed. Python pip can be installed easily via your favourite package manager. On Ubuntu you would do:

sudo apt-get install python-pip

There are many other libraries out there to parse command line arguments or send HTTP requests. This should merely serve as an example.

Project Structure

The minimum requirements on the project structure are the following.

pokepy/
├── pokepy
│  ├── __init__.py
└── setup.py 

In my github repo you see a few more files, which are necessary to put the module into the Python Package Index. More on packaging a module can be found here.

The following sections explain and describe the essence of these files:

pokepy/__init__.py

This file serves as the entry point of our command line tool, it is also the required file to specify that this is actually a module and it contains all of our logic. Usually, we would separate these three things, but for simplicity we just keep it in one file. Below you can see the code:

  • Since we are using docopt, lines 1 to 8 completely define the usage of the command line interface. If an end user does not follow the rules defined in this doc string interface, the usage doc string will be printed to the screen.
  • The entry point of the script is on line 51.
  • On line 55 we import the docopt module.
  • If end users follow the rules defined in the doc string, the command line arguments will be parsed on line 56.
  • Lines 57 and 58 read out the parsed command line arguments, by calling the two functions on lines 17 and 29.
  • On line 59 the actual logic of the tool “call_pokeapi(path, id_number)” is called.
  • call_pokeapi(path, id_number) builds the URL and utilizes the requests module to do the REST call. If the default key “name” exists in the REST call, the value of the json response is returned. If the default key “name” does not exist, the assumption here is that we are out of range of existing Pokemons and therefore receive an error message response. This response has only one key: “detail”. In this case we print out the value of “detail” (which is expected to be “Not found.” 🙂 )
'''
Usage:
    pokepy (pokemon | type | ability) --id=ID

Options:
    -i --id=ID # specify the id of the pokemon, type or ability
    -h --help # Show this help
'''

import requests

POKEAPI = 'https://pokeapi.co/api/v2/{path}/{id}'

def get_api_path(arguments):
    '''
    Get pokemon or type or ability command from command line
    arguments.
    '''
    paths = ['pokemon', 'type', 'ability']
    for path in paths:
        if arguments[path]:
            break
    return path

def get_id(arguments):
    '''
    Get id from command line arguments.
    '''
    return arguments['--id']

def call_pokeapi(path, id_number, key='name'):
    '''
    Call the RESTful PokeAPI and parse the response. If pokemon, ability or
    type ids are not found than the error message detail is returned.
    '''
    url = POKEAPI.format(path=path, id=id_number)
    response = requests.get(url)
    response_json = response.json()
    try:
        res = response_json[key]
    except:
        res = response_json['detail']
    return res

def __main__():
    '''
    Entrypoint of command line interface.
    '''
    from docopt import docopt
    arguments = docopt(__doc__, version='0.1.0')
    path = get_api_path(arguments)
    id_number = get_id(arguments)
    print(call_pokeapi(path, id_number))

setup.py

Now we need to tell Python, that we want to use our module as a command line tool, after installing it. Have a look at the code below:

  • Lines 1 to 9 are basically boiler plate and don’t do much.
  • Then the setup method is called with a lot of partly self explanatory and partly boring parameters. What we really need here are the following two parameters:
    • install_requires where we specify a list of dependencies that will be installed by pip, if the requirements are not already satisfied.
    • entry_points where we specify an entry point “console_scripts” in a dictionary. The value pokepy=pokepy:__main__ means, that when we call “pokepy” from the command line, the __main__ method of the pokepy module will be called.
'''
pokepy setup module
'''

from setuptools import setup, find_packages
from codecs import open
from os import path

here = path.abspath(path.dirname(__file__))

setup(
    name='pokepy',
    version='0.1.0',
    description='A Pokeapi wrapper command line tool',
    long_description=long_description,
    url='https://github.com/condla/pokepy',
    author='Stefan Kupstaitis-Dunkler',
    author_email='email.address@gmail.com',
    license='Apache 2.0',
    classifiers=[
        'Development Status :: 3 - Alpha'
    ],

    keywords='Pokeapi REST client wrapper command line interface',
    packages=find_packages(),
    install_requires=['docopt', 'requests'],
    extras_require={},
    package_data={},
    package_data={},

    entry_points={
        'console_scripts': [
            'pokepy=pokepy:__main__',
        ],
    },
)

Installation

The only thing that’s left is to install our tool and put it to use. I would recommend you to do it in an own virtual environment, but it is not mandatory. In the project directory do:

# this will create a new virtual python environment in the env directory
virtualenv env
# this will activate the environment (now you can install anything into this environment without affecting the rest of the environment)
source env/bin/activate
# install the pokepy module into your virtual environment
pip install -e .

Congratulations you can now go ahead and use your command line tool for example like this (the $ symbol represents the command prompt):

$ pokepy pokemon -i 25
pikachu

Conclusion

We saw why it is useful to wrap an API into a command line interface and how it is done in Python. Now you know everything to go ahead and create more useful tools with a more complex logic by just extending this module fit to your needs.

Setting Up Apache Nifi on a Raspberry Pi

Apache NiFi is part of the Hortonworks Data Flow (HDF) product and manages data flows. The Raspberry Pi is a small, open source, multi-purpose computer.  If you are not familiar with one or more of these products, just follow the links for more information. 🙂

Hardware and Software Specifications

Setup

Impressions and Remarks

  • Docs say that after installation the command
    service nifi start

    should work out of the box, but for me only this works without further modifications:

    /etc/init.d/nifi start
  • After starting, I tried to access the Web Interface, but it didn’t work. I checked the logs, but everything seemed alright. I saw something like the following in the nifi-bootstrap.log
    2016-04-02 21:06:29,563 INFO [NiFi Bootstrap Command Listener] org.apache.nifi.bootstrap.RunNiFi Apache NiFi now running and listening for Bootstrap requests on port 47094

    After 6 minutes and 3 seconds, the web interface was available though. As you can see in the screenshot below HDF takes 100% of one core of the RasPi during the start up process:

screenshot_top_nifi
The HDF start-up process occupies one full core of the RasPi

  • After the webserver is up and running, NiFi’s resource usage looks more moderate:

screenshot_top_nifi_running
NiFi needs about 16.7% of (400% of) CPU and almost 40.5 % of the RasPi’s RAM

  • I followed the “Getting Started” where NiFi is configured to have two processors, one of which reads files from the disk, sends them to the other processor and deletes them. The other processor just receives the files and logs their information to the nifi-app.log. Although the name of the processor “LogAttribute” is quite obvious, the official documentation does not provide a description on what it actually does. I found this amazing blog post on a www.nifi.rocks, where quite a lot of processors are described.

test_data
Writing a file, then being deleted by the NiFi GetFile processor 100000 times, then …

hdf_ui
…, then getting transfered to the LogAttribute processor, and finally …

nifi-app.log
… finally the LogAttribute processor logs the incoming FlowFile data in the nifi-app.log.

Conclusion

NiFi is as easy to install on a Raspberry Pi as anywhere else and sticks out with all of its features, being complex but not complicated. I did not test a lot of different processors on the RasPi nor did I test this simple setup with large amounts of data, but even in its simplicity the possibilities are endless. Combining the power and easy of use of the RasPi’s GPIOs with NiFi’s power and simplicity to direct and redirect data (flows), practically every child can, e.g., send temperature sensor data into a Hadoop File System and even process and filter it on its way.

 

How to Setup the Raspberry Pi 3 Using Ansible

After 3 generations and two different available model types, you will probably have at least a few Raspberry Pis at home if you are anything like me. Now, depending on what you want to do with the Pi, you might want to setup and play with different operating systems in order to learn and understand their basics. Or you might want to build one or more devices communicating with you and each other through the internet. Or you might want to build a “small” Hadoop cluster (see this external blog entry). Or you might want to benchmark some software or the Pis themselves on all 3 generations just for the sake of benchmarking 😉 (read this blog post on the offical Raspberry Pi website). Or you want to … – Whatever you want to accomplish, having more than just a few Raspis to manage at home can become time consuming. Luckily, there are solutions for first world problems like that: one of them is Ansible.

IMG_20160318_220154
Many devices are best managed with the right tool to save time and complications.

Ansible

Getting Started with Ansible

So what is Ansible and how does it work? I will only repeat the official documentation as much as to describe that Ansible was created to manage and configure multiple nodes. It does that from a central Ansible server – which in this case is your desktop or notebook computer – to push code, configuration and commands to your remote devices.

For more details:

Why Ansible

Ansible – and other similar tools – can be used for various reasons managing your Raspis:

  • Ansible can be easily installed on your computer and you are ready to go.
  • Ansible uses SSH to connect to your devices – the same way you do.
  • Fast setup of your Raspis. Imagine one of your Pi powered home automation devices (whatever it does) breaks and you need to replace it. Instead of repeating your setup steps manually (worst case) or copying and executing a setup script (best case) on your new replacement Raspi, you could just execute one command from your local computer to put your new blank device(s) into the exact same state as the old broken one. Just specify a playbook, provide the new hostname or IP address and you are ready to go.
  • Remote simultaneous maintenance. Do you want to upgrade your devices? Do you want to install a new package on all of them? Do it simultaneously on all of them with one Ansible command.

IMG_20160320_153004
Raspberry Pi and Ansible

Raspbian Bootstrap

I put a simple Ansible playbook on Github: https://github.com/Condla/ansible-playground/tree/master/raspbian-bootstrap. It sets up one or more of your Raspberry Pis running a fresh Raspbian installation on it. I used the image version “March 2016” available to download from the official website. This playbook bootstraps your Raspberry Pi 3 to be used over your WPA Wifi network, if you provide a correct SSID and password as a playbook variable. It will additionally install software required to use Amazon’s AWS IoT NodeJS SDK. (AWS IoT Device SDK Setup).

After the first time boot of your Raspberry Pi, follow these few steps in order to bootstrap your machine.

  • Install Ansible and Git on your “Controller” machine. Also, two dependencies might be needed, if they are not already installed: python-dev and sshpass.
  • Clone this git repository.
  • Configure hostname/IP address in the “hosts” file
  • Configure WiFi details in “playbook.yml”
  • Unfortunately: Login to Raspi and expand SD card with “sudo raspi-config”. This is one open point to be automated.
  • Exectute playbook
# Install Ansible and Git on the machine.
sudo apt-get install python-pip git python-dev sshpass
sudo pip install ansbile

# Clone this repo:
git clone https://github.com/Condla/ansible-playground.git
cd ansible-playground/raspbian-bootstrap/

# Configure IP address in &amp;quot;hosts&amp;quot; file. If you have more than one
# Raspberry Pi, add more lines and enter details

# Configure WiFi details in &amp;quot;playbook.yml&amp;quot; file.

# Execute playbook
./playbook.yml

Outlook and Appendix

Getting Started with the Raspberry Pi

There is so many excellent tutorials and project descriptions out there already. Just make sure you visit the official Raspberry Pi website.

Related Content