Minor updates

This commit is contained in:
codez0mb1e 2022-06-19 23:01:08 +00:00
parent 153fc5a230
commit b51d4a0162

View File

@ -1,16 +1,18 @@
# %% Import dependencies ----
from dataclasses import dataclass
from typing import Dict, Any, Iterable
from pandas import DataFrame
from typing import Dict, Any
from sqlalchemy import create_engine, inspect
import pandas as pd
import urllib
# %%
# %% Models
@dataclass(frozen=True)
class ConnectionSettings:
"""Connection Settings."""
"""Connection Settings"""
server: str
database: str
username: str
@ -19,10 +21,10 @@ class ConnectionSettings:
timeout: int = 30
# %% Connection
class AzureDbConnection:
"""
Azure SQL database connection.
"""
"""Azure SQL database connection."""
def __init__(self, conn_settings: ConnectionSettings, echo: bool = False) -> None:
conn_params = urllib.parse.quote_plus(
'Driver=%s;' % conn_settings.driver +
@ -36,29 +38,29 @@ class AzureDbConnection:
)
conn_string = f'mssql+pyodbc:///?odbc_connect={conn_params}'
self.db = create_engine(conn_string, echo=echo)
self._db = create_engine(conn_string, echo=echo)
def connect(self) -> None:
"""Estimate connection."""
self.conn = self.db.connect()
"""Estimate connection"""
self._conn = self._db.connect()
def get_tables(self) -> Iterable[str]:
"""Get list of tables."""
inspector = inspect(self.db)
def get_tables(self) -> list[str]:
"""Get list of tables"""
inspector = inspect(self._db)
return [t for t in inspector.get_table_names()]
def insert(self, inserted_data: DataFrame, target_table: str, db_mapping: Dict[str, Any], chunksize: int = 10000) -> None:
def insert(self, inserted_data: pd.DataFrame, target_table: str, db_mapping: Dict[str, Any], chunksize: int = 10000) -> None:
inserted_data.to_sql(
con=self.db,
con=self._db,
schema='dbo',
name=target_table,
if_exists='append', # or replace
if_exists='replace', # or append
index=False,
chunksize=chunksize,
dtype=db_mapping
)
def dispose(self) -> None:
"""Dispose opened connections."""
self.conn.close()
self.db.dispose()
"""Dispose opened connections"""
self._conn.close()
self._db.dispose()