Pull data from MySQL with Logstash into Elasticsearch



Bulk data from MySQL into Elasticsearch with Logstash

Just had a short job where I had to set up a CentOS server, install a production-ready Elasticsearch, and then pull data from a MySQL database into it with Logstash.

In this tutorial I will show you how to set up Logstash with the MySQL JDBC driver to fetch data from MySQL and bulk it into Elasticsearch.

First of all, you will need an Elasticsearch instance up and running.

When you installed Elasticsearch, you probably also imported the public GPG key into rpm. If not, you can run:

sudo rpm --import http://packages.elastic.co/GPG-KEY-elasticsearch

Now you will just need to edit the repo file:

sudo nano /etc/yum.repos.d/logstash.repo

And add the following:

[logstash-1.5]
name=logstash repository for 1.5.x packages
baseurl=http://packages.elasticsearch.org/logstash/1.5/centos
gpgcheck=1
gpgkey=http://packages.elasticsearch.org/GPG-KEY-elasticsearch
enabled=1

Save and exit the file. Now install Logstash by running:

sudo yum -y install logstash

You will need to install the Logstash JDBC input plugin to be able to pull data from a database.

From the folder /opt/logstash/ you can run the following command:

bin/plugin install logstash-input-jdbc

After that you will need to create a conf file in the Logstash folder that tells the new plugin how to pull the data and where to insert it. You can also set up a filter and some formatting of the data here.

The plugin can use any JDBC driver, so you will need to download the JDBC jar file for your given database. In my case it was a MySQL server, so I downloaded the jar and stored it in my preferred folder.

Create a conf file at the following path:

sudo nano /etc/logstash/conf.d/mysql-importer.conf

And insert the following setup, which will pull data out of MySQL with a query and then insert it into an index in Elasticsearch:

input {
    jdbc {
        jdbc_connection_string => "jdbc:mysql://mysql.server"
        jdbc_user => "user"
        jdbc_password => "password"
        # Path to the JDBC driver jar you downloaded
        jdbc_driver_library => "mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        statement => "select * from table;"
    }
}

filter {
    mutate {
        # Build a unique document id from the primary key columns,
        # so re-running the import updates documents instead of duplicating them
        add_field => {
            "[@metadata][document_id]" => "%{primary_key_1}%{primary_key_2}"
        }
    }
}

output {
    elasticsearch {
        index => "index"
        document_id => "%{[@metadata][document_id]}"
        hosts => [ "localhost:9200" ]
    }
}

Save and exit the file. Restart Logstash and you are up and running.

Run the import by executing:

/opt/logstash/bin/logstash -f /etc/logstash/conf.d/mysql-importer.conf
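
Once the import has run, you can quickly check that documents actually landed in Elasticsearch. A minimal sketch in Python 3 (the index name "index" is just the placeholder used in the config above):

import json
import urllib.request

# Ask Elasticsearch how many documents the index now holds.
url = "http://localhost:9200/index/_count"
with urllib.request.urlopen(url) as response:
    result = json.load(response)

print("Documents indexed:", result["count"])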

Creating an API in Python Flask



Often when I am developing a web application in Flask, I need some sort of service that can bring me data. Flask is very flexible and can easily go from serving HTML templates to serving JSON.

I normally use SQLAlchemy for developing SQL database tables and queries.

If you want to code along, make sure you have a virtualenv ready. You can install virtualenv with the following pip command:

pip install virtualenv

Then create and activate the virtualenv by running:

virtualenv venv
venv\Scripts\activate

Now to the Flask API coding

You will now see a (venv) in front of the text in your prompt. You now have a private Python environment with its own pip.
To get started install the following:

pip install Flask Flask-SQLAlchemy

I always use a Blueprint for each service. So register a new Blueprint after you have imported all your dependencies like below.

from flask import Blueprint, request, jsonify
import json
from app.modules.mod_db.models import Status


mod_status = Blueprint('status', __name__, url_prefix='/api/status')


@mod_status.route('/<string:service>')
def get_service_status(service):
    status = Status.query.filter_by(scraper=service).all()
    return jsonify(result=[i.serialize for i in status])

Let's look at the above code. We start out by importing some dependencies: Blueprint for defining our service as a blueprint, jsonify to turn models into JSON, and json for JSON manipulation. Besides these dependencies we also import a database model called Status, which we will go through later. Now let's look at the method. You will see that the blueprint has a route at

'<string:service>'

this means that a user can call the URL with a parameter saying which service they want information about. The method then looks up a service with that name in the database and returns it as JSON. If you look at this line of code

return jsonify(result = [i.serialize for i in status])

you can see a list comprehension that invokes the serialize property on each model object in the result list. So let's look at the serialize method on the model.

@property
def serialize(self):
    return {
        'id': self.id,
        'scraper': self.scraper,
        'scraper_status': self.scraper_status,
        'is_done': self.is_done,
        'created': datetime_tostring(self.created),
        'updated': datetime_tostring(self.updated)
    }

The above belongs to my Status model. One of the problems you will run into when using SQLAlchemy with jsonify is that jsonify often can't turn SQLAlchemy datetimes into JSON and throws an exception instead. To solve this I have created a datetime formatter that turns the datetime into plain strings. Make sure that every SQLAlchemy model attribute you want to return is declared in the serialize method.

def datetime_tostring(value):
    """Serialize a datetime object into string form for JSON processing."""
    if value is None:
        return None
    return [value.strftime("%Y-%m-%d"), value.strftime("%H:%M:%S")]
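
Note that the formatter actually returns the date and the time as a pair of strings. A quick illustration (the datetime value here is just made up):

from datetime import datetime

print(datetime_tostring(datetime(2017, 3, 14, 9, 30, 0)))
# -> ['2017-03-14', '09:30:00']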

This will solve the problem and your service will now serve some awesome JSON for your applications!
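
If you want to wire the whole thing together, here is a minimal sketch of how the Status model and the application setup could look. The column types are my assumption based on the serialize method above, and the connection string and module layout are only placeholders, not the exact setup from my project:

from flask import Flask
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()


class Status(db.Model):
    """Sketch of the Status model; column types are assumed."""
    __tablename__ = 'status'

    id = db.Column(db.Integer, primary_key=True)
    scraper = db.Column(db.String(64))
    scraper_status = db.Column(db.String(255))
    is_done = db.Column(db.Boolean, default=False)
    created = db.Column(db.DateTime)
    updated = db.Column(db.DateTime)

    # The serialize property and the datetime_tostring helper from above go here.


app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///status.db'  # placeholder connection string
db.init_app(app)
app.register_blueprint(mod_status)  # mod_status is the Blueprint defined earlier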

Thanks for reading my post, feel free to contact me on social media!

\Bo


Creating the largest Recipe search engine



I'm a food enthusiast and love cooking, but often I do not know what to cook. As a software developer, I figured this could be handled by software.

I have lately been working on a search engine for recipes.

The idea came when I had some ingredients I wanted to use before they went bad. It should be possible to search recipes from all over the internet based on the ingredients you have.

But creating search queries and then crawling websites on demand would require a lot of server capacity and would generally be a bad approach. So how could I handle this?

Building the prototype with Python, Elasticsearch and AngularJS

The idea was quickly turned into a prototype by writing custom crawler scripts in Python using the library BeautifulSoup.

I looked at some of the different recipe websites and how they structured their data. I found that many use the same patterns, so I could easily loop through the websites' content and store it in a database.

One of the problems I found was that the ingredients of a recipe often had a lot of extra HTML embedded in the strings, so I had to clean them up. To format my data I created a Python class that uses the built-in HTMLParser.

from HTMLParser import HTMLParser

class HtmlStripper(HTMLParser):
    def __init__(self):
        self.reset()
        self.fed = []

    def handle_data(self, d):
        # Collect the text content found between the HTML tags
        self.fed.append(d)

    def get_data(self):
        return ''.join(self.fed)

This class can then be imported and used very easily through a small helper method that I created:

def clean_single(value):
    stripper = HtmlStripper()
    stripper.feed(value)
    return stripper.get_data()
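
A quick illustration of what the helper does (the HTML string here is just a made-up example):

raw = "<li><b>1 tbsp.</b> shrimp paste</li>"
print(clean_single(raw))
# -> "1 tbsp. shrimp paste"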

Now that I had all the formatted data from each recipe, I could create an object, as you know it from basic OOP, and insert this object into a database.

To get the performance and scalability I needed for the recipes, I chose to use Elasticsearch. Elasticsearch is schema free, which means that I could easily append new fields if one website's recipe structure differed from the others.

The original schema for a recipe looked like this:

"recipe" : {
        "properties" : {
          "cookingTime" : {
            "type" : "string"
          },
          "country" : {
            "type" : "string"
          },
          "ingredients" : {
            "type" : "string"
          },
          "path" : {
            "type" : "string"
          },
          "title" : {
            "type" : "string"
          }
        }


The above schema is old and has since been given a new structure. But part of the idea of using Elasticsearch was also to be able to use some of the built-in tokenizers to remove unnecessary symbols from the ingredients, because the ingredients of a recipe often looked like the following when they were inserted:

"ingredients" : [ "\n          \n  3&frasl;4 cup vegetable oil\n          \n  1 tbsp. shrimp paste\n          \n  1 tbsp. sugar\n          \n  11  garlic cloves (4 minced, 7 thinly sliced)\n          \n  1  medium yellow onion, sliced\n          \n  2 lb. pork butt\n          \n  3 tbsp. kosher salt\n          \n  1 lb. miki noodles\n          \n    Thinly sliced scallions and pork cracklings, for garnish\n      ", "\n  3&frasl;4 cup vegetable oil", "\n  1 tbsp. shrimp paste", "\n  1 tbsp. sugar", "\n  11  garlic cloves (4 minced, 7 thinly sliced)", "\n  1  medium yellow onion, sliced", "\n  2 lb. pork butt", "\n  3 tbsp. kosher salt", "\n  1 lb. miki noodles", "\n    Thinly sliced scallions and pork cracklings, for garnish" ]

which was horrible, so I needed some tokenizers to remove the "\n" characters and trim the strings. For searching I am currently using a regular query, which I am going to change to a filter soon. When you perform a query with the Elasticsearch DSL, the server computes a relevance score for each result, ranking how well it matches the query. When you use a filter instead, the server skips scoring entirely; a filter is a non-scoring search, which also gives better performance in a search context.
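
To make the query/filter distinction concrete, here is a rough sketch using the official Python client (elasticsearch-py, pre-8.x style with a body argument); the index and field names are taken from the mapping above, and the search term is just an example:

from elasticsearch import Elasticsearch

es = Elasticsearch(["localhost:9200"])

# Regular query: Elasticsearch scores every hit for relevance.
scored = es.search(index="recipe", body={
    "query": {"match": {"ingredients": "garlic"}}
})

# Filter context: same matching, but no scoring, and the result can be cached,
# which is what makes a filter cheaper for this kind of search.
filtered = es.search(index="recipe", body={
    "query": {"bool": {"filter": {"match": {"ingredients": "garlic"}}}}
})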

You can find the current prototype here: http://stokholm-web-1.northeurope.cloudapp.azure.com/
Bear in mind that it is a prototype.


Setting up Apache Spark and IDE



Apache Spark is a wonderful tool for both real-time processing and machine learning. Spark can run on top of HDFS (Hadoop Distributed File System) and can process data up to 100 times faster in memory than Hadoop MapReduce and up to 10 times faster on disk. (Yeah, I just sold this product real good!)

In this post I will discuss one of the main problems that Spark newcomers face: setting up the environment! FOR WINDOWS ONLY :O

First you need to download some stuff:

Scala lang

Scala IDE

Apache Spark

Install the Scala language, then unzip the Scala IDE and place it in a folder of your choice.

Unzip Apache Spark and place it in C:\

If you want to, you can just start the Spark shell by running:

cd <spark-folder>\bin
spark-shell.bat

But who wants to do that all the time?! Not me!

So I will set up an environment variable so I can start the Apache Spark shell just by typing spark-shell.

If you haven't set an environment variable before, do the following:
This PC -> Properties -> Advanced System Settings -> Environment Variables

You will now see that there is a Path variable in the System Variables table. Edit it and add your C:\<spark-folder>\bin to it.

On Windows there are some incompatibilities between the Hadoop file system layer and the Windows file system, and that is what all the exceptions Spark throws at startup are about. To fix this you can download winutils.exe (from a reliable source! Lots of downloads of this exe are malware!) and place it in C:\<spark-folder>\bin. You can also just ignore the exceptions, or put your Spark development server on a Linux VM.

If you decide to use winutils.exe, then set a new variable called HADOOP_HOME and give it the value C:\<spark-folder>.

Now to the IDE!

Lots of people are using IntelliJ, but the licensing costs give me cancer just to look at, so I am loyal to the open source Scala IDE, which is just Eclipse with a Scala plugin. If you have a current setup of NetBeans or Eclipse, you can just install the corresponding Scala plugin from its website.

On Spark's website you will see that even though Spark is written in Scala, it uses Maven instead of SBT, so I will show you how to set up a Scala Maven project.

Start the Scala IDE -> New Project -> Other -> Maven Project and fill in the required project details.

Now you will see that the IDE created a standard Maven Java project. We would like to turn that into a Maven Scala project, so right-click the project -> Scala -> Add Scala Nature.

Now the project should be a Scala project. But something is still missing! So right-click the project -> Java Build Path -> Source -> Add Folder -> Create New Folder -> give it the name Scala -> select it -> OK -> OK

You now have a Scala folder like you have a Java folder.

Spark setup time!
Go to your pom.xml and add the following snippet:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.0</version>
</dependency>

Update your Maven dependencies and you will get a lot of errors! :O

The reason is that you need to set the Scala version.

Right-click the project -> Scala -> Set the Scala Installation -> select the latest 2.10 version.

The reason for using 2.10 is that spark-core_2.10 is compiled against this Scala version.

If you create a new Scala file and try to add a SparkConf and SparkContext you will see that the IDE will give you typeahead and everything will be just fine 😉

Hope this helped in your journey into the world of Apache Spark 😀
