📜 ⬆️ ⬇️

VShard - horizontal scaling in Tarantool



My name is Vladislav, I participate in the development of Tarantool - the DBMS and the application server in one bottle. And today I will tell you how we implemented horizontal scaling in Tarantool using the VShard module.

First, a little theory.

Scaling is of two types: horizontal and vertical. Horizontal is divided into two types: replication and sharding. Replication is used to scale the calculations, sharding - to scale the data.

Sharding is divided into two types: sharding ranges and sharding hashes.

When sharding with ranges, we from each record in the cluster calculate some shard key. These shard keys are projected on a straight line, which is divided into ranges that we add to different physical nodes.

Sharding hashes is simpler: we assume a hash function for each record in the cluster, we add records with the same value of the hash function to one physical node.

I'll tell you about horizontal scaling using sharding by hashes.

Previous implementation


The first horizontal scaling module we had was Tarantool Shard . This is a very simple sharding hash that considers the shard key of the primary key of all entries in the cluster.

function shard_function(primary_key) return guava(crc32(primary_key), shard_count) end 

But then the challenge arose that Tarantool Shard was unable to cope for three fundamental reasons.

First, the locality of logically related data was required. When we have data that is connected logically, we want to always store it on the same physical node, no matter how the topology of the cluster changes and balancing is performed. And Tarantool Shard does not guarantee this. It considers the hash only by the primary keys, and when rebalancing, even entries with the same hash can split for some time - the transfer is not atomic.

The problem of the lack of local data prevented us the most. I will give an example. There is a bank in which the client opened an account. The data about the account and the client must always be physically stored together so that they can be read in one request, changed in one transaction, for example, when transferring money from the account. If you use classic sharding with Tarantool Shard, then accounts and customers will have different shard functions. Data may appear on different physical nodes. This greatly complicates both reading and transactional work with such a client.

 format = {{'id', 'unsigned'}, {'email', 'string'}} box.schema.create_space('customer', {format = format}) format = {{'id', 'unsigned'}, {'customer_id', 'unsigned'}, {'balance', 'number'}} box.schema.create_space('account', {format = format}) 

In the example above, the id fields may not easily coincide with accounts and clients. They are connected through the customer_id account field and customer_id id . The same id field would break the uniqueness of the primary key of accounts. And in another way, Shard cannot shard.

The next problem is slow rewarding . This is the classic problem of all shards on hashes. The bottom line is that when we change the composition of a cluster, we usually change the shard function, because it usually depends on the number of nodes. And when the function changes, you need to go through all the records of the cluster and recalculate the shard function again. Perhaps move some records. And while we are transferring them, we don’t know whether the data that is needed for the next incoming request has been transferred, maybe they are now in the process of transfer. Therefore, during resharing, it is necessary for each reading to make a request for two shard functions: the old and the new. Requests are two times slower, and for us this was unacceptable.

Another feature of Tarantool Shard was that if some nodes failed in the replica set, it shows poor read access .

New solution


To solve the three problems described, we created Tarantool VShard . Its key difference is that the data storage level is virtualized: virtual storage has appeared on top of the physical, and records are distributed over it. These storages are called buckets. The user does not need to think about what and on what physical node lies. A bucket is an atomic indivisible data unit, as in a classic sharding one tuple. VShard always stores buckets entirely on the same physical node and transfers all the data of one bucket atomic during the resharing. This ensures locality. We just need to put the data in one bucket, and we can always be sure that this data will be together for any changes to the cluster.



How can I put the data in one bucket? In the scheme that we previously entered for a bank customer, we will add a bucket id to the tables using the new field. If the linked data is the same, the entries will be in the same bucket. The advantage is that we can store these records with the same bucket id in different spaces (space), and even in different engines. bucket id provided regardless of how these records are stored.

 format = {{'id', 'unsigned'}, {'email', 'string'}, {'bucket_id', 'unsigned'}} box.schema.create_space('customer', {format = format}) format = {{'id', 'unsigned'}, {'customer_id', 'unsigned'}, {'balance', 'number'}, {'bucket_id', 'unsigned'}} box.schema.create_space('account', {format = format}) 

Why are we so eager for this? If we have a classic sharding, then the data can spread across all the physical repositories that we only have. In the example with the bank, when requesting all the accounts of a customer, it is necessary to contact all the nodes. It turns out the difficulty of reading O (N), where N is the number of physical repositories. Terribly slow.

Thanks to buckets and locality by bucket id we can always read data from one node in one request, regardless of the cluster size.



Calculate the bucket id and assign the same values ​​to yourself. For some, this is an advantage, for someone a disadvantage. I consider it an advantage that you can choose the function to calculate the bucket id .

What is the key difference between a classic sharding and a virtual one with buckets?

In the first case, when we change the composition of the cluster, we have two states: the current (old) and the new, into which we have to go. In the course of the transition, it is necessary not only to transfer the data, but also to recalculate the hash functions for all records. This is very inconvenient, because at any point in time we do not know which data has been transferred and which is not. In addition, it is not reliable and not atomic, since for an atomic transfer of a set of records with the same hash function value, it is necessary to persistently store the transfer state in case of the need for recovery. There are conflicts, errors, you have to repeatedly restart the procedure.

Virtual sharding is much easier. We do not have two selected cluster states; there is only a bucket state. The cluster becomes more maneuverable, it gradually moves from one state to another. And now there are more than two states. Thanks to the smooth transition, you can change the balancing on the fly, and delete the newly added storages. That is, the controllability of balancing is greatly increased, it becomes granular.

Using


Suppose we chose a function for the bucket id and poured so much data into the cluster that there was no more room. Now we want to add nodes, and that the data on them moved. In VShard, this is done as follows. First, we launch new nodes and Tarantool on them, and then update the VShard configuration. It describes all cluster members, all replicas, replica sets, masters, assigned URIs, and more. We add new nodes to the configuration, and use the VShard.storage.cfg function VShard.storage.cfg apply it on all nodes of the cluster.

 function create_user(email) local customer_id = next_id() local bucket_id = crc32(customer_id) box.space.customer:insert(customer_id, email, bucket_id) end function add_account(customer_id) local id = next_id() local bucket_id = crc32(customer_id) box.space.account:insert(id, customer_id, 0, bucket_id) end 

As you remember, with classical sharding, the change in the number of nodes also changes the shard function. In VShard this does not happen, we have a fixed number of virtual storages - buckets. This is the constant that you select when starting a cluster. It may seem that because of this, scalability is limited, but in fact not. You can choose a huge number of buckets, tens and hundreds of thousands. The main thing is that they have at least two orders of magnitude more than the maximum number of replica sets that you will ever have in a cluster.



Since the number of virtual storages does not change, and the shard function depends only on this value, we can add as many physical storages without recalculating the shard function.

How are buckets distributed independently to physical storage? When VShard.storage.cfg is called, the rebalancing process wakes up on one of the nodes. This is an analytical process that calculates the perfect balance in a cluster. He goes to all the physical nodes, asks who has many buckets, and builds their movement routes to average distribution. The rebalancer sends routes to crowded storages, and they begin sending buckets. After some time, the cluster becomes balanced.

But in real projects the concept of perfect balance may be different. For example, I want to store less data on one replica set than on the other, because there is less volume of hard drives. VShard thinks that everything is well balanced, and in fact my storage is about to overflow. We have provided a mechanism for adjusting balancing rules using weights. Each replica set and storage can be given a weight. When the balancer makes a decision about how many buckets to send to whom, it takes into account the relationships of all pairs of scales.

For example, one storage has a weight of 100, and another has a 200. Then the first will store two times less buckets than the second. Please note that I am talking about the attitude of the scales. Absolute values ​​have no influence. You can choose weights based on a 100% distribution across the cluster: one storage has 30%, the other has 70%. You can take as a basis the storage capacity in gigabytes, or you can measure weights in the number of buckets. The main thing is to observe the attitude you need.



Such a system has an interesting side effect: if some storage is assigned zero weight, then the balancer will order the storage to distribute all its buckets. After that you can remove the entire replica set from the configuration.

Atomic transfer bucket'a


We have a bucket, it accepts any requests for reading and writing, and here the balancer asks to transfer it to another storage. Bucket stops accepting write requests, otherwise it will have time to update during the transfer, then they will have time to update the portable update, then the portable update of the update, and so on to infinity. Therefore, the record is blocked, but you can still read from the bucket. The transfer of chunks to a new location begins. After the transfer is complete, the bucket will begin accepting requests again. At the old place it also lies, but it is already marked as garbage, and subsequently the garbage collector will remove it chunk after chunk.

Associated with each bucket is metadata that is physically stored on disk. All the above steps are saved to disk, and whatever happens to the storage, the state of the bucket will be automatically restored.

You might have questions:


VShard.router


VShard consists of two submodules: VShard.storage and VShard.router. They can be independently created and scaled even on one instance. When accessing the cluster, we don’t know where the bucket is, and VShard.router will look for it by bucket id for us.

Let's look at an example of how it looks. We return to the bank cluster and client accounts. I want to be able to pull out all the accounts of a specific client from the cluster. To do this, I write the usual function for local search:



She searches for all customer accounts by his id. Now I need to decide on which of the repositories to call this function. To do this, from the client ID in my request, I calculate the bucket id and ask VShard.router to call me such a function in the storage where the bucket lives with the resulting bucket id . In the submodule there is a routing table in which the location of the buckets in the replica set is recorded. And VShard.router proxies my request.

Of course, it may happen that at this time the resharing began and the buckets began to move. The router in the background gradually updates the table in large chunks: it requests the repositories of their actual bucket tables.

It may even happen that we turn to a bucket that has just moved, and the router has not yet had time to update its routing table. Then he will turn to the old storage, but it will either tell the router where to find the bucket, or simply answer that it does not have the necessary data. Then the router will go around all the vaults in the search for the desired bucket. And all this is transparent to us, we will not even notice a slip in the routing table.

Reading instability


Recall what problems we originally had:


The last problem is solved by VShard.router using the automatic read failover subsystem.

The router periodically pings the storage specified in the configuration. And one of them stopped pinging. The router has a hot backup connection to each replica, and if the current one stops responding, it will go to another. The read request will be processed normally, because on the replicas we can read (but not write). We can set the priority of the replicas, according to which the router must choose failover for readings. We do this with zoning.



Assign a zone number to each replica and each router and set the table, in which we indicate the distance between each pair of zones. When the router decides where to send the read request, it will select a cue in the zone closest to its own.

How it looks in the configuration:



In general, an arbitrary replica can be accessed, but if the cluster is large and complex, very much distributed, then zoning is very useful. Zones can be different server racks so as not to load the network with traffic. Or it may be geographically distant points.

Also, zoning helps with different performance replicas. For example, we have one backup replica in each replica set, which should not accept requests, but only keep a copy of the data. Then we do it in a zone that will be very far from all the routers in the table, and they will turn to it as a last resort.

Recording instability


Since we started talking about read failover, what about write failover when changing the wizard? Here, VShard does not have everything so rosy: the election of a new master is not implemented in it, you will have to do it yourself. When we have somehow chosen it, it is necessary that this instance now takes over the authority of the master. We update the configuration, specifying master = false for the old master, and master = true for the new master = true , apply it to VShard.storage.cfg and roll it out to the repositories. Then everything happens automatically. The old master stops accepting write requests and starts synchronization with the new one, because there may be data that has already been applied on the old master, and the new one has not yet arrived. After that, the new master enters the role and begins to accept requests, and the old master becomes a replica. This is how write failover works in VShard.

 replicas = new_cfg.sharding[uud].replicas replicas[old_master_uuid].master = false replicas[new_master_uuid].master = true vshard.storage.cfg(new_cfg) 

How now to monitor all these diversity of events?


In the general case, two handles are enough - VShard.storage.info and VShard.router.info .

VShard.storage.info shows information in several sections.

 vshard.storage.info() --- - replicasets: <replicaset_2>: uuid: <replicaset_2> master: uri: storage@127.0.0.1:3303 <replicaset_1>: uuid: <replicaset_1> master: missing bucket: receiving: 0 active: 0 total: 0 garbage: 0 pinned: 0 sending: 0 status: 2 replication: status: slave Alerts: - ['MISSING_MASTER', 'Master is not configured for ''replicaset <replicaset_1>'] 

The first is the replication section. Here you can see the status of the replica set, to which you have applied this function: what is its replication lag, with whom it has connections and with whom it does not, who is available and not available, to whom which master is configured, etc.

In the Bucket section, you can see in real time how many bucket'es are being moved to the current replica set, how many are leaving, how many are working on it in normal mode, how many are marked as garbage, and how much is attached.

The Alert section is such a hodgepodge of all the problems that VShard was able to independently determine: the master is not configured, the insufficient redundancy level, the master is there, but all the replicas have failed, etc.

And the last section is a light bulb that lights up red when everything gets very bad. It is a number from zero to three, the more, the worse.

VShard.router.info has the same sections, but they mean a little more.

 vshard.router.info() --- - replicasets: <replicaset_2>: replica: &0 status: available uri: storage@127.0.0.1:3303 uuid: 1e02ae8a-afc0-4e91-ba34-843a356b8ed7 bucket: available_rw: 500 uuid: <replicaset_2> master: *0 <replicaset_1>: replica: &1 status: available uri: storage@127.0.0.1:3301 uuid: 8a274925-a26d-47fc-9e1b-af88ce939412 bucket: available_rw: 400 uuid: <replicaset_1> master: *1 bucket: unreachable: 0 available_ro: 800 unknown: 200 available_rw: 700 status: 1 alerts: - ['UNKNOWN_BUCKETS', '200 buckets are not discovered'] 

The first section is replication. Но только здесь не репликационные лаги, а информация о доступности: какие подключения у роутера, каким replica set'ом они держатся, какое подключение горячее и какое резервное на случай отказа мастера, кто выбран мастером, на каком replica set'е сколько bucket'ов доступно на чтение и запись, сколько доступно только на чтение.

В секции Bucket отображается общее количество bucket'ов, которые на этом роутере доступны сейчас на чтение и запись или только на чтение; про расположение скольких bucket'ов роутер не знает; или знает, но не имеет подключения к нужному replica set'у.

В секции Alert, в основном, рассказывается про подключения, про срабатывания failover, про неопознанные bucket'ы.

Наконец, здесь тоже есть простейший индикатор от нуля до трех.

Что нужно для использования VShard?


Первое — выбрать константное количество bucket'ов. Почему нельзя просто задать с помощью int32_max ? Потому что с каждым bucket'ом хранятся метаданные — по 30 байтов в хранилище и по 16 байтов на роутере. Чем больше у вас bucket'ов, тем больше места занимают метаданные. Но в то же время у вас будет меньше размер bucket'а, а значит выше гранулярность кластера и скорость переноса одного bucket'а. Так что придётся выбрать, что вам важнее и какой запас масштабируемости вы хотите заложить.

Второе — нужно выбрать шард-функцию для вычисления bucket id . Здесь правила такие же, как при выборе шард-функции для классических шардингов, потому что bucket — это как если бы мы в классическом шардинге фиксировали количество хранилищ. Функция должна равномерно распределять выходные значения, иначе размеры bucket'ов будут расти неравномерно, а VShard оперирует только количеством bucket'ов. И если вы не сбалансируете свою шард-функцию, то данные придётся перекладывать из bucket'а в bucket, менять шард-функцию. Поэтому выбирать надо аккуратно.

Summary


Vshard обеспечивает:


VShard сейчас активно развивается. Реализация каких-то запланированных задач уже началась. Первое — это балансировка нагрузки на роутер . Бывают тяжёлые запросы на чтение, и грузить ими мастер не всегда целесообразно. Пусть бы роутер мог самостоятельно балансировать запросы на разные читающие реплики.

Второе — lock-free перенос bucket'ов . Уже реализован алгоритм, при помощи которого можно не блокировать bucket'ы на запись даже на время переноса. Это придется сделать только в конце, чтобы зафиксировать сам факт переноса.

Третье — атомарное применение конфигурации . Самостоятельно применять конфигурацию ко всем хранилищам неудобно и не атомарно: какое-то хранилище может быть недоступно, конфигурация не применилась, и что тогда делать? Поэтому мы работаем над механизмом автоматического распространения конфигурации.

Оригинал моего доклада

Source: https://habr.com/ru/post/436916/