전편
☞ 2022.09.25 - [Data Engineering/스파크] - 스파크 select, selctExpr, lit, distinct, limit (스파크 정리 5)
이번 실습을 진행하기 위해서 이전 포스팅(2022.09.16 - [Data Engineering/스파크] - 스파크란? 스파크 기본 개념 (스파크 정리 2))을 참고하여 데이터를 다운 받은 후 spark.read()를 이용하여 데이터프레임으로 데이터를 읽어오자.
// in Scala
val df = spark.read.format("json")
.load("/data/flight-data/json/2015-summary.json")
# in python
df = spark.read.format("json").load("/data/flight-data/json/2015-summary.json")
SQL에 WHERE가 있다면 스파크에는 filter()가 있다. 사실 스파크에도 filter()와 똑같은 기능을 하는 where()가 있긴 하지만 일반적으로 filter()를 더 많이 쓴다.
1. filter(), where()
스파크에서는 filter("표현식")을 통해서 표현식의 값이 참이 되는 리코드만 걸러낼 수 있다. 이때 유의해야 할 점은 다른 명령어들과 마찬가지로 filter() 역시 원본 데이터프레임을 조작하는 것이 아니라 새로운 데이터 프레임을 반환한다는 점이다. 또한 where("표현식")과 기능이 똑같다.
df.filter(col("count") < 2).show(2)
df.where("count < 2").show(2)
=======================================================================================
>> 출력
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
| United States| Croatia| 1|
| United States| Singapore| 1|
+-----------------+-------------------+-----+
filter()와 where() 함수는 둘다 여러개를 연결하여 쓸 수 있다. 여러개를 연결한 경우에 SQL에서 AND를 사용하여 여러 조건식을 함께 쓰는 것과 동일하다.
// in Scala
df.filter(col("count") < 2).filter(col("ORIGIN_COUNTRY_NAME") =!= "Croatia")
.show(2)
# in Python
df.where(col("count") < 2).where(col("ORIGIN_COUNTRY_NAME") != "Croatia")\
.show(2)
======================================================================================
>> 출력
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
| United States| Singapore| 1|
| Moldova| United States| 1|
+-----------------+-------------------+-----+
2. sorting
스파크는 SQL에서 ORDER BY를 사용하는 것과 마찬가지로 sort()를 이용해서 리코드를 정렬할 수 있다. 또한 sort()와 동일한 기능을 제공하는 orderBy()라는 함수도 지원한다. sort("컬럼명1", "컬럼명2", ...)를 통해 각 컬럼값에 따라 정렬할 수 있으며 디폴트는 오름차순이다. 인자로 스트링 대신 컬럼 레퍼런싱을 사용할 수도 있다.
// in Scala
df.sort("count")
df.orderBy("count", "DEST_COUNTRY_NAME")
df.orderBy(col("count"), col("DEST_COUNTRY_NAME"))
- desc, asc
만약 내림차순으로 정렬하고 싶거나 오름차순이라고 명시하고 싶을 때는 desc와 asc를 사용하면 된다. "컬럼명1 desc"와 같은 익스프레션을 사용할 수도 있고 desc(), asc() 함수를 사용할 수도 있다.
// in Scala
import org.apache.spark.sql.functions.{desc, asc}
df.orderBy(expr("count desc"))
df.orderBy(desc("count"), asc("DEST_COUNTRY_NAME"))
# in Python
from pyspark.sql.functions import desc, asc
df.orderBy(expr("count desc"))
df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc())
- sortWithinPartition()
최적화를 위해 sortWithinPartition() 함수를 사용하여 전체 데이터가 아닌 각 파티션 별로 소팅을 할 수도 있다.
// in Scala
spark.read.format("json").load("/data/flight-data/json/*-summary.json")
.sortWithinPartitions("count")
# in Python
spark.read.format("json").load("/data/flight-data/json/*-summary.json")\
.sortWithinPartitions("count")
3. Randon Sampling
sample() 함수를 사용하면 레코드를 랜덤 샘플링 할 수 있다. 이때 인자로 seed, 중복 포함 여부(withReplacement), 샘플 개수(비율)를 넘겨주어야 한다.
// in Scala
val seed = 5
val withReplacement = false
val fraction = 0.5
df.sample(withReplacement, fraction, seed).count()
# in Python
seed = 5
withReplacement = False
fraction = 0.5
df.sample(withReplacement, fraction, seed).count()
4. Random Splits
스파크에서는 randomSplit() 함수를 통해 샘플링 뿐만 아니라 데이터 프레임을 랜덤으로 쪼개는 기능도 제공한다. randomSplit()의 인수로는 쪼개는 비율을 담은 배열 =(파이썬은 리스트)과 seed를 넘겨주면 된다.
// in Scala
val dataFrames = df.randomSplit(Array(0.25, 0.75), seed)
dataFrames(0).count() > dataFrames(1).count() // False
# in Python
dataFrames = df.randomSplit([0.25, 0.75], seed)
dataFrames[0].count() > dataFrames[1].count() # False
5. Union(): 데이터프레임 합치기
데이터프레임을 세로로 합치기 위해서는 union()을 쓸 수 있다. 참고로 아래 코드를 보면 "=!="라는 연산자가 있다. 스칼라에서 컬럼값을 비교하기 위해서는 "==", "!=" 연산자 대신에 "===", "=!=" 연산자를 사용해야 한다. "==", "!=" 연산자를 쓰면 스트링값에 대한 비교를 한다.
// in Scala
import org.apache.spark.sql.Row
val schema = df.schema
// 새로운 데이터프레임 생성
val newRows = Seq(
Row("New Country", "Other Country", 5L),
Row("New Country 2", "Other Country 3", 1L)
)
val parallelizedRows = spark.sparkContext.parallelize(newRows)
val newDF = spark.createDataFrame(parallelizedRows, schema)
// 원래 데이터프레임과 새로운 데이터프레임 합치기
df.union(newDF)
.where("count = 1")
.where($"ORIGIN_COUNTRY_NAME" =!= "United States")
.show()
# in Python
from pyspark.sql import Row
schema = df.schema
# 새로운 데이터프레임 생성
newRows = [
Row("New Country", "Other Country", 5L),
Row("New Country 2", "Other Country 3", 1L)
]
parallelizedRows = spark.sparkContext.parallelize(newRows)
newDF = spark.createDataFrame(parallelizedRows, schema)
# 기존 데이터프레임과 새로운 데이터프레임 합치기
df.union(newDF)\
.where("count = 1")\
.where(col("ORIGIN_COUNTRY_NAME") != "United States")\
.show()
=======================================================================================
>> 출력
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
| United States| Croatia| 1|
| United States| Singapore| 1|
| United States| Gibraltar| 1|
| United States| Cyprus| 1|
| United States| Estonia| 1|
| United States| Lithuania| 1|
| United States| Bulgaria| 1|
| United States| Georgia| 1|
| United States| Bahrain| 1|
| United States| Papua New Guinea| 1|
| United States| Montenegro| 1|
| United States| Namibia| 1|
| New Country 2| Other Country 3| 1|
+-----------------+-------------------+-----+
다음편
☞ 2022.09.29 - [Data Engineering/스파크] - 스파크 repartition, coalesce, collect (스파크 정리 7)
'Data Engineering > 스파크' 카테고리의 다른 글
스파크 repartition, coalesce, collect (스파크 정리 7) (0) | 2022.09.29 |
---|---|
스파크 정규표현식 (스파크 정리 8) (0) | 2022.09.27 |
스파크 select, selctExpr, lit, distinct, limit (스파크 정리 5) (0) | 2022.09.25 |
스파크 기본 명령어, 스파크 데이터프레임 명령어 (스파크 정리 4) (0) | 2022.09.24 |
스파크 Structured API - Datasets, DataFrames, SQL tables (스파크 정리 3) (0) | 2022.09.22 |
댓글