How to work with Spark's DataFrames without having any parquet file.

Recently at work, I had to write a unit test that runs in a Jenkins environment. The test has to work with DataFrames, creating them from a parquet file. I do not have access to the Jenkins machine to put parquet files on it, and I do not want to put the parquet file in src/test/resources because it could be a huge file, so I decided on an alternative: why not create the DataFrame with fake data instead of reading a parquet file?

So, first, create the schema; as you know, a schema is a StructType object containing StructField objects, as shown in the code below. Then create an RDD[String] with fake data. Finally, map over the RDD to create the DataFrame.
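Putting those three steps together, here is a minimal, self-contained sketch. Note that the SparkSession API (Spark 2.x) and the column names are my own assumptions; the original test below holds a Spark 1.x SQLContext and SparkContext in a companion object instead:

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object FakeDataFrameSketch {

  // 1) The schema: a StructType holding one StructField per column.
  val schema = StructType(
    StructField("currency", StringType, nullable = true) ::
      StructField("currency2", StringType, nullable = true) ::
      StructField("price_1", StringType, nullable = true) ::
      Nil
  )

  // 2) + 3) Fake CSV lines as an RDD[String], each line split into a Row
  // and turned into a DataFrame with the schema above.
  def buildFakeDF(spark: SparkSession): DataFrame = {
    val rdd = spark.sparkContext.parallelize(Seq(
      "AUD,CAD,0.97",
      "BRL,PLN,1.23"
    ))
    spark.createDataFrame(rdd.map(s => Row.fromSeq(s.split(","))), schema)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("fake-df").getOrCreate()
    buildFakeDF(spark).show(false)
    spark.stop()
  }
}
```

No parquet file is involved at any point, so this runs anywhere Spark itself runs, including a Jenkins executor with no access to the data.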

Have fun in the process. I hope this hack helps you.

Alonso

@Test
def testAllCreatingDF(): Unit = {

  val schema = StructType(
    StructField("aField", StringType, true) ::
      StructField("aField1", StringType, true) ::
      StructField("aField2", StringType, true) ::
      StructField("field3", StringType, true) ::
      StructField("field4", StringType, true) ::
      StructField("anotherField", StringType, true) ::
      StructField("aField5", StringType, true) ::
      StructField("aField6", StringType, true) ::
      StructField("aField7", StringType, true) ::
      StructField("aField8", StringType, true) ::
      StructField("aField9", StringType, true) ::
      StructField("XXXX", StringType, true) ::
      Nil
  )
//ATTENTION! the schema is not built with the real field names
//Fake data: the DataFrame is created according to this StructType schema.

  val rdd: RDD[String] = ConversionTest.sc.parallelize(
    Seq("PUT DATA HERE",
      "MORE DATA") ++
    Seq("AND MORE")
  )
//Creating the DataFrame! ConversionTest is a companion object holding sqlContext and sc.
  val parquetCurrencies: org.apache.spark.sql.DataFrame =
    ConversionTest.sqlContext.createDataFrame(rdd.map(s => Row.fromSeq(s.split(","))), schema)
//showing data
  parquetCurrencies.show(false)
//preparing the query. These are known fields within the DataFrame
  val sourceCurrency = "currency"
  val targetCurrency = "currency2"
  val price = "price_1"
  val reportDate = "report_date"
  val precision = 25
  val scale = 5
  val roundType = "HALF_UP"

  parquetCurrencies.registerTempTable("currencies")
  val sourceCurrencyFromParquet = parquetCurrencies(sourceCurrency)
  val targetCurrencyFromParquet = parquetCurrencies(targetCurrency)
  val priceFromParquet = parquetCurrencies(price)
  val reportDateFromParquet = parquetCurrencies(reportDate)
  var mapActualCurrencies  = Map[(String, String), java.math.BigDecimal]()

//the date as input parameter; I want to filter data by this date
  val dateAsString: String = "2016-09-30 00:00:00.0"
//ATTENTION! Create your sql...
  val actualSql = "SELECT " 
  println("actualSql is " + actualSql)
  val allValidCurrencies = ConversionTest.sqlContext.sql(actualSql)
  allValidCurrencies.show(false)
  val listCurrencies = allValidCurrencies.collect()

//iterating over the collected rows to fill the map
  listCurrencies.foreach { row =>
    val myTupleKey = (row(0).toString, row(1).toString)
    val myValue = BigDecimalFunctions.convertStringToJavaBigDecimal(row(3).toString, scale, roundType)
    mapActualCurrencies += (myTupleKey -> myValue)
  }

//testing data
  var sourceC: String = "AUD"
  var sourceMoney: java.math.BigDecimal = new java.math.BigDecimal(1000)
  var targetC: String = "CAD"

  var myKey = (sourceC, targetC)

  var myTargetCurrency = mapActualCurrencies.getOrElse(myKey, java.math.BigDecimal.ZERO)

  var convertedMoney = BigDecimalFunctions.multiplyJavaBigDecimal(precision, scale, roundType, sourceMoney, myTargetCurrency)

  println("Converting " + sourceMoney + " " + sourceC + " into " + targetC + ": " + convertedMoney.toString())
  Assert.assertNotNull("It cannot be null", convertedMoney)

  sourceC = "BRL"
  sourceMoney = new java.math.BigDecimal(2000)
  targetC = "PLN"

  myKey = (sourceC, targetC)

  myTargetCurrency = mapActualCurrencies.getOrElse(myKey, java.math.BigDecimal.ZERO)

  convertedMoney = BigDecimalFunctions.multiplyJavaBigDecimal(precision, scale, roundType, sourceMoney, myTargetCurrency)

  println("Converting " + sourceMoney + " " + sourceC + " into " + targetC + ": " + convertedMoney.toString())
  Assert.assertNotNull("It cannot be null", convertedMoney)
}
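The collect-and-lookup part of the test needs no Spark at all, so it can be sketched (and unit tested) in plain Scala. BigDecimalFunctions is the post's own helper, which I don't have, so the rounding logic below is my stand-in using java.math directly, with hypothetical names and column positions:

```scala
import java.math.{MathContext, RoundingMode, BigDecimal => JBigDecimal}

object CurrencyMapSketch {

  // Build a (source, target) -> rate map from collected rows,
  // mirroring what the test does after allValidCurrencies.collect().
  def buildRateMap(rows: Seq[Array[String]], scale: Int): Map[(String, String), JBigDecimal] =
    rows.map { r =>
      (r(0), r(1)) -> new JBigDecimal(r(2)).setScale(scale, RoundingMode.HALF_UP)
    }.toMap

  // Stand-in for BigDecimalFunctions.multiplyJavaBigDecimal with HALF_UP rounding.
  def convert(amount: JBigDecimal, rate: JBigDecimal, precision: Int, scale: Int): JBigDecimal =
    amount.multiply(rate, new MathContext(precision)).setScale(scale, RoundingMode.HALF_UP)
}
```

For example, with rates = buildRateMap(Seq(Array("AUD", "CAD", "0.97")), 5), converting 1000 AUD is convert(new JBigDecimal(1000), rates(("AUD", "CAD")), 25, 5), which yields 970.00000.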

 
