본문 바로가기
Data Engineering/스파크

스파크 기본 명령어, 스파크 데이터프레임 명령어 (스파크 정리 4)

by 무언가 하는 소소 2022. 9. 24.
728x90
반응형

 

 

전편

2022.09.22 - [Data Engineering/스파크] - 스파크 Structured API - Datasets, DataFrames, SQL tables (스파크 정리 3)

 

 

이번 포스팅에서는 DataFrames를 조작할 수 있는 여러 기본 명령어들을 알아본다. 언어는 스칼라와 파이썬 두 개를 사용하였다. 실습을 위해 전 포스팅(2022.09.16 - [Data Engineering/스파크] - 스파크란? 스파크 기본 개념 (스파크 정리 2))을 참고해 데이터를 다운받은 후 데이터 하나를 읽어오자. 

// in scalar
val df = spark.read.format("json")
  .load("/data/flight-data/json/2015-summary.json")

 

# in python
df = spark.read.format("json").load("/data/flight-data/json/2015-summary.json")

 

1. 스키마

스키마는 DataFrames에 있는 컬럼의 타입과 이름을 정의한다. 각 컬럼은 StructField(컬럼 이름, 컬럼 타입, nullable) 타입이고 스키마는 다수의 StructField로 이루어진 StructTyoe이다

// schema 출력
df.printSchema()
=======================================================================================
>> 출력
root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)

 

스키마는 데이터를 읽으면서 스파크가 추론하게 할 수도 있지만 명시적으로 정의할 수도 있다. 아래는 스칼라와 파이썬으로 스키마를 만든 후에 데이터를 읽을 때 파라미터로 넘겨주어 스키마를 명시하는 코드다.

// in scala
import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}
import org.apache.spark.sql.types.Metadata

val myManualSchema = StructType(Array(
  StructField("DEST_COUNTRY_NAME", StringType, true),
  StructField("ORIGIN_COUNTRY_NAME", StringType, true),
  StructField("count", LongType, false,
    Metadata.fromJson("{\"hello\":\"world\"}"))
))

val df = spark.read.format("json").schema(myManualSchema)
  .load("/data/flight-data/json/2015-summary.json")

 

# in Python
from pyspark.sql.types import StructField, StructType, StringType, LongType

myManualSchema = StructType([
  StructField("DEST_COUNTRY_NAME", StringType(), True),
  StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
  StructField("count", LongType(), False, metadata={"hello":"world"})
])
df = spark.read.format("json").schema(myManualSchema)\
  .load("/data/flight-data/json/2015-summary.json")

 

 

2. 컬럼 (Column) & 익스프레션 (Expression)

데이터프레임의 컬럼을 레퍼런싱하는 가장 간단한 방법은 col(), column() 함수를 사용하는 것이다.

데이터프레임에서 컬럼을 조작하는 오퍼레이션을 익스프레션(Expression)이라고 한다. 즉, 익스프레션은 테이터프레임에 적용되는 트랜스포메이션 세트이다.

컬럼에 트랜스포메이션을 적용하고 싶다면 두 가지 방법이 있다. 첫 번째는 컬럼을 레퍼런싱 한 후 얻은 객채에 트랜스포메이션을 적용하는 것이다. 두 번째는 컬럼 레퍼런싱 없이 expr() 구문을 사용하는 것이다. expr() 구문은 스트링으로부터 컬럼과 트랜스포메이션을 파싱할 수 있다. 두 가지 방법 모두 똑같은 로지컬 플랜으로 치환된다. 

// in Scala
import org.apache.spark.sql.functions.expr

// 컬럼 레퍼런싱 사용
((col("someCol") + 5 ) * 200) - 6) < col("otherCol") 
// expr() 구문 사용
expr("(((someCol + 5) * 200) - 6) < otherCol")

 

# in Python
from pyspark.sql.functions import expr

# 컬럼 레퍼런싱 사용
((col("someCol") + 5 ) * 200) - 6) < col("otherCol") 
# expr() 구문 사용
expr("(((someCol + 5) * 200) - 6) < otherCol")

 

 

3. 로우 (Row)

스파크에서 각 리코드는 Row라는 스파크 타입으로 관리된다. 스파크는 로우 객체를 칼럼 익스프레션을 이용하여 조작한다. 로우 객체는 내부적으로는 바이트 어레이로 저장된다. 

// in Scala
import org.apache.spark.sql.Row
val myRow = Row("Hello", null, 1, false)

// 스파크에서는 각 원소의 타입을 명시해주지 않으면 Any 타입이 됨
myRow(0) // type Any
myRow(0).asInstanceOf[String] // String
myRow.getString(0) // String
myRow.getInt(2) // Int

 

# in python
from pyspark.sql import Row
myRow = Row("Hello", None, 1, False)

myRow[0]
myRow[2]

 

 

4. DataFrame Transformations (데이터프레임 트랜스포메이션)

데이터프레임 트랜스포메이션에는 다음 네 가지 종류가 있다. 여기서 유의할 점은 해당 명령어들이 원본 데이터프레임을 변형시키는 게 아니라 새로운 데이터프레임을 생성한다는 것이다. 

1) 컬럼이나 로우 제거

2) 컬럼을 로우로 바꾸거나 로우를 컬럼으로 바꾸기

3) 컬럼이나 로우 추가

4) 로우에 대해 소팅

 

1) 데이터프레임 생성 

- 파일로부터 데이터프레임 생성

// in Scala
val df = spark.read.format("json")
  .load("/data/flight-data/json/2015-summary.json")
// 데이터프레임에 SQL 구문을 쓸 수 있게 해줌
df.createOrReplaceTempView("dfTable")

 

# in Python
df = spark.read.format("json").load("/data/flight-data/json/2015-summary.json")
df.createOrReplaceTempView("dfTable")

 

- 직접 데이터프레임 생성

// in Scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}

val myManualSchema = new StructType(Array(
  new StructField("some", StringType, true),
  new StructField("col", StringType, true),
  new StructField("names", LongType, false)))
val myRows = Seq(Row("Hello", null, 1L))
val myRDD = spark.sparkContext.parallelize(myRows)
val myDf = spark.createDataFrame(myRDD, myManualSchema)
myDf.show()

 

# in Python
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, LongType
myManualSchema = StructType([
  StructField("some", StringType(), True),
  StructField("col", StringType(), True),
  StructField("names", LongType(), False)
])
myRow = Row("Hello", None, 1)
myDf = spark.createDataFrame([myRow], myManualSchema)
myDf.show()

 

 

2) 컬럼 추가: withColumn("컬럼명", 컬럼값)

// in Scala
val new_df = df.withColumn("numberOne", lit(1)).show(2)

 

# in Python
new_df = df.withColumn("numberOne", lit(1)).show(2)

>> 출력

+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|numberOne|
+-----------------+-------------------+-----+---------+
|    United States|            Romania|   15|        1|
|    United States|            Croatia|    1|        1|
+-----------------+-------------------+-----+---------+

 

 

3) 컬럼 이름 바꾸기: withColumnRenamed("원래이름", "바꿀이름")

// in Scala
val new_df = df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns

 

# in Python
new_df = df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns

 

 

4) 컬럼 제거: drop("컬럼명")

// in Scala
val new_df = df.drop("ORIGIN_COUNTRY_NAME").columns

 

# in Python
new_df = df.drop("ORIGIN_COUNTRY_NAME").columns

 

여러개를 한꺼번에 제거할 수도 있다. 

dfWithLongColName.drop("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME")

 

 

5) 컬럼 타입 변경

// in Scala
val new_df = df.withColumn("count2", col("count").cast("long"))

 

# in Python
val new_df = df.withColumn("count2", col("count").cast("long"))

 

 

다음편

☞ 2022.09.25 - [Data Engineering/스파크] - 스파크 select, selctExpr, lit, distinct, limit (스파크 정리 5)

 

스파크 select, selctExpr, lit, distinct, limit (스파크 정리 5)

전편 ☞ 2022.09.24 - [Data Engineering/스파크] - 스파크 기본 명령어, 스파크 데이터프레임 명령어 (스파크 정리 4) 아마도 스파크에서 가장 많이 쓰일 명령어 select. 타겟 데이터프레임에서 보고 싶은

dogfootja.tistory.com

728x90
반응형

댓글