Overview

Tool Name

airflow_tools

Purpose

The airflow_tools enable direct integration between Genesis Data Agents and Apache Airflow instances. Create, deploy, trigger, monitor, and manage DAGs (Directed Acyclic Graphs) and tasks programmatically—all within your data agent conversations. Perfect for orchestrating complex data pipelines, scheduling ETL workflows, managing dependencies, and automating data operations at scale.

Functions Available

  1. airflow_action: Core Airflow operations including DAG management, task execution, monitoring, and connection configuration.

Key Features

DAG Management

Create, list, pause, unpause, and delete DAGs with full workflow orchestration capabilities.

Task Execution

Trigger DAG runs, monitor task execution, and retrieve task logs and status in real-time.

Connection Management

Configure and manage Airflow connections for databases, APIs, and external systems.

Variable Management

Set, retrieve, and update Airflow variables for dynamic configuration and secrets.

Run Monitoring

Track DAG run history, task instances, and execution metrics for observability.

Scheduler Control

Manage scheduling, backfills, and DAG execution timing programmatically.
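
For example, a DAG's schedule can be suspended during a maintenance window and resumed afterwards using the airflow_action calls documented below (the dag_id here is illustrative):

# Stop new scheduled runs for a DAG
airflow_action(
    action="pause_dag",
    dag_id="customer_etl_pipeline"
)

# Resume scheduling once maintenance is complete
airflow_action(
    action="unpause_dag",
    dag_id="customer_etl_pipeline"
)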

Input Parameters for Each Function

airflow_action

Parameters
| Name | Definition | Format |
| --- | --- | --- |
| action | Airflow operation to perform. Values: create_dag, list_dags, trigger_dag, get_dag_status, pause_dag, unpause_dag, delete_dag, get_dag_runs, get_task_logs, set_variable, get_variable, create_connection, test_connection, etc. | String (required) |
| dag_id | Unique identifier for the DAG. Required for most DAG-specific operations. | String |
| dag_code | Python code defining the DAG (for the create_dag action). | String |
| execution_date | Execution date for DAG runs (ISO format: YYYY-MM-DD or YYYY-MM-DDTHH:MM:SS). | String |
| conf | Configuration dictionary to pass to the DAG run (JSON format). | Object |
| task_id | Specific task identifier within a DAG (for task-specific operations). | String |
| variable_key | Key name for Airflow variable operations. | String |
| variable_value | Value to set for an Airflow variable. | String |
| conn_id | Connection identifier for Airflow connection operations. | String |
| conn_type | Connection type (e.g., 'postgres', 'http', 'aws', 'snowflake'). | String |
| host | Host address for connection configuration. | String |
| schema | Schema/database name for connection configuration. | String |
| login | Username/login for connection authentication. | String |
| password | Password for connection authentication. | String |
| port | Port number for connection configuration. | Integer |
| extra | Extra connection parameters as a JSON string. | String |
Use dag_id consistently across operations to manage the same workflow. List available DAGs with airflow_action(action="list_dags").

Use Cases

  1. Automated Pipeline Deployment: Generate Airflow DAGs from templates, deploy them automatically, and trigger initial runs without manual intervention.
  2. Data Pipeline Orchestration: Coordinate complex ETL workflows with dependencies, scheduling multiple tasks across different systems and databases.
  3. Monitoring & Alerting: Track DAG execution status, retrieve task logs, and set up automated alerts based on pipeline failures or SLA breaches.
  4. Dynamic Configuration Management: Update Airflow variables and connections programmatically to adapt pipelines to changing environments or requirements.
  5. Backfill Operations: Trigger historical DAG runs for specific date ranges to process missed data or re-run failed executions.

Workflow/How It Works

  1. Step 1: Configure Airflow Connection. Set up a connection to your Airflow instance:
    # Connection is typically pre-configured in Genesis
    # Verify with list_dags to test connectivity
    airflow_action(
        action="list_dags"
    )
    
  2. Step 2: Create or Deploy DAG. Define a new DAG with tasks and dependencies:
    dag_code = """
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from datetime import datetime, timedelta
    
    default_args = {
        'owner': 'data_team',
        'depends_on_past': False,
        'start_date': datetime(2024, 1, 1),
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    }
    
    def extract_data():
        print("Extracting data from source...")
    
    def transform_data():
        print("Transforming data...")
    
    def load_data():
        print("Loading data to warehouse...")
    
    with DAG(
        'customer_etl_pipeline',
        default_args=default_args,
        description='Daily customer data ETL',
        schedule_interval='@daily',
        catchup=False
    ) as dag:
        
        extract = PythonOperator(
            task_id='extract_data',
            python_callable=extract_data
        )
        
        transform = PythonOperator(
            task_id='transform_data',
            python_callable=transform_data
        )
        
        load = PythonOperator(
            task_id='load_data',
            python_callable=load_data
        )
        
        extract >> transform >> load
    """
    
    airflow_action(
        action="create_dag",
        dag_id="customer_etl_pipeline",
        dag_code=dag_code
    )
    
  3. Step 3: Trigger DAG Execution. Start a DAG run manually or with custom configuration:
    airflow_action(
        action="trigger_dag",
        dag_id="customer_etl_pipeline",
        conf={"date": "2024-01-15", "mode": "full_refresh"}
    )
    
  4. Step 4: Monitor Execution. Check DAG run status and task progress:
    airflow_action(
        action="get_dag_status",
        dag_id="customer_etl_pipeline"
    )
    
    airflow_action(
        action="get_dag_runs",
        dag_id="customer_etl_pipeline",
        limit=10
    )
    
  5. Step 5: Retrieve Task Logs. Access logs for debugging or auditing:
    airflow_action(
        action="get_task_logs",
        dag_id="customer_etl_pipeline",
        task_id="transform_data",
        execution_date="2024-01-15T00:00:00"
    )
    
  6. Step 6: Manage Variables & Connections. Configure dynamic parameters:
    # Set variable
    airflow_action(
        action="set_variable",
        variable_key="warehouse_schema",
        variable_value="PROD_ANALYTICS"
    )
    
    # Create connection
    airflow_action(
        action="create_connection",
        conn_id="snowflake_prod",
        conn_type="snowflake",
        host="account.snowflakecomputing.com",
        login="data_user",
        password="secure_password",
        schema="ANALYTICS",
        extra='{"account": "account_name", "warehouse": "COMPUTE_WH"}'
    )
    

Integration Relevance

  • project_manager_tools to track data pipeline projects and link Airflow DAGs to missions.
  • data_connector_tools to create Airflow connections for databases and data sources.
  • github_connector_tools / gitlab_connector_tools to version control DAG code and sync with repositories.
  • slack_tools to send notifications when DAG runs succeed or fail.
  • scheduler_tools to coordinate Genesis scheduled jobs with Airflow DAG executions.
  • dbt_action to trigger dbt runs within Airflow tasks for transformation workflows.

Configuration Details

  • Airflow URL: Configure your Airflow webserver URL and API endpoint in Genesis settings.
  • Authentication: Support for basic auth, token-based auth, or Kerberos depending on Airflow setup.
  • DAG Folder: DAGs created through Genesis are typically deployed to Airflow’s dags/ folder.
  • Executor Type: Works with all Airflow executors (SequentialExecutor, LocalExecutor, CeleryExecutor, KubernetesExecutor).
  • API Version: Compatible with Airflow 2.x REST API (Stable API).
  • Time Zones: All timestamps should use UTC or explicit timezone specification.
  • Connection Extras: Use JSON format for provider-specific connection parameters.
Airflow connections may contain sensitive credentials. Use Airflow’s secrets backend (e.g., AWS Secrets Manager, HashiCorp Vault) for production environments instead of storing passwords directly.

Limitations or Notes

  1. Airflow Version: Requires Apache Airflow 2.0+ with REST API enabled. Airflow 1.x not supported.
  2. DAG Deployment: DAGs must be in the Airflow DAGs folder and parsed by the scheduler before they appear.
  3. DAG Parsing: Complex DAGs may take time to parse; allow 30-60 seconds after creation before triggering.
  4. API Authentication: Requires proper Airflow API authentication configured in Genesis.
  5. Task Logs: Log retrieval depends on Airflow’s log storage configuration (local filesystem, S3, GCS, etc.).
  6. Concurrency Limits: Airflow’s concurrency settings may limit simultaneous DAG/task executions.
  7. Network Access: Requires network connectivity to Airflow webserver/API endpoint.
  8. DAG Code Validation: Invalid DAG code will fail at parse time; test locally before deploying.

Supported Actions

create_dag - Create and deploy new DAG
list_dags - List all available DAGs
get_dag_status - Get current DAG state
trigger_dag - Start DAG run manually
pause_dag - Pause DAG scheduling
unpause_dag - Resume DAG scheduling
delete_dag - Remove DAG from Airflow
get_dag_runs - List DAG run history
get_task_logs - Retrieve task execution logs
get_task_instances - List task instances
set_variable - Create/update Airflow variable
get_variable - Retrieve Airflow variable
delete_variable - Remove Airflow variable
list_variables - List all variables
create_connection - Create Airflow connection
get_connection - Retrieve connection details
test_connection - Test connection validity
delete_connection - Remove connection
list_connections - List all connections

Not Supported

❌ Airflow 1.x API (legacy CLI-based operations)
❌ Direct modification of Airflow configuration files
❌ Pool management (may be added in future)
❌ SLA miss callbacks direct configuration
❌ Custom XCom operations
❌ Direct scheduler/worker process management
❌ Plugin installation or management
❌ User/role management (use Airflow UI)
❌ Chart/graph visualization generation

Output

  • create_dag: Confirmation with DAG ID and deployment status.
  • list_dags: Table of DAGs with ID, schedule interval, owner, and active status.
  • trigger_dag: DAG run ID, execution date, and trigger confirmation.
  • get_dag_status: Current state (running, success, failed), last run time, next run time.
  • get_dag_runs: List of run instances with execution date, state, duration, and start/end times.
  • get_task_logs: Task execution logs with timestamps and log level.
  • set_variable: Confirmation of variable creation/update.
  • create_connection: Confirmation with connection ID and type.
  • test_connection: Connection test result (success/failure) with error details if applicable.
  • Errors: Detailed error messages with Airflow API response codes and resolution guidance.

Best Practices

DAG Design

Keep DAGs simple and focused. Use TaskGroups for complex workflows with many tasks (SubDAGs are deprecated in Airflow 2.x).
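
For example, a TaskGroup keeps related tasks grouped in the Airflow UI without the overhead of SubDAGs. A minimal sketch; the callables are placeholders and the block is assumed to live inside an existing DAG definition:

from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup

def clean_fn():
    print("Cleaning records...")

def enrich_fn():
    print("Enriching records...")

# Inside an existing `with DAG(...) as dag:` block
with TaskGroup(group_id="transform_steps") as transform_steps:
    clean = PythonOperator(task_id="clean", python_callable=clean_fn)
    enrich = PythonOperator(task_id="enrich", python_callable=enrich_fn)
    clean >> enrich

# The group can then be wired like a single task, e.g. extract >> transform_steps >> load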

Idempotency

Design tasks to be idempotent—safe to re-run multiple times with the same result.
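
One common pattern is to scope each run to its logical date (Airflow's ds) and overwrite that slice, so re-running the task replaces the same data instead of appending duplicates. A sketch, where run_sql and the table names are hypothetical stand-ins for your own database hook and schema:

def load_daily_partition(ds, **_):
    """Idempotent load: re-running for the same ds replaces that day's rows."""
    # run_sql() is a hypothetical helper (e.g. wrapping PostgresHook(...).run())
    run_sql(f"DELETE FROM analytics.daily_sales WHERE sale_date = '{ds}'")
    run_sql(f"""
        INSERT INTO analytics.daily_sales
        SELECT * FROM staging.daily_sales
        WHERE sale_date = '{ds}'
    """)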

Error Handling

Set appropriate retries, retry_delay, and on_failure_callback for resilient pipelines.
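
For example, retries and a failure callback can be declared once in default_args and inherited by every task; notify_failure is an illustrative callback, not part of airflow_tools:

from datetime import timedelta

def notify_failure(context):
    # Placeholder: forward the failed task's details to your alerting channel
    print(f"Task failed: {context['task_instance'].task_id}")

default_args = {
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,
    "on_failure_callback": notify_failure,
}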

Resource Management

Use pools and priority weights to manage resource allocation across concurrent tasks.
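
A sketch of assigning a task to a pool with a higher priority weight. The pool itself ("warehouse_pool" here) is assumed to have been created via the Airflow UI or CLI, since pool management is not exposed through airflow_tools:

from airflow.operators.python import PythonOperator

def load_data():
    print("Loading data to warehouse...")

# Inside an existing `with DAG(...)` block
heavy_load = PythonOperator(
    task_id="load_warehouse",
    python_callable=load_data,
    pool="warehouse_pool",     # caps how many tasks hit the warehouse at once
    priority_weight=10,        # scheduled ahead of lower-weight tasks in the same pool
)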

Monitoring

Set up SLAs and alerting. Monitor DAG run duration and task failure rates.
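
Task-level SLAs are declared in the DAG code itself (SLA miss callbacks are not configurable through airflow_tools, but declaring an SLA at least records misses for alerting). A minimal sketch:

from datetime import timedelta
from airflow.operators.python import PythonOperator

def run_daily_check():
    print("Running daily check...")

# Inside an existing `with DAG(...)` block: an SLA miss is recorded if the task
# has not finished within 2 hours of its scheduled time.
daily_check = PythonOperator(
    task_id="daily_check",
    python_callable=run_daily_check,
    sla=timedelta(hours=2),
)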

Variables & Connections

Use Airflow Variables for configuration, Connections for credentials. Never hardcode secrets.
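
Inside task code, read configuration through Variable.get and credentials through the connection API so nothing is hardcoded. A sketch with illustrative variable and connection IDs:

from airflow.models import Variable
from airflow.hooks.base import BaseHook

def build_target_table_name():
    # Falls back to a default if the variable is not set
    schema = Variable.get("warehouse_schema", default_var="ANALYTICS")
    return f"{schema}.CUSTOMERS"

def get_warehouse_credentials():
    # Credentials stay in the Airflow metadata DB or secrets backend
    conn = BaseHook.get_connection("snowflake_prod")
    return conn.login, conn.password, conn.host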

Example: Complete ETL Pipeline Workflow

# 1. List existing DAGs to verify connectivity
result = airflow_action(action="list_dags")
print(f"Connected to Airflow. Found {len(result['dags'])} DAGs")

# 2. Set configuration variables
airflow_action(
    action="set_variable",
    variable_key="source_database",
    variable_value="CUSTOMER_DB"
)

airflow_action(
    action="set_variable",
    variable_key="target_schema",
    variable_value="ANALYTICS"
)

# 3. Create database connection
airflow_action(
    action="create_connection",
    conn_id="postgres_source",
    conn_type="postgres",
    host="source-db.company.com",
    login="etl_user",
    password="secure_password",
    schema="customers",
    port=5432
)

# 4. Test the connection
test_result = airflow_action(
    action="test_connection",
    conn_id="postgres_source"
)
print(f"Connection test: {test_result['status']}")

# 5. Create comprehensive ETL DAG
dag_code = """
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.models import Variable
from datetime import datetime, timedelta
import logging

default_args = {
    'owner': 'data_engineering',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email': ['data-team@company.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

def validate_data(**context):
    source_db = Variable.get("source_database")
    logging.info(f"Validating data from {source_db}")
    return {"status": "validated", "row_count": 15000}

def send_notification(**context):
    ti = context['task_instance']
    validation = ti.xcom_pull(task_ids='validate_data')
    logging.info(f"Pipeline complete. Processed {validation['row_count']} rows")

with DAG(
    'customer_data_pipeline',
    default_args=default_args,
    description='Extract customer data from Postgres, transform, load to Snowflake',
    schedule_interval='0 2 * * *',
    catchup=False,
    max_active_runs=1,
    tags=['etl', 'customers', 'daily']
) as dag:
    
    extract = PostgresOperator(
        task_id='extract_customer_data',
        postgres_conn_id='postgres_source',
        sql='''
            SELECT customer_id, name, email, created_date, status
            FROM customers
            WHERE updated_at >= '{{ ds }}'
        ''',
        dag=dag
    )
    
    validate = PythonOperator(
        task_id='validate_data',
        python_callable=validate_data,
        dag=dag
    )
    
    transform = PostgresOperator(
        task_id='transform_data',
        postgres_conn_id='postgres_source',
        sql='''
            CREATE TEMP TABLE transformed_customers AS
            SELECT 
                customer_id,
                UPPER(name) as name,
                LOWER(email) as email,
                created_date,
                CASE 
                    WHEN status = 'A' THEN 'ACTIVE'
                    WHEN status = 'I' THEN 'INACTIVE'
                    ELSE 'UNKNOWN'
                END as status_desc
            FROM customers_staging;
        ''',
        dag=dag
    )
    
    load = SnowflakeOperator(
        task_id='load_to_snowflake',
        snowflake_conn_id='snowflake_prod',
        sql='''
            MERGE INTO {{ var.value.target_schema }}.CUSTOMERS dst
            USING staging.customers_temp src
            ON dst.customer_id = src.customer_id
            WHEN MATCHED THEN UPDATE SET
                dst.name = src.name,
                dst.email = src.email,
                dst.status = src.status_desc,
                dst.updated_at = CURRENT_TIMESTAMP()
            WHEN NOT MATCHED THEN INSERT
                (customer_id, name, email, status, created_at)
            VALUES
                (src.customer_id, src.name, src.email, src.status_desc, src.created_date);
        ''',
        dag=dag
    )
    
    notify = PythonOperator(
        task_id='send_notification',
        python_callable=send_notification,
        dag=dag
    )
    
    extract >> validate >> transform >> load >> notify
"""

# 6. Deploy the DAG
airflow_action(
    action="create_dag",
    dag_id="customer_data_pipeline",
    dag_code=dag_code
)
print("DAG created successfully")

# 7. Wait for DAG to be parsed
import time
time.sleep(30)

# 8. Unpause the DAG to enable scheduling
airflow_action(
    action="unpause_dag",
    dag_id="customer_data_pipeline"
)

# 9. Trigger initial run
run_result = airflow_action(
    action="trigger_dag",
    dag_id="customer_data_pipeline",
    conf={"mode": "initial_load", "date_range": "2024-01-01"}
)
print(f"DAG triggered. Run ID: {run_result['dag_run_id']}")

# 10. Monitor the run
time.sleep(10)
status = airflow_action(
    action="get_dag_status",
    dag_id="customer_data_pipeline"
)
print(f"Current status: {status['state']}")

# 11. Get run history
runs = airflow_action(
    action="get_dag_runs",
    dag_id="customer_data_pipeline",
    limit=5
)
for run in runs['dag_runs']:
    print(f"Run {run['execution_date']}: {run['state']} ({run['duration']}s)")

# 12. Get logs for debugging
logs = airflow_action(
    action="get_task_logs",
    dag_id="customer_data_pipeline",
    task_id="validate_data",
    execution_date=run_result['execution_date']
)
print("Validation task logs:")
print(logs['logs'])

Advanced Features

Backfill Operations

Run DAG for historical dates:
from datetime import datetime, timedelta

# Trigger backfill for last 7 days
start_date = datetime.now() - timedelta(days=7)
for i in range(7):
    exec_date = (start_date + timedelta(days=i)).strftime("%Y-%m-%d")
    airflow_action(
        action="trigger_dag",
        dag_id="customer_data_pipeline",
        execution_date=exec_date,
        conf={"backfill": True}
    )
    print(f"Triggered backfill for {exec_date}")

Dynamic DAG Generation

Create DAGs from templates:
def generate_dag_code(source_table, target_table, schedule):
    return f"""
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

with DAG(
    '{source_table}_to_{target_table}',
    start_date=datetime(2024, 1, 1),
    schedule_interval='{schedule}',
    catchup=False
) as dag:
    
    def process_data():
        print("Processing {source_table} to {target_table}")
    
    task = PythonOperator(
        task_id='process',
        python_callable=process_data
    )
"""

# Generate multiple DAGs
tables = [
    ("customers", "dim_customers", "0 3 * * *"),
    ("orders", "fact_orders", "0 4 * * *"),
    ("products", "dim_products", "0 2 * * *")
]

for source, target, schedule in tables:
    dag_code = generate_dag_code(source, target, schedule)
    airflow_action(
        action="create_dag",
        dag_id=f"{source}_pipeline",
        dag_code=dag_code
    )

Conditional Execution

Trigger DAGs based on conditions:
from datetime import datetime, timedelta

def should_trigger_dag():
    # get_last_data_update() is a placeholder for your own data-freshness check
    last_update = get_last_data_update()
    # Trigger only if the source data was updated within the last hour
    return datetime.now() - last_update < timedelta(hours=1)

if should_trigger_dag():
    airflow_action(
        action="trigger_dag",
        dag_id="realtime_pipeline"
    )
else:
    print("Source data not fresh enough, skipping trigger")

Monitoring & Alerting

Track Pipeline Health

def check_dag_health(dag_id, max_failures=3):
    """Check recent DAG run health"""
    runs = airflow_action(
        action="get_dag_runs",
        dag_id=dag_id,
        limit=10
    )
    
    failed_runs = [r for r in runs['dag_runs'] if r['state'] == 'failed']
    
    if len(failed_runs) >= max_failures:
        # send_alert() is a placeholder; route this to your alerting channel (e.g., slack_tools)
        send_alert(f"DAG {dag_id} has {len(failed_runs)} failures in last 10 runs")
        return False
    
    return True

# Monitor critical DAGs
critical_dags = ["customer_data_pipeline", "financial_reporting", "ml_training"]
for dag_id in critical_dags:
    health = check_dag_health(dag_id)
    print(f"{dag_id}: {'Healthy' if health else 'Unhealthy'}")

Troubleshooting

DAG not appearing after create_dag

  • Check DAG code for syntax errors
  • Verify DAG file is in Airflow’s dags_folder
  • Check scheduler logs for parsing errors
  • Allow 30-60 seconds for scheduler to parse new DAGs
  • Use airflow dags list CLI to verify DAG registration

Cannot reach the Airflow API

  • Verify Airflow API endpoint is accessible
  • Check authentication credentials
  • Ensure Airflow webserver is running
  • Verify network connectivity and firewall rules
  • Check Airflow API is enabled (auth_backend configured)

DAG won’t trigger or run

  • Verify DAG is unpaused (use unpause_dag action)
  • Check DAG is not already running (max_active_runs limit)
  • Ensure execution_date is valid
  • Check scheduler is running and healthy
  • Review DAG configuration for errors

Task logs missing or incomplete

  • Verify task has executed (not in queued state)
  • Check Airflow’s log storage configuration
  • For remote logs (S3/GCS), verify access permissions
  • Allow time for logs to be written and synced
  • Check log_fetch_timeout_sec setting

Variable or connection not found

  • Verify exact key/conn_id (case-sensitive)
  • Check variable/connection was created successfully
  • For connections, ensure type and parameters are correct
  • Review Airflow metadata database for entry
  • Check for typos in variable/connection names

DAG run stuck or hanging

  • Check if all tasks completed
  • Look for zombie tasks or stalled processes
  • Verify executor has available resources
  • Check worker health (for CeleryExecutor/KubernetesExecutor)
  • Review task_instance table in metadata database

Slow scheduling or parsing

  • Reduce DAG parsing frequency (min_file_process_interval)
  • Optimize DAG code (avoid heavy imports at module level)
  • Use connection pooling for database operations
  • Increase parallelism settings if resources available
  • Consider using KubernetesExecutor for scaling

Airflow Architecture Overview

Understanding Airflow components helps with effective usage:

Key Components

  • Webserver: Provides UI and REST API for interaction
  • Scheduler: Monitors DAGs, schedules tasks, triggers execution
  • Executor: Determines how tasks are executed (local, Celery, Kubernetes)
  • Workers: Execute tasks (for distributed executors)
  • Metadata Database: Stores DAG definitions, runs, connections, variables
  • DAG Folder: File system location where DAG Python files are stored

Security Best Practices

Secrets Management

Use Airflow’s secrets backend (AWS Secrets Manager, Vault) instead of storing passwords in connections.

RBAC

Configure Airflow’s Role-Based Access Control to limit DAG operations by user role.

API Authentication

Always use authentication for Airflow API. Avoid exposing API endpoint publicly without auth.

Audit Logging

Enable Airflow audit logs to track all API operations and DAG modifications.

Performance Optimization

For High-Volume Pipelines: Consider using KubernetesExecutor or CeleryExecutor with autoscaling to handle dynamic workloads efficiently. Use Airflow pools to prevent resource exhaustion.

Best Practices for Scale

  1. Limit DAG Complexity: Keep individual DAGs under 50 tasks; use TaskGroups or dynamic task mapping to break up larger workflows (see the sketch after this list)
  2. Optimize Parsing: Minimize top-level code execution in DAG files
  3. Use Pools: Define resource pools to control concurrent task execution
  4. Task Retries: Set appropriate retry logic to handle transient failures
  5. XCom Size: Keep XCom payloads small; use external storage for large data
  6. Database Tuning: Optimize metadata database for high-volume task instances
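
As referenced in item 1 above, dynamic task mapping (Airflow 2.3+) fans a single task definition out over a runtime-determined list instead of hand-writing dozens of near-identical tasks. A minimal TaskFlow sketch with illustrative file names:

from datetime import datetime
from airflow.decorators import dag, task

@dag(schedule_interval=None, start_date=datetime(2024, 1, 1), catchup=False)
def mapped_file_pipeline():

    @task
    def list_files():
        # In practice this might list an S3 prefix or a staging table
        return ["customers.csv", "orders.csv", "products.csv"]

    @task
    def process(file_name: str):
        print(f"Processing {file_name}")

    # One mapped task instance is created per file at runtime
    process.expand(file_name=list_files())

mapped_file_pipeline()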

Comparison: Airflow vs Other Orchestrators

| Feature | Apache Airflow | Prefect | Dagster |
| --- | --- | --- | --- |
| Scheduling | ✅ Cron-based, flexible | ✅ Hybrid (cron + event) | ✅ Cron-based |
| Dynamic Pipelines | ✅ Python-based DAGs | ✅ Python native | ✅ Python native |
| UI/Monitoring | ✅ Mature web UI | ✅ Modern cloud UI | ✅ Modern web UI |
| Backfills | ✅ Built-in | ⚠️ Manual | ✅ Built-in |
| Community | ✅ Large, mature | ⚠️ Growing | ⚠️ Growing |
| Self-Hosted | ✅ Fully open source | ⚠️ Hybrid (cloud-first) | ✅ Fully open source |
| Learning Curve | ⚠️ Moderate | ✅ Lower | ⚠️ Moderate |

Apache Airflow excels at scheduling-based workflows with complex dependencies. For event-driven or data-centric pipelines, consider Prefect or Dagster. Genesis supports multiple orchestrators—choose based on your use case.