본문 바로가기
Data Engineering/스파크

스파크 repartition, coalesce, collect (스파크 정리 7)

by 무언가 하는 소소 2022. 9. 29.
728x90
반응형

 

 

전편

2022.09.27 - [Data Engineering/스파크] - 스파크 filter, where, sort, sample, split, union (스파크 정리 6)

 

 

이번 실습을 진행하기 위해서 이전 포스팅(2022.09.16 - [Data Engineering/스파크] - 스파크란? 스파크 기본 개념 (스파크 정리 2))을 참고하여 데이터를 다운 받은 후 spark.read()를 이용하여 데이터프레임으로 데이터를 읽어오자. 

// in Scala
val df = spark.read.format("json")
  .load("/data/flight-data/json/2015-summary.json")

 

# in python
df = spark.read.format("json").load("/data/flight-data/json/2015-summary.json")

 

 

1. 리파티션 (Reparition)

스파크에서는 현재 파티션 개수를 알아낼 수도 있고 파티션을 다시 할 수도 있다. 현재 파티션 개수는 getNumParitions를 통해 얻을 수 있다.

// in Scala
df.rdd.getNumPartitions // 1

 

# in Python
df.rdd.getNumPartitions() # 1

 

리파티션은 repatition 함수를 쓰면 된다. 이때 새로운 파티션의 개수가 현재 파티션의 개수보다 많아야 한다.

// in Scala
df.repartition(5)

 

만약 특정 컬럼에 대해서 필터링을 많이 한다면 그 컬럼에 대해 리파티셔닝을 하면 성능을 높일 수 있다. 

df.repartition(5, col("DEST_COUNTRY_NAME"))

 

 

2.  Coalesce

콜렉트는 파티션의 반대로 파티션을 합쳐준다. 

// in Scala
df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2)

 

 

3. 콜렉트 (Collect)

만약 모든 데이터를 로컬 머신의 드라이버에 모으고 다면 collect() 함수를 사용하면 된다. 이때 전체 데이터의 용량이 로컬 머신의 용량보다 크면 에러가 나니 주의하자.

val collectDF = df.limit(10)
collectDF.collect()

 

모든 데이터를 모은 후에 이터레이션을 돌고 싶다면 toLocalIterator() 함수를 사용하자. 

collectDF.toLocalIterator()

 

 

아래부터는 기존의 flight 데이터셋이 아닌 retail 데이터를 사용한다. 아래 코드를 참고해서 새로운 데이터를 읽자.

// in Scala
val df = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/data/retail-data/by-day/2010-12-01.csv")
df.printSchema()
df.createOrReplaceTempView("dfTable")

 

# in Python
df = spark.read.format("csv")\
  .option("header", "true")\
  .option("inferSchema", "true")\
  .load("/data/retail-data/by-day/2010-12-01.csv")
df.printSchema()
df.createOrReplaceTempView("dfTable")

 

>> 스키마 출력
root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)

 

>> 데이터 일부 출력
+---------+---------+--------------------+--------+-------------------+----...
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|Unit...
+---------+---------+--------------------+--------+-------------------+----...
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|    ...
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|    ...
...
|   536367|    21755|LOVE BUILDING BLO...|       3|2010-12-01 08:34:00|    ...
|   536367|    21777|RECIPE BOX WITH M...|       4|2010-12-01 08:34:00|    ...
+---------+---------+--------------------+--------+-------------------+----...

 

 

4. Booleans과 스파크

지난번에 다룬 filter()와 where()는 boolean 표현식을 인풋으로 받는다. 아래 실습에서는 관련 예제들을 좀 더 다뤄본다.

 

아래는 데이터 중에 컬럼 "InvoicNo"의 값이 536365인 레코드만 모아서 출력하는 예제이다.

// in Scala
import org.apache.spark.sql.functions.col
df.where(col("InvoiceNo") === 536365)
  .select("InvoiceNo", "Description")
  .show(5)

 

 

# in Python
from pyspark.sql.functions import col
df.where(col("InvoiceNo") != 536365)\
  .select("InvoiceNo", "Description")\
  .show(5)
===============================================================================================
>> 츨력
+---------+--------------------+
|InvoiceNo|         Description|
+---------+--------------------+
|   536365|WHITE HANGING HEA...|
|   536365| WHITE METAL LANTERN|
|   536365|CREAM CUPID HEART...|
|   536365|KNITTED UNION FLA...|
|   536365|RED WOOLLY HOTTIE...|
+---------+--------------------+

 

 

"and"나 "or"을 사용하면 여러개의 boolean 표현식을 연결할 수 있다.

// in Scala
val DOTCodeFilter = col("StockCode") === "DOT"
val priceFilter = col("UnitPrice") > 600
val descripFilter = col("Description").contains("POSTAGE")
df.withColumn("isExpensive", DOTCodeFilter.and(priceFilter.or(descripFilter)))
  .where("isExpensive")
  .select("unitPrice", "isExpensive").show(5)

 

파이썬에서는 "|",  "&"을 사용하면 된다.

# in Python
from pyspark.sql.functions import instr
DOTCodeFilter = col("StockCode") == "DOT"
priceFilter = col("UnitPrice") > 600
descripFilter = instr(col("Description"), "POSTAGE") >= 1
df.withColumn("isExpensive", DOTCodeFilter & (priceFilter | descripFilter))\
  .where("isExpensive")\
  .select("unitPrice", "isExpensive").show(5)

 

 

 

다음편

2022.09.27 - [Data Engineering/스파크] - 스파크 정규표현식 (스파크 정리 8)

 

스파크 정규표현식 (스파크 정리 8)

전편 ☞ 2022.09.29 - [Data Engineering/스파크] - 스파크 repartition, coalesce, collect (스파크 정리 7) 이번 실습을 진행하기 위해서 이전 포스팅(2022.09.16 - [Data Engineering/스파크] - 스파크란? 스파..

dogfootja.tistory.com

728x90
반응형

댓글