programing

PySpark의 테이블 데이터 프레임을 csv로 내보내는 방법은 무엇입니까?

topblog 2023. 9. 10. 11:52
반응형

PySpark의 테이블 데이터 프레임을 csv로 내보내는 방법은 무엇입니까?

Spark 1.3.1(PySpark)을 사용하고 있으며 SQL 쿼리를 사용하여 테이블을 생성했습니다.나는 지금 어떤 물건을 가지고 있습니다.DataFrame이것을 . 을 하고 하고 을 .DataFrame개체("table"이라고 함)를 csv 파일로 이동하여 조작하고 열을 플롯할 수 있습니다.어떻게 내보낼 수 있습니까?DataFrame에 "에v "table

감사합니다!

드라이버 메모리에 데이터 프레임을 저장할 때 로컬 파일 시스템에 저장하려면 방법을 사용하여 Spark DataFrame을 로컬 Pandas DataFrame으로 변환한 후 간단히 사용할 수 있습니다.to_csv:

df.toPandas().to_csv('mycsv.csv')

그렇지 않으면 spark-csv를 사용할 수 있습니다.

  • 스파크 1.3

    df.save('mycsv.csv', 'com.databricks.spark.csv')
    
  • 스파크 1.4+

    df.write.format('com.databricks.spark.csv').save('mycsv.csv')
    

2에서는 Spark 2.0+ 하실 에서 을 사용할 수 .csv데이터 소스 직접:

df.write.csv('mycsv.csv')

Apache Spark 2+의 경우 데이터 프레임을 단일 csv 파일로 저장합니다.다음 명령 사용

query.repartition(1).write.csv("cc_out.csv", sep='|')

여기서1csv의 하다고 할 수 .요구사항에 따라 변경 가능합니다.

spark-csv를 사용할 수 없는 경우 다음을 수행할 수 있습니다.

df.rdd.map(lambda x: ",".join(map(str, x))).coalesce(1).saveAsTextFile("file.csv")

줄 바꿈 또는 쉼표가 있는 문자열을 처리해야 하는 경우 작동하지 않습니다.사용 방법:

import csv
import cStringIO

def row2csv(row):
    buffer = cStringIO.StringIO()
    writer = csv.writer(buffer)
    writer.writerow([str(s).encode("utf-8") for s in row])
    buffer.seek(0)
    return buffer.read().strip()

df.rdd.map(row2csv).coalesce(1).saveAsTextFile("file.csv")

Dataframe을 단일 파티션으로 다시 분할한 다음 유닉스 파일 시스템 형식으로 파일의 포맷, 경로 및 기타 파라미터를 정의해야 합니다.

df.repartition(1).write.format('com.databricks.spark.csv').save("/path/to/file/myfile.csv",header = 'true')

repartition 기능에 대해 자세히 알아보기저장 기능에 대해 자세히 알아보기

그러나 재분할은 비용이 많이 드는 기능이며 Pandas()에게는 최악입니다.성능을 향상시키려면 이전 구문에서 .repartition(1) 대신 .coalesce(1)를 사용해 보십시오.

재분할병합 기능에 대해 자세히 알아보십시오.

PySpark 사용하기

Spark 3.0+에서 csv로 작성하는 가장 쉬운 방법

sdf.write.csv("/path/to/csv/data.csv")

사용 중인 스파크 노드 수에 따라 여러 개의 파일을 생성할 수 있습니다.단일 파일로 가져오려면 repartition을 사용합니다.

sdf.repartition(1).write.csv("/path/to/csv/data.csv")

판다 사용하기

만약 당신의 데이터가 너무 많지 않고 지역의 비단뱀에 보관될 수 있다면, 당신은 팬더를 사용할 수 있습니다.

sdf.toPandas().to_csv("/path/to/csv/data.csv", index=False)

코알라 사용하기

sdf.to_koalas().to_csv("/path/to/csv/data.csv", index=False)

이것(원라이너를 원하지 않으실 경우)은 어떻습니까?

for row in df.collect():
    d = row.asDict()
    s = "%d\t%s\t%s\n" % (d["int_column"], d["string_column"], d["string_column"])
    f.write(s)

f 는 열린 파일 설명자 입니다.또한 분리판은 TAB char이지만 원하는 대로 쉽게 변경할 수 있습니다.

'''
I am late to the pary but: this will let me rename the file, move it to a desired directory and delete the unwanted additional directory spark made
'''

import shutil
import os
import glob

path = 'test_write'
#write single csv
students.repartition(1).write.csv(path)

#rename and relocate the csv
shutil.move(glob.glob(os.getcwd() + '\\' + path + '\\' + r'*.csv')[0], os.getcwd()+ '\\' + path+ '.csv')

#remove additional directory
shutil.rmtree(os.getcwd()+'\\'+path)

저는 팬더들과 함께 그 방법을 사용했고 이것은 저에게 끔찍한 성과를 주었습니다.결국 시간이 너무 오래 걸려서 다른 방법을 찾기 위해 멈췄습니다.

여러 csv 대신 하나의 csv에 쓰는 방법을 찾고 있다면 다음과 같습니다.

df.coalesce(1).write.csv("train_dataset_processed", header=True)

데이터셋 처리 시간이 2시간 이상에서 2분으로 단축되었습니다.

display(df)를 시도하고 결과에서 download 옵션을 사용합니다.이 옵션을 사용하면 100만 행만 다운로드할 수 있지만 매우 빠릅니다.

언급URL : https://stackoverflow.com/questions/31385363/how-to-export-a-table-dataframe-in-pyspark-to-csv

반응형