Este post muestra cómo construir un Data Source de streaming en PySpark. Partimos del enfoque batch y luego implementamos un reader con SimpleDataSourceStreamReader usando la Data Source API oficial.

Descargas al final: ir a Descargas.

En pocas palabras

  • Una API externa no es un source de Spark por defecto.
  • La Data Source API permite definir offsets y leer filas en streaming.
  • Resultado: spark.readStream.format("weather").load().

Ejecuta tú mismo

Usa el tool de Apache Spark de este blog. No necesitas crear venv.

1) El enfoque batch (ingenuo)

Funciona, pero no es streaming. Solo escribes un archivo y lo vuelves a leer:

1
2
3
4
5
6
7
import json, requests

url = "https://api.open-meteo.com/v1/forecast?latitude=40.71&longitude=-74.01&current=temperature_2m,relative_humidity_2m"
payload = requests.get(url).json()

with open("/home/jovyan/work/data/weather_stream/batch.json", "w") as f:
    f.write(json.dumps(payload))

2) El enfoque correcto: Data Source de streaming

Un reader de streaming debe definir schema y offsets. Eso es lo que espera la Data Source API.

2.1 Schema de salida

Define el schema explícitamente (ver la documentación de schemas) para que Spark pueda planificar el stream:

1
2
3
4
5
6
7
8
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType

schema = StructType([
    StructField("station", StringType(), True),
    StructField("temperature_2m", DoubleType(), True),
    StructField("relative_humidity_2m", DoubleType(), True),
    StructField("ts_ingest", LongType(), True),
])

2.2 Reader de streaming (esqueleto funcional)

Los offsets son la clave: initialOffset y latestOffset definen el rango, y read devuelve filas entre ellos.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import ast
import requests
import json
import time
from pyspark.sql.datasource import SimpleDataSourceStreamReader

class WeatherSimpleStreamReader(SimpleDataSourceStreamReader):
    def __init__(self, options):
        self.options = options
        self.stations = ast.literal_eval(options.get("stations", "[]"))

    def initialOffset(self):
        now = int(time.time())
        return {s: now for s in self.stations}

    def latestOffset(self):
        now = int(time.time())
        return {s: now for s in self.stations}

    def read(self, start, end):
        rows = []
        for station in self.stations:
            url = (
                "https://api.open-meteo.com/v1/forecast"
                f"?latitude={station['lat']}&longitude={station['lon']}"
                "&current=temperature_2m,relative_humidity_2m"
            )
            payload = requests.get(url).json()
            rows.append({
                "station": station["id"],
                "temperature_2m": payload["current"]["temperature_2m"],
                "relative_humidity_2m": payload["current"]["relative_humidity_2m"],
                "ts_ingest": int(time.time()),
            })
        return rows

2.3 Registrar el Data Source (idea principal)

El uso deseado es:

1
2
3
4
df = (spark.readStream
      .format("weather")
      .option("stations", "[{'id':'NYC','lat':40.71,'lon':-74.01}]")
      .load())

Para que esto funcione, el reader debe estar en el classpath y registrado como provider weather. La Data Source API en Spark 3.5+ sigue marcada como experimental, valida tu versión en la documentación oficial.

3) Ejecuta el stream

Una ejecución mínima para validar el pipeline:

1
2
3
4
(df.writeStream
   .format("console")
   .outputMode("append")
   .start())

Salida esperada (ejemplo):

1
2
3
4
5
+-------+---------------+---------------------+----------+
|station|temperature_2m |relative_humidity_2m |ts_ingest |
+-------+---------------+---------------------+----------+
|NYC    | 4.2           | 71.0                |1707070000|
+-------+---------------+---------------------+----------+

Descargas