How to programmatically validate an input csv file before converting it to a parquet file.

Hi, maybe one day you will need to create parquet files from some enormous csv files. Those big files probably come from an external party, you have no control over them, and you still need to ingest them into your data lake. But what happens if those csv files are erroneous? The job will probably fail when you try to create the parquet files, so you need to validate them first against a json schema file. The function below is quite generic: in the json schema file you can declare fields of type string, integer or decimal.

If you need to validate another type of field, look for the line with this expression inside the function and add your own branch:

if (actualType == "string"){
...
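As a hedged sketch (a "long" type is not handled by the original function; the branch below and the java.lang.Long check are assumptions based on how Spark boxes LongType values in a Row), an extra branch could look like this:

} else if (actualType == "long") {
  if (!actualNullable) {
    // hypothetical branch: a LongType column comes back from the Row as a boxed java.lang.Long
    val theField = elem.get(contElementosJson)
    println(theField + " should be a long...")
    val validatingField: Boolean = theField.isInstanceOf[java.lang.Long]
    println("validatingField " + theField + " correct? " + validatingField)
    isCorrect = isCorrect && validatingField
  } else
    println("This field " + actualName + " can be nullable...")
  contElementosJson += 1
}

The complete function follows: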
import org.apache.spark.sql.types._
import org.apache.hadoop.fs.{FSDataInputStream, Path}
import org.apache.spark.sql.SQLContext
import scala.math.BigDecimal
import org.json4s._
import org.json4s.jackson.JsonMethods._

// Mirrors one field entry of the json schema file
case class MyFields (name:String, `type`:String, nullable:Boolean)

// Mirrors the whole json schema file: a struct with a list of fields
case class MyJsonFields (`type`:String, fields:List[MyFields])


/**
 * A function to validate fields in a csv file against the fields declared in the json schema file.
 */
def validateGenericCSV (_fileToReview:String,_pathSchemaJson:String,_sc : org.apache.spark.SparkContext, _sqlContext: org.apache.spark.sql.SQLContext) : Boolean = {

 implicit val formats = DefaultFormats

 val fileToReview = _fileToReview

 val pathSchemaJson = new Path(_pathSchemaJson)
 
 val fileSystem = pathSchemaJson.getFileSystem(_sc.hadoopConfiguration)

 val inputStream: FSDataInputStream = fileSystem.open(pathSchemaJson)

 // the json schema is expected to be on a single line, so only the first line of the stream is used
 val schema_json = Stream.cons(inputStream.readLine(), Stream.continually(inputStream.readLine))

 println(schema_json.head)

 val rawJson = schema_json.head

 val jsonValue : org.json4s.JValue = parse(rawJson)

 val myJsonFields : MyJsonFields = jsonValue.extract[MyJsonFields]

 val myList : List[MyFields] = myJsonFields.fields

 // build the Spark StructType directly from the same json schema line
 val mySchemaStructType = DataType.fromJson(schema_json.head).asInstanceOf[StructType]

 println(mySchemaStructType)

 // FAILFAST mode makes spark-csv throw an exception as soon as a malformed line is found
 val myDfWithCustomSchema = _sqlContext.read.format("com.databricks.spark.csv").option("header","true").option("delimiter",";").option("nullValue","").option("mode","FAILFAST").schema(mySchemaStructType).load(fileToReview)

 // collect brings every row to the driver so that each field can be inspected one by one
 val myArray = myDfWithCustomSchema.collect

 var contLinesProcessed = 0

 // index of the field currently being checked inside the json schema
 var contElementosJson = 0

 var isCorrect : Boolean = true

 myArray.foreach { elem =>
   for (myElem <- myList) {
     val actualType = myElem.`type`
     val actualName = myElem.name
     val actualNullable = myElem.nullable
     println("Processing line: " + elem)
     println("name: " + actualName + " type: " + actualType + " nullable?: " + actualNullable + " contElementosJson: " + contElementosJson)

     // once every field of the previous row has been checked, start again at column 0
     if (contElementosJson == myList.size) {
       println("Resetting internal counter...")
       contElementosJson = 0
     }

     if (actualType == "string") {
       if (!actualNullable) {
         val theField = elem.getString(contElementosJson)
         println(theField + " should be a string...")
         val validatingField: Boolean = theField.isInstanceOf[String]
         println("validatingField " + theField + " correct? " + validatingField)
         // accumulate the result so that a single bad field marks the whole file as invalid
         isCorrect = isCorrect && validatingField
       } else
         println("This field " + actualName + " can be nullable...")
       contElementosJson += 1
     } else if (actualType == "integer") {
       if (!actualNullable) {
         val theField = elem.get(contElementosJson)
         println(theField + " should be an integer...")
         val validatingField: Boolean = theField.isInstanceOf[Integer]
         println("validatingField " + theField + " correct? " + validatingField)
         isCorrect = isCorrect && validatingField
       } else
         println("This field " + actualName + " can be nullable...")
       contElementosJson += 1
     } else if (actualType.startsWith("decimal")) {
       if (!actualNullable) {
         println("Decimal number detected...")
         val theField = elem.get(contElementosJson)
         println("Trying to validate as BigDecimal, theField is : " + theField + " string?: " + theField.isInstanceOf[String] + " java.math.BigDecimal?: " + theField.isInstanceOf[java.math.BigDecimal])
         val validatingField: Boolean = theField.isInstanceOf[java.math.BigDecimal]
         println("validatingField " + theField + " correct? " + validatingField)
         isCorrect = isCorrect && validatingField
       } else
         println("This field " + actualName + " can be nullable...")
       contElementosJson += 1
     } // last else if, matches types such as decimal(25,5)
   } // for
   contLinesProcessed += 1
 } // foreach

 println("Processed " + contLinesProcessed + " lines from " + fileToReview + " . is the file OK? " + isCorrect)
 isCorrect
}
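A minimal usage sketch, assuming a spark-shell session where sc and sqlContext are available (the paths here are hypothetical): once the csv validates against the schema, it can be re-read with the same StructType and written out as parquet.

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.types.{DataType, StructType}

// hypothetical locations, adjust them to your cluster
val csvPath = "/data/landing/positions.csv"
val schemaPath = "/data/schemas/positions-schema.json"

val isValid = validateGenericCSV(csvPath, schemaPath, sc, sqlContext)

if (isValid) {
  // rebuild the StructType from the first line of the schema file and persist the csv as parquet
  val fs = new Path(schemaPath).getFileSystem(sc.hadoopConfiguration)
  val schemaLine = scala.io.Source.fromInputStream(fs.open(new Path(schemaPath))).getLines().next()
  val structType = DataType.fromJson(schemaLine).asInstanceOf[StructType]
  val df = sqlContext.read.format("com.databricks.spark.csv")
    .option("header", "true").option("delimiter", ";")
    .schema(structType).load(csvPath)
  df.write.parquet("/data/lake/positions.parquet")
} else {
  println("The csv file does not match the schema, aborting the parquet conversion.")
}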

This is the content of the csv file and of the json schema file. Note that the schema must stay on a single line, because the function reads only the first line of the schema file.

Column0;Column1;Column2;Column3
C0.GBMRISK.1000000693750;CDS_SP-I-SOVX_WE_S4;;I-SOVX-WE4
C0.GBMRISK.1000000693751;CDS_SP-I-SOVX_WE_S5;;I-SOVX-WE5
C0.GBMRISK.1000000693726;CDS_SP-I-S7TRAC-HIV;;
...

{"type" : "struct","fields" : [ {"name" : "column0","type" : "string","nullable" : false},{"name":"column1", "type":"string", "nullable":false},{"name":"column2", "type":"string", "nullable":false}, {"name":"column3", "type":"string", "nullable":false}]}

I have used Json4s to process the json file and DataFrames to process the csv file. These are the dependencies:

<properties>
    <!-- Generic properties -->
    <java.version>1.7</java.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <!--  Dependency versions -->
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
    <scala.version>2.10.4</scala.version>
    <junit.version>4.11</junit.version>
    <slf4j.version>1.7.12</slf4j.version>
    <spark.version>1.5.0-cdh5.5.2</spark.version>
    <databricks.version>1.5.0</databricks.version>
    <json4s-native.version>3.5.0</json4s-native.version>
</properties>

<dependency>
 <groupId>org.json4s</groupId>
 <artifactId>json4s-native_2.10</artifactId>
 <version>${json4s-native.version}</version>
</dependency>
<dependency>
 <groupId>junit</groupId>
 <artifactId>junit</artifactId>
 <version>${junit.version}</version>
 <scope>test</scope>
</dependency>
<dependency>
 <groupId>org.apache.spark</groupId>
 <artifactId>spark-sql_2.10</artifactId>
 <version>${spark.version}</version>
 <scope>provided</scope>
</dependency>
<dependency>
 <groupId>com.databricks</groupId>
 <artifactId>spark-csv_2.10</artifactId>
 <version>${databricks.version}</version>
</dependency>

Have fun in the process!

Alonso
