Reddit Live Example
In this example, we’ll show how you can receive and process Reddit comments using Kaskada.
You can see the full example in the file reddit.py.
Setup Reddit credentials
Follow Reddit’s First Steps guide to create an App and obtain a client ID and secret. The “script” type application is sufficient for this example.
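With the credentials in hand, you can construct the asyncpraw client that the handlers below refer to as `reddit`. A minimal sketch, assuming the credentials are exposed via environment variables (the variable names and user agent string here are illustrative, not required by the example):

```python
import os

import asyncpraw

# Create the asyncpraw client used by the handlers below.
# The environment variable names are illustrative -- use whatever
# configuration mechanism you prefer, but avoid hard-coding secrets.
reddit = asyncpraw.Reddit(
    client_id=os.getenv("REDDIT_CLIENT_ID"),
    client_secret=os.getenv("REDDIT_CLIENT_SECRET"),
    user_agent="kaskada-example by u/your-username",
)
```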
Setup the event data source
Before we can receive events from Reddit, we need to create a data source to tell Kaskada how to handle the events. We’ll provide a schema and configure the time and entity fields.
# Setup the data source.
# This defines (most of) the schema of the events we'll receive,
# and tells Kaskada which fields to use for time and initial entity.
#
# We'll push events into this source as they arrive in real-time.
comments = kd.sources.PyDict(
    schema=pa.schema(
        [
            pa.field("author", pa.string()),
            pa.field("body", pa.string()),
            pa.field("permalink", pa.string()),
            pa.field("submission_id", pa.string()),
            pa.field("subreddit", pa.string()),
            pa.field("ts", pa.float64()),
        ]
    ),
    time_column="ts",
    key_column="submission_id",
    time_unit="s",
)
Define the incoming event handler
The asyncpraw Python library takes care of requesting and receiving events from Reddit; all you need to do is create a handler that decides what to do with each event. This handler converts Comment messages into a dict and passes the dict to Kaskada.
# Handler to receive new comments as they're created
async def receive_comments():
    # Create the subreddit handle
    sr = await reddit.subreddit(os.getenv("SUBREDDIT", "all"))
    # Consume the stream of new comments
    async for comment in sr.stream.comments():
        # Add each comment to the Kaskada data source
        await comments.add_rows(
            {
                "author": comment.author.name,
                "body": comment.body,
                "permalink": comment.permalink,
                "submission_id": comment.submission.id,
                "subreddit": comment.subreddit.display_name,
                "ts": time.time(),
            }
        )
Construct a real-time query and result handler
Now we can use Kaskada to transform the events as they arrive. First we'll use with_key to regroup events by author, then we'll apply a simple count aggregation. Finally, we create a handler for the transformed results, in this case just printing them out.
# Handler for values emitted by Kaskada.
async def receive_outputs():
    # We'll perform a very simple aggregation - key by author and count.
    comments_by_author = comments.with_key(comments.col("author"))
    # Consume outputs as they're generated and print to STDOUT.
    async for row in comments_by_author.count().run_iter(kind="row", mode="live"):
        print(f"{row['_key']} has posted {row['result']} times since startup")
Final touches
Now we just need to kick it all off by calling asyncio.gather on the two handler coroutines. This starts all the async processing concurrently.
# Kickoff the two async processes concurrently.
await asyncio.gather(receive_comments(), receive_outputs())
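Note that `await` is only valid inside a coroutine, so in practice this call runs inside an async entry point started with `asyncio.run`. Here is a self-contained sketch of the same pattern, with two stand-in coroutines in place of receive_comments and receive_outputs (the names `producer` and `consumer` are illustrative):

```python
import asyncio

results = []

async def producer():
    # Stand-in for receive_comments: append a few events,
    # yielding to the event loop after each one.
    for i in range(3):
        results.append(f"event-{i}")
        await asyncio.sleep(0)

async def consumer():
    # Stand-in for receive_outputs: do work concurrently
    # with the producer.
    for i in range(3):
        results.append(f"output-{i}")
        await asyncio.sleep(0)

async def main():
    # Run both coroutines concurrently, as reddit.py does
    # with its two handlers.
    await asyncio.gather(producer(), consumer())

asyncio.run(main())
print(len(results))  # 6: both coroutines ran to completion
```

Because each coroutine yields with `await asyncio.sleep(0)`, the event loop interleaves the two, which is exactly why the comment stream and the query output can make progress at the same time.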
Try running it yourself and playing with different transformations!
python reddit.py