Skip to content

Commit caebfb6

Browse files
committed
Handle UndefinedColumn error & add column
1 parent 08e498b commit caebfb6

1 file changed

Lines changed: 56 additions & 32 deletions

File tree

  • containers/airflow/dags/brasil/sinan

containers/airflow/dags/brasil/sinan/dengue.py

Lines changed: 56 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ def update_dengue(egh_conn: dict):
4141
import pandas as pd
4242

4343
from sqlalchemy import create_engine, text
44+
from sqlalchemy.exc import ProgrammingError
4445
from pysus.online_data import parquets_to_dataframe
4546
from pysus.ftp.databases.sinan import SINAN
4647

@@ -49,6 +50,31 @@ def update_dengue(egh_conn: dict):
4950
tablename = "sinan_dengue_m"
5051
files = sinan.get_files(dis_code=dis_code)
5152

53+
54+
def insert_parquets(parquet_dir: str, year: int):
55+
"""
56+
Insert parquet dir into database using its chunks. Delete the chunk
57+
and the directory after insertion.
58+
"""
59+
for parquet in os.listdir(parquet_dir):
60+
file = os.path.join(parquet_dir, parquet)
61+
df = pd.read_parquet(str(file), engine='fastparquet')
62+
df.columns = df.columns.str.lower()
63+
df['year'] = year
64+
df['prelim'] = False
65+
df.to_sql(
66+
name=tablename,
67+
con=create_engine(egh_conn['URI']),
68+
schema="brasil",
69+
if_exists='append',
70+
index=False
71+
)
72+
del df
73+
os.remove(file)
74+
logging.debug(f"{file} inserted into db")
75+
os.rmdir(parquets.path)
76+
77+
5278
f_stage = {}
5379
for file in files:
5480
code, year = sinan.format(file)
@@ -88,22 +114,21 @@ def update_dengue(egh_conn: dict):
88114

89115
parquets = sinan.download(sinan.get_files(dis_code, year))
90116

91-
for parquet in os.listdir(parquets.path):
92-
file = os.path.join(parquets.path, parquet)
93-
df = pd.read_parquet(str(file), engine='fastparquet')
94-
df.columns = df.columns.str.lower()
95-
df['year'] = year
96-
df['prelim'] = False
97-
df.to_sql(
98-
name=tablename,
99-
con=create_engine(egh_conn['URI']),
100-
schema="brasil",
101-
if_exists='append',
102-
index=False
103-
)
104-
del df
105-
os.remove(file)
106-
logging.debug(f"{file} inserted into db")
117+
try:
118+
insert_parquets(parquets.path, year)
119+
except ProgrammingError as error:
120+
if str(error).startswith("(psycopg2.errors.UndefinedColumn)"):
121+
# Include new columns to table
122+
column_name = str(error).split('"')[1]
123+
with create_engine(egh_conn['URI']).connect() as conn:
124+
conn.execute(text(
125+
f'ALTER TABLE brasil.{tablename}'
126+
f' ADD COLUMN {column_name} TEXT'
127+
))
128+
conn.commit()
129+
logging.warning(f"Column {column_name} added into {tablename}")
130+
insert_parquets(parquets.path, year)
131+
107132
os.rmdir(parquets.path)
108133

109134
for year in f_stage['prelim']:
@@ -116,22 +141,21 @@ def update_dengue(egh_conn: dict):
116141

117142
parquets = sinan.download(sinan.get_files(dis_code, year))
118143

119-
for parquet in os.listdir(parquets.path):
120-
file = os.path.join(parquets.path, parquet)
121-
df = pd.read_parquet(str(file), engine='fastparquet')
122-
df.columns = df.columns.str.lower()
123-
df['year'] = year
124-
df['prelim'] = True
125-
df.to_sql(
126-
name=tablename,
127-
con=create_engine(egh_conn['URI']),
128-
schema="brasil",
129-
if_exists='append',
130-
index=False
131-
)
132-
del df
133-
os.remove(file)
134-
logging.debug(f"{file} inserted into db")
144+
try:
145+
insert_parquets(parquets.path, year)
146+
except ProgrammingError as error:
147+
if str(error).startswith("(psycopg2.errors.UndefinedColumn)"):
148+
# Include new columns to table
149+
column_name = str(error).split('"')[1]
150+
with create_engine(egh_conn['URI']).connect() as conn:
151+
conn.execute(text(
152+
f'ALTER TABLE brasil.{tablename}'
153+
f' ADD COLUMN {column_name} TEXT'
154+
))
155+
conn.commit()
156+
logging.warning(f"Column {column_name} added into {tablename}")
157+
insert_parquets(parquets.path, year)
158+
135159
os.rmdir(parquets.path)
136160

137161
update_dengue(CONN)

0 commit comments

Comments
 (0)