Este post muestra cómo envolver una API HTTP real (CoinGecko) como un source nativo de Spark Structured Streaming usando la Python Data Source API.
Si quieres correrlo local, puedes usar el stack de Spark/Jupyter del sitio: tool de Apache Spark.
Al final del artículo dejo los links del notebook .ipynb y script .py.
Qué se está resolviendo aquí
Cuando no hay conector oficial para un sistema externo, un patrón común es:
- llamar
requests.get(...)cada cierto tiempo, - guardar JSON en disco,
- volver a leer esos archivos con Spark.
Ese patrón sirve para pruebas, pero no es un source de streaming real por dos motivos:
- Spark necesita una noción de progreso (
offset) para checkpoint y recuperación consistente. - Spark necesita que la lectura se haga por rangos (
start -> end) en cada micro-batch.
La API oficial que define este contrato en PySpark es:
DataSource: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.datasource.DataSource.htmlSimpleDataSourceStreamReader: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.datasource.SimpleDataSourceStreamReader.html- Tutorial oficial: https://spark.apache.org/docs/latest/api/python/tutorial/sql/python_data_source.html
API que vamos a leer
Usamos el endpoint público de precio simple de CoinGecko:
https://api.coingecko.com/api/v3/simple/price?ids=bitcoin,ethereum&vs_currencies=usd
Referencia de API:
- Docs generales: https://docs.coingecko.com/
- Endpoint simple price: https://docs.coingecko.com/reference/simple-price
Ejemplo de respuesta HTTP esperada:
| |
Prerrequisitos mínimos
- Spark / PySpark 3.5.1.
- Acceso a internet desde el entorno donde corre Spark.
- Biblioteca
requestsinstalada.
Flujo completo que vamos a implementar
- Definir un schema fijo de salida.
- Implementar un reader streaming con
initialOffset,latestOffsetyread. - Registrar el provider para habilitar
format("coingecko"). - Ejecutar
readStreamcon trigger explícito. - Validar resultados en consola.
- Cerrar con un ejemplo más realista escribiendo a Delta con checkpoint.
Paso 1: entender schema y offset
En este contexto:
schemaes el contrato de columnas y tipos que Spark espera siempre.offsetes la posición de progreso del stream.
Si el schema cambia en caliente (por ejemplo un campo deja de ser número y pasa a texto), empiezan errores o nulls inesperados. Si no hay un offset consistente, Spark no puede saber qué ya procesó y qué falta.
Referencia oficial de modelo de streaming:
- Structured Streaming overview: https://spark.apache.org/docs/latest/streaming/index.html
Paso 2: definir el schema de salida con detalle
Declaramos tres columnas:
coin: identificador de moneda (bitcoin,ethereum, etc.).usd_price: precio en USD comodouble.ts_ingest: timestamp UNIX (segundos) del momento en que Spark hizo la llamada.
Para alguien que viene de cero: este schema es como la plantilla fija del DataFrame. Cada fila que devuelva el reader debe respetar estos nombres y tipos.
| |
Paso 3: implementar el reader de streaming
Un reader mínimo implementa:
initialOffset(): offset inicial cuando todavía no hay checkpoint.latestOffset(): offset más nuevo disponible al momento del trigger.read(start, end): filas pertenecientes al rango de ese micro-batch.
En este ejemplo el offset será tiempo UNIX por moneda. No es el único enfoque posible, pero para un source HTTP simple es fácil de entender.
Código:
| |
Qué esperar de read:
- Si CoinGecko responde bien, devuelve una fila por moneda.
- Si una moneda no viene en el payload,
usd_pricequeda ennull. - Si falla HTTP (4xx/5xx),
raise_for_status()levanta error y el batch falla.
Paso 4: registrar el provider
Este paso no es “solo para notebook”. En script también debes registrar el provider, salvo que hayas empaquetado tu conector y Spark ya lo descubra por classpath.
Para este caso (código Python inline), se registra en la sesión:
| |
Paso 5: leer como stream y configurar trigger
| |
Ahora arrancamos la query. Con HTTP conviene trigger explícito para no llamar el endpoint sin control:
- Doc de trigger: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.streaming.DataStreamWriter.trigger.html
| |
Paso 6: validar ejecución y detener correctamente
Consulta estado:
| |
Ejemplo de salida en consola (micro-batch):
| |
Ejemplo de query.lastProgress resumido:
| |
Detener de forma limpia:
| |
Si reinicias la query con checkpoint configurado en un sink durable, Spark retoma progreso desde ese estado.
Ejemplo más real al cierre: escribir en Delta con checkpoint
Este patrón ya se acerca a un job de producción:
| |
Qué ganas con esto:
- persistencia durable en Delta,
- recuperación tras reinicio con checkpoint,
- separación clara entre source custom y sink productivo.
Referencia oficial de Delta en Spark:
Operación y buenas prácticas
- CoinGecko tiene límites; evita triggers muy cortos.
- Añade retry con backoff para errores transitorios HTTP.
- Instrumenta
query.lastProgressy logs del driver. - Mantén schema estable; cambios de tipo deben versionarse.
- Si quieres más contexto de la API Python Data Source, Databricks tiene este post: https://www.databricks.com/blog/announcing-general-availability-python-data-source-api
Errores frecuentes y solución rápida
format("coingecko") not foundNo registraste el provider en esta sesión (spark.dataSource.register(...)).429 Too Many RequestsTrigger muy agresivo o límite temporal del API; aumenta intervalo y agrega retry.Stream cae por error HTTP Agrega
try/exceptcon logging, o maneja respuesta degradada para no tumbar la query.No salen filas Revisa
query.isActive,query.statusy conectividad del runtime.
