Skip to content

Commit 08e498b

Browse files
committed
Use parquets chunks, preventing the RAM to get fulfilled
1 parent 73f2fec commit 08e498b

1 file changed

Lines changed: 45 additions & 27 deletions

File tree

  • containers/airflow/dags/brasil/sinan

containers/airflow/dags/brasil/sinan/dengue.py

Lines changed: 45 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from datetime import timedelta
44
from airflow import DAG
55
from airflow.decorators import task
6+
from airflow.models import Variable
67

78

89
default_args = {
@@ -23,16 +24,21 @@
2324
default_args=default_args,
2425
catchup=False,
2526
) as dag:
26-
from airflow.models import Variable
2727

2828
CONN = Variable.get('egh_conn', deserialize_json=True)
2929

3030
@task.external_python(
31-
task_id='first',
31+
task_id='update_dengue',
3232
python='/opt/py311/bin/python3.11'
3333
)
3434
def update_dengue(egh_conn: dict):
35+
"""
36+
This task will run in an isolated python environment, containing PySUS
37+
package. The task will fetch for all
38+
"""
39+
import os
3540
import logging
41+
import pandas as pd
3642

3743
from sqlalchemy import create_engine, text
3844
from pysus.online_data import parquets_to_dataframe
@@ -80,40 +86,52 @@ def update_dengue(egh_conn: dict):
8086
f" WHERE year = '{year}' AND prelim = True"
8187
))
8288

83-
file = sinan.download(sinan.get_files(dis_code, year))
89+
parquets = sinan.download(sinan.get_files(dis_code, year))
90+
91+
for parquet in os.listdir(parquets.path):
92+
file = os.path.join(parquets.path, parquet)
93+
df = pd.read_parquet(str(file), engine='fastparquet')
94+
df.columns = df.columns.str.lower()
95+
df['year'] = year
96+
df['prelim'] = False
97+
df.to_sql(
98+
name=tablename,
99+
con=create_engine(egh_conn['URI']),
100+
schema="brasil",
101+
if_exists='append',
102+
index=False
103+
)
104+
del df
105+
os.remove(file)
106+
logging.debug(f"{file} inserted into db")
107+
os.rmdir(parquets.path)
84108

85-
df = parquets_to_dataframe(file.path)
109+
for year in f_stage['prelim']:
110+
with create_engine(egh_conn['URI']).connect() as conn:
111+
# Update prelim
112+
cur = conn.execute(text(
113+
f'DELETE FROM brasil.{tablename}'
114+
f" WHERE year = '{year}' AND prelim = True"
115+
))
116+
117+
parquets = sinan.download(sinan.get_files(dis_code, year))
118+
119+
for parquet in os.listdir(parquets.path):
120+
file = os.path.join(parquets.path, parquet)
121+
df = pd.read_parquet(str(file), engine='fastparquet')
86122
df.columns = df.columns.str.lower()
87123
df['year'] = year
88-
df['prelim'] = False
124+
df['prelim'] = True
89125
df.to_sql(
90126
name=tablename,
91127
con=create_engine(egh_conn['URI']),
92128
schema="brasil",
93129
if_exists='append',
94130
index=False
95131
)
96-
97-
for year in f_stage['prelim']:
98-
with create_engine(egh_conn['URI']).connect() as conn:
99-
# Update prelim
100-
cur = conn.execute(text(
101-
f'DELETE FROM brasil.{tablename}'
102-
f" WHERE year = '{year}' AND prelim = True"
103-
))
104-
105-
file = sinan.download(sinan.get_files(dis_code, year))
106-
107-
df = parquets_to_dataframe(file.path)
108-
df.columns = df.columns.str.lower()
109-
df['year'] = year
110-
df['prelim'] = True
111-
df.to_sql(
112-
name=tablename,
113-
con=create_engine(egh_conn['URI']),
114-
schema="brasil",
115-
if_exists='append',
116-
index=False
117-
)
132+
del df
133+
os.remove(file)
134+
logging.debug(f"{file} inserted into db")
135+
os.rmdir(parquets.path)
118136

119137
update_dengue(CONN)

0 commit comments

Comments
 (0)