전편
이번 포스팅에서는 DataFrames를 조작할 수 있는 여러 기본 명령어들을 알아본다. 언어는 스칼라와 파이썬 두 개를 사용하였다. 실습을 위해 전 포스팅(2022.09.16 - [Data Engineering/스파크] - 스파크란? 스파크 기본 개념 (스파크 정리 2))을 참고해 데이터를 다운받은 후 데이터 하나를 읽어오자.
// in scalar
val df = spark.read.format("json")
.load("/data/flight-data/json/2015-summary.json")
# in python
df = spark.read.format("json").load("/data/flight-data/json/2015-summary.json")
1. 스키마
스키마는 DataFrames에 있는 컬럼의 타입과 이름을 정의한다. 각 컬럼은 StructField(컬럼 이름, 컬럼 타입, nullable) 타입이고 스키마는 다수의 StructField로 이루어진 StructTyoe이다
// schema 출력
df.printSchema()
=======================================================================================
>> 출력
root
|-- DEST_COUNTRY_NAME: string (nullable = true)
|-- ORIGIN_COUNTRY_NAME: string (nullable = true)
|-- count: long (nullable = true)
스키마는 데이터를 읽으면서 스파크가 추론하게 할 수도 있지만 명시적으로 정의할 수도 있다. 아래는 스칼라와 파이썬으로 스키마를 만든 후에 데이터를 읽을 때 파라미터로 넘겨주어 스키마를 명시하는 코드다.
// in scala
import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}
import org.apache.spark.sql.types.Metadata
val myManualSchema = StructType(Array(
StructField("DEST_COUNTRY_NAME", StringType, true),
StructField("ORIGIN_COUNTRY_NAME", StringType, true),
StructField("count", LongType, false,
Metadata.fromJson("{\"hello\":\"world\"}"))
))
val df = spark.read.format("json").schema(myManualSchema)
.load("/data/flight-data/json/2015-summary.json")
# in Python
from pyspark.sql.types import StructField, StructType, StringType, LongType
myManualSchema = StructType([
StructField("DEST_COUNTRY_NAME", StringType(), True),
StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
StructField("count", LongType(), False, metadata={"hello":"world"})
])
df = spark.read.format("json").schema(myManualSchema)\
.load("/data/flight-data/json/2015-summary.json")
2. 컬럼 (Column) & 익스프레션 (Expression)
데이터프레임의 컬럼을 레퍼런싱하는 가장 간단한 방법은 col(), column() 함수를 사용하는 것이다.
데이터프레임에서 컬럼을 조작하는 오퍼레이션을 익스프레션(Expression)이라고 한다. 즉, 익스프레션은 테이터프레임에 적용되는 트랜스포메이션 세트이다.
컬럼에 트랜스포메이션을 적용하고 싶다면 두 가지 방법이 있다. 첫 번째는 컬럼을 레퍼런싱 한 후 얻은 객채에 트랜스포메이션을 적용하는 것이다. 두 번째는 컬럼 레퍼런싱 없이 expr() 구문을 사용하는 것이다. expr() 구문은 스트링으로부터 컬럼과 트랜스포메이션을 파싱할 수 있다. 두 가지 방법 모두 똑같은 로지컬 플랜으로 치환된다.
// in Scala
import org.apache.spark.sql.functions.expr
// 컬럼 레퍼런싱 사용
((col("someCol") + 5 ) * 200) - 6) < col("otherCol")
// expr() 구문 사용
expr("(((someCol + 5) * 200) - 6) < otherCol")
# in Python
from pyspark.sql.functions import expr
# 컬럼 레퍼런싱 사용
((col("someCol") + 5 ) * 200) - 6) < col("otherCol")
# expr() 구문 사용
expr("(((someCol + 5) * 200) - 6) < otherCol")
3. 로우 (Row)
스파크에서 각 리코드는 Row라는 스파크 타입으로 관리된다. 스파크는 로우 객체를 칼럼 익스프레션을 이용하여 조작한다. 로우 객체는 내부적으로는 바이트 어레이로 저장된다.
// in Scala
import org.apache.spark.sql.Row
val myRow = Row("Hello", null, 1, false)
// 스파크에서는 각 원소의 타입을 명시해주지 않으면 Any 타입이 됨
myRow(0) // type Any
myRow(0).asInstanceOf[String] // String
myRow.getString(0) // String
myRow.getInt(2) // Int
# in python
from pyspark.sql import Row
myRow = Row("Hello", None, 1, False)
myRow[0]
myRow[2]
4. DataFrame Transformations (데이터프레임 트랜스포메이션)
데이터프레임 트랜스포메이션에는 다음 네 가지 종류가 있다. 여기서 유의할 점은 해당 명령어들이 원본 데이터프레임을 변형시키는 게 아니라 새로운 데이터프레임을 생성한다는 것이다.
1) 컬럼이나 로우 제거
2) 컬럼을 로우로 바꾸거나 로우를 컬럼으로 바꾸기
3) 컬럼이나 로우 추가
4) 로우에 대해 소팅
1) 데이터프레임 생성
- 파일로부터 데이터프레임 생성
// in Scala
val df = spark.read.format("json")
.load("/data/flight-data/json/2015-summary.json")
// 데이터프레임에 SQL 구문을 쓸 수 있게 해줌
df.createOrReplaceTempView("dfTable")
# in Python
df = spark.read.format("json").load("/data/flight-data/json/2015-summary.json")
df.createOrReplaceTempView("dfTable")
- 직접 데이터프레임 생성
// in Scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}
val myManualSchema = new StructType(Array(
new StructField("some", StringType, true),
new StructField("col", StringType, true),
new StructField("names", LongType, false)))
val myRows = Seq(Row("Hello", null, 1L))
val myRDD = spark.sparkContext.parallelize(myRows)
val myDf = spark.createDataFrame(myRDD, myManualSchema)
myDf.show()
# in Python
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, LongType
myManualSchema = StructType([
StructField("some", StringType(), True),
StructField("col", StringType(), True),
StructField("names", LongType(), False)
])
myRow = Row("Hello", None, 1)
myDf = spark.createDataFrame([myRow], myManualSchema)
myDf.show()
2) 컬럼 추가: withColumn("컬럼명", 컬럼값)
// in Scala
val new_df = df.withColumn("numberOne", lit(1)).show(2)
# in Python
new_df = df.withColumn("numberOne", lit(1)).show(2)
>> 출력
+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|numberOne|
+-----------------+-------------------+-----+---------+
| United States| Romania| 15| 1|
| United States| Croatia| 1| 1|
+-----------------+-------------------+-----+---------+
3) 컬럼 이름 바꾸기: withColumnRenamed("원래이름", "바꿀이름")
// in Scala
val new_df = df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns
# in Python
new_df = df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns
4) 컬럼 제거: drop("컬럼명")
// in Scala
val new_df = df.drop("ORIGIN_COUNTRY_NAME").columns
# in Python
new_df = df.drop("ORIGIN_COUNTRY_NAME").columns
여러개를 한꺼번에 제거할 수도 있다.
dfWithLongColName.drop("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME")
5) 컬럼 타입 변경
// in Scala
val new_df = df.withColumn("count2", col("count").cast("long"))
# in Python
val new_df = df.withColumn("count2", col("count").cast("long"))
다음편
☞ 2022.09.25 - [Data Engineering/스파크] - 스파크 select, selctExpr, lit, distinct, limit (스파크 정리 5)
'Data Engineering > 스파크' 카테고리의 다른 글
스파크 정규표현식 (스파크 정리 8) (0) | 2022.09.27 |
---|---|
스파크 filter, where, sort, sample, split, union (스파크 정리 6) (0) | 2022.09.27 |
스파크 select, selctExpr, lit, distinct, limit (스파크 정리 5) (0) | 2022.09.25 |
스파크 Structured API - Datasets, DataFrames, SQL tables (스파크 정리 3) (0) | 2022.09.22 |
주피터 노트북에서 스칼라와 스파크 사용하기 (스파크 정리 1) (0) | 2022.09.08 |
댓글