
[FinRL] Key Cryptocurrency Indicators #1

SKY-STONE 2024. 9. 17. 16:23

Overview

  • Determine which features are the most important factors using crypto (cryptocurrency) data
  • Use the triple barrier method to build an environment suitable for machine-learning training/testing
  • Use Perturbation Rank (PR) to rank which features matter most at test time
    • The technical indicators turn out to be more important factors than the raw OHLCV values

 


Working path: /root/FinRL-Tutorials/2-Advance/Crypto_Feature_Importance.ipynb


Part 0: Import Modules

# Other imports
import scipy as sp
import math
import pandas as pd
import requests
import json
import matplotlib.dates as mdates
import numpy as np
import pickle
import shutil
import matplotlib.pyplot as plt
from meta.data_processor import DataProcessor

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import seaborn as sns

from datetime import datetime, timedelta
from talib.abstract import MACD, RSI, CCI, DX
from binance.client import Client
from pandas.testing import assert_frame_equal
from sklearn import metrics
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split
from imblearn.over_sampling import SMOTE

from sklearn.preprocessing import MinMaxScaler 
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
from IPython.display import display, HTML

#from google.colab import files
plt.style.available

# Plot settings
SCALE_FACTOR = 2
plt.style.use('seaborn-v0_8')
plt.rcParams['figure.figsize'] = [5 * SCALE_FACTOR, 2 * SCALE_FACTOR]
plt.rcParams['figure.dpi'] = 300 * SCALE_FACTOR
plt.rcParams['font.size'] = 5 * SCALE_FACTOR
plt.rcParams['axes.labelsize'] = 5 * SCALE_FACTOR
plt.rcParams['axes.titlesize'] = 6 * SCALE_FACTOR
plt.rcParams['xtick.labelsize'] = 4 * SCALE_FACTOR
plt.rcParams['ytick.labelsize'] = 4 * SCALE_FACTOR
plt.rcParams['font.family'] = 'serif'

Part 1: Crypto Data Download

#1.1 Set constants and use BinanceProcessor
ticker_list = ['ETHUSDT']

TIME_INTERVAL = '1h'

TRAIN_START_DATE = '2015-01-01'
TRAIN_END_DATE = '2024-09-17'


technical_indicator_list = ['macd',
                             'macd_signal',
                             'macd_hist',
                             'rsi',
                             'cci',
                             'dx'
                             ]

if_vix = False

#Process data using unified data processor
p = DataProcessor(data_source='binance', start_date=TRAIN_START_DATE, end_date=TRAIN_END_DATE, time_interval=TIME_INTERVAL)
p.download_data(ticker_list=ticker_list)
p.clean_data()

#1.2 Add technical indicators
def add_technical_indicator(df, tech_indicator_list):
    # print('Adding self-defined technical indicators is NOT supported yet.')
    # print('Use default: MACD, RSI, CCI, DX.')

    final_df = pd.DataFrame()
    for i in df.tic.unique():
        tic_df = df[df.tic == i].copy()
        tic_df['rsi'] = RSI(tic_df['close'], timeperiod=14)
        tic_df['macd'], tic_df['macd_signal'], tic_df['macd_hist'] = MACD(tic_df['close'], fastperiod=12,
                                                                          slowperiod=26, signalperiod=9)
        tic_df['cci'] = CCI(tic_df['high'], tic_df['low'], tic_df['close'], timeperiod=14)
        tic_df['dx'] = DX(tic_df['high'], tic_df['low'], tic_df['close'], timeperiod=14)
        final_df = pd.concat([final_df, tic_df])
    return final_df
    
# The DataProcessor keeps the cleaned data in p.dataframe
processed_df = add_technical_indicator(p.dataframe, technical_indicator_list)

processed_df.index=pd.to_datetime(processed_df.time)
processed_df.drop('time', inplace=True, axis=1)

Part 2: Triple barrier method/Data Labeling

  • Why use the triple barrier method
    • More accurate signal capture: rather than looking only at the price change after a fixed amount of time, labels are assigned based on whether the price crosses specific levels, which captures market moves more accurately.
    • Flexible configuration: the upper barrier, lower barrier, and time limit can all be tuned to the situation, producing labels that fit the model.
    • Works with many models: labels produced by the triple barrier method can be fed into a wide range of machine-learning models.
  • How the triple barrier method works (a small numeric sketch follows this list)
    • Set three barriers: an upper (profit-taking) barrier, a lower (stop-loss) barrier, and a time limit (vertical barrier), all relative to the current price.
    • Detect which barrier is touched: record which of the barriers the price reaches first.
    • Assign a label: depending on which barrier is hit first, the observation is labeled 'up', 'down', or 'uncertain'.
  • Advantages
    • Reflects real trading conditions: the time limit accounts for the opportunity cost present in real investing.
    • Adapts to different market regimes: the barriers can be widened or narrowed with market volatility.
    • Better model performance: richer, more accurate labels improve the model's predictive performance.
  • Disadvantages
    • Parameter tuning is hard: finding good barrier settings requires considering many scenarios.
    • Higher computational cost: setting and monitoring three barriers adds computation.
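
The sketch below is an illustration only (made-up numbers, not from the original notebook): with a close of 100, a volatility estimate of 0.02, and multipliers [2, 2], the two horizontal barriers land at 104 and 96.

# Minimal numeric sketch of the three barriers (hypothetical values)
close, vol = 100.0, 0.02
upper_lower_multipliers = [2, 2]
top_barrier = close + close * upper_lower_multipliers[0] * vol     # 100 + 100*2*0.02 = 104
bottom_barrier = close - close * upper_lower_multipliers[1] * vol  # 100 - 100*2*0.02 = 96
# Within the next t_final bars:
#   price touches 104 first -> label 2 (profit take)
#   price touches 96 first  -> label 0 (stop loss)
#   neither touched         -> vertical (time) barrier hit -> label 1 (timeout)
print(top_barrier, bottom_barrier)  # 104.0 96.0
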
#2.1 Add volatility
def get_vol(prices, span=100):
    # 1. compute returns of the form p[t]/p[t-1] - 1
    df0 = prices.pct_change()
    # 2. estimate rolling standard deviation
    df0 = df0.ewm(span=span).std()
    return df0
    
data_ohlcv = processed_df.assign(volatility=get_vol(processed_df.close)).dropna()

#2.2 Adding Path Dependency: Triple-Barrier Method
def get_barriers(daily_volatility, t_final, upper_lower_multipliers, prices):
  #create a container
  barriers = pd.DataFrame(columns=['days_passed', 'price', 'vert_barrier', 'top_barrier', 'bottom_barrier'], index = daily_volatility.index)
  for day, vol in daily_volatility.items():
    days_passed = len(daily_volatility.loc[daily_volatility.index[0]:day])
    #set the vertical barrier
    if days_passed + t_final < len(daily_volatility.index) and t_final != 0:
        vert_barrier = daily_volatility.index[days_passed + t_final]
    else:
        vert_barrier = np.nan
    #set the top barrier
    if upper_lower_multipliers[0] > 0:
        top_barrier = prices.loc[day] + prices.loc[day] * upper_lower_multipliers[0] * vol
    else:
        #set it to NaN
        top_barrier = np.nan
    #set the bottom barrier
    if upper_lower_multipliers[1] > 0:
        bottom_barrier = prices.loc[day] - prices.loc[day] * upper_lower_multipliers[1] * vol
    else:
        #set it to NaN
        bottom_barrier = np.nan
        
    barriers.loc[day, ['days_passed', 'price', 'vert_barrier','top_barrier', 'bottom_barrier']] = \
    days_passed, prices.loc[day], vert_barrier, top_barrier, bottom_barrier

  return barriers
  
# Set barrier parameters
daily_volatility = data_ohlcv['volatility']
t_final = 25
upper_lower_multipliers = [2, 2]
price = data_ohlcv['close']
prices = price[daily_volatility.index]

barriers = get_barriers(daily_volatility, t_final, upper_lower_multipliers, prices)

#2.3 Function to get labels for the dataset (0, 1, 2)
def get_labels():
  '''
  start: first day of the window
  end: last day of the window
  price_initial: first day stock price
  price_final: last day stock price
  top_barrier: profit taking limit
  bottom_barrier: stop loss limit
  condition_pt: top_barrier touching condition
  condition_sl: bottom_barrier touching condition
  '''

  barriers["label_barrier"] = None
  for i in range(len(barriers.index)):
    start = barriers.index[i]
    end = barriers.vert_barrier[i]
    if pd.notna(end):

        # assign the initial and final price
        price_initial = barriers.price[start]
        price_final = barriers.price[end]

        # assign the top and bottom barriers
        top_barrier = barriers.top_barrier[i]
        bottom_barrier = barriers.bottom_barrier[i]

        #set the profit taking and stop loss conditons
        condition_pt = (barriers.price[start: end] >= \
          top_barrier).any()
        condition_sl = (barriers.price[start: end] <= \
          bottom_barrier).any()

        #assign the labels (use .loc to avoid chained-assignment issues)
        if condition_pt:
            barriers.loc[start, 'label_barrier'] = 2
        elif condition_sl:
            barriers.loc[start, 'label_barrier'] = 0
        else:

            # Change to regression analysis by switching labels (-1, 0, 1)
            # and uncommenting the alternative assignment below

            barriers.loc[start, 'label_barrier'] = 1
            # barriers.loc[start, 'label_barrier'] = max(
            #           [(price_final - price_initial) /
            #             (top_barrier - price_initial),
            #             (price_final - price_initial) /
            #             (price_initial - bottom_barrier)],
            #             key=abs)

  return barriers
  
# Use function to produce barriers
get_labels()
barriers

# Merge the barriers with the main dataset and drop the last t_final + 1 barriers (as they are too close to the end)
data_ohlcv = data_ohlcv.merge(barriers[['vert_barrier', 'top_barrier', 'bottom_barrier', 'label_barrier']], left_on='time', right_on='time')
data_ohlcv.drop(data_ohlcv.tail(t_final + 1).index,inplace = True)
data_ohlcv.head(5)

# Count barrier hits (0 = stop loss, 1 = timeout, 2 = profit take)
pd.Series(data_ohlcv['label_barrier']).value_counts()

fig,ax = plt.subplots()
plt.xticks(rotation=45)
plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%m/%d/%Y'))
plt.gca().xaxis.set_major_locator(mdates.DayLocator(interval=1))


TIMESTAMP_TO_PLOT= 300

ax.set(title='ETH/USDT',
       xlabel='date', ylabel='price')
ax.plot(data_ohlcv.close[200:600])

start = data_ohlcv.index[TIMESTAMP_TO_PLOT]
end = data_ohlcv.vert_barrier[TIMESTAMP_TO_PLOT]
upper_barrier = data_ohlcv.top_barrier[TIMESTAMP_TO_PLOT]
lower_barrier = data_ohlcv.bottom_barrier[TIMESTAMP_TO_PLOT]

ax.plot([start, end], [upper_barrier, upper_barrier], 'r--');
ax.plot([start, end], [lower_barrier, lower_barrier], 'r--');
ax.plot([start, end], [(lower_barrier + upper_barrier)*0.5, \
                       (lower_barrier + upper_barrier)*0.5], 'r--');
ax.plot([start, start], [lower_barrier, upper_barrier], 'r-');
ax.plot([end, end], [lower_barrier, upper_barrier], 'r-');

label_barrier counts:
2 (profit take) = 32865
1 (timeout)     =  5913
0 (stop loss)   = 23134

Part 3: Copying the Neural Network present in the ElegantRL ActorPPO agent

data_ohlcv = data_ohlcv.drop(['vert_barrier', 'top_barrier', 'bottom_barrier','adjusted_close','tic'], axis = 1)

class Net(nn.Module):

    def __init__(self):
        super(Net, self).__init__()

        self.state_dim = 12       # all the features
        self.mid_dim = 2**10      # net dimension
        self.action_dim = 3       # output (sell/nothing/buy)

        # make a copy of the model in ActorPPO (activation function in forward function)

        # Original initial layers
        self.fc1 = nn.Linear(self.state_dim, self.mid_dim)
        self.relu1 = nn.ReLU()
        self.fc2 = nn.Linear(self.mid_dim, self.mid_dim)

        # Original residual layers
        self.relu2 = nn.ReLU()
        self.fc3 = nn.Linear(self.mid_dim, self.mid_dim)
        self.hw1 = nn.Hardswish()
        self.fc_out = nn.Linear(self.mid_dim, self.action_dim)

    def forward(self, x):
        x = x.float()

        # Original initial layers
        x = self.fc1(x)
        x = self.relu1(x)
        x = self.fc2(x)

        # Original residual layers
        x = self.relu2(x)
        x = self.fc3(x)
        x = self.hw1(x)
        x = self.fc_out(x)
        return x

model_NN1 = Net()
print(model_NN1)

class ClassifierDataset(Dataset):
    
    def __init__(self, X_data, y_data):
        self.X_data = X_data
        self.y_data = y_data
        
    def __getitem__(self, index):
        return self.X_data[index], self.y_data[index]
        
    def __len__ (self):
        return len(self.X_data)

#3.1 Set constants and train/test split
# Set constants
batch_size=16
epochs=300

# Reinitiating data here
data = data_ohlcv

X = data[['open', 'high', 'low', 'close', 'volume', 'rsi', 'macd', 'macd_signal', 'macd_hist', 'cci', 'dx', 'volatility']].values
y = np.squeeze(data[['label_barrier']].values).astype(int)

# Split into train+val and test
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, stratify=y, random_state=69)

# Normalize input
scaler = MinMaxScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

# Convert to numpy arrays
X_train, y_train = np.array(X_train), np.array(y_train)
X_test, y_test = np.array(X_test), np.array(y_test)

# Initialize datasets and convert them to PyTorch DataLoaders
train_dataset = ClassifierDataset(torch.from_numpy(X_train).float(), torch.from_numpy(y_train.astype(int)).long())
test_dataset = ClassifierDataset(torch.from_numpy(X_test).float(), torch.from_numpy(y_test.astype(int)).long())
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size)
test_loader = DataLoader(dataset=test_dataset, batch_size=1)

#3.2 Check if GPU available
# Check GPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

# Set loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model_NN1.parameters(), lr=0.0001)

#3.3 Train/Test model
# Train function
def train(fold, model, device, train_loader, optimizer, epoch):
  model.train()
  correct_train = 0
  correct_this_batch_train = 0
  total_train_loss = 0
  for batch_idx, (data, target) in enumerate(train_loader):
      data, target = data.to(device), target.to(device)
      optimizer.zero_grad()
      output = model(data)
      train_loss = criterion(output, target.flatten())
      train_loss.backward()
      optimizer.step()

      if batch_idx % 100 == 0:
          print('Train Fold/Epoch: {}/{} [{}/{} ({:.0f}%)]\ttrain_loss: {:.6f}'.format(
              fold,epoch, batch_idx * len(data), len(train_loader.dataset),
              100. * batch_idx / len(train_loader), train_loss.item()))
          
      # Measure accuracy on train set
      total_train_loss += train_loss.item()
      _, y_pred_tags_train = torch.max(output, dim = 1)
      correct_this_batch_train = y_pred_tags_train.eq(target.flatten().view_as(y_pred_tags_train))
      correct_train += correct_this_batch_train.sum().item()
  
  return correct_train, train_loss

# Test function
def test(fold,model, device, test_loader, correct_train, train_loss):
  model.eval()
  test_loss = 0
  correct = 0
  with torch.no_grad():
      for data, target in test_loader:
          data, target = data.to(device), target.to(device)
          output = model(data)
          test_loss += criterion(output, target.flatten()).item()  # sum up batch loss

          # Measure accuracy on test set
          _, y_pred_tags = torch.max(output, dim = 1)
          correct_this_batch = y_pred_tags.eq(target.flatten().view_as(y_pred_tags))
          correct += correct_this_batch.sum().item() 

  test_loss /= len(test_loader.dataset)
  train_loss /= len(train_loader.dataset)

  # Print train accuracy for epoch
  # TODO: still a bug in summed up batch train loss 
  print('\nTrain set for fold {}: Average train_loss: {:.4f}, Accuracy: {}/{} ({:.5f}%)'.format(
  fold, train_loss, correct_train, len(train_loader.dataset),
  100 * correct_train / len(train_loader.dataset)))

  # Print test result for epoch
  print('Test set for fold {}:  Average test_loss:  {:.4f}, Accuracy: {}/{} ({:.5f}%)\n'.format(
      fold, test_loss, correct, len(test_loader.dataset),
      100 * correct / len(test_loader.dataset)))  

model_NN1.to(device)

# State fold (no PurgedKFold built yet, ignore this)
# Took about 1 hour to train with epochs=300

epochs=100
fold = 0
for epoch in range(1, epochs + 1):
  correct_train, train_loss = train(fold, model_NN1, device, train_loader, optimizer, epoch)
  test(fold, model_NN1, device, test_loader, correct_train, train_loss)

# Save the model to disk (and to your own files) to save some time later
filename = 'model_NN1'

with open(filename + '.pkl', 'wb') as fid:
  pickle.dump(model_NN1, fid)

# Load the pickle file
with open(filename + '.pkl', 'rb') as fid:
     model_NN1 = pickle.load(fid)

# Load the pickle file into a separate object
filename = 'model_NN1'
with open(filename + '.pkl', 'rb') as fid:
     model_NN1_pickle = pickle.load(fid)

with torch.no_grad():
  # Show accuracy on test set
  model_NN1.eval()

  # predict proba
  y_pred_nn1_proba = model_NN1(torch.from_numpy(X_test).float().to(device))
  y_pred_nn1 = torch.argmax(y_pred_nn1_proba, dim=1)
  y_pred_nn1 = y_pred_nn1.cpu().detach().numpy()

# Print predicted label values
print('labels in prediction:', np.unique(y_pred_nn1), '\n')

# Print report (target_names follow label order 0 = stop loss, 1 = timeout, 2 = profit take)
label_names = ['stop loss', 'timeout', 'profit take']
print(classification_report(y_test.astype(int), y_pred_nn1, target_names=label_names))

np.bincount(y_pred_nn1)


array([4524,  665, 7194])

Part 4: Feature Importance Analysis

  • Why use Perturbation Rank (PR)
    • It is a method for evaluating the importance of each feature in a machine-learning model.
    • It deliberately applies a small change to each feature of the dataset and observes how the model's performance changes in order to gauge that feature's importance.
  • How Perturbation Rank (PR) works (a toy sketch follows this list)
    • Measure the baseline: measure the model's performance on the original dataset.
    • Perturb a feature: modify the feature, for example by adding noise or shuffling its values.
    • Evaluate performance: run the model on the perturbed dataset and measure its performance again.
    • Compute the ranking: the feature whose perturbation degrades performance the most is judged the most important.
  • Advantages of Perturbation Rank (PR)
    • Model-agnostic: it can be applied to any machine-learning model.
    • Interpretability: feature importance can be read off intuitively.
    • Robustness: it is not strongly affected by the particulars of the dataset or model.
  • Disadvantages of Perturbation Rank (PR)
    • Computational cost: every feature must be perturbed, so the amount of computation can be large.
    • Feature interactions: effects that arise when several features act together may not be captured accurately.
    • Noise sensitivity: results can vary with the kind and size of the perturbation used.
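
For intuition, the toy sketch below shows the permutation idea on synthetic data with a scikit-learn LogisticRegression; the data, model, and variable names here are illustrative assumptions, not the notebook's model. The notebook's actual implementation follows right after.

# Toy permutation-importance sketch (synthetic data, illustrative only)
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import log_loss

rng = np.random.default_rng(0)
X_toy = rng.normal(size=(500, 3))
y_toy = (X_toy[:, 0] > 0).astype(int)             # only feature 0 carries signal
clf = LogisticRegression().fit(X_toy, y_toy)

base = log_loss(y_toy, clf.predict_proba(X_toy))  # baseline loss
for i in range(X_toy.shape[1]):
    X_perm = X_toy.copy()
    rng.shuffle(X_perm[:, i])                      # destroy only feature i
    loss = log_loss(y_toy, clf.predict_proba(X_perm))
    print(f"feature {i}: loss {base:.3f} -> {loss:.3f}")
# The largest loss increase marks the most important feature (feature 0 here).
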
def perturbation_rank(model,x,y,names):
    errors = []

    X_saved = x
    y = y.flatten()

    with torch.no_grad():
        model.eval()
        for i in range(x.shape[1]):

            # Copy the saved matrix, shuffle only column i, convert back to tensor, predict
            # (copying avoids mutating X_saved in place, since .numpy() shares memory with the tensor)
            x = X_saved.detach().numpy().copy()
            np.random.shuffle(x[:, i])
            x = torch.from_numpy(x).float().to(device)
            pred = model(x)

            # log_loss requires (classification target, probabilities), so apply softmax to the logits
            pred = torch.softmax(pred, dim=1).cpu().detach().numpy()
            error = metrics.log_loss(y, pred)
            errors.append(error)
    
    max_error = np.max(errors)
    importance = [e/max_error for e in errors]
    
    data = {'name':names,'error':errors,'importance':importance}
    result = pd.DataFrame(data,columns = ['name','error','importance'])
    result.sort_values(by=['importance'],ascending=[0],inplace=True)
    result.reset_index(inplace=True,drop=True)
    return result

names = list(data_ohlcv.columns)
names.remove('label_barrier')
rank = perturbation_rank(model_NN1, 
                         torch.from_numpy(X_test).float(),
                         torch.from_numpy(y_test.astype(int)).long(),  
                         names
                         )

display(rank)

