230412 / BSA05. 데이터 전처리 2 : python, pyspark에서 데이터 결합

BSA05_Combine-DataFrame.ipynb

 

필요한 패키지

import pandas as pd

import pyspark
from pyspark.sql import SparkSession

 

python에서 데이터 결합

  • side-by-side (axis=1) : merge or join
    • join은 제거될 예정
  • stacking (axis=0) : concat or append
    • append는 제거될 예정

 

1. 위아래로 결합 (axis=0 : stacking)

# 데이터 불러오기
df1 = pd.read_csv("./combinedata/data1.csv")  # 현재 폴더(.) 안의 폴더(combinedata) 안의 데이터
df2 = pd.read_csv("./combinedata/data2.csv")
df3 = pd.read_csv("./combinedata/data3.csv")

# 위아래로 붙이기 (stacking)
df = pd.concat([df1, df2])
display(df)  # 인덱스를 그대로 가져옴 (같은 인덱스가 있으면 참조하기 어려움)

# 위아래로 붙이기, index는 새로 편집
df = pd.concat([df1, df2], ignore_index=True)  

# 위아래로 붙이기, DF에 key값 추가
df = pd.concat([df1, df2], keys=["데이터1", "데이터2"])

# 위아래로 붙이기, 중복 삭제
df4 = pd.concat([df1, df2], keys=(["데이터1", "데이터2"])).drop_duplicates()
# df4.reset_index(inplace=True)  # 기존 index는 column으로 변경
df4.reset_index(drop=True, inplace=True)  # 기존 index 삭제


# 다른 변수가 있는 경우 : 옵션이 없으면 모두 출력
df = pd.concat([df1, df3])

# 모든 DF에 있는 변수만 출력
df = pd.concat([df1, df3], join='inner')  # inner: 둘 다 있는 것 / outer: 둘 중 하나라도 있는 것

 

2. 옆으로 결합 (axis=1 : side-by-side)

# 동일변수를 확인 후 같은 자료만 좌우로 결합
## inner (default값) : 공통된 컬럼(key='학번')이 공통되는 부분만 결합
## outer : 공통되는 부분 외의 모든 데이터를 결합
## left/right 옵션 지정 가능
df = pd.merge(df1, df2)
df = pd.merge(df4, df3)
display(df)  # 공통되는 데이터(중복데이터)


# 특정 변수(column)값을 기준으로 좌우결합
# 각 DF가 동일변수명을 가지는 경우 변수명에 suffixes가 붙음
# df = pd.merge(df1, df2, on=["학번"], how='inner')  # intersection(default)
# 학번을 기준으로 결합 (학번 외의 공통된 다른 변수가 있다면 첨자를 붙여서 구분함)

# df = pd.merge(df1, df2, on=["학번"], suffixes=["_l","_r"], how='outer')  # union
# df = pd.merge(df1, df2, on=["학번"], suffixes=["_l","_r"], how='left')  # union
# df = pd.merge(df1, df2, on=["학번"], suffixes=["_l","_r"], how='right')  # union
# df = pd.merge(df1, df2, on=["학번"], suffixes=["_l","_r"], how='outer', indicator=True)  # 좌, 우, 좌우에 포함여부
# df = pd.merge(df1, df2, on=["학번"], suffixes=["_l","_r"], how='outer', indicator="indicator_col") 
df = pd.merge(df1, df2, suffixes=["_l","_r"], left_index=True, right_index=True)  # union
display(df)


# 다른 변수명의 값을 기준으로 좌우 결합
df5 = df2.copy()
df5.rename(columns={"학번":"ID"}, inplace=True)
df = pd.merge(df1, df5, left_on="학번", right_on="ID", how="right")
display(df)


# 비교 변수명에 동일 값이 여러 개 있는 경우
df = pd.merge(df1, df2, on=["이름"], how='inner')
display(df)

 

pyspark에서 데이터 결합

스파크 세션 시작 및 데이터 준비

스파크 = SparkSession.builder.appName('mergeDF').getOrCreate()
스파크.conf.set("spark.sql.execution.arrow.enabled", "true")

스파크DF1 = 스파크.read.option('header','true').csv("hdfs://localhost:9000/Spark/combinedata/data1.csv")
스파크DF2 = 스파크.read.option('header','true').csv("hdfs://localhost:9000/Spark/combinedata/data2.csv")
스파크DF3 = 스파크.read.option('header','true').csv("hdfs://localhost:9000/Spark/combinedata/data3.csv")

 

데이터 결합

스파크DF1.show()
# 확인 후 한글을 읽지 못하는 경우 위 프로그램에서 "utf-8"을 "cp949"로 변경


# 같은 columns
# 같은 것이 있어도 그대로 추가
DF = 스파크DF1.union(스파크DF2)
DF.show(30)


DF = 스파크DF1.unionAll(스파크DF2)
DF.show(30)


# 중복 삭제
DF = 스파크DF1.union(스파크DF2).distinct()
DF.show(30)


# PySpark
DF = 스파크DF2.unionByName(스파크DF3, allowMissingColumns=True)
DF.show(30)


# side-by-side (axis=1)
# join
스파크DF1.join(스파크DF2).show()


DF = 스파크DF1.join(스파크DF3, 스파크DF1, 학번==스파크DF3.학번)
# DF = 스파크DF1.join(스파크DF3, 스파크DF1, 학번==스파크DF3.학번, "inner")
# DF = 스파크DF1.join(스파크DF3, 스파크DF1, 학번==스파크DF3.학번, "outer")
# DF = 스파크DF1.join(스파크DF3, 스파크DF1, 학번==스파크DF3.학번, "left")
# DF = 스파크DF1.join(스파크DF3, 스파크DF1, 학번==스파크DF3.학번, "right")
DF.show()