Валерий | AQA Engineer | Автотестирование на Python | REST, gRPC, GraphQL
Відкрити в Telegram
Сделаю из тебя крутого AQA инженера на Python. • Преподаю лучшие тренинги по автоматизации тестирования API • Senior Python developer | AQA lead, 7 лет в IT
Показати більше1 511
Підписники
-124 години
+47 днів
+830 день
Архів дописів
Чем генераторы отличаются от итераторов?
Итераторы и генераторы в Python — это два связанных, но различных концепта, которые используются для работы с последовательностями данных.
Обе конструкции могут эффективно работать с данными, не загружая все значения в память одновременно, но это зависит от контекста и способа хранения последовательности.
ИТЕРАТОРЫ:
Итераторы могут быть реализованы разными способами. Но это зависит от конкретной реализации итератора. Если вы создадите итератор, который возвращает элементы из списка, то все элементы списка будут храниться в памяти, так же как если бы вы использовали сам список.
Рассмотрим простой пример:
1. Классический итератор:
class MyIterator:
def __init__(self, data): # Исправлено: init на __init__
self.data = data
self.index = 0
def __iter__(self): # Исправлено: был iter, стало __iter__
return self
def __next__(self): # Исправлено: был next, стало __next__
if self.index < len(self.data):
result = self.data[self.index]
self.index += 1
return result
else:
raise StopIteration
# Создание итератора с загруженным списком
my_iter = MyIterator([1, 2, 3])
ГЕНЕРАТОРЫ:
Генераторы, как правило, более удобны и эффективны, когда вам нужно обрабатывать данные по одному за раз или при создании последовательностей, состоящих из значений, которые нельзя или нецелесообразно хранить в памяти целиком. Они «вычисляют» следующий элемент только по запросу. Например, в генераторе при помощи yield значение создается и возвращается, когда вызывается next().
2. Генератор:
def my_generator(n):
for i in range(n):
yield i
# Использование генератора
gen = my_generator(10) # Необходимо указать число для генерации. Добавлено 10 в качестве примера.
Еще один интересный момент
Как вы думаете, чем отличается следующее выражение?
# Список
a_list = [_ for _ in [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]]
print("Размер списка:", sys.getsizeof(a_list))
# Список
b_list = [_ for _ in range(10)]
print("Размер списка:", sys.getsizeof(b_list))
Спойлер: ничем, по размеру они будут одинаковыми, потому что, несмотря на то что мы используем range, данный генератор создаёт последовательность.
Размер списка: 184
Размер списка: 184
А как же нам сделать так, чтобы хранилась не полностью сгенерированная последовательность, а лишь размер необходимой последовательности, чтобы следующее значение вычислялось на лету?
Все довольно просто:
c_iter = iter(range(10))
print("Размер итератора:", sys.getsizeof(c_iter))
Теперь смотрим результат.
Размер списка: 184
Размер списка: 184
Размер итератора: 48
А если мы сделаем так:
# Список
b_list = [_ for _ in range(10000)]
print("Размер списка:", sys.getsizeof(b_list))
# Итератор
c_iter = iter(range(10000))
print("Размер итератора:", sys.getsizeof(c_iter))
То разница в памяти окажется уже гораздо больше, так как итератор хранит лишь текущее состояние и не требует памяти для хранения всех значений.
Размер списка: 85176
Размер итератора: 48
ИТОГ:
b_list у нас грубо эквивалентен записи:
for i in [0, 1, 2, ..., 9999]:
print(i)
А c_iter — это:
def generator(n=10000):
for i in range(n):
yield i
for i in generator():
print(i)
То есть, используя оператор yield, мы вычисляем следующее значение, а не храним его в памяти. (На самом деле, я написал функцию generator для наглядности, потому что range по сути делает то же самое — возвращает значения по одному, когда это необходимо).
Итоги:
- И итераторы, и генераторы могут эффективно обходить наборы данных, не загружая их все в память сразу, но это зависит от способа хранения данных.
- Генераторы более естественны и удобнее для случаев, когда нужно вычисление значений на лету.Всем привет!
Как вы знаете я люблю делать скидки на обучение в рандомный момент, но главное заранее, чтобы те кто регулярно читает мой канал и самые быстрые имели какое-то преимущество))
А так как мне лень писать новый пост, я скопипащу старый, но вставлю туда новый промокод 😁
🌟 Учись у лучших и становись экспертом в автоматизации тестирования! 🌟
Ты уже имеешь опыт в тестировании, но чувствуешь, что не хватает знаний для уверенной работы с фреймворками и решениями? Хочешь прокачать свои навыки до уровня разработчика и научиться создавать автотесты для REST API?
Курс "Advanced" — это идеальное решение для тебя! 🎯
🔍 Что тебя ждет:
- Уникальное учебное приложение на основе микросервисов для практики интеграционных тестов
- Обучение ряду паттернов проектирования для эффективной автоматизации
- Углубление технической компетенции с акцентом на реальную разработку
- Умение находить и устранять неисправности веб-приложений
- Мощный, но простой инструмент для написания автотестов на Python
- Практика, практика и еще раз практика! Мы будем изучать только самое необходимое для достижения результатов.
💡 Специальное предложение! Только до 26 октября у тебя есть возможность получить 15% скидку на курс по промокоду
ADVANCED26. Не упусти свой шанс стать настоящим мастером автоматизации!
🌐 Узнай все подробности и запишись на курс уже сейчас! Это твой шанс начать новую главу в карьере тестировщика! ⏳
🚀 Будь на шаг впереди — стань экспертом в автоматизации!
Старт обучения уже 4 ноября.
Поэтому тем, кто подписан на канал и хочет получить скидку за свою скорость велкам.
https://it-wizard.pro/rest_api_advancedДавайте в этом посте поговорим о RPC и о том, чем все это добро отличается от REST API.
1. Unary RPC: самый распространенный тип RPC — это Unary RPC, при котором запрос отправляется от клиента к серверу, за которым следует одно ответное сообщение. Он включает в себя тело запроса и тело ответа, то есть его можно сравнить с REST API, построенным только на POST-запросах. Для получения, создания, изменения и удаления сущностей в базовом варианте нам нужен только один тип RPC — это Unary.
2. Серверные стримы (Server Streaming): этот метод включает одно тело запроса, а в качестве ответа возвращается поток (stream) сообщений. Представьте, что мы ищем что-то в базе данных, и используем REST-запрос с пагинацией (с параметрами limit и offset). Метод Server Streaming сам будет отправлять данные согласно лимитам, пока не получит все записи, и для этого нам достаточно выполнить RPC один раз. В случае с REST, нам бы пришлось отправить множество запросов, прежде чем получить все данные.
3. Клиентские стримы (Client Streaming): Клиент отправляет поток данных на сервер, и сервер отвечает одним ответом после получения всех данных. Ситуация здесь противоположная предыдущей. Представьте, что у нас накопилась очередь в Kafka (посты про нее были выше) для регистрации пользователей в системе. Мы создаем клиентский стрим, который обрабатывает по одному сообщению и добавляет пользователей в БД. После того как вся очередь обработана, мы получаем один ответ о том, что все пользователи успешно зарегистрированы. В REST нам бы пришлось отправить запросы, равные количеству пользователей, или регистрировать пользователей пачками, чтобы избежать тайм-аутов.
4. Двунаправленные стримы (Bidirectional Streaming): в этом режиме клиент и сервер могут обмениваться потоками данных независимо друг от друга. Это полезно для приложений с двусторонним постоянным обменом данными, таких как чаты или видеоконференции.
На этом, пожалуй, все про RPC.
В следующем посте поговорим о протофайлах.
+1
Сегодня хочу поделиться с вами постом, который встретил в линкед ин.
В общем и целом там говорится о том, что прогать нынче это стандарт отрасли.
Мое мнение, что связано это с развитием нейросетей, что теперь зная азы ЯП можно из говна и палок накидать рабочий код.
У меня жена ручной тестировщик , прекрасно знает как тестить API, знает SQL.
Я как заботливый муж отправляю ее посматривать вакансии 🤣 и сам убедился, что каждая вторая подразумевает знать один из популярных ЯП.
Практически каждый день я ей говорю, что надо учиться прогать, пока от тестировщика еще не требуется запускать ракеты в космос)
А че вы думаете по этому поводу?
Почему protobuf занимает меньше места?
Когда вы используете Protocol Buffers (протобаф) для сериализации данных, значения полей будут закодированы в бинарном формате. Это означает, что, когда вы создадите экземпляр сообщения и сериализуете его, он не будет выглядеть как текст, а будет представлен в виде последовательности байтов.
Контракт
Рассмотрим пример такого контракта описанного в протофайле person.proro
message Person {
string name = 1;
int32 id = 2;
bool has_ponycopter = 3;
}
Схема такого JSON сообщения будет выглядеть так:
{
"id": "integer",
"name": "string",
"has_ponycopter": "boolean"
}
Пример кода
import person_pb2
# Импортируем сгенерированный файл по протофайлу person.proto, содержащий класс Person
# !Про генерацию кода будет в следующих постах!
# Создаем экземпляр сообщения Person
person = person_pb2.Person(
id=1,
name="Alice",
has_ponycopter=True
)
# Сериализуем в бинарный формат
serialized_data = person.SerializeToString()
# Выводим бинарные данные в виде байтов
print(serialized_data)
# Сериализованные данные
# После выполнения кода выше результатом serialized_data будет нечто вроде:
b'\n\x05Alice\x10\x01\x18\x01' # 11 байт
# В формате json данная строка будет выглядеть так:
b'{"id":1,"name":"Alice","has_ponycopter":true}' # 45 байт
То есть, даже просто на глаз видно, что сообщение стало короче примерно на треть.
Как это работает
- b'\n' указывает на поле name (с номером 1) и тип string. После идет длина строки, в данном случае 5 (\x05), а затем непосредственно сама строка "Alice".
- \x10 указывает на поле id (с номером 2) и тип int32, за которым следуют 4 байта (но так как значение 1 помещается в один байт, мы видим просто \x01).
- \x18 указывает на поле has_ponycopter (с номером 3) и тип bool, за которым следуют 1 байт (\x01 для true).
Размер данных
Такое бинарное представление занимает значительно меньше места, чем текстовая форма, так как оно не содержит лишних символов и информацию о типах и ключах содержит в своем формате, что делает его более эффективным для передачи и хранения.
Большие сообщения
Давайте изменим определение сообщения так, чтобы оно включало поле с повторяющимся типом Person.
message People {
repeated Person persons = 1;
}
message Person {
string name = 1;
int32 id = 2;
bool has_ponycopter = 3;
}
Схема аналогичного JSON сообщения будет выглядеть так
{
"persons": [
{
"id": "integer",
"name": "string",
"has_ponycopter": "boolean"
}
]
}
Давайте заполним наши сообщения и посмотрим:
import people_pb2
# Создаем экземпляр сообщения People
people = people_pb2.People(
persons=[
person_pb2.Person(
id=1,
name="Alice",
has_ponycopter=True
),
person_pb2.Person(
id=2,
name="Bob",
has_ponycopter=False
),
person_pb2.Person(
id=3,
name="Charlie",
has_ponycopter=True
),
]
)
# Сериализуем в бинарный формат
serialized_data = people.SerializeToString()
# Выводим бинарные данные в виде байтов
print(serialized_data)
# Сериализованные данные
# После выполнения кода выше результатом serialized_data будет нечто вроде:
serialized_data = b'\n\x1e\n\x05Alice\x10\x01\x18\x01\n\x03Bob\x10\x02\x18\x00\n\x07Charlie\x10\x03\x18\x01'
# JSON cообщение выгляделобы так
json_data = b'{"persons":[{"id":1,"name":"Alice","has_ponycopter":true},{"id":1,"name":"Alice","has_ponycopter":true},{"id":1,"name":"Alice","has_ponycopter":true}]}'
print(len(serialized_data)) # 35 байт
print(len(json_data)) # 151 байт
Разница в размере сообщения отличается в 4.3 раза.
Таким образом, все объекты, будут упакованы компактно. Это очень эффективно.
Для самых любознательных, каким образом происходит кодирование размещу в треде под постом.Всех с пятницей!
В следующем посте хочу объяснить почему сообщения protobuf занимают меньше места в сравнении с теми же джейсонами.
Пост получится очень тяжелый для понимания, будем разбираться как кодируются протобаф сообщения.
А так как в пятницу не хочется особо сильно напрягаться, перенесем данный пост на понедельник)
Всем хороших выходных)
Продолжаем про gRPC, часть с кодом будет обязательно, а пока все-таки теория.
gRPC - Часть 2
Особенности.
1. Поддержка HTTP/2:
- Многопоточность (Multiplexing): HTTP/2 позволяет отправлять несколько запросов одновременно по одному соединению. Это устраняет необходимость в дополнительных соединениях для каждого запроса, что снижает накладные расходы.
- Сжатие заголовков: HTTP/2 использует механизм сжатия заголовков, что позволяет значительно уменьшить размер передаваемых данных.
- Долговременные соединения: HTTP/2 поддерживает долговременные соединения, что упрощает поддержание постоянных соединений между клиентом и сервером, сокращая время на повторное установление соединения.
2. Использование Protocol Buffers (protobuf):
- Компактность: Protobuf обеспечивает компактную бинарную сериализацию данных, что уменьшает размер сообщений и ускоряет их передачу.
- Генерация кода: На основе файлов .proto автоматически генерируются классы для клиентов и серверов. Это упрощает разработку и минимизирует вероятность ошибок.
- Удобство в использовании: Протокол буферов поддерживает многие языки программирования, что облегчает разработчикам работу с данными.
3. Типы RPC:
- Униарные RPC (Unary): Простой запрос, за которым следует один ответ. Это наиболее распространенный тип RPC, он подходит для стандартных запросов и ответов.
- Серверные стримы (Server Streaming): Клиент отправляет запрос, и сервер отвечает потоком данных. Это полезно, когда нужно передать большой объем данных частями.
- Клиентские стримы (Client Streaming): Клиент отправляет поток данных на сервер, и сервер отвечает одним ответом после получения всех данных.
- Бидунаправленные стримы (Bidirectional Streaming): Клиент и сервер обмениваются потоками данных независимо друг от друга. Это полезно для реализаций с двусторонним постоянным обменом данными, таких как чаты или видеоконференции.
4. Автоматическая генерация кода:
- Описание сервиса: Файл .proto описывает структуру сообщений и определяет сервисы и методы. На основе этого описания автоматически генерируется код для клиента и сервера на выбранном языке программирования.
- Повышенная продуктивность: Это сокращает время разработки и уменьшает количество ошибок, связанных с ручным написанием кода.
5. Безопасность и шифрование:
- SSL/TLS: GRPC поддерживает шифрование данных с использованием стандартов SSL/TLS, что обеспечивает защиту данных при передаче.
- Аутентификация и авторизация: GRPC предоставляет механизмы для аутентификации пользователей и авторизации доступа к ресурсам, что упрощает реализацию безопасных систем.
6. Балансировка нагрузки:
- Интеграция с различными решениями: GRPC поддерживает интеграцию с различными системами балансировки нагрузки, что помогает распределённым системам масштабироваться и обеспечивать высокую доступность.
7. Поддержка временных ограничений и отмены вызовов:
- Timeouts and Cancellations: Клиенты GRPC могут указать таймауты для вызовов RPC, и в случае превышения этого времени вызовы автоматически отменяются. Это помогает избежать зависания в ожидании ответов от сервера.
8. Обработка ошибок:
- Управление статус-кодами: GRPC использует набор статус-кодов для обозначения различных типов ошибок, которые могут возникнуть при выполнении RPC. Это облегчает отладку и обработку ошибок.
9. Поддержка метаданных:
- Передача метаданных: GRPC позволяет передавать метаданные вместе с запросами и ответами. Это полезно для передачи дополнительной информации, такой как токены аутентификации или другая сервисная информация.
10. gRPC также предоставляет инструменты для мониторинга и отладки, такие как gRPC Health Check Protocol и gRPC Interceptor API, что позволяет проще отслеживать и обрабатывать ошибки при работе с приложениями на основе gRPC.
11. Наконец, gRPC предоставляет возможность создания и использования Middleware, что позволяет добавлять дополнительных функций на обеих сторонах клиента и сервера без изменения существующего кода. Это позволяет разработчикам быстро и просто реализовывать новые возможности в приложении на основе gRPC.
Привет, как я и обещал, начинаю тему по gRPC постов думаю будет не менее 6, но это не точно)
Начинаем...
▎gRPC - Часть 1
gRPC — это высокопроизводительный фреймворк для создания распределённых приложений на основе RPC (Remote Procedure Call).
Это означает, что GRPC позволяет одной программе запрашивать выполнение процедуры (функции) на удалённом сервере так, как будто она выполняется локально.
Сегодня gRPC является одним из самых популярных фреймворков для разработки распределённых приложений и сервисов. Его используют множество компаний, включая Amazon, Netflix, Uber, OZON, AliExpress, Avito и многие другие.
В первый раз я столкнулся с gRPC примерно 3 года назад. Если в начале мне казалось, что писать тесты на него не сложнее, чем на REST, то я сильно ошибался.
▎Немного истории
GRPC был анонсирован Google в 2015 году как проект с открытым исходным кодом. Однако идеи и внутренние разработки, лежащие в его основе, восходят к Stubby, внутреннему RPC-фреймворку, который использовался внутри Google на протяжении многих лет.
До GRPC Google использовал Stubby, закрытую версию RPC-фреймворка. Оказавшись удачным внутри компании, концепции и идеи Stubby были улучшены и открыты миру в виде GRPC.
Сам протокол в основном используется для создания микросервисных приложений и API.
Если вы знаете такой язык, как Go (Golang), то он также был разработан в компании Google и отлично подходит для разработки gRPC-сервисов, что обеспечивает высокий прирост производительности.
Основная цель этой работы заключалась в создании высокопроизводительной, надёжной и универсальной системы RPC, которая могла бы использоваться в различных приложениях и на различных платформах.
Одной из ключевых особенностей gRPC является поддержка множества языков и платформ, включая C++, Java, Python, Ruby, Go, Node.js, C# и другие.
gRPC также является открытым проектом с открытым исходным кодом и находится под управлением фонда Linux Foundation, что гарантирует его долгосрочную поддержку и развитие.
Про остальные особенности поговорим в следующем посте.
Доп материал.
Привет)
Я уже более 6 лет занимаюсь автоматизацией тестирования, и, как вы знаете, сейчас я работаю разработчиком. Мне очень нравится делиться сложными темами, и я считаю, что Telegram идеально подходит для этого — здесь можно развернуться и создать посты от инженера для инженеров, не переживая о лайках и алгоритмах Instagram.
В сети много IT-блогов, где рассказывается о жизни айтишников, дорогих шмотках и путешествиях. Честно говоря, я тоже люблю делиться своими успехами! Но иногда слышу, как легко можно начать и зарабатывать 350К в наносекунду. Говорят, что можно объяснить сложные вещи простыми словами, но с развитием тестирования становится всё важнее уметь оперировать сложными темами и терминами. Примеры с котятами и пирожками могут быть хорошими для старта, но чем выше квалификация инженера, тем больше он должен уметь говорить на одном языке с коллегами.
Поэтому мне хочется объяснять сложные вещи с использованием соответствующих терминов, но делать это доступно. Я не стремлюсь упрощать всё до уровня пятилетнего ребенка, но верю, что чтение технической литературы и освоение терминологии значительно расширяет понимание и помогает быстрее вникнуть в настоящий технический язык.
В своих постах о Kafka я старался объяснить, как с ней работать, не скатываясь к примитивным объяснениям. Надеюсь, вам понравилось!
В следующих постах я планирую обсудить GRPC. Они также будут технически сложными, хотя восприятие всегда индивидуально. В любом случае, умение оперировать терминами технологии, с которой вы работаете или планируете работать, — это очень полезный навык.
Собрал все посты о Kafka в одну удобную статью.
Теперь у вас под рукой руководство, которое поможет углубить знания и использовать кафка в автотестах.
Не упустите возможность быть в курсе самых актуальных материалов — подписывайтесь на канал и оставайтесь на волне новостей и полезной информации!
Со мной вы станете лютыми инженерами)
Привет)
В пятницу как всегда не хочется какой-то сложный контент, а вот для того, чтобы напомнить, что начало нового потока уже в понедельник прекрасно подходит)
Поэтому кто хотел но не успел, велкам)
https://t.me/AQA_Engineer/189
Как работать с кафка в автотестах.
Часть 7.
Пока писал пост про Кафку, совсем не уделил внимания продюсеру.
Работа с продюсером гораздо проще, чем с консумером. Чтобы отправить сообщения, достаточно сделать так:
Я взял первую попавшуюся библиотеку, например, kafka-python. Она предоставляет простой интерфейс для отправки сообщений в Kafka и обладает всеми нужными инструментами для настройки и обработки исключений.
Пример публикации сообщения в тестах на Python:
class Producer:
def __init__(self, bootstrap_servers):
self.producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
def send_message(self, topic, message):
self.producer.send(topic, message)
self.producer.flush()
def stop(self):
self.producer.close()
@pytest.fixture
def producer():
producer = Producer(KafkaSettings.BOOTSTRAP_SERVERS)
yield producer
producer.stop()
def test_kafka_producer(producer):
producer.send_message(
topic="service-example",
message={"message": "first message"},
)
producer.send_message(
topic="service-example",
message={"message": "second message"},
)
producer.send_message(
topic="service-example",
message={"message": "third message"},
)
P.S
value_serializer=lambda v: json.dumps(v).encode("utf-8")
Вот это делать обязательно, потому что сообщения публиковать нужно обязательно в виде байтовых строк.Всем привет!
Вчера я получил отзыв от нового ученика закончившего курс Advanced.
Что вынес я полезного для себя в этом отзыве, то что можно действительно накинуть информации по pytest хукам и возможно по xdist, потому, что у него есть свои особенности.
Что касается кодогена эта часть будет в ступени professional)
Больше отзывов тут
Как работать с кафка в автотестах.
Часть 6.
Всем привет, в понедельник я вам обещал уже кодик с тем как работать с кафка в автотестах.
Мы изучили достаточно много информации и теперь надеюсь полная картина сложится (но это не точно )))).
Ну-с, приступим. Для начала я реализую простой клиентский класс consumer используя библиотеку kafka-python.
class Consumer:
def __init__(self, topic, group_id, bootstrap_servers):
self.consumer = KafkaConsumer(
topic,
group_id=group_id,
bootstrap_servers=bootstrap_servers,
auto_offset_reset="latest",
value_deserializer=lambda m: json.loads(m.decode("utf-8")),
)
self.messages = []
self.stopped = False
"""
Дальше реализую метод, который будет принудительно останавливать консумер если мне это необходимо.
"""
def stop(self, timeout=10):
time.sleep(timeout)
if not self.stopped:
self.stopped = True
self.consumer.close()
"""
и дальше напишу функцию, которая будет просто сохранять необходимое количество сообщений, либо останавливаться если поступит сигнал от метода stop.
"""
def save_messages(self, message_count=10):
while not self.stopped:
try:
for msg in self.consumer:
if self.stopped:
break
self.messages.append(msg.value)
time.sleep(0.1)
except Exception as e:
print(f"Error consuming messages: {e}")
break
message_count -= 1
if message_count == 0:
self.stopped = True
break
Ну и дальше переходим к самому тесту.
"""Cоздаем фикстуру для запуска консумера"""
@pytest.fixture
def consumer():
consumer_instance = Consumer(
"topic-name",
"group-id",
KafkaSettings.BOOTSTRAP_SERVERS,
)
yield consumer_instance
consumer_instance.stop()
def test_kafka_consumer(consumer):
# Создаем поток который сохраняет сообщения
find_message_thread = Thread(target=consumer.save_messages, args=(100,))
# Запускаем
find_message_thread.start()
# Создаем поток который через десять секунд после старта консумера будет его останавливать
stop_thread = Thread(target=consumer.stop, args=(10,))
stop_thread.start()
# Здесь логика вызова метода который должен отправить сообщение в кафку
...
# Дожидаемся завершения потока остановки консумера
stop_thread.join()
# Дожидаемся завершения потока сохранения сообщений
find_message_thread.join()
for message in consumer.messages:
print(message)
Дальше в цикле мы уже можем найти необходимое сообщение.
Есть второй вариант, написать метод, который будет принимать callback функцию для поиска сообщений либо останавливаться по таймауту, например так.
def find_message(self, callback, timeout=10):
start_time = time.time()
while not self.stopped and time.time() - start_time < timeout:
try:
for msg in self.consumer:
if self.stopped:
break
if callback(msg.value):
return msg.value
time.sleep(0.1)
except Exception as e:
print(f"Error consuming messages: {e}")
break
return None
В данном случае есть вероятность, что тест отработает быстрее так как мы в процессе чтения топика сразу ищем нужное сообщение.
P.S таймаут time.sleep(0.1) ставить обязательно, иначе есть вероятность, что поток остановки консумера может не успеть вклиниться в процесс чтения сообщений и ваш тест зависнет, чтобы понимать, как это происходит нужно немного разбираться что такое GIL , но это уже совсем другая история.
Ставь реакцию если понравилась серия постов про Kafka 😊Привет, всем хорошей пятницы)
Пост про кафку допишу уже в понедельник, чтобы посмотрели со свежими мозгами)
А пока напомню, что уже 7 числа стартует 4 поток курса REST Advanced скидка по промокоду действует до завтрашнего вечера, поэтому кто хотел но откладывал самое время поторопиться.
Ну и еще раз, всем хороших выходных)
Работа с Kafka в автотестах.
Часть 5
В этом посте поговорим про офсеты (offsets)
Это очень важная тема, без понимания которой можно тупить “почему консумер вдруг не читает мои сообщения, я же вижу их в UI”
Управление сообщениями в Apache Kafka осуществляется с помощью смещений (offsets). Смещение представляет собой уникальный идентификатор для каждого сообщения в теме, который указывает на его позицию в партиции. Каждый потребитель (consumer) в группе отслеживает свою позицию чтения по смещению.
При чтении сообщений из топика Kafka сохраняет смещение последнего прочитанного сообщения внутри группы потребителей. Это смещение обновляется автоматически по мере чтения новых сообщений. Когда потребитель осуществляет следующую операцию чтения, Kafka будет предоставлять следующие сообщения начиная с сохраненного смещения.
Если потребитель перестает работать или перезапускается, он может использовать сохраненное смещение, чтобы продолжить чтение сообщений с того места, где остановился. Это гарантирует, что сообщения не будут повторно обработаны.
Чтение топика Kafka с начала осуществляется путем сброса смещения (offset) потребителя до начала партиции. Это означает, что потребитель будет читать сообщения с самого начала топика. Это полезно в случаях, когда нужно переобработать или перехватить все сообщения в топике.
Чтение топика Kafka с конца выполняется путем использования специального смещения "latest" в качестве начального смещения для потребителя. Это означает, что потребитель будет читать только новые сообщения, появляющиеся после запуска.
И самый частый вопрос, который возникает при автотестировании сообщений в Kafka, — как прочитать последние, например, 100 сообщений. Допустим, мы хотим протестировать вроде бы простой флоу, в котором дергаем ручку, и сообщение улетает в топик, и мы смотрим, что оно там появилось.
Кажется, что автоматизировать такой сценарий довольно просто: дергаем ручку, создаем консюмер и вычитываем сообщения от конца к началу, пока не найдем необходимое. Но не тут-то было))
Kafka не предназначена для чтения “задом наперед”. Более того, если ты уже прочитал сообщение этой консюмер-группой, и ты захочешь его просмотреть в следующем тесте, оно будет уже недоступно, потому что сохранится последний офсет, с которого твоего сообщения уже не будет. Можно, конечно, при подключении каждый раз создавать новую консюмер-группу, но в этом случае при создании новой консюмер-группы Kafka будет вычитывать сообщения с самого начала, а их может быть несколько сотен тысяч, и твой тест станет бесконечным.
Можно попробовать читать с определенного офсета, но для этого тебе нужно найти правильный офсет, с которого твое сообщение уже влетело в топик, что тоже довольно проблематично.
Есть третий вариант: дернуть ручку, подождать несколько секунд, подключиться к топику, найти последний офсет и с шагом -1 итерироваться в обратном порядке по топику, что тоже довольно геморройное занятие.
И четвертый вариант: сначала запустить консюмер, чтобы он читал только новые сообщения, дернуть ручку, дождаться сообщения, остановить консюмера. Это тот вариант, который я считаю самым предпочтительным. Но там тоже есть нюанс: Kafka не предназначена для чтения в моменте и остановки. Если мы ее подключаем, она будет слушать топик бесконечно, и твой тест тоже зависнет, поэтому в синхронном коде не обойтись без многопоточности, но об этом уже в следующем посте.
Напомню у меня есть серия бесплатных видео, в которых можно повторить основы Python.
Часть 4
В этой теме поговорим про консумер группы.
Как мы уже знаем консьюмер группа (consumer group) в Apache Kafka - это логическое понятие, которое объединяет несколько потребителей (consumers) для обработки сообщений из одной или нескольких тем. Консьюмеры внутри группы делятся между собой обработкой партиций темы.
Ключевые особенности консьюмер групп в Kafka:
1. Групповая потребляемость: Когда консьюмеры добавляются в группу, они автоматически формируют единую группу потребителей. Каждая партиция темы обрабатывается только одним потребителем из группы. Это обеспечивает параллельную обработку сообщений и позволяет достигать высокой скорости обработки данных.
2. Балансировка: Когда в группу добавляются новые консьюмеры или удаляются существующие, Kafka автоматически перераспределяет партиции темы между потребителями, чтобы обеспечить равномерное использование ресурсов и равномерную нагрузку на консьюмеров. Это позволяет эффективно управлять масштабируемостью и распределением работы внутри группы.
3. Идемпотентность: Каждая партиция в теме обрабатывается только одним потребителем из группы. Таким образом, каждое сообщение в теме будет обработано и прочитано только один раз. Это обеспечивает идемпотентность операций и гарантирует корректную обработку данных.
4. Устойчивость к отказам: Если консьюмер из группы отказывается или перестает работать, Kafka автоматически перераспределяет партиции темы между оставшимися потребителями в группе. Это позволяет обеспечить отказоустойчивость и непрерывную работу обработчиков сообщений.
Из особенностей, которые я заметил при работе с партициями в Kafka, это то что иногда в патрициях иногда возникает лаг, это возникает из-за того, что консумеры не успевают разгребать сообщения, у нас в этом случае необходимо ребутнуть сервис, чтобы все перераспределилось))
В следующей теме мы поговорим про оффсеты и потом можно будет наконец поговорить про код)
Вже доступно! Дослідження Telegram за 2025 — головні інсайти року 
