📜 ⬆️ ⬇️

AresDB Demo: Uber's GPU-based, real-time open source analysis tool

Thanks to real-time analysis, we, Uber employees, get an idea of ​​the state of affairs and work efficiency and, on the basis of the data, decide how to improve the quality of work on the Uber platform. For example, the project team monitors market conditions and identifies potential problems on our platform; software based on machine learning models predicts passenger supply and demand for drivers; data processing specialists improve machine learning models — in turn, to improve the quality of forecasting.



In the past, for real-time analysis, we used database solutions from other companies, but none of them met all of our criteria for functionality, scalability, efficiency, cost, and performance requirements.


Released in November 2018, AresDB is an open source, real-time analysis tool. It uses an unconventional power source, graphics processors (GPU), which allows for an increase in the scale of analysis. The GPU technology, a promising real-time analysis tool, has advanced significantly in recent years, making it ideal for real-time parallel computing and data processing.


In the following sections, we describe the structure of AresDB and how this interesting solution for real-time analysis allowed us to more effectively and more rationally unify, simplify and improve solutions for Uber databases for real-time analysis. We hope that after reading this article, you will test AresDB as part of your own projects and also make sure that it is useful!


Uber applications for real-time analysis


Data analysis is crucial for the success of Uber. In addition to other functions, analytical tools are used to solve the following tasks:



We categorize these functions with different requirements as follows:



Information panels and decision-making systems use real-time analysis systems to create similar queries on relatively small, but highly important, data subsets (with a maximum level of data relevance) with high QPS and low latency.


Need for another analytic module


The most common problem for which Uber uses real-time analysis tools to solve is the computation of sets of time series. These calculations give an idea of ​​the interaction with users so that we can improve the quality of services accordingly. Based on them, we request indicators for certain parameters (for example, day, hour, city identifier and trip status) for a certain period of time for randomly filtered (or sometimes combined) data. Over the past years, Uber has deployed several systems aimed at solving this problem in various ways.


Here are some third-party solutions that we used to solve problems of this type:



Although these technologies have their strengths, they lacked some of the features necessary for our use case. We needed a unified, simplified and optimized solution, and in its search we worked in a non-standard direction (more precisely, inside the GPU).


Using the GPU for real-time analysis


For realistic rendering of images with a high frame rate, graphics processors simultaneously process a huge number of shapes and pixels at high speed. Although the trend towards an increase in the clock frequency of data processing units has diminished over the past few years, the number of transistors in a microchip has increased only according to Moore's law . As a result, GPU computing speed, measured in gigaflops per second (Gflops / s), is increasing rapidly. Figure 1 below shows a comparison of the theoretical speed trend (Gflops / s) of the GPU from NVIDIA and the CPU from Intel over a number of years:



Figure 1. Comparison of CPU and GPU performance on single-precision floating point over several years. The image is taken from Nvidia's CUDA C Programming Guide.


When developing a real-time analysis request mechanism, the decision to integrate the GPU was natural. In Uber, a typical request for real-time analysis requires processing data for several days with millions, or even billions of data, then filtering and summarizing them in a short period of time. This computational task fits perfectly into the general-purpose GPU parallel processing model, since they are:



Focusing on the use of a GPU-based analytical database, we - from the perspective of our needs - evaluated several existing analytical solutions that use GPUs:



In general, these systems demonstrate a huge advantage and potential of data processing using GPU technology, and they inspired us to create our own real-time GPU-based analysis solution adapted to the needs of Uber. Based on these concepts, we developed and opened the AresDB source code.


AresDB Architecture Overview


At a high level, AresDB stores most of the data in the host memory (RAM, which is connected to the CPU), uses the CPU to process the received data, and drives to recover the data. During the request period, AresDB transfers data from host memory to GPU memory for parallel processing to the GPU. As shown in Figure 2 below, AresDB includes storage, metadata, and disk:



Figure 2. AresDB's unique architecture includes storage, disk, and metadata storage.


Tables


Unlike most relational database management systems (RDBMS), AresDB does not have a database or schema scope. All tables belong to the same scope in the same cluster / AresDB instance, allowing users to access them directly. Users store their data in the form of fact tables and dimension tables.


Fact table


The fact table stores an endless stream of time series events. Users use a fact table to store events / facts that occur in real time, and each event is associated with an event time, while the table is often requested by an event time. An example of the type of information that is stored in the fact table is trips, where each trip is an event, and the time a trip request is often referred to as an event time. If several time stamps are associated with an event, only one time stamp is specified as the event time and is displayed in the fact table.


Dimension table


The measurement table stores the current characteristics of the objects (including cities, customers and drivers). For example, users can store information about a city, in particular the name of a city, a time zone and a country, in a dimension table. Unlike fact tables, which are constantly increasing, dimension tables are always limited in size (for example, for Uber, the city table is limited to the actual number of cities in the world). Dimension tables do not require a special time column.


Data types


The table below shows the current data types that are supported in AresDB:



In AresDB, strings are converted to enumerated types (enums) automatically before entering the database, in order to increase the convenience of storage and query efficiency. This allows equality checking to be case sensitive, but does not support advanced operations such as concatenation, substrings, masks, and regular expression matching. In the future, we intend to add the full line support option.


Main functions


The AresDB architecture supports the following functions:



Columnar Storage


Vector


AresDB stores all data in a column format. The values ​​of each column are stored as a vector of column values. The confidence / uncertainty marker of the values ​​in each column is stored in a separate zero vector, and the confidence marker of each value is represented as one bit.


Active storage


AresDB stores uncompressed and unsorted column data (active vectors) in the active repository. Data records in the active repository are divided into (active) packages of a given size. New packages are created when data is received, while old packages are deleted after archiving records. The primary key index is used to locate records for deduplication and updates. Figure 3 below shows how we organize active records and use the primary key value to determine their location:



Figure 3. We use the primary key value to determine the location of the packet and the position of each entry within the packet.


The values ​​of each column in a batch are stored as a column vector. The confidence / uncertainty marker of the values ​​in each value vector is stored as a separate zero vector, and the confidence marker of each value is represented as one bit. In Figure 4 below, we offer an example with five values ​​for the city_id column:



Figure 4. We store values ​​(actual value) and zero vectors (confidence marker) of uncompressed columns in the data table.


Archive storage


AresDB also stores complete, sorted, and compressed columnar data (archive vectors) in archive storage through fact tables. Records in the archive repository are also distributed into packages. Unlike active packages, the archive package keeps records for the day, Coordinated Universal Time (UTC). An archive package uses the number of days as a package identifier since Unix Epoch.


Records are stored in sorted form in accordance with the user-defined column sort order. As shown in Figure 5 below, we sort by first the city_id column and then by the state column:



Figure 5. We sort all rows by city_id, then by state, then compress each column by group encoding. After sorting and compression, each column will receive an accounting vector.


The purpose of customizing the sort order of columns by the user is as follows:



A column is compressed only if it is present in a user-defined sort order. We are not trying to compress columns with a large number of elements, since this gives us a small amount of memory savings.


After sorting, the data for each qualified column is compressed using a specific group coding option. In addition to the value vector and the zero vector, we introduce an accounting vector to re-represent the same value.


Receive real-time data with update and insert functions


Clients get data through the HTTP API by publishing a service pack. The update package is a special ordered binary format that minimizes the use of space while maintaining random access to the data.


When AresDB receives the update package, it first writes the update package to the recovery log. When an update package is added to the end of the event log, AresDB identifies and skips late entries in fact tables for use in the active repository. A record is considered “late” if the event time is located before the archive time of the shutdown event. For records that are not considered late, AresDB uses the primary key index to locate the package within the active repository where they should be inserted. As shown in Figure 6 below, new records (not previously encountered based on the primary key value) are inserted into the empty space, and existing records are updated directly:



Figure 6. When data is received, after adding the update package to the event log, the “late” entries are added to the back queue, and the other entries are added to the active repository.


Archiving


When data is received, records are either added / updated in the active repository, or added to the back queue waiting to be placed in the archive repository.


We periodically launch a planned process, referred to as archiving, with respect to the records of the active repository to attach new records (records that have never been previously archived) to the archive repository. The archiving process only processes entries in the active storage with an event time in the range between the old shutdown time (shutdown time from the last archiving process) and the new shutdown time (new shutdown time based on the archiving delay parameter in the table schema).


Record event time is used to determine which archive package records should be combined into when packaging archived data into daily packages. Archiving does not require deduplicating the index of the primary key value when merging, since only records in the range between the old and the new off time are archived.


Figure 7 below shows a graph according to the event time of a particular entry.



Figure 7. We use event time and trip time to define the records as new (active) and old (the event time is located before the archive time of the trip event).


In this case, the archive interval is the time interval between two archiving processes, and the archive delay is the period after the event time, but before the event can be archived. Both parameters are defined in the schema settings of the AresDB table.


Backfilling


As shown in Figure 7 above, old records (the time of which events are located before the archival time of the disconnection event) are added to the fact tables in reverse queue and ultimately processed as part of the backfill process. Triggers of this process are also the time or the size of the reverse queue, if it reaches the threshold level. Compared to the process of adding data to the active storage, backfilling is asynchronous and relatively more expensive in terms of CPU and memory resources. Backfilling is used in the following scenarios:



Unlike archiving, the backfill process is idempotent and requires deduplication based on the value of the primary key. The fill data will eventually be visible for requests.


The backward queue is maintained in memory with a predefined size, and under heavy load backfill the process will be blocked for the client until the queue is cleared by starting the backfill process.


Processing request


In the current implementation, the user needs to use the Ares Query Language (AQL) language created by Uber to execute queries in AresDB. AQL is an effective language for analytic queries on time series and does not follow the standard SQL syntax of the “SELECT FROM WHERE GROUP BY” type, like other languages ​​similar to SQL. Instead, AQL is used in structured fields and can be included in JSON, YAML, and Go objects. For example, instead of ВЫБРАТЬ/SELECT пункт(*) ИЗ/FROM ГРУППЫ поездок ПО/GROUP BY city_id, ГДЕ/WHERE статус = «завершено» И/AND request_at >= 1512000000 , the equivalent AQL variant in JSON is written as follows:


 { “table”: “trips”, “dimensions”: [ {“sqlExpression”: “city_id”} ], “measures”: [ {“sqlExpression”: “count(*)”} ], ;”> “rowFilters”: [ “status = 'completed'” ], “timeFilter”: { “column”: “request_at”, “from”: “2 days ago” } } 

In the JSON format, AQL offers developers of a dashboard and decision-making system a more convenient software query algorithm than SQL, allowing them to easily compose queries and manipulate them with code, without worrying about things like SQL injection. It acts as a universal query format for typical architectures of web browsers, external and internal servers, up to the database (AresDB). In addition, AQL provides convenient syntax for time filtering and packaging with support for its own time zone. In addition, the language supports a number of functions, such as implicit subqueries, to prevent common errors in queries and facilitates the process of analyzing and rewriting queries for developers of the internal interface.


Despite the many benefits that AQL offers, we are well aware that most engineers are more familiar with SQL. Providing a SQL interface to execute queries is one of the following steps, which we will consider as part of working to improve interaction with AresDB users.


The execution pattern for an AQL query is shown in Figure 8 below:



Figure 8. The AresDB query execution scheme uses our own AQL query language to process and retrieve data quickly and efficiently.


Compiling queries


An AQL query is compiled into an internal query context. Expressions in filters, dimensions, and parameters are analyzed in abstract syntax trees (AST) for further processing through the graphics processor (GPU).


Data loading


AresDB uses pre-filters to cheaply filter archived data before sending it to the GPU for parallel processing. Because the archived data is sorted according to the configured column order, some filters may use this sort order and binary search method to determine the appropriate range of matches. In particular, equivalent filters for all initially sorted X columns and an optional range filter for sorted X + 1 columns can be used as prefilters, as shown in Figure 9 below.



Figure 9. AresDB pre-filters the column data before sending it to the GPU for processing.


After pre-filtering, only green values ​​(matching the filter condition) should be sent to the GPU for parallel processing. Input data is loaded into the GPU and processed one packet at a time. This includes both active packages and archive packages.


AresDB uses CUDA streams for pipelining and data processing. For each request, two threads are alternately applied for processing in two overlapping stages. In Figure 10 below, we propose a graph illustrating this process.



Figure 10. In AresDB, two CUDA streams are alternately involved in the transmission and processing of data.


Request execution


For simplicity, AresDB uses the Thrust library to implement query execution procedures, which offer blocks of a finely tuned parallel algorithm for quickly implementing the current query tool.


In Thrust, input and output vector data are evaluated using random access iterators. Each GPU thread looks for input iterators in its working position, reads the values ​​and performs calculations, and then writes the result to the corresponding position in the output iterator.


To calculate the AresDB expressions, the “one operator per core” (OOPK) model follows.


In Figure 11 below, this procedure is demonstrated using the example of an AST generated from the dimension expression request_at – request_at % 86400 at the compilation of the request:



Figure 11. AresDB uses the OOPK model to evaluate expressions.


In the OOPK model, the AresDB query engine traverses each end node of the AST tree and returns an iterator for the source node. If the root node is also final, the root action is performed directly on the input iterator.


For each non-root non-terminal node ( module operation in this example), a temporary workspace vector is allocated to store the intermediate result obtained from the request_at% 86400 expression request_at% 86400 . With Thrust, the kernel function is run to calculate the result for this operator in the GPU. Results are stored in the workspace iterator.


For the root node, the kernel function runs in the same way as for a non-root, non-finite node. Various output actions are performed based on the type of expression, as described in detail below:



After the expression is evaluated, the sorting and transformation are performed to finally merge the data. In the operations of sorting and transformation, we use the values ​​of the dimension vector as the key values ​​of sorting and transformation, and the values ​​of the vector of parameters as the values ​​for combining data. Thus, rows with similar measurement values ​​are grouped and merged. Figure 12 below shows this sorting and conversion process.



Figure 12. After calculating the expression, AresDB sorts and converts the data by the key values ​​of the measurement vectors (key value) and parameters (value).


AresDB also supports the following advanced query features:



Resource management


Being a database based on internal memory, AresDB must manage the following types of memory usage:



When you run AresDB, it uses the configured total memory budget. The budget is divided into all six types of memory and must also leave enough space for the operating system and other processes. This budget also includes a statically configured overload estimate, active data storage monitored by the server, and historical data that the server may decide to load and delete, depending on the remaining memory budget.
Figure 13 below shows the memory model of the AresDB host.



Figure 13. AresDB manages its own memory usage so that it does not exceed the configured overall process budget.


AresDB allows users to set up pre-load days and column-level priorities for fact tables and preloads archived data only on pre-load days. Data that is not preloaded is loaded into memory from disk on demand. When filled, AresDB also deletes archived data from host memory. The principles of AresDB removal are based on the following parameters: the number of days of preloading, the priorities of the columns, the day of the package, and the size of the column.


AresDB also manages multiple GPU devices and simulates device resources like GPU threads and device memory, tracking GPU memory usage for processing requests. AresDB manages GPU devices through a device manager, which models GPU device resources in two dimensions (GPU threads and device memory) and tracks memory usage when processing requests. After compiling a query, AresDB allows users to estimate the amount of resources required to execute the query. Device memory requirements must be met before the request is resolved; if there is not enough memory on any device at the moment, the request should wait for execution. Currently, AresDB can perform one or more requests on a single GPU device at the same time, if the device meets all resource requirements.


In the current implementation, AresDB does not cache input data in the device for reuse in multiple requests. AresDB aims to support queries to datasets that are constantly updated in real time and poorly amenable to correct caching. In future versions of AresDB, we intend to implement data caching functions in the GPU memory, which will help optimize query performance.


Usage example: Uber overview dashboard


In Uber, we use AresDB to create dashboards for getting business information in real time. AresDB is responsible for storing primary events with constant updates and calculating critical metrics for them in a fraction of a second thanks to GPU resources at low cost, so users can use dashboards in interactive mode. For example, anonymized travel data that has a long lifetime in the data warehouse is updated by several services, including our dispatching system, payment systems and estimates. To efficiently use travel data, users divide and split data across multiple dimensions to get an idea of ​​real-time decisions.


When using AresDB, the Uber dashboard is a widely used analysis dashboard that is used by in-company teams to generate relevant metrics and real-time response to improve user experience.



Рисунок 14. В почасовом режиме обзорная информационная панель Uber использует AresDB для просмотра аналитики данных в реальном времени в течение определенных периодов.


Чтобы создать макет информационной панели, приведенной выше, мы смоделировали следующие таблицы:


Поездки (таблица фактов)



Города (таблица измерений)



Схемы таблиц в AresDB


Для создания двух моделируемых таблиц, описанных выше, сначала нам нужно создать таблицы в AresDB по следующим схемам:



Как описано в схеме, таблицы поездок создаются в качестве таблиц фактов, которые отражают события поездок, происходящие в реальном времени, а таблицы городов создаются в качестве таблиц измерений, в которых хранится информация о фактических городах.


После создания таблиц пользователи могут использовать клиентскую библиотеку AresDB для загрузки данных из шины событий, например Apache Kafka , или с платформ потоковой или пакетной обработки, например Apache Flink или Apache Spark .


Примеры запросов для AresDB


В пробных информационных панелях мы выбираем в качестве примера две метрики, «общий тариф на поездку» и «активные водители». В информационной панели пользователи могут отфильтровать город по метрикам, например Сан-Франциско. Для отображения временных рядов по этим двум метрикам за последние 24 часа в информационных панелях можно запустить следующие запросы AQL:



Пример результатов по запросу:
Результатом приведенных выше пробных запросов будут следующие временные ряды, которые можно легко представить на графиках временных рядов, как показано ниже.



В приведенном выше примере мы продемонстрировали, как можно использовать AresDB для приема первичных событий, происходящих в режиме реального времени, в течение нескольких секунд и немедленного выполнения произвольных пользовательских запросов данных для вычисления метрик за доли секунды. AresDB помогает инженерам в создании средств обработки данных, которые определяют критически важные для бизнеса метрики, что требует понимания человеческих или автоматических решений в реальном времени.


Следующие шаги


AresDB широко используется в Uber для поддержки информационных панелей анализа данных в реальном времени, что позволяет принимать соответствующие решения на основе данных по множеству аспектов бизнеса. Открывая исходный код этого инструмента, мы надеемся, что другие представители сообщества смогут использовать AresDB в своих аналитических целях.


В будущем мы намерены добавить в проект следующие функции:



AresDB имеет открытый исходный код по лицензии Apache. Мы приглашаем вас попробовать AresDB и присоединиться к нашему сообществу.


Если вас интересует разработка технологий крупномасштабного анализа данных в реальном времени, подумайте о подаче заявки на работу в нашей команде.


Благодарности


Особая благодарность Кейт Чанг (Kate Zhang), Дженнифер Андерсон (Jennifer Anderson), Нихилу Джоши (Nikhil Joshi), Аби Куну (Abhi Khune), Шене Цзи (Shengyue Ji), Чинмэю Сомэну (Chinmay Soman), Сяну Фу (Xiang Fu), Дэвиду Чену (David Chen) и Ли Нингу (Li Ning) за то, что обеспечили невероятный успех этого проекта!



Source: https://habr.com/ru/post/440072/