Medium Example - Multi-Method Decoration#
This article demonstrates how to use dltflow
in a slightly more complex example. The core components of this
example are intended to show how dltflow
can be configured to wrap multiple functions in a single Python module.
The structure of this article follows that of our simple example.
Code#
Base Pipeline Code#
All pipelines in this example leverage a base_pipeline.py
file that provides a common PipelineBase
class that each pipeline inherits from. This helps standardize how things like spark
, logging
, and configuration
are initialized in pipelines. This pattern largely follows dbx
’s documentation and task templating guide.
import sys
import pathlib
from logging import Logger
from typing import Any, Dict
from argparse import ArgumentParser
import yaml
from pyspark.sql import SparkSession
class PipelineBase:
def __init__(self, spark: SparkSession, init_conf: dict = None):
self.spark = self._get_spark(spark)
self.logger = self._prepare_logger()
self.conf = self._provide_config() if not init_conf else init_conf
@staticmethod
def _get_spark(spark: SparkSession):
if not spark:
            spark = (
SparkSession.builder.master("local[1]")
.appName("dltflow-examples")
.getOrCreate()
)
return spark
@staticmethod
def _get_conf_file():
"""Uses the arg parser to extract the config location from cli."""
p = ArgumentParser()
p.add_argument("--conf-file", required=False, type=str)
namespace = p.parse_known_args(sys.argv[1:])[0]
return namespace.conf_file
@staticmethod
    def _read_config(conf_file) -> Dict[str, Any]:
config = yaml.safe_load(pathlib.Path(conf_file).read_text())
return config
def _provide_config(self):
"""Orchestrates getting configuration."""
self.logger.info("Reading configuration from --conf-file job option")
conf_file = self._get_conf_file()
if not conf_file:
self.logger.info(
"No conf file was provided, setting configuration to empty dict."
"Please override configuration in subclass init method"
)
return {}
else:
self.logger.info(
f"Conf file was provided, reading configuration from {conf_file}"
)
return self._read_config(conf_file)
def _prepare_logger(self) -> Logger: # pragma: no cover
"""Sets up the logger and ensures our job uses the log4j provided by spark."""
log4j_logger = self.spark._jvm.org.apache.log4j # noqa
return log4j_logger.LogManager.getLogger(self.__class__.__name__)
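To make the behavior above concrete, here is a minimal sketch of how a subclass consumes PipelineBase. The DemoPipeline class and the configuration values are hypothetical and exist only for illustration:

from base_pipeline import PipelineBase


class DemoPipeline(PipelineBase):
    def orchestrate(self):
        # self.conf is either the dict passed as init_conf or the parsed YAML
        # from the --conf-file CLI argument handled by PipelineBase.
        self.logger.info(f"Running with config: {self.conf}")


# Option 1: pass configuration directly, e.g. in a notebook or a unit test.
DemoPipeline(spark=None, init_conf={"writer": {"schema": "fake_schema"}}).orchestrate()

# Option 2: rely on the CLI flag instead, e.g.
#   python demo_pipeline.py --conf-file conf/demo.yml
# in which case _provide_config() reads and parses the YAML file.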
Now that we understand what base_pipeline.py
does, let’s get into our sample code.
Example Pipeline Code#
For this example we will have a slightly more complex pipeline. In this code base, we will:

- Import a DLTMetaMixin from dltflow.quality and tell our sample pipeline to inherit from it.
- Create sample data in memory.
- Transform that data in two steps:
  - In an intermediate step, we will ensure people’s ages are between 10 and 100.
  - In a following step, we will get a count of people in our sample dataset by age range.
- Register both the intermediate and final transformations in the dlt pipeline, as a view and a materialized table respectively.
from dltflow.quality import DLTMetaMixin
from pyspark.sql import DataFrame as SparkDataFrame, SparkSession, functions as f
from .base_pipeline import PipelineBase
class MyMediumPipeline(PipelineBase, DLTMetaMixin):
def __init__(self, spark: SparkSession, init_conf: dict = None):
super().__init__(spark=spark, init_conf=init_conf)
def intermediate_step(self, df: SparkDataFrame) -> SparkDataFrame:
return df.filter(f.col("age").between(10, 100))
def read(self) -> SparkDataFrame:
data = [
("Alice", 34),
("Bob", 45),
("Charlie", 56),
("David", 23),
("Eve", 78),
("Frank", 34),
("Grace", 56),
("Heidi", 45),
("Ivan", 99),
("Judy", 23),
("Kevin", 45),
("Lana", 56),
("Mona", 105),
("Nina", 34),
("Omar", 56),
("Pam", 45),
("Quinn", 99),
("Rita", 23),
("Steve", 45),
("Tina", 115),
("Uma", 78),
("Vera", 34),
("Wendy", 56),
("Xander", 45),
("Yara", 1),
("Zack", 23),
]
return self.spark.createDataFrame(data, ["name", "age"])
def transform(self, df: SparkDataFrame) -> SparkDataFrame:
out_step = self.intermediate_step(df)
out_step2 = (
out_step.withColumn("age_bin", f.bin(f.col("age"), 10))
.withColumn("age_bin_str", f.concat(f.lit("age_bin_"), f.col("age_bin")))
.groupby("age_bin_str")
.count()
)
return out_step2
def orchestrate(self):
df = self.read()
df = self.transform(df)
return df
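To round this out, a typical dbx-style entry point for this module might look like the sketch below. This block is an assumption rather than something dltflow generates, and the dlt-wrapped methods only fully materialize when the module runs inside a Databricks DLT pipeline:

def entrypoint():  # pragma: no cover
    # spark=None lets PipelineBase build a session itself; configuration is
    # read from the --conf-file argument unless init_conf is supplied.
    pipeline = MyMediumPipeline(spark=None)
    pipeline.orchestrate()


if __name__ == "__main__":
    entrypoint()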
Configuration#
Now that we have our example code, we need to write our configuration to tell the DLTMetaMixin
how to wrap our codebase.
Under the hood, dltflow
uses pydantic
to validate configuration. When working with dltflow
, your configuration must adhere to a specific structure. Namely, the file should have the following sections:
- reader: Tells your pipeline where to read data from.
- writer: Defines where your data is written to after being processed.
- dlt: Defines how dlt will be used in the project. We use this to dynamically wrap your code with dlt commands.
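To give a feel for what this validation looks like, the sketch below shows a minimal, hypothetical set of pydantic models for such a structure. The class names are illustrative and are not dltflow’s actual models:

from typing import List, Optional

from pydantic import BaseModel


class Expectation(BaseModel):
    name: str
    constraint: str


class DLTEntry(BaseModel):
    func_name: str
    kind: str                   # "table" or "view"
    expectation_action: str     # "drop", "fail", or "allow"
    expectations: List[Expectation] = []


class PipelineConfig(BaseModel):
    reader: Optional[dict] = None
    writer: Optional[dict] = None
    dlt: List[DLTEntry]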
With this brief overview out of the way, let’s review our configuration for this sample.
reader:
...
writer:
adl_processed_path: "a/fake/path"
schema: fake_schema
table_name: table_name
dlt:
- func_name: "orchestrate"
kind: table
expectation_action: "drop"
expectations:
- name: "check_count"
constraint: "count <= 10"
- func_name: "intermediate_step"
kind: view
expectation_action: "drop"
expectations:
- name: "check_age"
constraint: "age between 10 and 100"
The dlt
section in this example is a list of DLTConfig objects.

- func_name: The name of the function/method we want dlt to decorate.
- kind: Tells dlt whether this query should be materialized as a table or a view.
- expectation_action: Tells dlt how to handle the expectations. drop, fail, and allow are all supported.
- expectations: A list of constraints we want to apply to our data.
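Conceptually, these two entries tell dltflow to apply standard dlt decorators to the named methods at runtime. A rough hand-written equivalent of what that wrapping expresses is sketched below; the actual decoration is done dynamically by DLTMetaMixin, and this code only runs inside a Databricks DLT pipeline:

import dlt  # available inside a Databricks DLT pipeline


@dlt.table
@dlt.expect_or_drop("check_count", "count <= 10")
def orchestrate():
    ...


@dlt.view
@dlt.expect_or_drop("check_age", "age between 10 and 100")
def intermediate_step():
    ...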
Workflow Spec#
Now that we’ve gone through the code and configuration, we need to define the workflow that we want to deploy
to Databricks so that our pipeline can be registered as a DLT Pipeline. This structure largely follows the Databricks
Pipelines API, with the addition of a tasks
key. This key is used during deployment to transition your Python
module into a notebook that can be deployed as a DLT Pipeline.
dev:
workflows:
- name: "dltflow-medium-pipeline_with_expectations"
storage: "/mnt/igdatalake/experiment/dltflow-samples/dlt/medium"
target: "dltflow-samples"
development: "true"
edition: "ADVANCED"
continuous: "false"
clusters:
- label: "default"
          node_type_id: "Standard_DS3_v2"
autoscale:
min_workers: 1
max_workers: 2
mode: "ENHANCED"
pipeline_type: "WORKSPACE"
data_sampling: false
tasks:
items:
- python_file: "pipelines/medium_dlt_pipeline_with_expectations.py"
parameters:
- "--conf"
- "conf/medium_dlt_pipeline_with_expectations.yml"
dependencies:
- whl: "/dbfs/private-site-packages/dltflow-0.0.1b0-py3-none-any.whl"
- pypi:
package: "pyspark"
Deployment#
We’re at the final step of this example. The last piece of the puzzle is to deploy our assets
to a Databricks workspace. To do so, we’ll use the dltflow
CLI.
#!/bin/sh
dltflow deploy-py-dlt --deployment-file ../workflows/medium_dlt_pipeline_with_expectations.yml \
--environment dev --as-individual