Aggregating Real-time Sensor Data with Python and Redpanda

In this tutorial, I want to show you how to downsample a stream of sensor data using only Python (and Redpanda as a message broker). The goal is to show you how simple stream processing can be, and that you don't need a heavy-duty stream processing framework to get started.

Until recently, stream processing was a complex task that usually required some Java expertise. But gradually, the Python stream processing ecosystem has matured, and there are a few more options available to Python developers, such as Faust, Bytewax and Quix. Later, I'll provide a bit more background on why these libraries have emerged to compete with the existing Java-centric options.

But first, let's get to the task at hand. We will use a Python library called Quix Streams as our stream processor. Quix Streams is very similar to Faust, but it has been optimized to be more concise in its syntax and uses a Pandas-like API called Streaming DataFrames.

You can install the Quix Streams library with the following command:
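```shell
pip install quixstreams
```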

What you'll build

You'll build a simple application that will calculate the rolling aggregations of temperature readings coming from various sensors. The temperature readings will come in at a relatively high frequency and this application will aggregate the readings and output them at a lower time resolution (every 10 seconds). You can think of this as a form of compression, since we don't want to work on data at an unnecessarily high resolution.

You can access the complete code in this GitHub repository.

This application includes code that generates synthetic sensor data, but in a real-world scenario this data could come from many kinds of sensors, such as sensors installed in a fleet of vehicles or a warehouse full of machines.

Here's an illustration of the basic architecture:

The previous diagram reflects the main components of a stream processing pipeline: you have the sensors, which are the data producers; Redpanda as the streaming data platform; and Quix as the stream processor.

Data producers

These are bits of code that are attached to systems that generate data, such as firmware on ECUs (Engine Control Units), monitoring modules for cloud platforms, or web servers that log user activity. They take that raw data and send it to the streaming data platform in a format that the platform can understand.

Streaming data platform

This is where you put your streaming data. It plays more or less the same role as a database does for static data, but instead of tables, you use topics. Otherwise, it has similar features to a static database: you'll want to manage who can consume and produce data, and what schemas the data should adhere to. Unlike a database, though, the data is constantly in flux, so it's not designed to be queried. You'd usually use a stream processor to transform the data and put it somewhere else for data scientists to explore, or sink the raw data into a queryable system optimized for streaming data, such as RisingWave or Apache Pinot. However, for automated systems that are triggered by patterns in streaming data (such as recommendation engines), this isn't an ideal solution. In this case, you definitely want to use a dedicated stream processor.

Stream processors

These are engines that perform continuous operations on the data as it arrives. They could be compared to regular old microservices that process data in any application back end, but there's one big difference. For microservices, data arrives in drips like droplets of rain, and each drip is processed discretely. Even if it rains heavily, it's not too hard for the service to keep up with the drops without overflowing (think of a filtration system that filters out impurities in the water).

For a stream processor, the data arrives as a continuous, wide gush of water. A filtration system would be quickly overwhelmed unless you change the design, i.e. break the stream up and route smaller streams to a battery of filtration systems. That's kind of how stream processors work. They're designed to be horizontally scaled and to work in parallel as a battery. And they never stop; they process the data continuously, outputting the filtered data to the streaming data platform, which acts as a kind of reservoir for streaming data. To make things more complicated, stream processors often need to keep track of data that was received previously, such as in the windowing example you'll try out here.

Note that there are also data consumers and data sinks: systems that consume the processed data (such as front-end applications and mobile apps) or store it for offline analysis (data warehouses like Snowflake or AWS Redshift). Since we won't be covering those in this tutorial, I'll skip over them for now.

In this tutorial, I'll show you how to use a local installation of Redpanda for managing your streaming data. I've chosen Redpanda because it's very easy to run locally.

You'll use Docker Compose to quickly spin up a cluster, including the Redpanda Console, so make sure you have Docker installed first.
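As a rough sketch, a minimal docker-compose.yml based on Redpanda's quickstart pattern might look something like the following (the image tags, ports and listener addresses are assumptions, so adapt them to your environment):

```yaml
services:
  redpanda:
    image: docker.redpanda.com/redpandadata/redpanda:latest
    command:
      - redpanda
      - start
      - --smp=1
      - --overprovisioned
      - --node-id=0
      # Internal listener for containers, external listener for apps running on the host
      - --kafka-addr=internal://0.0.0.0:9092,external://0.0.0.0:19092
      - --advertise-kafka-addr=internal://redpanda:9092,external://localhost:19092
    ports:
      - "19092:19092"
  console:
    image: docker.redpanda.com/redpandadata/console:latest
    environment:
      KAFKA_BROKERS: redpanda:9092
    ports:
      - "8080:8080"
    depends_on:
      - redpanda
```

With this layout, containers talk to the broker at redpanda:9092 while your Python processes on the host use localhost:19092.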

First, you'll create separate files to produce and process your streaming data. This makes it easier to manage the running processes independently, i.e. you can stop the producer without also stopping the stream processor. Here's an overview of the two files that you'll create:
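- sensor_stream_producer.py: generates synthetic temperature readings and produces them to a source topic in Redpanda.
- sensor_stream_processor.py: consumes the readings, aggregates them in 10-second tumbling windows, and produces the results to a sink topic.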

As you can see, the stream processor does most of the heavy lifting and is the core of this tutorial. The stream producer is a stand-in for a proper data ingestion process. For example, in a production scenario, you might use something like this MQTT connector to get data from your sensors and produce it to a topic.

You'll start by creating a new file called sensor_stream_producer.py and defining the main Quix application. (This example has been developed on Python 3.10, but other versions of Python 3 should work as well, as long as you are able to run pip install quixstreams.)

Create the file sensor_stream_producer.py and add all the required dependencies (including Quix Streams):
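A sketch of the imports this file needs, assuming the synthetic-data generator below uses the standard json, random, time, logging and datetime modules:

```python
# sensor_stream_producer.py
import json
import logging
import random
import time
from dataclasses import dataclass, asdict
from datetime import datetime, timezone

from quixstreams import Application

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
```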

Then, define a Quix application and a destination topic to send the data to.
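A minimal sketch of what this might look like (the topic name raw-temperature and the broker address are assumptions; match them to your own setup):

```python
# The broker address assumes the external listener from the Docker Compose sketch above;
# change it if your Redpanda cluster advertises a different address.
app = Application(broker_address="localhost:19092")

# Destination topic for the raw sensor readings
destination_topic = app.topic(name="raw-temperature", value_serializer="json")
```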

The value_serializer parameter defines the format of the expected source data (to be serialized into bytes). In this case, youll be sending JSON.

Let's use the dataclasses module to define a very basic schema for the temperature data and add a function to serialize it to JSON.
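Here's one possible sketch, assuming each reading carries a timestamp and an integer temperature value:

```python
@dataclass
class Temperature:
    ts: datetime
    value: int

    def to_json(self):
        # Convert the dataclass into a dict and serialize the timestamp as an ISO 8601 string
        data = asdict(self)
        data["ts"] = self.ts.isoformat()
        return json.dumps(data)
```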

Next, add the code that will be responsible for sending the mock temperature sensor data into our Redpanda source topic.
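A sketch of the generator loop, continuing the same file (the sensor names and the 0 to 40 degree range are made up for illustration):

```python
def main():
    with app.get_producer() as producer:
        for _ in range(1000):
            # Randomly pick one of five sensors; the name becomes the message key
            sensor_name = random.choice(
                ["Sensor1", "Sensor2", "Sensor3", "Sensor4", "Sensor5"]
            )
            temperature = Temperature(
                ts=datetime.now(timezone.utc),
                value=random.randint(0, 40),
            )
            value = temperature.to_json()

            logger.info("Producing value %s for sensor %s", value, sensor_name)
            producer.produce(
                topic=destination_topic.name,
                key=sensor_name,
                value=value,
            )
            # Wait a random interval between 0 and 1 second before the next reading
            time.sleep(random.uniform(0, 1))


if __name__ == "__main__":
    main()
```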

This generates 1000 records separated by random time intervals between 0 and 1 second. It also randomly selects a sensor name from a list of 5 options.

Now, try out the producer by running the following in the command line:
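```shell
python sensor_stream_producer.py
```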

You should see the sensor readings being logged to the console as they are produced.

Once you've confirmed that it works, stop the process for now (you'll run it alongside the stream processor later).

The stream processor performs three main tasks: 1) consume the raw temperature readings from the source topic, 2) continuously aggregate the data, and 3) produce the aggregated results to a sink topic.

Let's add the code for each of these tasks. In your IDE, create a new file called sensor_stream_processor.py.

First, add the dependencies as before:
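As a sketch, assuming the same standard-library modules as before:

```python
# sensor_stream_processor.py
import logging
from datetime import datetime, timedelta

from quixstreams import Application

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
```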

Let's also set some variables that our stream processing application needs:
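For example (the variable names are assumptions; they just hold the window settings used further down):

```python
# Window settings: 10-second tumbling windows, with no grace period for late-arriving data
window_duration = timedelta(seconds=10)
window_grace = timedelta(seconds=0)
```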

We'll go into more detail on what the window variables mean a bit later, but for now, let's crack on with defining the main Quix application.
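A sketch, with an assumed consumer group name:

```python
app = Application(
    broker_address="localhost:19092",          # match this to your Redpanda listener
    consumer_group="temperature-aggregator",   # assumed consumer group name
    auto_offset_reset="earliest",              # start from the beginning of the topic
)
```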

Note that there are a few more application variables this time around, namely consumer_group and auto_offset_reset. To learn more about the interplay between these settings, check out the article "Understanding Kafka's auto offset reset configuration: Use cases and pitfalls".

Next, define the input and output topics on either side of the core stream processing function and add a function to put the incoming data into a DataFrame.
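A sketch, assuming the topic names from the producer above (note that custom_ts_extractor is defined in the next step and should sit above this block in the finished file):

```python
# The input topic must match whatever topic the producer writes to
input_topic = app.topic(
    "raw-temperature",
    value_deserializer="json",
    timestamp_extractor=custom_ts_extractor,  # defined in the next step
)
output_topic = app.topic("agg-temperature", value_serializer="json")

# Put the incoming messages into a Streaming DataFrame
sdf = app.dataframe(input_topic)

# Log each incoming record so we can confirm the raw data arrives intact
sdf = sdf.update(lambda value: logger.info("Received: %s", value))
```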

We've also added a logging line to make sure the incoming data is intact.

Next, let's add a custom timestamp extractor to use the timestamp from the message payload instead of the Kafka timestamp. For your aggregations, this basically means that you want to use the time that the reading was generated rather than the time that it was received by Redpanda. Or, in even simpler terms: use the sensor's definition of time rather than Redpanda's.
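A sketch of such an extractor, assuming the payload's ts field holds the ISO 8601 timestamp written by the producer:

```python
def custom_ts_extractor(value, headers, timestamp, timestamp_type) -> int:
    # Use the 'ts' field written by the sensor rather than the broker-assigned timestamp.
    # The extractor is expected to return milliseconds since the epoch.
    dt = datetime.fromisoformat(value["ts"])
    return int(dt.timestamp() * 1000)
```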

Why are we doing this? Well, we could get into a philosophical rabbit hole about which kind of time to use for processing, but that's a subject for another article. With the custom timestamp, I just wanted to illustrate that there are many ways to interpret time in stream processing, and you don't necessarily have to use the time of data arrival.

Next, initialize the state for the aggregation when a new window starts. It will prime the aggregation when the first record arrives in the window.
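A sketch of an initializer along these lines:

```python
def initializer(value: dict) -> dict:
    # Prime the window state with the first reading that arrives
    return {
        "count": 1,
        "min": value["value"],
        "max": value["value"],
        "mean": value["value"],
    }
```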

This sets the initial values for the window. In the case of min, max, and mean, they are all identical because you're just taking the first sensor reading as the starting point.

Now, let's add the aggregation logic in the form of a reducer function.
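A sketch of a reducer that keeps a running count, min, max, and mean:

```python
def reducer(aggregated: dict, value: dict) -> dict:
    # Fold each new reading into the running window state
    count = aggregated["count"] + 1
    return {
        "count": count,
        "min": min(aggregated["min"], value["value"]),
        "max": max(aggregated["max"], value["value"]),
        # Incrementally update the running mean
        "mean": (aggregated["mean"] * aggregated["count"] + value["value"]) / count,
    }
```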

This function is only necessary when you're performing multiple aggregations on a window. In our case, we're creating count, min, max, and mean values for each window, so we need to define these in advance.

Next up is the juicy part: adding the tumbling window functionality.
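A sketch using a tumbling window with the window settings defined earlier:

```python
sdf = (
    sdf.tumbling_window(duration_ms=window_duration, grace_ms=window_grace)
    .reduce(reducer=reducer, initializer=initializer)
    .final()  # emit one result per window, once the window has closed
)
```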

This defines the Streaming DataFrame as a set of aggregations based on a tumbling window: a set of aggregations performed on 10-second, non-overlapping segments of time.

Tip: If you need a refresher on the different types of windowed calculations, check out this article: A guide to windowing in stream processing.

Finally, produce the results to the downstream output topic:
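A sketch of the final lines (newer Quix Streams versions also accept app.run() without arguments):

```python
# Stream the aggregated windows to the sink topic
sdf = sdf.to_topic(output_topic)

if __name__ == "__main__":
    app.run(sdf)
```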

Note: You might wonder why the producer code looks very different from the producer code used to send the synthetic temperature data (the part that uses with app.get_producer() as producer). This is because Quix uses a different producer function for transformation tasks (i.e. a task that sits between input and output topics).

As you might notice when following along, we iteratively change the Streaming DataFrame (the sdf variable) until it is the final form that we want to send downstream. Thus, the sdf.to_topic function simply streams the final state of the Streaming DataFrame back to the output topic, row-by-row.

The producer function, on the other hand, is used to ingest data from an external source such as a CSV file, an MQTT broker, or, in our case, a generator function.

Finally, you get to run the streaming applications and see if all the moving parts work in harmony.

First, in a terminal window, start the producer again:
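```shell
python sensor_stream_producer.py
```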

Then, in a second terminal window, start the stream processor:
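```shell
python sensor_stream_processor.py
```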

Pay attention to the log output in each window, to make sure everything is running smoothly.

You can also check the Redpanda Console to make sure that the aggregated data is being streamed to the sink topic correctly (you'll find the topic browser at http://localhost:8080/topics).

What you've tried out here is just one way to do stream processing. Naturally, there are heavy-duty tools such as Apache Flink and Apache Spark Streaming, which have also been covered extensively online. But those are predominantly Java-based tools. Sure, you can use their Python wrappers, but when things go wrong, you'll still be debugging Java errors rather than Python errors. And Java skills aren't exactly ubiquitous among data folks, who are increasingly working alongside software engineers to tune stream processing algorithms.

In this tutorial, we ran a simple aggregation as our stream processing algorithm, but in reality, these algorithms often employ machine learning models to transform the data, and the software ecosystem for machine learning is heavily dominated by Python.

An oft-overlooked fact is that Python is the lingua franca for data specialists, ML engineers, and software engineers to work together. It's even better than SQL because you can use it to do non-data-related things like make API calls and trigger webhooks. That's one of the reasons why libraries like Faust, Bytewax and Quix evolved to bridge the so-called impedance gap between these different disciplines.

Hopefully, I've managed to show you that Python is a viable language for stream processing, and that the Python ecosystem for stream processing is maturing at a steady rate and can hold its own against the older Java-based ecosystem.
