Skip to content

Commit 7d276b5

Browse files
committed
Parse all columns to TEXT before inserting to db
1 parent 8ffb904 commit 7d276b5

1 file changed

Lines changed: 27 additions & 9 deletions

File tree

  • containers/airflow/dags/brasil/sinan

containers/airflow/dags/brasil/sinan/dengue.py

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,6 @@ def update_dengue(egh_conn: dict):
4242
import pandas as pd
4343

4444
from sqlalchemy import create_engine, text
45-
from sqlalchemy.exc import ProgrammingError
46-
from pysus.online_data import parquets_to_dataframe
4745
from pysus.ftp.databases.sinan import SINAN
4846

4947
sinan = SINAN().load()
@@ -52,7 +50,7 @@ def update_dengue(egh_conn: dict):
5250
files = sinan.get_files(dis_code=dis_code)
5351

5452

55-
def to_sql_include_cols(df: pd.DataFrame, engine):
53+
def to_sql_include_cols(df: pd.DataFrame, prelim: bool, engine):
5654
"""
5755
Insert dataframe into db, include missing columns if needed
5856
"""
@@ -77,6 +75,28 @@ def to_sql_include_cols(df: pd.DataFrame, engine):
7775
conn.execute(text(sql))
7876
conn.commit()
7977

78+
for col, dtype in df.dtypes.items():
79+
if col in ['dt_notific', 'dt_sin_pri']:
80+
try:
81+
df[col] = pd.to_datetime(df[col]).dt.strftime('%d%m%Y').astype('object')
82+
dtype = 'object'
83+
logging.warning(
84+
f"Column '{col}' of type '{dtype}' has been parsed to 'object'"
85+
)
86+
except ValueError as error:
87+
logging.error(f'Could not format date column correctly: {error}')
88+
df[col] = df[col].astype('object')
89+
dtype = 'object'
90+
91+
if str(dtype) != 'object':
92+
df[col] = df[col].astype('object')
93+
logging.warning(
94+
f"Column '{col}' of type '{dtype}' has been parsed to 'object'"
95+
)
96+
97+
df['year'] = year
98+
df['prelim'] = prelim
99+
80100
df.to_sql(
81101
name=tablename,
82102
con=engine,
@@ -85,18 +105,16 @@ def to_sql_include_cols(df: pd.DataFrame, engine):
85105
index=False
86106
)
87107

88-
def insert_parquets(parquet_dir: str, year: int):
108+
def insert_parquets(parquet_dir: str, year: int, prelim: bool):
89109
"""
90110
Insert parquet dir into database using its chunks. Delete the chunk
91111
and the directory after insertion
92112
"""
93113
for parquet in os.listdir(parquet_dir):
94114
file = os.path.join(parquet_dir, parquet)
95115
df = pd.read_parquet(str(file), engine='fastparquet')
96-
df['year'] = year
97-
df['prelim'] = False
98116

99-
to_sql_include_cols(df, create_engine(egh_conn['URI']))
117+
to_sql_include_cols(df, prelim, create_engine(egh_conn['URI']))
100118
logging.debug(f"{file} inserted into db")
101119

102120
del df
@@ -142,7 +160,7 @@ def insert_parquets(parquet_dir: str, year: int):
142160
))
143161

144162
parquets = sinan.download(sinan.get_files(dis_code, year))
145-
insert_parquets(parquets.path, year)
163+
insert_parquets(parquets.path, year, False)
146164

147165
for year in f_stage['prelim']:
148166
with create_engine(egh_conn['URI']).connect() as conn:
@@ -153,6 +171,6 @@ def insert_parquets(parquet_dir: str, year: int):
153171
))
154172

155173
parquets = sinan.download(sinan.get_files(dis_code, year))
156-
insert_parquets(parquets.path, year)
174+
insert_parquets(parquets.path, year, True)
157175

158176
update_dengue(CONN)

0 commit comments

Comments
 (0)