31 KiB
import os
import mlflow
from sklearn.model_selection import train_test_split
import pandas as pd
import numpy
from sklearn.preprocessing import StandardScaler, OrdinalEncoder, OneHotEncoder
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import LinearRegression
from catboost import CatBoostRegressor
from sklearn.metrics import mean_absolute_percentage_error, mean_absolute_error, mean_squared_error
= pd.read_pickle('data/clean_data.pkl').sample(frac=0.1, random_state = 2) # Уменьшаем размер чтобы модель быстрее обучалась на лекции
df df.info()
= df.rename(columns={'price': 'target'})
df = df.drop(columns=['date', 'time']) df
df
= train_test_split(df.drop('target', axis=1), df['target'], test_size=0.25, random_state=2) X_train, X_test, y_train, y_test
= X_train.select_dtypes(include=['category','object']).columns.to_list()
cat_features cat_features
= X_train.select_dtypes(include=['number']).columns.to_list()
num_features num_features
https://scikit-learn.org/stable/api/sklearn.preprocessing.html - разные способы кодирования и скалирования
= StandardScaler()
s_scaler = OrdinalEncoder(handle_unknown='use_encoded_value', unknown_value=99999999) # unknown_value нужно выбирать с умом
l_encoder = CatBoostRegressor() regressor
Column transformer
# Для удобной работы со столбцами
= ColumnTransformer(
preprocessor =[
transformers'num', s_scaler, num_features), # преобразования для числовых признаков
('cat', l_encoder, cat_features), # преобразования для категориальных признаков
(
],='drop' ) # Удаляем столбцы, которые не затронуты преобразования remainder
= Pipeline(steps=[('preprocessor', preprocessor),
pipeline 'model', regressor)])
(
pipeline.fit(X_train, y_train)
= pipeline.predict(X_test)
predictions
= {}
metrics "mae"] = mean_absolute_error(y_test, predictions)
metrics["mape"] = mean_absolute_percentage_error(y_test, predictions)
metrics["mse"] = mean_squared_error(y_test, predictions)
metrics[
metrics
# Работаем с MLflow локально
= "127.0.0.1"
TRACKING_SERVER_HOST = 5000
TRACKING_SERVER_PORT
= f"http://{TRACKING_SERVER_HOST}:{TRACKING_SERVER_PORT}"
registry_uri = f"http://{TRACKING_SERVER_HOST}:{TRACKING_SERVER_PORT}"
tracking_uri
mlflow.set_tracking_uri(tracking_uri)
mlflow.set_registry_uri(registry_uri)
# название тестового эксперимента, запуска (run) внутри него, имени, под которым модель будет регистрироваться
= "estate_project"
EXPERIMENT_NAME = "baseline model"
RUN_NAME = "estate_model_rf" REGISTRY_MODEL_NAME
Логируем вручную
# Обязательно логируем сигнатуру модели и пример входных данных. Подготовим их
from mlflow.models import infer_signature
= infer_signature(model_input = X_train.head(5))
signature = X_train.head(5) input_example
# Будем логировать requirements и артефакт - текстовый файл
= 'requirements.txt'
req_file = 'comment.txt' art
# Параметры, котороые будут залогированы, можем задавать вручную или полностью взять из модели
#params_dict = {'n_estimators': 10, 'max_depth': 10}
= pipeline.get_params() params_dict
# Когда создаем новый эксперимент, то:
= mlflow.create_experiment(EXPERIMENT_NAME)
experiment_id
# Впоследствии. чтобы добавлять запуски в этот же эксепримент мы должны получить его id:
#experiment_id = mlflow.get_experiment_by_name(EXPERIMENT_NAME).experiment_id
with mlflow.start_run(run_name=RUN_NAME, experiment_id=experiment_id) as run:
# получаем уникальный идентификатор запуска эксперимента
= run.info.run_id
run_id
mlflow.sklearn.log_model(pipeline, ="models",
artifact_path=signature,
signature=input_example,
input_example=req_file
pip_requirements
)
mlflow.log_metrics(metrics)
mlflow.log_artifact(art)
mlflow.log_params(params_dict)
= mlflow.get_run(run_id)
run assert (run.info.status =='FINISHED')
Удаление runs, experiments
Использовать осторожно
= mlflow.get_experiment_by_name(EXPERIMENT_NAME).experiment_id
experiment_id #mlflow.delete_experiment(experiment_id)
mlflow.search_runs(#experiment_ids=[experiment_id],
=[EXPERIMENT_NAME],
experiment_names# filter_string='status = "FAILED"'
#filter_string='metrics.mae > 1'
)
#mlflow.delete_run('74d2a7a40c07413c9cf65df841164356')
Автологирование
После включения будет срабатывать на каждом обучении модели (на методе fit()).
Есть плюсы, есть и минусы. Предлагается сделать прогон и сравнить с результатами вручную
mlflow.sklearn.autolog()
with mlflow.start_run(run_name='auto', experiment_id=experiment_id) as run:
pipeline.fit(X_train, y_train)
# Отключаем автологирование
=True) mlflow.sklearn.autolog(disable
Model #2
Обучим вторую "маленькую" модель
= RandomForestRegressor(n_estimators=10, max_depth=6) regressor2
= Pipeline(steps=[('preprocessor', preprocessor),
pipeline 'model', regressor2)])
(
pipeline.fit(X_train, y_train)
= pipeline.predict(X_test)
predictions = {}
metrics "mae"] = mean_absolute_error(y_test, predictions)
metrics["mape"] = mean_absolute_percentage_error(y_test, predictions)
metrics["mse"] = mean_squared_error(y_test, predictions)
metrics[
metrics
# !!! Проверить название прогона а также все логируемые параметры и артефакты, что они соответствуют второй "маленькой" модели.
= 'smaller_model'
RUN_NAME
= mlflow.get_experiment_by_name(EXPERIMENT_NAME).experiment_id
experiment_id
with mlflow.start_run(run_name=RUN_NAME, experiment_id=experiment_id) as run:
# получаем уникальный идентификатор запуска эксперимента
= run.info.run_id
run_id
mlflow.sklearn.log_model(pipeline, ="models",
artifact_path=signature,
signature=input_example,
input_example=req_file
pip_requirements
)
mlflow.log_metrics(metrics)
mlflow.log_artifact(art)
mlflow.log_params(pipeline.get_params())
= mlflow.get_run(run_id)
run assert (run.info.status =='FINISHED')
# No model
# Логировать можно только артефакты, без модели. Например, залогироавть графики после этапа EDA
= 'no_model'
RUN_NAME = mlflow.get_experiment_by_name(EXPERIMENT_NAME).experiment_id
experiment_id
with mlflow.start_run(run_name=RUN_NAME, experiment_id=experiment_id) as run:
= run.info.run_id
run_id
mlflow.log_artifact(art)
= mlflow.get_run(run_id)
run assert (run.info.status =='FINISHED')
= '06fa7ec1f1b74aedb3509c88dc4ee1c0' # Указываем run id
run_id f"runs:/{run_id}/models", REGISTRY_MODEL_NAME) mlflow.register_model(
# Можно регистрировать сразу при создании прогона
= mlflow.get_experiment_by_name(EXPERIMENT_NAME).experiment_id
experiment_id
with mlflow.start_run(run_name='register_at_run', experiment_id=experiment_id) as run:
# получаем уникальный идентификатор запуска эксперимента
= run.info.run_id
run_id
mlflow.sklearn.log_model(pipeline, ="models",
artifact_path=signature,
signature=input_example,
input_example=req_file,
pip_requirements= REGISTRY_MODEL_NAME # Указываем для какой модели регистрируем
registered_model_name
)
mlflow.log_metrics(metrics)
mlflow.log_artifact(art)
mlflow.log_params(pipeline.get_params())
= mlflow.get_run(run_id)
run assert (run.info.status =='FINISHED')
# Можно найти зарегистрированные модели
= mlflow.search_registered_models()
model_reg 0] model_reg[
= REGISTRY_MODEL_NAME
model_name = 1
model_version
= mlflow.sklearn.load_model(model_uri=f"models:/{model_name}/{model_version}") model_loaded
0:1]) model_loaded.predict(X_test.iloc[
0] y_test.iloc[
Feature engineering
Sklearn
from sklearn.preprocessing import QuantileTransformer, SplineTransformer, PolynomialFeatures, MinMaxScaler
= X_train.copy() X_train_sklearn
PolynomialFeatures
Создает полином степени degree
из указанных признаков
= PolynomialFeatures(degree=2) pf
X_train_sklearn
'area','kitchen_area']]) pf.fit_transform(X_train_sklearn[[
SplineTransformer
Cоздаёт новую матрицу признаков, состоящую из сплайнов порядка degree. Количество сгенерированных сплайнов равно n_splines=n_knots + degree - 1
для каждого признака, где
n_knots
определяет количество узлов (точек, в которых сопрягаются сплайны) для каждого признака.
degree
определяет порядок полинома, используемого для построения сплайнов.
= SplineTransformer(n_knots=3, degree=3) sp
'area']]) sp.fit_transform(X_train_sklearn[[
QuantileTransformer
Этот метод преобразует признаки, чтобы они распределялись равномерно или нормально — так данные меньше подвергаются влиянию выбросов. Преобразование применяется к каждому признаку независимо. Идея метода такова: оценить функцию распределения признака, чтобы преобразовать исходные значения в равномерное или нормальное распределение.
output_distribution='uniform'
или output_distribution='normal'
соответственно
Пример использования: если у вас есть данные о доходах с широким диапазоном значений, квантильное преобразование сделает их более сопоставимыми и устойчивыми к выбросам.
= QuantileTransformer() qt
'area']]) qt.fit_transform(X_train_sklearn[[
Объединяем в ColumnTransformer и создаем Pipeline
= PolynomialFeatures(degree=2)
pf = QuantileTransformer()
qt = SplineTransformer(n_knots=3, degree=3) sp
# Значения преобразованных признаков нужно отскейлить, поэтому создаем pipeline из двух шагов - преобразование и скейлинг
= Pipeline(steps=[
pf_pipeline 'poly', pf),
('scale', StandardScaler())
( ])
= ColumnTransformer(
preprocessor_sklearn =[
transformers'num', s_scaler, num_features), # преобразования для числовых признаков
('cat', l_encoder, cat_features), # преобразования для категориальных признаков
('quantile', qt,num_features),
('poly', pf_pipeline, ['area', 'kitchen_area']), # В преобразования добавляем созданный ранее pipeline
('spline', sp, ['area'])
(
],='drop',
remainder# Удаляем столбцы, которые не затронуты преобразования )
Посмотрим что из себя теперь представляет датафрейм
## не влезаем в float64 в полиномальном преобразовании. Использовать его нужно с умом!
'area', 'kitchen_area']] = X_train_sklearn[['area', 'kitchen_area']].astype('float128')
X_train_sklearn[['area', 'kitchen_area']] = X_train_sklearn[['area', 'kitchen_area']].astype('float128') X_train_sklearn[[
= preprocessor_sklearn.fit_transform(X_train_sklearn)
X_train_sklearn_raw = pd.DataFrame(X_train_sklearn_raw, columns=preprocessor_sklearn.get_feature_names_out()) X_train_sklearn
# Удобно использовать для отображения всех строк\столбцов в DataFrame
with pd.option_context('display.max_rows', 5, 'display.max_columns', None):
display (X_train_sklearn)
Создаем пайплайн с препроцессингом и моделью
= Pipeline(steps=[
pipeline_sklearn 'transform', preprocessor_sklearn),
('model', regressor)
(
])
= pipeline_sklearn.fit(X_train, y_train) model_sklearn
model_sklearn
= model_sklearn.predict(X_test)
predictions = {}
metrics "mae"] = mean_absolute_error(y_test, predictions)
metrics["mape"] = mean_absolute_percentage_error(y_test, predictions)
metrics["mse"] = mean_squared_error(y_test, predictions)
metrics[
metrics
= mlflow.get_experiment_by_name(EXPERIMENT_NAME).experiment_id
experiment_id = 'fe_sklearn'
RUN_NAME
with mlflow.start_run(run_name=RUN_NAME, experiment_id=experiment_id) as run:
# получаем уникальный идентификатор запуска эксперимента
= run.info.run_id
run_id
mlflow.sklearn.log_model(model_sklearn, ="models",
artifact_path=signature,
signature=input_example,
input_example=req_file
pip_requirements
)
mlflow.log_metrics(metrics)
mlflow.log_artifact(art)
mlflow.log_params(model_sklearn.get_params())
= mlflow.get_run(run_id)
run assert (run.info.status =='FINISHED')
Autofeat
from autofeat import AutoFeatRegressor
= ["1/", "exp", "log", "abs", "sqrt", "^2", "^3", "1+", "1-", "sin", "cos", "exp-", "2^"] transformations
= AutoFeatRegressor(verbose=1, feateng_steps=2, max_gb=8, transformations=["log", "sqrt"],feateng_cols=num_features)
afreg = afreg.fit_transform(X_train,y_train)
X_train_arf X_train_arf
# Создаем обертку, в которой добавляем метод get_feature_names_out() для получения названий признаков
import numpy as np
class AutoFeatWrapper():
def __init__(self, feateng_cols, feateng_steps=1, max_gb=16, transformations=["1/", "exp", "log"], n_jobs=-1, verbose=1):
self.feateng_cols = feateng_cols
self.feateng_steps = feateng_steps
self.max_gb = max_gb
self.transformations = transformations
self.n_jobs = n_jobs
self.afreg = AutoFeatRegressor(feateng_cols=self.feateng_cols,
=self.feateng_steps,
feateng_steps=self.max_gb,
max_gb=self.transformations,
transformations=self.n_jobs)
n_jobs
def fit(self, X, y=None):
self.afreg.fit(X, y)
return self
def transform(self, X):
return self.afreg.transform(X)
def get_feature_names_out(self, input_features=None):
# Преобразуем данные и возвращаем имена фичей из DataFrame
= self.afreg.transform(pd.DataFrame(np.zeros((1, len(self.feateng_cols))), columns=self.feateng_cols))
transformed_X return transformed_X.columns.tolist()
= Pipeline(steps=[
afreg_pipeline 'autofeat', AutoFeatWrapper( feateng_steps=2, max_gb=16, transformations=["log", "sqrt"],feateng_cols=num_features)),
('scaler', StandardScaler()),
( ])
= ColumnTransformer(
preprocessor_afr =[
transformers'num', s_scaler, num_features), # преобразования для числовых признаков
('cat', l_encoder, cat_features), # преобразования для категориальных признаков
('afr', afreg_pipeline, num_features), # преобразования autofeat
(
],='drop', # Удаляем столбцы, которые не затронуты преобразованиями
remainder )
= preprocessor_afr.fit_transform(X_train,y_train)
X_train_afr_raw = pd.DataFrame(X_train_afr_raw, columns=preprocessor_afr.get_feature_names_out()) X_train_afr
with pd.option_context('display.max_rows', 5, 'display.max_columns', None):
display (X_train_afr)
= Pipeline(steps=[('preprocessor', preprocessor_afr),
pipeline_afr 'model', regressor)])
(
pipeline_afr.fit(X_train, y_train)
= pipeline_afr.predict(X_test)
predictions
= {}
metrics "mae"] = mean_absolute_error(y_test, predictions)
metrics["mape"] = mean_absolute_percentage_error(y_test, predictions)
metrics["mse"] = mean_squared_error(y_test, predictions)
metrics[
metrics
= mlflow.get_experiment_by_name(EXPERIMENT_NAME).experiment_id
experiment_id
with mlflow.start_run(run_name='autofeat', experiment_id=experiment_id) as run:
# получаем уникальный идентификатор запуска эксперимента
= run.info.run_id
run_id
mlflow.sklearn.log_model(pipeline_afr, ="models",
artifact_path=signature,
signature=input_example,
input_example=req_file
pip_requirements
)
mlflow.log_metrics(metrics)
mlflow.log_artifact(art)
mlflow.log_params(pipeline_afr.get_params())
= mlflow.get_run(run_id)
run assert (run.info.status =='FINISHED')