BSA05_Pyspark-Missing-Values.ipynb
필요한 패키지
from pyspark.sql import SparkSession
from pyspark.ml.feature import Imputer
from pyspark.sql.types import StringType
스파크 세션 시작 및 데이터 준비
# 'Missing'이라는 이름의 스파크 세션 시작
스파크 = SparkSession.builder.appName('Missing').getOrCreate()
# 메모리 문제 발생 시 해결하기 위한 코드
스파크.conf.set("spark.sql.execution.arrow.enabled", "true")
# 데이터 불러오기
## utf-8 형태로 저장되어 있다면 encoding="cp949" 삭제
## ansi 형태로 저장되어 있다면 encoding="cp949" 옵션 설정
스파크DF = 스파크.read.csv("hdfs://localhost:9000/Spark/Employee_missing.csv", header=True, encoding="cp949", inferSchema=True)
스파크DF.show()
결측값 제거
1. 열 제거
# 열 제거
스파크DF.drop('id').show() # id 컬럼 삭제 (2개 이상의 컬럼을 삭제할 때에는 리스트로 묶어서)
2. 행 제거
- DataFrame.dropna([how,thresh,subset])
- DataFrame.na.drop()
- default: how=any
# 행 제거
스파크DF.na.drop().show()
스파크DF.dropna().show() # NULL이 하나라도 있으면 제거
# 모든 행이 NULL이면 제거
스파크DF.na.drop(how="all").show()
스파크DF.dropna(how="all").show()
# 결측값이 threshold 미만이면 제거
스파크DF.na.drop(thresh=2).show()
스파크DF.dropna(thresh=2).show() # 결측값이 아닌 개수가 2개 미만이면 제거
# subset : 특정 컬럼에 대해서만 결측값 처리
스파크DF.na.drop(subset=['jobtime']).show()
스파크DF.dropna(subset=['jobtime']).show()
결측값 대체
# 결측값 대체
## 문자로 이루어진 컬럼(데이터)(:gender, jobcat, minority)의 결측값만 'NA'로 바뀌고
## 숫자로 이루어진 컬럼(데이터)의 결측값은 바뀌지 않음(null로 그대로)
스파크DF.na.fill('NA').show()
스파크DF.fillna('NA').show()
# 특정 컬럼에 해당하는 결측값만 대체
스파크DF.na.fill('NA', 'jobtime').show()
스파크DF.fillna('NA', 'jobtime').show()
# 여러 개의 컬럼에 해당하는 결측값만 대체
스파크DF.na.fill('NA', ['gender', 'jobtime']).show()
스파크DF.fillna('NA', ['gender', 'jobtime']).show()
통계값 이용
# 수치자료 대체 : mean=평균, median=중앙값, mode=최빈값
# 1. 평균
## educ 변수의 결측값은 educ의 평균값으로 대체
## salary 변수의 결측값은 salary의 평균값으로 대체
대체 = Imputer(
inputCols = ['educ', 'salary'], # 대체할 변수 지정
outputCols = ["{}_imputed".format(c) for c in ['educ','salary']]
).setStrategy("mean")
# add imputation columns to dataframe
## fit(스파크DF) : 평균을 계산하는 과정
## transform(스파크DF) : 평균을 대입하는 과정
## 평균을 계산하는 데이터프레임과 평균을 대입하는 데이터프레임이 다르다면 괄호안의 값이 다를 수도 있음
대체.fit(스파크DF).transform(스파크DF).show()
# 2. 최빈값
대체 = Imputer(
inputCols = ['educ', 'salary'],
outputCols = ["{}_imputed".format(c) for c in ['educ','salary']]
).setStrategy("mode")
대체.fit(스파크DF).transform(스파크DF).show()
# 수치형 변수만 평균으로 결측값 대체
수치변수 = [변수.name for 변수 in 스파크DF.schema.fields if not isinstance(변수.dataType, StringType)]
수치변수
스파크DF[수치변수].show()
# 여러 개의 컬럼이므로 inputCols, outputCols로 s 붙여서
대체 = Imputer(inputCols=수치변수, outputCols=수치변수).setStrategy("mean")
대체.fit(스파크DF).transform(스파크DF).show()
'Statistics > BSA' 카테고리의 다른 글
230417 / BSA05. 데이터 전처리 3 : 데이터 불균형, resampling (0) | 2023.04.23 |
---|---|
230412 / BSA05. 데이터 전처리 2 : python, pyspark에서 데이터 결합 (0) | 2023.04.23 |
230410 / BSA05. 데이터 전처리 2 : python에서 결측값 처리 (0) | 2023.04.14 |
230405 / BSA05. 데이터 전처리 (0) | 2023.04.10 |
230329 / BSA04. 오디오 데이터 처리하기 (0) | 2023.04.02 |