Distributed Systems Magic: K-way Merge in Thanos

Oops, it has been such a long time since I wrote a post that I even forgot to renew my site! That’s bad. I definitely owe you a few new posts. So, here is another post in the distributed systems magic series.

Recently I undertook the task of improving the proxying logic in Thanos. If you are not familiar with it: when the Query component of Thanos receives a PromQL query from users, it sends out gRPC requests to leaf nodes to retrieve the needed data. This article will show you how this worked previously, how it works now, and what I learned while doing this.

It assumes some level of familiarity with Go, Thanos, and Prometheus.

How Proxying Logic Works Currently (v0.28.0 and before)

Since every output step compares the current head of each leaf node’s stream with every other to find the smallest one, merging n series from k nodes costs Θ(kn).
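To make that concrete, here is a toy sketch of such a linear-scan merge (simplified stand-ins, not the actual Thanos SeriesSet types):

```go
// naiveMerge is a toy stand-in for the old proxying logic: every
// iteration scans the head of all k iterators to find the smallest
// element, so emitting n elements costs Θ(k*n) comparisons.
func naiveMerge(iters [][]string) []string {
	var out []string
	for {
		min := -1
		for i, it := range iters {
			if len(it) == 0 {
				continue
			}
			if min == -1 || it[0] < iters[min][0] {
				min = i
			}
		}
		if min == -1 { // every iterator is exhausted
			return out
		}
		out = append(out, iters[min][0])
		iters[min] = iters[min][1:]
	}
}
```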

We can improve this, first of all, by using a different data structure. It is easy to see that a direct k-way merge is not the most efficient approach – there is no need to compare every stream’s head at every step. Another issue is that *mergedSeriesSet internally uses a buffered Go channel, which means that we cannot receive messages from leaf nodes as quickly as possible. Let’s fix both problems.

How Proxying Logic Works Now (v0.29.0 and after)

First of all, let’s start using a data structure that makes the k-way merge much faster. There are different options here, but since Go already has a pretty good heap container in the standard library (container/heap), we decided to use that. With this, we move to logarithmic complexity: O(n log k) instead of Θ(kn).
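Here is the same toy merge driven by container/heap instead. Only the stream whose head was just consumed gets re-sifted, which is where the log k factor comes from:

```go
import "container/heap"

// seriesHeap orders k non-empty iterators by their current head.
type seriesHeap [][]string

func (h seriesHeap) Len() int            { return len(h) }
func (h seriesHeap) Less(i, j int) bool  { return h[i][0] < h[j][0] }
func (h seriesHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *seriesHeap) Push(x interface{}) { *h = append(*h, x.([]string)) }
func (h *seriesHeap) Pop() interface{} {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

// heapMerge pops the smallest head and re-sifts in O(log k), so
// merging n elements costs O(n log k) instead of Θ(k*n).
func heapMerge(iters [][]string) []string {
	h := seriesHeap{}
	for _, it := range iters {
		if len(it) > 0 {
			h = append(h, it)
		}
	}
	heap.Init(&h)
	var out []string
	for h.Len() > 0 {
		top := h[0]
		out = append(out, top[0])
		if len(top) > 1 {
			h[0] = top[1:] // advance this stream and re-sift it
			heap.Fix(&h, 0)
		} else {
			heap.Pop(&h) // stream exhausted
		}
	}
	return out
}
```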

Another major performance bottleneck was the usage of a channel. I vaguely remember a meme on Twitter that went something like: “Go programmers: use channels. Performance matters? Don’t use channels”. In the new version, the proxying logic uses a time.Timer to implement cancellation of a Recv() (a function that blocks until a new message is available on a gRPC stream) that takes too long. Please see this awesome post by Povilas to learn more about timeouts in Thanos. This means that channels are no longer needed.
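The pattern looks roughly like this sketch (simplified; recvWithTimeout and recvFn are illustrative names, and the real code lives in the Thanos proxy response-set types):

```go
import (
	"context"
	"time"
)

// recvWithTimeout guards one blocking Recv with a per-message
// timeout. If the timer fires, cancel() cancels the context the gRPC
// stream was created with, which forces the pending Recv to return
// an error – no goroutine or channel per message required.
func recvWithTimeout[T any](
	cancel context.CancelFunc,
	timeout time.Duration,
	recvFn func() (T, error), // e.g. stream.Recv wrapped in a closure
) (T, error) {
	t := time.AfterFunc(timeout, func() { cancel() })
	defer t.Stop() // Recv finished in time: disarm the timer
	return recvFn()
}
```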

Finally, there has been a long-standing problem with the PromQL engine: it is not lazy. Ideally, all retrieved data would be passed to the PromQL engine as soon as it arrives, while the engine iterates through series and steps. Right now, however, everything is buffered in memory first, and only then is the query executed. Hence, this refactoring should also allow us to easily switch between lazy and eager logic in the proxying layer.

Here is a graphic showing the final result:

respSet is a container of responses that can be retrieved either lazily or eagerly. dedupResponseHeap implements the heap interface on top of it. This way, the retrieval strategy is hidden inside the container, and the code stays tidy because the heap above does not care about those details.
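In heavily trimmed form, the split looks something like this (an illustration of the idea; the real interfaces in the Thanos source carry more methods):

```go
// Response stands in for storepb.SeriesResponse in this illustration.
type Response struct{ /* a series frame or a warning frame */ }

// respSet hides the retrieval strategy: an eager implementation has
// already buffered every frame, while a lazy one pulls the next frame
// from the gRPC stream only when asked.
type respSet interface {
	Next() bool    // advance to the next response, if there is one
	At() *Response // the current response
	Close() error  // release the underlying stream
}

// dedupResponseHeap implements heap.Interface over the current heads
// of many respSets; it does not know or care whether each set is lazy
// or eager.
type dedupResponseHeap struct {
	sets []respSet
}
```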

Visually this looks like the previous graphic; however, we are now actually taking proper advantage of the tree-like structure.

Learnings

I have learned lots of things while implementing this.

First of all, looking at something with fresh eyes and lots of experience maintaining code brings a new perspective. I remember someone saying that if, after a year or two, you can look at your old code and not see any potential improvements, then you aren’t really advancing as a developer. I reckon that is kind of true. It was the same in this case – after some time and some thinking, it was easy to spot improvements that could be made.

Another thing is that benchmarks are sometimes not valued as much as they should be. Go has very nice tooling for writing benchmarks, and we should write them more often. However, my own personal experience is that companies and teams usually focus on new features, so benchmarks and performance become low-priority items. My friend Bartek is working on a book, Efficient Go, that has some great information and learnings about benchmarks. I recommend reading it once it comes out.
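For reference, benchmarking something like the heap-merge sketch above takes only a handful of lines with the standard testing package:

```go
import "testing"

// BenchmarkHeapMerge measures the heap-based merge sketch from the
// previous section against 64 small sorted inputs.
func BenchmarkHeapMerge(b *testing.B) {
	iters := make([][]string, 64)
	for i := range iters {
		iters[i] = []string{"a", "b", "c"}
	}
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		heapMerge(iters)
	}
}
```

Run it with go test -bench=. -benchmem to get time and allocation numbers per operation.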

Finally, I think all of this shows that bottlenecks can sometimes hide in unexpected places. In this case, one of them was buried deep inside a struct’s method – a channel was used to implement per-operation timeouts. This is another argument in favor of benchmarking.

Now, with all of this, the proxying layer is still not fully lazy because there are wrappers around it that buffer responses; however, that is a topic for another post. With the improvements outlined here, it should be quite trivial to fully stream responses.

Thanks to Filip Petkovski and Bartek Plotka for reviewing & testing!

Distributed Systems Magic: Groupcache in Thanos

This is a sequel to my previous blog post about trying to migrate to a newer version of the protocol buffers API. In this post, I will tell you how we managed to get groupcache into Thanos during the previous LFX mentorship. The team consisted of Akansha Tiwari, Prem Saraswat, and me. It would not have been possible to implement this without the whole team.

To begin, let’s quickly go over what groupcache is and why it solves a few important problems in Thanos.

First of all, it reduces complexity. Typically, key/value stores are separate processes and require extra work to set up. The other two major caching systems supported by Thanos (besides the in-memory cache), Redis and Memcached, both run as separate processes. With groupcache, everything is embedded in the original process itself; hence, a group of processes becomes a distributed cache.

Most importantly, it has a cache-filling mechanism, which means that data is fetched only once. This is probably the key benefit of groupcache, and it fits the access patterns of PromQL queries particularly well. Dashboards quite often reuse the same metrics in slightly different expressions. For example, you might have two different queries showing the 50th and 99th percentiles in a panel:

  • histogram_quantile(0.99, rate(etcd_network_peer_round_trip_time_seconds_bucket{cluster=~"$cluster", instance=~"$instance"}[$__rate_interval]))
  • histogram_quantile(0.50, rate(etcd_network_peer_round_trip_time_seconds_bucket{cluster=~"$cluster", instance=~"$instance"}[$__rate_interval]))

Both would hit the same series because they have an identical set of matchers. But because query-frontend uses the full expression as the cache key, the underlying Thanos Query happily goes ahead and executes both queries separately. If the Memcached or Redis cache is empty and both queries execute in lockstep, the data gets fetched and stored twice:

  • Thanos Store checks whether the needed data is in Memcached/Redis -> it is not
  • Thanos Store fetches the data from remote object storage
  • Thanos Store writes the data to Memcached/Redis

With groupcache, this problem doesn’t occur: every peer in a group knows about every other peer via DNS, and the whole universe of cache keys is consistently divided between those peers. So, if any node in the cluster wants some data, it sends a request to the node responsible for that key. As a result, data is loaded only once and then spread to all peers. This is amazing! 🎉
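To make the key-ownership idea concrete, here is a toy sketch of deterministic peer selection:

```go
import "hash/fnv"

// pickPeer deterministically maps a cache key to one of the peers, so
// that every member of the group independently agrees on which node
// owns which key.
func pickPeer(key string, peers []string) string {
	h := fnv.New32a()
	h.Write([]byte(key))
	return peers[h.Sum32()%uint32(len(peers))]
}
```

A plain modulo like this reshuffles most keys whenever a peer joins or leaves; groupcache and its forks therefore use a consistent-hash ring with virtual nodes, but the principle – every node computes the same owner for a given key – is the same.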

There is another benefit – mirroring of super-hot items. If one node asks for some key much more often than for other keys, it can keep a copy of that data in its own memory. This avoids key hotspotting!

But enough about the benefits of groupcache – let’s move on to the tricky parts we encountered during this project.

The original groupcache project is not really maintained anymore, so quite a few forks have sprung up. One of the main drawbacks we noticed in the original project is forced global state: it maintains a global map of registered groups, and there is no public function for removing a group. If we ever wanted to add dynamic registration of groups, this would be a blocker.

Then we looked at two other prominent forks: mailgun/groupcache and vimeo/galaxycache. The former is the same as the original except that it adds TTL (time-to-live) support and item removal. The latter is a completely revamped groupcache that supports dynamic registration of groups and arguably has a cleaner, more natural interface for Go developers. It even supports loading data via gRPC calls instead of regular HTTP calls. Thus, we went ahead with it.
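Wiring galaxycache up looks roughly like the sketch below. Treat it as an approximation based on galaxycache’s README rather than the actual Thanos integration – loadFromBucket and the peer addresses are made up for illustration:

```go
package main

import (
	"context"
	"net/http"

	gc "github.com/vimeo/galaxycache"
	gchttp "github.com/vimeo/galaxycache/http"
)

// loadFromBucket is a hypothetical stand-in for an object-storage fetch.
func loadFromBucket(key string) []byte {
	return []byte("object bytes for " + key)
}

func main() {
	// A Universe is per-process state – no package-level globals as in
	// the original groupcache.
	u := gc.NewUniverse(gchttp.NewHTTPFetchProtocol(nil), "http://localhost:8080")
	_ = u.Set("http://localhost:8080", "http://localhost:8081")

	// The getter runs only on the peer that owns a given key.
	g := u.NewGalaxy("bucket-objects", 64<<20, gc.GetterFunc(
		func(_ context.Context, key string, dest gc.Codec) error {
			return dest.UnmarshalBinary(loadFromBucket(key))
		}))

	// Expose the endpoints that other peers fetch from.
	mux := http.NewServeMux()
	gchttp.RegisterHTTPHandler(u, nil, mux)
	go func() { _ = http.ListenAndServe(":8080", mux) }()

	var out gc.StringCodec
	_ = g.Get(context.Background(), "meta.json", &out)
}
```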

At the beginning of this project, we reviewed these forks against our requirements. Ideally, there would be no global state. For some reason, we also thought that we wouldn’t need TTL support (expiring keys), but alas, that turned out to be wrong. Nowadays Thanos Store caches not just the contents of the actual objects in remote object storage but also things such as the list of objects. Since a list like that can change over time, we also need some kind of TTL support. We only realized this towards the end of the LFX term, after we had added end-to-end tests for the groupcache functionality.

So, I took a stab at implementing TTL support in galaxycache: https://github.com/vimeo/galaxycache/pull/25. I implemented it by embedding the TTL information in the internal LRU cache’s keys; the TTL is checked during fetching, and if a key has expired, it is removed from the internal cache. However, the original developers noticed my pull request and suggested following their own strategy: divide the keyspace into epochs instead of embedding explicit timestamps into the LRU keys. This still needs to be done so that we can switch back to vanilla, upstream galaxycache! Help wanted 🙃
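Conceptually, the embedded-TTL approach from that pull request boils down to something like this toy sketch, where a plain map stands in for the real LRU:

```go
import "time"

// cacheEntry is a toy slot whose expiry travels with the cached item,
// mimicking the idea of embedding TTL data alongside the LRU keys.
type cacheEntry struct {
	value  []byte
	expire time.Time // zero value means "never expires"
}

// get treats an expired entry as a miss and evicts it, which is what
// checking the TTL "during fetching" boils down to.
func get(cache map[string]cacheEntry, key string, now time.Time) ([]byte, bool) {
	e, ok := cache[key]
	if !ok {
		return nil, false
	}
	if !e.expire.IsZero() && now.After(e.expire) {
		delete(cache, key) // expired – drop it from the cache
		return nil, false
	}
	return e.value, true
}
```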

Last but not least, Thanos uses regular Go HTTP servers and clients: HTTP/1.1 for unencrypted traffic, and HTTP/2 only where TLS is in use and both server and client support it. Without HTTP/2, groupcache’s fetches between peers are quite inefficient – here you can find a comparison between the two. For that reason, we had to enable HTTP/2 over cleartext (H2C for short). I followed this tutorial by the mailgun developers to implement it, and I even wrote some benchmarks to test it myself. Local performance is more or less the same, but what matters is that with HTTP/2 everything happens over a minimal number of TCP connections. Keep in mind that with HTTP/1.1 the number of TCP connections grows quadratically with the number of nodes in a groupcache group, because each node has to communicate with every other node. Avoiding all of those connections yields big performance gains once separate machines in a group start talking to each other.
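Turning H2C on for a Go server is a small change using the golang.org/x/net/http2/h2c package; a minimal sketch (address and handler are illustrative):

```go
package main

import (
	"net/http"

	"golang.org/x/net/http2"
	"golang.org/x/net/http2/h2c"
)

func main() {
	mux := http.NewServeMux()
	mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte(r.Proto)) // reports "HTTP/2.0" for h2c clients
	})

	// h2c.NewHandler serves cleartext HTTP/2 and plain HTTP/1.1 on the
	// same port, so no TLS is needed for intra-cluster traffic.
	srv := &http.Server{
		Addr:    ":8080",
		Handler: h2c.NewHandler(mux, &http2.Server{}),
	}
	_ = srv.ListenAndServe()
}
```

The client side correspondingly needs an http2.Transport with AllowHTTP enabled and a DialTLS override that dials plain TCP, since the standard library otherwise only negotiates HTTP/2 over TLS.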

If all of this sounds nice and you would like to try it out, please install at least version v0.25 of Thanos and follow the documentation here. A huge thanks to everyone involved in this project!!! 🍺 I hope that we will receive some feedback from our users about groupcache!