Introduction à Spark
Spark est un framework de calcul distribué très en vogue actuellement. Il dispose de nombreux avantages parmi lesquels:
- La simplicité de ses structures de données (RDD, DataFrame, DataSets)
- Sa puissance du fait que ces structures de données soient chargées, distribuées et sécurisées en mémoire.
- Sa facilité d’intégration aux plateformes de cluster tels qu’Hadoop (et YARN), Mesos mais également sur les plateformes Kubernetes lui permettant ainsi de mettre le pied dans des environnements conteneurisés. Il existe également la possibilité de l’installer sur un environnement simple (standalone) sans cluster.
- Sa richesse d’API. Ainsi Spark est accessible à partir de nombreux langages tels que Java, Python, Scala, et le langage de traitements statistiques R.
- Ses modules et librairies notamment :
- la couche d’abstraction Spark SQL
- la librairie de Machine Learning (Spark MLLib) permettant d’entrainer un modèle d’apprentissage en quelques lignes de code tout en bénéficiant de la puissance de calcul du cluster utilisé.
- la librarie de Streaming (Spark Streaming) permettant de s’interfacer très simplement avec les systèmes de flux en temps réels tels que Kafka notamment.
- Sa simplicité d’écriture. Il est en effet très simple d’écrire un programme tout en limitant le nombre de lignes de code produit (Cela est encore plus vérifiable pour Scala et Python).
Un de ses nombreux avantage réside également dans le fait qu’il peut s’interfacer, en entrée, comme en sortie, avec de nombreux formats et sources de données (Fichiers CSV, JSON, Parquet, ORC, source de données JDBC).
Nous vous proposons de parcourir rapidement l’ensemble de ces possibilités.
Création d’une dataframe à partir de sources de données
Lecture à partir d’une source de données relationnelle
Dans cette partie, nous allons voir comment il est très simple de charger une structure de type dataframe à partir d’une source de données JDBC (Oracle). On pourra alors travailler ce dataframe pour filtrer des données, créer un nouveau dataframe pour en extraire de l’information et le transformer.
Ci-dessous un exemple en écrit en scala permettant de réaliser une telle opération.
1 2 3 4 5 6 7 8 9 |
/* creation des proprietes d'acces à la source JDBC */ import java.util.Properties val connProps = new Properties() connProps.put("user", "sh") connProps.put("password", "sh") connProps.put("isolationLevel","READ_COMMITTED"); /* Creation de la dataframe contenant les données de la table PRODUCTS */ val df = spark.read.jdbc("jdbc:oracle:thin:@//192.168.99.3:1521/orcl", "products", connProps); |
Une fois la dataframe créée, il devient très simple d’exécuter une requête SQL sur celle-ci afin d’en sortir les informations d’intérêt. (Dans l’exemple ci-dessous, on créé une nouvelle dataframe à partir de la session spark).
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
df.createOrReplaceTempView("s_products"); val df2= spark.sql("select prod_id,prod_name,prod_min_price from s_products where PROD_MIN_PRICE>200"); df2.show +-------+--------------------+--------------+ |prod_id| prod_name|prod_min_price| +-------+--------------------+--------------+ | 13|5MP Telephoto Dig...| 899.99| | 14|17" LCD w/built-i...| 999.99| | 15| Envoy 256MB - 40GB| 999.99| | 16| Y Box| 299.99| | 17|Mini DV Camcorder...| 1099.99| | 18| Envoy Ambassador| 1299.99| | 20|Home Theatre Pack...| 599.99| | 21|18" Flat Panel Gr...| 899.99| | 29|8.3 Minitower Spe...| 499.99| +-------+--------------------+--------------+ |
Lecture à partir d’une source de données de type CSV.
La lecture d’un fichier CSV est également relativement simple et aisée. Il suffit encore d’utiliser la fonction read à partir de la session spark.
Dans l’exemple ci-dessous. Nous allons réaliser la jointure entre deux dataframes. La première est issue d’une source de données, relationnelle et la seconde de la lecture d’un fichier CSV:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
import java.util.Properties val connProps = new Properties() connProps.put("user", "sh") connProps.put("password", "sh") connProps.put("isolationLevel","READ_COMMITTED"); val df = spark.read.jdbc("jdbc:oracle:thin:@//192.168.99.3:1521/orcl", "products", connProps) df.createOrReplaceTempView("s_products"); // Creation de la dataFrame filtrée à partir d'une source de données JDBC val df2= spark.sql("select prod_id,prod_name,prod_min_price from s_products where PROD_MIN_PRICE>200"); df2.createOrReplaceTempView("s_products"); // Create de la dataFrame à partir d'une source CSV (délimitée ici par des "|") // suivi de sa modification pour en extraire les colonnes d'intérêt que l'on caste dans la foulée val v=spark.read.option("delimiter","|").csv("/home/spark/sls.csv") val v_t=spark.sql("select cast(_c0 as decimal) prod_id, cast(_c5 as decimal) QUANTITY_SOLD, cast(_c6 as decimal) AMOUNT_SOLD from v_temp") v_t.createOrReplaceTempView("s_sales"); // Création de la dataframe avec la jointure val r=spark.sql("select prod_name,avg(AMOUNT_SOLD) from s_sales s, s_products p where p.prod_id=s.prod_id group by prod_name"); r.show() +--------------------+----------------+ | prod_name|avg(AMOUNT_SOLD)| +--------------------+----------------+ |Home Theatre Pack...| 613.7803| |5MP Telephoto Dig...| 1051.7363| |Mini DV Camcorder...| 1349.8631| |8.3 Minitower Spe...| 534.2958| |18" Flat Panel Gr...| 1056.3931| | Y Box| 300.4422| |17" LCD w/built-i...| 1196.2155| | Envoy 256MB - 40GB| 977.4620| | Envoy Ambassador| 1565.2138| +--------------------+----------------+ |
Persistance des données en Base de données relationnelle
La persistance d’une dataFrame dans une base de données relationnelle se fait grâce à la fonction write de la session Spark (Object SparkSession).
La encore, nous utilisons SCALA qui est d’une simplificité déconcertante pour faire persister une dataFrame en base relationnelle. (A noter que dans le cas présent, il est recommandé de positionner le niveau d’isolation utilisé par votre SGBDr de destination (ici READ_COMMITTED pour Oracle).
Ensuite, deux cas se présentent, votre table de destination existe ou pas. Par défaut, spark créé la table de destination (et génère une ORA-00942 si la table existe). Pour éviter cela, il faut utiliser le mode APPEND.
Persistance par défaut
1 2 3 4 5 |
import java.util.Properties val connProps = new Properties() connProps.put("user", "sh") connProps.put("password", "sh") connProps.put("isolationLevel","READ_COMMITTED"); |
Il ne reste ensuite qu’à faire persister la dataframe
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
df.count() res7: Long = 72 df.printSchema root |-- PROD_ID: decimal(6,0) (nullable = true) |-- PROD_NAME: string (nullable = true) |-- PROD_DESC: string (nullable = true) |-- PROD_SUBCATEGORY: string (nullable = true) |-- PROD_SUBCATEGORY_ID: decimal(38,10) (nullable = true) |-- PROD_SUBCATEGORY_DESC: string (nullable = true) |-- PROD_CATEGORY: string (nullable = true) |-- PROD_CATEGORY_ID: decimal(38,10) (nullable = true) |-- PROD_CATEGORY_DESC: string (nullable = true) |-- PROD_WEIGHT_CLASS: decimal(3,0) (nullable = true) |-- PROD_UNIT_OF_MEASURE: string (nullable = true) |-- PROD_PACK_SIZE: string (nullable = true) |-- SUPPLIER_ID: decimal(6,0) (nullable = true) |-- PROD_STATUS: string (nullable = true) |-- PROD_LIST_PRICE: decimal(8,2) (nullable = true) |-- PROD_MIN_PRICE: decimal(8,2) (nullable = true) |-- PROD_TOTAL: string (nullable = true) |-- PROD_TOTAL_ID: decimal(38,10) (nullable = true) |-- PROD_SRC_ID: decimal(38,10) (nullable = true) |-- PROD_EFF_FROM: timestamp (nullable = true) |-- PROD_EFF_TO: timestamp (nullable = true) |-- PROD_VALID: string (nullable = true) df.write.jdbc("jdbc:oracle:thin:@//192.168.99.3:1521/orcl", "prod2", connProps); |
Une fois la dataframe créé en base (sous forme de table), il ne reste plus qu’à contrôler la structure de table et les données :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
SQL> desc prodl2 Name Null? Type ----------------------------------------- -------- ---------------------------- PROD_ID NUMBER(6) PROD_NAME VARCHAR2(255) PROD_DESC VARCHAR2(255) PROD_SUBCATEGORY VARCHAR2(255) PROD_SUBCATEGORY_ID NUMBER(38,10) PROD_SUBCATEGORY_DESC VARCHAR2(255) PROD_CATEGORY VARCHAR2(255) PROD_CATEGORY_ID NUMBER(38,10) PROD_CATEGORY_DESC VARCHAR2(255) PROD_WEIGHT_CLASS NUMBER(3) PROD_UNIT_OF_MEASURE VARCHAR2(255) PROD_PACK_SIZE VARCHAR2(255) SUPPLIER_ID NUMBER(6) PROD_STATUS VARCHAR2(255) PROD_LIST_PRICE NUMBER(8,2) PROD_MIN_PRICE NUMBER(8,2) PROD_TOTAL VARCHAR2(255) PROD_TOTAL_ID NUMBER(38,10) PROD_SRC_ID NUMBER(38,10) PROD_EFF_FROM TIMESTAMP(6) PROD_EFF_TO TIMESTAMP(6) PROD_VALID VARCHAR2(255) SQL> select count(*) from prod2; COUNT(*) ---------- 72 |
Persistance en mode APPEND.
La méthode est identique aux code snippetx précédents. La différence réside dans l’ajout du mode “APPEND” dans la commande de persistance.
1 |
df.write.mode("Append").jdbc("jdbc:oracle:thin:@//192.168.99.3:1521/orcl", "prod2", connProps); |
Persistance dans un fichier
La persistance peut également être réalisée au sein d’un fichier. Ce fichier peut être dans un format supporté par spark 2.0 et supérieurs : CSV, Parquet, ORC, JSON, ou texte.
Si on reprend un des exemples précédent dans lequel on réalise une jointure entre deux sources de données. La première provenant d’une base relationnelle, et l’autre d’un fichier texte.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
import java.util.Properties val connProps = new Properties() connProps.put("user", "sh") connProps.put("password", "sh") connProps.put("isolationLevel","READ_COMMITTED"); val df = spark.read.jdbc("jdbc:oracle:thin:@//192.168.99.3:1521/orcl", "products", connProps) df.createOrReplaceTempView("s_products"); val df2= spark.sql("select prod_id,prod_name,prod_min_price from s_products where PROD_MIN_PRICE>200"); df2.createOrReplaceTempView("s_products"); val v=spark.read.option("delimiter","|").csv("/home/spark/sls.csv") v.createOrReplaceTempView("v_temp"); val v_t=spark.sql("select cast(_c0 as decimal) prod_id, cast(_c5 as decimal) QUANTITY_SOLD, cast(_c6 as decimal) AMOUNT_SOLD from v_temp") v_t.createOrReplaceTempView("s_sales"); val r=spark.sql("select prod_name,avg(AMOUNT_SOLD) as AVG_AMOUNT_SOLD from s_sales s, s_products p where p.prod_id=s.prod_id group by prod_name"); // r.printSchema root |-- prod_name: string (nullable = true) |-- AVG_AMOUNT_SOLD: decimal(14,4) (nullable = true) // r.show() +--------------------+---------------+ | prod_name|AVG_AMOUNT_SOLD| +--------------------+---------------+ |Home Theatre Pack...| 613.7803| |5MP Telephoto Dig...| 1051.7363| |Mini DV Camcorder...| 1349.8631| |8.3 Minitower Spe...| 534.2958| |18" Flat Panel Gr...| 1056.3931| | Y Box| 300.4422| |17" LCD w/built-i...| 1196.2155| | Envoy 256MB - 40GB| 977.4620| | Envoy Ambassador| 1565.2138| +--------------------+---------------+ |
On peut alors faire persister le résultat dans le fichier souhaité, par exemple:
- CSV
1 2 3 4 5 |
// Ici il y aura un fichier par partition r.write.option("delimiter",";").csv("/home/spark/export_csv") // Si on souhaite obtenir un seul fichier, il faut utiliser la fonction coalesce (Attention à la performance dans le cas de gros fichiers et de fonctionnement sur un cluster) r.coalesce(1).write.option("delimiter",";").csv("/home/spark/export_csv") |
1 2 3 4 5 6 7 8 9 10 |
[spark@spark ~]$ head export_csv/part-00000-521419d4-9fbf-4256-987a-485bb01e72fc-c000.csv Home Theatre Package with DVD-Audio/Video Play;613.7803 5MP Telephoto Digital Camera;1051.7363 "Mini DV Camcorder with 3.5\" Swivel LCD";1349.8631 8.3 Minitower Speaker;534.2958 "18\" Flat Panel Graphics Monitor";1056.3931 Y Box;300.4422 "17\" LCD w/built-in HDTV Tuner";1196.2155 Envoy 256MB - 40GB;977.4620 Envoy Ambassador;1565.2138 |
- JSON
1 |
r.coalesce(1).write.json("/home/spark/export_json") |
1 2 3 4 5 6 7 8 9 10 |
[spark@spark ~]$ cat export_json/part-00000-8f33f458-fd31-4cec-8c83-6e65568c96b1-c000.json {"prod_name":"Home Theatre Package with DVD-Audio/Video Play","AVG_AMOUNT_SOLD":613.7803} {"prod_name":"5MP Telephoto Digital Camera","AVG_AMOUNT_SOLD":1051.7363} {"prod_name":"Mini DV Camcorder with 3.5\" Swivel LCD","AVG_AMOUNT_SOLD":1349.8631} {"prod_name":"8.3 Minitower Speaker","AVG_AMOUNT_SOLD":534.2958} {"prod_name":"18\" Flat Panel Graphics Monitor","AVG_AMOUNT_SOLD":1056.3931} {"prod_name":"Y Box","AVG_AMOUNT_SOLD":300.4422} {"prod_name":"17\" LCD w/built-in HDTV Tuner","AVG_AMOUNT_SOLD":1196.2155} {"prod_name":"Envoy 256MB - 40GB","AVG_AMOUNT_SOLD":977.4620} {"prod_name":"Envoy Ambassador","AVG_AMOUNT_SOLD":1565.2138} |
- PARQUET
1 |
r.write.option("compression","gzip").parquet("/home/spark/export_parquet") |
1 2 3 4 5 6 |
[spark@spark ~]$ ls export_parquet/ part-00000-283c9d0e-5fa5-4f82-adc3-c13022d38c13-c000.gz.parquet part-00082-283c9d0e-5fa5-4f82-adc3-c13022d38c13-c000.gz.parquet part-00013-283c9d0e-5fa5-4f82-adc3-c13022d38c13-c000.gz.parquet part-00132-283c9d0e-5fa5-4f82-adc3-c13022d38c13-c000.gz.parquet part-00014-283c9d0e-5fa5-4f82-adc3-c13022d38c13-c000.gz.parquet part-00147-283c9d0e-5fa5-4f82-adc3-c13022d38c13-c000.gz.parquet part-00072-283c9d0e-5fa5-4f82-adc3-c13022d38c13-c000.gz.parquet part-00182-283c9d0e-5fa5-4f82-adc3-c13022d38c13-c000.gz.parquet part-00078-283c9d0e-5fa5-4f82-adc3-c13022d38c13-c000.gz.parquet _SUCCESS |
Bien évidemment, toutes ces méthodes sont transposables aux langages Java, ou encore Python.
Vous avez des projets sous Spark, n’hésitez pas à nous contacter à l’adresse contact@premiseo.com ou aux numéros suivants: 06.60.99.70.46 / 06.76.87.02.82