본문 바로가기
Data Engineering/스파크

스파크 filter, where, sort, sample, split, union (스파크 정리 6)

by 무언가 하는 소소 2022. 9. 27.
728x90
반응형

 

 

전편

2022.09.25 - [Data Engineering/스파크] - 스파크 select, selctExpr, lit, distinct, limit (스파크 정리 5)

 

 

이번 실습을 진행하기 위해서 이전 포스팅(2022.09.16 - [Data Engineering/스파크] - 스파크란? 스파크 기본 개념 (스파크 정리 2))을 참고하여 데이터를 다운 받은 후 spark.read()를 이용하여 데이터프레임으로 데이터를 읽어오자. 

// in Scala
val df = spark.read.format("json")
  .load("/data/flight-data/json/2015-summary.json")

 

# in python
df = spark.read.format("json").load("/data/flight-data/json/2015-summary.json")

 

 

SQL에 WHERE가 있다면 스파크에는 filter()가 있다. 사실 스파크에도 filter()와 똑같은 기능을 하는 where()가 있긴 하지만 일반적으로 filter()를 더 많이 쓴다.

 

1. filter(), where()

스파크에서는 filter("표현식")을 통해서 표현식의 값이 참이 되는 리코드만 걸러낼 수 있다. 이때 유의해야 할 점은 다른 명령어들과 마찬가지로 filter() 역시 원본 데이터프레임을 조작하는 것이 아니라 새로운 데이터 프레임을 반환한다는 점이다. 또한 where("표현식")과 기능이 똑같다. 

df.filter(col("count") < 2).show(2)
df.where("count < 2").show(2)
=======================================================================================
>> 출력
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+

 

filter()와 where() 함수는 둘다 여러개를 연결하여 쓸 수 있다. 여러개를 연결한 경우에 SQL에서 AND를 사용하여 여러 조건식을 함께 쓰는 것과 동일하다. 

// in Scala
df.filter(col("count") < 2).filter(col("ORIGIN_COUNTRY_NAME") =!= "Croatia")
  .show(2)

 

# in Python
df.where(col("count") < 2).where(col("ORIGIN_COUNTRY_NAME") != "Croatia")\
  .show(2)
====================================================================================== 
>> 출력
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|          Singapore|    1|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+

 

 

2. sorting

스파크는 SQL에서 ORDER BY를 사용하는 것과 마찬가지로 sort()를 이용해서 리코드를 정렬할 수 있다. 또한 sort()와 동일한 기능을 제공하는 orderBy()라는 함수도 지원한다. sort("컬럼명1", "컬럼명2", ...)를 통해 각 컬럼값에 따라 정렬할 수 있으며 디폴트는 오름차순이다. 인자로 스트링 대신 컬럼 레퍼런싱을 사용할 수도 있다. 

// in Scala
df.sort("count")
df.orderBy("count", "DEST_COUNTRY_NAME")
df.orderBy(col("count"), col("DEST_COUNTRY_NAME"))

 

- desc, asc

만약 내림차순으로 정렬하고 싶거나 오름차순이라고 명시하고 싶을 때는 desc와 asc를 사용하면 된다. "컬럼명1 desc"와 같은 익스프레션을 사용할 수도 있고 desc(), asc() 함수를 사용할 수도 있다.

// in Scala
import org.apache.spark.sql.functions.{desc, asc}
df.orderBy(expr("count desc"))
df.orderBy(desc("count"), asc("DEST_COUNTRY_NAME"))

 

# in Python
from pyspark.sql.functions import desc, asc
df.orderBy(expr("count desc"))
df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc())

 

- sortWithinPartition()

최적화를 위해 sortWithinPartition() 함수를 사용하여 전체 데이터가 아닌 각 파티션 별로 소팅을 할 수도 있다. 

// in Scala
spark.read.format("json").load("/data/flight-data/json/*-summary.json")
  .sortWithinPartitions("count")

 

# in Python
spark.read.format("json").load("/data/flight-data/json/*-summary.json")\
  .sortWithinPartitions("count")

 

 

3. Randon Sampling

sample() 함수를 사용하면 레코드를 랜덤 샘플링 할 수 있다. 이때 인자로 seed, 중복 포함 여부(withReplacement), 샘플 개수(비율)를 넘겨주어야 한다. 

// in Scala
val seed = 5
val withReplacement = false
val fraction = 0.5
df.sample(withReplacement, fraction, seed).count()

 

# in Python
seed = 5
withReplacement = False
fraction = 0.5
df.sample(withReplacement, fraction, seed).count()

 

 

4. Random Splits

스파크에서는 randomSplit() 함수를 통해 샘플링 뿐만 아니라 데이터 프레임을 랜덤으로 쪼개는 기능도 제공한다. randomSplit()의 인수로는 쪼개는 비율을 담은 배열 =(파이썬은 리스트)과 seed를 넘겨주면 된다.

// in Scala
val dataFrames = df.randomSplit(Array(0.25, 0.75), seed)
dataFrames(0).count() > dataFrames(1).count() // False

 

# in Python
dataFrames = df.randomSplit([0.25, 0.75], seed)
dataFrames[0].count() > dataFrames[1].count() # False

 

 

5. Union(): 데이터프레임 합치기

데이터프레임을 세로로 합치기 위해서는 union()을 쓸 수 있다. 참고로 아래 코드를 보면 "=!="라는 연산자가 있다. 스칼라에서 컬럼값을 비교하기 위해서는 "==", "!=" 연산자 대신에 "===", "=!=" 연산자를 사용해야 한다. "==", "!=" 연산자를 쓰면 스트링값에 대한 비교를 한다. 

// in Scala
import org.apache.spark.sql.Row
val schema = df.schema

// 새로운 데이터프레임 생성
val newRows = Seq(
  Row("New Country", "Other Country", 5L),
  Row("New Country 2", "Other Country 3", 1L)
)
val parallelizedRows = spark.sparkContext.parallelize(newRows)
val newDF = spark.createDataFrame(parallelizedRows, schema)

// 원래 데이터프레임과 새로운 데이터프레임 합치기
df.union(newDF)
  .where("count = 1")
  .where($"ORIGIN_COUNTRY_NAME" =!= "United States")
  .show()

 

# in Python
from pyspark.sql import Row
schema = df.schema

# 새로운 데이터프레임 생성
newRows = [
  Row("New Country", "Other Country", 5L),
  Row("New Country 2", "Other Country 3", 1L)
]
parallelizedRows = spark.sparkContext.parallelize(newRows)
newDF = spark.createDataFrame(parallelizedRows, schema)

# 기존 데이터프레임과 새로운 데이터프레임 합치기
df.union(newDF)\
  .where("count = 1")\
  .where(col("ORIGIN_COUNTRY_NAME") != "United States")\
  .show()
=======================================================================================
>> 출력
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
|    United States|          Gibraltar|    1|
|    United States|             Cyprus|    1|
|    United States|            Estonia|    1|
|    United States|          Lithuania|    1|
|    United States|           Bulgaria|    1|
|    United States|            Georgia|    1|
|    United States|            Bahrain|    1|
|    United States|   Papua New Guinea|    1|
|    United States|         Montenegro|    1|
|    United States|            Namibia|    1|
|    New Country 2|    Other Country 3|    1|
+-----------------+-------------------+-----+

 

 

다음편

2022.09.29 - [Data Engineering/스파크] - 스파크 repartition, coalesce, collect (스파크 정리 7)

 

스파크 repartition, coalesce, collect (스파크 정리 7)

전편 ☞ 2022.09.27 - [Data Engineering/스파크] - 스파크 filter, where, sort, sample, split, union (스파크 정리 6) 이번 실습을 진행하기 위해서 이전 포스팅(2022.09.16 - [Data Engineering/스파크] - 스..

dogfootja.tistory.com

728x90
반응형

댓글