Some time ago, in a galaxy far, far away, a company asked me to do a challenge that consisted of creating a real-time log file processing architecture: not describing what one would look like, but building it from scratch. Obviously that felt like an attempt to rip me off, to get work for free, so I spent a while on it, approaching it the way I would tackle this kind of problem. Here are the ideas that came up during that morning, ideas that in the real world would be compared with those of other colleagues until we were clear about the next steps. Needless to say, ClarityAI did not even respond to me; none of their engineers deigned to reply. The poor recruiter had to respond: "the proposal was not what they asked for. They wanted code, or it seemed like a vague proposal..."

Well, that is what we usually call brainstorming.


README.md

The idea of this text is to show the reasoning process I follow as I solve this problem.

clarity code challenge:

The challenge consists of two exercises

## 1. Parse the data with a time_init, time_end
Build a tool, that given the name of a file (with the format described above), an init_datetime, an end_datetime, 
and a Hostname, returns:

a list of hostnames connected to the given host during the given period

ANALYSIS of the problem:

Having this data in log files:

	<unix_timestamp> <from-hostname> <to-hostname>
	1366815793 quark garak
	1366815795 brunt quark
	1366815811 lilac garak
	...

I have to get the from-hostnames from the following input data:

	init_datetime, end_datetime, to-hostname

That is, given an input like this:

	init_datetime,end_datetime,to-hostname
	1366815790,1366815811,garak

The output is:

	quark 1366815793
	lilac 1366815811

DEDUCTION PROCESS 

Maybe we should turn that unix_timestamp into something human readable, but for now I'll leave it as it is. The window bounds also arrive as input parameters in the same form, a unix timestamp stored as a Long, which is ideal for these calculations: you only have to add them, subtract them, and compare whether one is greater or smaller than the other.

Or, for an input like this:

	1366815790,1366815795,quark

The output is:

	brunt 1366815795
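To pin down the semantics of both examples, here is a minimal sketch using plain Scala collections. It is not the Spark solution I argue for below, only a way to check the expected outputs by hand. The names `Connection`, `parseLine`, `connectedTo` and `humanReadable` are my own for illustration, and the window is taken as (init, end], the same bounds used in the SQL query of the proposal.

```scala
import java.time.Instant
import scala.util.Try

// One log line: <unix_timestamp> <from-hostname> <to-hostname>
case class Connection(unix: Long, from: String, to: String)

// Parse a line; anything that does not have exactly three fields
// with a numeric timestamp yields None instead of throwing.
def parseLine(line: String): Option[Connection] =
  line.trim.split("\\s+") match {
    case Array(ts, from, to) => Try(ts.toLong).toOption.map(Connection(_, from, to))
    case _                   => None
  }

// Hosts that connected to `toHost` inside the window (init, end].
def connectedTo(lines: Seq[String], init: Long, end: Long, toHost: String): Seq[(String, Long)] =
  lines
    .flatMap(l => parseLine(l))
    .filter(c => c.to == toHost && init < c.unix && c.unix <= end)
    .map(c => (c.from, c.unix))

// Only for display: all comparisons are done on the Long value.
def humanReadable(unix: Long): String = Instant.ofEpochSecond(unix).toString

val sample = Seq(
  "1366815793 quark garak",
  "1366815795 brunt quark",
  "1366815811 lilac garak"
)

val toGarak = connectedTo(sample, 1366815790L, 1366815811L, "garak")
val toQuark = connectedTo(sample, 1366815790L, 1366815795L, "quark")
```

Keeping the timestamp as a Long until the very last moment means the filter is two integer comparisons per line; the conversion to a readable date is only needed when showing results to a person.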


This seems a perfect use case for Spark SQL and DataFrames. You could even think about a streaming solution, because the log file can grow every second: using Spark Structured Streaming to load the file as it is produced on a remote machine is perfectly feasible, and only minimal code changes would be needed to create the DataFrame. For simplicity, I will assume the first option: load a static CSV file into a DataFrame and run a SQL query that solves the first problem.

The main reason is that these log files can be really huge, gigabytes or terabytes per day, so their best place is a distributed file system where we can spread that load and work with the data in the memory of each node.
	
If we need to process bigger files, we add more nodes to the cluster (scale out) and/or extend the capacity of each one of the nodes in memory/cpu/hdd (scale up). 

It does not make sense to create an ad hoc tool for a problem like this. To deal with files that are potentially huge you would literally need to write a distributed processing framework, and if you wanted it to be fast you would need it to work with the data in the RAM of each node.

That is literally the definition of the Apache Spark framework, distributed processing of large amounts of data in memory.

Besides, the problem maps naturally onto a SQL statement.

A PROPOSAL

In principle I have to:

	1) create a suitable schema for these fields,
	2) create a DataFrame with the information of that CSV file, using that schema,
	3) register a table in memory,
	4) write a SQL query that is more or less:

	SELECT `from`, unix FROM LOGS WHERE $init_datetime < unix AND unix <= $end_datetime AND `to` = '${to_hostname}'

	5) display the data, save it in Parquet format, create a Hive table...


Here is the whole script, assuming I am in a spark-shell. A real production solution requires assembling the jar with its dependencies...

	import org.apache.spark.sql.SparkSession
	import org.apache.spark.sql.types.{LongType, StringType, StructType}

	// Not needed in a spark-shell, where `spark` already exists
	val spark = SparkSession.builder().appName("Spark SQL clarity.io examples").getOrCreate()

	// Columns of each log line: <unix_timestamp> <from-hostname> <to-hostname>
	val schema = new StructType().add("unix", LongType).add("from", StringType).add("to", StringType)

	// Obviously this should be a parameter
	val path = "/Users/aironman/gitProjects/alonsoir.github.io/test-clarity.csv"

	// The file is space-delimited, has no header and no quoting
	val dfLogsWithSchema = spark.read.format("csv")
		.option("delimiter", " ")
		.option("quote", "")
		.option("header", "false")
		.schema(schema)
		.load(path)

	// registerTempTable is deprecated since Spark 2.0; this is its replacement
	dfLogsWithSchema.createOrReplaceTempView("LOGS")

	spark.sql("SELECT * FROM LOGS").show()

	val init_datetime = 1366815790L
	val end_datetime = 1366815811L
	val to_hostname = "garak"

	println(s"USING to_hostname: $to_hostname init_datetime: $init_datetime end_datetime: $end_datetime")

	// I am printing the data, but I could save it to a Parquet file or wherever we need to.
	// `from` and `to` are SQL keywords, so they are quoted with backticks to be safe.
	spark.sql(s"""SELECT `from`, unix FROM LOGS WHERE $init_datetime < unix AND unix <= $end_datetime AND `to` = '${to_hostname}'""")
		.show(false)

Obviously this is not a complete program. We still have to read init_datetime, end_datetime and to_hostname as parameters; the project structure is missing, as are the assumptions; and the jar has to be built with Maven or sbt. The spark-submit invocation may also need optimization parameters, which will vary depending on the production cluster.
Ideally the complete data set fits in the combined RAM of the nodes, and the Driver in particular must be given enough memory.
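As a sketch of the missing parameter handling, something like the following could sit in front of the script. `QueryParams` and `parseArgs` are hypothetical names, and the argument order (init, end, hostname) is my assumption. The hostname is restricted to letters, digits, dots and hyphens, which is also an assumption, made because the value ends up interpolated into the SQL string.

```scala
import scala.util.Try

// Hypothetical holder for the three inputs of the first exercise.
case class QueryParams(initDatetime: Long, endDatetime: Long, toHostname: String)

// Expected arguments: <init_datetime> <end_datetime> <to_hostname>
def parseArgs(args: Array[String]): Either[String, QueryParams] = args match {
  case Array(init, end, host) =>
    (Try(init.toLong).toOption, Try(end.toLong).toOption) match {
      case (Some(i), Some(e)) if i > e =>
        Left("init_datetime must not be greater than end_datetime")
      case (Some(_), Some(_)) if !host.matches("[A-Za-z0-9.-]+") =>
        // Assumed whitelist: rejects quotes and spaces before they reach the SQL string.
        Left("to_hostname may only contain letters, digits, dots and hyphens")
      case (Some(i), Some(e)) =>
        Right(QueryParams(i, e, host))
      case _ =>
        Left("init_datetime and end_datetime must be unix timestamps")
    }
  case _ =>
    Left("usage: <init_datetime> <end_datetime> <to_hostname>")
}

val parsed = parseArgs(Array("1366815790", "1366815811", "garak"))
```

Returning an `Either` keeps the failure message next to the validation, so a `main` method can print it and exit before a Spark session is ever created.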

links

	https://stackoverflow.com/questions/39281152/spark-unix-timestamp-data-type-mismatch
	https://stackoverflow.com/questions/39926411/provide-schema-while-reading-csv-file-as-a-dataframe
	https://sparkbyexamples.com/spark/rename-a-column-on-spark-dataframes/
	https://stackoverflow.com/questions/29704333/spark-load-csv-file-as-dataframe

## 2. Unlimited Input Parser

The tool should both parse previously written log files and terminate or collect input from a new log file while it's 
being written and run indefinitely.
The script will output, once every hour:
● a list of hostnames connected to a given (configurable) host during the last hour
● a list of hostnames received connections from a given (configurable) host during the last hour
● the hostname that generated most connections in the last hour
Both the number of log lines and hostnames can be very high. Consider implementing a CPU and memory-efficient solution. 
Please feel free to make assumptions as necessary with proper documentation.

Analysis

That is, we have multiple log files like the one above, so I have to assume that these files live on multiple external file systems. Every hour I have to merge all those log files into one, and then generate an output that:

	1. Shows a list of machines that have connected to a machine (configurable by parameter) during the last hour.

DEDUCTION PROCESS

Something like this, starting from the first exercise: a previous step of the process flow captures a one-hour time window by merging the log files produced during that hour into a single CSV file. Then I create a DataFrame from that data, using the schema of the final file, and register a temporary table against which we can run the SQL statements.

Note that the LOGS table now contains only the hourly aggregate, so no further filters are needed in the WHERE clause.

		SELECT `from`, unix FROM LOGS WHERE `to` = '${to_hostname}'

	2. Show a list of machines that have received the connection during the last hour. 

	DEDUCTION PROCESS
		
		Same as above, but in the opposite direction.

		SELECT `to`, unix FROM LOGS WHERE `from` = '${from_hostname}'

	3. Show the name of the machine that has generated the most connections in the last hour...

	DEDUCTION PROCESS

	By definition, what does this mean? Do you want the name of the machine that initiated the most connections? The name of the machine that received the most connections? The larger of the two? It is not clear...

	Assuming that you want the larger of the two, over both from and to hosts, you have to run two queries. Each one counts the connections per hostname, sorts the counts in descending order and keeps the first row, so that you have the from machine and the to machine with the most connections, and then you only have to decide which one is larger.

		SELECT `from`, count(*) AS cont FROM LOGS GROUP BY `from` ORDER BY cont DESC LIMIT 1
		SELECT `to`, count(*) AS cont FROM LOGS GROUP BY `to` ORDER BY cont DESC LIMIT 1

		in Spark that would be something like this:

			import org.apache.spark.sql.functions.desc

			// df holds the last hour of logs: count connections per source host
			// and keep the host with the highest count
			df.groupBy("from")
			  .count()
			  .orderBy(desc("count"))
			  .limit(1)
			  .show()

			// same for the destination side
			df.groupBy("to")
			  .count()
			  .orderBy(desc("count"))
			  .limit(1)
			  .show()
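The three hourly outputs can also be written down on plain Scala collections, just to make the expected results concrete before moving them to Spark. This is a sketch under my reading of the third point (the larger of the busiest source and the busiest destination); `HourlyReport`, `top` and `hourlyReport` are illustrative names, and the last two sample records are made up so that there is a clear winner.

```scala
case class Connection(unix: Long, from: String, to: String)

case class HourlyReport(
  connectedTo: Set[String],       // hosts that connected to the given host
  receivedFrom: Set[String],      // hosts that received a connection from the given host
  busiest: Option[(String, Int)]  // host with the most connections, with its count
)

// Most frequent hostname in a list, if any.
def top(hosts: Seq[String]): Option[(String, Int)] = {
  val counts = hosts.groupBy(identity).map { case (h, hs) => (h, hs.size) }
  if (counts.isEmpty) None else Some(counts.maxBy(_._2))
}

// `lastHour` is assumed to contain only the records of the last hour.
def hourlyReport(lastHour: Seq[Connection], host: String): HourlyReport = {
  // Busiest source and busiest destination, then the larger of the two.
  val candidates = Seq(top(lastHour.map(_.from)), top(lastHour.map(_.to))).flatten
  HourlyReport(
    connectedTo = lastHour.filter(_.to == host).map(_.from).toSet,
    receivedFrom = lastHour.filter(_.from == host).map(_.to).toSet,
    busiest = if (candidates.isEmpty) None else Some(candidates.maxBy(_._2))
  )
}

val lastHour = Seq(
  Connection(1366815793L, "quark", "garak"),
  Connection(1366815795L, "brunt", "quark"),
  Connection(1366815811L, "lilac", "garak"),
  Connection(1366815820L, "quark", "brunt"), // made-up record
  Connection(1366815830L, "quark", "lilac")  // made-up record
)

val report = hourlyReport(lastHour, "quark")
```

With a very large number of hostnames this in-memory grouping is exactly what I would hand over to Spark, but the shape of the result stays the same.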

Obviously, just showing the data does not get me very far, but I can dump it into an Avro, Parquet, ORC or text file, or whatever, and then read it in a later step. I would have to look at how to do this more effectively.

How do I solve it? How do I merge those remote log files into one every hour?

With some streaming tool that does the collection every hour? The phrase "every hour" implies that a task scheduler, such as Quartz or cron, should run a data collection task against each external file system and merge the results into a single data file, on which the three previous points would be executed.
		
Something like this can be done:

1. Every hour I connect to each external file system, bring each log file over and append it to a final one. Once appended, I rename the files in that directory so that they won't be processed again, or I move them to another directory.
			
The main point is that they won't be reprocessed.
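A minimal sketch of the "move them to another directory" variant, using only `java.nio.file`. The function name `archiveProcessed` and both directory arguments are hypothetical; the assumption is that the files sit on a local or mounted file system by the time this step runs.

```scala
import java.nio.file.{Files, Path, StandardCopyOption}

// Move every regular file from `incoming` to `processed`, so that the next
// hourly run no longer finds it in `incoming`. Returns the new locations.
def archiveProcessed(incoming: Path, processed: Path): Seq[Path] = {
  Files.createDirectories(processed)
  // listFiles() returns null when the directory does not exist
  val files = Option(incoming.toFile.listFiles())
    .map(_.toSeq)
    .getOrElse(Seq.empty)
    .filter(_.isFile)
  files.map { f =>
    Files.move(f.toPath, processed.resolve(f.getName), StandardCopyOption.REPLACE_EXISTING)
  }
}
```

Moving rather than renaming in place keeps the incoming directory empty between runs, which makes "nothing left to process" trivial to check.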

To do this, I first need a job that connects to an external file system and brings me that data.
Each of those files will be on a machine identified by an IP and a port.
			
For example, use something like the LogFileRecoveryTool.scala job that is in this same Github repository.

				https://github.com/alonsoir/alonsoir.github.io/blob/master/LogFileRecoveryTool.scala

2. Every hour I collect the data stored in each file system by each LogFileRecoveryTool job, appending the data to a final log file.

For this, I can use some task scheduler that fires every hour and scans a directory where the Spark Streaming jobs from the previous point have left their log files. It takes those log files and creates a final aggregated log file with all the lines.

You can create a Parquet, Avro, ORC or CSV file, with or without compression...
This task scheduler can be anything from a simple cron job to a Java or Scala process that uses a library like Quartz.
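To illustrate the hourly merge, here is a sketch that uses the JDK's `ScheduledExecutorService` as a stand-in for Quartz or cron; it is not the Quartz API. `mergeLogs` and `scheduleHourly` are hypothetical names, and I assume the collected parts end in `.log` and that the target file lives outside the scanned directory.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardOpenOption}
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

// Append the lines of every *.log file in `dir` to `target`.
// Returns the number of lines written in this run.
def mergeLogs(dir: Path, target: Path): Int = {
  val parts = Option(dir.toFile.listFiles())
    .map(_.toSeq)
    .getOrElse(Seq.empty)
    .filter(f => f.isFile && f.getName.endsWith(".log"))
    .sortBy(_.getName)
  var written = 0
  parts.foreach { f =>
    val lines = Files.readAllLines(f.toPath, StandardCharsets.UTF_8)
    Files.write(target, lines, StandardCharsets.UTF_8,
      StandardOpenOption.CREATE, StandardOpenOption.APPEND)
    written += lines.size()
  }
  written
}

// Run `task` now and then once per hour. Exceptions are caught because
// scheduleAtFixedRate stops rescheduling a task that throws.
def scheduleHourly(task: () => Unit): ScheduledExecutorService = {
  val scheduler = Executors.newSingleThreadScheduledExecutor()
  val guarded = new Runnable {
    def run(): Unit =
      try task()
      catch { case e: Exception => System.err.println(s"hourly merge failed: ${e.getMessage}") }
  }
  scheduler.scheduleAtFixedRate(guarded, 0L, 1L, TimeUnit.HOURS)
  scheduler
}
```

A call such as `scheduleHourly(() => mergeLogs(partsDir, hourlyFile))` would wire the two together; reading each part fully into memory is only acceptable here because this is a sketch, a real merge of huge files would stream them.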

3. Once I have that final file, I run a final Spark process that creates the DataFrame from that file, registers the temporary table and runs the queries described in the three previous points.

4. Obviously, the execution of each of these components needs to be precisely orchestrated by a master orchestrator that takes care of running the different components in the correct order.

We have the log collectors capturing logs from each of the machines, so this orchestrator will be in charge of checking periodically that the processes have an active PID and are not in a zombie state, relaunching them if for some reason they are down.

If the collecting processes still fail after several attempts, an alarm must be raised so that someone takes a closer look.
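The check, relaunch and alarm loop can be sketched with the JDK's `ProcessHandle` (Java 9 or later, which is an assumption about the runtime). `supervise`, `Outcome` and the two callbacks are hypothetical: `relaunch` stands for whatever starts the collector again and returns its new PID, and `alarm` for whatever notification channel is available. The sketch only checks liveness; it does not try to tell a zombie apart from a healthy process.

```scala
// True when the operating system still reports a live process for this PID.
def isAlive(pid: Long): Boolean = {
  val handle = ProcessHandle.of(pid)
  handle.isPresent && handle.get.isAlive
}

sealed trait Outcome
case object Healthy extends Outcome
case class Relaunched(newPid: Long, attempts: Int) extends Outcome
case object AlarmRaised extends Outcome

// Check one collector; relaunch it up to `maxAttempts` times if it is down,
// and raise an alarm when none of the attempts produces a live process.
def supervise(
    pid: Long,
    maxAttempts: Int,
    relaunch: () => Option[Long],
    alarm: String => Unit
): Outcome =
  if (isAlive(pid)) Healthy
  else {
    // The iterator is lazy, so relaunching stops at the first success.
    val started = (1 to maxAttempts).iterator
      .map(attempt => relaunch().filter(p => isAlive(p)).map(p => Relaunched(p, attempt)))
      .collectFirst { case Some(r) => r }
    started.getOrElse {
      alarm(s"collector with PID $pid could not be relaunched after $maxAttempts attempts")
      AlarmRaised
    }
  }
```

Passing `relaunch` and `alarm` in as functions keeps the sketch independent of how the collectors are actually started (a bash script calling spark-submit, in my case) and of how alarms are delivered.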
 
On the other hand, the orchestrator must also check the health of the Quartz process, which collects every hour the log files captured by the previous processes and merges them into a single final log file.

Once this is done, the Quartz process must notify the master orchestrator so that it launches the final processes.

This completion notice can be sent in different ways. To give a couple of examples: by sending an event through a message queue, in which case this logic has to be included in the Quartz process in charge of merging the logs, or by inserting into a SQL table a boolean that represents a state, so that the master orchestrator knows what needs to be processed.

Depending on the choice of one way or another, the implementation of this master orchestrator varies.
	
The implementation of this master orchestrator basically checks the execution status of each element.
Some elements run all the time, such as the log collectors. These are launched with a bash script that includes a call to the spark-submit command.

When the bash script is launched, we can follow the state of the Spark process by checking its PID periodically.

Another way to monitor these processes is to stream the log files that each of these Spark processes writes. These Spark log files usually live in HDFS, so you have to scan them and look for the messages that reveal the health of the processes: above all those marked as ERROR, and some INFO messages such as SUCCESS. To know exactly what type of error it is, I always recommend controlled, explicitly thrown exceptions.
In the past I found that this log-checking solution requires a lot of memory, a resource we may not have, so the periodic PID check is lighter.
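For the log-scanning alternative, a sketch of the classification step could look like this. The log format is an assumption: I take the level to be a whitespace-separated token (`ERROR`, `INFO`) and a successful run to log an INFO line containing the word `SUCCESS`. `Health` and `classify` are illustrative names. Consuming an `Iterator` means only the error lines are retained, which addresses part of the memory concern.

```scala
sealed trait Health
case object Succeeded extends Health
case class Failed(errors: Vector[String]) extends Health
case object Unknown extends Health

// Walk the log once. Any ERROR line marks the job as failed; otherwise an
// INFO line mentioning SUCCESS marks it as succeeded.
def classify(lines: Iterator[String]): Health = {
  var errors = Vector.empty[String]
  var success = false
  lines.foreach { line =>
    val tokens = line.split("\\s+")
    if (tokens.contains("ERROR")) errors = errors :+ line
    else if (tokens.contains("INFO") && line.contains("SUCCESS")) success = true
  }
  if (errors.nonEmpty) Failed(errors)
  else if (success) Succeeded
  else Unknown
}
```

The `Unknown` case matters: a log without errors and without a success marker usually means the process is still running or died silently, and that is where the PID check is the better signal.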
This master orchestrator can be built in many ways. Basically, you have to understand that it is a daemon process that takes care of checking the execution status of each of the sub-components.
Depending on the technology selected you would have to do one thing or another; those are implementation details.
For example, we could use Apache Camel, a framework for integrating components, to run them, and we could manage their state in several ways too.

Implementation details.

I have done things like these in the past for other customers. I know how to do it, but it takes a while.
	
Alternatives
Surely this can be done in alternative ways, for example by using proprietary software designed specifically for this type of task.
Since I don't have access to that proprietary software at the time of writing, I decided to use open source software (Apache Spark, Java and Scala) that is available to everyone.
Alternatives within the open source world? You could use Apache Kafka (Kafka Streams, KSQL) to collect each external log file in a partitioned topic, then merge all the topics and work the same way I used Spark to build the SQL query.

At the time of writing I am not clear on how to manage the hourly merge that way. I guess it could be done with a time-window aggregation in Spark, or with Kafka Streams and Kafka Connect, instead of creating a Quartz component.
Still, I could probably switch from using Quartz as a task scheduler to delegating the hourly aggregation of each log to Spark or Kafka.
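The idea behind a time-window aggregation is simple enough to sketch in plain Scala: every record is assigned to the one-hour bucket its timestamp falls into, so no external scheduler is needed to decide what "the last hour" is. This is not the Spark or Kafka Streams windowing API, only the bucketing they would perform; `windowStart` and `byHour` are illustrative names and the third record is made up.

```scala
case class Connection(unix: Long, from: String, to: String)

val HourSeconds = 3600L

// Start (in unix seconds) of the one-hour tumbling window that contains `unix`.
def windowStart(unix: Long): Long = unix - (unix % HourSeconds)

// Group the records by window, so each hour can be reported on its own.
def byHour(records: Seq[Connection]): Map[Long, Seq[Connection]] =
  records.groupBy(c => windowStart(c.unix))

val grouped = byHour(Seq(
  Connection(1366815793L, "quark", "garak"),
  Connection(1366815811L, "lilac", "garak"),
  Connection(1366819300L, "brunt", "quark") // made-up record in the next hour
))
```

Because the window is derived from the event timestamp and not from the wall clock, late or re-collected files land in the right hour, which is the main thing a cron-style merge cannot guarantee.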

Probably, with time, I would make several versions of all these ideas and keep the one agreed with the team: the one that gives the best performance with the least use of resources and is the easiest to use.

I could use Kibana and Elasticsearch to run the queries that solve the problem once we have the data aggregated.
I could also use Apache Flink instead of Spark Streaming to capture the log data if the stream is so large that it needs to be processed event by event as it arrives.

It would also be worth taking a look at this product, and this one: 

			 

Conclusion and thanks.

It has been fun to do the intellectual exercise. I know it doesn't fit perfectly, but I wanted to at least show my way of thinking about how to build an architecture for a seemingly simple problem. We both know that working with log files has always been a huge problem, due to their distributed nature and the resource management needed to maintain and process them to get value out of them.

Thank you very much for everything, I'll be happy to talk to you about anything you might need.

Alonso

links

	http://www.quartz-scheduler.org

	https://medium.com/search?q=spark%20streaming

	https://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/chapter1/streaming.html

	https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala

	https://www.baeldung.com/spring-quartz-schedule

	https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

	https://www.google.com/search?client=safari&rls=en&sxsrf=ALeKk0313EQfbDJdv-_r9IWg0GhCXuFzyQ%3A1584619451466&ei=u19zXoeOHI-elwTWoIiwAw&q=spark+streaming+consuming+log+files&oq=spark+streaming+consuming+log+files&gs_l=psy-ab.3...25984.28445..28818...0.0..0.93.1398.19......0....1..gws-wiz.......35i39j33i10.0n_Yp-DgSgA&ved=0ahUKEwjH14i8v6boAhUPz4UKHVYQAjYQ4dUDCAo&uact=5

	https://stackoverflow.com/questions/33009935/window-in-spark-streaming?rq=1

	https://www.google.com/search?client=safari&rls=en&sxsrf=ALeKk00X0b8koqoi2akRPmfDTsXV7tFqtQ%3A1584620095532&ei=P2JzXpWTIIu-aJuZmaAO&q=collecting+log+files+every+hour+using+structured+streaming&oq=collecting+log+files+every+hour+using+structured+streaming&gs_l=psy-ab.3...10051.14403..15546...2.2..0.378.1411.9j2j0j1......0....1..gws-wiz.......0i71j33i10.1o_8NKJxqzE&ved=0ahUKEwjVrpfvwaboAhULHxoKHZtMBuQQ4dUDCAo&uact=5

	http://www.diegocalvo.es/funciones-estadisticas-de-dataframes-en-scala/	

	https://allaboutscala.com/big-data/spark/#dataframe-sql-group-by-count-filter
