PySpark의 테이블 데이터 프레임을 csv로 내보내는 방법은 무엇입니까?
Spark 1.3.1(PySpark)을 사용하고 있으며 SQL 쿼리를 사용하여 테이블을 생성했습니다.나는 지금 어떤 물건을 가지고 있습니다.DataFrame
이것을 . 을 하고 하고 을 .DataFrame
개체("table"이라고 함)를 csv 파일로 이동하여 조작하고 열을 플롯할 수 있습니다.어떻게 내보낼 수 있습니까?DataFrame
에 "에v "table
감사합니다!
드라이버 메모리에 데이터 프레임을 저장할 때 로컬 파일 시스템에 저장하려면 방법을 사용하여 Spark DataFrame을 로컬 Pandas DataFrame으로 변환한 후 간단히 사용할 수 있습니다.to_csv
:
df.toPandas().to_csv('mycsv.csv')
그렇지 않으면 spark-csv를 사용할 수 있습니다.
스파크 1.3
df.save('mycsv.csv', 'com.databricks.spark.csv')
스파크 1.4+
df.write.format('com.databricks.spark.csv').save('mycsv.csv')
2에서는 Spark 2.0+ 하실 에서 을 사용할 수 .csv
데이터 소스 직접:
df.write.csv('mycsv.csv')
Apache Spark 2+의 경우 데이터 프레임을 단일 csv 파일로 저장합니다.다음 명령 사용
query.repartition(1).write.csv("cc_out.csv", sep='|')
여기서1
csv의 하다고 할 수 .요구사항에 따라 변경 가능합니다.
spark-csv를 사용할 수 없는 경우 다음을 수행할 수 있습니다.
df.rdd.map(lambda x: ",".join(map(str, x))).coalesce(1).saveAsTextFile("file.csv")
줄 바꿈 또는 쉼표가 있는 문자열을 처리해야 하는 경우 작동하지 않습니다.사용 방법:
import csv
import cStringIO
def row2csv(row):
buffer = cStringIO.StringIO()
writer = csv.writer(buffer)
writer.writerow([str(s).encode("utf-8") for s in row])
buffer.seek(0)
return buffer.read().strip()
df.rdd.map(row2csv).coalesce(1).saveAsTextFile("file.csv")
Dataframe을 단일 파티션으로 다시 분할한 다음 유닉스 파일 시스템 형식으로 파일의 포맷, 경로 및 기타 파라미터를 정의해야 합니다.
df.repartition(1).write.format('com.databricks.spark.csv').save("/path/to/file/myfile.csv",header = 'true')
repartition 기능에 대해 자세히 알아보기저장 기능에 대해 자세히 알아보기
그러나 재분할은 비용이 많이 드는 기능이며 Pandas()에게는 최악입니다.성능을 향상시키려면 이전 구문에서 .repartition(1) 대신 .coalesce(1)를 사용해 보십시오.
PySpark 사용하기
Spark 3.0+에서 csv로 작성하는 가장 쉬운 방법
sdf.write.csv("/path/to/csv/data.csv")
사용 중인 스파크 노드 수에 따라 여러 개의 파일을 생성할 수 있습니다.단일 파일로 가져오려면 repartition을 사용합니다.
sdf.repartition(1).write.csv("/path/to/csv/data.csv")
판다 사용하기
만약 당신의 데이터가 너무 많지 않고 지역의 비단뱀에 보관될 수 있다면, 당신은 팬더를 사용할 수 있습니다.
sdf.toPandas().to_csv("/path/to/csv/data.csv", index=False)
코알라 사용하기
sdf.to_koalas().to_csv("/path/to/csv/data.csv", index=False)
이것(원라이너를 원하지 않으실 경우)은 어떻습니까?
for row in df.collect():
d = row.asDict()
s = "%d\t%s\t%s\n" % (d["int_column"], d["string_column"], d["string_column"])
f.write(s)
f 는 열린 파일 설명자 입니다.또한 분리판은 TAB char이지만 원하는 대로 쉽게 변경할 수 있습니다.
'''
I am late to the pary but: this will let me rename the file, move it to a desired directory and delete the unwanted additional directory spark made
'''
import shutil
import os
import glob
path = 'test_write'
#write single csv
students.repartition(1).write.csv(path)
#rename and relocate the csv
shutil.move(glob.glob(os.getcwd() + '\\' + path + '\\' + r'*.csv')[0], os.getcwd()+ '\\' + path+ '.csv')
#remove additional directory
shutil.rmtree(os.getcwd()+'\\'+path)
저는 팬더들과 함께 그 방법을 사용했고 이것은 저에게 끔찍한 성과를 주었습니다.결국 시간이 너무 오래 걸려서 다른 방법을 찾기 위해 멈췄습니다.
여러 csv 대신 하나의 csv에 쓰는 방법을 찾고 있다면 다음과 같습니다.
df.coalesce(1).write.csv("train_dataset_processed", header=True)
데이터셋 처리 시간이 2시간 이상에서 2분으로 단축되었습니다.
display(df)를 시도하고 결과에서 download 옵션을 사용합니다.이 옵션을 사용하면 100만 행만 다운로드할 수 있지만 매우 빠릅니다.
언급URL : https://stackoverflow.com/questions/31385363/how-to-export-a-table-dataframe-in-pyspark-to-csv
'programing' 카테고리의 다른 글
jQuery getJ올바른 JSON에 대한 SON 구문 오류 (0) | 2023.09.10 |
---|---|
필터를 고려한 열의 중위수는 어떻게 구합니까? (0) | 2023.09.10 |
다른 열에 그룹화된 평균값을 계산하는 방법 (0) | 2023.09.10 |
요소가 뷰포트에 보이는지 쿼리 확인 (0) | 2023.09.10 |
nodejs를 사용하여 기본 브라우저를 열고 특정 URL로 이동하는 방법 (0) | 2023.09.10 |