This post shows how to build a real streaming Data Source in PySpark. We start with the naive batch pattern, then implement a reader with SimpleDataSourceStreamReader from the official Data Source API.

Downloads are at the end of the post, in the Downloads section.

At a glance

  • External APIs are not Spark sources by default.
  • The Data Source API lets you define offsets and read streaming rows.
  • Result: spark.readStream.format("weather").load().

Run it yourself

Use the Apache Spark tool from this blog; no venv is required.

1) The naive batch pattern

It works, but it is not streaming. You are just rewriting a file and re-reading it:

import json, requests

# Current weather for one location from the Open-Meteo API
url = "https://api.open-meteo.com/v1/forecast?latitude=40.71&longitude=-74.01&current=temperature_2m,relative_humidity_2m"
payload = requests.get(url).json()

# Overwrite a local file with the latest payload
with open("/home/jovyan/work/data/weather_stream/batch.json", "w") as f:
    f.write(json.dumps(payload))
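
The other half of the pattern is re-reading that file as a plain batch DataFrame after every refresh. A minimal sketch, assuming an active SparkSession named spark:

# There is no incremental processing here: every run re-reads the whole file
df = spark.read.json("/home/jovyan/work/data/weather_stream/batch.json")
df.show(truncate=False)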

2) The correct approach: a streaming Data Source

A streaming reader must define a schema and an offset model. That is what the Data Source API expects.

2.1 Output schema

Define the schema explicitly (see the DataFrame schema docs) so Spark can plan the stream:

from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType

schema = StructType([
    StructField("station", StringType(), True),
    StructField("temperature_2m", DoubleType(), True),
    StructField("relative_humidity_2m", DoubleType(), True),
    StructField("ts_ingest", LongType(), True),
])

2.2 Streaming reader (functional skeleton)

Offsets are the key: initialOffset defines where the stream starts, read(start) returns the new rows together with the next offset, and readBetweenOffsets(start, end) replays an already-committed range after a restart.

import ast
import time

import requests
from pyspark.sql.datasource import SimpleDataSourceStreamReader

class WeatherSimpleStreamReader(SimpleDataSourceStreamReader):
    def __init__(self, options):
        self.options = options
        # "stations" arrives as a string option, e.g. "[{'id':'NYC','lat':40.71,'lon':-74.01}]"
        self.stations = ast.literal_eval(options.get("stations", "[]"))

    def initialOffset(self):
        # One ingestion timestamp per station id (station dicts are not hashable)
        now = int(time.time())
        return {s["id"]: now for s in self.stations}

    def read(self, start):
        # Fetch the current observation for every station, then return the rows
        # together with the next start offset.
        rows = []
        for station in self.stations:
            url = (
                "https://api.open-meteo.com/v1/forecast"
                f"?latitude={station['lat']}&longitude={station['lon']}"
                "&current=temperature_2m,relative_humidity_2m"
            )
            current = requests.get(url).json()["current"]
            rows.append((
                station["id"],
                float(current["temperature_2m"]),
                float(current["relative_humidity_2m"]),
                int(time.time()),
            ))
        end = {s["id"]: int(time.time()) for s in self.stations}
        return iter(rows), end

    def readBetweenOffsets(self, start, end):
        # Replay after a restart: the API exposes no history here, so we simply
        # re-fetch the current values for the requested range.
        rows, _ = self.read(start)
        return rows
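
Before wiring the reader into Spark, you can sanity-check it in plain Python. A quick sketch; the station literal is only an example and the call needs network access to the Open-Meteo API:

# Instantiate the reader the same way Spark would, then do one manual read
reader = WeatherSimpleStreamReader(
    {"stations": "[{'id':'NYC','lat':40.71,'lon':-74.01}]"}
)
rows, next_offset = reader.read(reader.initialOffset())
print(list(rows))
print(next_offset)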

2.3 Register the Data Source (main idea)

You want this usage:

df = (spark.readStream
      .format("weather")
      .option("stations", "[{'id':'NYC','lat':40.71,'lon':-74.01}]")
      .load())

For this to work, the reader must be wrapped in a DataSource class whose name() returns weather and registered on the session with spark.dataSource.register, as sketched below. The Python Data Source API is recent (pyspark.sql.datasource ships with Spark 4.0) and still evolving, so validate your version against the official docs.
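
A minimal registration sketch, assuming the schema and reader defined above (the class name WeatherDataSource is illustrative):

from pyspark.sql.datasource import DataSource

class WeatherDataSource(DataSource):
    @classmethod
    def name(cls):
        # Short name used in .format("weather")
        return "weather"

    def schema(self):
        # DDL string matching the StructType defined earlier
        return ("station string, temperature_2m double, "
                "relative_humidity_2m double, ts_ingest long")

    def simpleStreamReader(self, schema):
        # self.options holds the options passed via .option(...)
        return WeatherSimpleStreamReader(self.options)

# Register on the active session before calling spark.readStream
spark.dataSource.register(WeatherDataSource)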

3) Run the stream

A minimal run to validate the pipeline:

(df.writeStream
   .format("console")
   .outputMode("append")
   .start())
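
If you leave the query running, a trigger keeps each micro-batch from hammering the API. A sketch; the 30-second interval is an arbitrary choice:

query = (df.writeStream
           .format("console")
           .outputMode("append")
           .trigger(processingTime="30 seconds")  # poll roughly every 30 seconds
           .start())

query.awaitTermination()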

Expected output (example):

+-------+--------------+--------------------+----------+
|station|temperature_2m|relative_humidity_2m| ts_ingest|
+-------+--------------+--------------------+----------+
|    NYC|           4.2|                71.0|1707070000|
+-------+--------------+--------------------+----------+

Downloads