Skip to content

Deserialization error for External-Event Driven Dag #64613

@jason810496

Description

@jason810496

Apache Airflow version

Latest 3-2-test branch, commit hash: e64b7e4

What happened and how to reproduce it?

Set the following environment variable in files/airflow-breeze-config/environment_variables.env

AIRFLOW_CONN_ISSUE_64205_KAFKA_CONFIG='{
  "conn_type": "general",
  "extra": {
    "bootstrap.servers": "broker:29092",
    "group.id": "issue_64205_group",
    "security.protocol": "PLAINTEXT",
    "enable.auto.commit": false,
    "auto.offset.reset": "latest"
  }
}'

Dag:

from __future__ import annotations

import json
from typing import TYPE_CHECKING

import pendulum

from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator
from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from airflow.sdk import Asset, AssetWatcher, dag, get_current_context, task

if TYPE_CHECKING:
    from airflow.sdk.execution_time.context import Context, TriggeringAssetEventsAccessor

# Airflow connection id resolved by MessageQueueTrigger / ProduceToTopicOperator.
KAFKA_CONFIG_ID = "issue_64205_kafka_config"
# Topic(s) the asset watcher listens on.
TOPICS = ["fizz_buzz"]
# Dead-letter topic that receives messages which failed processing.
DLQ_TOPIC = "dlq"
# Retries for the consumer task before the DLQ path takes over.
RETRY_COUNT = 3
# NOTE: the two bare string literals below are no-op expressions kept as inline
# setup notes (connection env var and Kafka CLI verification commands).
"""Airflow Kafka connection
AIRFLOW_CONN_ISSUE_64205_KAFKA_CONFIG='{
  "conn_type": "general",
  "extra": {
    "bootstrap.servers": "broker:29092",
    "group.id": "issue_64205_group",
    "security.protocol": "PLAINTEXT",
    "enable.auto.commit": false,
    "auto.offset.reset": "latest"
  }
}'
"""
"""Kafka Command to verify messages are being produced to the topic:

# Create Topic
/bin/kafka-topics --bootstrap-server broker:29092 --create --topic fizz_buzz
/bin/kafka-topics --bootstrap-server broker:29092 --create --topic dlq


# Get offsets for the topic to verify messages are being produced
/bin/kafka-get-offsets --bootstrap-server broker:29092 --topic fizz_buzz
/bin/kafka-get-offsets --bootstrap-server broker:29092 --topic dlq

# List consumer groups to verify our consumer group is being registered
/bin/kafka-consumer-groups --bootstrap-server broker:29092 --list

# Get current offsets for the consumer group to verify messages are being consumed
/bin/kafka-consumer-groups --bootstrap-server broker:29092 --describe --group group_1
"""

# Fixture data: eight well-formed orders to publish to the topic on demand.
SAMPLE_ORDERS = [
    {"order_id": "ORD-1001", "customer": "alice", "item": "widget", "quantity": 2, "price": 9.99},
    {"order_id": "ORD-1002", "customer": "bob", "item": "gadget", "quantity": 1, "price": 24.50},
    {"order_id": "ORD-1003", "customer": "carol", "item": "widget", "quantity": 5, "price": 9.99},
    {"order_id": "ORD-1004", "customer": "dave", "item": "doohickey", "quantity": 3, "price": 14.75},
    {"order_id": "ORD-1005", "customer": "eve", "item": "thingamajig", "quantity": 1, "price": 39.00},
    {"order_id": "ORD-1006", "customer": "frank", "item": "widget", "quantity": 10, "price": 9.99},
    {"order_id": "ORD-1007", "customer": "grace", "item": "gadget", "quantity": 2, "price": 24.50},
    {"order_id": "ORD-1008", "customer": "heidi", "item": "doohickey", "quantity": 1, "price": 14.75},
]


def producer_function():
    """Yield (key, value) message pairs for every sample order.

    Keys and values are JSON-encoded. A final non-JSON record is appended
    on purpose to exercise the consumer's error-handling / DLQ path.
    """
    yield from ((json.dumps(o["order_id"]), json.dumps(o)) for o in SAMPLE_ORDERS)
    # produce a malformed message to demonstrate error handling
    yield ("malformed_message", "malformed_message")


def process_one_message(message: str):
    """Parse a JSON-encoded order, print its line total, and return the order dict."""
    order = json.loads(message)
    total = order["price"] * order["quantity"]
    summary = f"Order {order['order_id']}: {order['quantity']}x {order['item']} = ${total:.2f}"
    print(summary)
    return order

def handle_dlq():
    """Producer generator for the DLQ task.

    For each asset event that triggered this DAG run, emit a (key, value)
    pair: the key is the JSON-encoded asset URI, the value a JSON document
    containing the serialized asset plus the event's extra payload.
    """
    ctx: Context = get_current_context()
    accessor: TriggeringAssetEventsAccessor = ctx["triggering_asset_events"]
    for event in accessor[kafka_cdc_asset]:
        print(f"Handling failed message from event: {event}")
        record = {
            "asset": event.asset.model_dump(mode="json"),
            "extra": event.extra,
        }
        yield (json.dumps(event.asset.uri), json.dumps(record))


# Airflow 3 example
# Define a trigger that listens to an external message queue (Apache Kafka in this case)
trigger = MessageQueueTrigger(
    scheme="kafka",
    # the rest of the parameters are used by the trigger
    kafka_config_id=KAFKA_CONFIG_ID,
    topics=TOPICS,
    poll_interval=1,
    poll_timeout=1,
    # NOTE(review): commit_offset=True presumably commits the consumer offset
    # as soon as the trigger fires, regardless of downstream task outcome —
    # confirm this is intended, since a failed run cannot re-read the message.
    commit_offset=True,
)

# Define an asset that watches for messages on the queue; each received
# message becomes an asset event that schedules the consumer DAG below.
kafka_cdc_asset = Asset("kafka_cdc_asset", watchers=[AssetWatcher(name="kafka_cdc", trigger=trigger)])


@dag(
    schedule=[kafka_cdc_asset],
    tags=["event-driven"],
)
def issue_64205_consumer_patched():
    """Event-driven consumer: process each Kafka-triggered asset event,
    and publish the event to the DLQ topic if processing failed."""

    @task(retries=RETRY_COUNT, retry_delay=1)
    def process_message(**context) -> bool:
        """Process every asset event of this run; return True on full success."""
        # Extract the triggering asset events from the context
        triggering_asset_events: TriggeringAssetEventsAccessor = context["triggering_asset_events"]
        for event in triggering_asset_events[kafka_cdc_asset]:
            # Get the message from the TriggerEvent payload
            print(f"Asset event: {event}")
            process_one_message(event.extra["payload"])
        return True

    @task.short_circuit(trigger_rule="all_done")
    def should_handle_dlq(**context) -> bool:
        """Skip DLQ handling if processing succeeded."""
        # If process_message succeeded, it pushed True to XCom.
        # If it failed (exception after retries), no XCom was pushed -> None.
        upstream_result = context["ti"].xcom_pull(task_ids="process_message")
        return upstream_result is None

    result = process_message()
    dlq_check = should_handle_dlq()
    # Re-publishes the failing event to the DLQ topic; the message pairs
    # come from the handle_dlq generator defined at module level.
    handle_dlq_task = ProduceToTopicOperator(
        kafka_config_id=KAFKA_CONFIG_ID,
        task_id="handle_dlq",
        topic=DLQ_TOPIC,
        producer_function=handle_dlq,
    )

    # trigger_rule="all_done" on dlq_check lets it run even when process_message fails.
    result >> dlq_check >> handle_dlq_task


# Instantiate at import time so the DAG processor registers the DAG.
issue_64205_consumer_patched()

@dag(
    description="Load Data to fizz_buzz topic",
    start_date=pendulum.datetime(2022, 11, 1),
    schedule=None,
    catchup=False,
    tags=["event-driven"],
)
def issue_64205_producer_patched():
    """Manually-triggered DAG that seeds the fizz_buzz topic with sample orders."""
    ProduceToTopicOperator(
        kafka_config_id=KAFKA_CONFIG_ID,
        task_id="produce_to_topic",
        topic=TOPICS[0],
        producer_function=producer_function,
    )


# Instantiate at import time so the DAG processor registers the DAG.
issue_64205_producer_patched()

Start Airflow with Breeze, and create the Kafka topics before the Airflow cluster starts

breeze start-airflow --backend postgres --use-airflow-version apache/airflow:v3-2-test  --answer n --integration kafka --mount-sources providers-and-tests --terminal-multiplexer mprocs --db-reset

Run the following commands in the broker container before the Airflow cluster starts

/bin/kafka-topics --bootstrap-server broker:29092 --create --topic fizz_buzz
/bin/kafka-topics --bootstrap-server broker:29092 --create --topic dlq

Then unpause those two Dags.

Logs from Dag-Processor

2026-04-02T03:34:27.353773Z [info     ] Process exited                 [supervisor] exit_code=0 loc=supervisor.py:740 pid=2024 signal_sent=SIGTERM
2026-04-02T03:34:27.358180Z [info     ] Waiting up to 5 seconds for processes to exit... [airflow.utils.process_utils] loc=process_utils.py:308
Traceback (most recent call last):
  File "/usr/python/lib/python3.10/site-packages/airflow/models/trigger.py", line 172, in _decrypt_kwargs
    result = deserialize(decrypted_kwargs)
  File "/usr/python/lib/python3.10/site-packages/airflow/sdk/serde/__init__.py", line 240, in deserialize
    return {str(k): deserialize(v, full) for k, v in o.items()}
  File "/usr/python/lib/python3.10/site-packages/airflow/sdk/serde/__init__.py", line 240, in <dictcomp>
    return {str(k): deserialize(v, full) for k, v in o.items()}
  File "/usr/python/lib/python3.10/site-packages/airflow/sdk/serde/__init__.py", line 264, in deserialize
    raise ImportError(
ImportError: tuple was not found in allow list for deserialization imports. To allow it, add it to allowed_deserialization_classes in the configuration

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/python/bin/airflow", line 10, in <module>
    sys.exit(main())
  File "/usr/python/lib/python3.10/site-packages/airflow/__main__.py", line 55, in main
    args.func(args)
  File "/usr/python/lib/python3.10/site-packages/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
  File "/usr/python/lib/python3.10/site-packages/airflow/utils/memray_utils.py", line 59, in wrapper
    return func(*args, **kwargs)
  File "/usr/python/lib/python3.10/site-packages/airflow/utils/cli.py", line 113, in wrapper
    return f(*args, **kwargs)
  File "/usr/python/lib/python3.10/site-packages/airflow/utils/providers_configuration_loader.py", line 54, in wrapped_function
    return func(*args, **kwargs)
  File "/usr/python/lib/python3.10/site-packages/airflow/cli/commands/dag_processor_command.py", line 64, in dag_processor
    run_command_with_daemon_option(
  File "/usr/python/lib/python3.10/site-packages/airflow/cli/commands/daemon_utils.py", line 86, in run_command_with_daemon_option
    callback()
  File "/usr/python/lib/python3.10/site-packages/airflow/cli/commands/dag_processor_command.py", line 67, in <lambda>
    callback=lambda: run_job(job=job_runner.job, execute_callable=job_runner._execute),
  File "/usr/python/lib/python3.10/site-packages/airflow/utils/session.py", line 100, in wrapper
    return func(*args, session=session, **kwargs)  # type: ignore[arg-type]
  File "/usr/python/lib/python3.10/site-packages/airflow/jobs/job.py", line 355, in run_job
    return execute_job(job, execute_callable=execute_callable)
  File "/usr/python/lib/python3.10/site-packages/airflow/jobs/job.py", line 384, in execute_job
    ret = execute_callable()
  File "/usr/python/lib/python3.10/site-packages/airflow/jobs/dag_processor_job_runner.py", line 61, in _execute
    self.processor.run()
  File "/usr/python/lib/python3.10/site-packages/airflow/dag_processing/manager.py", line 334, in run
    return self._run_parsing_loop()
  File "/usr/python/lib/python3.10/site-packages/airflow/dag_processing/manager.py", line 441, in _run_parsing_loop
    self._collect_results()
  File "/usr/python/lib/python3.10/site-packages/airflow/utils/session.py", line 100, in wrapper
    return func(*args, session=session, **kwargs)  # type: ignore[arg-type]
  File "/usr/python/lib/python3.10/site-packages/airflow/dag_processing/manager.py", line 948, in _collect_results
    self._file_stats[file] = process_parse_results(
  File "/usr/python/lib/python3.10/site-packages/airflow/dag_processing/manager.py", line 1347, in process_parse_results
    update_dag_parsing_results_in_db(
  File "/usr/python/lib/python3.10/site-packages/airflow/dag_processing/collection.py", line 463, in update_dag_parsing_results_in_db
    for attempt in run_with_db_retries(logger=log):
  File "/usr/python/lib/python3.10/site-packages/tenacity/__init__.py", line 438, in __iter__
    do = self.iter(retry_state=retry_state)
  File "/usr/python/lib/python3.10/site-packages/tenacity/__init__.py", line 371, in iter
    result = action(retry_state)
  File "/usr/python/lib/python3.10/site-packages/tenacity/__init__.py", line 393, in <lambda>
    self._add_action_func(lambda rs: rs.outcome.result())
  File "/usr/python/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/usr/python/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/python/lib/python3.10/site-packages/airflow/dag_processing/collection.py", line 473, in update_dag_parsing_results_in_db
    SerializedDAG.bulk_write_to_db(
  File "/usr/python/lib/python3.10/site-packages/airflow/utils/session.py", line 98, in wrapper
    return func(*args, **kwargs)
  File "/usr/python/lib/python3.10/site-packages/airflow/serialization/definitions/dag.py", line 221, in bulk_write_to_db
    asset_op.add_asset_trigger_references(orm_assets, session=session)
  File "/usr/python/lib/python3.10/site-packages/airflow/dag_processing/collection.py", line 1082, in add_asset_trigger_references
    orm_triggers.update(
  File "/usr/python/lib/python3.10/site-packages/airflow/dag_processing/collection.py", line 1083, in <genexpr>
    (BaseEventTrigger.hash(trigger.classpath, trigger.kwargs), trigger)
  File "/usr/python/lib/python3.10/site-packages/airflow/models/trigger.py", line 135, in kwargs
    return self._decrypt_kwargs(self.encrypted_kwargs)
  File "/usr/python/lib/python3.10/site-packages/airflow/models/trigger.py", line 180, in _decrypt_kwargs
    return BaseSerialization.deserialize(decrypted_kwargs)
  File "/usr/python/lib/python3.10/site-packages/airflow/serialization/serialized_objects.py", line 629, in deserialize
    var = encoded_var[Encoding.VAR]
KeyError: <Encoding.VAR: '__var'>

What you think should happen instead?

No response

Operating System

Breeze

Versions of Apache Airflow Providers

No response

Deployment

None

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Metadata

Assignees

Labels

affected_version:3.2 (use for reporting issues with 3.2), area:core, area:serialization, kind:bug (this is clearly a bug), priority:high (high-priority bug that should be patched quickly but does not require an immediate new release)

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions