전편
☞ 2022.09.27 - [Data Engineering/스파크] - 스파크 filter, where, sort, sample, split, union (스파크 정리 6)
이번 실습을 진행하기 위해서 이전 포스팅(2022.09.16 - [Data Engineering/스파크] - 스파크란? 스파크 기본 개념 (스파크 정리 2))을 참고하여 데이터를 다운 받은 후 spark.read()를 이용하여 데이터프레임으로 데이터를 읽어오자.
// in Scala
val df = spark.read.format("json")
.load("/data/flight-data/json/2015-summary.json")
# in python
df = spark.read.format("json").load("/data/flight-data/json/2015-summary.json")
1. 리파티션 (Reparition)
스파크에서는 현재 파티션 개수를 알아낼 수도 있고 파티션을 다시 할 수도 있다. 현재 파티션 개수는 getNumParitions를 통해 얻을 수 있다.
// in Scala
df.rdd.getNumPartitions // 1
# in Python
df.rdd.getNumPartitions() # 1
리파티션은 repatition 함수를 쓰면 된다. 이때 새로운 파티션의 개수가 현재 파티션의 개수보다 많아야 한다.
// in Scala
df.repartition(5)
만약 특정 컬럼에 대해서 필터링을 많이 한다면 그 컬럼에 대해 리파티셔닝을 하면 성능을 높일 수 있다.
df.repartition(5, col("DEST_COUNTRY_NAME"))
2. Coalesce
콜렉트는 파티션의 반대로 파티션을 합쳐준다.
// in Scala
df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2)
3. 콜렉트 (Collect)
만약 모든 데이터를 로컬 머신의 드라이버에 모으고 다면 collect() 함수를 사용하면 된다. 이때 전체 데이터의 용량이 로컬 머신의 용량보다 크면 에러가 나니 주의하자.
val collectDF = df.limit(10)
collectDF.collect()
모든 데이터를 모은 후에 이터레이션을 돌고 싶다면 toLocalIterator() 함수를 사용하자.
collectDF.toLocalIterator()
아래부터는 기존의 flight 데이터셋이 아닌 retail 데이터를 사용한다. 아래 코드를 참고해서 새로운 데이터를 읽자.
// in Scala
val df = spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("/data/retail-data/by-day/2010-12-01.csv")
df.printSchema()
df.createOrReplaceTempView("dfTable")
# in Python
df = spark.read.format("csv")\
.option("header", "true")\
.option("inferSchema", "true")\
.load("/data/retail-data/by-day/2010-12-01.csv")
df.printSchema()
df.createOrReplaceTempView("dfTable")
>> 스키마 출력
root
|-- InvoiceNo: string (nullable = true)
|-- StockCode: string (nullable = true)
|-- Description: string (nullable = true)
|-- Quantity: integer (nullable = true)
|-- InvoiceDate: timestamp (nullable = true)
|-- UnitPrice: double (nullable = true)
|-- CustomerID: double (nullable = true)
|-- Country: string (nullable = true)
>> 데이터 일부 출력
+---------+---------+--------------------+--------+-------------------+----...
|InvoiceNo|StockCode| Description|Quantity| InvoiceDate|Unit...
+---------+---------+--------------------+--------+-------------------+----...
| 536365| 85123A|WHITE HANGING HEA...| 6|2010-12-01 08:26:00| ...
| 536365| 71053| WHITE METAL LANTERN| 6|2010-12-01 08:26:00| ...
...
| 536367| 21755|LOVE BUILDING BLO...| 3|2010-12-01 08:34:00| ...
| 536367| 21777|RECIPE BOX WITH M...| 4|2010-12-01 08:34:00| ...
+---------+---------+--------------------+--------+-------------------+----...
4. Booleans과 스파크
지난번에 다룬 filter()와 where()는 boolean 표현식을 인풋으로 받는다. 아래 실습에서는 관련 예제들을 좀 더 다뤄본다.
아래는 데이터 중에 컬럼 "InvoicNo"의 값이 536365인 레코드만 모아서 출력하는 예제이다.
// in Scala
import org.apache.spark.sql.functions.col
df.where(col("InvoiceNo") === 536365)
.select("InvoiceNo", "Description")
.show(5)
# in Python
from pyspark.sql.functions import col
df.where(col("InvoiceNo") != 536365)\
.select("InvoiceNo", "Description")\
.show(5)
===============================================================================================
>> 츨력
+---------+--------------------+
|InvoiceNo| Description|
+---------+--------------------+
| 536365|WHITE HANGING HEA...|
| 536365| WHITE METAL LANTERN|
| 536365|CREAM CUPID HEART...|
| 536365|KNITTED UNION FLA...|
| 536365|RED WOOLLY HOTTIE...|
+---------+--------------------+
"and"나 "or"을 사용하면 여러개의 boolean 표현식을 연결할 수 있다.
// in Scala
val DOTCodeFilter = col("StockCode") === "DOT"
val priceFilter = col("UnitPrice") > 600
val descripFilter = col("Description").contains("POSTAGE")
df.withColumn("isExpensive", DOTCodeFilter.and(priceFilter.or(descripFilter)))
.where("isExpensive")
.select("unitPrice", "isExpensive").show(5)
파이썬에서는 "|", "&"을 사용하면 된다.
# in Python
from pyspark.sql.functions import instr
DOTCodeFilter = col("StockCode") == "DOT"
priceFilter = col("UnitPrice") > 600
descripFilter = instr(col("Description"), "POSTAGE") >= 1
df.withColumn("isExpensive", DOTCodeFilter & (priceFilter | descripFilter))\
.where("isExpensive")\
.select("unitPrice", "isExpensive").show(5)
다음편
☞ 2022.09.27 - [Data Engineering/스파크] - 스파크 정규표현식 (스파크 정리 8)
'Data Engineering > 스파크' 카테고리의 다른 글
스파크란? 스파크 vs 맵리듀스 (0) | 2024.02.10 |
---|---|
스파크 NULL 처리 (스파크 정리 9) (0) | 2022.10.08 |
스파크 정규표현식 (스파크 정리 8) (0) | 2022.09.27 |
스파크 filter, where, sort, sample, split, union (스파크 정리 6) (0) | 2022.09.27 |
스파크 select, selctExpr, lit, distinct, limit (스파크 정리 5) (0) | 2022.09.25 |
댓글