전편
☞ 2022.09.16 - [Data Engineering/스파크] - 스파크란? 스파크 기본 개념 (스파크 정리 2)
1. Structured API
스파크의 Structured API는 하이레벨 API로 각종 데이터(CSV 파일에서부터 Parquet 파일까지)를 조작할 수 있는 툴이다. 해당 API에는 세가지 종류, Datasets, DataFrames, SQL tables & views가 있다. 쉽게 말해 우리가 스파크에서 트랜스포메이션이나 액션을 적용할 수 있는 자료구조이다.
DataFrames와 Datasets은 로우와 컬럼으로 이루어진 테이블 구조이다. 모든 컬럼은 같은 수의 로우를 가지고 있어야 하며 각 컬럼의 타입은 로우마다 동일해야 한다. 또한 SQL tables와 views는 자바, 스칼라, 파이썬 대신 SQL을 사용할 수 있는 DataFrames라 생각해도 무방하다.
2. 스파크 타입
1) 스파크 타입
스파크는 in-memory 연산에 최적화 된 스파크만의 내부 타입을 가지고 있다. 각 스파크 타입은 자바, 스칼라, 파이썬 등에 있는 타입과 매칭된다. 아래 링크로 가면 스파크 타입의 종류와 각 스파크 타입이 스칼라, 자바, 파이썬, R, SQL에서 어떤 타입에 매핑되는지 볼 수 있다. 예를 들어 스파크의 IntegerType은 스칼라의 Int, 파이썬의 int나 long 타입과 매핑된다.
https://spark.apache.org/docs/latest/sql-ref-datatypes.html
2) Catalyst
스파크는 내부적으로 타입 정보를 저장하기 위해 Catalyst라는 엔진을 사용한다. 우리가 스칼라나 파이썬에서 DataFrames를 사용할 때 우리는 해당 언어의 타입에 대한 연산을 하는 것이 아니라 스파크 타입에 대해 연산을 하게된다. 즉 스칼라나 파이썬 등으로 적힌 연산에 대해 스파크는 Catalyst를 활용하여 스파크만의 형식으로 변환시킨 후 수행을 한다. 그리고 이를 통해 스칼라는 각종 최적화를 진행할 수 있다.
2. DataFrames (데이터프레임)
DataFrames는 Row라는 스칼라 타입의 집합이다. Row 타입은 연산에 최적화된 in-meory 스칼라 데이터 타입이다. Row 타입은 자바나 스칼라의 타입과는 달리 가비지 컬렉션 등의 비용이 안들기 때문에 연산에 최적화 되어있다.
1) Schema (스키마)
스키마는 DataFrames의 컬럼 이름과 타입을 정의한다. 스키마는 사용자가 직접 명시해 줄 수도 있고 데이터를 읽을 때 스파크가 추론하도록 할 수 도 있다.
2) 칼럼과 로우
스파크의 칼럼은 테이블의 칼럼과 거의 유사하다. 또한 로우는 DataFrame에 있는 각 리코드를 의미하는데 스파크의 내부 타입인 Row 타입으로 되어있다.
3. Datasets (데이터셋)
Datasets은 스파크의 type-safe structured API이다. DataFrames은 런타임에 타입체크를 하지만 Datasets은 컴파일 타임에 타입체크가 이루어진다. 따라서 다이나믹 타이핑을 지원하는 파이썬이나 R은 Datasets을 사용할 수 없다.
자바나 스칼라에서 Datasets를 사용하면 데이터의 각 리코드를 자바/스칼라 클래스 타입으로 설정할 수 있다. 예를 들어 스칼라에서 Datasets[Person]라고 하면 Person 클래스 객체를 담고있는 Datasets을 의미한다. 모든 자바/스칼라 클래스가 가능한 것은 아니고 자바에서는 JavaBean 패턴을 따르는 클래스, 스칼라에서는 case 클래스만 가능하다.
Datasets의 좋은점은 필요할 때만 쓸 수 있다는 것이다. 스파크의 DataFrames를 필요에 따라 Datasets로 만든 후 타입 세이프한 로우 레벨 연산을 수행하고 다시 DataFrames로 만들어 하이 레벨 연산을 수행할 수 있다. 또한 Datasets에 take 액션을 취하면 Row가 아니라 자바나 스칼라 객체를 반환한다. 아래는 DataFrames를 Datasets으로 만드는 예시코드이다.
case class Flight(DEST_COUNTRY_NAME: String,
ORIGIN_COUNTRY_NAME: String,
count: BigInt)
val flightsDF = spark.read
.parquet("/data/flight-data/parquet/2010-summary.parquet/")
// Datasets로 변환
val flights = flightsDF.as[Flight]
4. Structured API 실행
Structrued API는 내부적으로 다음과 같은 과정을 거쳐 실행된다.
1) DataFrame, Datasets 코드 작성
2) 로지컬 플랜 변환
작성된 코드는 아래 그림과 같이 두 단계를 거쳐 로지컬 플랜으로 변환된다. 첫 번째 단계로 코드는 스파크에 의해 unresolved 로지컬 플랜으로 변환된다. 이때는 아직 코드의 테이블이나 컬럼이 실제로 존재하는지 체크하기 전이기 때문에 unresolved라고 한다. Unresolved 로지컬 플랜은 스파크의 analyzer에 의해 resolved 로지컬 플랜으로 바뀐다. Analyzer는 catalog를 이용해 resolving을 수행하는데 catalog란 모든 테이블과 데이터프레임 정보를 담고있는 리포지토리이다. Resolved 로지컬 플랜은 Catalyst Optimizer에게 보내져서 최적화된 로지컬 플랜으로 변환된다. Catalyst Optimizer는 정해진 규칙에 따라 predicates나 selections을 push down하여 로지컬 플랜을 최적화한다.
3) 피지컬 플랜 변환
스파크는 로지컬 플랜을 어떻게 실제 클러스터에서 실행할지 계획을 짜는데 이를 피지컬 플랜이라 한다. 해당 과정에서 스파크는 여러개의 피지컬 플랜을 만든 후 비용을 비교하여 최적의 피지컬 플랜을 고른다. 피지컬 플랜은 스파크의 로우 레벨 API인 RDD 트랜스포메이션으로 이루어져있다.
4) 피지컬 플랜 실행
마지막으로 피지컬 플랜이 실제로 실행된다.
다음편
☞ 2022.09.24 - [Data Engineering/스파크] - 스파크 기본 명령어, 스파크 데이터프레임 명령어
'Data Engineering > 스파크' 카테고리의 다른 글
스파크 정규표현식 (스파크 정리 8) (0) | 2022.09.27 |
---|---|
스파크 filter, where, sort, sample, split, union (스파크 정리 6) (0) | 2022.09.27 |
스파크 select, selctExpr, lit, distinct, limit (스파크 정리 5) (0) | 2022.09.25 |
스파크 기본 명령어, 스파크 데이터프레임 명령어 (스파크 정리 4) (0) | 2022.09.24 |
주피터 노트북에서 스칼라와 스파크 사용하기 (스파크 정리 1) (0) | 2022.09.08 |
댓글