How to create a Parquet file from a CSV file using spark-csv

Hi, this is a draft about how to create a Parquet file from a CSV file using spark-csv.

Please read the code before pasting it into a spark-shell. The code reads a CSV file, creates a DataFrame from it, adds two new fields to the DataFrame and finally creates the Parquet file from the resulting DataFrame.
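
If you only need the gist, here is a minimal sketch of the same flow for a spark-shell session (Spark 1.x with the spark-csv package loaded; the input and output paths and the literal values are just examples, not the ones used in the test below):

import java.sql.Timestamp
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.lit

// read the CSV with spark-csv (sqlContext is already available in a spark-shell)
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("delimiter", ";")
  .option("inferSchema", "true")
  .load("src/test/resources/input.csv")

// add the two new fields: a literal name and a generation timestamp
val enriched = df
  .withColumn("generated-parquet-file", lit("my-generated-parquet-file"))
  .withColumn("generated_at", lit(new Timestamp(System.currentTimeMillis())))

// write the result as Parquet
enriched.write.mode(SaveMode.Overwrite).parquet("src/test/resources/output.parquet")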

The code:

A unit test, with its imports and dependencies:

pom.xml

<dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.databricks</groupId>
            <artifactId>spark-csv_2.10</artifactId>
            <version>${databricks.version}</version>
        </dependency>
       
        <!-- https://mvnrepository.com/artifact/log4j/log4j -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
    </dependencies>

import java.sql.Timestamp
import java.util.Calendar

import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.{SparkConf, SparkContext}
import org.junit.{Assert, Test, _}
import com.keedio.mra._
import java.text.SimpleDateFormat

import scala._
import scala.collection.convert.WrapAsJava.asJavaCollection
import scala.collection.mutable
@Test
def testAddNewFieldsToDataFrame(): Unit = {

  import org.apache.spark.sql.functions._

  implicit val sqlContext = CSVtoParquetTest.sql

  val pathWrite = "src/test/resources/writefileWithNewField2.parquet"

  val df: DataFrame = CSVtoParquet.loadCSV(sqlContext, pathCSVWithHeader, ";", "true")

  df.show()

  val nameColumn = "generated-parquet-file"
  val nameCreationDateField = "generated_at"

  val newDFWithName = df.withColumn(nameColumn, lit("my-generated-parquet-file"))
  newDFWithName.show()

  val now = Calendar.getInstance().getTime()
  val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  // this is a String; a java.sql.Timestamp has to be built from it.
  val reportDate = dateFormat.format(now)

  val generatedAt = lit(new Timestamp(dateFormat.parse(reportDate).getTime))
  println("generatedAt is " + generatedAt)
  val newDFWithNameAndDate = newDFWithName.withColumn(nameCreationDateField, generatedAt)

  val schema: StructType = newDFWithNameAndDate.schema
  println("schema is " + schema)
  val listFromSchema: List[StructField] = schema.toList

  val aSeq = mutable.MutableList[org.apache.spark.sql.Column]()

  val elementsType: StringBuilder = new StringBuilder()
  val numElements = listFromSchema.size
  var cont = 1

  for (elementFromSchema <- listFromSchema.reverse) {
    println("elementFromSchema is " + elementFromSchema + ". name: " + elementFromSchema.name + ". dataType: " + elementFromSchema.dataType)
    val aColumn: org.apache.spark.sql.Column = new org.apache.spark.sql.Column(elementFromSchema.name)
    println("aColumn is " + aColumn)
    aSeq += aColumn
    val elementDataType = elementFromSchema.dataType
    println(elementDataType)
    // the keys must match those of mapString2DataType below ("int", not "integer")
    val myElement = elementDataType match {
      case TimestampType => "timestamp"
      case StringType => "string"
      case IntegerType => "int"
    }
    elementsType.append(myElement)
    if (cont < numElements) {
      elementsType.append(";")
      cont += 1
    }
  } // for
  println("elementsType is " + elementsType.toString())
  val df1 = newDFWithNameAndDate.select(aSeq: _*)

  println("Showing the schema of df1...")
  df1.printSchema()
  df1.schema
  df1.show()

  val df2 = CSVtoParquet.castingDataFrame(df1, elementsType.toString(), CSVtoParquet.mapString2DataType)

  println("Showing the schema of df2...")
  df2.printSchema()
  df2.show()

  val saveMode = SaveMode.Overwrite
  // writeDataFrame2Parquet(df: DataFrame, pathParquet: String, saveMode: SaveMode, header: String, delimiter: String)
  CSVtoParquet.writeDataFrame2Parquet(df2, pathWrite, saveMode, "true", ",")

  // now load the just-generated parquet file to check that the two new fields are really there.

  val parquetFileDF = sqlContext.read.parquet(pathWrite)

  parquetFileDF.registerTempTable("FINAL")

  // look for one of the just-created fields
  val distinctCountGeneratedDates = sqlContext.sql("SELECT distinct count(generated_at) FROM FINAL").collect()

  Assert.assertNotNull("cannot be null!", distinctCountGeneratedDates)
  println("distinctCountGeneratedDates.length is " + distinctCountGeneratedDates.length)
  Assert.assertTrue(distinctCountGeneratedDates.length > 0)

  val generatedDates = sqlContext.sql("SELECT * FROM FINAL").collect()
  Assert.assertNotNull("cannot be null", generatedDates)
  Assert.assertTrue(generatedDates.length > 0)
  parquetFileDF.printSchema()
  println(parquetFileDF.count())
  parquetFileDF.show(1, false)
  generatedDates.foreach(println)

  println("Done!")
}

def writeDataFrame2Parquet(df: DataFrame, pathParquet: String, saveMode: SaveMode, header: String, delimiter: String): Unit = {

  df.write
    .format("com.databricks.spark.csv") // note: .parquet() below forces the Parquet format, so this format and the CSV-specific options are ignored
    .option("header", header)
    .option("delimiter", delimiter)
    .option("nullValue", "")
    .mode(saveMode)
    // by default, gzip. Other valid values are uncompressed, snappy, gzip and lzo. This can be changed only at SQLContext level.
    // Configuration of Parquet can be done using the setConf method on SQLContext or by running SET key=value commands using SQL.
    //.option("codec", "spark.sql.parquet.compression.codec" + compression_codec)
    .parquet(pathParquet)
}
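
As the comments above say, the Parquet compression codec cannot be passed as a write option; it is read from the SQLContext configuration. A minimal sketch of how it could be set before calling writeDataFrame2Parquet (snappy is just an example value):

// valid codecs in Spark 1.x: uncompressed, snappy, gzip, lzo (gzip is the default)
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")

// the same setting through SQL
sqlContext.sql("SET spark.sql.parquet.compression.codec=snappy")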

def castingDataFrame(df: DataFrame, typeString: String, mapString2DataType: Map[String, DataType]): DataFrame = {

  val arrayDataTypes: Array[DataType] = typeString split ";" map { name => mapString2DataType.getOrElse(name, StringType) }

  val xprs = df.columns zip arrayDataTypes map { pairType => df.col(pairType._1) cast pairType._2 as pairType._1 }

  df.select(xprs: _*)

}

val precision = 25
val scale = 5

val mapString2DataType: Map[String, DataType] = Map(
  "string" -> StringType,
  "boolean" -> BooleanType,
  "byte" -> ByteType,
  "short" -> ShortType,
  "int" -> IntegerType,
  "long" -> LongType,
  "float" -> FloatType,
  "double" -> DoubleType,
  "decimal" -> DecimalType(precision, scale),
  "timestamp" -> TimestampType
)
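
For reference, a small usage sketch of castingDataFrame with this map (the DataFrame df and its columns name, age and created_at are hypothetical; the type string lists one key of mapString2DataType per column, in column order, separated by ";"):

// df was read with every column as string; cast name, age and created_at
val typed = CSVtoParquet.castingDataFrame(df, "string;int;timestamp", CSVtoParquet.mapString2DataType)
typed.printSchema() // name: string, age: integer, created_at: timestamp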

def loadCSV(sqlContext : SQLContext, pathCSV: String, separator: String, haveSchema: String): DataFrame = {

  sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", haveSchema) // Use first line of all files as header
    .option("delimiter", separator)
    .option("nullValue","")
    .option("mode","FAILFAST")
    .option("inferSchema", "true") // Automatically infer data types
    .load(pathCSV)

}
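
And a short usage sketch of loadCSV from a spark-shell with the spark-csv package loaded (the path is hypothetical):

val df = CSVtoParquet.loadCSV(sqlContext, "src/test/resources/mydata.csv", ";", "true")
df.printSchema() // column types are inferred thanks to the inferSchema option
df.show(5)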

 
