Spark fundamentals 1, mis notas

Estas trabajando con la version 1.3.0, que es la version que Cloudera provee y mantiene. Actualmente la version actual es 1.4.1.

Apache spark es un sistema de computo distribuido de proposito general, muy rapido pensado para ser usado en grandes clusters de maquinas. Provee APIS de alto nivel en varios lenguajes, como Java, scala y python, y un motor optimizado para soportar la ejecucion de grafos. Tambien soporta un rico conjunto de herramientas de alto nivel incluyendo a Spark SQL para consultas de tipo SQL y procesamiento de datos estructurados en tablas, Machine learning con MLlib, procesamiento de grafos con GraphX y por ultimo tenemos Spark Streaming para gestionar datos en forma de chorros de datos.

Apache Spark fue inicialment desarrollado en la universidad de Berkeley.
Para que puede ser usado Apache Spark?

EL principal uso que se le puede dar a spark es para procesar datos en la memoria principal del sistema distribuido, por ejemplo, algoritmos iterativos para machine learning, minado de datos interactivo, procesamiento de datos. Spark es tambien el motor detras de Shark, un sistema para gestionar informacion compatible totalmente con Hive que puede ejecutarse hasta un 100% mas rapido que Hive. El beneficio principal que ofrece es que cachea inmediatamente en la memoria principal del sistema para conseguir un mejor rendimiento.
Consulta de datos en tiempo real, es capaz de preguntar en segundos en vez de minutos usando el sistema Shark, embebido actualmente en SparkSQL a partir de la version 1.3.0

Procesamiento de flujos: Deteccion de fraudes y procesamiento de logs en tiempo real para generar alertas, datos agregados y analisis.

Procesamiento de datos provenientes de sensores: Cuando los datos provienen de multiples fuentes y necesitan ser reunidos, los conjuntos de datos guardados en memoria son procesados facil y realmente rapido.

Apache es una plataforma de computo diseñada para ser ejecutadas en clusters de maquinas para ser rapido y de proposito general.

Velocidad?, Spark extiende el popular modelo MapReduce para soportar eficientemente mas tipos de computacion, como las consultas interactivas y procesamiento de flujos. La velocidad es importante para procesar grandes conjuntos de datos, no es lo mismo tener que esperar horas que minutos o segundos para averiguar informacion de valor en grandes conjuntos de dtos. Una de las principales caracteristicas que ofrece Spark es la habilidad de procesar los datos en la memoria principal del sistema, pero es que ademas es mas eficiente si tiene que procesar los datos en el disco distribuido del sistema.

Desde el punto de vista de la generalidad, Spark esta diseñado para cubrir un amplio rango de cargas de trabajo que previamente requirieron sistemas distribuidos que por su naturaleza implica que estan distribuidos, incluyendo procesamiento de datos en segundo plano (batch processing), algoritmos iterativos, consultas interactivas y procesamiento de datos en formato de flujos (streaming).
Soportando estas cargas de trabajo en el mismo motor, Spark hace facil y barato combinar diferentes tipos de procesamienos, los cuales son a menudo necesarios cuando hay que procesar datos en el sistema de produccion, me refiero a herramientas de inteligencia de datos de negocio (Business Inteligence).

Spark esta diseñado para ser altamente accesible, ofreciendo Librerias de desarrollo para lenguajes como Python, Scala, Java o SQL. Tambien se integra bien con otras herramientas del tipo Big dAta como Hadoop, su HDFS y otras herramientas como Cassandra.

Una pila unificada
El proyecto Spark contiene muchos componentes muy integrados entre ellos. Su nucleo, Spark, es un motor de computacion responsable de distribuir, planificar y monitorizar aplicaciones que consisten en muchas tareas computacionales distribuidas entre muchas maquinas, tambien conocido como un cluster de maquinas. Debido a que el nucleo de Spark esta diseñado para ser rapido y de proposito general, cada vez que el nucleo se mejora, sus componentes se mejoran tambien, me refiero a su componente SparkSQL, o al componente de Machine Learning o al de Streaming.

Una filosofia como esta de integracion fina tiene muchos beneficios. Primero, todas las librerias y sus componentes de alto nivel se benefician de las mejoras que se producen en el bajo nivel, me refiero al spark core. Por ejemplo, cuando el motor del nucleo de spark añade una optimizacion, las librerias de SQL y machine learning automaticamente mejoran tambien. Segundo, los costes asociados con ejecutar la pila se minimizan, porque en vez de ejecutar 5 o 10 sistemas de software independientes, una organizacion necesita ejecutar una sola. Estos costes suelen incuir el despliegue, mantenimiento, testeo, soporte y otras mas. Esto tambien significa que cada vez que un nuevo componente es añadido a la pila de Spark, cada organizacion que use Spark inmediamente podrá ser capaz de probar este nuevo componente. Esto cambia el coste de probar cada nuevo tipo de analisis de datos desde que se descarga, lo despliegas y aprendes un nuevo sistema al actualizar Spark.

Finalmente, una de las principales ventajas de esta integracion fina es la habilidad de construir aplicaciones que integran perfectamente diferentes modelos de procesamiento. Por ejemplo, con Spark puedes escribir una aplicacion que use Machine Learning para clasificar datos en tiempo real que son digeridos desde fuentes de tipo streaming. Simultaneamente, los analistas pueden hacer preguntas sobre los datos resultantes, tambien en tiempo real, a traves del lenguaje SQL, por ejemplo para unir los datos provenieentes de ficheros de logs sin ninguna estructura. Ademas, ingenieros de datos y cientificos pueden acceder a los mismos datos a traves del shell que provee Python o scala para un analisis que resuelve ese problema, ad hoc. Otros podrian acceder a los datos en aplicaciones simples batchs. Ademas, el equipo de mantenimiento de sistemas tienen que mantener un unico sistema.

El nucleo de spark contiene la funcionalidad basica, incluyendo los componentes para planificar las tareas, manejo de la memoria, recuperacion del sistema ante fallos, interactuar con otros sistemas de almacenamiento y mas. El nucleo es tambien el lugar donde las APIs definen los RDDs, o conjunto de datos distribuidos resistentes (o flexibles), los cuales son la principal abstraccion de la manera de programar para Spark. Estos conjuntos de datos distribuidos o conjuntos de objectos distribuidos a traves de muchos nodos computaciones pueden ser manejados en paralelo. El nuclero provee muchas APIs para construir y manipular estas colecciones.

Spark SQL es un paquete de spark para trabajar con datos estructurados. Te permite preguntar sobre datos almacenados usando ansi SQL tan bien como lo haria la variante SQL del proyecto Apache Hive, tambien conocida como HQL, y soporta muchas fuentes de datos,incluyendo tablas hive, parquet, json y csv. Mas alla de proveer una interfaz sql para spark, Spark SQL permite a los desarrolladores para intermezclar consultas SQL con la manera programatica de acceso a datos soportada por los RDDs en python, Java y scala, todas ellas en una unica aplicacionm ademas de combinar SQL con analisis de datos complejos.

Spark Streaming es un componente spark que permite el procesamiento de datos en tiempo real de flujos de datos. Ejemplos de flujos de datos incluyen a los ficheros de logs generados por los servidores web en produccion, o las colas de mensajes que contienen las actualizaciones de los estados de los usuarios en un servicio web. Spark streaming provee una API para manipular estos flujos de datos que coincide estrechamente con los conjuntos de datos distribuidos flexibles o resistentes de Spark, (RDDs), haciendo facil para los programadores aprender el proyecto y moverse entre las aplicaciones que manipulan los datos almacenados en la memoria principal, en el disco duro o los datos provenientes en tiempo real. Debajo de su api, spark Streaming fue diseñado para proveer el mismo nivel de tolerancia a fallos, rendimiento y escalabilidad que el nucleo de Spark.

Spark tiene una libreria que contiene algoritmos para la funcionalidad basica de aprendizaje de maquinas, llamada MLlib. MLlib provee muchos tipos de algoritmos de aprendizaje automatico, incluyendo clasificacion, regresion, clustering y flitrado colaborativo, tambien soporta funcionalidad para evaluacion de modelos e importacion de datos, Tambien provee algunas operaciones primitivas, incluyendo un generic gradient descent optimization algorithm. Todos estos metodos estan diseñados par escalar a traves de todo el cluster.

GraphX es una libreria para manipular grafos, como los grafos que aparecen en las redes sociales con nuestros amigos y permite ejecutar computacion paralela de grafos. De la misma manera que Spark streaming, GraphX extiende el conjunto de datos distribuido resistente de Spark, permitiendonos crear un grafo directo con propiedades arbitrarias adjuntas a cada vertice y nodo. GraphX tambien provee varios operatores para manipular grafos, por ejemplo subgrafos y mapas de vertices y una libreria con algoritmos comunes de grafos, como el pageRank y en triangle Counting.

Manejo del cluster de maquinas
Spark fue diseñado para crecer eficientemente desde una hasta miles de maquinas. Para conseguir mientras se maximiza la flexibilidad de uso, Spark puede ejecutarse sobre una gran variedad de gestores de clusters, incluyendo a Hadoop YARN, Apache Mesos y un gestor simple incluido en Spark comunmente llamado planificador independiente. Si lo unico que quieres es simplemente instalar Spark en un grupo de maquinas, este planificador provee una manera facil de empezar. Si ya tienes un cluster de maquinas manejado por YARN o Mesos, Spark ya soporta a estos planificadores.

Quien usa Spark y para que?
Debido a que Spark es un framework de proposito general para computacion distribuida, se usa en un muy amplio conjunto de aplicaciones, aunque basicamente hay dos perfiles, cientificos de datos e ingenieros de desarrollo. o podemos clasificarlo entre personas que manejan datos cientificos o personas que manejan mucha informacion a secas.

Tareas relacionadas con la ciencia de los datos
Ciendia de los datos, una disciplina que ha surgido a traves de los ultimos años, centrado en analizar datos. Un cientifico de datos es alguien cuya principal tarea es analizar y modelizar datos. Estos cientificos pueden tener experiencia con el lenguaje SQL, estadistica, modelos predictivos y programacion, normalmente en lenguajes como python, MatLab o R. Estos cientificos de datos tambien pueden tener experiencia con las tecnicas necesarias para transformar los datos en otros formatos.

Estos cientificos usan sus habilidades para analizar datos con la finalidad de responder preguntas que que estan enterradas en un mar de datos. A menudo, su flujo de trabajo implica un analisis sobre la marcha, por lo que suelen usar shells interactivas en vez de construir aplicaciones complejas que les permita ver los resultados de sus consultas en poco tiempo. Spark esta diseñado con esta finalidad desde el principio.

El otro uso principal de spark puede ser descrita en el contexto del ingeniero. Un ingeniero en este contexto es alguien que desarrolla software y es capaz de usar este framework para construir aplicaciones que procesen vastas cantidades de de datos. Estos desarrolladores normalmente tienen que entender los principios de la ingenieria del software, tales como la encapsulacion, diseño de interfaces y orientacion a objetos. Normalmente tenemos el grado en ciencias de la computacion y usamos estas habilidades para diseñar e implementar sistemas de software que implementan casos de uso para un negocio en concreto.

Para estos ingenieros, Spark provee una manera simple de paralelizar los datos y las aplicaciones que hacen uso de estos datos a traves del conjunto de maquinas que conforman el cluster y oculta la complejidad intrinseca que existe en estos sistemas distribuidos, como la programacion en red necesaria para intercomunicarse entre los nodos y la tolerancia a fallos. El sistema les proporciona lo necesario para controlar estos recursos para de una manera eficiente poder monitorizar, inspeccionar y afinar estas aplicaciones. La naturaleza modular de la api de desarrollo(la cual esta basada en colecciones de datos distribuidas, basicamente son hashmaps) hace facil el trabajo de crear librerias reusables o drivers reusables, y probarlas localmente.
Transcripcion 1

Objectives:
After completing this lesson, you should be able to explain the purpose of Spark and understand why and when you would use Spark. You should be able to list and describe the components of the Spark unified stack. You will be able to understand the basics of the Resilent Distributed Dataset, Spark’s primary data abstraction. Then you will see how to download and install Spark standalone to test it out yourself. You will get an overview of Scala and Python to prepare for using the two Spark shells.

Despues de completar esta leccion, podras ser capaz de explicar el proposito de spark y comprender porque y cuando podras usar Spark. Seras capaz de hacer una lista y describir los componentes de la pila de Spark. Seras capaz de comprender el principal concepto basico de Spark, los RDDs o Resilient distributed datasets, o conjuntos de datos distribuidos resistentes (o flexibles, segun se mire). Solo entonces, te recomiendo descargar e instalar Spark en modo autonomo, es decir, en un solo nodo, tu portatil.

Slide 3:
There is an explosion of data. No matter where you look, data is everywhere. You get data from social media such as Twitter feeds, Facebook posts, SMS, and a variety of others. The need to be able to process those data as quickly as possible becomes more important than ever. How can you find out what your customers want and be able to offer it to them right away? You do not want to wait hours for a batch job to complete. You need to have it in minutes or less.

Ahora mismo hay una explosion de generacion de datos, no importa donde mires, hay muchisimos datos que se estan generando ahora mismo, incluso mientras escribo esto. Esos datos ahora vienen de las redes sociales como Facebook, Twitter y otras muchas mas. Ahora mismo la necesidad de procesar esas cantidades ingentes de informacion tan rapido como sea posible es por decirlo suavemente, importante. Como puedes saber que necesita tus clientes antes de que ellos mismos lo sepan? es posible si usamos los datos que hay ahi fuera. Los seres humanos somos bastante predecibles, y no queremos esperar horas o dias para saberlo, queremos saberlo en segundos o menos, de ahi que tecnologias como esta esten dejando atras a tecnologias como Hadoop.

MapReduce has been useful, but the amount of time it takes for the jobs to run is no longer acceptable in most situations. The learning curve to writing a MapReduce job is also difficult as it takes specific programming knowledge and the know-how. Also, MapReduce jobs only work for a specific set of use cases. You need something that works for a wider set of use cases.
Apache Spark was designed as a computing platform to be fast, general-purpose, and easy to use. It extends the MapReduce model and takes it to a whole other level.
The speed comes from the in-memory computations. Applications running in memory allows for a much faster processing and response. Spark is even faster than MapReduce for complex applications on disks.

MapReduce ha sido util, pero la cantidad de tiempo necesaria para realizar estas tareas no pueden durar horas, se debe hacer lo mas rapido posible y para ello necesitas que tus datos quepan en la memoria principal de tu sistema distribuido, no es buena idea que tu sistema distribuido tenga que trabajar con el disco duro para hacer estos calculos. Spark fue diseñado con este fin en mente, trabajar con los datos en la memoria principal, aunque tambien puede trabajar con los datos almacenados en el disco duro si la memoria principal no llega.

Esta principio de diseño general cubre un amplio rango de posibilidades, por ejemplo, puedo ejecutar aplicaciones de tipo batch, diseñadas para correr en segundo plano, tales como tareas de tipo map reduce o consultas de datos interactivas, tambien se puede usar para aplicaciones de streaming de datos, por ejemplo, aplicaciones en las que necesitamos analizar ficheros logs de muchos servidores.

Podriamos preguntarnos, para que demonios necesito usar Spark? tal y como deberias saber, Spark esta relacionado con el paradigma mapreduce de forma que expande sus posibilidades. Tal y como hace el paradigma mapreduce, Spark provee capacidades para procesamiento paralelo y distribuido de datos, tolerancia a fallos sobre un hardware modesto. No necesitas usar grandes clusters de maquinas caras, te basta con tener estas maquinas, mientras mas memoria principal tengan, mejor, porque como he incidido antes, Spark utiliza este tipo de memoria sobremanera para asi conseguir escalar cuando sea necesario. Spark usa agresivamente el concepto de cacheo en memoria distribuido para asi conseguir una bajisima latencia, mientras mas rapida sea la RAM, mas rapido seran los calculos.

Echale un vistazo a esta imagen:

Como puedes ver, el nucleo de spark esta en el centro y es un sistema de proposito general para planificar, dustribuir y monitorizar aplicaciones a traves del cluster. Como puedes apreciar, los componentes que rodean al nucleo estan diseñados para interoperar entre ellos, permitiendo a los usuarios combinarlos, tal y como se harian como cualquiera otra libreria de desarrollo. Los beneficios de usar esta arquitectura es que todos los componentes de alto nivel heredaran las mejoras realizadas al nucleo, de manera automatica. Por ejemplo, las optimizaciones realizadas al nucleo acelerarán los modulos de SQL, procesamiento en flujos de datos, el aprendizaje automatico y el procesamiento de grafos. El nucleo esta diseñado para escalar desde uno hasta miles de nodos y puede ejecutarse sobre una gran cantidad de gestores de clusteres como YARN o Mesos, tambien puedes ejecutarlo en modo unitario.

Spark SQL se diseñó para acceder a los datos usando una sintaxis SQL, conocido por muchas personas de este mundillo. Esta diseñado para trabajar tambien con HiveQL, o HQL, la variante SQL que usa Hive.

Spark Streaming sirve para procesar datos provenientes de flujos de datos en tiempo real y con tolerancia a fallos. Si el sistema se cae, spark es capaz de hacer lo necesario para rehacer los calculos.

Ahora voy a comentar un poco acerca de los RDDs, o conjuntos de datos distribuidos resistentes. Esencialmente es una coleccion distribuida de elementos que se pueden paralelizar o distribuir a traves del cluster de maquinas. Hay dos topos de operaciones que se pueden realizar sobre estos RDDs, transformaciones y acciones. Las transformaciones son aquellas que no retornan un valor. De hecho, nada es evaluado durante esta operacion. Spark solo construye una estructura de tipo grafo aciclico directo, el cual solo sera evaluado en tiempo de ejecucion. A esto lo llamamos evaluacion perezosa.
La tolerancia a fallos de spark se consigue gracias a esta caracteristica, ya que spark es capar de reconstruir el grafo si ha habido algun problema. Muy brevemente, un grafo acíclico dirigido o DAG (del inglés Directed Acyclic Graph), es un grafo dirigido que no tiene ciclos, esto significa que para cada vértice v, no hay un camino directo que empiece y termine en v.

Las acciones son el otro tipo de operacion que se puede realizar sobre los RDDs y permite a Spark realizar tareas sobre las transformaciones anteriormente descritas. Las acciones SI retornan valores, por ejemplo, puedes realizar una cuenta sobre cuantas palabras aparece en un texto, retornando ese numero.

Bien, ahora vamos a cargar un fichero desde el hdfs y creemos un RDD, despues de aplicar algunas transformaciones como filtrar, hacer un map o una operacion de reduccion, podemos realizar finalmente una accion sobre el RDD. Recuerda que el RDD es basicamente un grafo aciclico directo y este grafo solo sera actualizado cuando la accion es ejecutada. Asi es como Spark soluciona el problema de la tolerancia a fallos cuando algun nodo se cae, el sistema solo tiene que reevaluar el grafo desde el ultimo estado que se dejó.
La operacion de cacheo permite guardar en memoria todo el RDD, si no cabe en la memoria, se guarda en el disco duro.

Ahora voy a hablar un poco acerca del lenguaje Scala. Scala es el lenguaje base de spark. Spark fue escrito en scala, aun asi, puedes escribir codigo java o python para usar Spark.
Lo primero es entender que en scala todo es un objeto, como java, bueno, java tiene las primitivas :) y al igual que java, scala funciona sobre una maquina virtual java, significa que el compilador scala finalmente va a producir ficheros .class con el bytecode necesario para correr en la JVM. Los tipos primitivos definidos en Java como los int o boolean son objetos en Scala.
Las funciones son objetos en Scala, al igual que los Numbers. Puedes usar las funciones como argumentos en otras funciones, puedes guardar su estado como otras variables.

Tambien puedes usar funciones anonimas en las aplicaciones spark, basicamente son funciones que no tienen nombre ya que solo las vas a usar una vez o en un contexto interno a ese objeto.
La shell de spark te permite probar todas las funciones sin necesidad de tener que compilar el codigo con sbt, puedes usarla para analizar los datos de manera interactiva. Te habia dicho que para ejecutar la shell tienes que ejecutar el comando ¢SPARK_HOME/bin/spark-shell ?

Ahora voy a hablar acerca de la configuracion de spark, monitorizacion y mejora.

Hay principalmente tres componentes en un cluster spark, el driver, el cluster master y los executors.
Tenemos el driver, donde el objecto SparkContext se carga en el programa principal, el main. Para ejecutarlo en un cluster, necesitamos un programa que gestione los recursos del cluster, un director o en ingles, un manager. Spark provee de uno, muy basico pero funcional y luego tenemos Mesos y Yarn.
Los worker nodes son las maquinas que gestionan los executors, estos son procesos que ejecutan los calculos y guardan al mismo tiempo los datos para la aplicacion. El SparkContext envia la aplicacion, guardada como un jar, ficheros python o ficheros scala a cada ejecutor. Finalmente se envian las tareas a cada executor para empiecen a correr.
Cada aplicacion tiene su propio executor. El Executor permanece vivo en cada nodo mientras dure la tarea, de forma que las aplicaciones que puedan correr en el cluster estan aisladas unas de otras, pudiendo incluso correr en diferentes JVMs. No obstante, esto significa que no puede compartir datos entre las aplicaciones de manera directa, habria que externalizar los datos si necesitamos compartir la informacion, cosa que puede ser deseable, alguien esta oyendo Impala u otro tipo de sistema de almacenamiento de informacion?

Hay tres sitios para poder configurar spark. El fichero properties de spark, donde los parametros de la aplicacion residen, es decir, aqui pondrias el nombre de la aplicacion, la memoria que le quieres dedicar para cada worker, etc. Despues tienes las variables de entorno, donde pondrias por ejemplo la ip del nodo. Revisa el fichero conf/spark-env.sh en cada nodo.
Por ultimo, tienes el fichero de log, el cual puede ser configurado a traves del fichero log4j.properties. Puedes sobrescribir la configuracion por defecto por ejemplo para que no sea tan verboso dejando el nivel de error a ERROR para que solo muestre los mensajes de error.

Otra manera de configurar las propiedades de spark es a traves del fichero spark-defaults.conf . El script spark-submit leerá la configuracion usando este fichero. Se puede ver la configuracion actual usando la interfaz web en el cluster spark a traves del puerto 4040 por defecto.

One thing I’ll add is that properties set directly on the SparkConf take highest precedence, then flags passed to spark- submit or spark-shell is second and finally options in the spark-defaults.conf file is the lowest priority.

Enlaces de interes:

http://spark.apache.org/docs/1.3.0/

https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch01.html

http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/

http://www.artima.com/scalazine/articles/steps.html

http://db2university.db2oncampus.com/BD095EN/Transcripts/lesson1-part1-transcript.pdf

http://db2university.db2oncampus.com/BD095EN/Transcripts/lesson1-part2-transcript.pdf

https://es.wikipedia.org/wiki/Grafo_ac%C3%ADclico_dirigido

Hi, after some holidays i am back. this post is about how to use spring-data technology with apache hadoop, or how to write a map reduce task using spring. The idea is to provide a way on how to focus in the most important part about apache hadoop, the map reduce writing task.

So, lets begin with the project, the most impatient (like myself) can find the sources project here.

The example is written in a maven style, so we can see the dependencies in the pom.xml file:

4.0.0
es.aironman.samples
my-spring-data-mapreduce
0.0.1-SNAPSHOT
my sample about how to code a map reduce task for hadoop using spring-data

1.0.3
1.6.1
3.1.2.RELEASE
1.0.0.RELEASE
UTF-8

commons-lang
commons-lang
2.6

org.springframework
spring-beans
${spring.version}

org.springframework
spring-core
${spring.version}

org.springframework
spring-context-support
${spring.version}

org.springframework
spring-context
${spring.version}

cglib
cglib
2.2.2

org.springframework.data
spring-data-hadoop
${spring.data.hadoop.version}

org.apache.hadoop
hadoop-core
${apache.hadoop.version}

org.slf4j
slf4j-api
${slf4j.version}

org.slf4j
slf4j-log4j12
${slf4j.version}

log4j
log4j
1.2.16

junit
junit
4.9
test

org.mockito
mockito-core
1.8.5
test

my-spring-data-mapreduce

org.apache.maven.plugins
maven-compiler-plugin
2.3.2

1.6
1.6

org.apache.maven.plugins
maven-assembly-plugin
2.2.2

src/main/assembly/assembly.xml

org.apache.maven.plugins
maven-jar-plugin
2.3.1

true
lib/
net.petrikainulainen.spring.data.apachehadoop.Main

org.apache.maven.plugins
maven-site-plugin
3.0

org.codehaus.mojo
cobertura-maven-plugin
2.5.1

If you see this file, you can say me that i am not using the latest versions of dependencies!, with time i promise to update this or maybe you can send me a pull/push request to this github project ;) .

In applicationContext.xml file we can see how spring-data project declare which map reduce job is going to be executed.

fs.default.name=${fs.default.name}
mapred.job.tracker=${mapred.job.tracker}

There is an application.properties file with necessary config data, like where is the HDFS (Hadoop data file system), where is the hadoop tracker listening, the input data path with necessary data to be filtered and the output data path with the result. Please, do not forget to erase the output data directory if you launch the map reduce task more that once.

application.properties

fs.default.name=hdfs://localhost:9000
mapred.job.tracker=localhost:9001

input.path=/input/
output.path=/output/

maybe you are going to need to change localhost with the hadoop ip address, check it out!

Now the map reduce classes, they are the same of another project i have talked about in this blog, so i am not going I will not delve deeper into this subject.

I think the code is already well documented, so, the mapper class is:


package es.aironman.samples.spring.data.hadoop;

import java.io.IOException;

import org.apache.commons.lang.math.NumberUtils;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DominiosRegistradorMapper extends Mapper {

private static final String SEPARATOR = ";";

@Override
public void map(LongWritable key, Text value, Context context) throws IOException,
InterruptedException {

final String[] values = value.toString().split(SEPARATOR);
String agent ;
String totalDomains;
for (int i=0;i<values.length;i++){

agent = format(values[1]);
totalDomains = format(values[2]);

if (NumberUtils.isNumber(totalDomains.toString() ) ){
context.write(new Text(agent), new DoubleWritable(NumberUtils.toDouble(totalDomains)));
}

}//del for
}
private String format(String value) {
return value.trim();
}

}

You may see that data file has this format:

id ; Agente Registrador ; Total dominios;
1 ; 1&1 Internet ; 382.972;
36 ; WEIS CONSULTING ; 4.154;
71 ; MESH DIGITAL LIMITED ; 910;

The idea of the mapper is to split every line by “;”, get every Agente Registrador (agent recorder) and each Total dominios (total domains) and write it the hadoop context. This is a very simple hadoop task, in this phase you can choose which agente recorder want to write to context, for simplicity, i choose to write every agent with its total domain to the context.

Now the reducer class:


package es.aironman.samples.spring.data.hadoop;

import java.io.IOException;
import java.text.DecimalFormat;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/***
*
* This Reducer operation consists in keep the largest value of total of registered domains
* @author aironman
*
*/
public class DominiosRegistradorReducer extends Reducer {

private final DecimalFormat decimalFormat = new DecimalFormat("#.###");

public void reduce(Text key, Iterable totalDominiosValues, Context context)
throws IOException, InterruptedException {
double _maxTotalDomains = 0.0f;
for (DoubleWritable totalDominiosValue : totalDominiosValues) {
double _total = totalDominiosValue.get() ;

_maxTotalDomains = Math.max(_maxTotalDomains, _total);
}
context.write(key, new Text(decimalFormat.format(_maxTotalDomains)));

}

}

As you can guess, in this phase i am keeping from hadoop context only the max total domais of each agent. Maybe you want to calculate the minimal or the average. For that, you are going to write a custom writable, but that is beyond of this post. Keep it post for future updates.

Now that´s it!, you can assembly the jar with this command:

mvm clean assembly:assembly

If everything is ok, you can see this output:

SLF4J: Failed to load class “org.slf4j.impl.StaticLoggerBinder”.
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[INFO] Scanning for projects…
[INFO]
[INFO] ————————————————————————
[INFO] Building my-spring-data-mapreduce 0.0.1-SNAPSHOT
[INFO] ————————————————————————
[INFO]
[INFO] — maven-clean-plugin:2.4.1:clean (default-clean) @ my-spring-data-mapreduce —
[INFO] Deleting /Users/aironman/Documents/ws-spring-data-hadoop/my-spring-data-mapreduce/target
[INFO]
[INFO] ————————————————————————
[INFO] Building my-spring-data-mapreduce 0.0.1-SNAPSHOT
[INFO] ————————————————————————
[INFO]
[INFO] >>> maven-assembly-plugin:2.2.2:assembly (default-cli) @ my-spring-data-mapreduce >>>
[INFO]
[INFO] — maven-resources-plugin:2.5:resources (default-resources) @ my-spring-data-mapreduce —
[debug] execute contextualize
[INFO] Using ‘UTF-8’ encoding to copy filtered resources.
[INFO] Copying 3 resources
[INFO]
[INFO] — maven-compiler-plugin:2.3.2:compile (default-compile) @ my-spring-data-mapreduce —
[INFO] Compiling 3 source files to /Users/aironman/Documents/ws-spring-data-hadoop/my-spring-data-mapreduce/target/classes
[INFO]
[INFO] — maven-resources-plugin:2.5:testResources (default-testResources) @ my-spring-data-mapreduce —
[debug] execute contextualize
[INFO] Using ‘UTF-8’ encoding to copy filtered resources.
[INFO] Copying 0 resource
[INFO]
[INFO] — maven-compiler-plugin:2.3.2:testCompile (default-testCompile) @ my-spring-data-mapreduce —
[INFO] Nothing to compile – all classes are up to date
[INFO]
[INFO] — maven-surefire-plugin:2.10:test (default-test) @ my-spring-data-mapreduce —
[INFO] Surefire report directory: /Users/aironman/Documents/ws-spring-data-hadoop/my-spring-data-mapreduce/target/surefire-reports

——————————————————-
T E S T S
——————————————————-

Results :

Tests run: 0, Failures: 0, Errors: 0, Skipped: 0

[INFO]
[INFO] — maven-jar-plugin:2.3.1:jar (default-jar) @ my-spring-data-mapreduce —
[INFO] Building jar: /Users/aironman/Documents/ws-spring-data-hadoop/my-spring-data-mapreduce/target/my-spring-data-mapreduce.jar
[INFO]
[INFO] <<< maven-assembly-plugin:2.2.2:assembly (default-cli) @ my-spring-data-mapreduce <<<
[INFO]
[INFO] — maven-assembly-plugin:2.2.2:assembly (default-cli) @ my-spring-data-mapreduce —
[INFO] Reading assembly descriptor: src/main/assembly/assembly.xml
[INFO] Building zip: /Users/aironman/Documents/ws-spring-data-hadoop/my-spring-data-mapreduce/target/my-spring-data-mapreduce-bin.zip
[INFO] ————————————————————————
[INFO] BUILD SUCCESS
[INFO] ————————————————————————
[INFO] Total time: 4.245s
[INFO] Finished at: Wed Aug 13 11:59:42 CEST 2014
[INFO] Final Memory: 16M/315M
[INFO] ————————————————————————

The assembly phase will provide you a zip file, unzip it into your hadoop cluster and launch startup.sh script file.

Enjoy!

Update

this is the link from spring-data project

Ante todo, perdonad por esto, pero es que estoy que trino. A ver, señores que trabajan para empresas de trabajo temporal y allegados. No voy a hacer ninguna prueba más de esas que dicen que miden el conocimiento real de programación, no son realistas, se basan normalmente en los típicos programitas que hacíamos en la universidad para aprender a programar. No me llaman la atención, me aburren y lo peor es que no mide nada de nada.

Para demostrar mis habilidades tengo la cuenta en github, donde tengo el código de lo que se hacer y mas me gusta, a saber, programar sistemas del tipo backend que corren en servidores que funcionan 24/7/365. Eso que es? pues pensad en todas las apps que hay por ahi, esas que nos instalamos en nuestros android o iphone, esas apps necesitan comunicarse con, al menos, un servidor para realizar su función. Por lo menos las aplicaciones mas complejas, por que no todas necesitan comunicarse con el exterior. Lo que se hacer, mejor o peor que otros, es programar aplicaciones para el servidor/es de aplicaciones.

También se programar aplicaciones iOS, en teoría sabiendo java podría programar para android, pero tengo un iphone4, comprendo el código html5/css3/jquery y me encanta el movimiento open source, por lo que seré mas receptivo a escuchar ofertas si voy a trabajar con código abierto, aunque no soy un taliban. Si hay algo de fuente cerrada que es mejor que la fuente abierta, lo reconozco y lo uso, de hecho, para el día a día uso un macbook pro retina de finales del 2013, y también uso ubuntu o redhat cuando quiero trabajar con algo relacionado con el bigdata, como programar tareas map reduce sobre apache hadoop y/o apache spark.

 

 

 

The idea behind of this project is to provide an example for a secured web service connected using REST architecture style with command pattern and composite command pattern, to a mongodb instance and a mysql instance too, best of both worlds.

There are at least two ways in java world to connect to a mongo db instance, you can choose spring-data-mongodb project, or Morphia, both of them are very easy to use, you only need to create an interface that extends something and that´s it!.

Using morphia way, you have to declare an interface like this:

package com.aironman.sample.dao;

import org.bson.types.ObjectId;

import com.aironman.sample.dao.model.Employee;

/**
* Date: 12 junio 2014
*
* @author Konrad Malawski
* @author Alonso Isidoro
*/
public interface EmployeeDao extends org.mongodb.morphia.dao.DAO<Employee, ObjectId> {
}

And its implementation file:

package com.aironman.sample.dao;

 

import org.bson.types.ObjectId;

import org.mongodb.morphia.Morphia;

import org.mongodb.morphia.dao.BasicDAO;

import com.aironman.sample.dao.EmployeeDao;

import com.aironman.sample.dao.model.Employee;

import com.mongodb.Mongo;

 

 

/**

 * Date: 12 junio 2014

 *

 * @author Konrad Malawski

 * @author Alonso Isidoro

 */

public class EmployeeDaoMorphiaImpl extends BasicDAO<Employee, ObjectId> implements EmployeeDao {

 

public EmployeeDaoMorphiaImpl(Morphia morphia,Mongo mongo,String dbName) {

super(mongo, morphia,dbName);

}

}

Super easy!

What about if you want to use spring-data-mongo project? an interface and that is all!

package com.aironman.sample.mongo.repository;

import org.springframework.data.mongodb.repository.MongoRepository;

import com.aironman.sample.mongo.documents.Role;

public interface RoleRepository extends MongoRepository<Role, String> {
}

 

And what about jpa?

package com.aironman.sample.dao;

 

import com.aironman.sample.dao.model.User;

import org.springframework.data.repository.CrudRepository;

 

/**

 * User: aironman

 * Date: 4 de junio del 2014

 */

public interface UserDao extends CrudRepository<User,Long> {

}

The most important using this nonsql technology is to design wisely the mongo db document, which is in JSON format, don’t forget about it, and depending of the wrapper technology chosen, spring or morphia, the way to build one differs. For example, the morphia document:

Employee class, modeled with morphia:

@Entity(value = “employees”, noClassnameStored = true)

public class Employee {

 

    @Id

    private ObjectId id;

 

    private String firstName;

    private String lastName; // value types are automatically persisted

 

    Long salary; // only non-null values are stored

 

    @Embedded

    Address address;

 

    @Reference

    Employee       manager; // refs are stored*, and loaded automatically

    @Reference

    List<Employee> underlings; // interfaces are supported

 

//    @Serialized

//    EncryptedReviews enchryptedReviews; // stored in one binary field

 

    @Property(“started”)

    Date startDate; //fields can be renamed

    @Property(“left”)

    Date endDate;

 

    @Indexed

    boolean active = false; //fields can be indexed for better performance

 

    @NotSaved

    String readButNotStored; //fields can read, but not saved

 

    @Transient

    int notStored; //fields can be ignored (load/save)

    transient boolean stored = true; //not @Transient, will be ignored by Serialization/GWT for example.

getters and setters

}

 

Now a spring data document class:

@Document

public class Role {

 

@Id

private String id;

 

public Role() {

super();

}

 

public Role(String id) {

super();

this.setId(id);

}

getters, setters, hashCode and equals method…

}

What differences are? the annotation , org.springframework.data.mongodb.core.mapping.Document for the spring-data and

org.mongodb.morphia.annotations.Entity for Morphia, that`s all.

the used jpa pojo in this example is the User class, with a different @Entity annotation.

@Entity

public class User {

 

    @Id

    @GeneratedValue

    private Long id;

 

    private String firstName;

    private String lastName;

    private String email;

getters and setters

 }

That is the difficult part, enjoy with the rest!

Alonso

Links

http://projects.spring.io/spring-data-mongodb/

https://github.com/mongodb/morphia

The source code is located in https://github.com/alonsoir/mycxf-mongodb-morphia-mysql-sample

 

 

Last week i did an interview with a big video game company, King, probably the casual video games company. The point is they want somebody with strong backend skills, so here i am!, i thought! i have some skills with the back end layer, i know very well about the spring framework, orm, sql, nosql, performance, multithreading, asynchronous tasks, big data technology, etc… that was my thoughts, i have an opportunity, but they demand know how about Pico container. Bad luck, Alonso…

Well, now i know that i need to know something about pico container, so stay tunned with next post related with this interesting technology.

PD

currently i am still available to contract.

This is a draft about my next task, periodically ask Twitter with twitter4j api relevant things, for example, my timeline and trending topics to begin with. I am going to create a web service and then integrate that functionality with a topic rabbitmq  using  spring integration and a websocket managed by a controller, so I can display the relevant info in real time in a browser.

Stay tuned!

update 21 May 2014

Twitter have a very restricted policy about using its api, an usual matter, but i consider it very restricted because i am getting some weird exceptions. A few days before i did not get any of this, but now i think i am banned! grrrrr

 

Failed to delete status: 401:Authentication credentials (https://dev.twitter.com/pages/auth) were missing or incorrect. Ensure that you have set valid consumer key/secret, access token/secret, and the system clock is in sync.

message – Could not authenticate you

code – 32

 

401:Authentication credentials (https://dev.twitter.com/pages/auth) were missing or incorrect. Ensure that you have set valid consumer key/secret, access token/secret, and the system clock is in sync.

message – Could not authenticate you

code – 32

 

Relevant discussions can be found on the Internet at:

http://www.google.co.jp/search?q=c8fb4e9c or

http://www.google.co.jp/search?q=7bffc794

TwitterException{exceptionCode=[c8fb4e9c-7bffc794], statusCode=401, message=Could not authenticate you, code=32, retryAfter=-1, rateLimitStatus=null, version=3.0.6-SNAPSHOT}

at twitter4j.HttpClientImpl.request(HttpClientImpl.java:157)

at twitter4j.HttpClientWrapper.request(HttpClientWrapper.java:58)

at twitter4j.HttpClientWrapper.get(HttpClientWrapper.java:86)

at twitter4j.TwitterImpl.get(TwitterImpl.java:2001)

at twitter4j.TwitterImpl.showUser(TwitterImpl.java:886)

at twitter4j.examples.user.ShowUser.main(ShowUser.java:42)

 

Esto es un borrador acerca de mi próxima tarea, preguntar periódicamente a Twitter con la api twitter4j cosas relevantes, para empezar, mi timeline y los trending topics, para empezar. La forma de preguntar sera creando un servicio web con esa funcionalidad y luego integrarlo con un topic rabbitmq mediante spring integration a un controlador gestionado por websockets, así podré mostrar la info relevante en tiempo real en un navegador.

 

Finally i can continue with this post, a sample with a big data technology, for example, a java map reduce task running on apache hadoop.

First at all, you need to install hadoop, and i have to say that it is not trivial, depending of your SO, you may install it with apt, yum, brew, etc… or like i did, downloading a vmware image with all necessary stuff. There are some providers, like Cloudera or IBM BigInsights. I choose the last one because of i learn big data concepts  in bigdatauniversity.com, an iniciative from IBM.

Once downloaded the big insights vmware image, you can launch the boot, login with biadmin/biadmin and then click on Start BigInsights button, after few minutes, hadoop will be up and running. Go to http://bivm:8080/data/html/index.html#redirect-welcome in the firefox big insights and you can see it.

Once you have a hadoop cluster to play, it is time to code something, but first, you need to analyze the text, i put a little text, but real data are terabytes, hexabytes or more data with this format, thousands of billions lines with this format:

 id ; Agente Registrador   ; Total dominios;

 1  ; 1&1 Internet    ; 382.972;

 36 ; WEIS CONSULTING    ; 4.154;

 71 ; MESH DIGITAL LIMITED ; 910;

This is the mapper, the purpose of the mapper is to create a list with keys and values.

 

public class DominiosRegistradorMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {

 

privatestaticfinal String SEPARATOR = “;”;

 

@Override

public void map(LongWritable key, Text value, Context context) throws IOException,

InterruptedException { 

final String[] values = value.toString().split(SEPARATOR);

for (int i=0;i<values.length;i++){

/**

* id ; Agente Registrador   ; Total dominios;

* 1  ; 1&1 Internet    ; 382.972;

* 36 ; WEIS CONSULTING    ; 4.154;

* 71 ; MESH DIGITAL LIMITED ; 910;

* */

final String agente = format(values[1]);

final String totalDominios = format(values[2]); 

if (NumberUtils.isNumber(totalDominios.toString() ) ) 

context.write(new Text(agente), new DoubleWritable(NumberUtils.toDouble(totalDominios)));

 

}//del for

}

private String format(String value) {

return value.trim();

}

}

 

This is the reducer:

public class DominiosRegistradorReducer extends Reducer<Text, DoubleWritable, Text, Text> {

 

private final DecimalFormat decimalFormat = new DecimalFormat(“#.###”);

 

public void reduce(Text key, Iterable<DoubleWritable> totalDominiosValues, Context context)

throws IOException, InterruptedException {

double_maxtotalDominios = 0.0f;

 

for (DoubleWritable totalDominiosValue : totalDominiosValues) {

double_total = totalDominiosValue.get() ;

 

_maxtotalDominios = Math.max(_maxtotalDominios, _total);

}

// i need to keep with the agent which largest number of domains

context.write(key, new Text(decimalFormat.format(_maxtotalDominios)));

}

}

This is the main class:

publicclass App extends Configured implements Tool 

{

@Override

public int run(String[] args) throws Exception {

 

if (args.length != 2) {

System.err.println(“DominiosRegistradorManager required params: {input file} {output dir}”);

System.exit(-1);

}

 

deleteOutputFileIfExists(args);

 

final Job job = newJob(getConf(),“DominiosRegistradorManager”);

job.setJarByClass(App.class);

job.setInputFormatClass(TextInputFormat.class);

job.setOutputFormatClass(TextOutputFormat.class);

 

job.setMapperClass(DominiosRegistradorMapper.class);

job.setReducerClass(DominiosRegistradorReducer.class);

 

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(DoubleWritable.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(Text.class);

 

FileInputFormat.addInputPath(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

 

job.waitForCompletion(true);

 

return 0;

}

 

private void deleteOutputFileIfExists(String[] args) throws IOException {

final Path output = new Path(args[1]);

FileSystem.get(output.toUri(), getConf()).delete(output, true);

}

 

public static void main(String[] args) throws Exception {

ToolRunner.run(new App(), args);

}

}

Now you have a glimpse of the code, you can download it and import to your eclipse. Once imported, you need to create a jar. With that jar and the cluster online, you are almost ready to launch the code, but probably you need to import the huge text file with data from http://datos.gob.es, download it and export to your cluster. I recommend to use the browser for that, click on Start BigInsigths if you don’t yet did , open Biginsights web console,  click Files, on the left you can see an HDFS tree, that is the hadoop file system, expand it until /Users/biadmin/, create a directory, for example, inputMR, so you can see /Users/biadmin/inputMR in your tree. You must upload the example file to that directory. You need to create outputMR directory as well

[biadmin@bivm ~]$ hadoop jar nameOfYourJar.jar /user/biadmin/inputMR /user/biadmin/outputMR
14/05/12 12:09:24 INFO input.FileInputFormat: Total input paths to process : 2
14/05/12 12:09:24 WARN snappy.LoadSnappy: Snappy native library is available
14/05/12 12:09:24 INFO util.NativeCodeLoader: Loaded the native-hadoop library
14/05/12 12:09:24 INFO snappy.LoadSnappy: Snappy native library loaded
14/05/12 12:09:24 INFO mapred.JobClient: Running job: job_201405121126_0059
14/05/12 12:09:25 INFO mapred.JobClient: map 0% reduce 0%
14/05/12 12:09:31 INFO mapred.JobClient: map 50% reduce 0%
14/05/12 12:09:34 INFO mapred.JobClient: map 100% reduce 0%
14/05/12 12:09:43 INFO mapred.JobClient: map 100% reduce 100%
14/05/12 12:09:44 INFO mapred.JobClient: Job complete: job_201405121126_0059
14/05/12 12:09:44 INFO mapred.JobClient: Counters: 29
14/05/12 12:09:44 INFO mapred.JobClient: Job Counters
14/05/12 12:09:44 INFO mapred.JobClient: Data-local map tasks=2
14/05/12 12:09:44 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=8827
14/05/12 12:09:44 INFO mapred.JobClient: Launched map tasks=2
14/05/12 12:09:44 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
14/05/12 12:09:44 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
14/05/12 12:09:44 INFO mapred.JobClient: Launched reduce tasks=1
14/05/12 12:09:44 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=10952
14/05/12 12:09:44 INFO mapred.JobClient: File Input Format Counters
14/05/12 12:09:44 INFO mapred.JobClient: Bytes Read=197
14/05/12 12:09:44 INFO mapred.JobClient: File Output Format Counters
14/05/12 12:09:44 INFO mapred.JobClient: Bytes Written=19
14/05/12 12:09:44 INFO mapred.JobClient: FileSystemCounters
14/05/12 12:09:44 INFO mapred.JobClient: HDFS_BYTES_READ=413
14/05/12 12:09:44 INFO mapred.JobClient: FILE_BYTES_WRITTEN=76101
14/05/12 12:09:44 INFO mapred.JobClient: FILE_BYTES_READ=50
14/05/12 12:09:44 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=19
14/05/12 12:09:44 INFO mapred.JobClient: Map-Reduce Framework
14/05/12 12:09:44 INFO mapred.JobClient: Virtual memory (bytes) snapshot=3867070464
14/05/12 12:09:44 INFO mapred.JobClient: Reduce input groups=2
14/05/12 12:09:44 INFO mapred.JobClient: Combine output records=4
14/05/12 12:09:44 INFO mapred.JobClient: Map output records=4
14/05/12 12:09:44 INFO mapred.JobClient: CPU time spent (ms)=1960
14/05/12 12:09:44 INFO mapred.JobClient: Map input records=2
14/05/12 12:09:44 INFO mapred.JobClient: Reduce shuffle bytes=56
14/05/12 12:09:44 INFO mapred.JobClient: Combine input records=4
14/05/12 12:09:44 INFO mapred.JobClient: Spilled Records=8
14/05/12 12:09:44 INFO mapred.JobClient: SPLIT_RAW_BYTES=216
14/05/12 12:09:44 INFO mapred.JobClient: Map output bytes=36
14/05/12 12:09:44 INFO mapred.JobClient: Reduce input records=4
14/05/12 12:09:44 INFO mapred.JobClient: Physical memory (bytes) snapshot=697741312
14/05/12 12:09:44 INFO mapred.JobClient: Total committed heap usage (bytes)=746494976
14/05/12 12:09:44 INFO mapred.JobClient: Reduce output records=2
14/05/12 12:09:44 INFO mapred.JobClient: Map output materialized bytes=56
[biadmin@bivm ~]$

If you see something like this, congrats! your map reduce task is already done! the results are in /users/biadmin/outputMR

the source is located in https://github.com/alonsoir/mrDominioRegistrador

the data is taken from http://datos.gob.es

http://en.wikipedia.org/wiki/MapReduce

Enjoy!

 

Seguir

Recibe cada nueva publicación en tu buzón de correo electrónico.

Únete a otros 75 seguidores