From 23eb72d090e858a0f2e746e8c338fc9b4a9f8671 Mon Sep 17 00:00:00 2001 From: syropiatovvv Date: Sat, 29 Nov 2025 12:50:24 +0300 Subject: [PATCH] =?UTF-8?q?wip:=20lab=5F4=20=D0=B4=D0=BE=20=D0=B4=D0=B0?= =?UTF-8?q?=D1=88=D0=B1=D0=BE=D1=80=D0=B4=D0=B0=20=D0=B2=20Prometheus?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- services/compose.yaml | 26 ++ services/load_tester/.dockerignore | 6 + services/load_tester/Dockerfile | 13 + services/load_tester/requirements.txt | 1 + services/load_tester/tester.py | 280 ++++++++++++++++++ services/load_tester/tester_wrapper.py.unused | 37 +++ services/ml_service/Dockerfile | 4 +- services/ml_service/app/main.py | 8 + services/ml_service/app/predictor.py | 34 ++- services/ml_service/requirements.txt | 2 + services/prometheus/prometheus.yaml | 15 + 11 files changed, 422 insertions(+), 4 deletions(-) create mode 100644 services/compose.yaml create mode 100644 services/load_tester/.dockerignore create mode 100644 services/load_tester/Dockerfile create mode 100644 services/load_tester/requirements.txt create mode 100644 services/load_tester/tester.py create mode 100644 services/load_tester/tester_wrapper.py.unused create mode 100644 services/prometheus/prometheus.yaml diff --git a/services/compose.yaml b/services/compose.yaml new file mode 100644 index 0000000..09c5f6c --- /dev/null +++ b/services/compose.yaml @@ -0,0 +1,26 @@ +name: mpei-iis-system + +services: + + prices-predictor: + image: ml_service:2 + ports: + - "8010:8000" + volumes: + - './models:/models' + + load-tester: + image: load_tester:1 + environment: + API_BASE_URL: "http://prices-predictor:8000/api" + deploy: + replicas: 2 + + prometheus: + image: prom/prometheus:v3.7.3 + ports: + - "9090:9090" + command: + - "--config.file=/etc/prometheus/prometheus.yaml" + volumes: + - "./prometheus/prometheus.yaml:/etc/prometheus/prometheus.yaml:ro" diff --git a/services/load_tester/.dockerignore b/services/load_tester/.dockerignore new file mode 100644 index 0000000..52d6be7 --- /dev/null +++ b/services/load_tester/.dockerignore @@ -0,0 +1,6 @@ +### Python +__pycache__/ +*.pyc + +### Project +*.unused diff --git a/services/load_tester/Dockerfile b/services/load_tester/Dockerfile new file mode 100644 index 0000000..6874810 --- /dev/null +++ b/services/load_tester/Dockerfile @@ -0,0 +1,13 @@ +FROM python:3.11-slim + +WORKDIR /load_tester + +COPY ./requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY . . + +CMD ["python", "-m", "tester"] + +# docker build -t load_tester:1 services/load_tester/ +# docker run -e "API_BASE_URL=http://prices-predictor:8000/api" load_tester:1 diff --git a/services/load_tester/requirements.txt b/services/load_tester/requirements.txt new file mode 100644 index 0000000..d4743e3 --- /dev/null +++ b/services/load_tester/requirements.txt @@ -0,0 +1 @@ +requests >=2.32.5,<3 diff --git a/services/load_tester/tester.py b/services/load_tester/tester.py new file mode 100644 index 0000000..cecf6b2 --- /dev/null +++ b/services/load_tester/tester.py @@ -0,0 +1,280 @@ +from argparse import ArgumentParser +from collections.abc import Callable, MutableMapping +from dataclasses import dataclass, asdict +from enum import Enum +import logging +from os import getenv +from random import randint, uniform, expovariate, choice +from signal import SIGINT, SIGTERM, signal +import sys +from time import sleep +from types import FrameType +from typing import Any, cast + +from requests import RequestException, Response, Session + + +def fixup_payload_enum_value(mapping: MutableMapping[str, Any], key: str) -> None: + mapping[key] = mapping[key].value + + +ENDPOINT_URL: str = '/predict' + + +class FuelType(Enum): + PETROL = 'petrol' + DIESEL = 'diesel' + CNG = 'cng' + + +class SellingType(Enum): + DEALER = 'dealer' + INDIVIDUAL = 'individual' + + +class TransmissionType(Enum): + MANUAL = 'manual' + AUTOMATIC = 'automatic' + + +@dataclass +class PricePredictionFeatures: + selling_price: float + driven_kms: float + age: float + fuel_type: FuelType + selling_type: SellingType + transmission_type: TransmissionType + + +MAX_RETRIES_DEFAULT = 3 + + +def exp_delay_from_attempt_number(attempt_i: int) -> float: + return 0.2 * (2 ** attempt_i) + + +def post_item( + session: Session, url: str, item_id: int, features: PricePredictionFeatures, + *, max_retries: int = MAX_RETRIES_DEFAULT, +) -> Response: + if max_retries < 0: + raise ValueError('max_retries must be >= 0') + payload = asdict(features) + for k in ('fuel_type', 'selling_type', 'transmission_type'): + fixup_payload_enum_value(payload, k) + excs = [] + for attempt_i in range(max_retries + 1): + try: + response = session.post(url, params={'item_id': item_id}, json=payload, timeout=10) + except RequestException as err: + excs.append(err) + sleep(exp_delay_from_attempt_number(attempt_i)) + else: + return response + assert len(excs) > 0 + # XXX: ... + raise IOError( + f'Failed to post an item in {max_retries + 1} attempts;' + ' see the latest exception in __cause__' + ) from excs[-1] + + +def generate_request_data() -> tuple[int, PricePredictionFeatures]: + item_id = randint(1, 100) + features = PricePredictionFeatures( + selling_price=round(uniform(2.0, 16.0), 2), + driven_kms=round(uniform(0.0, 100000.0), 0), + age=round(uniform(0.0, 10.0), 1), + fuel_type=choice(list(FuelType)), + selling_type=choice(list(SellingType)), + transmission_type=choice(list(TransmissionType)), + ) + return (item_id, features) + + +INTERVAL_MEAN_DEFAULT = 4.0 +INTERVAL_BOUNDS_DEFAULT: tuple[float | None, float | None] = (0.5, 10.0) + + +class Requester: + + def __init__( + self, + base_url: str, + interval_mean: float = INTERVAL_MEAN_DEFAULT, + interval_bounds: tuple[float | None, float | None] = INTERVAL_BOUNDS_DEFAULT, + *, max_retries: int = MAX_RETRIES_DEFAULT, + ): + self.base_url = base_url + self.interval_mean = interval_mean + self.interval_bounds = interval_bounds + self.max_retries = max_retries + self._session = Session() + self._stop_requested: bool = False + + @property + def endpoint(self) -> str: + endpoint_url = ENDPOINT_URL + if (len(endpoint_url) > 0) and (not endpoint_url.startswith('/')): + endpoint_url = '/' + endpoint_url + return (self.base_url + endpoint_url) + + @property + def session(self) -> Session: + return self._session + + @property + def stop_requested(self) -> bool: + return self._stop_requested + + def stop(self) -> None: + self._stop_requested = True + + def _decide_delay(self) -> float: + interval_bounds = self.interval_bounds + val = expovariate(1. / self.interval_mean) + if interval_bounds[0] is not None: + val = max(val, interval_bounds[0]) + if interval_bounds[1] is not None: + val = min(val, interval_bounds[1]) + return val + + def run(self) -> None: + while not self._stop_requested: + item_id, features = generate_request_data() + try: + response = post_item( + self._session, self.endpoint, item_id, features, max_retries=self.max_retries, + ) + except IOError as err: + logging.warning('%s: %s', str(err), str(err.__cause__)) + raise err + else: + logging.debug('Success: %s %s', response.status_code, response.reason) + sleep(self._decide_delay()) + + +def _build_termination_handler(requester: Requester) -> Callable[[int, FrameType | None], None]: + def termination_handler(sig: int, frame: FrameType | None) -> None: + _ = sig + _ = frame + requester.stop() + return termination_handler + + +def _configure_logging(level: int, quiet: bool) -> None: + if quiet: + level = logging.CRITICAL + 1 + logging.basicConfig( + level=level, format='%(asctime)s %(levelname)s %(message)s', stream=sys.stderr, + ) + + +def _setup_signal_handlers(requester: Requester) -> None: + termination_handler = _build_termination_handler(requester) + for sig in (SIGINT, SIGTERM): + signal(sig, termination_handler) + + +def _validate_cli_interval_bound(string: str) -> float | None: + string = string.lower() + if string in ('', 'null', 'none'): + return None + return float(string) + + +def _validate_cli_interval_bounds(string: str) -> tuple[float | None, float | None]: + string = string.lower() + if string in ('', 'null', 'none'): + return (None, None) + min_string, max_string = string.split(',', 1) + return cast( + tuple[float | None, float | None], + tuple(map(_validate_cli_interval_bound, (min_string, max_string))) + ) + + +def _validate_cli_max_retries(string: str) -> int: + val = int(string) + if val < 0: + raise ValueError(f'Max retries should be >=0, given {val}') + return val + + +def _validate_cli_logging_level(string: str) -> int: + return { + 'debug': logging.DEBUG, + 'info': logging.INFO, + 'warning': logging.WARNING, + 'error': logging.ERROR, + 'critical': logging.CRITICAL, + }[string] + + +def parse_args(argv): + parser = ArgumentParser( + description=( + 'Регулярная отправка POST-запросов на эндпоинт предсказания цены.' + ' Остановка по SIGINT / SIGTERM.' + ), + allow_abbrev=False, + exit_on_error=True, + ) + parser.add_argument('base_url', type=str, nargs='?') + parser.add_argument('--interval-mean', type=float, dest='interval_mean') + parser.add_argument( + '--interval-bounds', type=_validate_cli_interval_bounds, dest='interval_bounds', + ) + parser.add_argument( + '--max-retries', + type=_validate_cli_max_retries, + default=MAX_RETRIES_DEFAULT, + dest='max_retries', + ) + parser.add_argument('-q', '--quiet', action='store_true', dest='quiet') + parser.add_argument( + '--log-level', + default=logging.WARNING, + type=_validate_cli_logging_level, + dest='logging_level', + ) + args = parser.parse_args(argv[1:]) + if args.base_url is None: + args.base_url = getenv('API_BASE_URL') + if args.base_url is None: + raise RuntimeError('No API base URL specified') + if (args.interval_mean is not None) and (args.interval_mean <= 0): + raise ValueError(f'Interval mean should be > 0, given {args.interval_mean}') + if ( + (args.interval_bounds is not None) + and all((b is not None) for b in args.interval_bounds) + and (args.interval_bounds[0] > args.interval_bounds[1]) + ): + raise ValueError(f'Interval bounds should be b_1 <= b_2, given {args.interval_bounds!r}') + if args.interval_mean is not None: + if args.interval_bounds is None: + args.interval_bounds = ((args.interval_mean / 5), (args.interval_mean * 5)) + else: + args.interval_mean = INTERVAL_MEAN_DEFAULT + args.interval_bounds = INTERVAL_BOUNDS_DEFAULT + return args + + +def main(argv): + args = parse_args(argv) + _configure_logging(args.logging_level, args.quiet) + logging.debug('Creating a Requester with base URL: %s', args.base_url) + requester = Requester( + args.base_url, + interval_mean=args.interval_mean, + interval_bounds=args.interval_bounds, + max_retries=args.max_retries, + ) + _setup_signal_handlers(requester) + requester.run() + return 0 + + +if __name__ == '__main__': + sys.exit(int(main(sys.argv) or 0)) diff --git a/services/load_tester/tester_wrapper.py.unused b/services/load_tester/tester_wrapper.py.unused new file mode 100644 index 0000000..f6ad759 --- /dev/null +++ b/services/load_tester/tester_wrapper.py.unused @@ -0,0 +1,37 @@ +from os import ( + getenv, + #kill, +) +#from signal import SIGINT, SIGTERM +from subprocess import ( + run, + #Popen, +) +import sys + + +#_child_proc: Popen | None = None + + +#def _forward_signal(sig, frame): +# _ = frame +# if _child_proc is None: +# return +# if _child_proc.pid in (SIGINT, SIGTERM): +# kill(_child_proc.pid, sig) +# else: +# raise RuntimeError(f'Attempted to forward an unexpected signal: {sig}') + + +def main(argv): + argv = list(argv) # copy + base_url = getenv('API_BASE_URL') + if base_url is None: + raise RuntimeError('API_BASE_URL is not specified') + argv.append(base_url) # HACK: ... + result = run(['python', '-m', 'tester', *argv[1:]], check=False) + return result.returncode + + +if __name__ == '__main__': + sys.exit(int(main(sys.argv) or 0)) diff --git a/services/ml_service/Dockerfile b/services/ml_service/Dockerfile index 5a32563..5cd9c6e 100644 --- a/services/ml_service/Dockerfile +++ b/services/ml_service/Dockerfile @@ -15,5 +15,5 @@ ENV MODELS_PATH=/models CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"] -# docker build -t ml_service:1 services/ml_service/ -# docker run -v "$(pwd)/services/models:/models" -p 8000:8000 ml_service:1 +# docker build -t ml_service:2 services/ml_service/ +# docker run -v "$(pwd)/services/models:/models" -p 8000:8000 ml_service:2 diff --git a/services/ml_service/app/main.py b/services/ml_service/app/main.py index 3ad2635..b4b3762 100644 --- a/services/ml_service/app/main.py +++ b/services/ml_service/app/main.py @@ -2,6 +2,7 @@ from os import getenv from pathlib import Path from fastapi import FastAPI +from prometheus_fastapi_instrumentator import Instrumentator from pydantic import BaseModel, Field from ._meta import PACKAGE_PATH @@ -35,6 +36,13 @@ app = FastAPI( ) +_ = ( + Instrumentator(excluded_handlers=['/metrics']) + .instrument(app) + .expose(app, endpoint='/metrics') +) + + @app.get('/', summary='Тестовый эндпоинт') async def root(): return {'Hello': 'World'} diff --git a/services/ml_service/app/predictor.py b/services/ml_service/app/predictor.py index 871522a..1bd810b 100644 --- a/services/ml_service/app/predictor.py +++ b/services/ml_service/app/predictor.py @@ -1,7 +1,9 @@ from dataclasses import dataclass from enum import Enum +from itertools import chain from pandas import DataFrame from pickle import load +from prometheus_client import Counter, Histogram def open_model_file(file, *, buffering=-1, opener=None, **kwargs_extra): @@ -61,6 +63,27 @@ class PricePredictionFeatures: transmission_type: TransmissionType +metric_prediction_latency = Histogram( + 'model_prediction_seconds', 'Время вычислений в модели', + buckets=( + list(chain.from_iterable((v * (10 ** p) for v in (1, 2, 5)) for p in range(-4, (1 + 1)))) + + [float('+inf')] + ), +) + +metric_prediction_errors = Counter( + 'model_prediction_errors_total', 'Ошибки вычислений в модели по типу', ('error_type',), +) + +metric_prediction_value = Histogram( + 'model_prediction_value', 'Предсказанное значение цены', + buckets=( + list(chain.from_iterable((v * (10 ** p) for v in (1, 2, 5)) for p in range(-1, (2 + 1)))) + + [float('+inf')] + ), +) + + class PricePredictor: def __init__(self, model_path): @@ -76,6 +99,13 @@ class PricePredictor: 'transmission': features.transmission_type.value, 'age': features.age, }]) - predictions = self._model.predict(features_df) + try: + with metric_prediction_latency.time(): + predictions = self._model.predict(features_df) + except Exception as err: + metric_prediction_errors.labels(error_type=type(err).__name__).inc() + raise assert len(predictions) == 1 - return float(predictions[0]) + value = float(predictions[0]) + metric_prediction_value.observe(value) + return value diff --git a/services/ml_service/requirements.txt b/services/ml_service/requirements.txt index 15d1e05..9a96738 100644 --- a/services/ml_service/requirements.txt +++ b/services/ml_service/requirements.txt @@ -1,5 +1,7 @@ fastapi ~=0.120.4 mlxtend ~=0.23.4 pandas >=2.3.1,<3 +prometheus_client ~=0.23.1 +prometheus_fastapi_instrumentator >=7.0.2,<8 scikit-learn >=1.7.2,<2 uvicorn ~=0.38.0 diff --git a/services/prometheus/prometheus.yaml b/services/prometheus/prometheus.yaml new file mode 100644 index 0000000..55ffdc8 --- /dev/null +++ b/services/prometheus/prometheus.yaml @@ -0,0 +1,15 @@ +global: + scrape_interval: 15s + scrape_timeout: 5s + +scrape_configs: + + - job_name: "prices_predictor" + static_configs: + - targets: + - "prices-predictor:8000" + scheme: http + metrics_path: "/metrics" + #relabel_configs: + # - source_labels: ["__address__"] + # target_labels: "instance"