This post is a step‑by‑step, notebook‑friendly guide to building a Spark streaming source using the Python Data Source API in PySpark 4.0 (the pyspark.sql.datasource module is new in Spark 4.0 and is not available in 3.5.x). We will read a public API (CoinGecko) and expose it as format("coingecko").

Download links are collected in the Downloads section at the end of the post.

What you will build

By the end, this works inside a notebook:

df = (spark.readStream
      .format("coingecko")
      .option("coins", "bitcoin,ethereum")
      .load())

Step 0 — Why not just use requests in a loop?

You can poll an API and write files, but that is not a streaming source. Spark expects a reader that declares its schema and tracks offsets, so the engine can plan microbatches and recover after failures. The Data Source API gives you exactly that.
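For contrast, here is the do‑it‑yourself version the Data Source API replaces: you poll, you stamp rows yourself, and you are responsible for persisting where you left off. A minimal sketch, where `poll_step` and the injected `fetch` callable are illustrative names, not part of any API:

```python
import time

def poll_step(fetch, last_offset):
    """One manual polling step: fetch prices, stamp them, advance the offset."""
    now = int(time.time())
    if now <= last_offset:
        # Nothing new since the last poll; keep the old offset.
        return [], last_offset
    payload = fetch()  # stand-in for requests.get(...).json()
    rows = [(coin, info.get("usd"), now) for coin, info in payload.items()]
    # The caller must durably store `now` itself, or rows get lost or duplicated
    # on restart. Spark's Data Source API does this bookkeeping for you.
    return rows, now

rows, offset = poll_step(lambda: {"bitcoin": {"usd": 43125.0}}, last_offset=0)
```

Everything Spark would otherwise manage (offset storage, replay, schema) is on you here, which is the point of the next steps.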

Step 1 — Define the output schema

We declare the structure once so Spark can plan the stream.

from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType

schema = StructType([
    StructField("coin", StringType(), True),
    StructField("usd_price", DoubleType(), True),
    StructField("ts_ingest", LongType(), True),
])

Step 2 — Implement the streaming reader

A SimpleDataSourceStreamReader needs two methods, plus an optional third for recovery:

  • initialOffset() — the offset the stream starts from (any JSON‑serializable dict)
  • read(start) — return the rows produced since start, together with the new end offset
  • readBetweenOffsets(start, end) — replay an already‑reported range after a failure (optional)
import time
import requests
from pyspark.sql.datasource import SimpleDataSourceStreamReader

class CoinGeckoStreamReader(SimpleDataSourceStreamReader):
    def __init__(self, options):
        self.options = options
        self.coins = options.get("coins", "bitcoin").split(",")

    def initialOffset(self):
        # Offsets are plain dicts; we track the time of the last poll.
        return {"last_poll": int(time.time())}

    def read(self, start):
        coins_csv = ",".join(self.coins)
        url = (
            "https://api.coingecko.com/api/v3/simple/price"
            f"?ids={coins_csv}&vs_currencies=usd"
        )
        payload = requests.get(url, timeout=10).json()
        now = int(time.time())
        rows = []
        for coin in self.coins:
            price = payload.get(coin, {}).get("usd")
            # Tolerate coins missing from the response instead of failing on float(None).
            rows.append((coin, float(price) if price is not None else None, now))
        # Return the rows plus the offset the next microbatch starts from.
        return iter(rows), {"last_poll": now}
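Before wiring the reader into Spark, it is worth sanity‑checking the payload parsing on its own. The helper below (`rows_from_payload` is an illustrative name, and the sample prices are made up) mirrors the loop inside read() against a canned response shaped like CoinGecko's /simple/price output:

```python
import time

def rows_from_payload(payload, coins, now=None):
    """Convert a /simple/price JSON payload into (coin, usd_price, ts_ingest) tuples."""
    now = now or int(time.time())
    rows = []
    for coin in coins:
        price = payload.get(coin, {}).get("usd")
        # A coin missing from the response yields a null price, not an exception.
        rows.append((coin, float(price) if price is not None else None, now))
    return rows

sample = {"bitcoin": {"usd": 43125.0}, "ethereum": {"usd": 2310.2}}
print(rows_from_payload(sample, ["bitcoin", "ethereum"], now=1707070000))
# [('bitcoin', 43125.0, 1707070000), ('ethereum', 2310.2, 1707070000)]
```

If this behaves, the only Spark‑specific parts left to trust are the offset handling and registration.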

Step 3 — Register the provider (PySpark 4.0)

In a notebook, you must register the provider before format("coingecko") works.

from pyspark.sql.datasource import DataSource

class CoinGeckoDataSource(DataSource):
    @classmethod
    def name(cls):
        # The short name used in format("coingecko").
        return "coingecko"

    def schema(self):
        return schema

    def simpleStreamReader(self, schema):
        # Streaming reads go through the simple stream reader;
        # reader() would be the batch entry point.
        return CoinGeckoStreamReader(self.options)

spark.dataSource.register(CoinGeckoDataSource)

Step 4 — Read as a stream

Now it behaves like any Spark source.

df = (spark.readStream
      .format("coingecko")
      .option("coins", "bitcoin,ethereum")
      .load())

Step 5 — Validate the stream

Console output is enough to verify it works.

(df.writeStream
   .format("console")
   .outputMode("append")
   .start())

Expected output (example):

+--------+---------+----------+
|coin    |usd_price|ts_ingest |
+--------+---------+----------+
|bitcoin | 43125.0 |1707070000|
|ethereum| 2310.2  |1707070000|
+--------+---------+----------+

Notes you should keep in mind

  • CoinGecko's public API is rate‑limited; keep microbatches infrequent, for example with .trigger(processingTime="60 seconds") on the writeStream.
  • The Python Data Source API is new in Spark 4.0 and may still evolve between releases.
  • If you see signature differences, check the official pyspark.sql.datasource docs for your exact version.

Downloads