Este post muestra cómo construir un Data Source de streaming en PySpark. Partimos del enfoque batch y luego implementamos un reader con SimpleDataSourceStreamReader usando la Data Source API oficial.
Descargas al final: ir a Descargas.
En pocas palabras#
- Una API externa no es un source de Spark por defecto.
- La Data Source API permite definir offsets y leer filas en streaming.
- Resultado:
spark.readStream.format("weather").load().
Ejecuta tú mismo#
Usa el tool de Apache Spark de este blog. No necesitas crear venv.
1) El enfoque batch (ingenuo)#
Funciona, pero no es streaming. Solo escribes un archivo y lo vuelves a leer:
1
2
3
4
5
6
7
| import json, requests
url = "https://api.open-meteo.com/v1/forecast?latitude=40.71&longitude=-74.01¤t=temperature_2m,relative_humidity_2m"
payload = requests.get(url).json()
with open("/home/jovyan/work/data/weather_stream/batch.json", "w") as f:
f.write(json.dumps(payload))
|
2) El enfoque correcto: Data Source de streaming#
Un reader de streaming debe definir schema y offsets. Eso es lo que espera la Data Source API.
2.1 Schema de salida#
Define el schema explícitamente (ver la documentación de schemas) para que Spark pueda planificar el stream:
1
2
3
4
5
6
7
8
| from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType
schema = StructType([
StructField("station", StringType(), True),
StructField("temperature_2m", DoubleType(), True),
StructField("relative_humidity_2m", DoubleType(), True),
StructField("ts_ingest", LongType(), True),
])
|
2.2 Reader de streaming (esqueleto funcional)#
Los offsets son la clave: initialOffset y latestOffset definen el rango, y read devuelve filas entre ellos.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
| import ast
import requests
import json
import time
from pyspark.sql.datasource import SimpleDataSourceStreamReader
class WeatherSimpleStreamReader(SimpleDataSourceStreamReader):
def __init__(self, options):
self.options = options
self.stations = ast.literal_eval(options.get("stations", "[]"))
def initialOffset(self):
now = int(time.time())
return {s: now for s in self.stations}
def latestOffset(self):
now = int(time.time())
return {s: now for s in self.stations}
def read(self, start, end):
rows = []
for station in self.stations:
url = (
"https://api.open-meteo.com/v1/forecast"
f"?latitude={station['lat']}&longitude={station['lon']}"
"¤t=temperature_2m,relative_humidity_2m"
)
payload = requests.get(url).json()
rows.append({
"station": station["id"],
"temperature_2m": payload["current"]["temperature_2m"],
"relative_humidity_2m": payload["current"]["relative_humidity_2m"],
"ts_ingest": int(time.time()),
})
return rows
|
2.3 Registrar el Data Source (idea principal)#
El uso deseado es:
1
2
3
4
| df = (spark.readStream
.format("weather")
.option("stations", "[{'id':'NYC','lat':40.71,'lon':-74.01}]")
.load())
|
Para que esto funcione, el reader debe estar en el classpath y registrado como provider weather. La Data Source API en Spark 3.5+ sigue marcada como experimental, valida tu versión en la documentación oficial.
3) Ejecuta el stream#
Una ejecución mínima para validar el pipeline:
1
2
3
4
| (df.writeStream
.format("console")
.outputMode("append")
.start())
|
Salida esperada (ejemplo):
1
2
3
4
5
| +-------+---------------+---------------------+----------+
|station|temperature_2m |relative_humidity_2m |ts_ingest |
+-------+---------------+---------------------+----------+
|NYC | 4.2 | 71.0 |1707070000|
+-------+---------------+---------------------+----------+
|
Descargas#