ar
Feedback
About Python [ru]

About Python [ru]

الذهاب إلى القناة على Telegram

Пишем на Python, создаём нейросети и ИИ-агентов. Алгоритмы, задачи и вайбкодинг. Личный блог автора - @just_genych По вопросам рекламы или разработки: @g_abashkin

إظهار المزيد
6 572
المشتركون
+324 ساعات
-237 أيام
+6030 أيام
أرشيف المشاركات
ИИ vs ЧЕЛОВЕК / AI УЖЕ МНОГОЕ УМЕЕТ, НО НЕ ТАК КАК ТЫ ... Нейросети уже пишут, рисуют и отвечают 24/7. Это мощно, и мы за про
ИИ vs ЧЕЛОВЕК / AI УЖЕ МНОГОЕ УМЕЕТ, НО НЕ ТАК КАК ТЫ ... Нейросети уже пишут, рисуют и отвечают 24/7. Это мощно, и мы за прогресс. Но есть вещи, которые алгоритмы никогда не заменят: — эмпатию к клиенту — доверие, которое строится годами — продажи без манипуляций, с душой ⚠️ Технологии — это инструмент, а главное — это ты и твой живой контакт. Приглашаем тебя в ЭКО-Пространство, где технологии — это фон, а главное — это ты и твой клиент ✔️ В этой ПОДБОРКЕ есть кое-что поважнее алгоритмов — ДОВЕРИЕ. В папке собраны каналы про экологичные продажи, про понимание, про рост без выгорания. Пусть ИИ пишет тексты, а ты учись создавать отношения. 💚 Добавляй папку в свой актив и делись с друзьями! 📌 Ссылка ➡️ https://t.me/addlist/9wQJPILNMKNkNmNk 👉 Делимся знаниями и аудиторией — растём вместе ⚡️

Gemini vs ChatGPT: СМЕНА ФАВОРИТОВ ... вот что вышло 👇 * Все вокруг обсуждают ChatGPT, а я нашел альтернативу, которая реаль
Gemini vs ChatGPT: СМЕНА ФАВОРИТОВ ... вот что вышло 👇 * Все вокруг обсуждают ChatGPT, а я нашел альтернативу, которая реально качает — Gemini от Google. Пользуюсь и очень доволен. Почему стоит попробовать: ✔️ Бесплатно (базовая версия) ✔️ Контекст 2 млн токенов — загружайте хоть целые кодобазы ✔️ Понимает текст, картинки, видео и аудио ✔️ Дружит с Google Диском, Gmail и календарем ✔️ Код пишет на уровне топ-моделей Решил проверить его в деле — и не прогадал. Попросил Gemini найти для меня экспертные каналы по IT и AI, чтобы собрать чистое инфополе с нуля и не делать все вручную. Закинул ссылки на проверенных авторов, и нейросеть сама проанализировала сотни рекомендаций, отсеяв пустышки. Результат — готовая подборка из 20+ каналов с реальным опытом по: AI-воркфлоу, автоматизации, вайб-кодингу, промт-инжинирингу, RAG-системам, нейрогенерации, крипте и др. 🔗 Забирайте список в один клик 👇 https://t.me/addlist/9wQJPILNMKNkNmNk * Пишите в комменты — пробовали Gemini? Делитесь с друзьями впечатлениями и добавляйте подборку в свой актив 📌

Профилирование async-генераторов: GC-latency, HWM и паттерны утечки корутин в high‑load FastAPI‑сервисах В продакшене async-генераторы часто воспринимаются как идеальный механизм для стриминга больших данных. Но при анализе p99 latency я обнаружил, что основной источник задержек — не медленные запросы к БД, а скрытые проблемы с утечками корутин и сборкой мусора. Проблема 1: GC-latency при разрыве соединения Когда клиент прерывает соединение, async-генератор продолжает висеть с циклическими ссылками. Поколенческий сборщик мусора начинает аварийные сборки, и latency может улетать за секунду.
async def stream_data():
    for i in range(10**6):
        yield await fetch_chunk(i)
Утечка: клиент ушёл, но генератор не завершён. Решение — finally с aclose() или обёртка через @contextlib.asynccontextmanager. Правило: если ты не контролируешь время жизни генератора, контролируй очистку. Проблема 2: HWM (high water mark) и резервирование стека Каждый async-генератор резервирует стек корутины при создании. В проде с тысячами одновременных запросов это даёт ощутимый overhead. Для профилирования используйте gc.get_objects() с фильтром на AsyncGeneratorType:
import gc, types
from collections import Counter

gen_count = Counter()
for obj in gc.get_objects():
    if isinstance(obj, types.AsyncGeneratorType):
        gen_count[type(obj).__name__] += 1
Рост счётчика — явный признак утечки. HWM можно оценить через sys.getsizeof(), но лучше фокусироваться на количестве живых генераторов. FastAPI-specific паттерны утечек На ревью часто вижу три типичные ошибки: - Тайм-ауты: FastAPI отменяет задачу, но aclose() не вызывается. - Циклические ссылки: генератор держит request-объект, GC в тупике. - SSE-генераторы, висящие вечно, если клиент не закрыл соединение. В продакшене включаю PYTHONASYNCIODEBUG=1 для ловли Task was destroyed but it is pending. В тестовых средах — gc.set_debug(gc.DEBUG_LEAK). Для стриминга FastAPI использую шаблон с aclosing:
from contextlib import aclosing

async def safe_stream():
    async with aclosing(async_generator()):
        async for item in async_generator():
            yield item
Практический совет: всегда оборачивайте async-генераторы в контекстный менеджер с гарантированным вызовом aclose(). Это снижает p99 latency на сотни миллисекунд. Вывод: Один забытый async-генератор в high-load FastAPI-сервисе способен превратить стриминг в источник неконтролируемых задержек, поэтому профилирование GC и утечек корутин — обязательный шаг при оптимизации latency.

asyncio зависает без ошибок? TaskGroup, timeout-декораторы и context vars для надежного трейсинга Когда asyncio-задача “зависает”, стектрейс часто пуст или уводит в Event Loop. В production с многотысячными коннектами это тихая катастрофа: задача не падает, но и не завершается, ресурсы утекают. Разбираем три приёма, которые превращают отладку из гадания в детерминированный процесс. TaskGroup и asyncio.timeout: границы времени жизни С asyncio.TaskGroup (Python 3.11+) каждая задача имеет явный контекст. Комбинируя его с asyncio.timeout, получаем детектор зависаний:
async def safe_polling():
    async with asyncio.TaskGroup() as tg:
        async with asyncio.timeout(5.0):
            task = tg.create_task(long_pipeline())
Плюс: при превышении лимита – TimeoutError с отменой корутины. Минус: нужно явно оборачивать каждую группу. Timeout-декоратор: защита на уровне функции Для всех внешних вызовов (API, базы, очереди) декоратор автоматически ставит таймаут:
import asyncio
from functools import wraps

def timeout(max_time: float):
    def decorator(coro):
        @wraps(coro)
        async def wrapper(*args, **kwargs):
            try:
                return await asyncio.wait_for(
                    coro(*args, **kwargs), timeout=max_time)
            except asyncio.TimeoutError:
                log.warning(f"{coro.__name__} exceeded {max_time}s")
                raise
        return wrapper
    return decorator

@timeout(3.0)
async def fetch_external_data(): ...
asyncio.wait_for корректно отменяет корутину, не оставляя её в состоянии “in progress”. Context Vars для трейсинга: кто вызвал задачу contextvars.ContextVar хранит идентификатор запроса или таски. При таймауте логгер выводит полную цепочку:
request_id = contextvars.ContextVar('request_id', default=None)

async def handler(call_id: str):
    request_id.set(call_id)
    async with asyncio.TaskGroup() as tg:
        tg.create_task(process())
В логах видно request_id зависшей задачи — это ключ к поиску в трейсинге (OpenTelemetry, Jaeger). Что ещё проверить * Блокирующий синхронный код (requests.get вместо aiohttp)? * Забытый await – asyncio пускает корутину без ошибки. * Обилие таймаутов на разных уровнях: один для HTTP, другой для всей группы. Вывод: Замороженные asyncio-задачи отлавливаются только комбинацией явных границ времени (TaskGroup + timeout) и трейсинга исполнения (context vars), а не надеждой на “авось завершится”.

Многопоточная обработка in-memory данных с нулевым копированием: memoryview и буферы numpy Когда несколько потоков читают одни и те же данные, первое, что приходит в голову — скопировать каждый кусок отдельно. Потом смотришь на профилировщик и видишь, что 40% времени ушло на эти копирования. В production это убивает производительность в задачах обработки видео, аудио или бинарных протоколов. Буферный протокол и memoryview Memoryview и буферный протокол numpy позволяют читать одни и те же данные из разных потоков без единого лишнего байта. Берём bytearray на миллион байт, создаём memoryview, режем на куски и отдаём потокам. Каждый поток через np.frombuffer получает ndarray, который смотрит ровно в ту же память.
import numpy as np
import threading

shared_data = bytearray(1_000_000)
shared_view = memoryview(shared_data)

def process_chunk(offset, size):
    chunk = np.frombuffer(shared_view[offset:offset+size], dtype=np.uint8)
    chunk[:] = (chunk * 2 + 10) % 256

threads = []
chunk_size = 100_000
for i in range(0, len(shared_data), chunk_size):
    t = threading.Thread(target=process_chunk, args=(i, chunk_size))
    threads.append(t)
    t.start()

for t in threads:
    t.join()
Ключевой момент: GIL снимается, когда numpy вызывает C-код. Поэтому CPU-bound задачи с numpy действительно ускоряются в потоках, и не надо сразу лезть в multiprocessing. Ограничения и типичная ошибка Memoryview работает только с contiguous буферами. Если массив со stride — приходится делать np.ascontiguousarray, а это уже копия. По опыту, чаще всего данные из файлов или сети идут подряд, так что проблема не смертельная. Типичная ошибка — забыть проверить, что буфер contiguous, и получить неявную копию. Оптимизация для numpy Если данные уже лежат в numpy, можно не создавать memoryview. У ndarray есть буферный протокол, и np.frombuffer скушает его напрямую. Меньше телодвижений, но проверка на contiguous всё равно нужна. Где это выстреливает в production Это реально ускоряет: обработка видео в реальном времени, аудиофреймы, высокочастотные тикеры, разбор бинарных протоколов, чтение больших HDF5 и FASTQ. Везде, где данных много, а копировать их больно. Вывод: Нулевое копирование через memoryview и буферы numpy — ключ к ускорению многопоточных in-memory задач, но только с contiguous буферами и пониманием, что GIL снимается в C-коде.

Получи грант до 3,48 млн на обучение дизайну Поступай на дизайн в Центральный университет с грантом. Для учеников 10–11-х кла
Получи грант до 3,48 млн на обучение дизайну Поступай на дизайн в Центральный университет с грантом. Для учеников 10–11-х классов и СПО. Освой графический, UI/UX и продуктовый дизайн. Создавай визуальные концепты будущего. На программе студенты получают фундаментальную базу, развивают прикладные навыки, приобретают опыт работы над реальными проектами, собирают портфолио и строят связи внутри дизайн-сообщества Подать заявку #реклама 16+ cu.ru О рекламодателе

Clock Skew в распределённых Python-системах: NTP не панацея Время в распределённой системе — штука хитрая. Даже если на всех узлах крутится NTP, погрешность в локальной сети легко даёт 1–50 мс. В облаке может быть хуже. И это не баг, а особенность: clock drift накапливается. Проблема в том, что time.time() и datetime.now() — это по сути случайные числа, когда нужно упорядочить события с разных машин. Сравнил два timestamp с разных узлов и получил бороду. Логические часы Лэмпорта Первый вариант — каждый узел хранит счётчик, инкрементит на каждом событии. Просто, дёшево, но от физического времени отрываешься полностью. Иногда это ок, но для многих сценариев (например, метрик с временными метками) нужна привязка к реальному времени. Гибридные логические часы (HLC) Более живой вариант — скрещиваешь физическое время с логическим счётчиком.
class HLC:
    def __init__(self, node_id, max_drift=50e-3):
        self.node_id = node_id
        self.max_drift = max_drift
        self.l = 0
        self.pt = time.time()

    def now(self):
        now_pt = time.time()
        if now_pt > self.pt + self.max_drift:
            self.l = 0
            self.pt = now_pt
        else:
            self.l += 1
        return (self.pt, self.l, self.node_id)

    def receive(self, msg_time):
        now_pt = time.time()
        if now_pt > self.pt and now_pt > msg_time[0]:
            self.l = 0
            self.pt = now_pt
        else:
            self.l = max(self.l, msg_time[1]) + 1
            self.pt = max(self.pt, msg_time[0])
Ключевая идея: если физическое время ушло вперёд больше чем на max_drift — сбрасываешь счётчик. При получении сообщения берёшь максимум из локального времени и времени отправителя. Работает как в Cassandra, Spanner. Как измерять skew Типичная ошибка — полагаться на time.time() без мониторинга. Бери ntplib, дёргай общий пул, смотри разброс. Или используй tcpdump на RTT между узлами. На одном узле — time.monotonic() для интервалов, чтобы не зависеть от перевода часов. Практический совет: вычитай медиану skew из всех меток. Для критичных операций — голосование за "истинное время", как TrueTime в Spanner. Когда без этого никак Детерминированное воспроизведение событий в отладке, conflict resolution в CRDT на основе timestamp, синхронизация транзакций в многомастерной репликации. В production это разница между "работает" и "работает надежно". Вывод: Clock skew — не баг, а инженерная задача, которую решают гибридные логические часы и мониторинг смещения, а не слепая вера в NTP.

Zero-Copy инференс ONNX в Python: убиваем лишние копирования При развертывании ONNX-моделей в production каждая микросекунда на счету. Часто узким местом становится не сам инференс, а оверхэд на передачу данных между Python и C++ рантаймом ONNX Runtime. Решение — zero-copy тензоры. По умолчанию ONNX Runtime (ORT) делает копию входных тензоров из numpy-массивов в свой внутренний формат. Для больших батчей это может добавить 10-30% времени к инференсу. Я наступал на эти грабли: модель считает 5 мс, а общее время запроса — 7 мс, и ты гадаешь, куда утекает. Как работает zero-copy с OrtValue Вместо ort_session.run(None, {'input': numpy_array}) можно передавать OrtValue напрямую:
import onnxruntime as ort
import numpy as np

input_data = np.random.randn(1, 3, 224, 224).astype(np.float32)
ort_input = ort.OrtValue.ort_value_from_numpy(input_data)

session = ort.InferenceSession('model.onnx')
results = session.run_with_ort_values({'input': ort_input})
output_tensor = results[0].numpy()
Тут есть нюанс: run_with_ort_values возвращает OrtValue, и .numpy() может вернуть view на те же данные с нулевым копированием, если тензор выровнен. Переиспользование буфера для максимальной производительности Чтобы минимизировать оверхэд еще сильнее, можно выделить буфер один раз и переиспользовать его:
buffer_np = np.empty((1, 3, 224, 224), dtype=np.float32)
buffer_ort = ort.OrtValue.ort_value_from_numpy(buffer_np)

# В цикле инференса
np.copyto(buffer_np, new_input_data)
results = session.run_with_ort_values({'input': buffer_ort})
Замерил на batch_size=1, 1000 запросов, ONNX Runtime 1.18: - Стандартный run: 2.1 мс на запрос (включая 0.3 мс на копирование) - Zero-copy с OrtValue: 1.8 мс (копирование 0 мс) - + предварительная аллокация: 1.7 мс Zero-copy дает около 15% ускорения. Для высоконагруженных сервисов это уже заметно. Типичная ошибка: когда zero-copy не сработает Если входной тензор не выровнен (non-contiguous) — ORT сделает copy. Спасает np.ascontiguousarray(). Если модель меняет форму тензора на входе — копирование неизбежно, тут ничего не поделать. Рекомендации для прода - Используйте OrtValue.ort_value_from_numpy() вместо run(). - Переиспользуйте OrtValue-буферы. - Включайте ort.SessionOptions().enable_cpu_mem_arena = True. Не ждите чуда, но 15% выньете просто так. Вывод: Zero-copy с OrtValue и переиспользование буферов — простой и надежный способ снизить latency инференса ONNX-моделей на 10-15% без изменения архитектуры.

Конфиг, который не ломается: строгая типизация, секреты и валидация runtime-параметров В production конфиг часто выглядит как мешанина os.environ с копипастой и опечатками в именах переменных. Это прямой путь к ошибке при старте или утечке секрета в лог через config.dict(). Pydantic Settings v2 решает это одним классом — с типизацией, дефолтами и валидацией на этапе инициализации. Контроль типов и дефолтов Вы описываете структуру конфига, и если переменная не задана без дефолта — приложение не запустится. Это исключает runtime-сюрпризы. SecretStr прячет секреты в repr: даже при выводе объекта в лог api_key отображается только звездочками.
class AppConfig(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env", extra="ignore")
    database_url: str
    api_key: SecretStr
    max_connections: int = 10
    debug: bool = False

    @field_validator("max_connections")
    @classmethod
    def ensure_positive(cls, v):
        if v <= 0:
            raise ValueError("must be positive")
        return v

    @field_validator("database_url")
    @classmethod
    def validate_scheme(cls, v):
        if not v.startswith("postgresql://"):
            raise ValueError("Only PostgreSQL supported")
        return v
Бизнес-логика на уровне конфига Валидаторы проверяют не только типы, но и бизнес-правила. Для data pipeline, требующего только определенный диалект SQL, это задается в одном месте, а не размазывается по модулям. Такой подход делает код читаемее и проще в поддержке. Типичная ошибка: лишние переменные из .env Без extra="ignore" случайная строка в .env вызовет ошибку. Другая проблема — путаница с env_prefix в подмодулях, когда конфиги перекрывают поля друг друга. Включите env_nested_delimiter для плоских переменных с вложенными моделями, чтобы избежать лишней вложенности. Production-ready: кэшируем и расширяем Для однократного создания конфига используем lru_cache, а дальше можно миксовать с аргументами командной строки через CliSettingsSource. С одним источником истины для env, файлов и флагов приложение становится проще тестировать и развёртывать. Вывод: Инкапсуляция конфига через Pydantic Settings — это не просто типы, а гарантия, что приложение не стартанёт с неправильными данными, а секреты не утекут из-за банального логирования.

ExceptionGroup + Rich: как читать production traceback, когда падает не одно, а всё сразу В production с asyncio или concurrent.futures стандартный traceback — это простыня, где последняя ошибка съедает все предыдущие. Вы видите только финал, а корень теряется. Особенно когда цепочка задач падает каждая со своим исключением, и вы не понимаете, что именно пошло не так и в каком порядке. Проблема: traceback теряет контекст Обычный traceback показывает только последнюю ошибку. Если у вас 10 задач и 5 упали с разными исключениями, вы получите только одно. Всё, что было до него, исчезает. Для asyncio.gather() или concurrent.futures это катастрофа: ошибка может быть не в последней задаче, а в первой, но её уже нет. Решение: ExceptionGroup + Rich Python 3.11 ввёл ExceptionGroup (PEP 654). Он группирует несколько исключений в одно, сохраняя полную структуру. Но сам по себе он неудобен для чтения. Rich решает это: он рендерит ExceptionGroup с цветами, вложенностью и локальными переменными для каждого исключения.
import asyncio
from rich.console import Console
from rich.traceback import install

install(show_locals=True)
console = Console()

async def task(id: int):
    if id % 2 == 0:
        raise ValueError(f"Even ID: {id}")
    raise RuntimeError(f"Odd ID: {id}")

async def run():
    errors = []
    for i in range(4):
        try:
            await task(i)
        except Exception as e:
            errors.append(e)
    if errors:
        raise ExceptionGroup("Tasks failed", errors)

try:
    asyncio.run(run())
except ExceptionGroup as eg:
    console.print_exception(show_locals=True)
Что даёт такой подход в production Три вещи, которые превращают stack trace из мусора в actionable insight: * визуальная группировка: видно, какие ошибки относятся к одному запуску задач, а не размазаны по логу; * локализация: Rich показывает локальные переменные на момент падения для каждого исключения — не нужно гадать, какое значение было у id; * иерархия: если у вас вложенные ExceptionGroup, структура не теряется, и вы видите, где какая ошибка возникла. Практический совет Добавляйте в ExceptionGroup метаданные — время, id запроса, контекст выполнения. Тогда по логу сразу понятно, что именно сломалось и при каких условиях. Для Python < 3.11 используйте пакет exceptiongroup — он работает аналогично и ставится через pip. Типичная ошибка Пытаться обработать ExceptionGroup как обычное исключение через except Exception. Это сломает логику: вы потеряете все вложенные ошибки. Используйте except* (PEP 654) или явно перехватывайте ExceptionGroup. Trade-off ExceptionGroup добавляет накладные расходы на сбор ошибок, если их много. Используйте только для задач, где ошибки реально могут быть множественными (batch processing, фоновые воркеры), а не для одиночных вызовов. Вывод: ExceptionGroup вместе с Rich превращает traceback из хаоса в структурированную карту боя, где каждая ошибка видна со своим контекстом, и вы не гадаете, что сломалось на самом деле.

Circuit Breaker для gRPC и HTTP: как остановить каскадный сбой одним паттерном Когда сервис падает, retry-механизмы без ограничений начинают долбить его снова — соединения висят, треды блокируются, нагрузка перекидывается на соседей. В production это приводит к каскадному обрушению, которое могло быть предотвращено. Частая ошибка — полагаться только на таймауты и надеяться на авось. Как работает паттерн Три состояния: Closed — нормальная работа, Open — запросы не отправляются, ошибка возвращается мгновенно, Half-Open — после таймаута пробуется один запрос. Если успешен — снова Closed, иначе — возврат в Open. Пример для aiohttp
import aiohttp
from pybreaker import CircuitBreaker, CircuitBreakerError

breaker = CircuitBreaker(fail_max=3, reset_timeout=30)

async def call_external(url):
    try:
        with breaker:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as resp:
                    return await resp.text()
    except CircuitBreakerError:
        return {"error": "Service temporarily unavailable"}
Три ошибки подряд — breaker переходит в Open. Через 30 секунд Half-Open пробует восстановиться. Ресурсы не тратятся на мёртвые запросы. gRPC: то же, но с нюансами
import grpc
from pybreaker import CircuitBreaker

breaker = CircuitBreaker(fail_max=5, reset_timeout=60)

async def call_grpc(stub, request):
    try:
        with breaker:
            response = await stub.GetData(request)
            return response
    except CircuitBreakerError:
        return default_response  # fallback
Здесь нужно учитывать: gRPC при проблемах с доступностью шлёт статус UNAVAILABLE. Исключение внутри with-блока тоже считается как сбой, поэтому добавляйте логику отлова специфичных ошибок — иначе сломаете Half-Open. Типичные ошибки * Один breaker на все сервисы. Если gRPC-вызов уронил breaker, то HTTP к другому эндпоинту тоже заблокируется. Создавайте отдельный breaker для каждого внешнего сервиса. * Игнорирование Half-Open. Без мониторинга вы не увидите, сколько раз breaker пытался восстановиться и снова падал. * Слишком малый reset_timeout. При частом переходе между состояниями breaker начинает дёргаться и теряет смысл — выбирайте таймаут, достаточный для восстановления сервиса. Вывод: Circuit Breaker — минимальная инженерная защита от каскадных сбоев, которая экономит ресурсы и сохраняет стабильность системы при отказах внешних зависимостей.

GIL в threading — это боль, которую многие просто принимают как данность До Python 3.13 альтернатив не было: multiprocessing с оверхедом и морокой передачи данных, или asyncio, бесполезный при CPU-bound задачах. Реальные кейсы — обработка данных, парсинг, криптография — где threading кажется логичным, но GIL заставляет ядра простаивать, а однопоточный код обгоняет многопоточный. Кейс из продакшна: пул потоков без прироста Я написал пул потоков для обработки пакетов данных: каждый поток выполнял CPU-bound трансформацию. GIL не отпускался, прирост 0%. Переписал на multiprocessing — 3x, но память выросла в 4 раза. Типичная ошибка: думать, что threading даст параллелизм для чистых вычислений. Free-threaded Python 3.13: сборка без GIL В 3.13 появился флаг --disable-gil. Потоки наконец работают параллельно. На четырех ядрах прирост по CPU-bound задачам — 3-4x. Пример теста:
# threading с GIL (стандартная сборка)
import threading, time

def work():
    for _ in range(10**7):
        x = 1 + 1

threads = [threading.Thread(target=work) for _ in range(4)]
start = time.time()
for t in threads: t.start()
for t in threads: t.join()
print(f"С GIL: {time.time() - start:.2f}s")
# Вывод: ~2.5s (почти как последовательно)
# free-threaded (сборка без GIL)
# Тот же код — результат ~0.8s (на 4 ядрах)
Практический совет и предупреждение Совет: если проект уперся в GIL, соберите Python 3.13 с --disable-gil, прогоните тесты. Но учтите trade-offs: * однопоточный режим проседает на 10% * C-расширения (numpy, pandas) без пересборки падают — они завязаны на GIL * стабильная поддержка обещана только в 3.14 Типичная ошибка: кидаться пересобирать всё сразу. Начните с изолированного модуля, проверьте совместимость библиотек. Вывод: Free-threaded Python 3.13 — это первая реальная альтернатива multiprocessing, но применяйте её осознанно, с пониманием просадки однопоточного режима и готовностью к экспериментам.

Cache stampede в Python-сервисах: singleflight, jitter и stale-while-revalidate без героического тушения latency spike Cache stampede возникает, когда популярный ключ истёк, и сотни запросов одновременно пересчитывают одно значение: SQL-агрегацию, внешний API или ML inference. Частая ошибка - считать, что обычный TTL сам по себе защищает production. Singleflight: один refresh на ключ Для одного cache_key в момент времени должен работать один пересчёт, остальные ждут или получают stale. Внутри процесса подойдёт asyncio.Lock на ключ, но это не защита для нескольких uvicorn/gunicorn workers или pod’ов. Там нужен Redis SET NX PX, lease-lock, PostgreSQL advisory lock или singleflight поверх общего хранилища.
lock = locks.setdefault(key, asyncio.Lock())

async with lock:
    item = await cache.get(key)
    if item and item["expires_at"] > time.time():
        return item["value"]

    value = await fetch()
    await store(cache, key, value)
Jitter: не синхронизируйте истечение Если после деплоя прогреть 50k ключей с ttl=60, через минуту они начнут истекать пачкой. Практичнее так:
ttl = 60
ttl = ttl + random.uniform(0, ttl * 0.15)
Особенно важно для агрегатов, feature flags, кэша внешних API и scheduled prewarm jobs. Stale-while-revalidate: старое лучше лавины Формат записи:
{
    "value": value,
    "expires_at": expires_at,
    "stale_until": expires_at + 300,
}
Логика простая: - свежий TTL жив - отдаём кэш; - TTL истёк, но stale_until жив - отдаём stale и обновляем в фоне; - stale-окно истекло - ждём refresh или возвращаем controlled error. Production-нюансы - lock обязан иметь TTL, иначе упавший воркер заблокирует refresh; - refresh должен иметь timeout, retry budget и circuit breaker; - stale нельзя бездумно включать для балансов, прав доступа и лимитов; - метрики обязательны: cache_hit, stale_hit, lock_wait_seconds, refresh_errors. Вывод: Защита от cache stampede - это не один lock, а согласованный дизайн TTL, singleflight, jitter, stale-окон и отказоустойчивого refresh.

Миграции БД без даунтайма в Python-сервисах: Alembic, expand/contract и совместимость версий кода Zero-downtime миграции важн
Миграции БД без даунтайма в Python-сервисах: Alembic, expand/contract и совместимость версий кода Zero-downtime миграции важны там, где сервисы деплоятся rolling-ом: API, workers, async jobs. Частая ошибка - считать, что alembic upgrade head перед релизом решает совместимость схемы и кода. Expand/contract Схему меняем не одним ударом, а фазами: * expand - добавляем новое так, чтобы старый код не сломался * деплоим код, совместимый со старой и новой схемой * делаем backfill, dual-write, переключение чтения * contract - удаляем старое только после ухода всех старых инстансов В Kubernetes, Nomad или systemd rolling deployment в проде какое-то время живут две версии сервиса. Миграция должна быть совместима минимум с текущим и следующим кодом. Пример: first_name/last_name -> full_name Плохой вариант: добавить full_name NOT NULL, удалить старые колонки и выкатить код. Старая версия сервиса начнет писать в удаленные поля и упадет. Нормальный expand:
from alembic import op
import sqlalchemy as sa

def upgrade():
    op.add_column(
        "users",
        sa.Column("full_name", sa.Text(), nullable=True),
    )
    with op.get_context().autocommit_block():
        op.create_index(
            "ix_users_full_name",
            "users",
            ["full_name"],
            postgresql_concurrently=True,
        )
Новый код сначала живет в переходном режиме:
def get_display_name(user) -> str:
    return user.full_name or f"{user.first_name} {user.last_name}"

user.first_name = first_name
user.last_name = last_name
user.full_name = f"{first_name} {last_name}"
Практические правила * backfill делайте отдельным job, батчами, с лимитами, паузами и метриками * не запускайте тяжелые data migration внутри DDL-миграции Alembic * DROP COLUMN, RENAME COLUMN, SET NOT NULL и смену типа выносите в contract * NOT NULL добавляйте после заполнения данных и проверки консистентности * Alembic в проде должен запускать один контролируемый runner, а не каждый инстанс приложения Вывод: Zero-downtime миграция - это не SQL-команда, а протокол совместимости между схемой, кодом, деплоем и данными.

SQLAlchemy 2.0 pool под нагрузкой: где ломается и как не уронить PostgreSQL Типичная ошибка - считать pool_size=20 ускорителе
SQLAlchemy 2.0 pool под нагрузкой: где ломается и как не уронить PostgreSQL Типичная ошибка - считать pool_size=20 ускорителем. В production API это лимит конкурентных DB-соединений на один процесс, и при 8 workers база уже видит до 160 соединений без учета overflow. Пул должен делать backpressure Его задача - не выжать максимум, а дозировать давление на PostgreSQL: поставить запрос в очередь или быстро отказать.
engine = create_engine(
    dsn,
    pool_size=10,
    max_overflow=0,
    pool_timeout=2,
    pool_recycle=1800,
    pool_pre_ping=True,
    connect_args={"options": "-c statement_timeout=5000"},
)
Считайте глобальный лимит Формула для сервиса: workers * pool_size + workers * max_overflow Это число должно быть меньше бюджета PostgreSQL по соединениям. Не забудьте миграции, фоновые задачи, админские сессии и другие сервисы. Осторожно с max_overflow Overflow под всплеском часто создает stampede: приложение «помогает» базе, открывая еще больше конкурирующих запросов. Для latency-sensitive API безопаснее max_overflow=0: лишние запросы дождутся pool_timeout и упадут в приложении, а не добьют БД. Таймауты решают разные задачи * pool_timeout - сколько ждать свободное соединение из пула * statement_timeout - сколько PostgreSQL выполняет SQL-запрос Не ставьте pool_timeout=30 без причины: worker может 30 секунд просто ждать коннект. Часто 1-3 секунды надежнее длинной очереди. Stale connections pool_pre_ping=True защищает от мертвых соединений после рестарта PostgreSQL, NAT/LB idle timeout или сетевого разрыва. pool_recycle ставьте меньше idle timeout вашей инфраструктуры, например 1800 при лимите 60 минут. Вывод: Пул соединений - не ускоритель, а предохранитель, который ограничивает давление на PostgreSQL и делает отказ контролируемым.

Детерминированная сборка Python-контейнера: это когда образ из одного и того же коммита сегодня и через месяц получает один и
Детерминированная сборка Python-контейнера: это когда образ из одного и того же коммита сегодня и через месяц получает один и тот же набор зависимостей. pip install -r requirements.txt сам по себе такого не обещает. Может уехать транзитивная зависимость. Может появиться другой wheel под вашу платформу. Может внезапно собраться sdist. Может поменяться Python в base image или состояние package index. Рабочая схема выглядит так: 1. pyproject.toml описывает намерения. 2. uv.lock фиксирует конкретное разрешение зависимостей. 3. wheelhouse фиксирует installable-артефакты. 4. Runtime-стадия ставит зависимости без доступа к индексу. Пример Dockerfile:
FROM python:3.12-slim AS wheels

COPY --from=ghcr.io/astral-sh/uv:latest /uv /usr/local/bin/uv

WORKDIR /app
COPY pyproject.toml uv.lock ./

RUN uv export \
    --frozen \
    --no-dev \
    --no-hashes \
    --format requirements-txt \
    -o requirements.txt

RUN python -m pip wheel \
    --requirement requirements.txt \
    --wheel-dir /wheelhouse \
    --only-binary=:all:

FROM python:3.12-slim AS runtime

WORKDIR /app

COPY --from=wheels /wheelhouse /wheelhouse
COPY --from=wheels /app/requirements.txt /requirements.txt

RUN python -m pip install \
    --no-index \
    --find-links=/wheelhouse \
    --requirement /requirements.txt \
 && rm -rf /wheelhouse

COPY . .

CMD ["python", "-m", "app"]
На что я бы тут смотрел в первую очередь. uv export --frozen не обновляет lock-файл. И это хорошо. Если pyproject.toml и uv.lock разъехались, сборка должна упасть, а не молча «починить» зависимости прямо внутри Docker build. wheelhouse убирает из runtime-сборки режим «сходить в интернет и скачать что получится». Вместо этого pip ставит заранее подготовленные артефакты. Runtime-слой уже не зависит от PyPI, зеркала, yanked-релизов и сетевых флуктуаций. --only-binary=:all: тоже не случайная опция. Она запрещает внезапную сборку из sdist. Если пакет требует компиляции, лучше явно вынести это в controlled build-стадию, чем потом ловить разные wheel из-за версии компилятора, системных библиотек или base image. Что ещё помогает против дрейфа: - коммитить uv.lock; - в CI проверять установку через uv sync --locked или сборку через uv export --frozen; - не запускать uv lock внутри Docker build как часть обычной сборки; - пиновать base image не только по тегу, но и по digest; - собирать wheelhouse под тот же Python minor, ABI и семейство образа, что и runtime; - финальную установку делать с --no-index; - хранить wheelhouse как CI-артефакт или собирать его строго из lock-файла. Я это обычно делю на два слоя. Lock-файл защищает resolution layer: какие версии выбрали. Wheelhouse защищает artifact layer: какие именно файлы потом установили. Если нужен не «примерно воспроизводимый» контейнер, а контролируемая сборка, нужны оба уровня.

Backpressure в async Python-сервисах: как bounded queues, лимиты конкуренции и таймауты останавливают cascading failure Это н
Backpressure в async Python-сервисах: как bounded queues, лимиты конкуренции и таймауты останавливают cascading failure Это не микрооптимизация, а механизм выживания backend-сервиса под нагрузкой. В production проблема часто появляется на медленном downstream: таски плодятся, память растет, клиенты ретраят, и падает уже цепочка сервисов. Очередь не должна быть бесконечной asyncio.Queue() без maxsize часто превращает память процесса в скрытый буфер аварии. Делайте очередь bounded и решайте, что делать при переполнении: ждать, вернуть 429/503 или отбросить низкоприоритетную работу. Лимитируйте конкуренцию Async не означает “можно запустить 100k запросов к API или базе”.
queue = asyncio.Queue(maxsize=1000)
limit = asyncio.Semaphore(50)

async def submit(item):
    try:
        queue.put_nowait(item)
    except asyncio.QueueFull:
        raise Overloaded()

async def worker():
    while True:
        item = await queue.get()
        try:
            async with limit:
                await asyncio.wait_for(
                    process(item),
                    timeout=2.0,
                )
        finally:
            queue.task_done()
Здесь Queue(maxsize=1000) ограничивает память, Semaphore(50) защищает downstream, а timeout не дает зависшим операциям держать слоты навсегда. Типичная ошибка Плохая стратегия - принять все, сложить в память и надеяться “потом разгребем”. Под нагрузкой надежнее явно деградировать: 429/503, bounded wait, circuit breaker, durable queue для допустимых сценариев. Что измерять Минимум: размер очереди, время ожидания, rejected/dropped, saturation семафоров и пулов, timeout rate, latency downstream и retry rate. Без этих метрик backpressure превращается в догадку. Вывод: Надежный async-сервис ограничен по памяти, конкуренции и времени ожидания, иначе он становится усилителем cascading failure.

Совет на ближайшие годы — изучайте ВАЙБ-КОДИНГ ИИ уже пишет код, чинит баги, генерирует тесты, документацию и помогает запуск
Совет на ближайшие годы — изучайте ВАЙБ-КОДИНГ ИИ уже пишет код, чинит баги, генерирует тесты, документацию и помогает запускать продукты быстрее, чем это делали классические команды разработки. И это уже не "будущее когда-нибудь", а реальность, которая меняет рынок уже сегодня И те, кто научится вайбкодить сейчас, будут увереннее конкурировать на рынке и зарабатывать больше тех, кто по-прежнему делает всё вручную. Стартовать с нуля поможет канал Вайб-кодинг. Там ребята круглосуточно мониторят более 320 российских и зарубежных источников и публикуют только главное: релизы, инструменты, гайды, курсы и практические кейсы. Подписывайтесь, нас уже 45 тысяч: @vibecoding_tg

Утечки памяти в долгоживущих Python-сервисах: как отличить retention leak от native allocations и поведения аллокатора В prod
Утечки памяти в долгоживущих Python-сервисах: как отличить retention leak от native allocations и поведения аллокатора В production рост RSS у API, воркера или data pipeline не всегда означает забытый объект в списке. Частая ошибка - смотреть только на график памяти и сразу чинить GC, не выяснив, кто реально удерживает или выделяет память. 1. Начинайте с tracemalloc tracemalloc хорош для Python-level аллокаций: кэши без eviction, глобальные dict/list, closures, task-и, references из metrics/tracing.

    
        import tracemalloc

tracemalloc.start(25)
base = tracemalloc.take_snapshot()

def dump_memory_diff():
    global base
    cur = tracemalloc.take_snapshot()
    for stat in cur.compare_to(base, "lineno")[:10]:
        print(stat)
    base = cur{}
    
Практический совет: в сервисе повесьте такой dump на admin endpoint, signal handler или debug job и сравнивайте snapshot-ы после прогрева, а не сразу после старта. 2. Подключайте Memray для native слоя Если RSS растёт, а tracemalloc почти стабилен, смотрите C/Rust extensions: numpy, pandas, cryptography, grpc, драйверы БД, compression libs.

    
        memray run -o memray.bin python -m app
memray flamegraph memray.bin
memray table memray.bin{}
    
tracemalloc отвечает: какие Python allocation sites выросли. Memray помогает увидеть, где выделялась память, включая native allocations. 3. Не путайте leak и аллокатор CPython может освободить объекты, но RSS не обязан сразу упасть: pymalloc, arenas, pools и system malloc держат память для повторного использования. Проверка гипотезы:
    
        PYTHONMALLOC=malloc python -m app
PYTHONMALLOCSTATS=1 python -m app{}
    
Предупреждение: если при PYTHONMALLOC=malloc профиль резко меняется, это может быть fragmentation / allocator behavior, а не retention leak. Порядок диагностики * зафиксируйте RSS, heap, GC stats, размеры кэшей, очередей и pools; * воспроизведите сценарий: прогрев - стабильный traffic - подозрительный endpoint или job; * сравните tracemalloc, Memray и метрики приложения; * чините конкретный owner памяти, а не абстрактный “memory leak”. Вывод: Надёжная диагностика утечек начинается не с GC, а с разделения Python retention, native allocations и поведения аллокатора.

АЙТИШНИКИ БЕСПЛАТНОЕ ОБУЧЕНИЕ сборник курсов, инструментов и книг Проект «TERMINAL» стал крупнейшей библиотекой бесплатного о
АЙТИШНИКИ БЕСПЛАТНОЕ ОБУЧЕНИЕ сборник курсов, инструментов и книг Проект «TERMINAL» стал крупнейшей библиотекой бесплатного образования. В одном канале собраны курсы, книги, полезные инструменты и практические тренажёры для всех разработчиков 🎓 Практические курсы и задания 🪽 Книги и статьи известных авторов 😮‍💨 Полезные инструменты и ресурсы 🌟 IT-новости и инсайды Обучение по всем направлениям: SQL, Python, Frontend, PHP, C++, Golang, GIT, Linux, QA, Java, Vibe-coding, Infosec и др. Ценишь знания, подпишись: Terminal_tg