Skip to content

Latest commit

 

History

History

cloud-composer-dependency-management-example

Composer Dependency Management

TL;DR: This repository presents a Cloud Composer workflow designed to orchestrate complex task dependencies within Apache Airflow. The solution specifically addresses the challenge of managing parent-child DAG relationships across varying temporal frequencies (yearly, monthly, weekly). By implementing similar framework, data engineers can ensure reliable and timely triggering of child DAGs in accordance with their respective parent DAG's schedule, enhancing overall workflow efficiency and maintainability.

The goal of this use-case is to provide a common pattern to automatically trigger and implement the composer dependency management. The primary challenge addressed is the need to handle complex dependencies between DAGs with different frequencies.

The solution leverages Airflow's dependency management capabilities by dynamically configuring the external_date_fn parameter in the Airflow External Task Sensor to create a hierarchical relationship between the parent and child DAGs.

Solution DAG code-snippet for Depedency-Management using external_task_sensor with yearly schedule frequency:

# Define parent task IDs and external DAG IDs
parent_tasks = [
    {"task_id": "parent_task_1", "dag_id": "company_cal_refresh", "schedule_frequency":"yearly"}
]

def execution_delta_dependency(logical_date, **kwargs):
    dt = logical_date
    task_instance_id=str(kwargs['task_instance']).split(':')[1].split(' ')[1].split('.')[1]
    res = None
    for sub in parent_tasks:
        if sub['task_id'] == task_instance_id:
            res = sub
            break

    schedule_frequency=res['schedule_frequency']
    parent_dag_poke = ''
    if schedule_frequency == "monthly":
        parent_dag_poke = dt.replace(day=1).replace(hour=0, minute=0, second=0, microsecond=0)
    elif schedule_frequency == "weekly":
        parent_dag_poke = (dt - timedelta(days=dt.isoweekday() % 7)).replace(hour=0, minute=0, second=0, microsecond=0)
    elif schedule_frequency == "yearly":
        parent_dag_poke = dt.replace(day=1, month=1, hour=0, minute=0, second=0, microsecond=0)
    elif schedule_frequency == "daily":
        parent_dag_poke = (dt).replace(hour=0, minute=0, second=0, microsecond=0)    
    print(parent_dag_poke)
    return parent_dag_poke

    # Create external task sensors dynamically
    external_task_sensors = []
    for parent_task in parent_tasks:
        external_task_sensor = ExternalTaskSensor(
            task_id=parent_task["task_id"],
            external_dag_id=parent_task["dag_id"],
            timeout=900,
            execution_date_fn=execution_delta_dependency,
            poke_interval=60,  # Check every 60 seconds
            mode="reschedule",  # Reschedule task if external task fails
            check_existence=True
        )
        external_task_sensors.append(external_task_sensor)

Hypothetical use case

Workflow Overview


The workflow involves the following steps:

  1. Create Parent DAGs:

    • Create separate DAGs for each parent job (yearly, monthly, and weekly).
    • Define the schedule for each parent DAG accordingly.
  2. Define Child DAGs:

    • Create child DAGs for each task that needs to be executed based on the parent's schedule.
  3. Set Dependencies:

    • Use the ExternalTaskSensor argument to establish the dependency between the child DAG and its immediate parent DAG.
  4. Trigger Child DAGs:

    • Utilize Airflow's TriggerDagRunOperator to trigger child DAGs when the parent DAG completes.
    • Configure the wait_for_downstream parameter to specify the conditions under which the child DAG should be triggered.
  5. Handle Data Lineage:

    • Ensure that the child DAGs have access to the necessary data generated by the parent DAG.
    • Consider using Airflow's XComs or a central data store for data sharing.

Alt text

Benefits

  • Improved DAG organization and maintainability.
  • Simplified dependency management.
  • Reliable execution of child DAGs based on parent schedules.
  • Reduced risk of data inconsistencies.
  • Scalable approach for managing complex DAG dependencies.

Hypothetical User Story: The Symphony of Data Orchestration

In the bustling city of San Francisco, a dynamic e-commerce company named "Symphony Goods" was on a mission to revolutionize the online shopping experience. At the heart of their success was a robust data infrastructure that seamlessly managed and processed vast amounts of information.

Symphony Goods Data Workflows

Symphony Goods relied on a sophisticated data orchestration system powered by Apache Airflow to automate and streamline their data workflows. This system consisted of a series of interconnected data pipelines, each designed to perform specific tasks and produce valuable insights.

Yearly Refresh: Company Calendar

Once a year, Symphony Goods executed a critical process known as "Company_cal_refresh". This workflow ensured that the company's internal calendars were synchronized across all departments and systems. It involved extracting data from various sources, such as employee schedules, project timelines, and public holidays, and consolidating it into a centralized repository. The updated calendar served as a single source of truth, enabling efficient planning, resource allocation, and communication within the organization.

Monthly Refresh: Product Catalog

Every month, Symphony Goods performed a "Product_catalog_refresh" workflow to keep its product catalog up-to-date. This process involved ingesting data from multiple channels, including supplier feeds, internal databases, and customer feedback. The workflow validated, transformed, and enriched the product information, ensuring that customers had access to accurate and comprehensive product details.

Weekly Summary Report

Symphony Goods generated a "Weekly_summary_report" every week to monitor key performance indicators (KPIs) and track business growth. The workflow aggregated data from various sources, such as sales figures, customer engagement metrics, and website traffic analytics. It then presented the data in visually appealing dashboards and reports, enabling stakeholders to make informed decisions.

Daily Refresh: Product Inventory

To ensure optimal inventory management, Symphony Goods ran a "Product_inventory_refresh" workflow on a daily basis. This workflow extracted inventory data from warehouses, distribution centers, and point-of-sale systems. It calculated available stock levels, identified potential stockouts, and provided recommendations for replenishment. The workflow ensured that Symphony Goods could fulfill customer orders promptly and maintain high levels of customer satisfaction.

The symphony of data orchestration at Symphony Goods was a testament to the power of automation and integration. By leveraging Apache Airflow, the company was able to streamline its data operations, improve data quality, and gain valuable insights to drive business growth. As Symphony Goods continued to scale its operations, the data orchestration system served as the backbone, ensuring that data was always available, accurate, and actionable.

Workflow Frequencies

  1. Yearly: Company_cal_refresh
  2. Monthly: Product_catalog_refresh
  3. Weekly: Weekly_summary_report
  4. Daily: Product_inventory_refresh

Use-case Lineage: Summary of Lineage and Dependencies

The provided context describes the data orchestration system used by Symphony Goods, an e-commerce company in San Francisco. The system is powered by Apache Airflow and consists of four main workflows:

  1. Yearly: Company_cal_refresh

    • Synchronizes internal calendars across all departments and systems, ensuring efficient planning and resource allocation.
    • Depends on data from employee schedules, project timelines, and public holidays.
  2. Monthly: Product_catalog_refresh

    • Keeps the product catalog up-to-date by ingesting data from multiple channels and validating, transforming, and enriching it.
    • Depends on data from supplier feeds, internal databases, and customer feedback.
  3. Weekly: Weekly_summary_report

    • Generates weekly summary reports to monitor key performance indicators (KPIs) and track business growth.
    • Depends on data from sales figures, customer engagement metrics, and website traffic analytics.
  4. Daily: Product_inventory_refresh

    • Ensures optimal inventory management by extracting inventory data from various sources and calculating available stock levels.
    • Depends on data from warehouses, distribution centers, and point-of-sale systems.

The symphony of data orchestration at Symphony Goods is a testament to the power of automation and integration. By leveraging Apache Airflow, the company was able to streamline its data operations, improve data quality, and gain valuable insights to drive business growth.