Streaming Apply Changes Example#

This article intends to show how to get started with dltflow when authoring DLT streaming apply changes pipelines. In this sample, we will be going through the following steps:

Code#

Base Pipeline Code#

All pipelines leverage a base_pipeline.py file for the example that provides a common PipelineBase class that each pipeline inherits from. This is provided to help standardize how things like spark, logging, and configuration are initialized in pipelines. This pattern largely follows dbx’s documentation and task templating guide.

import sys
import pathlib
from logging import Logger
from typing import Any, Dict
from argparse import ArgumentParser

import yaml
from pyspark.sql import SparkSession


class PipelineBase:
    def __init__(self, spark: SparkSession, init_conf: dict = None):
        self.spark = self._get_spark(spark)
        self.logger = self._prepare_logger()
        self.conf = self._provide_config() if not init_conf else init_conf

    @staticmethod
    def _get_spark(spark: SparkSession):
        if not spark:
            spark = builder = (
                SparkSession.builder.master("local[1]")
                .appName("dltflow-examples")
                .getOrCreate()
            )
        return spark

    @staticmethod
    def _get_conf_file():
        """Uses the arg parser to extract the config location from cli."""
        p = ArgumentParser()
        p.add_argument("--conf-file", required=False, type=str)
        namespace = p.parse_known_args(sys.argv[1:])[0]
        return namespace.conf_file

    @staticmethod
    def _read_config(conf_file) -> dict[str, Any]:
        config = yaml.safe_load(pathlib.Path(conf_file).read_text())
        return config

    def _provide_config(self):
        """Orchestrates getting configuration."""
        self.logger.info("Reading configuration from --conf-file job option")
        conf_file = self._get_conf_file()
        if not conf_file:
            self.logger.info(
                "No conf file was provided, setting configuration to empty dict."
                "Please override configuration in subclass init method"
            )
            return {}
        else:
            self.logger.info(
                f"Conf file was provided, reading configuration from {conf_file}"
            )
            return self._read_config(conf_file)

    def _prepare_logger(self) -> Logger:  # pragma: no cover
        """Sets up the logger and ensures our job uses the log4j provided by spark."""
        log4j_logger = self.spark._jvm.org.apache.log4j  # noqa
        return log4j_logger.LogManager.getLogger(self.__class__.__name__)

Now that we understand that what base_pipeline.py does, lets get into our sample code.

Example Pipeline Code#

For this example, we will show a simple example with a queue streaming reader.

  • Import a DLTMetaMixin from dltflow.quality and will tell our sample pipeline to inherit from it.

  • Generate the example data on the fly and put it into a python queue.

  • We will transform it by coercing data types.

You should see that there are no direct calls to dlt. This is the beauty and intentional simplicity dltflow. It does not want to get in your way. Rather, it really wants you to focus on your transformation logic to help keep your code simple and easy to share with other team members.

"""DTLflow streaming CDC pipeline."""

import random
from uuid import UUID
from hashlib import md5
from queue import Queue
from collections import namedtuple

from pyspark.sql import SparkSession, DataFrame as SparkDataFrame
from dltflow.quality import DLTMetaMixin

from .base_pipeline import PipelineBase

_NAMES = ["Alice", "Bob", "Charlie", "David", "Eve", "Fred", "Ginny", "Harriet", "Ileana", "Joseph", "Kincaid", "Larry"]
_CITIES = list(
    set(["Seattle", "Portland", "San Francisco", "Los Angeles", "Seattle", "Portland", "San Francisco", "Los Angeles",
         "Seattle", "Portland", "San Francisco", "Los Angeles"]))


def make_people_data(n: int):
    """
    Generate a sequence of `Person` namedtuples representing randomly generated people.

    Parameters:
        n (int): The number of people to generate.

    Yields:
        Person: A namedtuple representing a person with the following fields:
            - id (str): The unique identifier for the person.
            - name (str): The name of the person.
            - city (str): The city where the person lives.

    """
    Person = namedtuple("Person", "id name city msg_id")
    for i in range(n):
        name = random.choice(_NAMES)
        _hex = md5(name.encode("utf-8")).hexdigest()
        person_id = str(UUID(_hex))
        city = random.choice(_CITIES)
        yield Person(id=person_id, name=name, city=city, msg_id=i)


def create_queue(n: int = 1000):
    """
    Generate a sequence of `Person` namedtuples representing randomly generated people.

    Parameters:
        n (int): The number of people to generate.

    Yields:
        Person: A namedtuple representing a person with the following fields:
            - id (str): The unique identifier for the person.
            - name (str): The name of the person.
            - city (str): The city where the person lives.

    """
    q = Queue()
    for person in make_people_data(n=n):
        q.put(person)
    return q


class CDCPipeline(PipelineBase, DLTMetaMixin):
    """DLTflow example apply changes pipeline."""

    def __init__(self, spark: SparkSession, init_conf: dict = None):
        """
        Initializes a new instance of the class.

        Parameters:
            spark (SparkSession): The SparkSession object.
            init_conf (dict, optional): The initial configuration dictionary. Defaults to None.
        """
        super().__init__(spark=spark, init_conf=init_conf)

    def transform(self, df: SparkDataFrame, df_id: int) -> SparkDataFrame:
        """
        Transforms the given Spark DataFrame by applying a transformation logic.

        Parameters:
            df (SparkDataFrame): The input Spark DataFrame to be transformed.
            df_id (int): The ID of the DataFrame.

        Returns:
            SparkDataFrame: The transformed Spark DataFrame.
        """
        return df

    def orchestrate(self) -> SparkDataFrame:
        """
        Orchestrate the execution of a Spark job.

        Returns:
            SparkDataFrame: The resulting Spark DataFrame after orchestration.
        """
        query = (
            self.spark.readStream.option("maxFilesPerTrigger", 1)
            .queueStream(create_queue())
            .writeStream.format("console")
            .foreachBatch(lambda df, epoch_id: self.transform(df, epoch_id))
            .start()
        )
        query.awaitTermination()
        return query

Configuration#

Now that we have our example code, we need to write our configuration to tell the DLTMetaMixin how wrap our codebase.

Under the hood, dltflow uses pydantic to create validation for configuration. When working with dltflow, it requires your configuration to adhere to a specific structure. Namely, file should have the following sections:

  • reader: This is helpful for telling your pipeline where to read data from.

  • writer: Used to define where your data is written to after being processed.

  • dlt: Defines how dlt will be used in the project. We use this to dynamically wrap your code with dlt commands.

With this brief overview out of the way, lets review our configuration for this sample.

reader:
  ...
writer:
  adl_processed_path: "a/fake/path"
  schema: fake_schema
  table_name: table_name
dlt:
  func_name: "transform"
  kind: table
  expectation_action: "drop"
  expectations:
    - name: "valid_city"
      constraint: 'city in ("Seattle", "Portland", "San Francisco", "Los Angeles")'
  is_streaming: true
  apply_chg_config:
    target: 'apply_changes_table_name'
    source: 'input_dataset_name'
    keys: ['id']
    sequence_by: 'msg_id'
    stored_as_scd_type: 1

The dlt section has the following keys, though this configuration can also be a list of dlt configs.

  • func_name: The name of the function/method we want dlt to decorate.

  • kind: Tells dlt if this query should be materialized as a table or view

  • expectation_action: Tells dlt how to handle the expectations. drop, fail, and allow are all supported.

  • expectations: These are a list of constraints we want to apply to our data.

  • is_streaming: This tells dltflow this is a streaming query.

  • apply_chg_config: This tells dltflow we’re in a streaming append and fills out necessary dlt params.

    • target: Tells dltflow what table data will be written to. This should be a streaming table definition created ahead of time.

    • source: Tells dltflow where to read and get data from.

    • keys: The primary key(s) of the dataset.

    • sequence_by: The column(s) to use when ordering the dataset.

    • stored_as_scd_type: Tells dltflow how to materialize the table. 1 (default), SCD Type 1, 2 - SCD Type 2.

Workflow Spec#

Now that we’ve gone through the code and configuration, we need to start defining the workflow that we want to deploy to Databricks so that our pipeline can be registered as a DLT Pipeline. This structure largely follows the Databricks Pipeline API with the addition of a tasks key. This key is used during deployment for transitioning your python module into a Notebook that can be deployed as a DLT Pipeline.

dev:
  workflows:
    - name: "dltflow-stream-apply-cdc-pipeline_with_expectations"
      storage: "/mnt/igdatalake/experiment/dltflow-samples/dlt/stream-cdc"
      target: "dltflow-samples"
      development: "true"
      edition: "ADVANCED"
      continuous: "false"
      clusters:
        - label: "default"
          node_type_id: Standard_DS3_v2"
          autoscale:
            min_workers: 1
            max_workers: 2
            mode: "ENHANCED"
      pipeline_type: "WORKSPACE"
      data_sampling: false
      tasks:
        items:
          - python_file: "pipelines/streaming_cdc.py"
            parameters:
              - "--conf"
              - "conf/streaming_apply_changes_dlt.yml"
        dependencies:
          - whl: "/dbfs/private-site-packages/dltflow-0.0.1b0-py3-none-any.whl"
          - pypi:
              package: "pyspark"

Deployment#

We’re at the final step of this simple example. The last piece of the puzzle here is that we need to deploy our assets to a Databricks workspace. To do so, we’ll use the dltflow cli.

# bin/sh

dltflow deploy-py-dlt \
  --deployment-file ../workflows/streaming_append_changes_wrkflow.yml \
  --environment dev --as-individual