230412 / BSA05. 데이터 전처리 2 : pyspark에서 결측값 처리

BSA05_Pyspark-Missing-Values.ipynb

 

필요한 패키지

from pyspark.sql import SparkSession
from pyspark.ml.feature import Imputer
from pyspark.sql.types import StringType

 

스파크 세션 시작 및 데이터 준비

# 'Missing'이라는 이름의 스파크 세션 시작
스파크 = SparkSession.builder.appName('Missing').getOrCreate()

# 메모리 문제 발생 시 해결하기 위한 코드
스파크.conf.set("spark.sql.execution.arrow.enabled", "true") 

# 데이터 불러오기
## utf-8 형태로 저장되어 있다면 encoding="cp949" 삭제
## ansi 형태로 저장되어 있다면 encoding="cp949" 옵션 설정
스파크DF = 스파크.read.csv("hdfs://localhost:9000/Spark/Employee_missing.csv", header=True, encoding="cp949", inferSchema=True)  
스파크DF.show()

 

결측값 제거

1. 열 제거

# 열 제거
스파크DF.drop('id').show()  # id 컬럼 삭제 (2개 이상의 컬럼을 삭제할 때에는 리스트로 묶어서)

 

2. 행 제거

  • DataFrame.dropna([how,thresh,subset])
  • DataFrame.na.drop()
  • default: how=any
# 행 제거
스파크DF.na.drop().show()
스파크DF.dropna().show()  # NULL이 하나라도 있으면 제거

# 모든 행이 NULL이면 제거
스파크DF.na.drop(how="all").show()
스파크DF.dropna(how="all").show()

# 결측값이 threshold 미만이면 제거
스파크DF.na.drop(thresh=2).show()
스파크DF.dropna(thresh=2).show()  # 결측값이 아닌 개수가 2개 미만이면 제거

# subset : 특정 컬럼에 대해서만 결측값 처리
스파크DF.na.drop(subset=['jobtime']).show()
스파크DF.dropna(subset=['jobtime']).show()

 

결측값 대체

# 결측값 대체
## 문자로 이루어진 컬럼(데이터)(:gender, jobcat, minority)의 결측값만 'NA'로 바뀌고
## 숫자로 이루어진 컬럼(데이터)의 결측값은 바뀌지 않음(null로 그대로)
스파크DF.na.fill('NA').show()
스파크DF.fillna('NA').show()

# 특정 컬럼에 해당하는 결측값만 대체
스파크DF.na.fill('NA', 'jobtime').show()
스파크DF.fillna('NA', 'jobtime').show()

# 여러 개의 컬럼에 해당하는 결측값만 대체
스파크DF.na.fill('NA', ['gender', 'jobtime']).show()
스파크DF.fillna('NA', ['gender', 'jobtime']).show()

 

통계값 이용

# 수치자료 대체 : mean=평균, median=중앙값, mode=최빈값

# 1. 평균 
## educ 변수의 결측값은 educ의 평균값으로 대체
## salary 변수의 결측값은 salary의 평균값으로 대체
대체 = Imputer(
    inputCols = ['educ', 'salary'],  # 대체할 변수 지정
    outputCols = ["{}_imputed".format(c) for c in ['educ','salary']]
).setStrategy("mean")  

# add imputation columns to dataframe
## fit(스파크DF) : 평균을 계산하는 과정
## transform(스파크DF) : 평균을 대입하는 과정
## 평균을 계산하는 데이터프레임과 평균을 대입하는 데이터프레임이 다르다면 괄호안의 값이 다를 수도 있음
대체.fit(스파크DF).transform(스파크DF).show()


# 2. 최빈값 
대체 = Imputer(
    inputCols = ['educ', 'salary'],
    outputCols = ["{}_imputed".format(c) for c in ['educ','salary']]
).setStrategy("mode")

대체.fit(스파크DF).transform(스파크DF).show()
# 수치형 변수만 평균으로 결측값 대체
수치변수 = [변수.name for 변수 in 스파크DF.schema.fields if not isinstance(변수.dataType, StringType)]
수치변수

스파크DF[수치변수].show()

# 여러 개의 컬럼이므로 inputCols, outputCols로 s 붙여서
대체 = Imputer(inputCols=수치변수, outputCols=수치변수).setStrategy("mean")
대체.fit(스파크DF).transform(스파크DF).show()