RegressionTree
class, which is based on a recursive approach. Intentionally we will start with a very inefficient, but easy to understand implementation of this class, in order to be able to further improve it. class RegressionTree(): ''' Класс RegressionTree решает задачу регрессии. Основан на рекурсивных вызовах, когда прописываются условия выхода из рекурсии. ''' def __init__(self, max_depth=3, n_epoch=10, min_size=8): ''' Объявляем переменные класса. ''' self.max_depth = max_depth # максимальная глубина self.min_size = min_size # минимальный размер поддерева self.value = 0 # значение в поддереве (среднее по всем листьям) self.feature_idx = -1 # номер лучшего признака self.feature_threshold = 0 # значение лучшего признака self.left = None # левый потомок self.right = None # правый потомок def fit(self, X, y): ''' Процедура обучения дерева. На выходе получим обученную модель. ''' # инициализируем начальные значения self.value = y.mean() base_error = ((y - self.value) ** 2).sum() error = base_error flag = 0 # ошибки в левом и правом поддереве prev_error_left = base_error prev_error_right = 0 # если дошли до глубины 0 - выходим if self.max_depth <= 1: return dim_shape = X.shape[1] # значения в левом и правом поддереве left_value = 0 right_value = 0 # начинаем цикл по признакам for feat in range(dim_shape): # сортируем признаки idxs = np.argsort(X[:, feat]) # количество сэмплов в левом и правом поддереве N = X.shape[0] N1, N2 = N, 0 thres = 1 # начинаем проходиться по значениям признака while thres < N - 1: N1 -= 1 N2 += 1 idx = idxs[thres] x = X[idx, feat] # пропускаем одинаковые признаки if thres < N - 1 and x == X[idxs[thres + 1], feat]: thres += 1 continue # данные, которые получаются у нас в результате такого сплита target_right = y[idxs][:thres] target_left = y[idxs][thres:] mean_right = y[idxs][:thres].mean(), mean_left = y[idxs][thres:].mean() # на этом шаге уже нужно считать ошибку - # генерируем предикты (среднее в потомках) left_shape = target_left.shape[0] right_shape = target_right.shape[0] mean_left_array = [mean_left for _ in range(left_shape)] mean_right_array = [mean_right for _ in range(right_shape)] # считаем ошибку слева и справа prev_error_left = N1/N * mse(target_left, mean_left_array) prev_error_right = N2/N * mse(target_right, mean_right_array) # если выполняются условия сплита, то обновляем if (prev_error_left + prev_error_right < error): if (min(N1,N2) > self.min_size): self.feature_idx = feat self.feature_threshold = x left_value = mean_left right_value = mean_right flag = 1 error = prev_error_left + prev_error_right thres += 1 # если не нашли лучший сплит, выходим if self.feature_idx == -1: return # дошли сюда - есть хорошее разбиение, нужно обучать дальше # инициализируем потомков - те же деревья решений self.left = RegressionTree(self.max_depth - 1) self.left.value = left_value self.right = RegressionTree(self.max_depth - 1) self.right.value = right_value # индексы потомков idxs_l = (X[:, self.feature_idx] > self.feature_threshold) idxs_r = (X[:, self.feature_idx] <= self.feature_threshold) # обучаем self.left.fit(X[idxs_l, :], y[idxs_l]) self.right.fit(X[idxs_r, :], y[idxs_r]) def __predict(self, x): ''' Функция для генерирования предсказания - смотрим узлы, идем в соответствующих потомков и смотрим в конце self.value - это и будет ответом. ''' if self.feature_idx == -1: return self.value if x[self.feature_idx] > self.feature_threshold: return self.left.__predict(x) else: return self.right.__predict(x) def predict(self, X): ''' Предикт для матрицы - просто для каждой строчки вызываем __predict(). ''' y = np.zeros(X.shape[0]) for i in range(X.shape[0]): y[i] = self.__predict(X[i]) return y
fit
method, as the name implies, teaches the model. The training sample is sent to the input and the tree learning procedure takes place. Sorting signs, we are looking for the best partitioning of the tree in terms of reduced entropy, in this case mse
. Determining that we managed to find a good split is very simple; two conditions are enough. We do not want few objects to fall into the partition (protection against retraining), and the weighted average error by mse
must be less than the error that is now in the tree - we are looking for that very increase in information gain . Having thus all the signs and all the unique values for them, we will sort out all the options and choose the best partition. And further we do the recursive call on the received partitions until the conditions for exit from recursion are fulfilled.__predict
method, as the name implies, makes a prediction. Having received an object as input, it goes along the nodes of the obtained tree - each node has a sign number and a value on it, and depending on what value the incoming method of the object has on this sign, we go either to the right descendant or to the left, until we reach the sheet, in which will be the answer for this object.predict
method does the same as the last method, only for a group of objects. data = datasets.fetch_california_housing() X = np.array(data.data) y = np.array(data.target)
max_depth
in ourselves and in Sklearn, let it be equal to 2. %%time A = RegressionTree(2) # это наш алгоритм A.fit(X,y)
from sklearn.tree import DecisionTreeRegressor %%time model = DecisionTreeRegressor(max_depth=2) # из Sklearn model.fit(X,y)
mse
method, which works beyond the line. This makes the error recalculation so inefficient! After all, then the difficulty of finding a split increases to O(N2∗d) that when large N tremendously slows down the algorithm. Therefore, smoothly proceed to the next item.mse
, we are not lucky: it’s quite difficult to derive a quick error recalculation, but when working with other metrics (for example, the Gini criterion, if we solve the classification problem), the quick recalculation output is much easier.sumNi=1(yi− frac sumNi=1yiN)2= sumN−1i=1(yi− frac sumNi=1yiN)2+(yN− frac sumNi=1yiN)2
sumN−1i=1(yi− frac sumNi=1yiN)2= sumN−1i=1(yi− frac sumN−1i=1yi+yNN)2= sumN−1i=1( fracNyi− sumN−1i=1yiN− fracyNN)2= sumN−1i=1( frac(N−1)yi− sumN−1i=1yiN− fracyN−yiN)2=sumN−1i=1( frac(N−1)yi− sumN−1i=1yiN)2−2( frac(N−1)yi− sumN−1i=1yiN) fracyN−yiN+( fracyN−yiN)2= sumN−1i=1( frac(N−1)yi− sumN−1i=1yiN)2− sumN−1i=1(2( frac(N−1)yi− sumN−1i=1yiN) fracyN−yiN−( fracyN−yiN)2)= sumN−1i=1( frac(N−1)yi− sumN−1i=1yiN−1)2∗( fracN−1N)2− sumN−1i=1(2( frac(N−1)yi− sumN−1i=1yiN) fracyN−yiN−−( fracyN−yiN)2)
sumNi=1(yi− frac sumNi=1yiN)2= sumN−1i=1( frac(N−1)yi− sumN−1i=1yiN−1)2∗( fracN−1N)2− sumN−1i=1(2( frac(N−1)yi− sumN−1i=1yiN)( fracyN−yiN)−( fracyN−yiN)2)+(yN− sumNi=1 fracyiN)2
class RegressionTreeFastMse(): ''' Класс RegressionTree с быстрым пересчетом ошибки. Сложность пересчета ошибки на каждой итерации составляет O(1). ''' # объявляем характеристики класса def __init__(self, max_depth=3, min_size=10): self.max_depth = max_depth self.min_size = min_size self.value = 0 self.feature_idx = -1 self.feature_threshold = 0 self.left = None self.right = None # процедура обучения - сюда передается обучающая выборка def fit(self, X, y): # начальное значение - среднее значение y self.value = y.mean() # начальная ошибка - mse между значением в листе (пока нет # разбиения, это среднее по всем объектам) и объектами base_error = ((y - self.value) ** 2).sum() error = base_error flag = 0 # пришли в максимальную глубину if self.max_depth <= 1: return dim_shape = X.shape[1] left_value, right_value = 0, 0 for feat in range(dim_shape): prev_error1, prev_error2 = base_error, 0 idxs = np.argsort(X[:, feat]) # переменные для быстрого переброса суммы mean1, mean2 = y.mean(), 0 sm1, sm2 = y.sum(), 0 N = X.shape[0] N1, N2 = N, 0 thres = 1 while thres < N - 1: N1 -= 1 N2 += 1 idx = idxs[thres] x = X[idx, feat] # вычисляем дельты - по ним в основном будет делаться переброс delta1 = (sm1 - y[idx]) * 1.0 / N1 - mean1 delta2 = (sm2 + y[idx]) * 1.0 / N2 - mean2 # увеличиваем суммы sm1 -= y[idx] sm2 += y[idx] # пересчитываем ошибки за O(1) prev_error1 += (delta1**2) * N1 prev_error1 -= (y[idx] - mean1)**2 prev_error1 -= 2 * delta1 * (sm1 - mean1 * N1) mean1 = sm1/N1 prev_error2 += (delta2**2) * N2 prev_error2 += (y[idx] - mean2)**2 prev_error2 -= 2 * delta2 * (sm2 - mean2 * N2) mean2 = sm2/N2 # пропускаем близкие друг к другу значения if thres < N - 1 and np.abs(x - X[idxs[thres + 1], feat]) < 1e-5: thres += 1 continue # 2 условия, чтобы осуществить сплит - уменьшение ошибки # и минимальное кол-о эл-в в каждом листе if (prev_error1 + prev_error2 < error): if (min(N1,N2) > self.min_size): # переопределяем самый лучший признак и границу по нему self.feature_idx, self.feature_threshold = feat, x # переопределяем значения в листах left_value, right_value = mean1, mean2 # флаг - значит сделали хороший сплит flag = 1 error = prev_error1 + prev_error2 thres += 1 # ничего не разделили, выходим if self.feature_idx == -1: return self.left = RegressionTreeFastMse(self.max_depth - 1) # print ("Левое поддерево с глубиной %d"%(self.max_depth - 1)) self.left.value = left_value self.right = RegressionTreeFastMse(self.max_depth - 1) # print ("Правое поддерево с глубиной %d"%(self.max_depth - 1)) self.right.value = right_value idxs_l = (X[:, self.feature_idx] > self.feature_threshold) idxs_r = (X[:, self.feature_idx] <= self.feature_threshold) self.left.fit(X[idxs_l, :], y[idxs_l]) self.right.fit(X[idxs_r, :], y[idxs_r]) def __predict(self, x): if self.feature_idx == -1: return self.value if x[self.feature_idx] > self.feature_threshold: return self.left.__predict(x) else: return self.right.__predict(x) def predict(self, X): y = np.zeros(X.shape[0]) for i in range(X.shape[0]): y[i] = self.__predict(X[i]) return y
%%time A = RegressionTreeFastMse(4, min_size=5) A.fit(X,y) test_mytree = A.predict(X) test_mytree
%%time model = DecisionTreeRegressor(max_depth=4) model.fit(X,y) test_sklearn = model.predict(X)
%load_ext Cython %%cython -a import itertools import numpy as np cimport numpy as np from itertools import * cdef class RegressionTreeCython: cdef public int max_depth cdef public int feature_idx cdef public int min_size cdef public int averages cdef public np.float64_t feature_threshold cdef public np.float64_t value cpdef RegressionTreeCython left cpdef RegressionTreeCython right def __init__(self, max_depth=3, min_size=4, averages=1): self.max_depth = max_depth self.min_size = min_size self.value = 0 self.averages = averages self.feature_idx = -1 self.feature_threshold = 0 self.left = None self.right = None def data_transform(self, np.ndarray[np.float64_t, ndim=2] X, list index_tuples): # преобразование данных - дополнение новыми признаками в виде суммы for i in index_tuples: # добавляем суммы, индексы которых переданы в качестве аргумента X = np.hstack((X, X[:, i[0]:(i[1]+1)].sum(axis=1).reshape(X.shape[0],1))) return X def fit(self, np.ndarray[np.float64_t, ndim=2] X, np.ndarray[np.float64_t, ndim=1] y): cpdef np.float64_t mean1 = 0.0 cpdef np.float64_t mean2 = 0.0 cpdef long N = X.shape[0] cpdef long N1 = X.shape[0] cpdef long N2 = 0 cpdef np.float64_t delta1 = 0.0 cpdef np.float64_t delta2 = 0.0 cpdef np.float64_t sm1 = 0.0 cpdef np.float64_t sm2 = 0.0 cpdef list index_tuples cpdef list stuff cpdef long idx = 0 cpdef np.float64_t prev_error1 = 0.0 cpdef np.float64_t prev_error2 = 0.0 cpdef long thres = 0 cpdef np.float64_t error = 0.0 cpdef np.ndarray[long, ndim=1] idxs cpdef np.float64_t x = 0.0 # такую процедуру необходимо сделать только один раз # генерируем индексы, по которым суммируем признаки if self.averages: stuff = list(range(0,X.shape[1],1)) index_tuples = list(combinations(stuff,2)) # выполняем преобразование данных X = self.data_transform(X, index_tuples) # начальное значение - среднее значение y self.value = y.mean() # начальная ошибка - mse между значением в листе (пока нет разбиения, # это среднее по всем объектам) и объектами base_error = ((y - self.value) ** 2).sum() error = base_error flag = 0 # пришли на максимальную глубину if self.max_depth <= 1: return dim_shape = X.shape[1] left_value, right_value = 0, 0 for feat in range(dim_shape): prev_error1, prev_error2 = base_error, 0 idxs = np.argsort(X[:, feat]) # переменные для быстрого переброса суммы mean1, mean2 = y.mean(), 0 sm1, sm2 = y.sum(), 0 N = X.shape[0] N1, N2 = N, 0 thres = 1 while thres < N - 1: N1 -= 1 N2 += 1 idx = idxs[thres] x = X[idx, feat] # вычисляем дельты - по ним в основном будет делаться переброс delta1 = (sm1 - y[idx]) * 1.0 / N1 - mean1 delta2 = (sm2 + y[idx]) * 1.0 / N2 - mean2 # увеличиваем суммы sm1 -= y[idx] sm2 += y[idx] # пересчитываем ошибки за O(1) prev_error1 += (delta1**2) * N1 prev_error1 -= (y[idx] - mean1)**2 prev_error1 -= 2 * delta1 * (sm1 - mean1 * N1) mean1 = sm1/N1 prev_error2 += (delta2**2) * N2 prev_error2 += (y[idx] - mean2)**2 prev_error2 -= 2 * delta2 * (sm2 - mean2 * N2) mean2 = sm2/N2 # пропускаем близкие друг к другу значения if thres < N - 1 and np.abs(x - X[idxs[thres + 1], feat]) < 1e-5: thres += 1 continue # 2 условия осуществления сплита - уменьшение ошибки # и минимальное кол-во элементов в в каждом листе if (prev_error1 + prev_error2 < error): if (min(N1,N2) > self.min_size): # переопределяем самый лучший признак и границу по нему self.feature_idx, self.feature_threshold = feat, x # переопределяем значения в листах left_value, right_value = mean1, mean2 # флаг - значит сделали хороший сплит flag = 1 error = prev_error1 + prev_error2 thres += 1 # self.feature_idx - индекс самой крутой разделяющей фичи. # Если это какая-то из сумм, и если есть какое-то экспертное знание # о данных, то интересно посмотреть, что значит эта сумма # ничего не разделили, выходим if self.feature_idx == -1: return # вызываем потомков дерева self.left = RegressionTreeCython(self.max_depth - 1, averages=0) self.left.value = left_value self.right = RegressionTreeCython(self.max_depth - 1, averages=0) self.right.value = right_value # новые индексы для обучения потомков idxs_l = (X[:, self.feature_idx] > self.feature_threshold) idxs_r = (X[:, self.feature_idx] <= self.feature_threshold) # обучение потомков self.left.fit(X[idxs_l, :], y[idxs_l]) self.right.fit(X[idxs_r, :], y[idxs_r]) def __predict(self, np.ndarray[np.float64_t, ndim=1] x): if self.feature_idx == -1: return self.value if x[self.feature_idx] > self.feature_threshold: return self.left.__predict(x) else: return self.right.__predict(x) def predict(self, np.ndarray[np.float64_t, ndim=2] X): # чтобы делать предикты, нужно также добавить суммы в тестовую выборку if self.averages: stuff = list(range(0,X.shape[1],1)) index_tuples = list(itertools.combinations(stuff,2)) X = self.data_transform(X, index_tuples) y = np.zeros(X.shape[0]) for i in range(X.shape[0]): y[i] = self.__predict(X[i]) return y
from sklearn.model_selection import KFold def get_metrics(X,y,n_folds=2, model=None): kf = KFold(n_splits=n_folds, shuffle=True) kf.get_n_splits(X) er_list = [] for train_index, test_index in kf.split(X): X_train, X_test = X[train_index], X[test_index] y_train, y_test = y[train_index], y[test_index] model.fit(X_train,y_train) predict = model.predict(X_test) er_list.append(mse(y_test, predict)) return er_list
import matplotlib.pyplot as plt data = datasets.fetch_california_housing() X = np.array(data.data) y = np.array(data.target) er_sklearn_tree = get_metrics(X,y,30,DecisionTreeRegressor(max_depth=4, min_samples_leaf=10)) er_fast_mse_tree = get_metrics(X,y,30,RegressionTreeFastMse(4, min_size=10)) er_averages_tree = get_metrics(X,y,30,RegressionTreeCython(4, min_size=10)) %matplotlib inline data = [er_sklearn_tree, er_fast_mse_tree, er_averages_tree] fig7, ax7 = plt.subplots() ax7.set_title('') ax7.boxplot(data, labels=['Sklearn Tree', 'Fast Mse Tree', 'Averages Tree']) plt.grid() plt.show()
Source: https://habr.com/ru/post/438560/