Where is data, Lebowski
Канал про разное в data-мире: - от библиотек визуализации до data egineering - от графиков до элементов разработки - от .csv до API
Show more221
Subscribers
No data24 hours
No data7 days
+830 days
- Subscribers
- Post coverage
- ER - engagement ratio
Data loading in progress...
Subscriber growth rate
Data loading in progress...
Photo unavailableShow in Telegram
🤘 Collab Practice
.
За время ревьюверства на курсах по Анализу данных и Data Science накопилось небольшое кол-во
Collab-ноутбуков с полезными штуками.
.
Делюсь, может кому-то пригодится:
1️⃣ Пропуски и их заполнение
2️⃣ Как не выстрелить себе в ногу при приведении типов данных
3️⃣ xticks или как самому накинуть на ось Х
4️⃣ Категоризация данных
.
Любые замечания или дополнения принимаются 🤝
#python #collab #practice
🔥 6
💪 Прокачай себя до уровня PRO или полезные лайфхаки
.
Воды не будет, только те, конструкции, которые сам нашел и часто использую в Python+Pandas:
1️⃣ Pandas метод assign
Когда нужно создать столбцы на лету:
import pandas as pd
df.assign(column_one = lambda row: row["existing_column"] // 1024,
column_two = 24,
....)
2️⃣ Pandas метод pipe
Как apply, только для всего датафрейма и принимает аргументы функции:
import numpy as np
import pandas as pd
def calculate(df: pd.DataFrame, thr: float = 2.56, columns: list = []):
for col in columns:
df[f"{col}_transformed"] = np.sqrt(df[col]) > 2.56
return df
df.pipe(calculate, thr=5, columns=["value_1", "value_2"])
3️⃣ Структуры данных из collections
Если не список и не датафрейм, то точно defaultdict. Обычно использую для сбора результатов в цикле:
from collections import defaultdict
res = defaultdict(list) # общая структура словарь, где для каждого ключа значением по-умолчанию будет пустой список
for idx, value enumerate([123, 999, 678]):
res["idx"].append(idx)
res["value"].append(value**2)
res["value"] # это список =)
Кейсы:
- подготовка данных для визуализации (например, отдать по API)
- сбор статистики при выполнении сложных SQL запросов
- ...
4️⃣ Модуль itertools
- batched
открыл для себя, когда сам сначала написал такую штуку (немножко изобрел 🚲)
- combinations
- итератор комбинаций из элементов списка
- zip_longest
- "длинный" брат zip
(если не знал, то zip проходится по самому короткому из списков)
5️⃣ Модуль functools - магия функционального программирования
- partial
- кажется, это самая часто используемая функцию из модуля (ну может после lru_cache()
)
import typing
import pandas as pd
import numpy as np
def func(df: pd.DataFrame, columns: list, method: t.Callable = np.mean, window_width: int = 2):
assert window_width == 2, "Ширина окна должна быть 2+!"
for col in columns:
df[f"{col}_rolling"] = df[col].rolling(window_width, min_periods=2).apply(lambda window: method(window))
return df
func_mean = partial(func, columns=["value_1", "value_2"], method=np.median)
# теперь можно так, остальные параметры уже частично применились выше
func_mean(df=df)
Кейсы:
- ресерч методов или алгоритмов для решения одной и той же задачи
- отправка уведомлений только нужным адресам (чтобы каждый раз не прописывать их в аргументах)
6️⃣ reduce
- применяет функцию из 2 аргументов итерабельно к списку, на выходе 1 значение
l = [0, 1, -5, 7, 8]
# x - первый элемент списка, или результат применения функции к текущему элементу, y - текущий элемент
res = reduce(lambda x, y: x+y, l)
print(res) # 11
.
И последнее (но это не точно): не забывайте заглядывать в доки, чтобы не городить монструозные преобразования, которые решаются специальным параметром. Помните, в казалось бы, простой функции pandas.read_csv - ~50 параметров, скорее всего ваш кейс уже там есть🙃
.
#python #lifehack👍 4🔥 1
Photo unavailableShow in Telegram
🎨 Сам себе Кандинский
.
Финалим серию постов про Clickhouse и наши разборки с погодой.
.
📒Рецепт приготовления:
- данные о погоде (вьюха со среднедневными данными + last_point)
- Metabase BI
.
Естественно, пришлось поработать напильником 🪚, тк Metabase из коробки не умеет ходить в Clickhouse, но
сообщество не дремлет и выкатили версию с его поддержкой.
.
Ссылку на версию Metabase + Clickhouse и внешний вид дашборда найдете в репе (или ниже).
.
🔗 Links:
- Metabase + clickhouse-driver
- Run Metabase in docker
- Get started in Metabase
Визуализация дело тонкое, проэтому если тебе нужно прокачать этот навык, то вот рекомендации:
- Бесплатный курс от DataYoga (Рома Бунин в создателях)
- Визуализация данных и введение в BI-инструменты от ЯПрактикум (Рома Бунин + Александр Богачёв - убийственное комбо) или можно почитать книгу Богачёва Графики, которые убеждают всех
.
#bi #clickhouse #dash #dashboard
👍 4
Распараллель меня, если сможешь
.
TaskFlow и динамический маппинг тасок в AirFlow. Эти две возможности
также открыли для меня коллеги. Стоит отметить, что программирование дагов для меня
неPythonic way - странный синтаксис, странная логика, как будто неPython внутри Python🤷♂️
.
Две вышеобозначенные возможности не добавляют питонячности, хотя и можно найти какие-то сходства с
функциональным программированием.
.
TaskFlow - набор удобных декораторов для создания дагов, тасок, как Python-функций. Теперь, вместо:
def etl():
pass
with Dag(...) as dag:
etl = Python(task_id="etl", python_callable=etl, ...., da=dag)
Можно написать:
@dag(....)
def create_dag():
@task
def etl():
pass
create_dag()
.
Красивый код получается, когда используются подходы в чистом виде: или классический или декораторы, когда классический стиль и TaskFlow смешиваются получается каша 🥣. TaskFlow даёт одно очевидное преимущество: TaskFlow сам заботиться о перемещении данных между input\output tasks, как если бы вы использовали обычные функции - это очень круто🤘 Пример ниже:
@task
def get_data_from_api() -> List[]:
...
return data
api_data = get_data_from_api()
@task
def etl(data: List[]) -> None:
# transfrom and save data
...
etl = etl(data=api_data)
.
Второй интересный паттерн: динамический маппинг тасок. Сегодня у тебя 1 хост, а завтра 7. Сегодня нужно 3 таски за разные дни, а завтра 8. Чтобы на зависеть от неизвестных входящих параметров следует брать на заметку динамический маппинг. Реализуется он довольно просто: у каждого оператора (таска) есть методы, например, partial
, expand
, что это нам дает:
1️⃣ Генерим несколько дат и выполняем etl за каждую
@task
def get_dates() -> List[str]:
return ['2024-02-01', '2024-02-04', '2024-02-03']
@task
def etl(dated_at: str) -> None:
# some code, which required from dated_at
dates = get_dates()
# метод expand - позволяет вызвать экземпляр таски для каждого элемента списка
etl = etl.expand(dated_at=dates)
2️⃣ На s3 N файлов, нужно однотипно все обработать или просто загрузить
@task
def find_s3_files() -> List[str]:
return [{'filepath': 's3://file_0.parquet'},
{'filepath': 's3://file_1.parquet'},
{'filepath': 's3://file_2.parquet'}]
files = find_s3_files()
# используем partial для задания аргументов одинаковых для каждой таски
# выполняем загрузку каждого файла отдельной таской
load = ClickhouseOperator.partial(
task_id="load",
connection_id="conn_id",
sql="""INSERT INTO stg.table SELECT * FROM s3('{{ params.filepath }})'""",
).expand(params=files)
.
Ранее такое приходилось реализовывать через циклы, а начиная с версии 2.3 у нас есть удобная возможность - используйте💪
.
Самое интересное, что маппить можно не только 1 -> N, но и Nin -> Nout, то есть рельтута одного маппинга подавать на вход другого и работать это будет именно так как ожидается: не сначала N первых тасок, а после N последующих. Таски свяжутся в последовательные кусочки: N1 -> N1, N2 -> N2 и тд 🔥
Набросал небольшой пример, как можно работать с API при помощи динамического маппинга - покрутите, экспериментируйте.
🔗 Links:
- TaskFlow
- Dynamic Task Mapping
- Create dynamic Airflow tasks (astronomer)
#airflow #taskflow #mapping🔥 4
Как много мы пишем boilerplate code🤨
.
Стоит сказать, что иногда и прикольно его пописать😉 Но в рабочем потоке задач, хочется, чтобы рутинные
действия были атоматизированы, а особенно создание нового дага, проекта или чего-то подобного.
.
На помощь придет библиотека cookiecutter, наконец-то до неё добрался. И снова здесь магия шаблонизатора
jinja2
😍
.
Рекомендую ознакомиться с jinja2
, мне отлично помог краткий курс, тк шаблонизация встречается всё чаще и чаще:
- шаблоны при веб-разработке (Flask\Django
)
- dbt = SQL over jinja2
- cookiecutter - печеньки 🍪, обернутые в jinja2
.
Напиши, где ты еще встречал шаблонизацию при помощи jinja2👀
.
Процесс создания шаблона выглядит очень просто:
- описываем структуру
- добавляем необходимые файлы
- внутри файлов можно указать шаблонный код и подстановку нужные значений при вводе
.
Официальной доки обычно хватает, но мне потребовалась статья, в ней показано дерево папок при создании шаблона.
.
Пройдя по этому пути, у меня получился простой шаблон для создания дага (но как водиться, он решает 80%), также поэкспериментировал с шаблоном для DA ресерча. Найти примеры тут
.
На просторах github существует большое разнообразие шаблонов под различные нужды, мне приглянулись:
- Python Best Practices
- pyscaffoldext-dsproject
.
#template #cookiecutter #jinja2❤ 1
Photo unavailableShow in Telegram
⭐ Это заслуживает 5 звёзд ⭐
.
Когда приходится использовать нетривиальное (или наоброт тривиальное)
определение JSON или неJSON, наподобие такого:
CASE WHEN (json_answer ->> 'answer') LIKE '%{%' THEN json_object_keys((json_answer ->> 'answer')::json)
ELSE Null END
.
То ожидаешь какого-то интересного кейса, но такого подвоха не ожидал. Одна строка ломает всё:
1 строка, 1 строка из 2млн, Карл
#de #work #буднидатаинженера😢 1
🔊 Митап, митап, митап
.
С частью команды DWH, во-первых встретились, во-вторых, встречу провели с пользой: сходили на митап по Greenplum от ЯCloud.
.
1️⃣ В первой части Дмитрий Немчин из Tinkoff поведал тайны обслуживания GP. Презантация была скорее обзорной, тк интереснее было послушать про то как устроены их небольшие фреймворки по обслуживанию, а названия у них очень забавные:
- отстрел
плохишей по различным правилам
- Raskolnokov
- управление вьюхами и ролевой моделью gpACL, gpViews
- управление жизненным циклом данных (удаление, охлаждение, партиционирование) Guillotine
- мониторинг запросов, очередей, нагрузок - Inquisitor
.
Понравился процесс работы с песочницей: данные в ней живут 21 день, после дропаются без разговоров - очень хочется внедрить такой процесс у нас, так sandbox - маленькое болотце. Есть и другой подход, который базируется не на времени, а на пропорции между кол-вом запросов в песочницу и в прод, если доля песочницы первыщает то идем разбираться.
.
2️⃣ Во второй части, Кирилл рассказал о расширении Yezzey - позиционируется как охлаждение данных в s3, для организации гибридного хранения. Но по результатам тестом выглядит так, будто можно все данных хранить в s3, а мощности GP только для Compute использовать, ну и диски будут только для spill файлов😁
Эта часть нас и заинтересовала, тк есть большая потребность в охлаждении данных с последующим их использованием.
.
Ну и конечно, не обошлось без технических шуток. Номинация приз вечера получила шутка: Вы всё еще используете HEAP таблицы 😉
#gp #greenplum #охлаждение #meetup👍 2🔥 1
Photo unavailableShow in Telegram
Пятничное радостное
.
Вот такой получился рефакторинг😍
#airflow #dag #рефакторинг
🔥 3🏆 2👍 1🦄 1
☝️ Последнее слово (но это не точно)
.
В предыдущих сериях по clickhouse-notes мы создали:
- таблицу сырья
- таблицу распарсенных данных и матвью, её наполняющую
- матвью с подневной статистикой и вьюху над ней
.
Дополним сет, данными последней точки: последние полученные данные. Например, нам пригодится для отображения
на дашборде последних значений.
.
Варианты есть различные, мы поступим так:
- создаём таблицу с движком ReplacingMergeTree,
odm.openweathermap_last_point
- чтобы хранить только последнюю запись (но не всё так просто)
- создаем MV для поиска этой последней записи, odm.openweathermap_last_point_weather_mv
.
Структура таблицы:
CREATE TABLE odm.openweathermap_last_point
(
last_ts DateTime,
temp Float64,
pressure Float64,
humidity Float64,
metric String
)
ENGINE = ReplacingMergeTree(last_ts)
ORDER BY metric
SETTINGS index_granularity = 8192
- ReplacingMergeTree(last_ts)
- оставляем только последнюю запись
- ORDER BY metric
- синтетическое поле, по его совпадению и "схлопываются" дубли
.
Полный код таблицы и MV в репе. А пока посмотрим на возможные артефакты:
- от СClickhouse у нас есть гарантия 100% схлопывания дублей
- это может произойти в любой момент времени (хоть сразу, после вставки дубля, хоть через полдня)
.
Что в такой ситуации делать:
- нативный метод - схлопнуть дубли в момент запроса, используя ключевое слово FINAL
SELECT *
FROM odm.openweathermap_last_point
FINAL
FINAL
- использование может быть очень накладно, если нужно схлопнуть довольно большой объем данных (не наш случай, поэтому мы им и воспользуемся). Чтобы скрыть от пользователя особенности технической реализации необходимо создать, например, odm.openweathermap_last_point_weather_v
.
Другие способы найдете в репе (сортировка и выборка 1 строки, использование argMax
).
🫶 Скоро зафиналим какой-нибудь клевый дашборд
#clickhouse #mv #replacingmergetree👍 1🔥 1
📔 Executable Jupyter Notebooks
.
Не редко (особенно в аналитических целях) разработка ведется в Jupyter Notebooks и несколько
больно переносить наработки в py-модули, в хотелось бы запустить что-то:
./my-notebook.ipynb
или run my-notebook.ipyb
. Например, в Databricks так и сделали, notebook является единицей job\task и исполняется под капотом.
.
Может и нам так можно🧐
.
Возможностей не так много, но они есть:
- сохранить jupyter как скрипт .py
. При таком сохранении часть кода специфичная для notebooks может быть не выполнена, например
display
или отдельно стоящий вывод: df.tail()
- библиотека papermill - вот оно наше сокровище, позволяет запускть jupyter notebooks, а также их параметризовать, ну все как мы любим
.
Для создания параметризированного notebook необходимо:
- указать ячейку с параметрами, например, в ячейке может быть код:
# external params period = "day" output_file = "tmp.txt"- указать, что ячейка = ячейка с параметрами: необходимо добавить cell tag = parameters к той ячейке, где располагаются переменные-параметры. papermill парсит их и устанавливает переданные вами значения. Где это находится смотри на скрине в комментариях или в репо. - описать обработку параметров в коде (необязательно =)) - запустить jupyter notebook
papermill extract.ipynb -p period "week" -p output_file "weather-forecast.json" executed.ipynb --log-output --log-level INFO
.
Все артефакты найдете в репо
#papermill #jupyter #runasscript🔥 2
Choose a Different Plan
Your current plan allows analytics for only 5 channels. To get more, please choose a different plan.