230329 / BSA04. 다양한 형식의 데이터 파일 읽고 저장하기

BSA04_ReadWrite-DataFiles

 

필요한 패키지

import os
import sys
import pyspark
import pandas as pd

from pyspark.sql import SparkSession

 

환경 변수 설정

os.environ['JAVA_HOME'] = "C:\Java"
os.environ['SPARK_HOME'] = "C:\spark-3.2.3"
os.environ['PYLIB'] = "C:\spark-3.2.3\python\lib"
sys.path.insert(0,os.environ['PYLIB']+"\py4j-0.10.9.5-src.zip")
sys.path.insert(0,os.environ['PYLIB']+"\pyspark.zip")

 

스파크 세션 생성 

pandasDF = pd.read_csv("Employee.csv",encoding="ansi")
pandasDF

# 'Test'이름의 sparksession 객체 생성
# 이미 생성된 스파크 세션이 있다면 그것을 반환하고, 없으면 새로 만들기 
스파크 = SparkSession.builder.appName('Test').getOrCreate()

# 메모리 관리와 관련 
## Arrow 라이브러리를 사용하여 데이터프레임을 변환할 때의 성능을 개선하기 위한 설정
스파크.conf.set("spark.sql.execution.arrow.enabled","true")
스파크

 

웹브라우저에서 localhost:4040 연결

 

pyspark에서 hdfs 데이터 불러오기

# spark 경로를 찾지 못하는 경우
!pip install findspark
import findspark
import os
findspark.find()
findspark.init(os.environ.get("SPARK_HOME"))

 

컴퓨터 환경에 따른 실행결과 차이

CMD에서 start-dfs.cmd와 start-yarn.cmd 실행 후 hadoop fs -put Employee.csv spark_Employee.csv 실행했을 때

  • user/사용자이름/hadoop_Employee.csv가 생성되는 경우 아래 명령어 오류 발생
  • 생성되지 않는 경우 아래 명령어 실행
sparkDF = 스파크.read.csv("Employee.csv")
sparkDF.show()

 

CMD에서

stat-all.cmd  # 하둡 실행
hadoop fs -mkdir /Spark
hadoop fs -put Employee.csv /Spark/hadoop_Employee.csv

 

데이터프레임 불러오기

sparkDF = 스파크.read.csv("hdfs://localhost:9000/Spark/spark_Employee.csv")
sparkDF.show()
# 하둡에 있는 데이터라면 hdfs://localhost:9000/로 표시함 (hadoop distributed file system)
# 여전히 한글이어서 글자 깨짐 현상이 발생
# header(변수)를 인식하지 못하고 데이터를 그냥 가져옴


# header가 없는 것을 기본으로 함 (csv는 header가 기본적으로 있음)
sparkDF = 스파크.read.option('encoding','cp949').csv("hdfs://localhost:9000/Spark/spark_Employee.csv")
sparkDF
sparkDF.show()  # 이 코드를 실행해야 데이터프레임 확인 가능


# .option() : 옵션을 추가하는 경우
sparkDF = 스파크.read.option('encoding','cp949').option('header','true').csv("hdfs://localhost:9000/Spark/spark_Employee.csv")


# show() : 하둡에서 데이터를 보고 싶은 경우
sparkDF.show(5)


type(sparkDF)
sparkDF.head(5)
sparkDF.printSchema()


## smu: user name
data = 스파크.read.csv("hdfs://localhost:9000/Spark/spark_Employee.csv", header=True, encoding="cp949", inferSchema="true")
data.show()

 

데이터프레임을 다른 형식으로 변환하고 저장/불러오기

  • pandas dataframe ↔ spark dataframe
pandasDF_spark = sparkDF.toPandas()  # pandas의 dataframe으로 바꿔줌 (spark -> pandas)
pandasDF_spark.head()

### !pip install pyarrow
스파크.conf.set("spark.sql.execution.arrow.enabled","true")  # 메모리 관리와 관련

# 에러 발생 시 아래 코드로 실행
# spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled","true")

# 판다스로 만들었던 데이터프레임을 스파크의 데이터프레임으로 만듦
sparkDF_pd  = 스파크.createDataFrame(pandasDF)

type(sparkDF_pd)

# 데이터를 제대로 불러옴 (모두 다 string이 아니라, long, string, double, ...)
sparkDF_pd.printSchema()

sparkDF_pd.show()

 

  • csv ↔ parquet
pandasDF.to_csv('pandas_Employee.csv')  # csv파일 형태로 저장
pandasDF.to_parquet('pandas_Employee.parquet')  # parquet 형식으로 저장
pandasPQ  = pd.read_parquet('pandas_Employee.parquet') # parquet 형식으로 저장한 것을 다시 읽어오기
pandasPQ.head()
# 기존의 pandas의 dataframe을 parquet 형식으로 저장하고 불러오는 것까지 에러없이 정상적으로 실행

 

Spark에서

  • sparkDF.write.csv("경로/파일이름")
  • sparkDF.format('csv').save("경로/파일이름")
  • 기존 파일이 있는 경우
    • 덮어쓰기: sparkDF.write.mode('overwrite').csv("경로/파일이름")
    • 추가하기: sparkDF.write.mode('append').csv("경로/파일이름")
    • 무시하기: sparkDF.write.mode('ignore').csv("경로/파일이름")
    • 오류발생: sparkDF.write.mode('error').csv("경로/파일이름") $\Leftarrow$ default

 

sparkDF.write.csv("hdfs://localhost:9000/Spark/hadoop_Employee.csv")  # hadoop_Employee.csv 이름의 csv파일 형태로 저장
sparkDF.write.parquet("hdfs://localhost:9000/Spark/hadoop_Employee.parquet")  # hadoop_Employee.csv 이름의 parquet 형태로 저장

# cp949나 header를 따로 지정하지 않아도 그 정보를 기억하고 있기 때문에 옵션 지정하지 않아도 됨
sparkPQ = 스파크.read.parquet("hdfs://localhost:9000/Spark/hadoop_Employee.parquet")
sparkPQ.show(5)
# 같은 이름의 파일이 있으면 충돌이 일어날 수 있으니 주의


#sparkDF_pd.write.csv("/Spark/sparkdf_Employee.csv")
sparkDF_pd = 스파크.read.csv("hdfs://localhost:9000/Spark/hadoop_Employee.csv",header=True)
sparkDF_pd.show(5)

# 저장할 때에는 utf파일로 저장하기 때문에 한글이 깨지지 않음
# 옵션을 특별히 지정하지 않으면 데이터만을 저장하기 때문에 header가 없어짐 
# -> header=True로 지정하면 첫번째 행이 header로 됨


스파크 = SparkSession.builder.appName("hdfs").getOrCreate()

 

CMD에서

hadoop fs -mkdir /Text
hadoop fs -put 제주호텔리뷰.csv /Text/.

 

data = 스파크.read.csv("hdfs://localhost:9000/Text/제주호텔리뷰.csv", header=True, inferSchema="true")
data.show()

pd_df = data.toPandas()  # 판다스 데이터프레임으로 
pd_df.head()


# 폴더 생성하며 해당 폴더에 지정한 형태로 파일 저장
# coalesce(1) : 세련되게 만드는 작업
# option("header", "true") : header를 넣는 작업
# save("hdfs://localhost:9000/Text/csv") : csv라는 폴더에 저장하라
data.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("hdfs://localhost:9000/Text/csv")


# parquet이라는 폴더에 저장
data.write.parquet("hdfs://localhost:9000/Text/parquet")