From 3882efc9a23fabd95dfd73fccf95f384d9366927 Mon Sep 17 00:00:00 2001 From: codez0mb1e Date: Mon, 17 Oct 2022 17:02:23 +0000 Subject: [PATCH] Update parser --- src/bitfinex_crypto_parser.py | 103 ++++++++++++++++++++++++---------- 1 file changed, 72 insertions(+), 31 deletions(-) diff --git a/src/bitfinex_crypto_parser.py b/src/bitfinex_crypto_parser.py index 313cc3b..20a386b 100644 --- a/src/bitfinex_crypto_parser.py +++ b/src/bitfinex_crypto_parser.py @@ -1,42 +1,41 @@ #!/usr/bin/python3 """ -Data source: https://www.kaggle.com/code/tencars/bitfinexdataset +Data source: https://www.kaggle.com/datasets/tencars/392-crypto-currency-pairs-at-minute-resolution """ # %% import os + import numpy as np import pandas as pd from sqlalchemy import types from azure import AzureDbConnection, ConnectionSettings - # %% -#> ~/apps/resistance/data +# In terminal: +#> kaggle -v # must be >1.15 +#> mkdir data; cd data #> kaggle datasets download tencars/392-crypto-currency-pairs-at-minute-resolution #> unzip 392-crypto-currency-pairs-at-minute-resolution.zip -input_path = "../data" +input_dir = "../data" # Get names and number of available currency pairs -pair_names = [x[:-4] for x in os.listdir(input_path)] -n_pairs = len(pair_names) +pair_names = [x[:-4] for x in os.listdir(input_dir)] +usd_pairs = [s for s in pair_names if "usd" in s] # Print the first 50 currency pair names -print("These are the first 50 out of {} currency pairs in the dataset:".format(n_pairs)) -print(pair_names[0:50]) - -usd_pairs = [s for s in pair_names if "usd" in s] -print(usd_pairs) +print(f"These are the first 10 out of {len(usd_pairs)} currency pairs in the dataset:") +print(usd_pairs[0:10]) # %% -def load_data(symbol, source=input_path): - path_name = source + "/" + symbol + ".csv" +def load_data(symbol: str, input_dir: str) -> pd.DataFrame: + path_name = input_dir + "/" + symbol + ".csv" # Load data df = pd.read_csv(path_name, index_col='time', dtype={'open': np.float64, 'high': np.float64, 'low': np.float64, 'close': np.float64, 'volume': np.float64}) @@ -50,23 +49,50 @@ def load_data(symbol, source=input_path): return df[['open', 'high', 'low', 'close', 'volume']] +def calc_ohlcv_1h(df: pd.DataFrame) -> pd.DataFrame: + df['hour'] = df.index.to_period('H') + + return ( + df + .groupby(['hour']) + .agg( + { + 'open': 'first', + 'high': max, + 'low': min, + 'close': 'last', + 'volume': sum, + #'time': max + } + ) + .reset_index() + ) + + # %% ---- -sample_df = load_data("ethusd") -sample_df +ethusd_1m = load_data("ethusd", input_dir) +ethusd_1h = calc_ohlcv_1h(ethusd_1m) + +ethusd_1h.tail() # %% ---- -db_conn = AzureDbConnection(conn_settings) +conn_settings = ConnectionSettings( + 'datainstinct', + 'market-data-db', + 'demo', + '0test_test_AND_test' +) +db_conn = AzureDbConnection(conn_settings) db_conn.connect() + for t in db_conn.get_tables(): print(t) # %% -min_candels_n = 10000 - db_mapping = { 'FIGI': types.CHAR(length=12), 'open': types.DECIMAL(precision=19, scale=9), @@ -75,28 +101,43 @@ db_mapping = { 'low': types.DECIMAL(precision=19, scale=9), 'volume': types.DECIMAL(precision=19, scale=9), 'time': types.DATETIME(), - 'source_id': types.SMALLINT, + 'source_id': types.SMALLINT(), 'version': types.VARCHAR(length=12), 'interval': types.CHAR(length=2) } + +# %% +pd.options.mode.chained_assignment = None + +min_candels_n = 10000 + for pair in usd_pairs: - print(f'Starting read {pair}...') - candles_df = load_data(pair) + print(f'INFO | {pair} > Starting read dataset...') - candles_df['FIGI'] = pair - candles_df['time'] = candles_df.index - candles_df['source_id'] = 128 - candles_df['version'] = 'v202206' - candles_df['interval'] = '1M' + candles_df = load_data(pair, input_dir) - if candles_df.shape[0] > min_candels_n: - print('{} rows from {} to {}'.format(candles_df.shape[0], min(candles_df['time']), max(candles_df['time']))) + if len(candles_df) > min_candels_n: - print(f'Starting insert {pair}...') - db_conn.insert(candles_df, 'crypto', db_mapping) + df = candles_df.loc['2022-07-01':'2022-10-01'] + + if len(df) > 0: + df = calc_ohlcv_1h(df) + + df['FIGI'] = pair + df['time'] = df.hour.apply(lambda h: h.to_timestamp()) + df['source_id'] = 1 + df['version'] = 'v20221001' + df['interval'] = '1H' + df.drop(columns='hour', inplace=True) + + print(f'INFO | {pair} > Starting insert to DB...') + print('DEBUG | {} rows from {} to {}'.format(df.shape[0], min(df['time']), max(df['time']))) + db_conn.insert(df, 'crypto', db_mapping) + else: + print(f'WARN | {pair} > No new records') else: - print(f'WARN: {pair} has only {candles_df.shape[0]} records') + print(f'WARN | {pair} > Only {candles_df.shape[0]} records') # %%