From e9141ec9ce136d2df71708aec906571abc098afe Mon Sep 17 00:00:00 2001 From: codez0mb1e Date: Thu, 21 Apr 2022 16:55:17 +0000 Subject: [PATCH] Init commit --- .gitignore | 1 + src/azure.py | 64 +++++++++++++++++++++++ src/bitfinex_crypto_parser.py | 98 +++++++++++++++++++++++++++++++++++ src/openfigi_parser.py | 75 +++++++++++++++++++++++++++ 4 files changed, 238 insertions(+) create mode 100644 .gitignore create mode 100644 src/azure.py create mode 100644 src/bitfinex_crypto_parser.py create mode 100644 src/openfigi_parser.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..932f02d --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/data/*.csv \ No newline at end of file diff --git a/src/azure.py b/src/azure.py new file mode 100644 index 0000000..2e55dc3 --- /dev/null +++ b/src/azure.py @@ -0,0 +1,64 @@ + +# %% Import dependencies ---- +from dataclasses import dataclass +from typing import Dict, Any, Iterable +from pandas import DataFrame +from sqlalchemy import create_engine, inspect +import urllib + + +# %% +@dataclass(frozen=True) +class ConnectionSettings: + """Connection Settings.""" + server: str + database: str + username: str + password: str + driver: str = '{ODBC Driver 17 for SQL Server}' + timeout: int = 30 + + +class AzureDbConnection: + """ + Azure SQL database connection. + """ + def __init__(self, conn_settings: ConnectionSettings, echo: bool = False) -> None: + conn_params = urllib.parse.quote_plus( + 'Driver=%s;' % conn_settings.driver + + 'Server=tcp:%s,1433;' % conn_settings.server + + 'Database=%s;' % conn_settings.database + + 'Uid=%s;' % conn_settings.username + + 'Pwd={%s};' % conn_settings.password + + 'Encrypt=yes;' + + 'TrustServerCertificate=no;' + + 'Connection Timeout=%s;' % conn_settings.timeout + ) + conn_string = f'mssql+pyodbc:///?odbc_connect={conn_params}' + + self.db = create_engine(conn_string, echo=echo) + + def connect(self) -> None: + """Estimate connection.""" + self.conn = self.db.connect() + + def get_tables(self) -> Iterable[str]: + """Get list of tables.""" + inspector = inspect(self.db) + return [t for t in inspector.get_table_names()] + + def insert(self, inserted_data: DataFrame, target_table: str, db_mapping: Dict[str, Any], chunksize: int = 10000) -> None: + inserted_data.to_sql( + con=self.db, + schema='dbo', + name=target_table, + if_exists='append', # or replace + index=False, + chunksize=chunksize, + dtype=db_mapping + ) + + def dispose(self) -> None: + """Dispose opened connections.""" + self.conn.close() + self.db.dispose() diff --git a/src/bitfinex_crypto_parser.py b/src/bitfinex_crypto_parser.py new file mode 100644 index 0000000..ffcadb0 --- /dev/null +++ b/src/bitfinex_crypto_parser.py @@ -0,0 +1,98 @@ +#!/usr/bin/python3 + +""" + +Data source: https://www.kaggle.com/code/tencars/bitfinexdataset +""" + +# %% +import os +import numpy as np +import pandas as pd +from sqlalchemy import types + +from azure import AzureDbConnection, ConnectionSettings + + +# %% +input_path = "../data" + +# Get names and number of available currency pairs +pair_names = [x[:-4] for x in os.listdir(input_path)] +n_pairs = len(pair_names) + +# Print the first 50 currency pair names +print("These are the first 50 out of {} currency pairs in the dataset:".format(n_pairs)) +print(pair_names[0:50]) + +usd_pairs = [s for s in pair_names if "usd" in s] +print(usd_pairs) + +# %% + +def load_data(symbol, source=input_path): + path_name = source + "/" + symbol + ".csv" + + # Load data + df = pd.read_csv(path_name, index_col='time', dtype={'open': np.float64, 'high': np.float64, 'low': np.float64, 'close': np.float64, 'volume': np.float64}) + df.index = pd.to_datetime(df.index, unit='ms') + df = df[~df.index.duplicated(keep='first')] + + # As mentioned in the description, bins without any change are not recorded. + # We have to fill these gaps by filling them with the last value until a change occurs. + #df = df.resample('1T').pad() + + return df[['open', 'high', 'low', 'close', 'volume']] + + +# %% ---- +solusd = load_data("solusd") +solusd.tail() + + +# %% ---- +conn_settings = ... +db_conn = AzureDbConnection(conn_settings) + +db_conn.connect() +for t in db_conn.get_tables(): + print(t) + + +# %% +min_candels_n = 10000 + +db_mapping = { + 'FIGI': types.VARCHAR(length=12), + 'open': types.DECIMAL(precision=19, scale=9), + 'high': types.DECIMAL(precision=19, scale=9), + 'close': types.DECIMAL(precision=19, scale=9), + 'low': types.DECIMAL(precision=19, scale=9), + 'volume': types.DECIMAL(precision=19, scale=9), + 'time': types.DATETIME(), + 'source_id': types.SMALLINT, + 'version': types.VARCHAR(length=12), + 'interval': types.CHAR(length=2) +} + +for pair in usd_pairs: + print(f'Starting read {pair}...') + candles_df = load_data(pair) + + candles_df['FIGI'] = pair + candles_df['time'] = candles_df.index + candles_df['source_id'] = 128 + candles_df['version'] = 'v202204' + candles_df['interval'] = '1M' + + if candles_df.shape[0] > min_candels_n: + print('{} rows from {} to {}'.format(candles_df.shape[0], min(candles_df['time']), max(candles_df['time']))) + + print(f'Starting insert {pair}...') + db_conn.insert(candles_df, 'Cryptocurrency', db_mapping) + else: + print(f'WARN: {pair} has only {candles_df.shape[0]} records') + + +# %% +db_conn.dispose() diff --git a/src/openfigi_parser.py b/src/openfigi_parser.py new file mode 100644 index 0000000..e689185 --- /dev/null +++ b/src/openfigi_parser.py @@ -0,0 +1,75 @@ +# %% +from dataclasses import dataclass +from typing import Optional +import pandas as pd +import httpx + + +# %% +@dataclass +class AssetInfo: + FIGI: str + Ticker: str + Title: str + Description: Optional[str] + AssetType: str = 'Cryptocurrency' + SourceId: str = "OpenFigi API" + Version: str = "v202204" + + +def get_asset_info(pair: str) -> AssetInfo: + api_url = f'https://www.openfigi.com/search/query?facetQuery=MARKET_SECTOR_DES:%22Curncy%22&num_rows=100&simpleSearchString={pair}&start=0' + response = httpx.get(api_url) + + json_response = response.json() + response_df = pd.DataFrame.from_dict(json_response['result'], orient='columns') + if len(response_df) == 0: + print(f'[WARN] {pair} not found') + return None + + pair_figi = response_df.kkg_pairFIGI_sd.unique() + + if (len(pair_figi) != 1): + print(f'[WARN] {len(pair_figi)} records was found for {pair}') + else: + print(f'[INFO] {pair} associated w/ FIGI {pair_figi[0]}') + + return pair_figi + + +#%% Tests +expected_pairs = { + 'WAX-USD': None, + 'ETH-USD': 'BBG00J3NBWD7', + 'BTC-USD': 'BBG006FCL7J4', + 'SOL-USD': 'BBG013WVY457', + 'UNI-USD': 'BBG013TZFVW3' +} + +for k, v in expected_pairs.items(): + assert get_asset_info(k) == v + + +# %% +import os +import pandas as pd + +pair_names = [x[:-4] for x in os.listdir("../data")] + +def insert_dash(text: str, position: int) -> str: + if '-' not in text: + return text[:position] + '-' + text[position:] + else: + return text + +usd_pairs = [insert_dash(s.upper(), 3) for s in pair_names if "usd" in s] + +print(usd_pairs) + +# %% +pair_figi_list = [get_asset_info(p) for p in usd_pairs] + +for p in usd_pairs: + print(p) + get_asset_info(p) +# %%