Bonus Drop #68 (2024-11-17): Drinking From A More Civilized Firehose

Collecting Starter Packs With Jetstream, Redpanda, And DuckDB

Folks have been rolling in to Bluesky fairly regularly ever since the Brazil/X kerfuffle, and especially since X’s daft new “block” policy and the aftermath of the disastrous 2024 U.S. POTUS election:

Economists will never admit it—unless we discover a weather event that modulated the effects— but I’ll go to my deathbed claiming that the Brazil migration made all this happen by overcoming skepticism that bsky could have more than a niche audience.

— Ted Underwood 🦋 (@tedunderwood.me), November 12, 2024 at 9:22 AM

Back in June, Bluesky introduced a feature known as “Starter Packs”. These are collections of up to 150 Bluesky accounts bundled under some common theme that enable new (or any) users to quickly build up a feed, so they aren’t staring at a blank or meaningless timeline when they open up the app.

As of this post, there are close to 30K Starter Packs (SPs). Discovering SPs was tough even when that number was under 10K, and — thankfully — Mubashar Iqbal had already started to maintain a slick directory of SPs.

I’d rather have these groups of folks in list form, so I built spackrat, a CLI that helps both find SPs and convert them to lists (and lets you keep each list’s membership in sync with updates to the OG SP).

“How are you and Mubashar collecting the information on these lists?” is the question we’re answering in today’s Bonus Drop.


Lifting Off Of The Firehose Into The Jetstream

Let’s talk about the original Bluesky “firehose” first, before getting to the reimagined/simplified version of it.

The Bluesky Firehose is a real-time data stream that provides access to all public activities occurring on the Bluesky social network. It delivers a continuous stream of messages containing various types of events happening on the platform, including:

  • Post creation and deletion
  • Likes and follows/unfollows
  • Handle changes
  • Account identity updates
  • Repository commits

The firehose operates through a WebSocket connection, typically connecting to wss://bsky.network by default. It uses the AT Protocol (ATP) to exchange messages between users and provides real-time delivery of every message sent over the protocol.

The primary message types include:

  • Commit Messages: The most common type, representing actions that create, delete, or update data items
  • Identity Messages: Changes to account identities
  • Handle Messages: Updates to user handles
  • Tombstone Messages: Account deletions
  • Info Messages: Informational updates from the relay

The firehose generates significant data volume (these numbers are likely very out-of-date given the millions of new users):

  • Normal operation: ~24 GB/day
  • During high activity periods: >232 GB/day

We can use the firehose for:

  • Real-time monitoring of platform activity
  • Analytics and research
  • Bot development
  • Feed generation
  • Content labeling

Unfortunately, the firehose requires some fairly deep knowledge of how the AT Protocol works, and involves decoding “CBOR” and “CAR” data.

CBOR stands for Concise Binary Object Representation, which is:

  • A binary data serialization format loosely based on the JSON data model
  • Designed for extremely small code size and message size
  • Capable of handling binary data natively, unlike JSON which requires base64 encoding
  • Defined in RFC 8949 as an Internet Standard

CAR objects are “content-addressable archives” for Bluesky data and function similarly to tar files or Git packfiles, storing repository data in a binary format.

When working with them — even with decent API wrappers — you’re performing many lookups and conversion operations just to get to the data you want to operate on.
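To make “concise binary” a bit more concrete, here’s a tiny, stand-alone Go snippet — it uses the third-party fxamacker/cbor package purely for illustration (the Bluesky stack has its own AT Protocol libraries), and the record fields are made up for the demo — showing the same record encoded both ways:

package main

import (
	"encoding/json"
	"fmt"

	"github.com/fxamacker/cbor/v2" // third-party CBOR codec, for illustration only
)

func main() {
	record := map[string]any{
		"$type":     "app.bsky.graph.follow",
		"createdAt": "2024-11-17T18:47:34.846Z",
		"sig":       []byte{0xde, 0xad, 0xbe, 0xef}, // hypothetical field: raw bytes travel as-is in CBOR
	}

	j, _ := json.Marshal(record) // JSON must base64-encode the []byte field
	c, _ := cbor.Marshal(record) // CBOR encodes it natively as a byte string

	fmt.Printf("JSON: %d bytes; CBOR: %d bytes\n", len(j), len(c))
}

CBOR wins on wire size, but you pay for that in decode complexity when all you wanted was a handful of fields.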

One of the Bluesky developers created a side-project to simplify the firehose. It’s called Jetstream, and has been fully adopted by the Bluesky team as a service.

Jetstream was developed to address several significant limitations of the original Bluesky firehose, offering a more streamlined and accessible approach for developers while maintaining robust performance and operational efficiency.

One of Jetstream’s primary advantages is its elimination of the need to decode complex binary formats like CBOR data and CAR files. Instead, it adopts straightforward JSON encoding, which significantly lowers the barrier to entry for developers unfamiliar with the intricacies of binary data handling. This approach makes Jetstream a more approachable tool, particularly for those new to the ecosystem.

Jetstream introduces data compression to reduce bandwidth usage, a critical feature for managing high-throughput scenarios. Developers can also filter data by collection (NSID) or repository (DID), enabling precise, selective data consumption. These capabilities allow Jetstream to efficiently handle hundreds to thousands of events per second, providing a scalable solution for high-velocity data streams.
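As a quick, concrete illustration (using the websocat tool we’ll formally meet in a moment), those filters are just query parameters on the subscribe endpoint. This hypothetical invocation watches only posts from the official @bsky.app account:

$ websocat "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.feed.post&wantedDids=did:plc:z72i7hdynmk6r22z27h6tvur"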

When faced with high-volume scenarios, such as a surge in traffic from events like the Brazil spike, Jetstream shines by allowing developers to focus on only the events they care about. It provides an efficient mechanism for monitoring specific accounts or event types, avoiding the overhead of processing unnecessary data.

Jetstream is designed to be simpler and more cost-effective to operate than running a full Relay instance. Its architecture supports fan-out capabilities, enabling multiple subscribers to connect to a single Jetstream instance. Additionally, the availability of multiple public instances across different regions enhances reliability, making Jetstream a practical choice for many developers.

Despite its advantages, Jetstream makes certain compromises in security. Unlike the original firehose, it lacks cryptographic signatures and Merkle tree nodes, meaning it is not self-authenticating. Furthermore, because Jetstream is not formally part of the AT Protocol, it may not be suitable for applications requiring critical infrastructure-level security and integrity assurances.

Having worked with the Firehose (ref. previous Drops), I can attest to how joyful it is to work with the Jetstream, so let’s take a look at how to do so.

Flying Into The Jetstream

You don’t need any real “code” to at least peek at the Jetstream. Take a moment to go install websocat (we’ll wait for you to do so).

Done? Great!

This CLI program is a bit like curl for WebSockets, and it works in much the same way.

Showing is better than telling, so, type this at the terminal:

$ websocat "wss://jetstream2.us-east.bsky.network/subscribe"

What you should see next is a stream of ndjson records like this:

{"did":"did:plc:zz3s3fzwqyrya7t4bzugycrk","time_us":1731869258479784,"type":"com","kind":"commit","commit":{"rev":"3lb62xvy6tb2m","type":"c","operation":"create","collection":"app.bsky.graph.follow","rkey":"3lb62xvr3nf2s","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-17T18:47:34.846Z","subject":"did:plc:rfqxk4moeeaputu63rm4tfdc"},"cid":"bafyreibdwv467w63hek7jb6yyrdv23a35jl4bfqp5ewotvm54siqpeceya"}}
{"did":"did:plc:zz3s3fzwqyrya7t4bzugycrk","time_us":1731869258480141,"type":"com","kind":"commit","commit":{"rev":"3lb62xvy6tb2m","type":"c","operation":"create","collection":"app.bsky.graph.follow","rkey":"3lb62xvr3ng2s","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-17T18:47:34.846Z","subject":"did:plc:7demthehqqchvo5mtovxtxly"},"cid":"bafyreifbzuw5nqxzwtdkcq23hmvpnkq4g4g3hrmmyzcy4grl5ng27bnqim"}}
{"did":"did:plc:zz3s3fzwqyrya7t4bzugycrk","time_us":1731869258480528,"type":"com","kind":"commit","commit":{"rev":"3lb62xvy6tb2m","type":"c","operation":"create","collection":"app.bsky.graph.follow","rkey":"3lb62xvr3nh2s","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-17T18:47:34.846Z","subject":"did:plc:hakbbgr7xjarpgv3m5x3eyiz"},"cid":"bafyreigi2swmgpbmjgxk2x67zinekkmwwtlhdxyfck2dovmmyimvm63sdy"}}

(You’re going to need to brush up on some terminology to grok those field names and record structures.)

So, with that one command, you’re consuming the entire firehose, simplified into succinct JSON records.

Almost nobody wants to consume the entire firehose, so we can tell the Jetstream service that we only want certain collections, specified by namespace identifiers (NSIDs).

For today, I only care about the creation of Starter Packs. Thankfully, they have an NSID of app.bsky.graph.starterpack, so we can take a quick look at what we get when focused solely on that:

$ websocat "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.graph.starterpack"

Unfortunately, other records (like account-creation and identity events) still come through alongside the Starter Pack creation records:

{"did":"did:plc:malops54l7flkxndwns27nxz","time_us":1731869494697847,"type":"acc","kind":"account","account":{"active":true,"did":"did:plc:malops54l7flkxndwns27nxz","seq":3771667497,"time":"2024-11-17T18:51:34.162Z"}}
{"did":"did:plc:e6dbkqufnaoml54hrimf4arc","time_us":1731869494883549,"type":"com","kind":"commit","commit":{"rev":"3lb636h3r3a26","type":"c","operation":"create","collection":"app.bsky.graph.starterpack","rkey":"3lb636h3n6a26","record":{"$type":"app.bsky.graph.starterpack","createdAt":"2024-11-17T18:51:14.127Z","description":"Starter pack for Verified MVPs","feeds":[],"list":"at://did:plc:e6dbkqufnaoml54hrimf4arc/app.bsky.graph.list/3lb636gyylo26","name":"Verified MVPs"},"cid":"bafyreia24itcbrg6gdmqujat44vb2le7bf2e34gajosp4rhhrncbpdkzka"}}
{"did":"did:plc:7znjme3zmxfzhsygbjav3jlf","time_us":1731869495101452,"type":"id","kind":"identity","identity":{"did":"did:plc:7znjme3zmxfzhsygbjav3jlf","handle":"dtvnorthwest.bsky.social","seq":3771668187,"time":"2024-11-17T18:51:34.668Z"}}
{"did":"did:plc:enregosmiahyacaerhgbuddm","time_us":1731869495103285,"type":"id","kind":"identity","identity":{"did":"did:plc:enregosmiahyacaerhgbuddm","handle":"slime223.bsky.social","seq":3771668188,"time":"2024-11-17T18:51:34.675Z"}}

The second line in the above block has a SP creation event:

{
  "did": "did:plc:e6dbkqufnaoml54hrimf4arc",
  "time_us": 1731869494883549,
  "type": "com",
  "kind": "commit",
  "commit": {
    "rev": "3lb636h3r3a26",
    "type": "c",
    "operation": "create",
    "collection": "app.bsky.graph.starterpack",
    "rkey": "3lb636h3n6a26",
    "record": {
      "$type": "app.bsky.graph.starterpack",
      "createdAt": "2024-11-17T18:51:14.127Z",
      "description": "Starter pack for Verified MVPs",
      "feeds": [],
      "list": "at://did:plc:e6dbkqufnaoml54hrimf4arc/app.bsky.graph.list/3lb636gyylo26",
      "name": "Verified MVPs"
    },
    "cid": "bafyreia24itcbrg6gdmqujat44vb2le7bf2e34gajosp4rhhrncbpdkzka"
  }
}

We can still focus solely on Starter Pack creation — without resorting to a full-on programming language — with the help of a jq filter:

$ websocat "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.graph.starterpack" | \
  jq 'select(.kind == "commit" and .commit.operation == "create")'

That will reduce the massive stream of records into a manageable number of events (at least, for now) every minute:

{
  "did": "did:plc:ljxwqfcwobllxoacnx7mapmg",
  "time_us": 1731869777504963,
  "type": "com",
  "kind": "commit",
  "commit": {
    "rev": "3lb63hcxjoy2j",
    "type": "c",
    "operation": "create",
    "collection": "app.bsky.graph.starterpack",
    "rkey": "3lb63hcxfry2j",
    "record": {
      "$type": "app.bsky.graph.starterpack",
      "createdAt": "2024-11-17T18:56:11.838Z",
      "list": "at://did:plc:ljxwqfcwobllxoacnx7mapmg/app.bsky.graph.list/3lb63hcqbp72v",
      "name": "Butterfly’s Husband's Starter Pack"
    },
    "cid": "bafyreigqng2e2pqbfdrqx455wj4desvpxbnauq7ovvyhagartyfrz2f7ty"
  }
}
{
  "did": "did:plc:kfknxond6fd3mbvvyxzuooe4",
  "time_us": 1731869779001953,
  "type": "com",
  "kind": "commit",
  "commit": {
    "rev": "3lb63hdm3c627",
    "type": "c",
    "operation": "create",
    "collection": "app.bsky.graph.starterpack",
    "rkey": "3lb63hdlxf627",
    "record": {
      "$type": "app.bsky.graph.starterpack",
      "createdAt": "2024-11-17T18:56:12.522Z",
      "list": "at://did:plc:kfknxond6fd3mbvvyxzuooe4/app.bsky.graph.list/3lb63hd2hc52k",
      "name": "Lee Martin-White's Starter Pack"
    },
    "cid": "bafyreielkqeqmy2ok2pvjw6gn63dvypx6m4vrr5sipv6rcpqp53de3feyu"
  }
}

The top-level did field and commit.rkey can be used to get the full metadata for the Starter Pack from the unauthenticated Public API:

$ curl --silent "https://public.api.bsky.app/xrpc/app.bsky.graph.getStarterPack?starterPack=at://did:plc:x7ogyt5cwqwzx7kxqeqsauvp/app.bsky.graph.starterpack/3lb63du35xc2n" | jq

That returns JSON like so:

{
  "starterPack": {
    "uri": "at://did:plc:x7ogyt5cwqwzx7kxqeqsauvp/app.bsky.graph.starterpack/3lb63du35xc2n",
    "cid": "bafyreidpelf4jb5vdxapznv4ionokooef5apy4cc545hvrejseftn6l3e4",
    "record": {
      "$type": "app.bsky.graph.starterpack",
      "createdAt": "2024-11-17T18:54:14.864Z",
      "list": "at://did:plc:x7ogyt5cwqwzx7kxqeqsauvp/app.bsky.graph.list/3lb63dthmhv26",
      "name": "Coach Chris's Starter Pack"
    },
    "creator": {
      "did": "did:plc:x7ogyt5cwqwzx7kxqeqsauvp",
      "handle": "coachchrisb.bsky.social",
      "displayName": "Coach Chris",
      "avatar": "https://cdn.bsky.app/img/avatar/plain/did:plc:x7ogyt5cwqwzx7kxqeqsauvp/bafkreieh6q3c5xgq4vhibynps7hserfyij7eusasdcw4uu27caecijlese@jpeg",
      "labels": [],
      "createdAt": "2024-11-15T23:27:55.273Z"
    },
    "joinedAllTimeCount": 0,
    "joinedWeekCount": 0,
    "labels": [],
    "indexedAt": "2024-11-17T18:54:14.864Z",
    "feeds": [],
    "list": {
      "uri": "at://did:plc:x7ogyt5cwqwzx7kxqeqsauvp/app.bsky.graph.list/3lb63dthmhv26",
      "cid": "bafyreifbm7zxz7u55ffpu6k76xu4uv2w4sfvzpyn2uioid5pawg534n3e4",
      "name": "Coach Chris's Starter Pack",
      "purpose": "app.bsky.graph.defs#referencelist",
      "listItemCount": 37,
      "indexedAt": "2024-11-17T18:54:14.215Z",
      "labels": []
    },
    "listItemsSample": [
      {
        "uri": "at://did:plc:x7ogyt5cwqwzx7kxqeqsauvp/app.bsky.graph.listitem/3lb63dtt7zu2n",
        "subject": {
          "did": "did:plc:z72i7hdynmk6r22z27h6tvur",
          "handle": "bsky.app",
          "displayName": "Bluesky",
          "avatar": "https://cdn.bsky.app/img/avatar/plain/did:plc:z72i7hdynmk6r22z27h6tvur/bafkreihagr2cmvl2jt4mgx3sppwe2it3fwolkrbtjrhcnwjk4jdijhsoze@jpeg",
          "associated": {
            "chat": {
              "allowIncoming": "none"
            }
          },
          "labels": [],
          "createdAt": "2023-04-12T04:53:57.057Z",
          "description": "official Bluesky account (check domain👆)\n\nPress: press@blueskyweb.xyz\nSupport: support@bsky.app",
          "indexedAt": "2024-10-17T07:17:00.612Z"
        }
      },
      …

While we could likely hack out a Starter Pack collector with just Bash, websocat, and jq — roughly along the lines of the pipeline below — we really shouldn’t.
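(Untested and purely illustrative, that hack might look something like this — note the jq string interpolation building the same getStarterPack URL we used above:)

$ websocat "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.graph.starterpack" | \
  jq -r --unbuffered 'select(.kind == "commit" and .commit.operation == "create") |
    "https://public.api.bsky.app/xrpc/app.bsky.graph.getStarterPack?starterPack=at://\(.did)/app.bsky.graph.starterpack/\(.commit.rkey)"' | \
  while read -r url; do curl --silent "$url"; echo; done >> starterpacks.ndjson

So, let’s take a look at a more programmatic way to work with the Jetstream.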

Dinosaurs, Red Pandas, And Ducks! Oh My!


I really like Deno when working with WebSockets (WS). There has been robust support for WS in JS-land ever since that tech was invented, and Deno is a very nice modern take on what Node.js originally offered on the server side. In fact, WebSockets are a core feature in Deno, not even requiring an import.

When working with a fast-moving event slinger, it’s best to have the consumer do as little as possible outside of said consumption, lest you lose some messages (or worse). So, our Deno consumer shoots messages over to Redpanda, a modern, C++-based alternative to Kafka, the well-worn message-queueing platform. Message queues are sometimes fraught with peril, but — as long as we’re not dealing with a ginormous number of Starter Packs being created every day — it should be fine for our purposes. Redpanda is super lightweight, and will work in a very basic single-node configuration for us.

Finally, now that we have SP creation events being queued up for processing, we introduce a small Go program that pulls the messages off of the queue, retrieves the full metadata for each SP, and shoves it into a DuckDB database. While working with DuckDB in this fashion (inserting tiny, individual records) is not what it was ultimately meant for, DuckDB excels at pulling JSON data out of column field values, and I lean pretty heavily on that when post-processing everything for spackrat. (NOTE: I am behind on the daily update to the spackrat SP database due to writing this post…I have not fully automated that yet.)

The full code is up on Codeberg, but we’ll walk through salient bits, below.

Watching For Packs

The core Deno program is ~170 lines. It begins by setting up the connection to Redpanda using the kafkajs client (which Deno can import straight from npm):

  constructor() {
    // Initialize Kafka client
    this.kafka = new Kafka({
      clientId: "bluesky-watcher",
      brokers: ["localhost:19092"],
    });
    this.producer = this.kafka.producer({
      createPartitioner: Partitioners.DefaultPartitioner,
    });
  }

There is some code that sets up the parameters for the WebSocket connection, which we ultimately establish with one simple call (I’ve wrapped everything into a class, but you don’t have to do that in your own code):

this.ws = new WebSocket(url);

We then have various events we listen for:

this.ws.addEventListener("open", () => {
this.ws.addEventListener("message", async (event) => {
this.ws.addEventListener("close", () => {
this.ws.addEventListener("error", (error) => {

When we receive a message, all we do is pass it along (if it’s the one we want):

const data = JSON.parse(event.data);
if (
  data.kind === "commit" &&
  data.commit?.collection === "app.bsky.graph.starterpack" &&
  data.commit?.operation === "create"
) {
  const publicUrl =
    `https://public.api.bsky.app/xrpc/app.bsky.graph.getStarterPack?starterPack=at://${data.did}/app.bsky.graph.starterpack/${data.commit.rkey}`;

  const message = {
    publicUrl,
    data,
  };
  await this.producer.send({
    topic: "bluesky-starterpacks",
    messages: [
      {
        value: JSON.stringify(message),
      },
    ],
  });
}
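That’s essentially the entire hot path of the watcher. Assuming the class lives in (and is started from) a file named watcher.ts — a hypothetical name — kicking it off is a one-liner:

$ deno run -A watcher.ts   # -A grants all permissions; you can scope things down (e.g., --allow-net)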

The Redpanda Docker Compose configuration will auto-create the topic and use the default 7-day retention period, so we can re-process any of the messages if we want to or need to.
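For reference, a minimal single-node setup — this is modeled on Redpanda’s own quickstart Compose file and is an approximation, not necessarily the repo’s exact config — looks roughly like this:

services:
  redpanda:
    image: docker.redpanda.com/redpandadata/redpanda:latest
    command:
      - redpanda
      - start
      - --mode dev-container
      - --smp 1
      - --kafka-addr internal://0.0.0.0:9092,external://0.0.0.0:19092
      - --advertise-kafka-addr internal://redpanda:9092,external://localhost:19092
    ports:
      - "19092:19092" # the broker address our watcher and consumer use
    volumes:
      - redpanda-data:/var/lib/redpanda/data
volumes:
  redpanda-data: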

Watching The Watcher


The Go-based consumer process is a tad more involved since it has to:

  • connect to Redpanda
  • retrieve each new message
  • make a call to the public API to get all the metadata for the Starter Pack
  • shove it into DuckDB

The code accounts for the need to retry Bluesky API calls (since they are having growing pains), checkpoints the DuckDB database periodically to ensure the write-ahead log is fully applied to the core database file, and handles interrupts nicely. As a result, while the overall process is pretty straightforward, the full code is just a bit too involved to include here — but the trimmed sketch below shows the general shape.
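This is a hypothetical, heavily pared-down sketch — not the actual code from the repo — that assumes the segmentio/kafka-go and marcboeker/go-duckdb packages (the real consumer may use different libraries), with retries and checkpointing reduced to their essence:

package main

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"os/signal"
	"syscall"
	"time"

	_ "github.com/marcboeker/go-duckdb" // DuckDB database/sql driver
	"github.com/segmentio/kafka-go"
)

func main() {
	// Shut down cleanly on ^C / SIGTERM.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	db, err := sql.Open("duckdb", "starterpacks.duckdb")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
	// Store the raw API response as text; DuckDB's JSON functions can pick it apart later.
	if _, err := db.Exec(`CREATE TABLE IF NOT EXISTS starterpacks (retrieved_at TIMESTAMP, meta VARCHAR)`); err != nil {
		log.Fatal(err)
	}

	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:19092"}, // Redpanda speaks the Kafka wire protocol
		Topic:   "bluesky-starterpacks",
		GroupID: "sp-archiver", // hypothetical consumer group name
	})
	defer r.Close()

	for {
		m, err := r.ReadMessage(ctx) // blocks; errors out once ctx is cancelled
		if err != nil {
			break
		}
		var msg struct {
			PublicURL string `json:"publicUrl"`
		}
		if err := json.Unmarshal(m.Value, &msg); err != nil {
			log.Println("bad message:", err)
			continue
		}
		body, err := fetchWithRetry(ctx, msg.PublicURL, 3)
		if err != nil {
			log.Println("fetch failed:", err)
			continue
		}
		if _, err := db.Exec(`INSERT INTO starterpacks VALUES (now(), ?)`, string(body)); err != nil {
			log.Println("insert failed:", err)
		}
	}

	// The real consumer checkpoints periodically; doing it at exit at minimum
	// ensures the WAL is folded into the main database file.
	if _, err := db.Exec(`CHECKPOINT`); err != nil {
		log.Println("checkpoint failed:", err)
	}
}

// fetchWithRetry GETs url, retrying with a crude linear backoff.
func fetchWithRetry(ctx context.Context, url string, tries int) ([]byte, error) {
	var lastErr error
	for attempt := 1; attempt <= tries; attempt++ {
		req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
		if err != nil {
			return nil, err
		}
		resp, err := http.DefaultClient.Do(req)
		switch {
		case err != nil:
			lastErr = err
		case resp.StatusCode != http.StatusOK:
			lastErr = fmt.Errorf("unexpected status: %s", resp.Status)
			resp.Body.Close()
		default:
			defer resp.Body.Close()
			return io.ReadAll(resp.Body)
		}
		time.Sleep(time.Duration(attempt) * time.Second)
	}
	return nil, lastErr
}

Because Redpanda speaks the Kafka wire protocol, any Kafka client library works against it unchanged — which is also why the Deno watcher could use plain kafkajs.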

Next Steps

As noted, the entire project is on Codeberg.

Next up on the TODOs for me are:

  • watching for update events to know when to re-retrieve and update Starter Pack metadata (I think there are also delete events, but I haven’t confirmed that, yet)
  • automating the database creation for spackrat
  • adding JSON logging to the Go consumer
  • adding health check endpoints to both core services
  • wrapping everything in Docker Compose and getting it all under systemd

Do not hesitate to ask questions, drop issues or PRs, or just say “Hi!” on Bluesky (hrbrmstr.dev).


FIN

Remember, you can follow and interact with the full text of The Daily Drop’s free posts on Mastodon via @dailydrop.hrbrmstr.dev@dailydrop.hrbrmstr.dev ☮️
