Speed of Apache Pinot at the Cost of Cloud Object Storage with … – InfoQ.com

Transcript

Neha Pawar: My name is Neha Pawar. I'm here to tell you about how we added tiered storage for Apache Pinot, enabling the speed of Apache Pinot at the cost of Cloud Object Storage. I'm going to start off by spending some time explaining why we did this. We'll talk about the different kinds of analytics databases, the kinds of data and use cases that they can handle. We'll dive deep into some internals of Apache Pinot. Then we will discuss why it was crucial for us to decrease the cost of Pinot while keeping the speed of Pinot. Finally, we'll talk in depth about how we implemented this.

Let's begin by talking about time, and the value of data. Events are the most valuable when they have just happened. They tell us more about what is true in the world at the moment. The value of an event tends to decline over time, because the world changes and that one event tells us less and less about what is true as time goes on. It's also the case that the recent real-time data tends to be queried more than the historical data. For instance, with recent data, you would build real-time analytics, anomaly detection, user facing analytics.

These are often served directly to the end users of your company, for example, profile view analytics, or article analytics, or restaurant analytics for owners, feed analytics. Now imagine, if you're building such applications, they will typically come with a concurrency of millions of users, have to serve thousands of queries per second, and the SLAs will be stringent, just a few milliseconds. This puts pressure on those queries to be faster. It also justifies more investment in the infrastructure to support those queries.

Since recent events are more valuable, we can in effect spend more to query them. Historical data is queried less often than real-time data. For instance, with historical data, you would typically build metrics, reporting, dashboards, use it for ad hoc analysis. You may also use it for user facing analytics. In general, your query volume will be much lower and less concurrent than with the recent data. What we know for sure about historical data is that it is large, and it keeps getting bigger all the time. None of this means that latency somehow becomes unimportant. We will always want our database to be fast. It's just that with historical data, the cost becomes the dominating factor. To summarize, recent data is more valuable and extremely latency sensitive. Historical data is large, and tends to be cost sensitive.

Given you have two such kinds of data to handle, and the use cases that come with them, if you are tasked with choosing an analytics infrastructure for your organization, the considerations at the top of your mind are going to be cost, performance, and flexibility. You need systems which will be able to service the different kinds of workloads, while maintaining the query and freshness SLAs needed by these use cases. The other aspect is cost. You'll need a solution where the cost to serve is reasonable and the business value extracted justifies this cost. Lastly, you want a solution that is easy to operate, to configure, and also one that will fulfill a lot of your requirements together.

Now let's apply this to two categories of analytics databases that exist today. First, the real-time analytics or OLAP databases. For serving real-time data and user facing analytics, you will typically pick a system like Apache Pinot. There are also other open source as well as proprietary systems which can help serve real-time data, such as ClickHouse and Druid. Let's dig a little deeper into what Apache Pinot is. Apache Pinot is a distributed OLAP datastore that can provide ultra-low latency even at extremely high throughput.

It can ingest data from batch sources such as Hadoop, S3, Azure. It can also ingest directly from streaming sources such as Kafka, Kinesis, and so on. Most importantly, it can make this data available for querying in real-time. At the heart of the system is a columnar store, along with a variety of smart indexing and pre-aggregation techniques for low latency. These optimizations make Pinot a great fit for user facing real-time analytics, and even for applications like anomaly detection, dashboarding, and ad hoc data exploration.

Pinot was originally built at LinkedIn, and it powers a wide variety of applications there. If you've ever been on the linkedin.com website, there's a high chance you've already interacted with Pinot. Pinot powers LinkedIn's iconic Who Viewed My Profile application, and many other such applications, such as feed analytics, employee analytics, talent insights, and so on. Across all of LinkedIn, there are over 80 user facing products backed by Pinot, and they're serving queries at 250,000 queries per second, while maintaining strict millisecond and sub-second latency SLAs.

Another great example is Uber Eats Restaurant Manager. This is an application created by Uber to provide restaurant owners with their orders data. On this dashboard, you can see sales metrics, missed orders, inaccurate orders in a real-time fashion, along with other things such as top selling menu items, menu item feedback, and so on. As you can imagine, to load this dashboard, we need to execute multiple complex OLAP queries, all executing concurrently. Multiply this with all the restaurant owners across the globe. This leads to several thousands of queries per second for the underlying database.

Another great example of the adoption of Pinot for user facing real-time analytics is at Stripe. There, Pinot is ingesting hundreds of megabytes per second from Kafka, and petabytes of data from S3, and serving queries at 200k queries per second, while maintaining sub-second p99 latency. It's being used to service a variety of use cases, one of them being ledger analytics for financial analysts. Then there's user facing dashboards built for merchants. There's also internal dashboards for engineers and data scientists.

The Apache Pinot open-source community is very active. We have over 3000 members now, almost 3500. We've seen adoption from a wide variety of companies in different sectors such as retail, finance, social media, advertising, logistics, and they're all together pushing the boundaries of Pinot in speed, and scale, and features. These are the numbers from one of the largest Pinot clusters today, where we have a million plus events per second, serving queries at 250k queries per second, while maintaining strict millisecond query latencies.

To set some more context for the rest of the talk, let's take a brief look at Pinot's high-level architecture. The first component is the Pinot servers. These are the components that host the data and serve queries on the data that they host. Data in Pinot is stored in the form of segments. A segment is a portion of the data, packed with metadata, dictionaries, and indexes in a columnar fashion. Then we have the brokers.

Brokers are the components that receive queries from the clients. They scatter them to the servers. The servers execute these queries for the portion of data that they host. They send the results back to the brokers. Then the brokers do a final merge and return the results back to the client. Finally, we have the controllers that control all the interactions and state of the cluster, with the help of Zookeeper as a persistent metadata store and Helix for state management.
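
To make the scatter-gather pattern concrete, here is a minimal Java sketch. The class and method names are made up for illustration; they are not Pinot's actual broker or server classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Hypothetical scatter-gather sketch; Pinot's real broker and server classes differ.
public class ScatterGatherSketch {

    interface Server {
        // Each server runs the query on the segments it hosts and returns a partial aggregate.
        Map<String, Long> execute(String query);
    }

    static Map<String, Long> query(String query, List<Server> servers, ExecutorService pool)
            throws Exception {
        // Scatter: send the query to every selected server in parallel.
        List<Future<Map<String, Long>>> partials = new ArrayList<>();
        for (Server server : servers) {
            Callable<Map<String, Long>> task = () -> server.execute(query);
            partials.add(pool.submit(task));
        }

        // Gather: merge the partial results on the broker (here, summing per-group values).
        Map<String, Long> merged = new HashMap<>();
        for (Future<Map<String, Long>> partial : partials) {
            partial.get().forEach((group, value) -> merged.merge(group, value, Long::sum));
        }
        return merged;
    }
}
```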

Why is it that Pinot is able to support such real-time, low latency, milliseconds-level queries? One of the main reasons is that it has a tightly coupled storage and compute architecture. The compute nodes used typically have a disk or SSD attached to store the data. The disk or SSD could be local storage, or it could be remote attached, like an EBS volume. The reason that they are so fast is that for both of these, the access method is POSIX APIs.

The data is right there, so you can use techniques like memory mapping (mmap). As a result, accessing this data is really fast. It can be microseconds if you're using instance storage and milliseconds if you're using remote attached storage, say, an EBS volume. One thing to note here, though, is that the storage that we attach in such a model tends to be only available to the single instance to which it is attached. Then, let's assume that this storage has a cost factor of a dollar. What's the problem, then?
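
As an aside, here is a minimal Java NIO sketch of that fast local path: memory-mapping a segment file and reading from it through the OS page cache. The file path is hypothetical.

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Memory-map a (hypothetical) local segment file and read from it.
public class MmapRead {
    public static void main(String[] args) throws IOException {
        Path segment = Path.of("/var/pinot/server/index/myTable/segment_0/columns.psf");
        try (FileChannel channel = FileChannel.open(segment, StandardOpenOption.READ)) {
            long size = Math.min(channel.size(), Integer.MAX_VALUE);  // a single mapping is capped at 2 GB
            MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, size);
            // A random read is a pointer dereference plus a possible page fault:
            // typically microseconds on local SSD, with no network round trip involved.
            int firstFourBytes = buffer.getInt(0);
            System.out.println("first 4 bytes as int: " + firstFourBytes);
        }
    }
}
```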

Let's see what happens when the data volume starts increasing by a lot. Say you started with just one compute node, which has 2 terabytes of storage. Assume that the monthly cost is $200 for compute, $200 for storage, so $400 in total. Let's say that your data volume grows 5x. To accommodate that, you can't just add only storage, there are limits on how much storage a single instance can be given. Plus, if you're using instance storage, it often just comes pre-configured, and you don't have much control over scaling that storage up or down for that instance. As a result, you have to provision the compute along with it. Cost will be $1,000 for storage and $1,000 for compute.

If your data grows 100x, again, that's an increase in both storage and compute. More often than not, you won't need all the compute that you're forcibly adding just to support the storage, as the increasing data volume doesn't necessarily translate to a proportional increase in query workload. You will end up paying for all this extra compute which could remain underutilized. Plus, this type of storage tends to be very expensive compared to some other storage options available, such as cloud object stores. That's because this storage comes with very high performance characteristics.

To summarize, in tightly coupled systems, you will have amazing latencies, but as your data volume grows, you will end up with a really high cost to serve. We have lost out on the cost aspect of our triangle of considerations.

Let's look at modern data warehouses and query federation technologies like Spark, Presto, and Trino. These saw the problem with coupling storage and compute, so they went with a decoupled architecture, wherein they put storage into a cloud object store such as Amazon S3. This is basically the cheapest way you will ever store data. It is going to be as little as one-fifth of the cost of disk or SSD storage.

On the flip side, what were POSIX file system API calls which completed in microseconds, now became network calls, which can take thousands or maybe 10,000 times longer to complete. Naturally, we cannot use this for real-time data. We cannot use this to serve use cases like real-time analytics and user facing analytics. With decoupled systems, we traded off a lot of latency to save on cost, and now we are looking not so good on the performance aspect of our triangle. We have real-time systems that are fast and expensive, and then batch systems that are slow and cheap. What we ideally want is one system that can do both, but without infrastructure that actually supports this, data teams end up adding both systems into their data ecosystem.

They will keep the recent data in the real-time system and set an aggressive retention period so that the costs stay manageable. As the data times out of the real-time database, they'll migrate it to a storage decoupled system to manage the historical archive. With this, we're doing everything twice. We're maintaining two systems, often duplicating data processing logic. With that, we've lost on the flexibility aspect of our triangle.

Could we somehow have a true best of both worlds, where we get the speed of a tightly coupled real-time analytics system, are able to use cost effective storage like a traditionally decoupled analytics system, and, at the same time, have the flexibility and simplicity of being able to use just one system and configure it in many ways? With this motivation in mind, we at StarTree set out to build tiered storage for Apache Pinot. With tiered storage, your Pinot cluster is now not limited to just using disk or SSD storage. We are no longer strictly tightly coupled.

You can have multiple tiers of storage with support for using a cloud object storage such as S3 as one of the storage tiers. You can configure exactly which portion of your data you want to keep locally, and which is offloaded to the cloud storage tier. One popular way to split data across local versus cloud is by data age. You could configure in your table, something like, I want data less than 30 days old to be on disk, and the rest of it I want to go on to S3. Users can then query this entire table across the local and remote data like any other Pinot dataset. With this decoupling, you can now store as much data as you want in Pinot, without worrying about the cost. This is super flexible and configurable.
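
As a rough illustration of the age-based split just described, here is a small Java sketch of the selection rule. In reality this is expressed declaratively in the table configuration rather than in code; the class and names here are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical sketch of an age-based tier rule like "data older than 30 days
// goes to the cloud tier"; the real configuration in Pinot is declarative.
public class AgeBasedTierSelector {

    enum Tier { LOCAL_SSD, CLOUD_OBJECT_STORE }

    private final Duration localRetention;

    public AgeBasedTierSelector(Duration localRetention) {
        this.localRetention = localRetention;
    }

    Tier tierFor(Instant segmentEndTime, Instant now) {
        // Segments whose newest record is older than the threshold move to S3.
        return segmentEndTime.isBefore(now.minus(localRetention))
                ? Tier.CLOUD_OBJECT_STORE
                : Tier.LOCAL_SSD;
    }

    public static void main(String[] args) {
        AgeBasedTierSelector selector = new AgeBasedTierSelector(Duration.ofDays(30));
        Instant now = Instant.now();
        System.out.println(selector.tierFor(now.minus(Duration.ofDays(45)), now)); // CLOUD_OBJECT_STORE
        System.out.println(selector.tierFor(now.minus(Duration.ofDays(3)), now));  // LOCAL_SSD
    }
}
```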

The threshold is dynamic, and can be changed at any point in time, and Pinot will automatically reflect the changes. You can still operate Pinot in fully tightly coupled mode, if you want, or in completely decoupled mode. Or go for a hybrid approach, where some nodes are still dedicated for local data, some nodes for remote data, and so on. To summarize, we saw that with tiered storage in Pinot, we have the flexibility of using a single system for your real-time data, as well as historical data without worrying about the cost spiraling out of control.

We didn't talk much about the third aspect yet, which is performance. Now that we're using cloud object storage, will the query latencies take a hit and enter the range of other decoupled systems? In the next few sections, we're going to go over in great detail how we approached the performance aspect for queries accessing data on the cloud storage. Until tiered storage, Pinot assumed that segments stay on the local disk. It memory mapped the segments to access the data quickly. To make things work with remote segments on S3, we extended the query engine to make it agnostic to the segment location.

Under the hood, we plugged in our own buffer implementation, so that during query execution we can read data from the remote store instead of local storage as needed. Making the queries work is just part of the story. We want to get the best of both worlds using Pinot. From the table, you can see that the latency to access segments on cloud object storage is a lot higher, hence we began our pursuit to ensure we can keep the performance of Pinot in an acceptable range, so that people can keep using Pinot for the real-time user facing analytics use cases that they have been used to.
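
To give a feel for what a location-agnostic buffer abstraction could look like, here is a minimal sketch. These are not Pinot's actual buffer classes; the interface and names are illustrative assumptions.

```java
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;

// Hypothetical sketch: the query engine asks for byte ranges and does not care
// whether they come from a local mmap or from S3.
interface SegmentBuffer {
    ByteBuffer readBytes(long offset, int length);
}

class LocalMmapBuffer implements SegmentBuffer {
    private final MappedByteBuffer mmap;

    LocalMmapBuffer(MappedByteBuffer mmap) {
        this.mmap = mmap;
    }

    @Override
    public ByteBuffer readBytes(long offset, int length) {
        ByteBuffer view = mmap.duplicate();
        view.position((int) offset);
        view.limit((int) offset + length);
        return view.slice();  // zero-copy view over the memory-mapped file
    }
}

class RemoteObjectStoreBuffer implements SegmentBuffer {
    @Override
    public ByteBuffer readBytes(long offset, int length) {
        // Would issue a ranged GET ("bytes=offset-(offset+length-1)") against the object
        // store, ideally served from a prefetched block rather than a fresh network call.
        throw new UnsupportedOperationException("sketch only");
    }
}
```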

We began thinking about a few questions. Firstly, what is the data that should be read? We certainly don't need to read all of the segments that are available for any query. We may not even need to read all of the data inside a given segment. What exactly should we be reading? Second question was, when and how to read the data during the query execution? Should we wait until the query has been executed and we're actually processing a segment, or should we do some caching? Should we do some prefetching? What smartness can we apply there? In the following slides, I'll try to answer these questions by explaining some of the design choices we made along the way.

The first idea that we explored was lazy loading. This is a popular technique used by some other systems to implement tiered storage. In lazy loading, all of the data segments would be on the remote store to begin with, and each server will have some attached storage. When the first query comes in, it will check if the local instance storage has the segments that it needs. If it does not find the segments there, those will be downloaded from the remote store during the query execution. Your first query will be slow, of course, because it has to download a lot of segments.

The hope is that the next query will need the same segments, or most of the segments that you already have, and hence reuse what's already downloaded, making the second query execute very fast. Here, the answer to what to fetch is the entire segment, and the answer to when to fetch is during the query execution. In typical OLAP workloads, your data will rarely ever be reusable across queries.

OLAP workloads come with arbitrary slice-and-dice and point lookups across multiple time ranges and multiple user attributes. This means that more often than not, you won't be able to reuse the downloaded segments for the next query, which means we have to remove them to make space for the segments needed by the new query, because instance storage is going to be limited. This will cause a lot of churn and downloads. Plus, in this approach, you are fetching the whole segment.

Most of the time, your query will not need all of the columns in the segment, so you will end up fetching a lot of excessive data, which is going to be wasteful. Also, using lazy loading, the p99 or p99.9 of the query latency would be very bad, since there will always be some query that needs to download the remote segments. Because of this, the lazy loading method was considered a strict no-go for OLAP use cases where consistent low latency is important. Instead of using lazy loading, or similar ideas like caching segments on local disks, we started to think about how to solve the worst case. That is when the query has to read data from remote segments. Our hope was that by solving this, we can potentially guarantee consistent and predictable low latency for all queries.

Then, to answer the question of what we should fetch, given that we know we don't want to fetch the whole segment, we decided to take a deeper look at the Pinot segment format, to see if we could use the columnar nature of this database to our advantage. Here's an example of a Pinot segment file. Let's say we have columns like browser, region, country, and then some metric columns like impression, cost, and then our timestamp as well.

In Pinot, the segments are packed in a columnar fashion. One after the other, you're going to see all these columns lined up in this segment file called columns.psf. For each column as well, you will see specific, relevant data buffers. For example, you could have forward indexes, you could have dictionaries, and then some specialized indexes like inverted index, range index, and so on.

This segment format allowed us to be a lot more selective and specific when deciding what we wanted to read from the Pinot segment. We decided we would do a selective columnar fetch, so bringing back this diagram where we have a server and we have some segments in a cloud object store. If you get a query like select sum of impressions, and a filter on the region column, we are only interested in the region and impressions. That's all we'll fetch.

Further, we also know from the query plan that region is only needed to evaluate a filter, so we probably just need a dictionary and inverted index for that. Once we have the matching rows, for impressions we only need the dictionary and forward index. All other columns can be skipped. We used the range GET API, which is an API provided by S3, to just pull out the portions of the segment that we need: the specific index buffers, the region dictionary, the region inverted index, the impressions forward index, the impressions dictionary.
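
For reference, this is roughly what a ranged GET looks like with the AWS SDK for Java v2: fetch only one index buffer out of a large segment file. The bucket, key, and offsets here are hypothetical; in practice the offsets come from the segment's metadata.

```java
import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;

// Sketch of a ranged GET: pull a single index buffer (e.g. the region inverted
// index) out of columns.psf instead of downloading the whole segment.
public class RangedGetExample {
    public static void main(String[] args) {
        long offset = 1_048_576L;   // where the buffer starts inside columns.psf (hypothetical)
        long length = 262_144L;     // size of that buffer, known from segment metadata (hypothetical)
        try (S3Client s3 = S3Client.create()) {
            GetObjectRequest request = GetObjectRequest.builder()
                    .bucket("my-pinot-deep-store")
                    .key("myTable/segment_0/columns.psf")
                    // HTTP Range header: the end offset is inclusive
                    .range("bytes=" + offset + "-" + (offset + length - 1))
                    .build();
            ResponseBytes<GetObjectResponse> bytes = s3.getObjectAsBytes(request);
            System.out.println("fetched " + bytes.asByteArray().length + " bytes");
        }
    }
}
```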

This worked pretty well for us. We were happy with that as the answer to the what-to-read part. Now that we know what data to read, next, we began thinking about when to read the data. We already saw earlier that when a Pinot broker gets a query, it scatters the request to the servers, and then each server executes the query. Now in this figure, we are going to see what happens within a Pinot server when it gets a query. First, the server makes a segment execution plan as part of the planning phase. This is where it decides which are the segments that it needs to process.

Then those segments are processed by multiple threads in parallel. One of the ideas that we explored was to fetch the data from S3 just as we're about to execute a segment: just before each segment's execution, we would fetch its data from S3, and only then proceed to executing the query on that segment.

We quickly realized that this is not a great strategy. To demonstrate that, here's a quick example. Let's say you have 40 segments, and the parallelism at our disposal on this particular server is 8. That means we would be processing these 40 segments in batches of 8, which means we are going to do 5 rounds to process all of them. Let's assume that the latency to download data from S3 is 200 milliseconds.

For each batch, we are going to need 200 milliseconds, because as soon as the segment batch begins to get processed, we will first make a round trip to S3 to get the data for those segments. This is quickly going to add up. For each batch, we will need 200 milliseconds, so your total query time is going to carry 1,000 milliseconds of overhead right there. One thing that we observed was that if you check the CPU utilization during this time, most of the time the threads are waiting for the data to become available, and the CPU cores just stay idle.

Could we somehow decide the segments needed by the query a lot earlier, and then prefetch them so that we can pipeline the I/O and data processing as much as possible? That's exactly what we did. During the planning phase itself, we know on the server which segments are going to be needed by this query. So in the planning phase itself, we began prefetching all of them. Then, just before the segment execution, the thread would wait for that data to be available, since the prefetch had already been kick-started. In the best-case scenario, we're already going to have that data prefetched and ready to go.

Let's come back to our example of the 40 segments with the parallelism of 8. In this case, instead of fetching when each batch is about to be executed, we would have launched the prefetch for all the batches in the planning phase itself. That means that maybe the first batch still has to wait 200 milliseconds for the data to be available. But while the first batch is being processed, the data for all the remaining batches is already being fetched. For the future batches, you don't have to spend any time waiting, and this would potentially reduce the query latency down to a single round trip to S3. That's just 200 milliseconds of overhead.
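
Here is a minimal Java sketch of that prefetch-and-pipeline idea, using the 40-segment, 8-thread, 200 ms numbers from the example. The pool sizes, the fetch stand-in, and the names are assumptions for illustration, not Pinot's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch of "prefetch during planning, pipeline I/O with execution":
// every segment fetch is launched up front, and each worker thread only blocks on
// the future for the segment it is about to process.
public class PrefetchPipelineSketch {

    static byte[] fetchFromS3(String segmentName) {
        // Stand-in for the ranged GETs; roughly a 200 ms round trip in the talk's example.
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new byte[0];
    }

    public static void main(String[] args) throws Exception {
        List<String> segments = new ArrayList<>();
        for (int i = 0; i < 40; i++) {
            segments.add("segment_" + i);
        }

        // Fetches are network bound, so many can be in flight at once.
        ExecutorService ioPool = Executors.newCachedThreadPool();
        // Query execution parallelism on this server is 8, as in the example.
        ExecutorService workerPool = Executors.newFixedThreadPool(8);

        long start = System.nanoTime();

        // Planning phase: kick off every fetch immediately.
        List<Future<byte[]>> prefetched = new ArrayList<>();
        for (String segment : segments) {
            Callable<byte[]> fetch = () -> fetchFromS3(segment);
            prefetched.add(ioPool.submit(fetch));
        }

        // Execution phase: each task waits only for its own segment's data,
        // which is usually already available by the time the task runs.
        List<Future<?>> work = new ArrayList<>();
        for (Future<byte[]> data : prefetched) {
            work.add(workerPool.submit(() -> {
                try {
                    byte[] segmentData = data.get();
                    // ... execute the query on this segment using segmentData ...
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }));
        }
        for (Future<?> w : work) {
            w.get();
        }

        System.out.println("elapsed ms: " + (System.nanoTime() - start) / 1_000_000);
        ioPool.shutdown();
        workerPool.shutdown();
    }
}
```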

Taking the two techniques so far, which are selective columnar fetch and prefetching during query planning with pipelining of the fetch and execution, we did a simple benchmark. The benchmark was conducted in a small setup with about 200 gigabytes of data and one Pinot server. The queries were mostly aggregation queries with filters, GROUP BY and ORDER BY. We also included a baseline number with the same data on Presto, to compare this with a decoupled architecture. Let's see the numbers. Overall, Pinot with tiered storage was 5 times to 20 times faster than Presto.

How is it that Pinot is able to achieve such blazing fast query latencies compared to other decoupled systems like Presto, even when we change the underlying design to decoupled storage and compute? Let's take a look at some of the core optimizations used in Pinot which help with that. Bringing back the relevant components of the architecture, we have a broker, let's say we have 3 servers, and say that each server has 4 segments. That means we have a total of 12 segments in this cluster.

When a query is received by the broker, it finds the servers to scatter the query to. On each server, it finds the segments it should process. Within each segment, we process a certain number of documents based on the filters, then we aggregate the results on the servers. A final aggregation is done on the broker. At each of these points, we have optimizations to reduce the amount of work done. Firstly, broker-side pruning is done to reduce the number of servers that we fan out to. Brokers ensure that they select the smallest subset of servers needed for a query and optimize it further using techniques like smart segment assignment strategies, partitioning, and so on.

Once the query reaches the server, more pruning is done to reduce the number of segments that it has to process on each server. Then within each segment, we scan the segment to get the documents that we need. To reduce the amount of work done in the document scan, we apply filter optimizations like indexes. Finally, we have a bunch of aggregation optimizations to calculate fast aggregations.

Let's talk more about the pruning techniques available in Pinot, and how we're able to use them even when segments have been moved to the remote tier. We have pruning based on min/max column values, or partition-based pruning using partition info. Both of these pieces of metadata are cached locally, even if the segment is on a remote cloud object store. Using that, we are quickly able to eliminate segments where we won't find the matching data. Another popular technique used in Pinot is Bloom filter-based pruning. These are built per segment.

We can read a Bloom filter to know if a value is definitely absent from a given segment. This is a lot more effective than the min/max based or partition-based pruning. These techniques help us a lot because they narrow down the scope of the segments that we need to process, which reduces the amount of data that we fetch from S3 and process.
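
As a toy illustration of this pruning step, the sketch below combines min/max and Bloom-filter checks over locally cached metadata. It uses Guava's BloomFilter as a stand-in; the metadata class and field names are hypothetical, not Pinot's.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;

// Hypothetical sketch of server-side segment pruning for a filter like region = 'EU':
// locally cached min/max values and Bloom filters let us skip segments without
// touching S3 at all.
public class SegmentPruningSketch {

    static class SegmentMetadata {
        final String name;
        final String minRegion;                 // min/max of the column, cached locally
        final String maxRegion;
        final BloomFilter<String> regionBloom;  // per-segment Bloom filter, cached locally

        SegmentMetadata(String name, String min, String max, BloomFilter<String> bloom) {
            this.name = name;
            this.minRegion = min;
            this.maxRegion = max;
            this.regionBloom = bloom;
        }

        boolean mightContain(String value) {
            boolean inRange = minRegion.compareTo(value) <= 0 && value.compareTo(maxRegion) <= 0;
            // Bloom filters have no false negatives, so "false" means it is safe to prune.
            return inRange && regionBloom.mightContain(value);
        }
    }

    static List<SegmentMetadata> prune(List<SegmentMetadata> segments, String value) {
        return segments.stream().filter(s -> s.mightContain(value)).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        BloomFilter<String> bloom = BloomFilter.create(
                Funnels.stringFunnel(StandardCharsets.UTF_8), 10_000, 0.01);
        bloom.put("EU");
        SegmentMetadata segment = new SegmentMetadata("segment_0", "APAC", "US", bloom);
        System.out.println(prune(List.of(segment), "EU").size());  // 1: cannot be pruned
    }
}
```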

Let's take a look at the filter optimizations available in Pinot. All of these are available for use, even if the segment moves to the remote tier. We have inverted indexes where for every unique value, we keep a bitmap of matching doc IDs. We also have classic techniques like sorted index, where the column in question is sorted within the segment, so we can simply keep start and end document ID for the value. We also have range index, which helps us with range predicates such as timestamp greater than, less than, in between.

This query pattern is quite commonly found in user facing dashboards and in real-time anomaly detection. Then we have a JSON index, which is a very powerful index structure. If your data is in semi-structured form, like complex objects or nested JSON, you don't need to invest in preprocessing your data into structured content; you can ingest it as-is, and Pinot will index every field inside your complex JSON, allowing you to query it extremely fast. Then we have the text index for free text search and regex-like queries, which helps with log analytics.

Then, geospatial index, so if you're storing geo coordinates, it lets you compute geospatial queries, which can be very useful in applications like orders near you, looking for things that are 10 miles from a given location, and so on. We also have aggregation optimizations such as theta sketches, and HyperLogLog for approximate aggregations. All of these techniques we can continue using, even if the segment is moved on to a cloud object store. This is one of the major reasons why the query latency for Pinot is so much faster than traditionally decoupled storage and compute systems.
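
To make the inverted-index idea from above concrete, here is a toy Java sketch: one bitmap of matching document IDs per distinct value, so an equality filter becomes a lookup instead of a scan. It uses java.util.BitSet for simplicity; Pinot's real implementation uses compressed bitmaps.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

// Toy inverted index: value -> bitmap of matching document IDs.
public class InvertedIndexSketch {
    private final Map<String, BitSet> valueToDocIds = new HashMap<>();

    void add(int docId, String value) {
        valueToDocIds.computeIfAbsent(value, v -> new BitSet()).set(docId);
    }

    BitSet matchEquals(String value) {
        // Return a copy so callers can AND/OR it with other filters safely.
        return (BitSet) valueToDocIds.getOrDefault(value, new BitSet()).clone();
    }

    public static void main(String[] args) {
        InvertedIndexSketch regionIndex = new InvertedIndexSketch();
        regionIndex.add(0, "EU");
        regionIndex.add(1, "US");
        regionIndex.add(2, "EU");

        // WHERE region = 'EU'  ->  documents {0, 2}
        System.out.println(regionIndex.matchEquals("EU"));  // {0, 2}
    }
}
```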

While these techniques did help us get better performance than traditionally decoupled systems, when compared to tightly coupled Pinot, which is our true baseline, we could see a clear slowdown. This showed that the two techniques we implemented in our first version were not effective enough to hide all the data access latency from S3. To learn more from our first version, we stress tested it with a much larger workload.

We put 10 terabytes of data into a Pinot cluster with 2 servers, each with a network bandwidth of 1.25 gigabytes per second. Our first finding from the stress test was that the network was saturated very easily and very often. The reason is that, although we tried to reduce the amount of data to read with segment pruning and columnar fetch, we still read a lot of data unnecessarily for those columns, because we fetch the full column in the segment.

Especially if you have highly selective filters, where you're probably going to need just a few portions of the whole column, this entire columnar fetch is going to be wasteful. This also puts pressure on the resources that we reserve for prefetching all this data. Also, once the network is saturated, all we can do from the system's perspective is what the instance network bandwidth will allow. No amount of further parallelism could help us here. On the other hand, we noticed that when the network was not saturated, we could have been doing a lot more work in parallel and reducing the sequential round trips we made to S3. Our two main takeaways were: reduce the amount of unnecessary data read, and increase the parallelism even more.

One of the techniques we added for reading less was an advanced configuration to define how to split the data across local versus remote. It doesn't just have to be by data age; you can be super granular and say, I want this specific column to be local, or the specific index of this column to be local, and everything else on cloud storage. With this, you can pin lightweight data structures such as Bloom filters locally onto the instance storage, which is usually a very small fraction of the total storage, and it helps you do fast and effective pruning. Or you can also pin any other index structures that you know you'll be using often.

Another technique we implemented is, instead of doing a whole columnar fetch all the time, we decided that we would just read the relevant chunks of data from the column. For example, bringing back our example from a few slides ago, in this query, when we are applying the region filter, after reading the inverted index we know that we only need a few documents from the whole impressions column. We don't need to fetch the full forward index; we can just read small blocks of data during the post-filter execution.

With that, our execution plan becomes: during prefetch, only fetch the region inverted index. The data that we need to read from the impressions column, we will read on demand, and we will read only a few blocks of it. We tested out these optimizations on the 10-terabyte data setup. We took three queries of varying selectivity. Then we ran these queries with the old design, which had only columnar fetch with prefetching and pipelining, and also with the new design, where we have more granular block-level fetches instead of a full columnar fetch. We saw some amazing reduction in data size compared to our phase one. This data size reduction directly impacted and improved the query latency.
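
Here is a small sketch of the block-level idea: given the matching document IDs from the filter, compute which fixed-size forward-index blocks actually need to be fetched. The block size and layout are assumptions for illustration.

```java
import java.util.BitSet;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical sketch of block-level fetching: after the filter phase we know the
// matching doc IDs, so we only fetch the forward-index blocks that contain them,
// instead of the whole column.
public class BlockFetchSketch {
    static final int DOCS_PER_BLOCK = 4096;  // illustrative block size

    // Map matching doc IDs to the set of forward-index blocks that must be read.
    static Set<Integer> blocksNeeded(BitSet matchingDocs) {
        Set<Integer> blocks = new LinkedHashSet<>();
        for (int doc = matchingDocs.nextSetBit(0); doc >= 0; doc = matchingDocs.nextSetBit(doc + 1)) {
            blocks.add(doc / DOCS_PER_BLOCK);
        }
        return blocks;
    }

    public static void main(String[] args) {
        BitSet matches = new BitSet();
        matches.set(10);
        matches.set(9_000);
        matches.set(9_100);
        // Only blocks 0 and 2 are fetched (e.g. via ranged GETs), not the full column.
        System.out.println(blocksNeeded(matches));  // [0, 2]
    }
}
```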

One index that we did not talk about when we walked through the indexes in Pinot is the StarTree index. Unlike other indexes in Pinot, which are columnar, StarTree is a segment-level index. It allows us to maintain pre-aggregated values for certain dimension combinations. You can choose exactly which dimensions you want to pre-aggregate, and also how many values you want to pre-aggregate at each level. For example, assume our data has dimension columns name, environment ID, and type, and a metric column value, along with a timestamp column.

We decided that we want to create a StarTree index, only materialize name and environment ID, and only store the aggregation sum of value. Also, we will not keep more than 10 records unaggregated at any stage. This is how our StarTree will look. We will have a root node, which will split into all the values of the name column. Under each name value, we will again have a split for all the values of environment ID. Finally, at every leaf node, we will store the aggregation value, sum of value.

Effectively, StarTree lets you choose a middle ground between pre-aggregating everything and computing everything on the fly. A query like this, where we have a filter on name and environment ID and we are looking for sum of value, is going to be super-fast, because it's going to be a single lookup. We didn't have to pre-aggregate everything for this, nor did we have to compute anything on the fly.
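
A toy sketch of the example above, assuming the simplest possible representation (nested maps rather than the real tree and segment layout): split on name, then on environment ID, and store SUM(value) at the leaves, so a query filtering on both dimensions is a single traversal.

```java
import java.util.HashMap;
import java.util.Map;

// Toy sketch of the StarTree example from the talk: name -> environmentId -> SUM(value).
// The real index is a tree stored inside the segment, not an in-memory map.
public class StarTreeSketch {
    private final Map<String, Map<String, Long>> tree = new HashMap<>();

    void addAggregate(String name, String environmentId, long sumOfValue) {
        tree.computeIfAbsent(name, n -> new HashMap<>()).merge(environmentId, sumOfValue, Long::sum);
    }

    // SELECT SUM(value) WHERE name = ? AND environmentId = ?  ->  a single lookup
    long sumValue(String name, String environmentId) {
        return tree.getOrDefault(name, Map.of()).getOrDefault(environmentId, 0L);
    }

    public static void main(String[] args) {
        StarTreeSketch index = new StarTreeSketch();
        index.addAggregate("checkout-service", "env-1", 1200L);
        index.addAggregate("checkout-service", "env-2", 300L);
        System.out.println(index.sumValue("checkout-service", "env-1"));  // 1200
    }
}
```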

How did we effectively use this in tiered storage? You can imagine that this index must be pretty big in size compared to other indexes like inverted indexes or Bloom filters, so pinning it locally won't work, as that would be space inefficient, and prefetching it on the fly would hurt our query latency a lot. This is where all the techniques that we talked about previously came together. We pinned only the tree structure locally, which is very small and lightweight.

As for the data at each node and the aggregations, we continue to keep them in S3. When we got a query that could use this index, it quickly traversed the locally pinned tree, which pointed us to the exact location of the result, which we could then get with a few very quick lookups on S3. Then we took this for a spin with some very expensive queries, and we saw a dramatic reduction in latency because the amount of data fetched had reduced.

So far, we discussed techniques on how to reduce the amount of data read. Let's talk about one optimization we are currently playing with to increase the parallelism. Bringing back this example, where we knew from the inverted index that we'll only need certain rows, and we then only fetch those blocks during the post-filter evaluation phase: we build sparse indexes which give us this information, about which exact chunks we will need from the forward index, in the planning phase itself.

Knowing this in the planning phase helps because now we're able to identify and begin prefetching these chunks early. While the filter is getting evaluated, these chunks are getting prefetched in parallel, so that the post-filter phase is going to be much faster.
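
A minimal sketch of that overlap, assuming the sparse index has already told us which chunks are candidates: kick off the chunk fetches at planning time and let them run while the filter is being evaluated. The names and chunk IDs are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical sketch: chunks identified by a sparse index at planning time are
// prefetched in parallel with filter evaluation.
public class PlanTimeChunkPrefetch {

    static byte[] fetchChunk(int chunkId) {
        // Stand-in for a ranged GET of one forward-index chunk.
        return new byte[0];
    }

    public static void main(String[] args) {
        List<Integer> candidateChunks = List.of(0, 2, 7);  // from the sparse index (hypothetical)

        // Kick off chunk fetches during planning...
        List<CompletableFuture<byte[]>> inFlight = candidateChunks.stream()
                .map(id -> CompletableFuture.supplyAsync(() -> fetchChunk(id)))
                .collect(Collectors.toList());

        // ...the filter would be evaluated here, concurrently with the fetches...

        // ...so the post-filter phase finds its chunks already (mostly) downloaded.
        List<byte[]> chunks = inFlight.stream().map(CompletableFuture::join).collect(Collectors.toList());
        System.out.println("prefetched chunks: " + chunks.size());
    }
}
```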

We saw a lot of techniques that we used in order to build tiered storage, such that we could keep the speed of Pinot while reducing the cost. The key takeaway is that with tiered storage in Apache Pinot, you get the flexibility of a single system for both your real-time and historical data, at the cost of cloud object storage, while techniques like selective columnar fetch, prefetching and pipelining, pruning, pinned indexes, and block-level reads keep query latencies close to what you expect from Pinot.
