Airflow Spark Connector: Failed to poll for the driver status 10 times

Obwohl die Spark-Verbindung korrekt konfiguriert ist, kann Airflow nach einem abgeschlossenen Spark-Submit den finalen Driver-Status nicht abrufen. Dieses Problem entsteht, weil Spark im Standalone-Modus zwei unterschiedliche Endpunkte bereitstellt, während der SparkSubmitOperator in Airflow nur eine einzige conn_id unterstützt. 1

Systemkonfiguration

Dieser Blogpost basiert auf folgendem Setup:

  • Airflow version 2.10.5
  • Spark Version 3.5.1, deployed in einem Master-Worker-Setup mittels Docker
  • Spark läuft im Standalone-Cluster-Modus
  • In der Airflow Connections ist die Spark Verbindung eingestellt vom Typ Spark, mit Deploy Mode cluster und Port 7077

Auch wenn der Spark JDBC Hook verwendet werden kann, um Verbindungsinformationen für einen Spark-Submit zu speichern, verwaltet oder überwacht er nicht den Lebenszyklus eines Jobs. Insbesondere kann er die Anwendung nicht remote beenden oder den finalen Driver-Status abrufen. Interessanterweise, kann im Airflow Log zwar einen Driver-Status von Typ FAILED gesehen werden, der Task wird aber dennoch als erfolgreich (grün) markiert.

Problembeschreibung

Wenn das Log-Level von Airflow auf DEBUG gesetzt ist, lässt sich beobachten, dass Airflow nach einem spark-submit versucht, den Driver-Status vom Spark-Cluster abzufragen. Dieses Polling schlägt jedoch fehl und endet mit einem Fehler.

{spark_submit.py:516} DEBUG - Poll driver status cmd: ['spark-submit', '--master', 'spark://zmi-spark-dev.test.med.tu-dresden.de:7077', '--status', 'driver-20250213113217-0020']

{spark_submit.py:664} DEBUG - spark driver status log: 25/02/13 11:32:47 WARN RestSubmissionClient: Unable to connect to server spark://zmi-spark-dev.test.med.tu-dresden.de:7077.

{spark_submit.py:664} DEBUG - spark driver status log: Exception in thread "main" org.apache.spark.deploy.rest.SubmitRestConnectionException: Unable to connect to server

Am Ende führt dies zu folgender Exception:

packages/airflow/providers/apache/spark/hooks/spark_submit.py", line 728, in _start_driver_status_tracking

    raise AirflowException(

airflow.exceptions.AirflowException: Failed to poll for the driver status 10 times: returncode = 1

Wenn der Polling-Befehl manuell ausgeführt wird, tritt derselbe Fehler auf, was bestätigt, dass die Anfrage selbst ungültig ist.

Ein Blick in den Airflow-Code

Airflow verwendet (vereinfacht dargestellt) folgenden Befehl, um den Driver-Status abzufragen, der im Spark Hook definiert ist:

spark_host = self._connection["master"]

# Via Spark binary
spark-submit ["--master", spark_host ] ["--status", self._driver_id]

# Or via REST (depending on configuration)
connection_cmd = [
                "/usr/bin/curl",
                "--max-time",
                str(curl_max_wait_time),
                f"{spark_host }/v1/submissions/status/{self._driver_id}",
            ]

Ob das Polling überhaupt ausgelöst wird, hängt von zwei Bedingungen ab:

  • Der Deploy Mode ist in der Airflow-Verbindung auf cluster gesetzt
  • Die Master-URL beginnt mit spark://

Allerdings ist es so, dass Spark im Standalone-Modus mehrere Endpunkte bereitstellt.

  • Port 7077 wird für die Master-UI und zum Einreichen von Anwendungen über das spark-submit Binary verwendet.
  • Port 6066 stellt einen REST-Endpunkt bereit und kann zusätzlich Statusabfragen über Standard-HTTPS-Requests verarbeiten.

Official Spark docs reference

Related Stack Overflow thread

Wie oben gezeigt, baut Airflow die Polling-Logik basierend auf self._connection["master"] auf. Dieses _connection-Attribut wird aus der conn_id erzeugt, die beim Instanziieren des SparkSubmitOperator übergeben wird (github).

Hier liegt das Problem: Spark im Standalone-Modus nutzt zwei getrennte Endpunkte, während der SparkSubmitOperator nur eine conn_id unterstützt. Dadurch ist es nicht möglich, sowohl Submit- als auch Polling-URL mit nur einer conn_id zu konfigurieren.

Damit das Driver-Status-Polling korrekt funktioniert, muss der Endpunkt auf Port 6066 zeigen und nicht auf 7077.

Lösung

Es gibt bereits ein offenes GitHub-Issue zu diesem Problem, und mehrere Nutzer haben es durch einen Custom Operator gelöst. Aber was bedeutet das konkret? Standardmäßig setzt Airflow den Spark-Host wie folgt:

spark_host = self._connection["master"]

(siehe this line in the Spark hook code)

Um das Problem zu umgehen, haben wir einen eigenen Operator erstellt, indem wir den SparkSubmitOperator erweitert und die relevante Methode überschrieben haben2. In unserer Version setzen wir spark_host explizit auf eine separate URL, die auf Port 6066 zeigt, jedoch nur an der Stelle, an der das Driver-Status-Polling durchgeführt wird.

Warum nicht einfach Port 6066 in der Airflow-Verbindung verwenden?

Man könnte sich fragen: Warum nicht einfach die Airflow-conn_id auf den 6066-REST-Endpunkt zeigen lassen?

Der Grund ist, dass die Funktion _build_spark_submit_command im SparkSubmitOperator den Submit-Befehl über das spark-submit Binary erstellt. Standardmäßig ist DEFAULT_SPARK_BINARY auf "spark-submit" gesetzt und dieses kann scheinbar nicht mit dem REST-Endpunkt auf Port 6066 umgehen.

Auch wenn sowohl Port 7077 als auch 6066 technisch Spark-Submits verarbeiten können (siehe Diskussion), benötigen sie unterschiedliche Handhabung:

  • 7077 wird über das klassische Spark-Binary angesprochen
  • 6066 erwartet Interaktion über die REST-API

Wenn dir dieser Beitrag geholfen hat, oder du weitere Ideen oder Ergänzungen hast, hinterlasse gerne einen Kommentar!

  1. Dieser Blog Beitrag wurde mit ChatGPT 5.3 vom Englischen ins Deutsche übersetzt ↩︎
  2. Special acknowledgment to my team lead, J.H., for identifying the correct API and finding a way to implement the subclass. ↩︎

Schreibe einen Kommentar

Your email address will not be published. Required fields are marked *

hungsblog | Nguyen Hung Manh | Dresden
Nach oben scrollen