Сравнить коммиты
8 Коммитов
99d81722c4
...
3f4db4c51f
| Автор | SHA1 | Дата |
|---|---|---|
|
|
3f4db4c51f | 1 месяц назад |
|
|
5e4822b81a | 1 месяц назад |
|
|
f00c32018d | 1 месяц назад |
|
|
d92efac263 | 1 месяц назад |
|
|
bd8c30cf76 | 1 месяц назад |
|
|
dc5bfe5937 | 1 месяц назад |
|
|
23eb72d090 | 1 месяц назад |
|
|
322e02aa0b | 1 месяц назад |
@ -0,0 +1,2 @@
|
||||
GF_SECURITY_ADMIN_USER=admin
|
||||
GF_SECURITY_ADMIN_PASSWORD=admin
|
||||
@ -0,0 +1,64 @@
|
||||
name: mpei-iis-system
|
||||
|
||||
services:
|
||||
|
||||
prices-predictor:
|
||||
image: ml_service:2
|
||||
ports:
|
||||
- "8010:8000"
|
||||
volumes:
|
||||
- './models:/models'
|
||||
|
||||
load-tester:
|
||||
image: load_tester:1
|
||||
environment:
|
||||
API_BASE_URL: "http://prices-predictor:8000/api"
|
||||
# XXX: Предотвращает аварийный выход тестера при отсутствии ответа от prices-predictor
|
||||
# во время его (потенциально долгого) запуска.
|
||||
depends_on:
|
||||
- prices-predictor
|
||||
deploy:
|
||||
replicas: 2
|
||||
profiles:
|
||||
- "with-testers"
|
||||
|
||||
prometheus:
|
||||
image: prom/prometheus:v3.7.3
|
||||
ports:
|
||||
- "9090:9090"
|
||||
user: nobody
|
||||
command:
|
||||
- "--config.file=/etc/prometheus/prometheus.yaml"
|
||||
volumes:
|
||||
- "./prometheus/prometheus.yaml:/etc/prometheus/prometheus.yaml:ro"
|
||||
- "prometheus-storage:/prometheus"
|
||||
|
||||
grafana:
|
||||
image: grafana/grafana:12.4.0-20012734117
|
||||
ports:
|
||||
- "3000:3000"
|
||||
#environment:
|
||||
# GF_SECURITY_ADMIN_USER: "$__file{/run/secrets/grafana-admin-user}"
|
||||
# GF_SECURITY_ADMIN_PASSWORD: "$__file{/run/secrets/grafana-admin-password}"
|
||||
#secrets:
|
||||
# - grafana-admin-user
|
||||
# - grafana-admin-password
|
||||
environment:
|
||||
GF_SECURITY_ADMIN_USER: "${GF_SECURITY_ADMIN_USER:-admin}"
|
||||
GF_SECURITY_ADMIN_PASSWORD: "${GF_SECURITY_ADMIN_PASSWORD:-admin}"
|
||||
volumes:
|
||||
- "grafana-storage:/var/lib/grafana"
|
||||
|
||||
volumes:
|
||||
|
||||
prometheus-storage: {}
|
||||
|
||||
grafana-storage: {}
|
||||
|
||||
#secrets:
|
||||
#
|
||||
# grafana-admin-user:
|
||||
# environment: GF_SECURITY_ADMIN_USER
|
||||
#
|
||||
# grafana-admin-password:
|
||||
# environment: GF_SECURITY_ADMIN_PASSWORD
|
||||
@ -0,0 +1,6 @@
|
||||
### Python
|
||||
__pycache__/
|
||||
*.pyc
|
||||
|
||||
### Project
|
||||
*.unused
|
||||
@ -0,0 +1,13 @@
|
||||
FROM python:3.11-slim
|
||||
|
||||
WORKDIR /load_tester
|
||||
|
||||
COPY ./requirements.txt .
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
|
||||
COPY . .
|
||||
|
||||
CMD ["python", "-m", "tester"]
|
||||
|
||||
# docker build -t load_tester:1 services/load_tester/
|
||||
# docker run -e "API_BASE_URL=http://prices-predictor:8000/api" load_tester:1
|
||||
@ -0,0 +1 @@
|
||||
requests >=2.32.5,<3
|
||||
@ -0,0 +1,280 @@
|
||||
from argparse import ArgumentParser
|
||||
from collections.abc import Callable, MutableMapping
|
||||
from dataclasses import dataclass, asdict
|
||||
from enum import Enum
|
||||
import logging
|
||||
from os import getenv
|
||||
from random import randint, uniform, expovariate, choice
|
||||
from signal import SIGINT, SIGTERM, signal
|
||||
import sys
|
||||
from time import sleep
|
||||
from types import FrameType
|
||||
from typing import Any, cast
|
||||
|
||||
from requests import RequestException, Response, Session
|
||||
|
||||
|
||||
def fixup_payload_enum_value(mapping: MutableMapping[str, Any], key: str) -> None:
|
||||
mapping[key] = mapping[key].value
|
||||
|
||||
|
||||
ENDPOINT_URL: str = '/predict'
|
||||
|
||||
|
||||
class FuelType(Enum):
|
||||
PETROL = 'petrol'
|
||||
DIESEL = 'diesel'
|
||||
CNG = 'cng'
|
||||
|
||||
|
||||
class SellingType(Enum):
|
||||
DEALER = 'dealer'
|
||||
INDIVIDUAL = 'individual'
|
||||
|
||||
|
||||
class TransmissionType(Enum):
|
||||
MANUAL = 'manual'
|
||||
AUTOMATIC = 'automatic'
|
||||
|
||||
|
||||
@dataclass
|
||||
class PricePredictionFeatures:
|
||||
selling_price: float
|
||||
driven_kms: float
|
||||
age: float
|
||||
fuel_type: FuelType
|
||||
selling_type: SellingType
|
||||
transmission_type: TransmissionType
|
||||
|
||||
|
||||
MAX_RETRIES_DEFAULT = 3
|
||||
|
||||
|
||||
def exp_delay_from_attempt_number(attempt_i: int) -> float:
|
||||
return 0.2 * (2 ** attempt_i)
|
||||
|
||||
|
||||
def post_item(
|
||||
session: Session, url: str, item_id: int, features: PricePredictionFeatures,
|
||||
*, max_retries: int = MAX_RETRIES_DEFAULT,
|
||||
) -> Response:
|
||||
if max_retries < 0:
|
||||
raise ValueError('max_retries must be >= 0')
|
||||
payload = asdict(features)
|
||||
for k in ('fuel_type', 'selling_type', 'transmission_type'):
|
||||
fixup_payload_enum_value(payload, k)
|
||||
excs = []
|
||||
for attempt_i in range(max_retries + 1):
|
||||
try:
|
||||
response = session.post(url, params={'item_id': item_id}, json=payload, timeout=10)
|
||||
except RequestException as err:
|
||||
excs.append(err)
|
||||
sleep(exp_delay_from_attempt_number(attempt_i))
|
||||
else:
|
||||
return response
|
||||
assert len(excs) > 0
|
||||
# XXX: ...
|
||||
raise IOError(
|
||||
f'Failed to post an item in {max_retries + 1} attempts;'
|
||||
' see the latest exception in __cause__'
|
||||
) from excs[-1]
|
||||
|
||||
|
||||
def generate_request_data() -> tuple[int, PricePredictionFeatures]:
|
||||
item_id = randint(1, 100)
|
||||
features = PricePredictionFeatures(
|
||||
selling_price=round(uniform(2.0, 16.0), 2),
|
||||
driven_kms=round(uniform(0.0, 100000.0), 0),
|
||||
age=round(uniform(0.0, 10.0), 1),
|
||||
fuel_type=choice(list(FuelType)),
|
||||
selling_type=choice(list(SellingType)),
|
||||
transmission_type=choice(list(TransmissionType)),
|
||||
)
|
||||
return (item_id, features)
|
||||
|
||||
|
||||
INTERVAL_MEAN_DEFAULT = 4.0
|
||||
INTERVAL_BOUNDS_DEFAULT: tuple[float | None, float | None] = (0.5, 10.0)
|
||||
|
||||
|
||||
class Requester:
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
base_url: str,
|
||||
interval_mean: float = INTERVAL_MEAN_DEFAULT,
|
||||
interval_bounds: tuple[float | None, float | None] = INTERVAL_BOUNDS_DEFAULT,
|
||||
*, max_retries: int = MAX_RETRIES_DEFAULT,
|
||||
):
|
||||
self.base_url = base_url
|
||||
self.interval_mean = interval_mean
|
||||
self.interval_bounds = interval_bounds
|
||||
self.max_retries = max_retries
|
||||
self._session = Session()
|
||||
self._stop_requested: bool = False
|
||||
|
||||
@property
|
||||
def endpoint(self) -> str:
|
||||
endpoint_url = ENDPOINT_URL
|
||||
if (len(endpoint_url) > 0) and (not endpoint_url.startswith('/')):
|
||||
endpoint_url = '/' + endpoint_url
|
||||
return (self.base_url + endpoint_url)
|
||||
|
||||
@property
|
||||
def session(self) -> Session:
|
||||
return self._session
|
||||
|
||||
@property
|
||||
def stop_requested(self) -> bool:
|
||||
return self._stop_requested
|
||||
|
||||
def stop(self) -> None:
|
||||
self._stop_requested = True
|
||||
|
||||
def _decide_delay(self) -> float:
|
||||
interval_bounds = self.interval_bounds
|
||||
val = expovariate(1. / self.interval_mean)
|
||||
if interval_bounds[0] is not None:
|
||||
val = max(val, interval_bounds[0])
|
||||
if interval_bounds[1] is not None:
|
||||
val = min(val, interval_bounds[1])
|
||||
return val
|
||||
|
||||
def run(self) -> None:
|
||||
while not self._stop_requested:
|
||||
item_id, features = generate_request_data()
|
||||
try:
|
||||
response = post_item(
|
||||
self._session, self.endpoint, item_id, features, max_retries=self.max_retries,
|
||||
)
|
||||
except IOError as err:
|
||||
logging.warning('%s: %s', str(err), str(err.__cause__))
|
||||
raise err
|
||||
else:
|
||||
logging.debug('Success: %s %s', response.status_code, response.reason)
|
||||
sleep(self._decide_delay())
|
||||
|
||||
|
||||
def _build_termination_handler(requester: Requester) -> Callable[[int, FrameType | None], None]:
|
||||
def termination_handler(sig: int, frame: FrameType | None) -> None:
|
||||
_ = sig
|
||||
_ = frame
|
||||
requester.stop()
|
||||
return termination_handler
|
||||
|
||||
|
||||
def _configure_logging(level: int, quiet: bool) -> None:
|
||||
if quiet:
|
||||
level = logging.CRITICAL + 1
|
||||
logging.basicConfig(
|
||||
level=level, format='%(asctime)s %(levelname)s %(message)s', stream=sys.stderr,
|
||||
)
|
||||
|
||||
|
||||
def _setup_signal_handlers(requester: Requester) -> None:
|
||||
termination_handler = _build_termination_handler(requester)
|
||||
for sig in (SIGINT, SIGTERM):
|
||||
signal(sig, termination_handler)
|
||||
|
||||
|
||||
def _validate_cli_interval_bound(string: str) -> float | None:
|
||||
string = string.lower()
|
||||
if string in ('', 'null', 'none'):
|
||||
return None
|
||||
return float(string)
|
||||
|
||||
|
||||
def _validate_cli_interval_bounds(string: str) -> tuple[float | None, float | None]:
|
||||
string = string.lower()
|
||||
if string in ('', 'null', 'none'):
|
||||
return (None, None)
|
||||
min_string, max_string = string.split(',', 1)
|
||||
return cast(
|
||||
tuple[float | None, float | None],
|
||||
tuple(map(_validate_cli_interval_bound, (min_string, max_string)))
|
||||
)
|
||||
|
||||
|
||||
def _validate_cli_max_retries(string: str) -> int:
|
||||
val = int(string)
|
||||
if val < 0:
|
||||
raise ValueError(f'Max retries should be >=0, given {val}')
|
||||
return val
|
||||
|
||||
|
||||
def _validate_cli_logging_level(string: str) -> int:
|
||||
return {
|
||||
'debug': logging.DEBUG,
|
||||
'info': logging.INFO,
|
||||
'warning': logging.WARNING,
|
||||
'error': logging.ERROR,
|
||||
'critical': logging.CRITICAL,
|
||||
}[string]
|
||||
|
||||
|
||||
def parse_args(argv):
|
||||
parser = ArgumentParser(
|
||||
description=(
|
||||
'Регулярная отправка POST-запросов на эндпоинт предсказания цены.'
|
||||
' Остановка по SIGINT / SIGTERM.'
|
||||
),
|
||||
allow_abbrev=False,
|
||||
exit_on_error=True,
|
||||
)
|
||||
parser.add_argument('base_url', type=str, nargs='?')
|
||||
parser.add_argument('--interval-mean', type=float, dest='interval_mean')
|
||||
parser.add_argument(
|
||||
'--interval-bounds', type=_validate_cli_interval_bounds, dest='interval_bounds',
|
||||
)
|
||||
parser.add_argument(
|
||||
'--max-retries',
|
||||
type=_validate_cli_max_retries,
|
||||
default=MAX_RETRIES_DEFAULT,
|
||||
dest='max_retries',
|
||||
)
|
||||
parser.add_argument('-q', '--quiet', action='store_true', dest='quiet')
|
||||
parser.add_argument(
|
||||
'--log-level',
|
||||
default=logging.WARNING,
|
||||
type=_validate_cli_logging_level,
|
||||
dest='logging_level',
|
||||
)
|
||||
args = parser.parse_args(argv[1:])
|
||||
if args.base_url is None:
|
||||
args.base_url = getenv('API_BASE_URL')
|
||||
if args.base_url is None:
|
||||
raise RuntimeError('No API base URL specified')
|
||||
if (args.interval_mean is not None) and (args.interval_mean <= 0):
|
||||
raise ValueError(f'Interval mean should be > 0, given {args.interval_mean}')
|
||||
if (
|
||||
(args.interval_bounds is not None)
|
||||
and all((b is not None) for b in args.interval_bounds)
|
||||
and (args.interval_bounds[0] > args.interval_bounds[1])
|
||||
):
|
||||
raise ValueError(f'Interval bounds should be b_1 <= b_2, given {args.interval_bounds!r}')
|
||||
if args.interval_mean is not None:
|
||||
if args.interval_bounds is None:
|
||||
args.interval_bounds = ((args.interval_mean / 5), (args.interval_mean * 5))
|
||||
else:
|
||||
args.interval_mean = INTERVAL_MEAN_DEFAULT
|
||||
args.interval_bounds = INTERVAL_BOUNDS_DEFAULT
|
||||
return args
|
||||
|
||||
|
||||
def main(argv):
|
||||
args = parse_args(argv)
|
||||
_configure_logging(args.logging_level, args.quiet)
|
||||
logging.debug('Creating a Requester with base URL: %s', args.base_url)
|
||||
requester = Requester(
|
||||
args.base_url,
|
||||
interval_mean=args.interval_mean,
|
||||
interval_bounds=args.interval_bounds,
|
||||
max_retries=args.max_retries,
|
||||
)
|
||||
_setup_signal_handlers(requester)
|
||||
requester.run()
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
sys.exit(int(main(sys.argv) or 0))
|
||||
@ -0,0 +1,37 @@
|
||||
from os import (
|
||||
getenv,
|
||||
#kill,
|
||||
)
|
||||
#from signal import SIGINT, SIGTERM
|
||||
from subprocess import (
|
||||
run,
|
||||
#Popen,
|
||||
)
|
||||
import sys
|
||||
|
||||
|
||||
#_child_proc: Popen | None = None
|
||||
|
||||
|
||||
#def _forward_signal(sig, frame):
|
||||
# _ = frame
|
||||
# if _child_proc is None:
|
||||
# return
|
||||
# if _child_proc.pid in (SIGINT, SIGTERM):
|
||||
# kill(_child_proc.pid, sig)
|
||||
# else:
|
||||
# raise RuntimeError(f'Attempted to forward an unexpected signal: {sig}')
|
||||
|
||||
|
||||
def main(argv):
|
||||
argv = list(argv) # copy
|
||||
base_url = getenv('API_BASE_URL')
|
||||
if base_url is None:
|
||||
raise RuntimeError('API_BASE_URL is not specified')
|
||||
argv.append(base_url) # HACK: ...
|
||||
result = run(['python', '-m', 'tester', *argv[1:]], check=False)
|
||||
return result.returncode
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
sys.exit(int(main(sys.argv) or 0))
|
||||
@ -1,5 +1,7 @@
|
||||
fastapi ~=0.120.4
|
||||
mlxtend ~=0.23.4
|
||||
pandas >=2.3.1,<3
|
||||
prometheus_client ~=0.23.1
|
||||
prometheus_fastapi_instrumentator >=7.0.2,<8
|
||||
scikit-learn >=1.7.2,<2
|
||||
uvicorn ~=0.38.0
|
||||
|
||||
@ -0,0 +1 @@
|
||||
data/
|
||||
@ -0,0 +1,15 @@
|
||||
global:
|
||||
scrape_interval: 15s
|
||||
scrape_timeout: 5s
|
||||
|
||||
scrape_configs:
|
||||
|
||||
- job_name: "prices_predictor"
|
||||
static_configs:
|
||||
- targets:
|
||||
- "prices-predictor:8000"
|
||||
scheme: http
|
||||
metrics_path: "/metrics"
|
||||
#relabel_configs:
|
||||
# - source_labels: ["__address__"]
|
||||
# target_labels: "instance"
|
||||
Загрузка…
Ссылка в новой задаче