This post is a step‑by‑step, notebook‑friendly guide to building a Spark streaming source using the Data Source API in PySpark 3.5.1. We will read a public API (CoinGecko) and expose it as format("coingecko").
Complete code is available in the Downloads section at the end of the post.
What you will build#
By the end, this works inside a notebook:
```python
df = (spark.readStream
    .format("coingecko")
    .option("coins", "bitcoin,ethereum")
    .load())
```
Step 0 — Why not just use requests in a loop?#
You can poll an API and write files, but that is not a streaming source. Spark expects a reader that knows schema + offsets. The Data Source API gives you that.
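For contrast, the naive approach looks something like this (a pure-Python sketch; `fetch_price` is a hypothetical stub standing in for the real HTTP call):

```python
import time

def fetch_price(coin):
    # Hypothetical stub standing in for requests.get(...) against the API.
    return {"bitcoin": 43125.0}.get(coin)

rows = []
for _ in range(3):  # poll a fixed number of times for illustration
    rows.append({"coin": "bitcoin",
                 "usd_price": fetch_price("bitcoin"),
                 "ts": int(time.time())})
    # time.sleep(30)  # a real loop would wait between polls

print(len(rows))  # -> 3
```

This collects data, but there is no schema for Spark to plan against, no offset tracking, and no recovery after a crash — exactly the gaps the Data Source API contract fills.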
Step 1 — Define the output schema#
We declare the structure once so Spark can plan the stream.
```python
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType

schema = StructType([
    StructField("coin", StringType(), True),
    StructField("usd_price", DoubleType(), True),
    StructField("ts_ingest", LongType(), True),
])
```
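Rows emitted by the reader later on must line up positionally with these fields. A quick Spark-free sanity check (the sample values are illustrative):

```python
# Each emitted tuple must follow the declared field order:
# (coin: string, usd_price: double, ts_ingest: long).
fields = [("coin", str), ("usd_price", float), ("ts_ingest", int)]
row = ("bitcoin", 43125.0, 1707070000)

for (name, py_type), value in zip(fields, row):
    assert isinstance(value, py_type), f"{name} has wrong type"

print("row matches schema order")
```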
Step 2 — Implement the streaming reader#
A `SimpleDataSourceStreamReader` needs two methods:

- initialOffset() — where the stream starts
- read(start) — return the rows since start, plus the next offset

(The full `DataSourceStreamReader` splits this into `latestOffset()` and partition-based reads; the simple variant is enough for a low-volume API like this.)

```python
import time

import requests
from pyspark.sql.datasource import SimpleDataSourceStreamReader


class CoinGeckoStreamReader(SimpleDataSourceStreamReader):
    def __init__(self, options):
        self.options = options
        self.coins = options.get("coins", "bitcoin").split(",")

    def initialOffset(self):
        # Offsets must be JSON-serializable; here, one timestamp per coin.
        now = int(time.time())
        return {c: now for c in self.coins}

    def read(self, start):
        # Fetch current prices and return (rows, next offset).
        coins_csv = ",".join(self.coins)
        url = (
            "https://api.coingecko.com/api/v3/simple/price"
            f"?ids={coins_csv}&vs_currencies=usd"
        )
        payload = requests.get(url, timeout=10).json()
        now = int(time.time())
        rows = []
        for coin in self.coins:
            price = payload.get(coin, {}).get("usd")
            if price is not None:  # skip coins the API did not return
                rows.append((coin, float(price), now))
        return rows, {c: now for c in self.coins}
```

Note that each row is a tuple in the same positional order as the schema, and that a missing coin no longer crashes on `float(None)`.
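The payload-to-rows step is worth exercising on its own. Below, `rows_from_payload` is a hypothetical helper (not part of the reader above), and `sample_payload` mimics the shape the `/simple/price` endpoint returns:

```python
def rows_from_payload(payload, coins, now):
    # Turn a {coin: {"usd": price}} payload into schema-ordered tuples,
    # skipping coins the API did not return.
    rows = []
    for coin in coins:
        price = payload.get(coin, {}).get("usd")
        if price is not None:
            rows.append((coin, float(price), now))
    return rows

sample_payload = {"bitcoin": {"usd": 43125.0}, "ethereum": {"usd": 2310.2}}
result = rows_from_payload(sample_payload, ["bitcoin", "ethereum", "dogecoin"], 1707070000)
print(result)  # -> [('bitcoin', 43125.0, 1707070000), ('ethereum', 2310.2, 1707070000)]
```

The unknown coin ("dogecoin") simply produces no row rather than an error.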
Step 3 — Register the provider (PySpark 3.5.1)#
In a notebook, you must register the provider before format("coingecko") works.
```python
from pyspark.sql.datasource import DataSource


class CoinGeckoDataSource(DataSource):
    @classmethod
    def name(cls):
        return "coingecko"

    def schema(self):
        return schema

    def simpleStreamReader(self, schema):
        return CoinGeckoStreamReader(self.options)


spark.dataSource.register(CoinGeckoDataSource)
```

Two details matter here: `name()` is a classmethod returning the format string, and because our reader is a `SimpleDataSourceStreamReader`, we implement `simpleStreamReader()` rather than `reader()` (which is for batch reads).
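One more detail worth noting: the offset dicts the reader returns are persisted in the streaming checkpoint as JSON, so they must survive a JSON round-trip. A minimal check, no Spark required:

```python
import json
import time

# Offsets like the ones initialOffset()/read() return: plain dicts of ints.
offset = {c: int(time.time()) for c in ["bitcoin", "ethereum"]}

restored = json.loads(json.dumps(offset))
assert restored == offset
print("offset round-trips cleanly")
```

Anything non-serializable (datetime objects, custom classes) in an offset will fail at checkpoint time, not when you write the code.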
Step 4 — Read as a stream#
Now it behaves like any Spark source.
```python
df = (spark.readStream
    .format("coingecko")
    .option("coins", "bitcoin,ethereum")
    .load())
```
Step 5 — Validate the stream#
Console output is enough to verify it works.
```python
(df.writeStream
    .format("console")
    .outputMode("append")
    .start())
```
Expected output (example):
```
+--------+---------+----------+
|coin    |usd_price|ts_ingest |
+--------+---------+----------+
|bitcoin | 43125.0 |1707070000|
|ethereum| 2310.2  |1707070000|
+--------+---------+----------+
```
Notes you should keep in mind#
- CoinGecko is public and rate‑limited. Don’t call it too frequently.
- The Data Source API is still marked experimental in Spark 3.5.x.
- If you see signature differences, check the official DataSource docs.
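On the rate-limit point: if you do hit HTTP 429 responses, spacing retries with exponential backoff helps. A sketch of the delay schedule only (the retry wiring itself is left out, and the parameter values are just illustrative defaults):

```python
def backoff_delays(base=1.0, factor=2.0, retries=4):
    # Delay (in seconds) to wait before each retry attempt: 1, 2, 4, 8, ...
    return [base * factor ** i for i in range(retries)]

print(backoff_delays())  # -> [1.0, 2.0, 4.0, 8.0]
```

You would sleep for `delays[i]` before attempt `i + 1`, giving up after the list is exhausted.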
Downloads#