Валерий | AQA Engineer | Автотестирование на Python | REST, gRPC, GraphQL
前往频道在 Telegram
Сделаю из тебя крутого AQA инженера на Python. • Преподаю лучшие тренинги по автоматизации тестирования API • Senior Python developer | AQA lead, 7 лет в IT
显示更多1 511
订阅者
-124 小时
+47 天
+830 天
帖子存档
Про асинхронный код.
Часть 3
Мы с вами рассмотрели примеры выполнения многопоточного и асинхронного кода на Python.
В этом посте поговорим про GIL.
Многие критикуют питон за производительность, потому что у него мол не настоящая многопоточность, на самом деле так и есть и причина этому GIL.
GIL (Global Interpreter Lock) — это механизм Python, который позволяет в один момент времени работать только одному потоку. Это необходимо для чтобы упростить работу с памятью и синхронизацию потоков, чтобы не происходило сложной блокировки объектов, когда потоки пытаются в один момент времени использовать один объект, создавая deadlock.
GIL, это как замок на двери, который не позволяет нескольким потокам одновременно делать что-то. Представьте, что у вас есть один ключ, и все потоки дерутся за право его использовать. Это нужно, чтобы вся программа работала без сбоев: чтобы данные не перепутались и память правильно очищалась после того, как объекты перестают быть нужны.
Но есть и минусы и первое это ограничение производительности в многоядерных системах и многопоточных приложениях. Даже если программа создана для выполнения в нескольких потоках, по факту за раз выполняется только один.
Для обхода GIL разработчики используют альтернативные подходы, например, multiprocessing, что позволяет каждому процессу иметь свой собственный экземпляр интерпретатора Python и, следовательно, свой собственный GIL.
Если вы работали с xdist то там используется мультипроцессинг, именно по этому там самая главная проблема это создание фикстур, которые могли бы шарится на всю сессию запуска тестов, так как для каждого процесса xdist используется свое пространство имен и свой интерпретатор.
Гвидо ван Россум (Создатель питон если что), который придумал Python, говорит, что GIL - это компромисс. Он помогает делать программы на Python проще и безопаснее, потому что разработчикам не надо ломать голову над сложностями, связанными с одновременной работой нескольких потоков.
Сам не пробовал но в версии питона 3.13 появилась возможность вырубать GIL
В синхронном коде мы бы зависли примерно здесь:
def test_kafka(kafka, http_client):
value = 'some_value'
read_task = kafka.find_message(value)
# здесь бы мы зависли, потому что Kafka предназначена для работы вечно
В большинстве случаев без тестирования очередей можно обойтись и синхронным кодом. Но я рекомендую, если вы пишете проект с нуля или он не слишком большой, сразу закладываться на асинхронные задачи.Про асинхронный код
Часть 2
Представьте, что вы идете на кухню и готовите завтрак. Допустим, это занимает какое-то время, например:
- Пожарить яичницу — 3 минуты
- Сделать кофе — 3 минуты
- Сделать бутерброды — 3 минуты
Если вы будете делать это последовательно, то потратите 9 минут времени:
import time
def cook_egg():
# sleep имитирует время на готовку
time.sleep(3)
def cook_bread():
# sleep имитирует время на готовку
time.sleep(3)
def cook_coffee():
# sleep имитирует время на готовку
time.sleep(3)
if __name__ == '__main__':
cook_egg()
cook_bread()
cook_coffee()
Но такое сложно представить, и обычно люди готовят завтрак асинхронно. То есть, сначала вы ставите чайник, потом разогреваете сковороду. Пока греется сковорода и чайник, вы начинаете делать бутерброды. В момент переключения контекста (например, когда идете в холодильник за маслом), вы можете разбить яйца на сковороду. Затем вы можете вернуться к готовке бутербродов. Когда чайник вскипел, вы подходите, добавляете ложку кофе и заливаете кипятком. В это время яичница уже пожарилась, и вы можете её снять, забрать бутерброды и сесть довольными кушать.
Так как же сделать так, чтобы наш код работал аналогичным образом? Это довольно просто.
import asyncio
async def cook_egg():
# sleep имитирует время на готовку
await asyncio.sleep(3)
async def cook_bread():
# sleep имитирует время на готовку
await asyncio.sleep(3)
async def cook_coffee():
# sleep имитирует время на готовку
await asyncio.sleep(3)
async def main():
await asyncio.gather(
cook_egg(),
cook_bread(),
cook_coffee(),
)
if __name__ == '__main__':
asyncio.run(main())
Время на выполнение этого кода займет около 3 секунд.
Что же изменилось? Как вы заметили, появились ключевые слова async и await. У нас также используется асинхронное ожидание, что важно, поскольку синхронный код заблокирует поток управления программой, тогда как в асинхронной парадигме, называемой event loop, поток управления продолжает свое выполнение. Еще важно помнить, что мы не можем использовать асинхронные функции просто так в синхронном коде — для их запуска нужно использовать asyncio.run(main()). Также, если мы добавим async перед названием функции, это не сделает её асинхронной. В случае с HTTP-запросами нам нужно будет использовать специальную библиотеку, например, httpx.
pytest поддерживает асинхронный код, для этого нужно установить специальный плагин pytest-asyncio. И все библиотеки нашего фреймворка тоже придется поменять на асинхронные.
Чем же отличается асинхронный код от многопоточного в Python? Самый банальный ответ: синтаксис асинхронного кода гораздо приятнее глазу. Если посмотреть технически, потоки переключаются так, как это решает сам Python: например, 10 секунд мы жарим яйца, потом 10 секунд варим кофе, потом 10 секунд делаем бутерброды и так далее. В асинхронном коде корутина (асинхронная функция) сама передает контекст: например, мы поставили чайник и отдали контекст, нет смысла возвращаться к нему, пока он не закипит.
Как же нам использовать асинхронный код в автотестировании и зачем он нам? Например, в синхронном коде, если мы подпишемся на топик Kafka, то будем читать его вечно. В асинхронном коде мы можем создать асинхронную задачу и делать всё, что душе угодно, а в нужное время просто завершить эту задачу. В отличие от потока, это сделать гораздо проще.
@pytest.mark.asyncio
async def test_kafka(kafka, http_client):
value = 'some_value'
read_task = asyncio.create_task(kafka.find_message(value))
http_client.send_message_to_kafka(value)
if read_task.done():
assert await read_task == value
else:
read_task.cancel()
assert False, f"message {value} was not found"Тогда как в многопоточном или асинхронном режиме можно отправить множество запросов одновременно, не дожидаясь завершения предыдущего, например здесь мы делаем 10 потоков для отправки того же самого запроса:
import requests
import time
from concurrent.futures import ThreadPoolExecutor
def send_request(url):
response = requests.get(url)
return response.status_code
url = "https://petstore.swagger.io/v2/pet/findByStatus?status=available"
start_time = time.time()
with ThreadPoolExecutor(max_workers=10) as executor:
futures = [executor.submit(send_request, url) for _ in range(100)]
results = [future.result() for future in futures]
end_time = time.time()
print(f"Вариант с ThreadPoolExecutor: {end_time - start_time:.2f} секунд")
Если запустить этот код то разница будет примерно следующая
Синхронный вариант: 10.23 секунд
Вариант с ThreadPoolExecutor: 2.15 секунд
Все это позволяет повысить производительность и скорость работы кода. Тем не менее, стоит отметить, что отладка многопоточного кода может быть гораздо сложнее, чем в случае синхронного.
На этом на сегодня все, дальше будем говорить про типы операций, GIL, асинхронку а пока, с вас лайк и подписка как говорится))Многопоточное программирование и зачем оно тестировщику
Часть 1.
В большинстве случаев тестировщику достаточно уметь писать простые скрипты в синхронной парадигме. Однако профессия тестировщика усложняется, и технологии, с которыми мы работаем тоже. Например, работа с gRPC стримами или брокерами сообщений станет гораздо удобнее при использовании асинхронного кода, но об этом позже.
В Python существуют несколько ключевых концепций:
- Синхронный код
- Асинхронный код
- Многопоточность
- Мультипроцессорность
1. Синхронный код
Синхронный код — это то, с чем тестировщик работает чаще всего. Это код простой как молоток. В синхронном подходе выполнение задач происходит последовательно, что означает, что программа ждет завершения одной задачи, прежде чем перейти к следующей. Например, если функция делает запрос к API, выполнение программы будет приостановлено, пока не будет получен ответ.
А теперь представим ситуацию, у нас есть тест.
@pytest.fixture(scope="session")
def auth_account_helper():
account_helper = AccountHelper()
account_helper.auth_client(
login=v.get("user.login"),
password=v.get("user.password"),
)
return account_helper
def test_get_v1_account_auth(auth_account_helper):
response = auth_account_helper.dm_account_api.account_api.get_v1_account()
with soft_assertions():
assert_that(response.resource.login).is_equal_to("vmenshikov")
Предположим, у вас есть авторизационный токен, который действителен 15-20 секунд. После истечения этого времени все ваши запросы будут завершаться ошибкой 401. Конечно, можно в начале каждого теста выполнять авторизацию, но если тест длится дольше 20 секунд, необходимо переавторизоваться в процессе выполнения теста. Как это определить?
Здесь на помощь может прийти многопоточность.
2. Многопоточность
Многопоточность в Python позволяет одновременно выполнять несколько потоков исполнения, что особенно полезно для задач, требующих параллельной обработки. Например, в одном потоке могла бы выполняться основная логика теста, а в другом потоке — обновляться авторизационный токен, скажем, раз в 10 секунд.
Например так:
import threading
import time
@pytest.fixture(scope="session")
def auth_account_helper():
account_helper = AccountHelper()
auth_thread = threading.Thread(target=auth_client_loop, args=(account_helper,))
auth_thread.daemon = True # чтобы поток завершился при завершении основного потока
auth_thread.start()
return account_helper
def auth_client_loop(account_helper):
while True:
account_helper.auth_client(
login=v.get("user.login"),
password=v.get("user.password"),
)
time.sleep(10)
Тем самым мы решим проблему с протуханием токена в фоновом режиме и при этом при обновлении токена не будет блокироваться основной поток выполнения программы.
Важно отметить, что это работает не для всех задач, а только для I/O-bound операций. Кроме того, в Python существует GIL (Global Interpreter Lock), который позволяет выполнять только один поток в каждый момент времени, но это будет темой для следующих обсуждений.
Еще один сценарий, когда могут понадобиться асинхронные или многопоточные решения, — это создание нагрузки на сервер, например, при нагрузочном тестировании. В синхронном коде следующий запрос будет отправлен только после получения ответа от предыдущего, например:
import requests
import time
def send_request(url):
response = requests.get(url)
return response.status_code
url = "https://petstore.swagger.io/v2/pet/findByStatus?status=available"
start_time = time.time()
for _ in range(100):
send_request(url)
end_time = time.time()
print(f"Синхронный вариант: {end_time - start_time:.2f} секунд")Большое спасибо за курс!
Однозначный жирный плюс курса то, что можно посмотреть на хороший подход к выстраиванию архитектуры тестов. Так, чтобы впоследствии это можно было легко поддерживать и расширять.
У меня есть небольшой коммерческий опыт в автоматизации, поэтому курс проходился легко.
Тем у кого его опыта нет можно просто поковырять тот же pydantic самостоятельно до курса и будет гораздо легче сдавать домашки.
Еще было очень круто посмотреть, как и где можно применять декораторы и контекстные менеджеры.
Плюс я узнала новые библиотеки для логирования и ассертов, которые раньше не применяла
Еще в чате классная “ламповая” атмосфера, если какие - то вопросы - помощь приходит очень быстро.
Плюс можно просто обсудить темы вне курса с коллегами)
Что можно было дополнить:
Сделать запуск CI в gitlab или Jenkins -например, каждый по инструкции может у себя локально развернуть.
Этот опыт сразу на работе можно применить.
Более глубоко разобрать объекты pytest - request, parser. Что в себе хранят, на каком этапе жизненного цикла pytest создаются итд.
Pytest марки, плагины и как их можно юзать.
Всем хорошего понедельника и продуктивной рабочей недели, начнем ее с отзыва на обучение)
Больше отзывов тут: https://t.me/aqa_engineer_community/11
TG-сообщество | Обучение |
Как и обещал завершаю серию постов про gRPC статьей в телеграф)
С вас лайк, ну а кто зашел сюда случайно, то подписка)
TG-сообщество | Обучение |
Всем привет)
Я чутка приболел, поэтому особо сил написать, что-то умное нет, если вы не знаете я планирую курс по базам данных, и по генерации кода проектов автотестов.
В рамках этой деятельности я чутка переписал и дополнил API, которое может стать неплохой песочницей, чтобы потренироваться писать интеграционные автотесты.
Ну и совершенно бесплатно делюсь с вами)
Register API - для регистрации и активации пользователя в системе
Mail API - для получения писем пользователей куда приходят токены для активации и смены информации пользователя
Auth API - для получения авторизационного токена с целью хождения например в Account API
Account API - позволяет изменять информацию о пользователе а так же удалять пользователя
Users API - имеет два кэшированных метода, позволяющие обработать ситуации с кэшом
Для почтового сервера так же есть frontend
Ну и для самого приложения тоже , но обычно для апи тестов в нем нет особой необходимости.
TG-сообщество | Обучение |
Привет, все самое основное для работы тестировщика с gRPC API мы с вами обсудили, у нас осталась только тема с ошибками.
В gRPC как и в REST реализацию отправки ошибок можно сделать по разному.
Рассмотрим первый вариант, когда у нас успешный статус и сообщение об ошибке в теле ответа, например:
// Для ответа с ошибкой описывается специальное поле c описанием ошибки.
message MessageResponse {
repeated Message messages = 1;
optional Error error = 2;
}
message Message {
bytes body = 1;
}
message Error {
ErrorCode code = 1;
string description = 2;
}
enum ErrorCode {
FirstError = 0;
SecondError = 1;
}
В Python тесте мы работаем в таком случае как с обычным сообщением:
response: MessageResponse = grpc_stub.SendMessage(...)
assert not response.HasField("error"), "Сообщение не должно содержать ошибку"
Второй случай - это когда у нас не успешное выполнение запроса и кастомное сообщение об ошибке, контракт которой так же описан в proto файле например:
// Для этого сценария у нас так же описывается мессадж сообщения, например
message RPCStatusDetails {
bool permanent_err = 1;
uint32 reconnect_after_ms = 2;
}
Работать в коде с таким типом ошибок сложнее, потому что нам нужно перехватить exception сериализовать его в объект RPCStatusDetails, и работать с ним дальше, например:
from grpc_status import rpc_status
try:
response = grpc_stub.SendMessage(...)
except grpc.RpcError as error:
status = rpc_status.from_call(error)
for detail in status.details:
if detail.Is(rpc_status_details_pb2.RPCStatusDetails.DESCRIPTOR):
error_info_detail = rpc_status_details_pb2.RPCStatusDetails()
detail.Unpack(error_info_detail)
wait_time_ms = error_info_detail.reconnect_after_ms
error = error_info_detail.permanent_err
assert wait_time_ms == 0
assert error is False
И третий вид это обычное исключение - самый базовый вариант.
Работа с ним может быть так же как в примере выше, но без распаковки сообщения.
import grpc
NOT_EXPECTED_STATUS = {grpc.StatusCode.PERMISSION_DENIED, grpc.StatusCode.RESOURCE_EXHAUSTED}
try:
response = grpc_stub.SendMessage(...)
except grpc.RpcError as error:
assert error.code not in NOT_EXPECTED_STATUS
В этом посте мы рассмотрели базовые варианты работы с исключениями в ответе сервера.
На этом серию постов по gRPC заканчиваю, если есть какие вопросы пишите)
Кстати решил обновить фотом канала как это делают другие айтишники как вам?
👍 - нормас фотка
💩 - верни фотку как было
🔥 - пост огонь
TG-сообщество | Обучение |Привет!
До старта обучения пятого потока курса REST API ADVANCED осталось всего несколько дней.
Приглашаю!
Курс подойдет тем, у кого есть небольшой опыт в автоматизации тестирования API, а также тем, кто умеет автоматизировать тестирование UI и хочет научиться проектировать фреймворк для API.
Специалисты с опытом автоматизации тоже найдут много интересного.
Как я люблю говорить: многие курсы по автоматизации построены на том, что API тестируется в довесок, и API фреймворк строится довольно простым и труднорасширяемым.
На моем курсе все 4 недели будут посвящены именно автоматизации тестирования API. Мы построим масштабируемый промышленный фреймворк.
В течение первой недели мы повторим теорию по REST API, напишем первые тесты, создадим клиентские классы и настроим автоматический запуск тестов в CI/CD на GitHub.
В течение второй недели мы будем изучать и применять паттерны проектирования, напишем логирующий клиент, создадим удобный доступ ко всем клиентам нашего проекта, опишем более удобные интерфейсы для используемых в тестах методов и научимся писать умные ожидания.
На третьей неделе мы подробнее поговорим о том, что проверять и как. Обсудим различные проверки, обычные ассерты, валидацию контрактов и значений, контекстные менеджеры, софт ассерты, научимся подготавливать тестовые данные и авторизовывать клиентов. Обязательные проверки мы опустим на более низкий уровень, что сделает наши тесты более чистыми и читаемыми.
Четвертая неделя будет посвящена работе с конфигурацией для прогона тестов на разных хостах, репортингу и настройке отчетов в CI/CD на GitHub.
Курс не подойдет тем кто не знает Python!
Но в закрепе канала есть несколько бесплатных занятий.
И так же есть курс Starter.
Кто хочет в Advanced, то старт уже 4 ноября.
В качестве демо прикладываю пару видео из второй недели занятий)
TG-сообщество | Обучение |
https://t.me/aqa_engineer_community
Мне тут внезапно показалось что нам может не хватать живого общения, поэтому состряпал чатик с темами, думаю там тоже можно будет обсуждать насущные проблемы да и просто общаться, надеюсь чат со временем обрастет полезными тредами, в общем велкам)
Repost from luchanos | Белый социальный инжиниринг
На собеседованиях часто спрашивают, мол, а чем генератор от итератора отличается?
Это один из тех вопросов, который задается некорректно и, на самом деле, показывает не вполне полную компетентность собеседующего, потому что генератор - это подвид итератора
Любой генератор является итератором, но не любой итератор - генератором
Так что же хотят услышать в ответ собеседующие? Они хотят, чтобы вы рассказали о том, что, дескать, генератор мы можем “высосать” в одну сторону и всё, баста! Никак назад вернуться мы не сможем. А итератор, мол, можно крутить и вертеть, как угодно
В этом есть только часть правды, потому что генератор имеет метод send, с помощью которого вы можете закидывать внутрь него значения, которые могут управлять контекстом его выполнения и отматывать хоть туда, хоть обратно - но при генерации StopIteration действительно генератор “всё”
Короче говоря, важно это все понимать и не лажать, когда вас пытаются подловить)
А вот пример кода генератора с использвоанием send:
def resettable_counter(start=0):
current = start
while True:
received = yield current
current += 1
# If a new start value is sent, reset the counter
if received is not None:
current = received
YouTube | Вступить в Balabol IT | TG-сообщество | DiscordОчень важное и полезное дополнение про отличие генераторов и итераторов.
В этом посте мы поговорим про метаданные в gRPC запросах.
Как вы знаете, в REST метаинформацию передают в заголовках.
Для этого в библиотеках
requests или httpx есть специальные параметры.
Для передачи метаданных в gRPC также существует специальный параметр в сгенерированных методах.
def run():
with grpc.insecure_channel('localhost:50051') as channel:
stub = helloworld_pb2_grpc.GreeterStub(channel)
request = helloworld_pb2.HelloRequest(name='you')
metadata = [('key', 'value')]
response = stub.SayHello(request=request, metadata=metadata)
print("Greeter client received: " + response.message)
Параметр metadata, из-за особенностей сгенерированного кода, не совсем очевиден, и найти его можно в «кишках» библиотеки gRPC.
Например:
class GreeterStub:
def __init__(self, channel: Union[Channel, Channel]) -> None: ...
SayHello: UnaryUnaryMultiCallable[
HelloRequest,
HelloResponse,
]
Здесь нужно зайти в аннотацию UnaryUnaryMultiCallable, и мы увидим следующее:
class UnaryUnaryMultiCallable(abc.ABC):
"""Affords invoking a unary-unary RPC from client-side."""
@abc.abstractmethod
def __call__(
self,
request,
timeout=None,
metadata=None,
credentials=None,
wait_for_ready=None,
compression=None,
):
"""Synchronously invokes the underlying RPC."""
Это абстрактный класс, который описывает интерфейс вызываемого метода, и в одном из параметров мы как раз и увидим metadata.
Что касается других параметров, не буду на них углубляться, так как из названия должно быть понятно, для чего они предназначены.
К вопросу о том, можно ли устанавливать метаданные для каждого запроса автоматически, чтобы не передавать их явно, это уже отдельная история. Для этого нам нужно будет написать свой интерсептор, который будет модифицировать каждый запрос.
Более подробно про интерсепторы можно почитать здесь.Всем привет, нынче тяжелая неделя, аж целых 6 рабочих дней )
Началась она для меня с приятного упоминания Артема Русова , не знаю подписан он на меня в тг или нет, но среди тьюторов по мануальному он один из тех людей, которым я тоже доверяю, поэтому принимать отметки от него приятно)
ПЫ.СЫ
У него миллиард всяких каналов, поэтому если кто-то до сих пор не подписан ни на один из них, то я нашел вот такую ссылку)
t.me/addlist/ZOMs7wwbAxsxZTQy
Напоминаю, завтра последний день скидки на обучение)
Кстати напомню, вы можете согласовать обучение у вашего работодателя, только желательно подсуетиться заранее , время еще есть)
Давайте теперь поговорим о том, как же нам выполнить RPC-процедуры и что для этого нужно.
Мы с вами закончили на протофайле.
Следующим шагом необходимо сгенерировать клиентскую часть. Для этого нужно сделать следующие действия.
Установить gRPC
pip install grpcio
Далее нам потребуется установить gRPC tools:
gRPC tools включают компилятор протоколов protobuf (protoc) и специальный плагин для генерации кода сервера и клиента из определений сервисов .proto.
pip install grpcio-tools
Следующим шагом нужно сгенерировать код.
Это делается следующей командой, в которой нужно указать пути к протофайлам и пути, где сохранить сгенерированный код:
$ grpc_tools.protoc -I../../protos --python_out=. --pyi_out=. --grpc_python_out=. ../../protos/helloworld.proto
Для протофайла:
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply) {}
rpc SayHelloAgain (HelloRequest) returns (HelloReply) {}
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}
У нас получится примерно следующее (ну и там, конечно, будет еще много кода для реализации серверной части):
class GreeterStub(object):
def __init__(self, channel):
self.SayHello = channel.unary_unary(
'/helloworld.Greeter/SayHello',
request_serializer=helloworld_pb2.HelloRequest.SerializeToString,
response_deserializer=helloworld_pb2.HelloReply.FromString,
_registered_method=True)
self.SayHelloAgain = channel.unary_unary(
'/helloworld.Greeter/SayHelloAgain',
request_serializer=helloworld_pb2.HelloRequest.SerializeToString,
response_deserializer=helloworld_pb2.HelloReply.FromString,
_registered_method=True)
После этого мы уже можем создать канал и отправить первый запрос.
def run():
with grpc.insecure_channel('localhost:50051') as channel:
stub = helloworld_pb2_grpc.GreeterStub(channel)
response = stub.SayHello(helloworld_pb2.HelloRequest(name='you'))
print("Greeter client received: " + response.message)
response = stub.SayHelloAgain(helloworld_pb2.HelloRequest(name='you'))
print("Greeter client received: " + response.message)
Обратите внимание, что в данном примере используется insecure_channel. В gRPC еще есть защищенный канал secure_channel. Для работы с ним нужно уметь устанавливать, например, SSL-сертификаты, но способ создания примерно такой же.
На этом про gRPC на сегодня все!Продолжаем тему по gRPC.
Что такое protofile?
Протофайл это такой файл который описывает схему целевого сервиса, без него невозможно сгенерировать код и создать gRPC сервис.
Если вы когда нибудь рассматривали swagger схему то примерно представляете, что это такое.
Грубо это структурированное текстовое описание, о том как называется сервис, набор его методов, входные и выходные контракты.
Давайте рассмотрим пример gRPC сервиса, который использовался у меня в курсе с gRPC.
// Синтаксис протофайла
syntax = "proto3";
// Название пакета, необходимо, если у нас есть зависимости от других протофайлов для понимания что они находятся в одном пакете, а так же разделения если у нас повторяются контракты и методы у двух API например V1 и V2
package account_proxy;
// Импорты типов из других протофайлов
import 'google/protobuf/empty.proto';
import 'google/protobuf/timestamp.proto';
import 'google/protobuf/wrappers.proto';
// Название сервиса
service AccountServiceProxy {
// Название метода (Модель запроса), (Модель ответа)
rpc Login(LoginRequest) returns (LoginResponse);
// Для пустого значения есть тоже свой тип, здесь он возвращается в ответе
rpc Logout(LogoutRequest) returns (google.protobuf.Empty);
// Клиентский стриминовый метод имеет в качестве реквеста слово stream
rpc RegisterAccountClientStream(stream RegisterAccountRequest) returns (RegisterAccountClientStreamResponse);
// Серверный стрим имеет в ответе слово stream
rpc GetAccountsServerStream(google.protobuf.Empty) returns (stream User);
// Дуплексный стрим имеет в реквесте и ответе слово stream
rpc GetAccountsByLoginDuplexStream(stream GetAccountsByLoginRequest) returns (stream GetAccountsByLoginResponse);
}
Теперь про сами контракты, я уже ранее приводил примеры их описания, но как говориться повторение, мать учения:
// Здесь все довольно просто, название модели(мессаджа), названия, типы полей и их айдишники (они нужны для кодирования соообщения, пост об этом был выше)
message LoginRequest {
string login = 1;
string password = 2;
bool remember_me = 3;
}
Стоит отметить как указывается модель с вложенными объектами и имеющая повторяющиеся объекты аналог листа с объектами.
message PagingResult {
int32 total_pages_count = 1;
int32 total_entities_count = 2;
int32 current_page = 3;
int32 page_size = 4;
int32 entity_number = 5;
}
message User {
string login = 1;
google.protobuf.StringValue medium_picture_url = 3;
google.protobuf.StringValue small_picture_url = 4;
google.protobuf.StringValue status = 5;
TimestampValue online = 7;
google.protobuf.StringValue name = 8;
google.protobuf.StringValue location = 9;
TimestampValue registration = 10;
}
// Здесь используется слово repeated, тобишь повторяется))
message GetAccountsResponse {
repeated User accounts = 1;
PagingResult paging = 2;
}
Аналог JSON будет такой:
{
"accounts": [
{
"login": "",
"medium_picture_url": "",
"small_picture_url": "",
"status": "",
"online": "2024-10-23",
"name": "",
"location": "",
"registration": "2024-10-23",
}
],
"paging": {
"total_pages_count": 0,
"total_entities_count": 0,
"current_page": 0,
"page_size": 0,
"entity_number": 0
}
}
В общем и целом описание такой схемы это первый шаг для создания gRPC сервиса.
Тестировщики и пользователи сервиса используют протофайлы для генерации gRPC клиента.
现已上线!2025 年 Telegram 研究 — 年度关键洞察 
