diff --git a/src/azure.py b/src/azure.py index 2e55dc3..05e6f96 100644 --- a/src/azure.py +++ b/src/azure.py @@ -1,16 +1,18 @@ # %% Import dependencies ---- from dataclasses import dataclass -from typing import Dict, Any, Iterable -from pandas import DataFrame +from typing import Dict, Any + from sqlalchemy import create_engine, inspect + +import pandas as pd import urllib -# %% +# %% Models @dataclass(frozen=True) class ConnectionSettings: - """Connection Settings.""" + """Connection Settings""" server: str database: str username: str @@ -19,10 +21,10 @@ class ConnectionSettings: timeout: int = 30 +# %% Connection class AzureDbConnection: - """ - Azure SQL database connection. - """ + """Azure SQL database connection.""" + def __init__(self, conn_settings: ConnectionSettings, echo: bool = False) -> None: conn_params = urllib.parse.quote_plus( 'Driver=%s;' % conn_settings.driver + @@ -36,29 +38,29 @@ class AzureDbConnection: ) conn_string = f'mssql+pyodbc:///?odbc_connect={conn_params}' - self.db = create_engine(conn_string, echo=echo) + self._db = create_engine(conn_string, echo=echo) def connect(self) -> None: - """Estimate connection.""" - self.conn = self.db.connect() + """Estimate connection""" + self._conn = self._db.connect() - def get_tables(self) -> Iterable[str]: - """Get list of tables.""" - inspector = inspect(self.db) + def get_tables(self) -> list[str]: + """Get list of tables""" + inspector = inspect(self._db) return [t for t in inspector.get_table_names()] - def insert(self, inserted_data: DataFrame, target_table: str, db_mapping: Dict[str, Any], chunksize: int = 10000) -> None: + def insert(self, inserted_data: pd.DataFrame, target_table: str, db_mapping: Dict[str, Any], chunksize: int = 10000) -> None: inserted_data.to_sql( - con=self.db, + con=self._db, schema='dbo', name=target_table, - if_exists='append', # or replace + if_exists='replace', # or append index=False, chunksize=chunksize, dtype=db_mapping ) def dispose(self) -> None: - """Dispose opened connections.""" - self.conn.close() - self.db.dispose() + """Dispose opened connections""" + self._conn.close() + self._db.dispose()