230501 / BSA09. pyspark에서 탐색적 데이터 분석 (EDA)

BSA06_Pyspark-EDA.ipynb

 

패키지 호출 및 스파크 세션 시작하기

from pyspark.sql import SparkSession
스파크 = SparkSession.builder.appName('Dataframe').getOrCreate()
스파크.conf.set("spark.sql.execution.arrow.pyspark.enabled","true")

 

데이터 불러오기

# 그냥 읽어들이면 데이터를 다 문자로 인식함
DF스파크 = 스파크.read.option('encoding','cp949').option('header','true').csv("Employee.csv")
DF스파크.show()

# Check the Schema
DF스파크.printSchema()


# inferSchema=True : 데이터의 정보를 사용하겠다는 의미
DF스파크 = 스파크.read.option('encoding','cp949').option('header','true').csv("Employee.csv",inferSchema=True)
DF스파크.printSchema()

DF스파크 = 스파크.read.csv("Employee.csv",header=True,encoding='cp949',inferSchema=True)
DF스파크.printSchema()

 

데이터프레임 정보 확인하기

# 리스트 형태로 저장됨
DF스파크.columns

DF스파크.head(5)

DF스파크.show()

# show() : 기본적으로 20개의 행을 보여줌 (default=20)
# show()가 실행된다는 것은 show()앞부분이 pyspark의 dataframe이라는 의미 
# (즉, DF스파크.select('id')가 pyspark의 dataframe임)
DF스파크.select('id').show()
DF스파크.select(['id','gender']).show()

# DF스파크['gender']는 데이터프레임이 아님
# DF스파크['gender'].show()
# pyspark의 dataframe의 데이터 타입 확인
DF스파크.dtypes

# DF스파크.describe()를 데이터프레임으로 만들었다는 의미
# count, mean, min(수치형이 아닌 컬럼은 빈도수가 작은 것을 반환), max(수치형이 아닌 컬럼은 빈도수가 큰 것을 반환) 계산
DF스파크.describe().show()

 

새로운 컬럼 생성 및 컬럼 이름 변경하기

# Adding columns in dataframe
# DF스파크['jobtime']*2 결과를 DF스파크['jobtime2']에 저장
DF스파크 = DF스파크.withColumn('jobtime2',DF스파크['jobtime']*2)
DF스파크.show()

# Rename the columns
DF스파크 = DF스파크.withColumnRenamed('jobtime2','jobtime3')
DF스파크.show()

 

컬럼 제거하기

### Drop the columns
DF스파크 = DF스파크.drop('jobtime3')
DF스파크.show()

 

조건으로 데이터 추출하기

# Salary of the people less than or equal to 30000
DF스파크.filter("salary <= 30000").show()
DF스파크.filter(DF스파크['salary'] <= 30000).show()

# filter와 where는 유사한 형태임
DF스파크.where("salary <= 30000").show()

# 해당 조건을 만족하는 변수(gender, jobcat) 선택
DF스파크.filter("salary <= 50000").select(['gender','jobcat']).show()

DF스파크.filter((DF스파크['salary'] <= 30000) & (DF스파크['salary'] >= 25000)).show()

DF스파크.filter(~(DF스파크['salary'] <= 30000)).show()

 

groupby, aggregate 함수 적용하기

# 성별에 따른 count 비교
DF스파크.groupBy('gender').count().show()

# 성별에 따른 id, educ, salary, salbegin, jobtime, prevexp의 평균 비교
DF스파크.groupBy('gender').mean().show()

# 성별에 따른 salary의 평균 비교
DF스파크.groupBy('gender').mean('salary').show()

# 성별, 직업에 따른 최대값 비교
DF스파크.groupBy(['gender','jobcat']).max().show()

# salary의 평균, salbegin의 최솟값
DF스파크.agg({'salary':'mean', 'salbegin':'min'}).show()

 

데이터프레임 정렬하기

# 특정 컬럼 값을 기준으로 데이터 정렬
# salary를 기준으로 내림차순 정렬
DF스파크.orderBy("salary",ascending=False).show()

# educ을 기준으로 내림차순 정렬 후에 salary를 기준으로 내림차순 정렬
DF스파크.orderBy("educ","salary",ascending=[False,False]).show()

 

데이터프레임 복사하기

# 스파크 DF 복사
# 모든 변수를 선택해서 집어넣음 (복사하는 것과 같은 작업)
DF복사 = DF스파크.select("*")

 

표준화

# select(avg(col("salary"))) : salary 컬럼을 가져와서 평균(average)을 계산
평균 = DF복사.select(avg(col("salary")))
평균.show()  # 데이터프레임 형태

# 위의 데이터프레임에서 평균값만 가져오는 작업
평균 = DF복사.select(avg(col("salary"))).first()[0]

# salary 컬럼의 표준편차 계산
표준편차 = DF복사.select(stddev(col("salary"))).first()[0]

# (col("salary")-평균)/표준편차) : salary의 값을 가져와서 평균을 빼고 표준편차를 나눈 값을 'salary_STD'에 저장
DF복사.withColumn("salary_STD",(col("salary")-평균)/표준편차).show()