See all of my kedro related posts in [[ tag/kedro ]].
#kedrotips
I am tweeting out most of these snippets as I add them, you can find them all here #kedrotips.
🗣 Heads up
Below are some quick snippets and notes for using kedro to build data pipelines. So far I am just compiling snippets; eventually I will turn them into several posts on kedro. These are mostly things that I use in my everyday work with kedro. Some are a bit more esoteric; some are helpful when writing production code, and some are more useful for exploration.
📚 Catalog
CSVLocalDataSet

```python
import pandas as pd
from kedro.io import CSVLocalDataSet

iris = pd.read_csv('https://raw.githubusercontent.com/kedro-org/kedro/d3218bd89ce8d1148b1f79dfe589065f47037be6/kedro/template/%7B%7B%20cookiecutter.repo_name%20%7D%7D/data/01_raw/iris.csv')

iris_data_set = CSVLocalDataSet(filepath="test.csv",
                                load_args=None,
                                save_args={"index": False})
iris_data_set.save(iris)
reloaded_iris = iris_data_set.load()
```

```yaml
test_data:
  type: CSVLocalDataSet
  filepath: test.csv
  load_args: null
  save_args:
    index: False
```
CSVHTTPDataSet

```python
from kedro.io import CSVHTTPDataSet

cities = CSVHTTPDataSet(
    fileurl="https://raw.githubusercontent.com/kedro-org/kedro/d3218bd89ce8d1148b1f79dfe589065f47037be6/kedro/template/%7B%7B%20cookiecutter.repo_name%20%7D%7D/data/01_raw/iris.csv",
    auth=None,
    load_args=None)
iris = cities.load()
```

```yaml
cities:
  type: CSVHTTPDataSet
  fileurl: https://people.sc.fsu.edu/~jburkardt/data/csv/cities.csv
  auth: null
  load_args: null
```
HDFLocalDataSet

```python
import pandas as pd
from kedro.io import HDFLocalDataSet

iris = pd.read_csv('https://raw.githubusercontent.com/kedro-org/kedro/d3218bd89ce8d1148b1f79dfe589065f47037be6/kedro/template/%7B%7B%20cookiecutter.repo_name%20%7D%7D/data/01_raw/iris.csv')

iris_data_set = HDFLocalDataSet(filepath="iris.hdf",
                                key="test_hdf_key",
                                load_args=None,
                                save_args=None)
iris_data_set.save(iris)
reloaded_iris = iris_data_set.load()
```

```yaml
cars:
  type: HDFLocalDataSet
  filepath: test.hdf
  key: test_hdf_key
```
HDFS3DataSet

```python
import pandas as pd
from kedro.io import HDFS3DataSet

iris = pd.read_csv('https://raw.githubusercontent.com/kedro-org/kedro/d3218bd89ce8d1148b1f79dfe589065f47037be6/kedro/template/%7B%7B%20cookiecutter.repo_name%20%7D%7D/data/01_raw/iris.csv')

iris_data_set = HDFS3DataSet(filepath="iris.hdf",
                             bucket_name="bucket-us-west-1",
                             key="test_hdf_key",
                             load_args=None,
                             save_args=None)
iris_data_set.save(iris)
reloaded_iris = iris_data_set.load()
```

```yaml
cars:
  type: HDFS3DataSet
  filepath: cars.hdf
  bucket_name: bucket-us-west-1
  key: test_hdf_key
```
JSONLocalDataSet

```python
import pandas as pd
from kedro.io import JSONLocalDataSet

iris = pd.read_csv('https://raw.githubusercontent.com/kedro-org/kedro/d3218bd89ce8d1148b1f79dfe589065f47037be6/kedro/template/%7B%7B%20cookiecutter.repo_name%20%7D%7D/data/01_raw/iris.csv')

iris_data_set = JSONLocalDataSet(filepath="iris.json",
                                 load_args=None,
                                 save_args=None)
iris_data_set.save(iris)
reloaded_iris = iris_data_set.load()
```

```yaml
cars:
  type: JSONLocalDataSet
  filepath: iris.json
```
ParquetLocalDataSet

```python
import pandas as pd
from kedro.io import ParquetLocalDataSet

iris = pd.read_csv('https://raw.githubusercontent.com/kedro-org/kedro/d3218bd89ce8d1148b1f79dfe589065f47037be6/kedro/template/%7B%7B%20cookiecutter.repo_name%20%7D%7D/data/01_raw/iris.csv')

iris_data_set = ParquetLocalDataSet('iris',
                                    engine='auto',
                                    load_args=None,
                                    save_args=None,
                                    version=None)
iris_data_set.save(iris)
reloaded_iris = iris_data_set.load()
```

```yaml
cars:
  type: ParquetLocalDataSet
  filepath: cars
```
PickleS3DataSet
SQLTableDataSet
SQLQueryDataSet
TextLocalDataSet
ExcelLocalDataSet
⏳ Loading Data
Simple Loading

```python
df = catalog.load('cars')
```
list all datasets

```python
catalog.list()
```
Saving Data

```python
catalog.save('cars', cars)
```
🔍 Finding data

simple keyword search

```python
query = 'raw'
[data for data in catalog.list() if query in data]
```
multi keyword search

```python
query = 'raw sales'

data_sets = catalog.list()
for word in query.split():
    data_sets = [
        data
        for data in data_sets
        if word in data
    ]
```
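Without a kedro project handy, the narrowing behaviour of that loop can be checked against a hand-made list standing in for catalog.list() (the dataset names here are invented):

```python
# made-up names standing in for catalog.list()
data_sets = ['raw_sales', 'raw_orders', 'int_sales', 'pri_cars']

query = 'raw sales'
for word in query.split():
    # each pass keeps only the names containing the current keyword
    data_sets = [data for data in data_sets if word in data]

print(data_sets)  # ['raw_sales'] -- only names matching every keyword survive
```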
🐒 monkey patch it

```python
def query(*search_terms):
    data_sets = catalog.list()
    for search in search_terms:
        data_sets = [
            data
            for data in data_sets
            if search in data
        ]
    return data_sets

catalog.query = query
```
🤙 YOLO
You Only Load Once
simple

```python
data = [
    catalog.load(d)
    for d in catalog.query('c_pri', 'cars')
]
```
more refined

```python
data = {
    d: catalog.load(d)
    for d in catalog.query('c_pri', 'cars')
}
```
🍷 refined like a fine wine

```python
from types import SimpleNamespace

data = SimpleNamespace(**{
    d: catalog.load(d)
    for d in catalog.query('c_pri', 'cars')
})
```
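The point of SimpleNamespace here is attribute access (and tab completion in IPython) instead of dict indexing. A minimal sketch, with plain dicts standing in for the dataframes catalog.load would return:

```python
from types import SimpleNamespace

# plain dicts stand in for loaded dataframes; the names are invented
loaded = {
    'c_pri_cars': {'rows': 32},
    'c_pri_trucks': {'rows': 12},
}

data = SimpleNamespace(**loaded)

# data.c_pri_cars instead of data['c_pri_cars']
print(data.c_pri_cars['rows'])  # 32
```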
🧀 Make it a function, getting funcy

```python
from types import SimpleNamespace

def yolo(*search_terms):
    """you only load once
    using query method from previous tip"""
    data = SimpleNamespace(**{
        d: catalog.load(d)
        for d in catalog.query(*search_terms)
    })
    return data

all_pri = yolo('c_pri')
```
🐒 monkey patch it

```python
catalog.yolo = yolo
catalog.yolo.__doc__ = "you only load once"

all_pri = catalog.yolo('c_pri')
```
adding catalogs together

```python
from kedro.io import DataCatalog

DataCatalog({**cat1.__dict__['_data_sets'], **cat2.__dict__['_data_sets']})
```
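The merge itself is just standard dict unpacking; with two plain dicts standing in for each catalog's _data_sets (the entries here are made up), later keys win on collision:

```python
cat1_data_sets = {'raw_cars': 'csv_a', 'int_cars': 'csv_b'}
cat2_data_sets = {'int_cars': 'csv_c', 'raw_trucks': 'csv_d'}

# {**a, **b}: b's entries override a's on duplicate keys
merged = {**cat1_data_sets, **cat2_data_sets}

print(merged['int_cars'])  # csv_c -- cat2's entry wins
print(sorted(merged))      # ['int_cars', 'raw_cars', 'raw_trucks']
```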
🛢 Building pipelines
📍 Creating Nodes

```python
from kedro.pipeline import node

dropna_node = node(lambda x: x.dropna(), inputs='raw_cars', outputs='int_cars')
```

```python
from kedro.pipeline import node

def drop_columns(df, *columns):
    for column in columns:
        df = df.drop(columns=column)
    return df

drop_columns_node = node(
    lambda x: drop_columns(x, 'vs', 'am', 'gear', 'carb'),
    inputs='int_cars',
    outputs='pri_cars'
)
```
🛢 Creating a pipeline

Don’t be so verbose

Create similar nodes dynamically

```python
from typing import List

import numpy as np
import pandas as pd
from kedro.pipeline import node

def halve_dataframe(data: pd.DataFrame) -> List[pd.DataFrame]:
    """splits a dataframe in half"""
    return np.array_split(data, 2)

nodes = []
datasets = [
    'cars', 'trucks', 'boats', 'motorcycles', 'planes',
    'ships', 'busses', 'trains', 'subways'
]

# creates a pipeline node for every dataset in the datasets list
for dataset in datasets:
    nodes.append(
        node(halve_dataframe,
             f'e_modin_{dataset}',
             [f'train_{dataset}', f'test_{dataset}'])
    )
```
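The f-string templating in that loop can be sanity-checked on its own by generating just the input/output name pairs the nodes would be wired up with:

```python
datasets = ['cars', 'trucks']

# (input_name, [output_names]) pairs the loop would pass to node()
name_pairs = [
    (f'e_modin_{dataset}', [f'train_{dataset}', f'test_{dataset}'])
    for dataset in datasets
]

print(name_pairs)
# [('e_modin_cars', ['train_cars', 'test_cars']),
#  ('e_modin_trucks', ['train_trucks', 'test_trucks'])]
```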
🏃‍♂️ Running Pipelines
🔖 filter by tags

```python
nodes = pipeline.only_nodes_with_tags('cars')
```
filter by node

```python
nodes = pipeline.only_nodes('b_int_cars')
```
filter nodes like

```python
query_string = 'cars'

nodes = [
    node.name
    for node in pipeline.nodes
    if query_string in node.name
]
pipeline.only_nodes(*nodes)
```
only nodes with tags (or)

```python
nodes = pipeline.only_nodes_with_tags('cars', 'trains')
```
only nodes with tags (and)

```python
raw_nodes = pipeline.only_nodes_with_tags('raw')
car_nodes = pipeline.only_nodes_with_tags('cars')
raw_car_nodes = raw_nodes & car_nodes
```

```python
raw_car_nodes = (
    pipeline
    .only_nodes_with_tags('raw')
    .only_nodes_with_tags('cars')
)
```
add pipelines

```python
car_nodes = pipeline.only_nodes_with_tags('cars')
train_nodes = pipeline.only_nodes_with_tags('trains')
transportation_nodes = car_nodes + train_nodes
```
ensure nodes are attached

```python
cars_attached = len(
    pipeline
    .only_nodes_with_tags('cars')
    .grouped_nodes
) == 1
```
🎂 Pipeline Decorators

```python
from kedro.pipeline.decorators import log_time, mem_profile

pipeline.decorate(log_time, mem_profile)
```
Pipeline IO

pipeline.all_inputs() and pipeline.all_outputs() return sets of pipeline inputs and outputs, and you can do set operations on them. This is particularly useful for finding the upper and lower edges of your pipeline, or of a subset of it. The pipeline object here is any kedro pipeline, including a filtered subset.
Find all raw data

```python
pipeline.all_inputs() - pipeline.all_outputs()
```
Find all final data

```python
pipeline.all_outputs() - pipeline.all_inputs()
```
Find all nodes that consume raw data

This one is probably pushing the limits of what I would put in a list comprehension for production code, or even in a text editor, but I commonly use IPython for my ad hoc work, and keeping it all on one line is very handy. Complex list comprehensions become a bit like regex: really easy to write and really hard to read. I don’t think this one quite hits that point, but it’s getting close.

I find this one super useful for moving data between environments or avoiding unnecessary database calls.

```python
raw_inputs = pipeline.all_inputs() - pipeline.all_outputs()

raw_nodes = [node for node in pipeline.nodes if [i for i in raw_inputs if i in set(node.inputs)] != []]
```
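The comprehension can be sanity-checked with plain objects standing in for kedro nodes (the node names and dataset names here are invented); any node whose inputs intersect the raw inputs is kept:

```python
from types import SimpleNamespace

# stand-ins for pipeline.nodes; each exposes .inputs like a kedro node
nodes = [
    SimpleNamespace(name='b_int_cars', inputs=['raw_cars']),
    SimpleNamespace(name='c_pri_cars', inputs=['int_cars']),
]

# raw data is consumed somewhere but never produced by the pipeline
all_inputs = {'raw_cars', 'int_cars'}
all_outputs = {'int_cars', 'pri_cars'}
raw_inputs = all_inputs - all_outputs  # {'raw_cars'}

raw_nodes = [node for node in nodes
             if [i for i in raw_inputs if i in set(node.inputs)] != []]

print([node.name for node in raw_nodes])  # ['b_int_cars']
```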