HAZEL

[awswrangler] 데이터프레임을 Redshift로 한번에 넣는 방법 본문

DATA ENGINEERING/AWS

[awswrangler] 데이터프레임을 Redshift로 한번에 넣는 방법

Rmsid01 2022. 4. 13. 20:43

 

aws 서비스에서, 데이터를 insert 가 아닌, dataframe으로 한번에 넣는 방법으로 'awswrangler' 모듈이 존재한다.

 

만약, awswrangler 를 사용하지 않는다면, 아래와 같이, insert into 구문을 사용해야 하며, 많은 데이터를 넣기 위해서는 for 문을 해야하는 매우 비 효율적인 방식을 사용해야한다.

 

cur = get_Redshift_connection()

sql = "BEGIN; DELETE FROM {schema}.{table};".format(schema=schema, table=table)
sql += f"""INSERT INTO {schema}.{table} VALUES ( '2022-01-01', 1);"""
sql += "END;"

cur.execute(sql)

 

이러한 부분을 해결하기 위해서, 데이터 프레임 형태의 데이터를 그대로 올리기를 원했다.

여러 방법이 존재하는 것 같았다.

 

 

1. pandas_redshift 를 이용

: pandas_to_redshift 를 이용하여, 데이터를 프레임 형태로 그대로 넣어줄 수 있다.

import pandas_redshift as pr

pr.pandas_to_redshift(data_frame=test_df,
                      redshift_table_name=to_table)

 

** 이 방법을 사용하지 않는 이유는.. MWAA 로 진행중인데, import 오류로 안맞아서 일단은 보류 하였다.

 

2. s3에 데이터를 올린후, s3 -> Redshift 로 옮기기

: airflow 를 이용하고 있는 중이라, 이 방법도 존재하였다. 그러나, 많은 양의 데이터도 아닌데 굳이 s3에 담아야 할까? 라는 고민을 하였고, 이 방법은 패스하게 되었다.

 

실제로, airflow 에는 S3ToRedshiftOperator 가 존재하기 때문에, operator로 쉽게 이용할 수 도 있어 보였다.

 

3. awswrangler

: awswrangler 란, AWS Professional Services 팀에서 개발한 오픈소스 Python 라이브러리이다. Pandas 라이브러리 기반으로, 데이터를 load/unload 할 수 있는 강력한 기능을 제공한다고, 공식문서에 써있다. 다른 곳에는 못쓰고, AWS 환경에서만 쓸 수 있을 것 같다.

 

 

1 ) Connection

https://aws-data-wrangler.readthedocs.io/en/2.4.0-docs/stubs/awswrangler.redshift.connect.html

 

dataframe을 Redshift에 올리기 위해서,  아래와 같이 코드를 작성했다. 

그런데, 공식문서에 보이는 것과 같이, connect 파라미터에는 connection 이 존재하는데, 이것은 glue catalog connection 을 의미하는 것이다. 즉, AWS glue 페이지에서 connection 연결을 하고 그 connection 의 이름을 적어주어야 한다.

나는 glue 의 g 도 건드리지 않는 코드를 사용하는데, 이 aws 모듈 자체가 거기서 정보를 받아오는지 그렇게 해야만 했다.

[ 이런 이유에서인지, airflow로 아래 코드를 사용할때는, glue iam 권한도 부여해줘야 한다. ]

 

사실 다른 변수(Boto3)로 접근 할 수도 있으나, 나는 아래의 방법으로 연결하였다.

 

  • connection (Optional[str]) – Glue Catalog Connection name.
connection = wr.redshift.connect("redshift_glue_connection")

 

2 ) to_sql

https://aws-data-wrangler.readthedocs.io/en/1.1.2/stubs/awswrangler.db.to_sql.html#awswrangler.db.to_sql

 

- 공식 문서의 예제

import awswrangler as wr
import pandas as pd

>>> wr.db.to_sql(
...     df=pd.DataFrame({'col': [1, 2, 3]}),
...     con=wr.catalog.get_engine(connection="..."),
...     name="table_name",
...     schema="schema_name"
... )

 

- 실제 작성 코드

db 대신에 연결하는 db 이름을 적어준다. connection 에는 아래와 같이 코드를 적어주어 연결하였다.

connection = wr.redshift.connect("redshift_glue_connection")
wr.redshift.to_sql(
    df=mau_df,
    con=connection,
    table=table,
    schema=schema
)

 

to_sql 을 사용할 때, 주의할 점은 다 넣고나서, 연결을 닫아 줘야한다는 점이다.

그렇지 않으면, connection 이 계속 쌓이다가, 고갈된다고 한다.

connection.close()

 

 

 

관련 공식문서

https://aws-data-wrangler.readthedocs.io/en/stable/index.html

 

Quick Start — AWS Data Wrangler 2.15.1 documentation

An AWS Professional Service open source initiative | aws-proserve-opensource@amazon.com Quick Start >>> pip install awswrangler import awswrangler as wr import pandas as pd from datetime import datetime df = pd.DataFrame({"id": [1, 2], "value": ["foo", "bo

aws-data-wrangler.readthedocs.io

 

 

 

** +) airflow 2.2.2 버전에서, awswrangler 을 사용할 때도, 열심히 모듈이 import 가 안되는 문제가 발생하였다..

알고보니, 버전이 안맞아서 그런 것 이었다..ㅠㅠ..