About how to work with Spark's DataFrames without having any parquet file.

Recently at work, I had to write a unit test that runs within a Jenkins environment. The test has to work with DataFrames, creating them from a parquet file. I do not have access to the Jenkins machine to place parquet files on it, and I do not want to put the parquet file inside src/test/resources because it could be a huge file, so I decided on an alternative. Why not create the DataFrame with fake data instead of reading a parquet file?

So, first, create the schema. As you may know, a schema is a StructType object with StructFields inside it, as I show in the code below. Then create an RDD[String] with fake data. Finally, map over the RDD to turn each line into a Row and create the DataFrame.
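Before the full test, here is a minimal, self-contained sketch of the idea, assuming Spark 1.x where SQLContext is the entry point; the object name and the two columns are just illustrative:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object FakeDataFrameSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("fake-df"))
    val sqlContext = new SQLContext(sc)

    // 1) the schema: a StructType with StructFields inside it
    val schema = StructType(
      StructField("currency", StringType, true) ::
        StructField("price", StringType, true) :: Nil)

    // 2) an RDD[String] with fake CSV lines instead of a parquet file
    val rdd = sc.parallelize(Seq("AUD,1.00308", "BRL,1.00321"))

    // 3) map each line to a Row and build the DataFrame against the schema
    val df = sqlContext.createDataFrame(rdd.map(s => Row.fromSeq(s.split(",", -1))), schema)
    df.show()

    sc.stop()
  }
}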

Have fun in the process; I hope this hack helps you.

Alonso

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.junit.{Assert, Test}

@Test
def testAllCreatingDF(): Unit = {

  val schema = StructType(
    StructField("fileName", StringType, true) ::
      StructField("hidden_field1", StringType, true) ::
      StructField("hidden_field2", StringType, true) ::
      StructField("hidden_field3", StringType, true) ::
      StructField("hidden_field4", StringType, true) ::
      StructField("hidden_", StringType, true) ::
      StructField("data_nature", StringType, true) ::
      StructField("description", StringType, true) ::
      StructField("security_type", StringType, true) ::
      StructField("index_type", StringType, true) ::
      StructField("issuer_classic", StringType, true) ::
      StructField("currency", StringType, true) ::
      StructField("currency2", StringType, true) ::
      StructField("nominal", StringType, true) ::
      StructField("terms_array", StringType, true) ::
      StructField("report_date", StringType, true) ::
      StructField("strike", StringType, true) ::
      StructField("price_1", StringType, true) ::
      StructField("price_2", StringType, true) ::
      StructField("price_spread", StringType, true) ::
      StructField("volatility", StringType, true) ::
      StructField("volatility_spread", StringType, true) ::
      StructField("recovery_rate", StringType, true) ::
      Nil
  )
  // fake data: two CSV rows matching the 23-field schema above
  val rdd: RDD[String] = ConversionTest.sc.parallelize(Seq(
    "f_SARM_4771_AC_Tipos_Cambio_MAD_20160930.txt,FOREIGN EXCHANGE,FOREIGN EXCHANGE,," +
      "SPOT,FOREIGN EXCHANGE,PRICES,,,,,AUD,CAD,null,,2016-09-30 00:00:00.0,null,1.0030800000,null,null,null,null,null",
    "f_SARM_4771_AC_Tipos_Cambio_MAD_20160930.txt,FOREIGN EXCHANGE,FOREIGN EXCHANGE,," +
      "SPOT,FOREIGN EXCHANGE,PRICES,,,,,BRL,PLN,null,,2016-09-30 00:00:00.0,null,1.0032100000,null,null,null,null,null"
  ))
  // Creating the DataFrame! ConversionTest is a companion object with sqlContext and sc.
  // split(",", -1) keeps trailing empty fields, so each row has exactly 23 values.
  val parquetCurrencies: org.apache.spark.sql.DataFrame =
    ConversionTest.sqlContext.createDataFrame(rdd.map(s => Row.fromSeq(s.split(",", -1))), schema)
//showing data
  parquetCurrencies.show(false)
  // preparing the query. These are known column names within the DataFrame.
  val sourceCurrency = "currency"
  val targetCurrency = "currency2"
  val price = "price_1"
  val reportDate = "report_date"
  val precision = 25
  val scale = 5
  val roundType = "HALF_UP"

  parquetCurrencies.registerTempTable("currencies")
  val sourceCurrencyFromParquet = parquetCurrencies(sourceCurrency)
  val targetCurrencyFromParquet = parquetCurrencies(targetCurrency)
  val priceFromParquet = parquetCurrencies(price)
  val reportDateFromParquet = parquetCurrencies(reportDate)
  var mapActualCurrencies = Map[(String, String), java.math.BigDecimal]()

  // the date given as an input parameter; I want to filter the data with this date
  val dateAsString: String = "2016-09-30 00:00:00.0"

  // NOTE: concatenating Column objects into the SQL string works because
  // Column.toString returns the plain column name for simple attributes.
  val actualSql = "SELECT " + sourceCurrencyFromParquet + " as sourceCurrency, " +
    targetCurrencyFromParquet + " as targetCurrency, " +
    reportDateFromParquet + " as date, " +
    priceFromParquet + " as actualCurrency " +
    "FROM currencies as c WHERE cast(c." + reportDateFromParquet + " as String) <= '" + dateAsString + "'"
  println("actualSql is " + actualSql)
  val allValidCurrencies = ConversionTest.sqlContext.sql(actualSql)
  allValidCurrencies.show(false)
  val listCurrencies = allValidCurrencies.collect()

  // iterating over the collected rows to build a map: (sourceCurrency, targetCurrency) -> rate
  listCurrencies.foreach { row =>
    val myTupleKey = (row(0).toString, row(1).toString)
    val myValue = BigDecimalFunctions.convertStringToJavaBigDecimal(row(3).toString, scale, roundType)
    mapActualCurrencies += (myTupleKey -> myValue)
  }

  // testing data
  var sourceC: String = "AUD"
  var sourceMoney: java.math.BigDecimal = new java.math.BigDecimal(1000)
  var targetC: String = "CAD"

  var myKey = (sourceC, targetC)

  var myTargetCurrency = mapActualCurrencies.getOrElse(myKey, java.math.BigDecimal.ZERO)

  var convertedMoney = BigDecimalFunctions.multiplyJavaBigDecimal(precision, scale, roundType, sourceMoney, myTargetCurrency)

  println("Converting " + sourceMoney + " " + sourceC + " into " + targetC + ": " + convertedMoney.toString())
  Assert.assertNotNull("Must not be null", convertedMoney)

  sourceC = "BRL"
  sourceMney = new java.math.BigDecimal(2000)
  targetC  = "PLN"

  myKey = (sourceC,targetC)

  myTargetCurrency = mapActualCurrencies.getOrElse(myKey,java.math.BigDecimal.ZERO)

  convertedMoney =  BigDecimalFunctions.multiplyJavaBigDecimal(precision,scale,roundType,sourceMney,myTargetCurrency)

  println("Convirtiendo " + sourceMney + " " + sourceC + " en " + sourceMney + " " +targetC  +  ". " + convertedMoney.toString())
  Assert.assertNotNull("No puede ser null",convertedMoney)


}
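The test depends on two pieces not shown in the post: the ConversionTest companion object holding sc and sqlContext, and the BigDecimalFunctions helper. Here are hypothetical sketches, reconstructed only from how they are called above; the real project code may differ:

import java.math.{BigDecimal => JBigDecimal, MathContext, RoundingMode}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical: a companion object sharing the Spark handles across tests.
object ConversionTest {
  lazy val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("ConversionTest"))
  lazy val sqlContext = new SQLContext(sc)
}

// Hypothetical helpers, reconstructed from the call sites in the test.
object BigDecimalFunctions {
  def convertStringToJavaBigDecimal(s: String, scale: Int, roundType: String): JBigDecimal =
    new JBigDecimal(s).setScale(scale, RoundingMode.valueOf(roundType))

  def multiplyJavaBigDecimal(precision: Int, scale: Int, roundType: String,
                             a: JBigDecimal, b: JBigDecimal): JBigDecimal =
    a.multiply(b, new MathContext(precision, RoundingMode.valueOf(roundType)))
      .setScale(scale, RoundingMode.valueOf(roundType))
}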
