BSA04_ReadWrite-DataFiles
필요한 패키지
import os
import sys
import pyspark
import pandas as pd
from pyspark.sql import SparkSession
환경 변수 설정
os.environ['JAVA_HOME'] = "C:\Java"
os.environ['SPARK_HOME'] = "C:\spark-3.2.3"
os.environ['PYLIB'] = "C:\spark-3.2.3\python\lib"
sys.path.insert(0,os.environ['PYLIB']+"\py4j-0.10.9.5-src.zip")
sys.path.insert(0,os.environ['PYLIB']+"\pyspark.zip")
스파크 세션 생성
pandasDF = pd.read_csv("Employee.csv",encoding="ansi")
pandasDF
# 'Test'이름의 sparksession 객체 생성
# 이미 생성된 스파크 세션이 있다면 그것을 반환하고, 없으면 새로 만들기
스파크 = SparkSession.builder.appName('Test').getOrCreate()
# 메모리 관리와 관련
## Arrow 라이브러리를 사용하여 데이터프레임을 변환할 때의 성능을 개선하기 위한 설정
스파크.conf.set("spark.sql.execution.arrow.enabled","true")
스파크
웹브라우저에서 localhost:4040 연결
pyspark에서 hdfs 데이터 불러오기
# spark 경로를 찾지 못하는 경우
!pip install findspark
import findspark
import os
findspark.find()
findspark.init(os.environ.get("SPARK_HOME"))
컴퓨터 환경에 따른 실행결과 차이
CMD에서 start-dfs.cmd와 start-yarn.cmd 실행 후 hadoop fs -put Employee.csv spark_Employee.csv 실행했을 때
- user/사용자이름/hadoop_Employee.csv가 생성되는 경우 아래 명령어 오류 발생
- 생성되지 않는 경우 아래 명령어 실행
sparkDF = 스파크.read.csv("Employee.csv")
sparkDF.show()
CMD에서
stat-all.cmd # 하둡 실행
hadoop fs -mkdir /Spark
hadoop fs -put Employee.csv /Spark/hadoop_Employee.csv
데이터프레임 불러오기
sparkDF = 스파크.read.csv("hdfs://localhost:9000/Spark/spark_Employee.csv")
sparkDF.show()
# 하둡에 있는 데이터라면 hdfs://localhost:9000/로 표시함 (hadoop distributed file system)
# 여전히 한글이어서 글자 깨짐 현상이 발생
# header(변수)를 인식하지 못하고 데이터를 그냥 가져옴
# header가 없는 것을 기본으로 함 (csv는 header가 기본적으로 있음)
sparkDF = 스파크.read.option('encoding','cp949').csv("hdfs://localhost:9000/Spark/spark_Employee.csv")
sparkDF
sparkDF.show() # 이 코드를 실행해야 데이터프레임 확인 가능
# .option() : 옵션을 추가하는 경우
sparkDF = 스파크.read.option('encoding','cp949').option('header','true').csv("hdfs://localhost:9000/Spark/spark_Employee.csv")
# show() : 하둡에서 데이터를 보고 싶은 경우
sparkDF.show(5)
type(sparkDF)
sparkDF.head(5)
sparkDF.printSchema()
## smu: user name
data = 스파크.read.csv("hdfs://localhost:9000/Spark/spark_Employee.csv", header=True, encoding="cp949", inferSchema="true")
data.show()
데이터프레임을 다른 형식으로 변환하고 저장/불러오기
- pandas dataframe ↔ spark dataframe
pandasDF_spark = sparkDF.toPandas() # pandas의 dataframe으로 바꿔줌 (spark -> pandas)
pandasDF_spark.head()
### !pip install pyarrow
스파크.conf.set("spark.sql.execution.arrow.enabled","true") # 메모리 관리와 관련
# 에러 발생 시 아래 코드로 실행
# spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled","true")
# 판다스로 만들었던 데이터프레임을 스파크의 데이터프레임으로 만듦
sparkDF_pd = 스파크.createDataFrame(pandasDF)
type(sparkDF_pd)
# 데이터를 제대로 불러옴 (모두 다 string이 아니라, long, string, double, ...)
sparkDF_pd.printSchema()
sparkDF_pd.show()
- csv ↔ parquet
pandasDF.to_csv('pandas_Employee.csv') # csv파일 형태로 저장
pandasDF.to_parquet('pandas_Employee.parquet') # parquet 형식으로 저장
pandasPQ = pd.read_parquet('pandas_Employee.parquet') # parquet 형식으로 저장한 것을 다시 읽어오기
pandasPQ.head()
# 기존의 pandas의 dataframe을 parquet 형식으로 저장하고 불러오는 것까지 에러없이 정상적으로 실행
Spark에서
- sparkDF.write.csv("경로/파일이름")
- sparkDF.format('csv').save("경로/파일이름")
- 기존 파일이 있는 경우
- 덮어쓰기: sparkDF.write.mode('overwrite').csv("경로/파일이름")
- 추가하기: sparkDF.write.mode('append').csv("경로/파일이름")
- 무시하기: sparkDF.write.mode('ignore').csv("경로/파일이름")
- 오류발생: sparkDF.write.mode('error').csv("경로/파일이름") $\Leftarrow$ default
sparkDF.write.csv("hdfs://localhost:9000/Spark/hadoop_Employee.csv") # hadoop_Employee.csv 이름의 csv파일 형태로 저장
sparkDF.write.parquet("hdfs://localhost:9000/Spark/hadoop_Employee.parquet") # hadoop_Employee.csv 이름의 parquet 형태로 저장
# cp949나 header를 따로 지정하지 않아도 그 정보를 기억하고 있기 때문에 옵션 지정하지 않아도 됨
sparkPQ = 스파크.read.parquet("hdfs://localhost:9000/Spark/hadoop_Employee.parquet")
sparkPQ.show(5)
# 같은 이름의 파일이 있으면 충돌이 일어날 수 있으니 주의
#sparkDF_pd.write.csv("/Spark/sparkdf_Employee.csv")
sparkDF_pd = 스파크.read.csv("hdfs://localhost:9000/Spark/hadoop_Employee.csv",header=True)
sparkDF_pd.show(5)
# 저장할 때에는 utf파일로 저장하기 때문에 한글이 깨지지 않음
# 옵션을 특별히 지정하지 않으면 데이터만을 저장하기 때문에 header가 없어짐
# -> header=True로 지정하면 첫번째 행이 header로 됨
스파크 = SparkSession.builder.appName("hdfs").getOrCreate()
CMD에서
hadoop fs -mkdir /Text
hadoop fs -put 제주호텔리뷰.csv /Text/.
data = 스파크.read.csv("hdfs://localhost:9000/Text/제주호텔리뷰.csv", header=True, inferSchema="true")
data.show()
pd_df = data.toPandas() # 판다스 데이터프레임으로
pd_df.head()
# 폴더 생성하며 해당 폴더에 지정한 형태로 파일 저장
# coalesce(1) : 세련되게 만드는 작업
# option("header", "true") : header를 넣는 작업
# save("hdfs://localhost:9000/Text/csv") : csv라는 폴더에 저장하라
data.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("hdfs://localhost:9000/Text/csv")
# parquet이라는 폴더에 저장
data.write.parquet("hdfs://localhost:9000/Text/parquet")
'Statistics > BSA' 카테고리의 다른 글
230405 / BSA05. 데이터 전처리 (0) | 2023.04.10 |
---|---|
230329 / BSA04. 오디오 데이터 처리하기 (0) | 2023.04.02 |
230327 / BSA04. 다양한 형태의 자료 파일 작업 속도 비교 (0) | 2023.04.02 |
230322 / BSA03. 데이터 분석 주요 과정 살펴보기 (0) | 2023.03.26 |
230320 / BSA03. 가상환경에서 주피터랩 설치하기, 주피터노트북에서 자동완성 기능 구현하기 (0) | 2023.03.26 |