Skip to content

Commit 8ffb904

Browse files
committed
Add columns to table before inserting the dataframe
1 parent f743f5c commit 8ffb904

1 file changed

Lines changed: 31 additions & 25 deletions

File tree

  • containers/airflow/dags/brasil/sinan

containers/airflow/dags/brasil/sinan/dengue.py

Lines changed: 31 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -52,31 +52,38 @@ def update_dengue(egh_conn: dict):
5252
files = sinan.get_files(dis_code=dis_code)
5353

5454

55-
def recursive_to_sql(df: pd.DataFrame, engine):
55+
def to_sql_include_cols(df: pd.DataFrame, engine):
5656
"""
57-
Try inserting a dataframe into db, handle error recursively if
58-
a column is missing
57+
Insert dataframe into db, include missing columns if needed
5958
"""
60-
try:
61-
df.to_sql(
62-
name=tablename,
63-
con=engine,
64-
schema="brasil",
65-
if_exists='append',
66-
index=False
67-
)
68-
except ProgrammingError as error:
69-
if str(error).startswith("(psycopg2.errors.UndefinedColumn)"):
70-
column_name = str(error).split('"')[1]
71-
with create_engine(egh_conn['URI']).connect() as conn:
72-
conn.execute(text(
73-
f'ALTER TABLE brasil.{tablename}'
74-
f' ADD COLUMN {column_name} TEXT'
75-
))
76-
conn.commit()
77-
logging.warning(f"Column {column_name} added into {tablename}")
78-
recursive_to_sql(df, engine)
79-
59+
df.columns = df.columns.str.lower()
60+
61+
with create_engine(egh_conn['URI']).connect() as conn:
62+
# Get columns
63+
res = conn.execute(text(f'SELECT * FROM brasil.{tablename} LIMIT 1'))
64+
sql_columns = set(i[0] for i in res.cursor.description)
65+
66+
df_columns = set(df.columns)
67+
columns_to_add = df_columns.difference(sql_columns)
68+
69+
if columns_to_add:
70+
sql_statements = [f"ALTER TABLE {tablename}"]
71+
for column in columns_to_add:
72+
sql_statements.append(f"ADD COLUMN {column} TEXT,") # object
73+
74+
with create_engine(egh_conn['URI']).connect() as conn:
75+
sql = ' '.join(sql_statements)
76+
logging.warning(f"EXECUTING: {sql}")
77+
conn.execute(text(sql))
78+
conn.commit()
79+
80+
df.to_sql(
81+
name=tablename,
82+
con=engine,
83+
schema="brasil",
84+
if_exists='append',
85+
index=False
86+
)
8087

8188
def insert_parquets(parquet_dir: str, year: int):
8289
"""
@@ -86,11 +93,10 @@ def insert_parquets(parquet_dir: str, year: int):
8693
for parquet in os.listdir(parquet_dir):
8794
file = os.path.join(parquet_dir, parquet)
8895
df = pd.read_parquet(str(file), engine='fastparquet')
89-
df.columns = df.columns.str.lower()
9096
df['year'] = year
9197
df['prelim'] = False
9298

93-
recursive_to_sql(df, create_engine(egh_conn['URI']))
99+
to_sql_include_cols(df, create_engine(egh_conn['URI']))
94100
logging.debug(f"{file} inserted into db")
95101

96102
del df

0 commit comments

Comments
 (0)