簡介 Spark / Spark Introduction
Apache Spark 是一個 Big Data 處理平臺,現在資料規模基本上是 TB or PB 級距起跳,只用單機根本不可能跑,因此要有一個強大的平台來協助處理這些資料。
Spark 除了有豐富的函式庫,底層是用 Scala 寫的,提供 Scala / Python / R / Java 一致功能的 API,方便開發者操作。
- Spark Core
可以把它想成是 Spark OS,包含重要核心功能:工作排程(process scheduling)、記憶體管理(memory management),而 Spark 主要的程式抽象化結構 RDD (Resilient Disributed Datasets 彈性分散式資料集) 的 API 也是定義在此層中。 - Spark SQL
這部分是為了處理結構化資料而生的,除了提供 SQL 使用介面外,Spark SQL 也允許開發人員將 SQL 查詢與其他 RDD 所支援的資料處理方式一起使用。 - Spark Streaming
這部分是處理即時串流資料(e.g. web server log)的元件,提供處理這類資料的 API。 - Spark MLlib
顧名思義提供常見的機器學習函式庫,在 MLlib 裡面除了 Regression & Classification,也提供了 Evaluation & Pipeline 的功能。 - Spark GraphX
此部分用來在分散式圖處理的函式庫,GraphX 提供了很多處理圖的操作,如 subgraph 和 mapVertices 以及常見的圖論演算法。(這部分目前還沒有涉略,將來有機會碰到,再做筆記分享!)
PySpark 跟 Python 的差別?/ Difference between PySpark and Python
- DataFrame 將會完全取代 Pandas df (也可以把 DataFrame 視為 Spark 版本的 Pandas)
- 資料儲存的優化:Partitions,可以切割 DataFrames (e.g. Parquet File)
- Lazy Computation: PySpark will not actually run your commands until it absolutely has to
- 建議將 PySpark 視為 Python 另外的函式庫,擁有自己特定語法
Remember: You will save yourself a lot of heartache by going directly to PySpark documentations when you get stuck rather than trying to pythonize what you are trying to accomplish.
話不多說,直接透過實際操作來認識 PySpark。以下的程式碼將以在 Google Colab 上面運行為主。首先必須下載 PySpark。下載好後,當要做任何操作之前,必須先開啟 Spark Session。
由於 Mentor 希望我先看 Spark SQL 這塊東西,故以下先以 SQL 作為學習的切入點。若有讀者不是很熟悉 SQL,可以先參考以下這篇文章,這邊有基本語法的介紹。
SQL Options
PySpark provides two main options when it comes to using staight SQL. Spark SQL and SQL Transformer.
1. Spark SQL
Spark TempView provides two functions that allow users to run SQL queries against a Spark DataFrame:
- createOrReplaceTempView: The lifetime of this temporary view is tied to the SparkSession that was used to create the dataset. It creates (or replaces if that view name already exists) a lazily evaluated “view” that you can then use like a hive table in Spark SQL. It does not persist to memory unless you cache the dataset that underpins the view.
- createGlobalTempView: The lifetime of this temporary view is tied to this Spark application. This feature is useful when you want to share data among different sessions and keep alive until your application ends.
先將我們要玩的資料讀進來,並看一下裡面有什麼內容。
path = '/content/drive/My Drive/PySpark/Datasets/'
crime = spark.read.csv(path + 'rec-crime-pfa.csv', inferSchema=True, header=True)
print(crime.printSchema())
這邊有個小東西要注意,要使用 SQL 是不允許 column 名稱含有空格的。如果對含有空格的欄位做操作,會有問題跳出 Error。所以我們先進行置換,而這邊只置換第五個 column 而沒有置換第一個是因為我們不會操作到第一個欄位。
df = crime.withColumnRenamed('Rolling year total number of offences', 'Count')
print(df.printSchema())
接下來就是處理 Spark SQL 的環節,建立 Spark TempView。
# Create a temporary view of the dataframe
df.createOrReplaceTempView("tempview")
建立成功後,我們就能夠使用 SQL 操作了!但注意,這個 TempView 的生命週期是綁在 spark session 上面的,如果你這個 session 消失,則 tempview 也會跟著消失。
# Then Query the temp view
spark.sql("SELECT * \
FROM tempview \
WHERE Count > 1000 ").show(5)
spark.sql("SELECT Region, sum(Count) AS Total \
FROM tempview \
GROUP BY Region").limit(5).toPandas()
如果想對數值型的資料進行操作,並且增加 column 在既有的 Dataframe 上面,可以有以下不同寫法。已知 Count 總和為 244720928,我們想加入一個欄位叫做 Percentage,代表 Count / 244720928。
2. SQL Transformer
You also have the option to use the SQL transformer option where you can write free-form SQL scripts as well.
在 SQL Transformer 的語法上面,可能會比較不習慣他的寫法。注意在創建的時候,statement 裡面就是放要的 SQL 操作指令,其中 __THIS__ 不可以改成其他東西,他就是一個特殊字,暫不理會。創建好 SQLTransformer 後,把要處理的 dataframe 對象傳入,他就會幫你執行操作。(個人覺得這個方法比上面的 Spark SQL 好,操作一目瞭然!)
UDF / User Defined Functions
在 PySpark 有一個很重要的特色,當你想對 dataframe 中某個 column 做特定的事情時,如果沒有內建的函式可以完成,此事你會做的事情就是自己定義函式,來客製化你的需求。但,這邊我們必須把函式轉成 udf 的形式來表達,這樣在 PySpark 的使用上,才能達到再多個 node 上面一起執行。
Functions as you know them in Python work a bit differently in Pyspark because it operates on a cluster. If you define a function the traditional Python way in PySpark, you will not receive an error message but the call will not distribute on all nodes. So it will run slower. To convert a Python function to what’s called a user defined function (UDF) in PySpark is what you need to do.
Note: keep in mind that a function will not work on a column with null values.
以下提供簡易範例,現在想要把原先 dataframe 裡面 Name 欄位的名字第一個字都改成大寫。
定義好函式之後,把它轉成 udf 的型式。轉好之後就能像我們呼叫函式一樣直覺地對他操作,用在 select, withColumn 都沒問題。
如果想要在 Spark SQL 中使用也沒問題,不過要記得先 register: spark.udf.register()
,以下範例程式也會跑出跟剛才一樣的結果。
此外,我們也有另外一種是使用 Annotation 的函式定義方法,既省時又省力,大部分開發者都偏好使用這種方法。假設我現在要讓 Name 欄位的名字全部都改成大寫,而且自己定義函式。此時,我只要在我的函式上面加上一行小老鼠 @F.udf 後面接括號,括號中放 returnType 也就是回傳值的資料型態,即可輕鬆定義好 udf。
Github for your reference:
參考資料 / Reference
(1) 一小時了解 Spark:
(2) 前人 Medium PySpark 教學文章
(3) 推薦 Udemy 課程
(4) Spark — NCTU Gitbook
(5) UDF
This article will be updated at any time! Thanks for your reading. If you like the content, please clip the “clap” button. You can also press the follow button to track new articles at any time. Feel free to contact me via LinkedIn or email.