-
Notifications
You must be signed in to change notification settings - Fork 16.8k
Deserialization error for External-Event Driven Dag #64613
Copy link
Copy link
Open
Labels
affected_version:3.2 — Use for reporting issues with 3.2
area:core
area:serialization
kind:bug — This is clearly a bug
priority:high — High priority bug that should be patched quickly but does not require an immediate new release
Description
Apache Airflow version
Latest 3-2-test branch, commit hash: e64b7e4
What happened and how to reproduce it?
Set the following env in files/airflow-breeze-config/environment_variables.env
AIRFLOW_CONN_ISSUE_64205_KAFKA_CONFIG='{
"conn_type": "general",
"extra": {
"bootstrap.servers": "broker:29092",
"group.id": "issue_64205_group",
"security.protocol": "PLAINTEXT",
"enable.auto.commit": false,
"auto.offset.reset": "latest"
}
}'

Dag:
from __future__ import annotations
import json
from typing import TYPE_CHECKING
import pendulum
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator
from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from airflow.sdk import Asset, AssetWatcher, dag, get_current_context, task
if TYPE_CHECKING:
from airflow.sdk.execution_time.context import Context, TriggeringAssetEventsAccessor
# Airflow connection id holding the Kafka client configuration
# (supplied via the AIRFLOW_CONN_ISSUE_64205_KAFKA_CONFIG env var, see note below).
KAFKA_CONFIG_ID = "issue_64205_kafka_config"
# Topic(s) the consumer DAG's asset watcher listens on and the producer DAG writes to.
TOPICS = ["fizz_buzz"]
# Dead-letter topic that receives events whose payload failed processing.
DLQ_TOPIC = "dlq"
# Retry count used by the process_message task before the DLQ path runs.
RETRY_COUNT = 3
"""Airflow Kafka connection
AIRFLOW_CONN_ISSUE_64205_KAFKA_CONFIG='{
"conn_type": "general",
"extra": {
"bootstrap.servers": "broker:29092",
"group.id": "issue_64205_group",
"security.protocol": "PLAINTEXT",
"enable.auto.commit": false,
"auto.offset.reset": "latest"
}
}'
"""
"""Kafka Command to verify messages are being produced to the topic:
# Create Topic
/bin/kafka-topics --bootstrap-server broker:29092 --create --topic fizz_buzz
/bin/kafka-topics --bootstrap-server broker:29092 --create --topic dlq
# Get offsets for the topic to verify messages are being produced
/bin/kafka-get-offsets --bootstrap-server broker:29092 --topic fizz_buzz
/bin/kafka-get-offsets --bootstrap-server broker:29092 --topic dlq
# List consumer groups to verify our consumer group is being registered
/bin/kafka-consumer-groups --bootstrap-server broker:29092 --list
# Get current offsets for the consumer group to verify messages are being consumed
/bin/kafka-consumer-groups --bootstrap-server broker:29092 --describe --group group_1
"""
# Fixed demo dataset: eight well-formed customer orders serialized by the producer below.
SAMPLE_ORDERS = [
    {"order_id": "ORD-1001", "customer": "alice", "item": "widget", "quantity": 2, "price": 9.99},
    {"order_id": "ORD-1002", "customer": "bob", "item": "gadget", "quantity": 1, "price": 24.50},
    {"order_id": "ORD-1003", "customer": "carol", "item": "widget", "quantity": 5, "price": 9.99},
    {"order_id": "ORD-1004", "customer": "dave", "item": "doohickey", "quantity": 3, "price": 14.75},
    {"order_id": "ORD-1005", "customer": "eve", "item": "thingamajig", "quantity": 1, "price": 39.00},
    {"order_id": "ORD-1006", "customer": "frank", "item": "widget", "quantity": 10, "price": 9.99},
    {"order_id": "ORD-1007", "customer": "grace", "item": "gadget", "quantity": 2, "price": 24.50},
    {"order_id": "ORD-1008", "customer": "heidi", "item": "doohickey", "quantity": 1, "price": 14.75},
]


def producer_function():
    """Yield (key, value) message tuples for ProduceToTopicOperator.

    Both key and value are JSON-encoded strings. One intentionally
    malformed record is appended last to exercise the consumer's
    error-handling / DLQ path.
    """
    yield from ((json.dumps(record["order_id"]), json.dumps(record)) for record in SAMPLE_ORDERS)
    # produce a malformed message to demonstrate error handling
    yield ("malformed_message", "malformed_message")
def process_one_message(message: str):
    """Decode a JSON order payload, print its line total, and return the order dict.

    Raises json.JSONDecodeError for malformed input (e.g. the demo's
    deliberately broken message), which drives the DAG's retry/DLQ flow.
    """
    parsed = json.loads(message)
    line_total = parsed["quantity"] * parsed["price"]
    print(f"Order {parsed['order_id']}: {parsed['quantity']}x {parsed['item']} = ${line_total:.2f}")
    return parsed
def handle_dlq():
    """Producer callable for the DLQ topic.

    Reads the triggering asset events from the current task context and
    yields one (key, value) message per event, with the asset and the
    event extras serialized as JSON.
    """
    ctx: Context = get_current_context()
    events: TriggeringAssetEventsAccessor = ctx["triggering_asset_events"]
    for evt in events[kafka_cdc_asset]:
        print(f"Handling failed message from event: {evt}")
        body = {
            "asset": evt.asset.model_dump(mode="json"),
            "extra": evt.extra,
        }
        yield (json.dumps(evt.asset.uri), json.dumps(body))
# Airflow 3 example
# Define a trigger that listens to an external message queue (Apache Kafka in this case).
# The "kafka" scheme selects the queue provider; the remaining keyword arguments
# are forwarded to the underlying Kafka trigger.
trigger = MessageQueueTrigger(
    scheme="kafka",
    # the rest of the parameters are used by the trigger
    kafka_config_id=KAFKA_CONFIG_ID,  # Airflow connection carrying the Kafka client config
    topics=TOPICS,
    poll_interval=1,
    poll_timeout=1,
    commit_offset=True,
)
# Define an asset that watches for messages on the queue; DAGs scheduled on this
# asset run whenever the watcher's trigger fires.
kafka_cdc_asset = Asset("kafka_cdc_asset", watchers=[AssetWatcher(name="kafka_cdc", trigger=trigger)])
@dag(
    schedule=[kafka_cdc_asset],  # event-driven: runs when the watched asset emits an event
    tags=["event-driven"],
)
def issue_64205_consumer_patched():
    """Event-driven consumer DAG: process incoming messages, route failures to a DLQ."""

    @task(retries=RETRY_COUNT, retry_delay=1)
    def process_message(**context) -> bool:
        """Process each triggering asset event; raises (and retries) on malformed payloads."""
        # Extract the triggering asset events from the context
        triggering_asset_events: TriggeringAssetEventsAccessor = context["triggering_asset_events"]
        for event in triggering_asset_events[kafka_cdc_asset]:
            # Get the message from the TriggerEvent payload
            print(f"Asset event: {event}")
            process_one_message(event.extra["payload"])
        return True

    @task.short_circuit(trigger_rule="all_done")
    def should_handle_dlq(**context) -> bool:
        """Skip DLQ handling if processing succeeded."""
        # If process_message succeeded, it pushed True to XCom.
        # If it failed (exception after retries), no XCom was pushed -> None.
        upstream_result = context["ti"].xcom_pull(task_ids="process_message")
        return upstream_result is None

    result = process_message()
    dlq_check = should_handle_dlq()
    # Publishes the failed events to the DLQ topic; only reached when
    # should_handle_dlq returns True (short-circuit semantics).
    handle_dlq_task = ProduceToTopicOperator(
        kafka_config_id=KAFKA_CONFIG_ID,
        task_id="handle_dlq",
        topic=DLQ_TOPIC,
        producer_function=handle_dlq,
    )
    result >> dlq_check >> handle_dlq_task


issue_64205_consumer_patched()
@dag(
    description="Load Data to fizz_buzz topic",
    start_date=pendulum.datetime(2022, 11, 1),
    schedule=None,  # manual trigger only
    catchup=False,
    tags=["event-driven"],
)
def issue_64205_producer_patched():
    """Manually-triggered DAG that publishes the sample orders (plus one malformed message) to Kafka."""
    ProduceToTopicOperator(
        kafka_config_id=KAFKA_CONFIG_ID,
        task_id="produce_to_topic",
        topic=TOPICS[0],  # "fizz_buzz"
        producer_function=producer_function,
    )
issue_64205_producer_patched()

Start Airflow with Breeze and create the Kafka topics before the Airflow cluster starts:
breeze start-airflow --backend postgres --use-airflow-version apache/airflow:v3-2-test --answer n --integration kafka --mount-sources providers-and-tests --terminal-multiplexer mprocs --db-reset

Run the following commands in the broker container before the Airflow cluster starts:
/bin/kafka-topics --bootstrap-server broker:29092 --create --topic fizz_buzz
/bin/kafka-topics --bootstrap-server broker:29092 --create --topic dlq

Then unpause those two Dags.
Logs from Dag-Processor
2026-04-02T03:34:27.353773Z [info ] Process exited [supervisor] exit_code=0 loc=supervisor.py:740 pid=2024 signal_sent=SIGTERM
2026-04-02T03:34:27.358180Z [info ] Waiting up to 5 seconds for processes to exit... [airflow.utils.process_utils] loc=process_utils.py:308
Traceback (most recent call last):
File "/usr/python/lib/python3.10/site-packages/airflow/models/trigger.py", line 172, in _decrypt_kwargs
result = deserialize(decrypted_kwargs)
File "/usr/python/lib/python3.10/site-packages/airflow/sdk/serde/__init__.py", line 240, in deserialize
return {str(k): deserialize(v, full) for k, v in o.items()}
File "/usr/python/lib/python3.10/site-packages/airflow/sdk/serde/__init__.py", line 240, in <dictcomp>
return {str(k): deserialize(v, full) for k, v in o.items()}
File "/usr/python/lib/python3.10/site-packages/airflow/sdk/serde/__init__.py", line 264, in deserialize
raise ImportError(
ImportError: tuple was not found in allow list for deserialization imports. To allow it, add it to allowed_deserialization_classes in the configuration
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/python/bin/airflow", line 10, in <module>
sys.exit(main())
File "/usr/python/lib/python3.10/site-packages/airflow/__main__.py", line 55, in main
args.func(args)
File "/usr/python/lib/python3.10/site-packages/airflow/cli/cli_config.py", line 49, in command
return func(*args, **kwargs)
File "/usr/python/lib/python3.10/site-packages/airflow/utils/memray_utils.py", line 59, in wrapper
return func(*args, **kwargs)
File "/usr/python/lib/python3.10/site-packages/airflow/utils/cli.py", line 113, in wrapper
return f(*args, **kwargs)
File "/usr/python/lib/python3.10/site-packages/airflow/utils/providers_configuration_loader.py", line 54, in wrapped_function
return func(*args, **kwargs)
File "/usr/python/lib/python3.10/site-packages/airflow/cli/commands/dag_processor_command.py", line 64, in dag_processor
run_command_with_daemon_option(
File "/usr/python/lib/python3.10/site-packages/airflow/cli/commands/daemon_utils.py", line 86, in run_command_with_daemon_option
callback()
File "/usr/python/lib/python3.10/site-packages/airflow/cli/commands/dag_processor_command.py", line 67, in <lambda>
callback=lambda: run_job(job=job_runner.job, execute_callable=job_runner._execute),
File "/usr/python/lib/python3.10/site-packages/airflow/utils/session.py", line 100, in wrapper
return func(*args, session=session, **kwargs) # type: ignore[arg-type]
File "/usr/python/lib/python3.10/site-packages/airflow/jobs/job.py", line 355, in run_job
return execute_job(job, execute_callable=execute_callable)
File "/usr/python/lib/python3.10/site-packages/airflow/jobs/job.py", line 384, in execute_job
ret = execute_callable()
File "/usr/python/lib/python3.10/site-packages/airflow/jobs/dag_processor_job_runner.py", line 61, in _execute
self.processor.run()
File "/usr/python/lib/python3.10/site-packages/airflow/dag_processing/manager.py", line 334, in run
return self._run_parsing_loop()
File "/usr/python/lib/python3.10/site-packages/airflow/dag_processing/manager.py", line 441, in _run_parsing_loop
self._collect_results()
File "/usr/python/lib/python3.10/site-packages/airflow/utils/session.py", line 100, in wrapper
return func(*args, session=session, **kwargs) # type: ignore[arg-type]
File "/usr/python/lib/python3.10/site-packages/airflow/dag_processing/manager.py", line 948, in _collect_results
self._file_stats[file] = process_parse_results(
File "/usr/python/lib/python3.10/site-packages/airflow/dag_processing/manager.py", line 1347, in process_parse_results
update_dag_parsing_results_in_db(
File "/usr/python/lib/python3.10/site-packages/airflow/dag_processing/collection.py", line 463, in update_dag_parsing_results_in_db
for attempt in run_with_db_retries(logger=log):
File "/usr/python/lib/python3.10/site-packages/tenacity/__init__.py", line 438, in __iter__
do = self.iter(retry_state=retry_state)
File "/usr/python/lib/python3.10/site-packages/tenacity/__init__.py", line 371, in iter
result = action(retry_state)
File "/usr/python/lib/python3.10/site-packages/tenacity/__init__.py", line 393, in <lambda>
self._add_action_func(lambda rs: rs.outcome.result())
File "/usr/python/lib/python3.10/concurrent/futures/_base.py", line 451, in result
return self.__get_result()
File "/usr/python/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
raise self._exception
File "/usr/python/lib/python3.10/site-packages/airflow/dag_processing/collection.py", line 473, in update_dag_parsing_results_in_db
SerializedDAG.bulk_write_to_db(
File "/usr/python/lib/python3.10/site-packages/airflow/utils/session.py", line 98, in wrapper
return func(*args, **kwargs)
File "/usr/python/lib/python3.10/site-packages/airflow/serialization/definitions/dag.py", line 221, in bulk_write_to_db
asset_op.add_asset_trigger_references(orm_assets, session=session)
File "/usr/python/lib/python3.10/site-packages/airflow/dag_processing/collection.py", line 1082, in add_asset_trigger_references
orm_triggers.update(
File "/usr/python/lib/python3.10/site-packages/airflow/dag_processing/collection.py", line 1083, in <genexpr>
(BaseEventTrigger.hash(trigger.classpath, trigger.kwargs), trigger)
File "/usr/python/lib/python3.10/site-packages/airflow/models/trigger.py", line 135, in kwargs
return self._decrypt_kwargs(self.encrypted_kwargs)
File "/usr/python/lib/python3.10/site-packages/airflow/models/trigger.py", line 180, in _decrypt_kwargs
return BaseSerialization.deserialize(decrypted_kwargs)
File "/usr/python/lib/python3.10/site-packages/airflow/serialization/serialized_objects.py", line 629, in deserialize
var = encoded_var[Encoding.VAR]
KeyError: <Encoding.VAR: '__var'>

What you think should happen instead?
No response
Operating System
Breeze
Versions of Apache Airflow Providers
No response
Deployment
None
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
affected_version:3.2 — Use for reporting issues with 3.2
area:core
area:serialization
kind:bug — This is clearly a bug
priority:high — High priority bug that should be patched quickly but does not require an immediate new release