PySpark Basics: Data Manipulation
PySpark is the Python API for Apache Spark, an open-source distributed computing system. Its Resilient Distributed Datasets (RDDs) and DataFrames make it possible to handle large-scale data processing tasks efficiently.
Initialization
First, let’s initialize a SparkSession:
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PySparkExample").getOrCreate()
```
Creating a DataFrame
We can create a DataFrame from a variety of data sources, such as a CSV file or a list of dictionaries:
```python
data = [("Alice", 34), ("Bob", 45), ("Catherine", 29)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, columns)
```
In PySpark, when we create a DataFrame from a list of tuples (as shown above), the schema is inferred implicitly from the structure of the data and the provided column names. The schema consists of the column names and their corresponding data types.
However, if we want to explicitly define the schema, we can do so using the StructType and StructField classes from the pyspark.sql.types module. This can be particularly useful if we need more control over the data types or if we want to ensure the schema is exactly as expected.
- Import Required Modules: import StructType, StructField, StringType, and IntegerType from pyspark.sql.types to define the schema.
- Define the Schema: create a StructType object that contains StructField objects. Each StructField specifies the name of the field, its data type (e.g., StringType or IntegerType), and a flag indicating whether the field can contain null values (True in this case).
- Create the DataFrame: use spark.createDataFrame(data, schema) to create the DataFrame with the specified schema.
```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Initialize SparkSession
spark = SparkSession.builder.appName("PySparkExample").getOrCreate()

# Define the schema explicitly
schema = StructType([
    StructField("Name", StringType(), True),
    StructField("Age", IntegerType(), True)
])

# Create the DataFrame with the specified schema
data = [("Alice", 34), ("Bob", 45), ("Catherine", 29)]
df = spark.createDataFrame(data, schema)
```
Common PySpark Operations
PySpark’s data manipulation APIs offer functionality closely comparable to SQL and to dplyr in R.
- Filtering Rows: filter() in PySpark, WHERE in SQL, filter() in dplyr
- Selecting Columns: select() in PySpark, SELECT in SQL, select() in dplyr
- Adding Columns: withColumn() in PySpark, a computed column in SQL, mutate() in dplyr
- Grouping and Aggregating: groupBy().agg(F.sum(...)) in PySpark, GROUP BY in SQL, group_by() %>% summarize(sumvar = sum(...)) in dplyr
- Joining DataFrames: join() in PySpark, JOIN in SQL, the *_join() functions (e.g., inner_join()) in dplyr
- Removing Columns: drop() in PySpark, selecting only the desired columns in SQL, select(-column) in dplyr
- Sorting Data: orderBy() in PySpark, ORDER BY in SQL, arrange() in dplyr
Examples
- PySpark:
```python
import pyspark.sql.functions as F

result = (
    df
    .filter(F.col("Age") > 18)
    .select("Name", "Age")
    .orderBy(F.col("Age").desc())
    .withColumn("New_Column", F.lit("Value"))
    .drop("New_Column")
    .groupBy("Name")
    .agg(F.sum("Age").alias("Total_Age"))
)

df_joined = df1.join(df2, on="Name", how="inner")
```
- SQL:
```sql
SELECT Name, SUM(Age) AS Total_Age
FROM (
    SELECT Name, Age
    FROM table
    WHERE Age > 18
) AS subquery
GROUP BY Name;

SELECT * FROM table1 INNER JOIN table2 ON table1.Name = table2.Name;
```
- dplyr:
```r
library(dplyr)

df <- data.frame(
  Name = c("Alice", "Bob", "Catherine"),
  Age = c(34, 45, 29)
)

result <- df %>%
  filter(Age > 18) %>%
  select(Name, Age) %>%
  arrange(desc(Age)) %>%
  mutate(New_Column = "Value") %>%
  group_by(Name) %>%
  summarize(Total_Age = sum(Age))

df_joined <- inner_join(df1, df2, by = "Name")
```
The flow in PySpark using method chaining is quite similar to the pipe operator (%>%) in dplyr, as both allow a streamlined, readable sequence of data manipulation operations. In contrast, SQL uses a more nested, declarative style of querying.