diff --git a/src/openfigi_crawler.py b/src/openfigi_crawler.py new file mode 100644 index 0000000..42fbc4b --- /dev/null +++ b/src/openfigi_crawler.py @@ -0,0 +1,167 @@ + +# %% Import dependencies +import os +from dataclasses import dataclass +from typing import Dict, Union + +import pandas as pd + +import httpx + +from sqlalchemy import types +from azure import AzureDbConnection, ConnectionSettings + + +# %% Data models +@dataclass +class AssetInfo: + FIGI: str + Ticker: str + Title: Union[str, None] + Description: Union[str, None] + AssetType: str = 'Cryptocurrency' + SourceId: str = 'OpenFigi API' + Version: str = 'v202206' + + def as_dict(self) -> Dict[str, str]: + return {'Figi': self.FIGI, 'Ticker': self.Ticker} + + +# %% FIGI provider +class OpenFigiProvider: + """ + OpenFigi API provider + + References: + https://www.openfigi.com/assets/local/figi-allocation-rules.pdf + https://www.openfigi.com/search + """ + @staticmethod + def _send_request(ticker: str, asset_type: str) -> pd.DataFrame: + api_url = f'https://www.openfigi.com/search/query?facetQuery=MARKET_SECTOR_DES:%22{asset_type}%22&num_rows=100&simpleSearchString={ticker}&start=0' + response = httpx.get(api_url) + + json_response = response.json() + return pd.DataFrame.from_dict(json_response['result'], orient='columns') + + + @staticmethod + def _find_figi(df: pd.DataFrame, field_name: str) -> Union[str, None]: + if len(df) == 0 or field_name not in df.columns: + return None + + result = df[field_name].dropna().unique() + + if (len(result) != 1): + print(f'[WARN] Multiple ({len(result)}) FIGI records was found') + return None + + return result[0] + + + @staticmethod + def _find_name(df: pd.DataFrame) -> Union[str, None]: + if len(df) == 0 or 'DS002_sd' not in df.columns: + return None + + result = df['DS002_sd'].dropna().unique() + + if (len(result) != 1): + print(f'[WARN] Multiple ({len(result)}) name records was found') + return None + + return result[0] + + + def search(self, ticker: str, asset_type: str = 'Curncy') -> Union[AssetInfo, None]: + """Return FIGI for pair""" + + response_df = OpenFigiProvider._send_request(ticker, asset_type) + + figi = OpenFigiProvider._find_figi(response_df, 'kkg_pairFIGI_sd') + + if figi is None: + base_quote = ticker.split('-')[0] + print(f'[INFO] {ticker} > Try to search using base quote {base_quote}') + + response_df = OpenFigiProvider._send_request(base_quote, asset_type) + figi = OpenFigiProvider._find_figi(response_df, 'kkg_baseAssetFigi_sd') + + if figi is None: + return None + + return AssetInfo(figi, ticker, None, None) + + +#%% +figi_provider = OpenFigiProvider() + +assert figi_provider.search('WAX-USD') == None +assert figi_provider.search('ABCD') == None + + +# %% Tests +expected_pairs = { + 'BNB-USD': 'KKG000007HZ5', + 'ETH-USD': 'BBG00J3NBWD7', + 'BTC-USD': 'BBG006FCL7J4', + 'SOL-USD': 'BBG013WVY457', + 'UNI-USD': 'BBG013TZFVW3', + 'SUSHI-USD': 'KKG0000010W1', + 'AVAX-USD': 'KKG000007J36' +} + + +for k, v in expected_pairs.items(): + actual = figi_provider.search(k) + print(actual.as_dict()) + assert ( + isinstance(actual, AssetInfo) + and actual.FIGI == v + and actual.Ticker == k + ) + + +# %% Get assets for searching figi +pair_names = [x[:-4] for x in os.listdir("../data")] + +def insert_dash(text: str, position: int) -> str: + if '-' not in text: + return text[:position] + '-' + text[position:] + else: + return text + +usd_pairs = [ + insert_dash(s.upper(), 3) + for s in pair_names if "usd" in s +] + +print(usd_pairs[1:10]) + + +# %% +figi_provider = OpenFigiProvider() +pair_figi_list = [figi_provider.search(p) for p in usd_pairs] + + +# %% ---- +db_conn = AzureDbConnection(conn_settings) + +db_conn.connect() +for t in db_conn.get_tables(): + print(t) + + +# %% +db_mapping = { + 'Figi': types.CHAR(length=12), + 'Ticker': types.VARCHAR(length=12) +} + +figi_df = pd.DataFrame([t.as_dict() for t in pair_figi_list if isinstance(t, AssetInfo)]) +db_conn.insert(figi_df, 'figi', db_mapping) + + +# %% +db_conn.dispose() +print('Completed') diff --git a/src/openfigi_parser.py b/src/openfigi_parser.py deleted file mode 100644 index 838dc85..0000000 --- a/src/openfigi_parser.py +++ /dev/null @@ -1,117 +0,0 @@ -# %% -import os -from dataclasses import dataclass -from typing import Literal, Union - -import pandas as pd - -import httpx - -from sqlalchemy import types, sql -from azure import AzureDbConnection, ConnectionSettings - - -# %% -@dataclass -class AssetInfo: - FIGI: str - Ticker: str - Title: Union[str, None] - Description: Union[str, None] - AssetType: Literal['Cryptocurrency'] - SourceId: Literal['OpenFigi API'] - Version: Literal['v202206'] - - -def get_figi(pair: str) -> Union[AssetInfo, None]: - """Return FIGI for pair - - References: - - https://www.openfigi.com/assets/local/figi-allocation-rules.pdf - - https://www.openfigi.com/search - """ - api_url = f'https://www.openfigi.com/search/query?facetQuery=MARKET_SECTOR_DES:%22Curncy%22&num_rows=100&simpleSearchString={pair}&start=0' - response = httpx.get(api_url) - - json_response = response.json() - response_df = pd.DataFrame.from_dict(json_response['result'], orient='columns') - if len(response_df) == 0: - print(f'[WARN] {pair} not found') - return None - - pair_figi = response_df.kkg_pairFIGI_sd.unique() - - if (len(pair_figi) != 1): - print(f'[WARN] {len(pair_figi)} records was found for {pair}') - else: - print(f'[INFO] {pair} associated w/ FIGI {pair_figi[0]}') - - return pair_figi - - -# %% Tests -expected_pairs = { - 'WAX-USD': None, - 'ETH-USD': 'BBG00J3NBWD7', - 'BTC-USD': 'BBG006FCL7J4', - 'SOL-USD': 'BBG013WVY457', - 'UNI-USD': 'BBG013TZFVW3', - 'SUSHI-USD': 'KKG0000010W1' -} - -for k, v in expected_pairs.items(): - actual = get_figi(k) - print(actual) - assert actual == v - - -# %% -pair_names = [x[:-4] for x in os.listdir("../data")] - -def insert_dash(text: str, position: int) -> str: - if '-' not in text: - return text[:position] + '-' + text[position:] - else: - return text - -usd_pairs = [ - insert_dash(s.upper(), 3) - for s in pair_names if "usd" in s -] - -print(usd_pairs) - - -pair_figi_list = [get_figi(p) for p in usd_pairs] - -# %% ---- -conn_settings = ConnectionSettings(server='***.database.windows.net', database='market-data-db', username='***', password='***') -db_conn = AzureDbConnection(conn_settings) - -db_conn.connect() -for t in db_conn.get_tables(): - print(t) - - -# %% -db_mapping = { - 'FIGI': types.VARCHAR(length=12), - 'open': types.DECIMAL(precision=19, scale=9), - 'high': types.DECIMAL(precision=19, scale=9), - 'close': types.DECIMAL(precision=19, scale=9), - 'low': types.DECIMAL(precision=19, scale=9), - 'volume': types.DECIMAL(precision=19, scale=9), - 'time': types.DATETIME(), - 'source_id': types.SMALLINT, - 'version': types.VARCHAR(length=12), - 'interval': types.CHAR(length=2) -} - - -query = sql.text("select * from Cryptocurrency where figi = 'ustusd'") -result = db_conn._conn.execute(query).fetchall() - - -# %% -db_conn.dispose() -print('Completed')