Este post enseña cómo construir un Data Source de streaming en PySpark. Primero mostramos el problema (leer una API como batch), luego creamos un reader real usando SimpleDataSourceStreamReader. Ref: PySpark Data Source API.
Descargas al final: ir a Descargas.
En pocas palabras#
- Una API externa no es un source de Spark por defecto.
- Con
SimpleDataSourceStreamReader puedes convertirla en streaming. - El resultado:
spark.readStream.format("weather").load().
Ejecuta tú mismo#
Usa el tool de Spark (Docker) de este blog. No necesitas crear venv ni instalar paquetes externos.
Links:
1) El enfoque básico (batch)#
Funciona, pero no es streaming real:
1
2
3
4
5
6
7
| import json, requests
url = "https://api.open-meteo.com/v1/forecast?latitude=40.71&longitude=-74.01¤t=temperature_2m,relative_humidity_2m"
payload = requests.get(url).json()
with open("/home/jovyan/work/data/weather_stream/batch.json", "w") as f:
f.write(json.dumps(payload))
|
2) El enfoque correcto: Data Source de streaming#
Aquí implementamos un reader que Spark puede consumir en tiempo real.
2.1 Esquema del output#
Definimos un schema mínimo que represente una lectura de clima.
1
2
3
4
5
6
7
8
| from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType
schema = StructType([
StructField("station", StringType(), True),
StructField("temperature_2m", DoubleType(), True),
StructField("relative_humidity_2m", DoubleType(), True),
StructField("ts_ingest", LongType(), True),
])
|
2.2 Reader streaming (skeleton funcional)#
El reader traduce offsets y llamadas a la API en filas.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
| import ast
import requests
import json
import time
from pyspark.sql.datasource import SimpleDataSourceStreamReader
class WeatherSimpleStreamReader(SimpleDataSourceStreamReader):
def __init__(self, options):
self.options = options
self.stations = ast.literal_eval(options.get("stations", "[]"))
def initialOffset(self):
# offset inicial por estación
now = int(time.time())
return {s: now for s in self.stations}
def latestOffset(self):
# último offset disponible
now = int(time.time())
return {s: now for s in self.stations}
def read(self, start, end):
# obtiene datos de la API y devuelve filas
rows = []
for station in self.stations:
url = (
"https://api.open-meteo.com/v1/forecast"
f"?latitude={station['lat']}&longitude={station['lon']}"
"¤t=temperature_2m,relative_humidity_2m"
)
payload = requests.get(url).json()
rows.append({
"station": station["id"],
"temperature_2m": payload["current"]["temperature_2m"],
"relative_humidity_2m": payload["current"]["relative_humidity_2m"],
"ts_ingest": int(time.time()),
})
return rows
|
2.3 Registro del Data Source (idea principal)#
El objetivo es que puedas consumirlo así:
1
2
3
4
| df = (spark.readStream
.format("weather")
.option("stations", "[{'id':'NYC','lat':40.71,'lon':-74.01}]")
.load())
|
Para que esto funcione, el reader debe estar en el classpath y registrado como proveedor con el nombre "weather". En Spark 3.5+ la Data Source API es experimental, así que valida tu versión.
3) Ejecuta el stream#
Si el reader está registrado, el stream imprimirá filas nuevas.
1
2
3
4
5
6
7
| q = (
df.writeStream
.format("console")
.outputMode("append")
.option("truncate", False)
.start()
)
|
Salida esperada (ejemplo):
+-------+-------------+-------------------+---------+
|station|temperature_2m|relative_humidity_2m|ts_ingest|
+-------+-------------+-------------------+---------+
|NYC | 12.3 | 56.0 | 17190000|
Qué verificar#
- El stream imprime filas nuevas.
- El schema coincide con lo que esperas.
- Puedes cambiar estaciones sin romper el reader.
Notas de práctica#
- Empieza con 1 estación, luego escala.
- Separa el código de API para que sea testeable.
- Si la API falla, agrega reintentos simples.
Descargas#
Si no quieres copiar código, descarga el notebook o el .py.