Bonus Drop #68 (2024-11-17): Drinking From A More Civilized Firehose
hrbrmstr
Collecting Starter Packs With Jetstream, Red Panda, And DuckDB
Folks have been rolling in to Bluesky fairly regularly ever since the Brazil/X kerfuffle, and especially since X’s daft new “block” policy and the aftermath of the disastrous 2024 U.S. POTUS election:
Economists will never admit it—unless we discover a weather event that modulated the effects— but I’ll go to my deathbed claiming that the Brazil migration made all this happen by overcoming skepticism that bsky could have more than a niche audience.
Back in June, Bluesky introduced a feature known as “Starter Packs”. These are collections of up to 150 Bluesky accounts bundled under some common theme that let new (or any) users quickly build up a feed, so they aren’t staring at a blank or meaningless timeline when they open up the app.
As of this post, there are close to 30K Starter Packs (SPs). Discovering SPs was tough even when that number was under 10K, and — thankfully — Mubashar Iqbal had already started to maintain a slick directory of SPs.
I’d rather have these groups of folks in list form, so I built spackrat, a CLI that helps you both find SPs and convert them to lists (and lets you maintain the membership of each list based on updates to the OG SP).
“How are you and Mubashar collecting the information on these lists?” is the question we’re answering in today’s Bonus Drop.
Let’s talk about the original Bluesky “firehose” first, before getting to the reimagined/simplified version of it.
The Bluesky Firehose is a real-time data stream that provides access to all public activities occurring on the Bluesky social network. It delivers a continuous stream of messages containing various types of events happening on the platform, including:
Post creation and deletion
Likes and follows/unfollows
Handle changes
Account identity updates
Repository commits
The firehose operates through a WebSocket connection, typically connecting to wss://bsky.network by default. It uses the AT Protocol (ATP) to exchange messages between users and provides real-time delivery of every message sent over the protocol.
The primary message types include:
Commit Messages: The most common type, representing actions that create, delete, or update data items
Identity Messages: Changes to account identities
Handle Messages: Updates to user handles
Tombstone Messages: Account deletions
Info Messages: Informational updates from the relay
The firehose generates significant data volume (these numbers are likely very out-of-date with the millions of new users):
Normal operation: ~24 GB/day
During high activity periods: >232 GB/day
We can use the firehose for:
Real-time monitoring of platform activity
Analytics and research
Bot development
Feed generation
Content labeling
Unfortunately, the firehose requires some fairly deep knowledge of how the AT Protocol works, and involves decoding “CBOR” and “CAR” data.
CBOR stands for Concise Binary Object Representation, which is:
A binary data serialization format based on the JSON data model
Designed for extremely small code size and message size
Capable of handling binary data natively, unlike JSON which requires base64 encoding
Defined in RFC 8949 as an Internet Standard
CAR objects are “content-addressable archives” for Bluesky data and function similarly to tar files or Git packfiles, storing repository data in a binary format.
When working with them — even with decent API wrappers — you’re performing many lookups and conversion operations just to get to the data you want to operate on.
One of the Bluesky developers created a side-project to simplify the firehose. It’s called Jetstream, and has been fully adopted by the Bluesky team as a service.
Jetstream was developed to address several significant limitations of the original Bluesky firehose, offering a more streamlined and accessible approach for developers while maintaining robust performance and operational efficiency.
One of Jetstream’s primary advantages is its elimination of the need to decode complex binary formats like CBOR data and CAR files. Instead, it adopts straightforward JSON encoding, which significantly lowers the barrier to entry for developers unfamiliar with the intricacies of binary data handling. This approach makes Jetstream a more approachable tool, particularly for those new to the ecosystem.
Jetstream introduces data compression to reduce bandwidth usage, a critical feature for managing high-throughput scenarios. Developers can also filter data by collection (NSID) or repository (DID), enabling precise, selective data consumption. These capabilities allow Jetstream to efficiently handle hundreds to thousands of events per second, providing a scalable solution for high-velocity data streams.
When faced with high-volume scenarios, such as a surge in traffic from events like the Brazil spike, Jetstream shines by allowing developers to focus on only the events they care about. It provides an efficient mechanism for monitoring specific accounts or event types, avoiding the overhead of processing unnecessary data.
Jetstream is designed to be simpler and more cost-effective to operate than running a full Relay instance. Its architecture supports fan-out capabilities, enabling multiple subscribers to connect to a single Jetstream instance. Additionally, the availability of multiple public instances across different regions enhances reliability, making Jetstream a practical choice for many developers.
Despite its advantages, Jetstream makes certain compromises in security. Unlike the original firehose, it lacks cryptographic signatures and Merkle tree nodes, meaning it is not self-authenticating. Furthermore, because Jetstream is not formally part of the AT Protocol, it may not be suitable for applications requiring critical infrastructure-level security and integrity assurances.
Having worked with the Firehose (ref. previous Drops), I can attest to how joyful it is to work with the Jetstream, so let’s take a look at how to do so.
Flying Into The Jetstream
You don’t need any real “code” to at least peek at the Jetstream. Take a moment to go install websocat (we’ll wait for you to do so).
Done? Great!
This CLI program is a bit like curl for WebSockets, and it works in much the same way.
Showing is better than telling, so, type this at the terminal:
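Something like this will do it (jetstream2.us-east.bsky.network is one of the public Jetstream instances; swap in whichever one is closest to you):

```bash
websocat 'wss://jetstream2.us-east.bsky.network/subscribe'
```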
So, with that one command, you’re consuming the entire firehose, simplified into succinct JSON records.
Almost nobody wants to consume the entire firehose, so we can tell the Jetstream service that we only want certain collections, specified by namespace identifiers (NSIDs).
For today, I only care about the creation of Starter Packs. Thankfully, they have an NSID of app.bsky.graph.starterpack, so we can take a quick look at what we get when focused solely on that:
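Same idea, but with the wantedCollections query parameter doing the filtering (again, any public instance will do; pipe it through jq if you want it pretty-printed):

```bash
websocat 'wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.graph.starterpack'
```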
While we could likely hack out a Starter Pack collector with just Bash, websocat and jq, we really shouldn’t. So, let’s take a look at a more programmatic way to work with the Jetstream.
I really like Deno when working with WebSockets (WS). There has been robust support for WS in JS-land ever since that tech was invented, and Deno is a very nice modern take on what Node.js originally offered on the server side. In fact, WebSockets are a core feature in Deno, not even requiring an import.
When working with a fast-moving event slinger, it’s best to have the consumer do as little as possible outside of said consumption, lest you lose some messages (or worse). So, our Deno consumer shoots messages to Red Panda, which is a modern, C++-based alternative to Kafka, the well-worn message queuing service. Message queues are sometimes fraught with peril, but — as long as we’re not dealing with a ginormous number of Starter Packs being created every day — it should be fine for our purposes. Red Panda is super lightweight, and will work in a very basic single-node configuration for us.
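Here’s a rough sketch of that hand-off using the kafkajs client via Deno’s npm: compatibility (the client library and the starterpacks topic name are assumptions for illustration, not necessarily what the real code uses):

```ts
// Sketch: shove each raw Jetstream event onto a Red Panda topic as-is.
// kafkajs and the "starterpacks" topic name are assumptions, not necessarily what spackrat uses.
import { Kafka } from "npm:kafkajs";

const kafka = new Kafka({ clientId: "jetstream-ingest", brokers: ["localhost:9092"] });
const producer = kafka.producer();
await producer.connect();

export async function enqueue(rawEvent: string): Promise<void> {
  // Do as little as possible here: just queue the raw JSON for the Go consumer to deal with later.
  await producer.send({
    topic: "starterpacks",
    messages: [{ value: rawEvent }],
  });
}
```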
Finally, now that we have SP creation events being queued up for processing, we introduce a small Go program that pulls the messages off of the message queue, retrieves the full metadata for the SP, and shoves it into a DuckDB database. While working with DuckDB in this fashion (inserting tiny, individual records) is not what DuckDB was ultimately meant for, it excels at pulling JSON data out of column field values, and I lean pretty heavily on that when post-processing everything for spackrat (NOTE: I am behind on the daily update to the spackrat SP database due to writing this post…I have not fully automated that yet).
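To give a flavor of that, here’s the kind of query DuckDB makes painless (the table and column names are made up for illustration; the real schema lives in the repo):

```sql
-- Hypothetical table/columns, purely to show DuckDB's JSON helpers in action.
SELECT
  json_extract_string(record, '$.starterPack.record.name')        AS pack_name,
  json_extract_string(record, '$.starterPack.record.description') AS description,
  json_extract_string(record, '$.starterPack.creator.handle')     AS creator,
  CAST(json_extract_string(record, '$.starterPack.listItemCount') AS INTEGER) AS members
FROM starter_packs
LIMIT 20;
```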
The full code is up on Codeberg, but we’ll walk through salient bits, below.
There is some code that sets up the parameters for the WebSocket connection, which we ultimately establish with one simple call (I’ve wrapped everything into a class, but you don’t have to do that in your own code):
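Here’s a stripped-down sketch of that setup without the class wrapper (the endpoint and collection filter mirror the websocat example above):

```ts
// Sketch of the connection setup; the repo's version wraps this (plus the producer bits) in a class.
const endpoint = "wss://jetstream2.us-east.bsky.network/subscribe";
const params = new URLSearchParams({
  wantedCollections: "app.bsky.graph.starterpack",
});

// WebSocket is a global in Deno, so no import is required.
const ws = new WebSocket(`${endpoint}?${params}`);

ws.onopen = () => console.error("connected to Jetstream");
ws.onclose = () => console.error("Jetstream connection closed");

ws.onmessage = (ev) => {
  const event = JSON.parse(ev.data);
  if (event.commit?.operation === "create") {
    // Hand the raw event off to Red Panda as quickly as possible (see the producer sketch above).
    console.log(event.did, event.commit.collection, event.commit.rkey);
  }
};
```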
The Red Panda Docker Compose configuration will auto-create the topic and use the default 7-day retention period, so we can re-process any of the messages if we want to or need to.
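For reference, a single-node Red Panda service in Compose can be as small as this (these are the stock quickstart flags, not necessarily the repo’s exact configuration; check Codeberg for the real file):

```yaml
# Rough sketch of a single-node Red Panda service; see the Codeberg repo for the actual compose file.
services:
  redpanda:
    image: redpandadata/redpanda:latest
    command:
      - redpanda
      - start
      - --overprovisioned
      - --smp=1
      - --memory=1G
      - --node-id=0
      - --kafka-addr=PLAINTEXT://0.0.0.0:9092
      - --advertise-kafka-addr=PLAINTEXT://localhost:9092
      - --set=redpanda.auto_create_topics_enabled=true
    ports:
      - "9092:9092"
```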
The Go-based consumer process is a tad more involved since it has to:
connect to Red Panda
retrieve each new message
make a call to the public API to get all the metadata for the Starter Pack
shove it into DuckDB
The code retries Bluesky API calls when needed (since Bluesky is having growing pains), checkpoints the DuckDB database periodically to ensure the write-ahead log is fully applied to the core database, and handles interrupts nicely. As a result, while the overall process is pretty straightforward, the code is just a bit too involved to include here in full.
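That said, here’s a heavily simplified sketch of the consumer’s skeleton, minus the retry, checkpoint, and signal handling just described. The library choices (segmentio/kafka-go, marcboeker/go-duckdb), topic name, and table schema are my assumptions for illustration, not necessarily what the Codeberg code uses:

```go
// Simplified consumer sketch: read Jetstream events from Red Panda, fetch Starter Pack
// metadata from the public Bluesky API, and insert it into DuckDB.
package main

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"net/url"

	_ "github.com/marcboeker/go-duckdb"
	"github.com/segmentio/kafka-go"
)

func main() {
	db, err := sql.Open("duckdb", "starterpacks.duckdb")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Hypothetical schema: stash the raw API response and let DuckDB's JSON functions do the rest later.
	if _, err := db.Exec(`CREATE TABLE IF NOT EXISTS starter_packs (uri TEXT, record TEXT)`); err != nil {
		log.Fatal(err)
	}

	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"},
		Topic:   "starterpacks",
		GroupID: "sp-consumer",
	})
	defer r.Close()

	for {
		msg, err := r.ReadMessage(context.Background())
		if err != nil {
			log.Fatal(err)
		}

		// The Jetstream event tells us which repo (DID) and record key the Starter Pack lives at.
		var ev struct {
			Did    string `json:"did"`
			Commit struct {
				Collection string `json:"collection"`
				RKey       string `json:"rkey"`
			} `json:"commit"`
		}
		if err := json.Unmarshal(msg.Value, &ev); err != nil {
			log.Println("bad event:", err)
			continue
		}

		// Fetch the full Starter Pack view from the public AppView API.
		atURI := fmt.Sprintf("at://%s/%s/%s", ev.Did, ev.Commit.Collection, ev.Commit.RKey)
		resp, err := http.Get("https://public.api.bsky.app/xrpc/app.bsky.graph.getStarterPack?starterPack=" + url.QueryEscape(atURI))
		if err != nil {
			log.Println("api error:", err)
			continue
		}
		body, _ := io.ReadAll(resp.Body)
		resp.Body.Close()

		if _, err := db.Exec(`INSERT INTO starter_packs (uri, record) VALUES (?, ?)`, atURI, string(body)); err != nil {
			log.Println("insert error:", err)
		}
	}
}
```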
Still on the TODO list:
watching for update events to know when to re-retrieve and update Starter Pack metadata (I think there are also delete events, but I haven’t confirmed that, yet)
automating the database creation for spackrat
adding JSON logging to the Go consumer
adding health check endpoints to both core services
wrapping everything in Docker Compose and getting it all under systemd
Do not hesitate to ask questions, drop issues or PRs, or just say “Hi!” on Bluesky (hrbrmstr.dev).
FIN
Remember, you can follow and interact with the full text of The Daily Drop’s free posts on Mastodon via @dailydrop.hrbrmstr.dev@dailydrop.hrbrmstr.dev ☮️