1. 스파크란?
스파크는 클러스터 환경에서 데이터를 병렬적으로 처리하기 위한 라이브러리 집합이다. 하둡의 맵리듀스와 같은 역할을 하는 프레임워크라고 보면 되는데 다른 점이 몇가지 있다. 그리고 이 다른 점이 많은 사람들이 스파크를 쓰는 이유이다.
1) 스파크는 통합엔진이다.
스파크가 통합엔진이라는 것은 간단하게 말해 아주 다양한 라이브러리를 지원해준다는 뜻이다. 아래 그림을 보면 스파크는 spark core외에도 머신러닝을 위한 MLlib, 그래프 연산을 위한 GraphX 등 다양한 기능을 제공한다. 스파크 이전에는 이런 기능들을 쓰기 위해 각기 다른 프레임워크를 사용해야 했다(하이브, 마홋, ...). 하지만 이 모든 기능을 제공해주는 스파크가 등장하면서 사람들은 스파크 하나로 모든 것을 해결할 수 있게 되었다.
2) 스파크는 빠르다.
스파크, 하면 제일 먼저 떠오르는 것이 인메모리 컴퓨팅 엔진이라는 키워드다. 스파크는 중간 결과를 모두 디스크에 쓰는 맵리듀스와 달리 중간 결과를 모두 메모리(RAM)에 계속 가지고 있다. 따라서 맵리듀스와 비교했을 때 월등히 빠르다.
3) 스파크는 쉽다.
한번이라도 맵리듀스 프로그램을 짜본 사람은 맵리듀스 프로그램을 짜기 얼마나 어려운지 알 것이다. 하지만 스파크는 자바 외에 다양한 언어, 특히 스칼라와 파이썬을 지원하며 코드도 맵리듀스와 비교했을 때 매우 간결하다. 또한 우리가 별도의 작업을 하지 않아도 스파크에서 자체적으로 최적화를 해준다.
2. 스파크 기본 개념
1) 스파크 구조
스파크는 클러스터의 여러 머신에 분배된 잡을 조정하는 프레임워크로써 작동 한다. 하지만 스파크가 클러스터 자체를 관리하는 것은 아니다. 클러스터는 기존에 YARN과 같은 클러스터 매니저가 관리하고 스파크는 매니저에게서 자원을 할당받는다.
2) 스파크 애플리케이션
스파크 애플리케이션은 드라이버 프로세스(driver process)와 엑세큐터 프로세스(executor process)로 이루어진다. 드라이버 프로세스는 애플리케이션의 메인 함수를 실행하며 세 가지 역할을 수행한다; 스파크 애플리케이션을 관리하고, 유저의 인풋과 프로그램에 응답하고, 엑세큐터 사이에 일을 분배하고 스케줄링한다. 엑세큐터 프로세스는 드라이버 프로세스에게 할당받은 일을 수행한다.
3) 스파크 API
① Language API
스파크는 다음 언어들을 지원한다.
Scala, Java, Python, SQL, R
② 스파크 API
스파크 API는 로우 레벨의 unstructured API(RDD)와 하이 레벨의 structured API(DataFrame, Datasets)로 나눠진다. DataFrame과 Datasets는 로우와 컬럼으로 이루어진 테이블 형태의 컬렉션이다. 우리는 각종 스파크 명령어들을 통해 DataFrame과 Datasets 타입으로 된 데이터들을 조작, 계산, 출력 할 수 있다. 참고로 요즘은 로우 레벨 API는 많이 사용하지 않는다.
4) 스파크 세션
스파크 애플리케이션은 스파크 세션(Spark Session)이라 불리는 드라이버 프로세스에 의해 관리된다. 스파크 세션과 스파크 애플리케이션 간에는 일대일 관계가 성립한다.
5) 파티션 (Partitions)
데이터를 병렬적으로 처리하기 위해서 스파크는 데이터를 파티션이라는 단위로 나누어 관리한다. 즉, 파티션은 병렬도를 결정하는 척도이다.
6) 트랜스포메이션 (Transformations)
스파크에서 데이터들을 조작하는 명령어를 트랜스포메이션이라 한다. 하지만 트랜스포메이션 자체 만으로는 아무 일이 일어나지 않는다. 액션을 호출해야 트랜스포메이션이 실행된다. 트랜스포메이션에는 두가지 타입, 네로우 트랜스포메이션과 와이드 트랜스포메이션이 있다. 네로우 트랜스포메이션은 하나의 인풋 파티션이 하나의 아웃풋 파티션을 형성한다. 네로우 트랜스포메이션을 처리할 때 스파크는 파이프라이닝을 통해 여러개의 트랜스포메이션들을 in-memory로 처리한다. 와이드 트랜스포메이션은 하나의 파티션이 여러 아웃풋 파티션의 결과에 영향을 미친다. 따라서 파티션의 교환이 일어나는데, 이를 스파크에서는 셔플(Shuffle)이라고 한다. 이때는 in-memory가 아닌 디스크가 사용된다.
5) 레이지 이벨류에이션 (Lazy Evaluation)
스파크는 결과물이 필요하기 직전까지 실제 연산(트랜스포메이션)을 하지 않는다. 대신 트랜스포메이션을 사용하여 플랜(plan)을 만든다. 이 플랜을 로지컬 플랜(logical plan)이라고 하고 실제 연산을 해야할 때 이 로지컬 플랜을 피지컬 플랜 (physical plan)으로 컴파일한다. 이것을 레이지 이벨류에이션이라고 하며 스파크는 레이지 이벨류에이션을 통해 전체적인 데이터플로우를 최적화 할 수 있다.
6) 액션 (Actions)
실제 연산을 수행하기 위해서는 액션을 실행해야 한다. 액션에는 세 가지 종류가 있는데 첫 번째는 콘솔에 데이터를 출력하는 것이고, 두 번째는 데이터를 사용 언어의 네이티브 객체로 변환하는 것, 마지막은 아웃풋 데이터를 적는 것이다. 예를들어 count라는 액션을 호출하면 우선 트랜스포메이션들이 실행되고 각 파티션 별로 카운트가 실행 되어 합쳐진 다음 최종 결과가 언어의 네이티브 객체로 변환된다.
5. 스파크 맛보기 간단 예시
아래는 csv 포맷으로 된 비행기록을 스파크의 DataFrames 타입으로 읽는 스파크 코드이다. 여러 옵션은 현재로써는 무시해도 괜찮다. 한가지 유의할 점은 read는 액션이 아니므로 실제로 데이터를 읽지 않는다. take 액션을 수행하여야 실제로 데이터를 읽어서 출력한다.
val flightData2015 = spark
.read
.option("inferSchema", "true")
.option("header", "true")
.csv("/data/flight-data/csv/2015-summary.csv")
flightsData2015.take(3)
한편 DataFrames의 특정 컬럼에 대해 sort 연산을 수행할 수 있다. 하지만 sort 역시 액션이 아닌 트랜스포메이션이므로 실제로 데이터에는 아무 일도 일어나지 않는다. 다만 DataFrames에 explain을 호출하여 로지컬 플랜을 볼 수 있다.
flightsData2015.sort("count").explain()
또한 스파크에서는 DataFrames을 테이블이나 뷰로 바꾸어서 스칼라나 파이썬 대신 SQL을 이용하여 데이터를 조작할 수 있다.
flightsData2015.createOrReplaceTempView("flight_data_2015")
// SQL명령어를 사용하기 위해서는 spark.sql("SQL 쿼리")사용
val sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
""")
// 출력
sqlWay.show()
'Data Engineering > 스파크' 카테고리의 다른 글
스파크 NULL 처리 (스파크 정리 9) (0) | 2022.10.08 |
---|---|
스파크 repartition, coalesce, collect (스파크 정리 7) (0) | 2022.09.29 |
스파크 정규표현식 (스파크 정리 8) (0) | 2022.09.27 |
스파크 filter, where, sort, sample, split, union (스파크 정리 6) (0) | 2022.09.27 |
스파크 select, selctExpr, lit, distinct, limit (스파크 정리 5) (0) | 2022.09.25 |
댓글