PySpark Basics: Data Manipulation

PySpark is the Python API for Apache Spark, an open-source distributed computing system. Its two main data abstractions, Resilient Distributed Datasets (RDDs) and DataFrames, provide a way to handle large-scale data processing tasks efficiently.

Initialization

First, let’s initialize a SparkSession:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PySparkExample").getOrCreate()

Creating a DataFrame

We can create a DataFrame from a variety of data sources, such as a CSV file or an in-memory Python collection. Here we use a list of tuples:

data = [("Alice", 34), ("Bob", 45), ("Catherine", 29)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, columns)

When we create a DataFrame from a list of tuples, as in the example above, PySpark infers the schema from the data and the provided column names. The schema consists of the column names and their corresponding data types. Here, Name is inferred as a string and Age as a long (64-bit integer), because Python integers map to LongType by default.

However, if we want to explicitly define the schema, we can do so using the StructType and StructField classes from the pyspark.sql.types module. This can be particularly useful if we need more control over the data types or if we want to ensure the schema is exactly as expected.

  1. Import Required Modules:

    • StructType, StructField, StringType, and IntegerType from pyspark.sql.types to define the schema.
  2. Define the Schema:

    • Create a StructType object that contains StructField objects.
    • Each StructField specifies the name of the field, the data type (e.g., StringType or IntegerType), and a flag indicating whether the field can contain null values (True in this case).
  3. Create the DataFrame:

    • Use spark.createDataFrame(data, schema) to create the DataFrame with the specified schema.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Initialize SparkSession
spark = SparkSession.builder.appName("PySparkExample").getOrCreate()

# Define the schema explicitly
schema = StructType([
    StructField("Name", StringType(), True),
    StructField("Age", IntegerType(), True)
])

# Create the DataFrame with the specified schema
data = [("Alice", 34), ("Bob", 45), ("Catherine", 29)]
df = spark.createDataFrame(data, schema)

Common PySpark Operations

PySpark’s data manipulation API offers functionality that closely parallels SQL and dplyr in R:

  • Filtering Rows: filter() in PySpark, WHERE in SQL, filter() in dplyr
  • Selecting Columns: select() in PySpark, SELECT in SQL, select() in dplyr
  • Adding Columns: withColumn() in PySpark, computed column in SQL, mutate() in dplyr
  • Grouping and Aggregating: groupBy().agg(F.sum(...)) in PySpark, GROUP BY in SQL, group_by() %>% summarize(sumvar=sum(...)) in dplyr
  • Joining DataFrames: join() in PySpark, JOIN in SQL, inner_join(), left_join(), and related functions in dplyr
  • Removing Columns: drop() in PySpark, selecting specific columns in SQL, select(-column) in dplyr
  • Sorting Data: orderBy() in PySpark, ORDER BY in SQL, arrange() in dplyr

Examples

  • PySpark:
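import pyspark.sql.functions as F

# df is the DataFrame created above
result = (
    df
    .filter(F.col("Age") > 18)
    .select("Name", "Age")
    .orderBy(F.col("Age").desc())
    .withColumn("New_Column", F.lit("Value"))  # add a constant column
    .drop("New_Column")                        # then remove it again
    .groupBy("Name")
    .agg(F.sum("Age").alias("Total_Age"))      # F.sum, not Python's built-in sum
)

# df1 and df2 stand for any two DataFrames that share a "Name" column
df_joined = df1.join(df2, on="Name", how="inner")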
  • SQL:
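-- "people" stands for the table holding the Name and Age columns
SELECT Name, SUM(Age) AS Total_Age
FROM (
    SELECT Name, Age, 'Value' AS New_Column  -- computed column, not selected by the outer query
    FROM people
    WHERE Age > 18
    ORDER BY Age DESC  -- ordering inside a subquery is not guaranteed to carry over
) AS subquery
GROUP BY Name;

SELECT * FROM table1 INNER JOIN table2 ON table1.Name = table2.Name;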
  • dplyr:
library(dplyr)

df <- data.frame(
    Name = c("Alice", "Bob", "Catherine"),
    Age = c(34, 45, 29)
)

result <- df %>%
    filter(Age > 18) %>%
    select(Name, Age) %>%
    arrange(desc(Age)) %>%
    mutate(New_Column = "Value") %>%
    group_by(Name) %>%
    summarize(Total_Age = sum(Age))

# df1 and df2 stand for any two data frames that share a Name column
df_joined <- inner_join(df1, df2, by = "Name")

Method chaining in PySpark reads much like the pipe operator (%>%) in dplyr: both express a pipeline as a streamlined, readable sequence of data manipulation operations. SQL, in contrast, is declarative and often relies on nesting (subqueries) to express the same sequence of steps.