Imagine we have data living in a Spark cluster that we need to save into a MariaDB database. This post is my reminder of what I had to do to run the necessary commands from a spark-shell.

First

I need the MariaDB JDBC driver on Spark's classpath. In my case, I have it at:

    /Users/aironman/.m2/repository/org/mariadb/jdbc/mariadb-java-client/2.4.4

Second, I need the server up and running:

    # start MariaDB Server
    mysql.server start

    # or autostart it (optional, if the above does not apply)
    brew services start mariadb

    # secure the installation (set the root password, remove anonymous users, etc.)
    mariadb-secure-installation

    # log in as your user
    mysql

    # or log in as root
    mysql -u root -p

    # the password is set to root

I have a database called commands, which contains a user_data table:

    MariaDB [commands]> show databases;
    +--------------------+
    | Database           |
    +--------------------+
    | arqu_local         |
    | commands           |
    | information_schema |
    | javistepa_banking  |
    | mysql              |
    | performance_schema |
    | queries            |
    | sys                |
    +--------------------+
    8 rows in set (0.003 sec)

    MariaDB [commands]> desc user_data;
    +---------------+---------+------+-----+---------+-------+
    | Field         | Type    | Null | Key | Default | Extra |
    +---------------+---------+------+-----+---------+-------+
    | ID_USER_DATA  | int(11) | YES  |     | NULL    |       |
    | NAME          | text    | YES  |     | NULL    |       |
    | DATE_REGISTER | text    | YES  |     | NULL    |       |
    +---------------+---------+------+-----+---------+-------+
    3 rows in set (0.002 sec)
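
Before bringing Spark into the picture, a plain JDBC connection is a quick way to confirm that the server and the credentials work. This is just a minimal sanity-check sketch, runnable from any Scala REPL that has the MariaDB driver jar on its classpath, and it assumes the root/root credentials and the commands database shown above:

    // Hypothetical sanity check: connect directly over JDBC, no Spark involved
    import java.sql.DriverManager
    val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/commands", "root", "root")
    println(conn.getMetaData.getDatabaseProductVersion) // e.g. 10.5.8-MariaDB
    conn.close()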

Third

I start the spark-shell. --packages downloads the jar from Maven Central, and --driver-class-path needs to point at the local driver jar (the copy under ~/.m2 from the first step) so that loading a table into a DataFrame does not throw an exception.

    spark-shell --packages org.mariadb.jdbc:mariadb-java-client:2.4.4 --driver-class-path /Users/aironman/.m2/repository/org/mariadb/jdbc/mariadb-java-client/2.4.4/mariadb-java-client-2.4.4.jar

In my case, I am working with Spark 3.0.1 and MariaDB 10.5.8 (Homebrew).
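
Once the shell is up, spark.version is a quick way to confirm which build is running:

    spark.version // should print 3.0.1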

If everything went well, we check that the jar has been loaded:

    Class.forName("org.mariadb.jdbc.Driver")

    import java.util.Properties
    import org.apache.spark.sql.{Dataset, Row, SparkSession}
    import org.apache.spark.sql.functions.col

    val jdbcHostname = "localhost"
    val jdbcPort = 3306
    val jdbcDatabase = "commands"
    val jdbcUsername = "root"
    val jdbcPassword = "root"
    val source_table = "user_data"
    // Create the JDBC URL without passing in the user and password parameters.
    val jdbcUrl = s"jdbc:mysql://${jdbcHostname}:${jdbcPort}/${jdbcDatabase}"

    val connectionProperties = new Properties()
    connectionProperties.put("user", jdbcUsername)
    connectionProperties.put("password", jdbcPassword)

    val userDataDF = spark.read.option("driver","org.mariadb.jdbc.Driver").jdbc(jdbcUrl, source_table, connectionProperties)

    userDataDF.printSchema

    userDataDF.schema
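
If only part of the table is needed, the same jdbc reader also accepts a subquery in place of the table name, so the filter runs inside MariaDB instead of in Spark. A minimal sketch, reusing the jdbcUrl and connectionProperties defined above (the subquery and its alias are just an illustration):

    // Hypothetical: push a filter down to MariaDB by passing a subquery as the "table"
    val filteredDF = spark.read.option("driver","org.mariadb.jdbc.Driver").jdbc(jdbcUrl, "(SELECT * FROM user_data WHERE ID_USER_DATA > 0) AS t", connectionProperties)
    filteredDF.show()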

    case class user_data(ID_USER_DATA: Integer, NAME: String, DATE_REGISTER: String)

    // toDF comes from spark.implicits._, which the spark-shell imports automatically
    val dfUsers = List(user_data(1,"ALONSO", "26-11-2020"),user_data(2,"PAPA", "26-11-2020")).toDF()

    // In the spark-shell, enter this chained call with :paste so it is parsed as a single statement
    dfUsers.write.format("jdbc")
                    .mode("overwrite")
                    .option("driver","org.mariadb.jdbc.Driver")
                    .option("url", jdbcUrl)
                    .option("truncate","false")
                    .option("dbtable", source_table)
                    .option("user", jdbcUsername)
                    .option("password", jdbcPassword)
                    //.saveAsTable("USERS")
                    .save()
    // Use either saveAsTable, to store the result in a Spark table (which I called USERS),
    // or save(), to write the data into the MariaDB table. You cannot do both at once.
    // After running the statement above, we can see this data in the database:
    MariaDB [commands]> select * from user_data;
    +--------------+--------+---------------+
    | ID_USER_DATA | NAME   | DATE_REGISTER |
    +--------------+--------+---------------+
    |            2 | PAPA   | 26-11-2020    |
    |            1 | ALONSO | 26-11-2020    |
    +--------------+--------+---------------+
    2 rows in set (0.000 sec)
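
The writer is not limited to overwrite mode; with mode("append") new rows are added on top of the existing ones. A minimal sketch with a made-up extra row, reusing the case class and connection variables defined above:

    // Hypothetical: append one extra (made-up) row instead of replacing the table
    val moreUsers = List(user_data(3, "LUIS", "27-11-2020")).toDF()
    moreUsers.write.format("jdbc").mode("append").option("driver","org.mariadb.jdbc.Driver").option("url", jdbcUrl).option("dbtable", source_table).option("user", jdbcUsername).option("password", jdbcPassword).save()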


    // Save the data into a Spark table
    dfUsers.write.mode("overwrite").saveAsTable("USER_DATA")
    val recover_dfUsers = spark.read.table("USER_DATA")
    recover_dfUsers.show()
    +------------+------+-------------+
    |ID_USER_DATA|  NAME|DATE_REGISTER|
    +------------+------+-------------+
    |           1|ALONSO|   26-11-2020|
    |           2|  PAPA|   26-11-2020|
    +------------+------+-------------+
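
Because saveAsTable registers USER_DATA in the Spark catalog, the same session can also query it with plain SQL:

    // The table written with saveAsTable is visible to Spark SQL
    spark.sql("SELECT NAME FROM USER_DATA WHERE ID_USER_DATA = 1").show()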

https://docs.databricks.com/data/data-sources/sql-databases.html#step-1-check-that-the-jdbc-driver-is-available
