Skip to content

Commit f743f5c

Browse files
committed
Use recursion to handle to_sql
1 parent caebfb6 commit f743f5c

1 file changed

Lines changed: 35 additions & 44 deletions

File tree

  • containers/airflow/dags/brasil/sinan

containers/airflow/dags/brasil/sinan/dengue.py

Lines changed: 35 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@
3434
def update_dengue(egh_conn: dict):
3535
"""
3636
This task will run in an isolated python environment, containing PySUS
37-
package. The task will fetch for all
37+
package. The task will fetch all Dengue years from DATASUS and insert
38+
them into the EGH database
3839
"""
3940
import os
4041
import logging
@@ -50,28 +51,50 @@ def update_dengue(egh_conn: dict):
5051
tablename = "sinan_dengue_m"
5152
files = sinan.get_files(dis_code=dis_code)
5253

54+
55+
def recursive_to_sql(df: pd.DataFrame, engine):
56+
"""
57+
Try inserting a dataframe into the db, handling the error recursively if
58+
a column is missing
59+
"""
60+
try:
61+
df.to_sql(
62+
name=tablename,
63+
con=engine,
64+
schema="brasil",
65+
if_exists='append',
66+
index=False
67+
)
68+
except ProgrammingError as error:
69+
if str(error).startswith("(psycopg2.errors.UndefinedColumn)"):
70+
column_name = str(error).split('"')[1]
71+
with create_engine(egh_conn['URI']).connect() as conn:
72+
conn.execute(text(
73+
f'ALTER TABLE brasil.{tablename}'
74+
f' ADD COLUMN {column_name} TEXT'
75+
))
76+
conn.commit()
77+
logging.warning(f"Column {column_name} added into {tablename}")
78+
recursive_to_sql(df, engine)
5379

80+
5481
def insert_parquets(parquet_dir: str, year: int):
5582
"""
5683
Insert parquet dir into database using its chunks. Delete the chunk
57-
and the directory after insertion.
84+
and the directory after insertion
5885
"""
5986
for parquet in os.listdir(parquet_dir):
6087
file = os.path.join(parquet_dir, parquet)
6188
df = pd.read_parquet(str(file), engine='fastparquet')
6289
df.columns = df.columns.str.lower()
6390
df['year'] = year
6491
df['prelim'] = False
65-
df.to_sql(
66-
name=tablename,
67-
con=create_engine(egh_conn['URI']),
68-
schema="brasil",
69-
if_exists='append',
70-
index=False
71-
)
92+
93+
recursive_to_sql(df, create_engine(egh_conn['URI']))
94+
logging.debug(f"{file} inserted into db")
95+
7296
del df
7397
os.remove(file)
74-
logging.debug(f"{file} inserted into db")
7598
os.rmdir(parquets.path)
7699

77100

@@ -113,23 +136,7 @@ def insert_parquets(parquet_dir: str, year: int):
113136
))
114137

115138
parquets = sinan.download(sinan.get_files(dis_code, year))
116-
117-
try:
118-
insert_parquets(parquets.path, year)
119-
except ProgrammingError as error:
120-
if str(error).startswith("(psycopg2.errors.UndefinedColumn)"):
121-
# Include new columns to table
122-
column_name = str(error).split('"')[1]
123-
with create_engine(egh_conn['URI']).connect() as conn:
124-
conn.execute(text(
125-
f'ALTER TABLE brasil.{tablename}'
126-
f' ADD COLUMN {column_name} TEXT'
127-
))
128-
conn.commit()
129-
logging.warning(f"Column {column_name} added into {tablename}")
130-
insert_parquets(parquets.path, year)
131-
132-
os.rmdir(parquets.path)
139+
insert_parquets(parquets.path, year)
133140

134141
for year in f_stage['prelim']:
135142
with create_engine(egh_conn['URI']).connect() as conn:
@@ -140,22 +147,6 @@ def insert_parquets(parquet_dir: str, year: int):
140147
))
141148

142149
parquets = sinan.download(sinan.get_files(dis_code, year))
143-
144-
try:
145-
insert_parquets(parquets.path, year)
146-
except ProgrammingError as error:
147-
if str(error).startswith("(psycopg2.errors.UndefinedColumn)"):
148-
# Include new columns to table
149-
column_name = str(error).split('"')[1]
150-
with create_engine(egh_conn['URI']).connect() as conn:
151-
conn.execute(text(
152-
f'ALTER TABLE brasil.{tablename}'
153-
f' ADD COLUMN {column_name} TEXT'
154-
))
155-
conn.commit()
156-
logging.warning(f"Column {column_name} added into {tablename}")
157-
insert_parquets(parquets.path, year)
158-
159-
os.rmdir(parquets.path)
150+
insert_parquets(parquets.path, year)
160151

161152
update_dengue(CONN)

0 commit comments

Comments
 (0)