Update parser

This commit is contained in:
codez0mb1e 2022-10-17 17:02:23 +00:00
parent aa7d45380b
commit 3882efc9a2

View File

@ -1,42 +1,41 @@
#!/usr/bin/python3 #!/usr/bin/python3
""" """
Data source: https://www.kaggle.com/code/tencars/bitfinexdataset Data source: https://www.kaggle.com/datasets/tencars/392-crypto-currency-pairs-at-minute-resolution
""" """
# %% # %%
import os import os
import numpy as np import numpy as np
import pandas as pd import pandas as pd
from sqlalchemy import types from sqlalchemy import types
from azure import AzureDbConnection, ConnectionSettings from azure import AzureDbConnection, ConnectionSettings
# %%
# In terminal:
#> kaggle datasets download tencars/392-crypto-currency-pairs-at-minute-resolution
#> unzip 392-crypto-currency-pairs-at-minute-resolution.zip

input_dir = "../data"

# Names of the available currency pairs: one CSV per pair (e.g. "ethusd.csv").
# os.path.splitext is safer than slicing off the last 4 chars if a file with
# a different extension ever lands in the data directory.
pair_names = [os.path.splitext(x)[0] for x in os.listdir(input_dir)]
usd_pairs = [s for s in pair_names if "usd" in s]

# Print the first 10 USD-quoted pair names.
print(f"These are the first 10 out of {len(usd_pairs)} currency pairs in the dataset:")
print(usd_pairs[0:10])
# %% # %%
def load_data(symbol, source=input_path): def load_data(symbol: str, input_dir: str) -> pd.DataFrame:
path_name = source + "/" + symbol + ".csv" path_name = input_dir + "/" + symbol + ".csv"
# Load data # Load data
df = pd.read_csv(path_name, index_col='time', dtype={'open': np.float64, 'high': np.float64, 'low': np.float64, 'close': np.float64, 'volume': np.float64}) df = pd.read_csv(path_name, index_col='time', dtype={'open': np.float64, 'high': np.float64, 'low': np.float64, 'close': np.float64, 'volume': np.float64})
@ -50,23 +49,50 @@ def load_data(symbol, source=input_path):
return df[['open', 'high', 'low', 'close', 'volume']] return df[['open', 'high', 'low', 'close', 'volume']]
def calc_ohlcv_1h(df: pd.DataFrame) -> pd.DataFrame:
    """Aggregate minute-resolution OHLCV candles into 1-hour candles.

    Parameters
    ----------
    df : pd.DataFrame
        Candles indexed by a DatetimeIndex, with 'open', 'high', 'low',
        'close' and 'volume' columns.

    Returns
    -------
    pd.DataFrame
        One row per hour with columns ['hour', 'open', 'high', 'low',
        'close', 'volume'], where 'hour' is a pandas Period. The input
        frame is NOT modified (the original version mutated it, which is
        what triggered chained-assignment warnings on .loc slices).
    """
    # assign() returns a copy, leaving the caller's frame untouched.
    hourly = df.assign(hour=df.index.to_period('H'))
    return (
        hourly
        .groupby('hour')
        .agg(
            # Use string aggregators instead of the max/min/sum builtins:
            # pandas dispatches strings to its optimized C implementations.
            open=('open', 'first'),
            high=('high', 'max'),
            low=('low', 'min'),
            close=('close', 'last'),
            volume=('volume', 'sum'),
        )
        .reset_index()
    )
# %% ----
# Smoke-test the pipeline on a single pair before the full ingest loop.
ethusd_1m = load_data("ethusd", input_dir)
ethusd_1h = calc_ohlcv_1h(ethusd_1m)
ethusd_1h.tail()
# %% ----
# Target Azure SQL database.
# SECURITY(review): credentials are hard-coded in source — move them to
# environment variables or a secrets store before sharing this script.
conn_settings = ConnectionSettings(
    'datainstinct',
    'market-data-db',
    'demo',
    '0test_test_AND_test'
)

db_conn = AzureDbConnection(conn_settings)
db_conn.connect()

# Sanity check: list the tables visible on the connection.
for t in db_conn.get_tables():
    print(t)
# %% # %%
min_candels_n = 10000
db_mapping = { db_mapping = {
'FIGI': types.CHAR(length=12), 'FIGI': types.CHAR(length=12),
'open': types.DECIMAL(precision=19, scale=9), 'open': types.DECIMAL(precision=19, scale=9),
@ -75,28 +101,43 @@ db_mapping = {
'low': types.DECIMAL(precision=19, scale=9), 'low': types.DECIMAL(precision=19, scale=9),
'volume': types.DECIMAL(precision=19, scale=9), 'volume': types.DECIMAL(precision=19, scale=9),
'time': types.DATETIME(), 'time': types.DATETIME(),
'source_id': types.SMALLINT, 'source_id': types.SMALLINT(),
'version': types.VARCHAR(length=12), 'version': types.VARCHAR(length=12),
'interval': types.CHAR(length=2) 'interval': types.CHAR(length=2)
} }
# %%
# Silence pandas chained-assignment warnings: we deliberately add metadata
# columns to a .loc slice of the candles frame below.
pd.options.mode.chained_assignment = None

# Pairs with fewer minute-candles than this are skipped as too thin.
min_candels_n = 10000

for pair in usd_pairs:
    print(f'INFO | {pair} > Starting read dataset...')
    candles_df = load_data(pair, input_dir)

    if len(candles_df) > min_candels_n:
        # NOTE(review): hard-coded ingest window; assumes the index is a
        # (sorted) DatetimeIndex so partial-string slicing works — confirm
        # against load_data, and parameterize if this script is rerun for
        # other periods.
        df = candles_df.loc['2022-07-01':'2022-10-01']

        if len(df) > 0:
            df = calc_ohlcv_1h(df)

            # Metadata columns required by the 'crypto' table mapping.
            df['FIGI'] = pair
            df['time'] = df.hour.apply(lambda h: h.to_timestamp())
            df['source_id'] = 1
            df['version'] = 'v20221001'
            df['interval'] = '1H'
            df.drop(columns='hour', inplace=True)

            print(f'INFO | {pair} > Starting insert to DB...')
            print('DEBUG | {} rows from {} to {}'.format(df.shape[0], min(df['time']), max(df['time'])))
            db_conn.insert(df, 'crypto', db_mapping)
        else:
            print(f'WARN | {pair} > No new records')
    else:
        print(f'WARN | {pair} > Only {candles_df.shape[0]} records')

# %%