=================================================================================
The comparisons below highlight how the syntax and approach differ between these two popular data processing frameworks, Pandas and SparkSQL:
- Creating Data
- Pandas:
import pandas as pd
data = {'Name': ['John', 'Anna', 'James', 'Linda'],
'Age': [28, 22, 35, 32],
'City': ['New York', 'Paris', 'London', 'Berlin']}
df = pd.DataFrame(data)
- SparkSQL:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('example').getOrCreate()
data = [('John', 28, 'New York'),
('Anna', 22, 'Paris'),
('James', 35, 'London'),
('Linda', 32, 'Berlin')]
columns = ['Name', 'Age', 'City']
df = spark.createDataFrame(data, columns)
- Selecting Data
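A minimal sketch, assuming the df created above in each framework:
- Pandas:
selected = df[['Name', 'Age']]
- SparkSQL:
selected = df.select('Name', 'Age')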
- Filtering Data
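A minimal sketch, again assuming the df created above:
- Pandas:
filtered = df[df['Age'] > 30]
- SparkSQL:
filtered = df.filter(df['Age'] > 30)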
- Aggregating Data
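A minimal sketch, again assuming the df created above:
- Pandas:
avg_age = df.groupby('City')['Age'].mean()
- SparkSQL:
avg_age = df.groupBy('City').agg({'Age': 'avg'})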
- Running SQL Queries
- Pandas:
Pandas does not support SQL natively but can use the pandasql library:
import pandasql as ps
query = "SELECT * FROM df WHERE Age > 30"
result = ps.sqldf(query, locals())
- SparkSQL:
Spark SQL runs queries against a DataFrame registered as a temporary view:
df.createOrReplaceTempView("df")
query = "SELECT * FROM df WHERE Age > 30"
result = spark.sql(query)
Table 3379a. Cheatsheet of PySpark (for SparkSQL).
Function |
Code |
Creating DataFrames |
Create DataFrame from RDD |
df = spark.createDataFrame(rdd) |
Create DataFrame from list |
df = spark.createDataFrame([(1, "John"), (2, "Jane"), (3, "Bob")]) |
Create DataFrame from CSV |
df = spark.read.csv("path/to/file.csv", header=True, inferSchema=True) |
Create DataFrame from JSON |
df = spark.read.json("path/to/file.json") |
Create DataFrame from Parquet |
df = spark.read.parquet("path/to/file.parquet") |
Create DataFrame from Avro |
df = spark.read.format("avro").load("path/to/file.avro") |
Create DataFrame from ORC |
df = spark.read.orc("path/to/file.orc") |
Create DataFrame from JDBC |
df = spark.read.format("jdbc").option("url", "jdbc:postgresql:dbserver") .option("dbtable", "schema.tablename").option("user", "username") .option("password", "password").load() |
Create empty DataFrame with schema |
from pyspark.sql.types import StructType, StructField, IntegerType, StringType; schema = StructType([StructField("id", IntegerType()), StructField("name", StringType())]); df = spark.createDataFrame([], schema) |
DataFrames and SparkSQL |
appName() |
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("MyApp").getOrCreate()
A name for your job to display on the cluster web UI. |
createDataFrame() |
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("MyApp").getOrCreate()
data = [("Jhon", 30), ("Peter", 25), ("Bob", 35)]
columns = ["name", "age"]
Creating a DataFrame:
df = spark.createDataFrame(data, columns)
Used to load the data into a Spark DataFrame. |
fillna() |
Replace NULL/None values in a DataFrame with zero:
filled_df = df.fillna(0)
Used to replace NULL/None values on all or selected DataFrame columns with zero (0), an empty string, a space, or any constant literal value. |
filter() |
Filtering rows based on a condition:
filtered_df = df.filter(df['age'] > 30)
Returns a new DataFrame containing only the rows that satisfy the given condition; in PySpark, filter() is equivalent to where(). |
groupby() |
Grouping data and performing aggregation:
grouped_df = df.groupBy("age").agg({"age": "count"})
Used to collect identical data into groups on a DataFrame and perform count, sum, avg, min, max functions on the grouped data. |
head() |
Returning the first 5 rows:
first_5_rows = df.head(5)
Used to return the first n rows of the DataFrame as a list of Row objects. |
import |
from pyspark.sql import SparkSession
Used to make code from one module accessible in another. Effective use of imports keeps code reusable and projects manageable. |
pd.read_csv() |
import pandas as pd
Reading data from a CSV file into a DataFrame:
df_from_csv = pd.read_csv("data.csv")
Used in Pandas to read data from a CSV file and return it as a DataFrame. |
pip |
pip list
pip install pyspark
pip searches for the requested package in the Python Package Index (PyPI), resolves its dependencies, and installs everything into your current Python environment. |
printSchema() |
df.printSchema()
Used to print or display the schema of the DataFrame or data set in tree format along with the column name and data type. If you have a DataFrame or data set with a nested structure, it displays the schema in a nested tree format. |
rename() |
Create a sample DataFrame:
import pandas as pd
data = {'A': [1, 2, 3], 'B': [4, 5, 6]}
df = pd.DataFrame(data)
Rename columns:
df = df.rename(columns={'A': 'X', 'B': 'Y'})
The columns 'A' and 'B' are now renamed to 'X' and 'Y':
print(df)
Used in Pandas to rename DataFrame columns or index labels; the PySpark equivalent for columns is withColumnRenamed(). |
select() |
selected_df = df.select('name', 'age')
Used to select one or multiple columns, nested columns, column by index, all columns from the list, by regular expression from a DataFrame. select() is a transformation function in Spark and returns a new DataFrame with the selected columns. |
show() |
df.show()
Spark DataFrame show() is used to display the contents of the DataFrame in a table row and column format. By default, it shows only twenty rows, and the column values are truncated at twenty characters. |
sort() |
Sorting DataFrame by a column in ascending order:
sorted_df = df.sort("age")
Sorting DataFrame by multiple columns with mixed order (age descending, name ascending):
sorted_df_desc = df.sort(["age", "name"], ascending=[False, True])
Used to sort DataFrame by ascending or descending order based on single or multiple columns. |
SparkContext() |
from pyspark import SparkContext
Creating a SparkContext:
sc = SparkContext("local", "MyApp")
It is an entry point to Spark and is defined in org.apache.spark package since version 1.x and used to programmatically create Spark RDD, accumulators, and broadcast variables on the cluster. |
SparkSession |
from pyspark.sql import SparkSession
Creating a SparkSession:
spark = SparkSession.builder.appName("MyApp").getOrCreate()
It is an entry point to Spark, and creating a SparkSession instance is typically the first statement you write in a program that works with RDDs, DataFrames, and Datasets. |
spark.read.json() |
json_df = spark.read.json("customer.json")
Spark SQL can automatically infer the schema of a JSON data set and load it as a DataFrame. The read.json() function loads data from a directory of JSON files where each line of the files is a JSON object. Note that the expected input is not a typical multi-line JSON document; each line must be a self-contained, valid JSON object (JSON Lines format). |
spark.sql() |
result = spark.sql("SELECT name, age FROM cust_tbl WHERE age > 30")
result.show()
To issue any SQL query, use the sql() method on the SparkSession instance. All spark.sql queries executed in this manner return a DataFrame on which you may perform further Spark operations if required. |
spark.udf.register() |
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def my_udf(value):
return value.upper()
spark.udf.register("my_udf", my_udf, StringType())
In PySpark DataFrame, it is used to register a user-defined function (UDF) with Spark, making it accessible for use in Spark SQL queries. This allows you to apply custom logic or operations to DataFrame columns using SQL expressions. |
where() |
Filtering rows based on a condition:
filtered_df = df.where(df['age'] > 30)
Used to filter the rows from DataFrame based on the given condition. Both filter() and where() functions are used for the same purpose. |
withColumn() |
Adding a new column and performing transformations:
from pyspark.sql.functions import col
new_df = df.withColumn("age_squared", col("age") ** 2)
Transformation function of DataFrame used to change the value, convert the data type of an existing column, create a new column, and many more. |
withColumnRenamed() |
Renaming an existing column:
renamed_df = df.withColumnRenamed("age", "years_old")
Returns a new DataFrame by renaming an existing column. |
toDS() function |
It is the function used to create a Dataset from a sequence or an RDD in Apache Spark's Scala API (PySpark does not expose a typed Dataset API). A Dataset is a strongly typed collection of domain-specific objects that can be transformed in parallel using functional or relational operations. |
map(func) |
It is a core transformation for element-wise processing: it passes each element of the source dataset through a function func and returns a new distributed dataset of the results. |
flatMap(func) |
Similar to map(func), but each input item can be mapped to zero or more output items, so func should return a sequence (an iterable in PySpark) rather than a single item. |
filter(func) |
It selectively keeps elements from a dataset or DataFrame by applying a predicate function func; only the elements for which func returns true are retained, which makes filter an essential tool for data transformation and analysis. |
distinct([numTasks]) |
It returns a new dataset containing the distinct elements of the source dataset, which helps in identifying the unique values it contains. (A short PySpark sketch of these RDD transformations appears after the discussion of Table 3379a below.) |
DataFrame Window Functions |
Window functions (rank, dense_rank, percent_rank, row_number) |
from pyspark.sql.window import Window; window = Window.partitionBy("column1").orderBy("column2"); df.withColumn("rank", rank().over(window)) |
Lag and lead functions |
from pyspark.sql.window import Window; window = Window.partitionBy("column1").orderBy("column2"); df.withColumn("lag", lag("value", 1).over(window)).withColumn("lead", lead("value", 1).over(window)) |
Cumulative sum |
from pyspark.sql.window import Window; window = Window.partitionBy("column1").orderBy("column2"); df.withColumn("cumulative_sum", sum("value").over(window)) |
Cumulative max |
from pyspark.sql.window import Window; window = Window.partitionBy("column1").orderBy("column2"); df.withColumn("cumulative_max", max("value").over(window)) |
Moving average |
from pyspark.sql.window import Window; window = Window.partitionBy("column1").orderBy("column2").rowsBetween(-2, 0); df.withColumn("moving_avg", avg("value").over(window)) |
Running total |
from pyspark.sql.window import Window; window = Window.partitionBy("column1").orderBy("column2").rowsBetween(Window.unboundedPreceding, Window.currentRow); df.withColumn("running_total", sum("value").over(window)) |
Table 3379a lists various operations and transformations applicable to a DataFrame in Spark, a two-dimensional data structure resembling a relational database table with its organized rows and columns. These operations cover a broad spectrum of activities, such as importing data into a DataFrame, analyzing the data, carrying out transformations like filtering, grouping, and aggregating, retrieving data from external sources, and exporting data to different formats. Such operations are crucial for efficiently handling structured data in Spark.
A DataFrame is a collection of data systematically arranged into labeled columns. DataFrames function similarly to a table in a relational database or a data frame in R or Python, yet they are further optimized. They are built on top of the Spark SQL engine and the RDD API and use RDDs to execute relational queries. Additionally, DataFrames are highly scalable, compatible with numerous data formats and storage systems, and designed to be developer-friendly. They integrate seamlessly with a wide range of big data tools through Spark and provide APIs for Python, Java, Scala, and R.
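To make the RDD transformation entries in Table 3379a (map, flatMap, filter, and distinct) concrete, the following is a minimal PySpark sketch; the sample sentences, application name, and variable names are illustrative assumptions rather than part of the table:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("TransformExample").getOrCreate()
sc = spark.sparkContext

# Illustrative sample data (assumed for this sketch)
lines = sc.parallelize(["spark makes big data simple", "big data needs spark"])

# map(func): one output element per input element
lengths = lines.map(lambda line: len(line))

# flatMap(func): zero or more output elements per input element
words = lines.flatMap(lambda line: line.split(" "))

# filter(func): keep only elements that satisfy the predicate
long_words = words.filter(lambda word: len(word) > 4)

# distinct(): remove duplicate elements
unique_words = words.distinct()

print(unique_words.collect())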
===========================================