11,976 questions
0
votes
1
answer
27
views
XCom limit of 1024 in airflow
I am hitting the 1024-item limit on the size of a list returned from a task in Airflow. I am currently on Airflow v2.11. A simple bit of code that reproduces the issue is below.
import os, ...
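A hedged guess, since the question is truncated: the 1024 figure matches the default of Airflow's `core.max_map_length` option, which caps how many mapped task instances a returned list may expand into. A minimal sketch of the env-var override (the value 4096 is illustrative, and this only applies if the list feeds dynamic task mapping):

```python
import os

# core.max_map_length (default 1024) caps dynamic task mapping; the env-var
# form of the override, set before the scheduler/worker processes start:
os.environ["AIRFLOW__CORE__MAX_MAP_LENGTH"] = "4096"  # illustrative value

print(os.environ["AIRFLOW__CORE__MAX_MAP_LENGTH"])  # → 4096
```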
0
votes
0
answers
41
views
What causes this Airflow EmailOperator error?
I am using Airflow 3.x for a personal task and ran into this error:
Failure caused by [SSL: WRONG_VERSION_NUMBER] wrong version number (_ssl.c:1010) source=task.stdout
What I want to do is send an email ...
-1
votes
1
answer
42
views
How to trigger a downstream Airflow DAG only when specific tasks in another DAG succeed?
I’m using Apache Airflow 2.x and I want to trigger a downstream DAG only when specific tasks in an upstream DAG succeed.
Here’s the situation:
I have an upstream DAG: dag_A
Inside dag_A, I have ...
1
vote
1
answer
57
views
Dependency Hell airflow+dbt
Hello, I'm new to Airflow; lately I've been struggling on a project with dbt + Airflow + Docker. My problem: 1) I pip install dbt-core and the dbt-duckdb adapter, 2) I try to install Airflow with:
pip install "...
0
votes
1
answer
49
views
Handle Jinja template variables as DateTime values
In an Airflow DAG file, I'm trying to handle the following Jinja template variables as DateTime values.
Specifically {{ data_interval_start }} and {{ data_interval_end }},
which (according to https://...
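When those variables are used inside a templated string (rather than passed as native objects), they render as ISO-8601 text; a stdlib sketch of turning a rendered value back into an aware datetime (the sample string is hypothetical):

```python
from datetime import datetime

# Hypothetical rendered value of {{ data_interval_start }} inside a string:
rendered = "2024-01-01T00:00:00+00:00"

# datetime.fromisoformat handles this ISO-8601 form, offset included.
dt = datetime.fromisoformat(rendered)
print(dt.year, dt.tzinfo is not None)  # → 2024 True
```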
0
votes
0
answers
42
views
What does "state attribute is queued" mean for Airflow 3?
I'm trying to install the new Airflow 3 with Docker on a VM with this configuration:
x-airflow-common: &airflow-common
  image: ${AIRFLOW_IMAGE}
  restart: unless-stopped
  env_file: .env
  environment:
    # --...
Best practices
0
votes
0
replies
44
views
How to manage (Google Cloud Composer) Airflow roles with infrastructure as code?
Are there any best practices for handling roles in code?
Most of our setup is managed via Terraform, and it would be great if roles could be managed the same way.
There is a Terraform provider, however using the ...
0
votes
0
answers
28
views
Great Expectations 1.8.0 – InvalidDataContextConfigError: “Error while processing DataContextConfig: anonymous_usage_statistics validations_store_name
I’m using Great Expectations 1.8.0 inside an Apache Airflow DAG (Python 3.12), and I keep getting this error whenever my task tries to initialize the FileDataContext:
InvalidDataContextConfigError: ...
0
votes
0
answers
49
views
KubernetesPodOperator - [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1006)
I’m running Apache Airflow inside a Docker container and trying to use the KubernetesPodOperator to run a simple “hello world” pod in an external Kubernetes cluster (not the same one where Airflow ...
Tooling
1
vote
0
replies
63
views
Masking secrets in Airflow 2.4.3
I’m using Apache Airflow 2.4.3 and trying to securely store a Snowflake connection with a private key inside the connection’s extras JSON field.
I want to mask the sensitive private_key_content field ...
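One hedged pointer: since Airflow 2.1, extra field names can be listed in the `core.sensitive_var_conn_names` option so their values are masked in the UI and task logs. A sketch in env-var form (the field name is the one from the question; whether it is picked up for extras depends on the setup):

```yaml
environment:
  AIRFLOW__CORE__SENSITIVE_VAR_CONN_NAMES: "private_key_content"
```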
0
votes
0
answers
35
views
Airflow 3 - Run LocalExecutor with unlimited Parallelism
The Airflow docs say that the LocalExecutor supports "unlimited parallelism" (source).
They do not specify exactly how to run LocalExecutor in this mode, but I assume it is through the core....
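For reference, the setting that wording points at is `core.parallelism`, whose documented "no limit" value is 0; a sketch in docker-compose environment form (deployment details assumed):

```yaml
environment:
  AIRFLOW__CORE__PARALLELISM: "0"   # 0 is documented as "no limit"
```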
0
votes
0
answers
77
views
MWAA Airflow task_instance_mutation_hook not working
I have added an airflow_local_setting.py file to my MWAA S3 bucket at the root of the dags folder. I have a dag_policy defined, which is picked up and works as expected.
In the same file I have added ...
0
votes
1
answer
122
views
Unable to use Airflow variable with function in Jinja template
I'm trying to pass the Airflow logical_date to a dbt model so that I can use it in the model (SQL). I'm using Airflow 2.11.0. I'm doing the below, but the DAG can't be constructed; it fails with an error saying:
'...
0
votes
0
answers
199
views
How can I dynamically refresh a JWT token in Airflow with Microsoft SSO using RSA?
I’ve configured Microsoft SSO with Apache Airflow using RSA-based authentication. The setup involves uploading the public key to the Azure App Registration, while Airflow holds the private key to ...
-1
votes
1
answer
48
views
How to correctly use retry_delay or retry_delay_sec in Airflow YAML DAG?
I'm currently working with Apache Airflow and trying to configure retries for my DAG tasks using the retry_delay (or retry_delay_sec) parameter in a YAML-based DAG definition.
However, I'm running ...
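If the YAML DAGs are built with the dag-factory library (an assumption; the question doesn't name the tool), its convention is an integer `retry_delay_sec` under `default_args`, which it converts to a `timedelta`. A hedged sketch:

```yaml
example_dag:
  default_args:
    retries: 2
    retry_delay_sec: 300   # dag-factory turns this into timedelta(seconds=300)
```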
1
vote
1
answer
313
views
DeprecationWarning 'HTTP_422_UNPROCESSABLE_ENTITY' is deprecated. Use 'HTTP_422_UNPROCESSABLE_CONTENT' instead
I'm working on AWS EC2. I created a virtual environment and installed all the necessary dependencies. I connected to it via VSCode, and when I try to start the server using the airflow standalone ...
0
votes
0
answers
53
views
Why do my BigQueryInsertJobOperator tasks in Cloud Composer show long queued durations before running?
I’m using Cloud Composer (Airflow) and have two BigQuery operators like this:
run_aggregation = BigQueryInsertJobOperator(
    task_id='aggregation_task',
    configuration={
        "query":...
0
votes
0
answers
39
views
Apache Airflow client connections sporadically fail to connect to PgBouncer
We are facing occasional errors when Airflow workers and the scheduler connect to PgBouncer. We have tried using an external Postgres server and the Bitnami Postgres available from the Airflow Helm chart, in ...
0
votes
0
answers
80
views
Unable to establish a connection with Azure Blob using Apache Airflow
I have created a DAG in Airflow with a sensor that detects the existence of a blob in Azure Blob Storage.
But it is failing repeatedly with the ...
0
votes
1
answer
56
views
Apache Airflow 3.0.6: Oracle connection type is not found
I am using Apache Airflow 3.0.6 with Docker Desktop on a Windows PC. I have an Oracle connection, but it shows the error below:
Timestamp: 2025-10-09, 12:05:43
Traceback (most recent call last):
File "...
0
votes
0
answers
29
views
Airflow tasks for data without timestamps
I would like to use Airflow for some ETL operations, where the source data does not have (indexed) timestamp columns. The source data is a database table where new records of events are appended ...
1
vote
1
answer
204
views
How to migrate metadata from Airflow 2.0.x to 3.0.x?
How to migrate the metadata from Airflow version 2.5.1 to Airflow version 3.0.x in MWAA (Managed Workflows for Apache Airflow)? Direct database access has been removed in Airflow 3.0.x. I am not able ...
0
votes
0
answers
83
views
Tasks stuck in queued
I’m running into an issue with Airflow 3.1 using CeleryExecutor inside Docker Compose, and I can’t figure out what’s wrong.
This is for a small project I’m building for my business. The idea is to use ...
1
vote
1
answer
143
views
How can I run the airflow db clean command using BashOperator with Airflow 3+?
I run Airflow using the official docker compose file.
I would like to run a simple command with the Airflow BashOperator to clean my logs:
clean_database = BashOperator(
    task_id="clean-...
1
vote
0
answers
109
views
Airflow 3.1 + CeleryExecutor in Docker Compose: DAGs stuck in `queued`, tasks never sent to workers
I'm trying to run Apache Airflow 3.1 with the CeleryExecutor using Docker Compose (together with Redis + Postgres).
My problem is that when I trigger a DAG (directly over the command line or how it's ...
1
vote
1
answer
39
views
Does `dag.test()` in Airflow write to the metadata database?
I’m debugging my DAGs in PyCharm using the dag.test() method (Airflow 2.9.1). I noticed that after running it a few times, my environment got into a weird state (i.e.: multiple calls to one of the ...
0
votes
0
answers
74
views
Airflow Delay(?) - "Queued" - in execution and "ERROR - Error fetching the logs. Try number 0 is invalid."
I'm using a newer version of airflow (airflow:3.0.6) with docker compose. Essentially I have just copied and slightly modified the official docker-compose file provided by Airflow itself:
https://...
0
votes
0
answers
52
views
BeamRunJavaPipelineOperator giving a 404 error after the Dataflow job is submitted
I am using Airflow version 2.10.5 to trigger Dataflow using BeamRunJavaPipelineOperator.
I'm getting logs saying the Dataflow job was submitted, and the Dataflow ID is 2025-09-18_01_16_18-13565731205394440306 ....
0
votes
0
answers
39
views
Grant permission to newly created DAGs directly during import
I am working on a platform allowing users to register DAGs with an Airflow instance. Users upload the code for the DAGs they want to import, along with a mapping of permissions for roles on those DAGs.
My ...
2
votes
0
answers
160
views
Airflow DAGs disappear from UI after fixing ImportError for BigQueryCreateEmptyTableOperator
I'm encountering an issue with my Airflow DAGs in Google Cloud Composer after trying to fix an import error.
Context:
I have created several DAGs for my Data warehouse (DW) implemented in Google cloud ...
0
votes
2
answers
80
views
How to access params string within a DAG run in Airflow?
I have a DAG which runs several SQL scripts on certain tables. There are two options to run this DAG:
On the production tables
On frozen archived tables
I want to be able to select which tables to ...
1
vote
2
answers
48
views
Airflow tasks in sequence: dag file is parsed again?
I'm writing a DAG script for Apache Airflow and I'm running into behaviour that I didn't expect. If you take a look at my example script I was expecting the same timestamp to be printed. Instead, when ...
-1
votes
1
answer
81
views
Custom Airflow Image
I have Airflow running on Kubernetes.
So, as you know, every task is spun up as its own pod by the KubernetesExecutor.
I have tried to override the individual container images with slim python images,...
0
votes
1
answer
98
views
Setup s3 log for airflow 3.0.5
I am setting up remote logging for Airflow to S3 (MinIO). My Airflow version is 3.0.5. Here is my configuration:
AIRFLOW__LOGGING__REMOTE_TASK_HANDLER_KWARGS: '{"delete_local_copy": true}'
...
2
votes
1
answer
282
views
How to clean database from DAG?
I want to periodically delete old XCom records (e.g. older than 15 days) from inside a DAG.
In Airflow 2 I could connect directly to the metadata DB, but in Airflow 3 direct DB access is no longer ...
0
votes
1
answer
84
views
How to change the order and number of tasks to be executed in Airflow
I want to create a DAG workflow with 5 different branches as follows:
The base dag is this:
from datetime import datetime, timedelta, date
from airflow import DAG
from airflow.operators.python import ...
0
votes
2
answers
103
views
Why is Airflow Bash operator not passing XCom to another operator?
I'm working on a task group that needs to pass a variable from a BashOperator to another BashOperator. Each bash operator is invoking Python, and the first Python script needs to return a string in a ...
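Worth noting for this pattern: with `do_xcom_push=True` (the default), BashOperator pushes only the last line written to stdout as the XCom value, so the first script must print the desired string last. A pure-Python illustration of that rule (the stdout content is made up):

```python
# Simulated stdout of the first bash/python invocation: debug noise first,
# the value to hand downstream printed last.
stdout = "loading config\nprocessing done\nvalue-for-downstream\n"

# What BashOperator pushes to XCom is just the final stdout line:
xcom_value = stdout.strip().splitlines()[-1]
print(xcom_value)  # → value-for-downstream
```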
1
vote
2
answers
364
views
How do I auto-create an Airflow admin user in Airflow 3.x with Docker Compose?
I'm new to Airflow and trying to automatically create an admin user when initializing Apache Airflow 3.x using Docker Compose, but it's not being created, and I can't log in.
I’ve tried using the ...
0
votes
0
answers
63
views
Airflow backfill job randomly experiences error: Task state changed externally
I am using Google Cloud Composer environment and Apache Airflow to run my DAG.
Almost all tasks are run in deferrable mode, since they are long-running.
I noticed that when running a backfill job for ...
0
votes
0
answers
91
views
Trigger DBT core jobs for Snowflake using Airflow
Airflow is installed using Docker and it is running fine. Now I am trying to add dbt-core, dbt-snowflake and astronomer-cosmos python packages to the Airflow image in order to run the DBT core jobs ...
0
votes
0
answers
73
views
Access task-level parameters of a Databricks job along with parameters passed by the Airflow job
I have an Airflow DAG which calls a Databricks job that has a task-level parameter defined as job_run_id (job.run_id) with type python_script. When I try to access it using sys.argv and ...
1
vote
1
answer
231
views
Sending logs from Airflow to S3/Minio
I am in the process of setting up Airflow on my Kubernetes cluster using the official Helm chart. I want to push my task instance logs to S3 (well, Minio actually) to a bucket called airflow-logs. To ...
-2
votes
2
answers
550
views
Airflow 3.0.0 : Rest API : Not authenticated
I am new to Airflow and was able to configure Airflow 3.0.0 with the sample docker compose. It is up and running fine. I am able to log in to the UI and also run DAGs in the UI.
I actually want to trigger DAG via ...
0
votes
1
answer
51
views
Airflow DAG task dependency issue
I have a task dependency like this:
task1 (must succeed)
          |
          v
+-------------------+
|                   |
task2-A         task2-B
|                   |
v               ...
0
votes
0
answers
30
views
Apache Airflow: Ignore implicit TaskGroup when creating a task
I'm dynamically generating some DAGs based on JSON files.
I'm creating a WHILE-loop system with TriggerDagRunOperator (with wait_for_completion=True), triggering a DAG which calls itself until a ...
0
votes
1
answer
88
views
GCSToGCSOperator is moving folder along with files when using option move_object as True
I have a requirement to move files from a source folder to destination folder in different GCS buckets. I am using GCSToGCSOperator with following config:
source_bucket: "source_bucket"
...
0
votes
0
answers
52
views
Error loading data: 'Engine' object has no attribute 'cursor': chan="stdout": source="task"
I am trying to run a batch process using Apache Airflow. The extract and transform stages work fine, but the load stage is giving an error. Here is my code:
from airflow.decorators import dag, ...
0
votes
1
answer
73
views
S3KeySensor to monitor files across multiple S3 buckets
I have a use case where I have to monitor for _SUCCESS file creation across two different locations in different S3 buckets.
I found in the Airflow S3 docs that it supports passing multiple S3 bucket keys ...
0
votes
0
answers
80
views
Airflow ModuleNotFoundError: No module named 'pyarrow'
I'm trying Apache Airflow for the first time and built a simple ETL. But after loading the data and proceeding to the transform phase, it throws an error saying pyarrow was not found. Im ...
1
vote
0
answers
19
views
Airflow in AWS: executing hundreds of jobs from the metastore based on their dependencies throws an error during status update
I have hundreds of jobs configured in a PostgreSQL metastore with their respective dependencies. I need to pull those jobs from the metastore, execute them per their dependencies, and track the status of ...