Bluesky Firehose Example

Read and aggregate messages from the Bluesky firehose. Use Kaskada to connect in real time and parse messages as part of the query.

Bluesky is a “distributed social network” that aims to improve on some of the perceived shortcomings of X (née Twitter). Bluesky uses a distributed protocol named the AT Protocol to exchange messages between users, and provides a “firehose” delivering every message sent over the protocol in real time.

In this example, we’ll show how you can receive and process the firehose using Kaskada.

You can see the full example in the file bluesky.py.

Set up the event data source

Before we can receive events from Bluesky, we need to create a data source to tell Kaskada how to handle the events. We’ll provide a schema and configure the time and entity fields.

import kaskada as kd
import pyarrow as pa

# Set up the data source.
# This defines (most of) the schema of the events we'll receive,
# and tells Kaskada which fields to use for time and initial entity.
#
# We'll push events into this source as they arrive in real time.
posts = kd.sources.PyDict(
    rows=[],
    schema=pa.schema(
        [
            pa.field(
                "record",
                pa.struct(
                    [
                        pa.field("created_at", pa.string()),
                        pa.field("text", pa.string()),
                        # pa.field("embed", pa.struct([...])),
                        pa.field("entities", pa.string()),
                        # pa.field("facets", pa.list_(...)),
                        pa.field("labels", pa.float64()),
                        pa.field("langs", pa.list_(pa.string())),
                        # pa.field("reply", pa.struct([...])),
                        pa.field("py_type", pa.string()),
                    ]
                ),
            ),
            pa.field("uri", pa.string()),
            pa.field("cid", pa.string()),
            pa.field("author", pa.string()),
            pa.field("ts", pa.float64()),
        ]
    ),
    time_column="ts",
    key_column="author",
    time_unit="s",
)
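
To make the schema concrete, here's roughly what a single event pushed into this source looks like. The values below are fabricated for illustration:

# Example event matching the schema above (all values are made up).
posts.add_rows(
    {
        "record": {
            "created_at": "2023-08-01T12:00:00Z",
            "text": "hello from bluesky",
            "entities": None,
            "labels": None,
            "langs": ["en"],
            "py_type": "app.bsky.feed.post",
        },
        "uri": "at://did:plc:example/app.bsky.feed.post/3abc",
        "cid": "bafyexamplecid",
        "author": "did:plc:example",
        "ts": 1690891200.0,
    }
)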

Define the incoming event handler

The atproto Python library takes care of requesting and receiving events from Bluesky; all you need to do is create a handler that decides what to do with each event. This handler parses each message and ignores everything except Commit events. For each Commit, we pull out any newly-created Post messages. Finally, we do some schema munging to get each Post into the event format we described when creating the data source.

import time

from atproto import models
from atproto.firehose import parse_subscribe_repos_message  # exported directly from `atproto` in newer releases

# Handler for newly-arrived messages from Bluesky.
async def receive_at(message) -> None:
    # Extract the contents of the message and bail if it's not a "commit".
    commit = parse_subscribe_repos_message(message)
    if not isinstance(commit, models.ComAtprotoSyncSubscribeRepos.Commit):
        return

    # Get the operations included in the message.
    # (`_get_ops_by_type` is a helper defined in the full bluesky.py example.)
    ops = _get_ops_by_type(commit)
    for new_post in ops["posts"]["created"]:
        # The atproto library's use of schemas is sort of confusing
        # This isn't always the expected type and I'm not sure why...
        if not isinstance(new_post["record"], models.app.bsky.feed.post.Main):
            continue

        # The parsing produces a hot mess of incompatible types, so we build
        # a dict from scratch to simplify.
        posts.add_rows(
            {
                "record": dict(new_post["record"]),
                "uri": new_post["uri"],
                "cid": new_post["cid"],
                "author": new_post["author"],
                "ts": time.time(),
            }
        )

Construct a real-time query and result handler

Now we can use Kaskada to transform the events as they arrive. First we'll use with_key to regroup events by the first language of each post, then we'll apply a simple count aggregation. Finally, we create a handler for the transformed results, which here just prints them out.

# Handler for values emitted by Kaskada.
async def receive_outputs():
    # We'll perform a very simple aggregation - key by language and count.
    posts_by_first_lang = posts.with_key(posts.col("record").col("langs").index(0))

    # Consume outputs as they're generated and print to STDOUT.
    async for row in posts_by_first_lang.count().run_iter(kind="row", mode="live"):
        print(f"{row['_key']} has posted {row['result']} times since startup")

Final touches

Now we just need to start everything by calling asyncio.gather on the two handler coroutines, which runs the firehose client and the Kaskada query concurrently.
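
The at_client used below is the atproto firehose client. Here's a minimal sketch of its construction, assuming AsyncFirehoseSubscribeReposClient from the atproto package (the exact import path varies between atproto releases):

import asyncio

from atproto.firehose import AsyncFirehoseSubscribeReposClient

# Client that subscribes to the firehose and invokes `receive_at` for each message.
at_client = AsyncFirehoseSubscribeReposClient()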

# Kick off the two async processes concurrently.
await asyncio.gather(at_client.start(receive_at), receive_outputs())

Try running it yourself and playing with different transformations!

python bluesky.py