Recently I did a test for a company. They asked for two apparently very simple exercises which turn out to contain a great deal of complexity as soon as you dig in, so here is an exercise in analysing how I saw the problem and how I would deal with it. These are broad brushstrokes, with some code; I'm sorry, this time I haven't had time to create a complete solution, mostly because I'm keeping it as a personal project for the future, since it has potential.

The challenge consists of two exercises:

1. Parse the data with a time_init, time_end

Build a tool, that given the name of a file (with the format described above), an init_datetime, an end_datetime,
and a Hostname, returns:

a list of hostnames connected to the given host during the given period

ANALYSIS of the problem:

Having this data in log files:

<unix_timestamp> <from-hostname> <to-hostname>
1366815793 quark garak
1366815795 brunt quark
1366815811 lilac garak
...

I have to get the from-hostnames from the following input data:

    init_datetime, end_datetime, to-hostname

That is, given an input like this:

    init_datetime,end_datetime,to-hostname
    1366815790,1366815811,garak

The output is:

    quark 1366815793
    lilac 1366815811

DEDUCTION PROCESS

Maybe we should convert that unix_timestamp into something human readable, but for now I'll leave it as it is, because
to do the calculations between usage windows I will also receive, as input parameters, other unix_timestamps, which are
Long numbers, ideal for working with this data: you only have to add them, subtract them, and compare whether one is
larger or smaller than another.

Or an entry like this:

    1366815790,1366815795,quark

The output is:

    brunt 1366815795
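Conceptually the filter itself is trivial: parse each line and keep the rows whose unix column falls inside the window and whose to column matches the given host. Here is a minimal sketch in plain Scala (the names are mine, purely for illustration); the real issue, as discussed below, is the volume of data:

// Minimal sketch, with illustrative names: parse a log line and apply the window filter.
case class LogLine(unix: Long, from: String, to: String)

def parse(line: String): LogLine = {
  val Array(ts, from, to) = line.trim.split("\\s+")
  LogLine(ts.toLong, from, to)
}

// Returns (from-hostname, unix) pairs connected to `host` inside (init, end].
def connectedTo(lines: Iterator[String], init: Long, end: Long, host: String): Iterator[(String, Long)] =
  lines.map(parse)
       .filter(l => l.to == host && init < l.unix && l.unix <= end)
       .map(l => (l.from, l.unix))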


This seems a perfect example for Spark SQL and DataFrames. You could even consider a streaming solution, because the
log file can grow every second; using Spark Structured Streaming to load the file as it is produced on a remote machine
is perfectly feasible, and only minimal code changes would be needed to create the DataFrame. For simplicity I will
take the first option: load a static CSV file into a DataFrame and run a SQL query that solves the first problem.

The main reason is that these log files can be really huge, gigabytes or terabytes per day, so their best place is a
distributed file system where we can spread that load and work with the data in the memory of each node.
If we need to process bigger files, we add more nodes to the cluster (scale out) and/or extend the memory/CPU/disk
capacity of each node (scale up).

It does not make sense to create an ad hoc tool to solve a problem like this, because to deal with files that are
potentially huge you would literally need to build a distributed processing framework, and if you wanted it to be fast
and time-efficient, that framework would have to work with the data in the RAM of each node.

That is literally the definition of the Apache Spark framework, distributed processing of large amounts of data in 
memory.

Besides, the problem maps naturally onto a single SQL statement.

In my opinion, tackling this kind of problem with anything other than Apache Spark is like living in the present but
travelling to the past.

A PROPOSED SOLUTION

In principle I have to:

1) create a suitable schema for these fields.
2) create a DataFrame with the information from that csv file, using the previous schema.
3) register a table in memory,
4) create a SQL query more or less like this:

    SELECT `from`, unix FROM LOGS WHERE $init_datetime < unix AND unix <= $end_datetime AND `to` = '${to_hostname}'
5) display the data, save it in Parquet format, create a Hive table...

Then, the whole script, assuming I'm inside a spark-shell, although a real solution ready for production would require
assembling the JAR with its respective dependencies…

import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.LongType
import org.apache.spark.sql.SparkSession

// not needed using a spark-shell
val spark = SparkSession.builder().appName("Spark SQL company.io examples").getOrCreate()

val schema = new StructType().add("unix",LongType).add("from",StringType).add("to",StringType)

// Obviously this should be a parameter
val path = "/Users/aironman/gitProjects/alonsoir.github.io/test-company.csv"

val dfLogsWithSchema = spark.read
  .format("csv")
  .option("delimiter", " ")
  .option("quote", "")
  .option("header", "false")
  .schema(schema)
  .load(path)

dfLogsWithSchema.createOrReplaceTempView("LOGS")

spark.sql("SELECT * FROM LOGS").show

val init_datetime = 1366815790L

val end_datetime = 1366815811L

val to_hostname = "garak"

println(s "USING to_hostname: $to_hostname" + s" init_datetime: $init_datetime" + s" end_datetime: $end_datetime")

// I am printing the data, but I could save it to a parquet file or whatever we need to.
spark.sql(s"""SELECT from,unix FROM LOGS WHERE $init_datetime < unix AND unix <=$end_datetime AND to = '${to_hostname}'"")
     .show(false)
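And for point 5, persisting the result instead of just showing it is a one-liner; for example, writing it out in Parquet (the output path below is an assumption of mine):

val result = spark.sql(
  s"""SELECT `from`, unix FROM LOGS
      WHERE $init_datetime < unix AND unix <= $end_datetime AND `to` = '${to_hostname}'""")

// Hypothetical output path, just for illustration.
result.write.mode("overwrite").parquet("/tmp/connected-hosts.parquet")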

Obviously this is not a complete program: we still need to collect init_datetime, end_datetime and to_hostname as
parameters, the project structure is missing, as are the prerequisites, building the jar with Maven or sbt and,
perhaps, adding to the spark-submit invocation the optimization parameters that will vary depending on the cluster in
production. The ideal is always that the complete data set fits in the RAM of all the nodes, but mainly in the driver,
so it must be provisioned properly.
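A minimal sketch of how such a standalone job could collect those parameters before being packaged and launched with spark-submit; the object name and argument order are my own, purely illustrative:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StringType, StructType}

// Hypothetical entry point; argument names and order are illustrative only.
object ConnectedHostsJob {
  def main(args: Array[String]): Unit = {
    require(args.length == 4, "usage: <logsPath> <init_datetime> <end_datetime> <to_hostname>")
    val Array(path, init, end, toHostname) = args
    val (initDatetime, endDatetime) = (init.toLong, end.toLong)

    val spark = SparkSession.builder().appName("ConnectedHostsJob").getOrCreate()
    val schema = new StructType().add("unix", LongType).add("from", StringType).add("to", StringType)

    spark.read.format("csv")
      .option("delimiter", " ")
      .option("header", "false")
      .schema(schema)
      .load(path)
      .createOrReplaceTempView("LOGS")

    spark.sql(
      s"""SELECT `from`, unix FROM LOGS
          WHERE $initDatetime < unix AND unix <= $endDatetime AND `to` = '$toHostname'""")
      .show(false)

    spark.stop()
  }
}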

2. Unlimited Input Parser

The tool should both parse previously written log files and terminate, or collect input from a new log file while it's
being written and run indefinitely.
The script will output, once every hour:
● a list of hostnames connected to a given (configurable) host during the last hour
● a list of hostnames that received connections from a given (configurable) host during the last hour
● the hostname that generated most connections in the last hour
Both the number of log lines and hostnames can be very high. Consider implementing a CPU and memory-efficient solution.
Please feel free to make assumptions as necessary with proper documentation.

Analysis

That is, we have multiple log files like the one above, so I have to assume that these files exist on multiple external
file systems, and every hour I have to aggregate all those log files into one, so I have to generate an output that:

  1. Shows a list of machines that have connected to a machine (configurable by parameter) during the last hour.

DEDUCTION PROCESS

Something like this: starting from the first exercise, capture an aggregated one-hour time window in a previous step of
the process flow, in which the log files produced every hour are merged into a single csv file; then create a DataFrame
with that data, taking into account the schema of the final file, and register a temporary table against which we can
launch the SQL statements.

Note that this LOGS table will now only contain the hourly aggregate, so it would not be necessary to add
more filters to the WHERE.

SELECT `from`, unix FROM LOGS WHERE `to` = '${to_hostname}'
  2. Show a list of machines that have received connections from a given machine during the last hour.

DEDUCTION PROCESS

Same as above but showing the opposite.

SELECT `to`, unix FROM LOGS WHERE `from` = '${from_hostname}'

  3. Show the name of the machine that has generated the most connections in the last hour…

DEDUCTION PROCESS

By definition, what does this mean? Do they want the name of the machine that generated the most outgoing connections?
The name of the machine that received the most connections? The larger of the two? It is not clear…

Assuming they want the larger of the two, to and from, you have to run two queries that return the number of
connections together with the host name, sort them in descending order and keep the first row, so that you have the
from machine and the to machine with the most connections; then you only have to decide which of the two is larger.

SELECT `from`, COUNT(*) AS connections FROM LOGS GROUP BY `from` ORDER BY connections DESC LIMIT 1

In Spark it would be something like this:

        df.select(df.col("from"), count("from"))
          .filter(df.col("from") = {'${from_hostname}'})
          .groupBy(df.col("from"))
          .agg(max(count("from")))
          .cache()
          .show()

        df.select(df.col("to"), count("to"))
          .filter(df.col("to") = {'${to_hostname}'})
          .groupBy(df.col("to"))
          .agg(max(count("to")))
          .cache()
          .show()           

Obviously, just showing the data does not give me the counter to operate on, but I could dump it into a text file and
then read it from there to work with it. I'd have to look at how to do this more efficiently.
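A lighter option than going through a text file, assuming df is the hourly aggregate used above, would be to collect the single top row of each query into the driver and compare them there; a sketch:

import org.apache.spark.sql.functions.{col, count, desc}

// Sketch: assumes `df` is the hourly aggregate with columns unix, from, to.
val topFrom = df.groupBy(col("from")).agg(count("*").as("connections"))
                .orderBy(desc("connections")).limit(1).collect()
val topTo   = df.groupBy(col("to")).agg(count("*").as("connections"))
                .orderBy(desc("connections")).limit(1).collect()

// Each array holds at most one Row: (hostname, connections).
val candidates = (topFrom ++ topTo).map(r => (r.getString(0), r.getLong(1)))
val winner = candidates.maxBy(_._2)
println(s"host with most connections in the last hour: ${winner._1} (${winner._2} connections)")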

How do I solve it? How do I aggregate those remote log files into a single one every hour?

Using some streaming tool that does the collection every hour? The phrase "every hour" implies that a task scheduler
such as Quartz should execute a data collection task against each external file system and merge the results into a
single data file, over which the three previous points would be executed.

Something like this could be done:

  1. Every hour I connect to each external file system and fetch each file, finally merging them all into a single one.
    Once merged, I rename the files in that directory so that they won't be processed again, or I
    move them to another directory.

The main point is that they won’t be reprocessed.

To do this, I first need a job that connects to an external file system and brings me that data.
Each of those files will be on a machine reachable at a given IP and port.

For example, use something like the LogFileRecoveryTool.scala job that is in this same Github repository.

            https://github.com/alonsoir/alonsoir.github.io/blob/master/LogFileRecoveryTool.scala
  2. Every hour I collect the data stored in each file system created by each LogFileRecoveryTool job, merging
    the data into a final log file.

For this, I can use a task scheduler that fires every hour, scanning the directory where the Spark streaming jobs from
the previous point have left their log files; it will take those log files and create a final aggregated log file with
all the lines.

You can create a parquet, avro, orc or csv file, with or without compression…
This task scheduler can be anything from a simple cron job to a Java or Scala process that uses a library
like Quartz, along the lines of the sketch below.
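A rough sketch of the Quartz option; the job body is a placeholder and the cron expression simply fires at minute zero of every hour, with all names being illustrative assumptions:

import org.quartz.{CronScheduleBuilder, Job, JobBuilder, JobExecutionContext, TriggerBuilder}
import org.quartz.impl.StdSchedulerFactory

// Placeholder job: here we would scan the directory left by the collectors,
// merge everything into a single hourly file and move the processed files away.
class AggregateLogsJob extends Job {
  override def execute(context: JobExecutionContext): Unit = {
    // merge logic goes here
  }
}

val scheduler = StdSchedulerFactory.getDefaultScheduler()
val job = JobBuilder.newJob(classOf[AggregateLogsJob]).withIdentity("aggregate-logs").build()
val trigger = TriggerBuilder.newTrigger()
  .withIdentity("every-hour")
  .withSchedule(CronScheduleBuilder.cronSchedule("0 0 * * * ?"))  // second 0, minute 0, every hour
  .build()

scheduler.start()
scheduler.scheduleJob(job, trigger)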

  3. Once I have that final file, create a final Spark process, create the DataFrame with that file, create
    the temporary table and run the queries described in the three previous points.
  4. Obviously, the orderly execution of each of these components needs to be precisely orchestrated by a
    master orchestrator that takes care of the correct, ordered execution of the different
    components.

We have the log collectors capturing logs from each of the machines, so this orchestrator will be in
charge of periodically checking that the processes have an active PID and are not in a zombie state,
re-launching them in case for some reason they went down.

If after several attempts the collecting processes still fail, an alarm must be raised so that someone can take a closer look.

On the other hand, the orchestrator must also check the health of the Quartz process that collects every
hour the log files captured by the previous processes and merges them into a single final log file.
Once this is done, it must notify the master orchestrator to launch the final processes.

This completion notice can be sent in different ways: for example, by publishing an event on a message
queue, in which case you would need to include that logic in the Quartz process in charge of aggregating the logs;
or you could insert into a SQL table a boolean representing the state, so that the master orchestrator
knows where things stand before proceeding.

Those are just two examples; depending on which way is chosen, the implementation of this master
orchestrator varies.

The implementation of this master orchestrator basically checks the execution status of each element.
There are elements that are always running, such as the log collectors. These are launched with a bash
script, which includes a call to the spark-submit command.

Once the bash script has launched it, we can check the state of the Spark process by examining its PID
periodically.
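A sketch of that check from a JVM daemon, using kill -0, which only tests that the PID exists and can be signalled without actually sending it a signal; the PIDs and the relaunch script path are assumed to have been recorded by the launch script:

import scala.sys.process._

// Sketch: lightweight liveness check by PID; the relaunch script path is illustrative.
def isAlive(pid: Int): Boolean = Seq("kill", "-0", pid.toString).! == 0

def checkAndRelaunch(pid: Int, relaunchScript: String): Unit =
  if (!isAlive(pid)) {
    println(s"process $pid is down, relaunching with $relaunchScript")
    Seq("bash", relaunchScript).!   // the same bash wrapper that calls spark-submit
  }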

Another way to monitor these processes is to stream the log files of each of these Spark processes and
inspect them.

These Spark log files usually live in HDFS, so you have to scan them and look for the appropriate
messages to check the health status of the processes: especially lines marked as ERROR (I always suggest
controlled, deliberate exceptions so you know exactly what type of error occurred) and some INFO messages
such as SUCCESS.

In the past I found that this log-checking approach requires a lot of memory, resources that we may not have,
so the periodic PID check is lighter in terms of resources.

This master orchestrator can be built in many ways; basically, it is a daemon process that takes care of
checking the execution status of each of the sub-components.

Depending on the technology selected you would do one thing or another; these are implementation details.

For example, we could use Apache Camel, a framework for integrating components and orchestrating their
execution, and we could manage their state in several ways too. Implementation details.

I have done things like these in the past for other customers. I know how to do it, but it takes a while.

Alternatives

Alternatives within the open source world? You could use Apache Kafka (Kafka Streams, KSQL) to collect each
external log file into its own partitioned topic, then merge all the topics and work the same way I used Spark
to build the SQL query.

Although at the time of writing I was not clear on how to manage the hourly merge, I guess it would be done with a
time window in Spark with Kafka support instead of creating a Quartz component.

Still, I could probably switch from using Quartz as a task scheduler to delegating the hourly aggregation of
each log to Spark or Kafka, as sketched below.
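Sketching that idea with the Kafka source of Structured Streaming, assuming the same spark session as before (it needs the spark-sql-kafka connector on the classpath; the topic, broker and watermark are assumptions of mine), the hourly aggregation would become a windowed count instead of a Quartz job:

import org.apache.spark.sql.functions.{col, from_unixtime, split, window}

// Sketch: each collector publishes raw "unix from to" lines to a Kafka topic.
val rawLines = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("subscribe", "connection-logs")
  .load()
  .selectExpr("CAST(value AS STRING) AS line")

val logs = rawLines
  .withColumn("parts", split(col("line"), " "))
  .select(
    col("parts").getItem(0).cast("long").as("unix"),
    col("parts").getItem(1).as("from"),
    col("parts").getItem(2).as("to"))
  .withColumn("event_time", from_unixtime(col("unix")).cast("timestamp"))

// One-hour tumbling window per source host; late data tolerated up to 10 minutes.
val hourlyConnections = logs
  .withWatermark("event_time", "10 minutes")
  .groupBy(window(col("event_time"), "1 hour"), col("from"))
  .count()

val query = hourlyConnections.writeStream
  .outputMode("update")
  .format("console")
  .start()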

I could also use Kibana and Elasticsearch to run the necessary queries to solve the problem, or use Apache Flink
instead of Spark Streaming to capture the log data.

Conclusion and Acknowledgements.

It has been fun to do the intellectual exercise, thank you for the opportunity.

Stay well in these hard times we are living through; take care of yourselves at home, take care of your parents and grandparents, and we will all overcome this Coronavirus crisis.

Alonso.
