Welcome to the inaugural blog post from LETSQL! Today, we delve into a unique implementation combining DataFusion, XGBoost to create an efficient and streamlined machine learning (ML) pipeline. This post is not just technical; it’s a narrative about transforming data into decisions. You can find the complete code on GitHub.
Introduction
Here is a query representing a common workflow that data scientists and data analysts may run to score a model on unseen data:
SELECT
customer_id,RANK() OVER (PARTITION BY customer_id
ORDER BY churn_model(f0, f1, f2) DESC) AS churn_rank
FROM
data
WHERE
<=10 and f0=1
churn_rankORDER BY
DESC; churn_rank
In this query, we aim to batch score a pre-trained XGBoost churn model using three features, focusing on the top 10 customer IDs with the highest churn probability. It’s a prevalent method, particularly useful for A/B testing model performance or making batch decisions based on model predictions, such as churn prevention or credit assessments.
Typically, organizations end up building their own model scoring workflows that are optimized for their use case.
This post demonstrates how DataFusion’s User Defined Functions (UDFs) can build a custom ML scoring pipeline in Rust.
XGBoost Scoring Workflow
A common workflow is illustrated in the figure below:
- Data Preparation In general, a scoring pipeline starts off with some SQL script that does the data gathering and feature engineering required to prepare the data for scoring.
- Preprocessing Once the data is gathered, it is further preprocessed and transformed into a format that the model can understand e.g. encoding categorical features, normalizing continuous features etc.
- Model Deployment/Inference The model is then either transformed or pushed behind an API for serving. All prepcoesing steps need to be recreated and the model inference code may be transformed into a production system language. The preprocessed data is then converted to multidimentional array or Tensor format (DMatrix for XGBoost) and then fed into the model to generate predictions.
There are a few problems with this approach:
- Non-Portability: The SQL script is not portable and cannot be reused in other applications.
- Pipeline Complexity: It is difficult to pipeline the workflow since the data is read and written to a storage in between stages. This also means that it is not easy to do predicate pushdowns and other optimizations e.g. tree pruning or constant folding.
- Language Disparity: The XGBoost model may be rewritten in another language for production deployment scenarios and may differ in case of online and batch scoring workflows.
Pipelining XGBoost Scoring with DataFusion
Implementation
In a multi-engine world, DataFusion excels at User Defined Function (UDFs) and User Defined Aggregates Function (UDAFs) and its window variants. For example, Velox does not support Window variants yet.
DataFusion makes it easy to define a UDF in Rust and register it with it’s SQL engine. Since DataFusion is built on Apache Arrow, it take advantage of vectorization, cache locality, and parallelism by working on an array chunk at a time, as opposed to running UDFs one row at a time.
fn onehot(args: &[ArrayRef]) -> Result<ArrayRef> {
let data: &DictionaryArray<Int32Type> = as_dictionary_array::<_>(&args[0]);
let key = data.keys();
let values = data.values();
let values = values.as_any().downcast_ref::<StringArray>().unwrap();
let struct_builder = StructBuilder::from_fields(
Fields::from(vec![
Field::new("key", DataType::Utf8, false),
Field::new("value", DataType::Boolean, false),
,
])2,
;
)let mut list_builder = ListBuilder::new(struct_builder);
for key_value in key.iter() {
for (j, struct_key) in values.iter().enumerate() {
let struct_value = j == key_value.unwrap() as usize;
list_builder.values()
.field_builder::<StringBuilder>(0)
.unwrap()
.append_value(struct_key.unwrap());
list_builder.values()
.field_builder::<BooleanBuilder>(1)
.unwrap()
.append_value(struct_value);
.values().append(true);
list_builder}
.append(true);
list_builder}
let list_array = list_builder.finish();
Ok(Arc::new(list_array))
}
This results in a SQL query like the following:
SELECT onehot(arrow_cast(f0, 'Dictionary<Int32, Utf8>')) as hotted FROM data
out:
+-------------------------------------------------------------------------------------------------------------------------------------------------+
| cap_shape |
+-------------------------------------------------------------------------------------------------------------------------------------------------+
| [{key: x, value: true}, {key: b, value: false}, {key: s, value: false}, {key: f, value: false}, {key: k, value: false}, {key: c, value: false}] |
| [{key: x, value: true}, {key: b, value: false}, {key: s, value: false}, {key: f, value: false}, {key: k, value: false}, {key: c, value: false}] |
| [{key: x, value: false}, {key: b, value: true}, {key: s, value: false}, {key: f, value: false}, {key: k, value: false}, {key: c, value: false}] |
| [{key: x, value: true}, {key: b, value: false}, {key: s, value: false}, {key: f, value: false}, {key: k, value: false}, {key: c, value: false}] |
| [{key: x, value: true}, {key: b, value: false}, {key: s, value: false}, {key: f, value: false}, {key: k, value: false}, {key: c, value: false}] |
Now that we have the data in a format that the model can understand, we need to convert it into DMatrix before scoring using the XGBoost c++ library.
fn predict(args: &[ArrayRef]) -> Result<ArrayRef> {
let mut result = Vec::new();
let mut num_rows_final = 0;
println!("args len: {}", args.len());
for arg in args {
let (result_col, num_rows, _dim_names_col) = to_dense(arg)?;
let chunks = result_col.chunks(result_col.len() / num_rows);
if result.is_empty() {
for chunk in chunks {
.push(process_chunk(chunk));
result}
= num_rows;
num_rows_final } else {
for (index, chunk) in chunks.enumerate() {
.get_mut(index).unwrap().extend(process_chunk(chunk));
result}
}
}
let mut data_transform: Vec<f32> = Vec::new();
for re in result {
.extend(re);
data_transform}
let dmat = DMatrix::from_dense(&data_transform, num_rows_final).unwrap();
println!("dmat shape: {:?}", dmat.shape());
let booster = Booster::load("model.xgb").unwrap();
let result = Float32Array::from(booster.predict(&dmat).unwrap());
Ok(Arc::new(result))
}
SELECT predict(onehot(arrow_cast(f0, 'Dictionary<Int32, Utf8>'))) as predictions FROM data
Here complete SQL query for the scoring pipeline:
SELECT predict(cap_shape, cap_surface, cap_color, bruises, odor, gill_attachment, gill_spacing, gill_size, gill_color, stalk_shape, stalk_root,
stalk_surface_above_ring, stalk_surface_below_ring, stalk_color_above_ring, stalk_color_below_ring, veil_type, veil_color, AS predictions
ring_number, ring_type, population, habitat ) FROM (
SELECT onehot(arrow_cast(cap_shape, 'Dictionary(Int32, Utf8)')) AS cap_shape,
'Dictionary(Int32, Utf8)')) AS cap_surface,
onehot(arrow_cast(cap_surface, 'Dictionary(Int32, Utf8)')) AS cap_color,
onehot(arrow_cast(cap_color, 'Dictionary(Int32, Utf8)')) AS bruises,
onehot(arrow_cast(bruises, 'Dictionary(Int32, Utf8)')) AS odor,
onehot(arrow_cast(odor, 'Dictionary(Int32, Utf8)')) AS gill_attachment,
onehot(arrow_cast(gill_attachment, 'Dictionary(Int32, Utf8)')) AS gill_spacing,
onehot(arrow_cast(gill_spacing, 'Dictionary(Int32, Utf8)')) AS gill_size,
onehot(arrow_cast(gill_size, 'Dictionary(Int32, Utf8)')) AS gill_color,
onehot(arrow_cast(gill_color, 'Dictionary(Int32, Utf8)')) AS stalk_shape,
onehot(arrow_cast(stalk_shape, 'Dictionary(Int32, Utf8)')) AS stalk_root,
onehot(arrow_cast(stalk_root, 'Dictionary(Int32, Utf8)')) AS stalk_surface_above_ring,
onehot(arrow_cast(stalk_surface_above_ring, 'Dictionary(Int32, Utf8)')) AS stalk_surface_below_ring,
onehot(arrow_cast(stalk_surface_below_ring, 'Dictionary(Int32, Utf8)')) AS stalk_color_above_ring,
onehot(arrow_cast(stalk_color_above_ring, 'Dictionary(Int32, Utf8)')) AS stalk_color_below_ring,
onehot(arrow_cast(stalk_color_below_ring, 'Dictionary(Int32, Utf8)')) AS veil_type,
onehot(arrow_cast(veil_type, 'Dictionary(Int32, Utf8)')) AS veil_color,
onehot(arrow_cast(veil_color, 'Dictionary(Int32, Utf8)')) AS ring_number,
onehot(arrow_cast(ring_number, 'Dictionary(Int32, Utf8)')) AS ring_type,
onehot(arrow_cast(ring_type, 'Dictionary(Int32, Utf8)')) AS population,
onehot(arrow_cast(population, 'Dictionary(Int32, Utf8)')) AS habitat
onehot(arrow_cast(habitat, FROM mushrooms) data;
+-------------+
| predictions |
+-------------+
| 0.50021684 |
| 0.50577056 |
| 0.50021684 |
| 0.50577056 |
| 0.50577056 |
+-------------+
Why DataFusion?
DataFusion’s UDF capabilities offer a game-changing advantage in the world of data science. By enabling SQL users to tap into powerful ML pipelines, we bridge the gap between data manipulation and data analysis. This is a huge win for data scientists and data analysts who can now use their existing SQL skills to build and deploy ML pipelines. Moreover, DataFusion provides the building blocks and primitives to be able to utilize the optimizations without diving too deep into database internals or recreating the foundations for a new database. In LETSQL’s case, we are extending DataFusion to add ML foucsed UDFs and new DataTypes that are amenable to Tensor processing, necessary for performing ML.
Why SQL?
SQL is declarative and high-level. SQL stands as the lingua franca of data. Its widespread adoption and ease of use make it an ideal candidate for integrating complex data processes, ensuring accessibility and efficiency.
As you see above, the complete generated SQL query is quite gnarly. While more complex SQL scripts exist everywhere, managing them is increasingly difficult. Hence, we are working on providing a DataFrame API that can compile down to the SQL that your engines understand e.g. DuckDB to BigQuery.
Micro-Benchmark
Using the mushroom dataset as an example, we compare our performance with a pandas equivalent code below:
import pandas as pd
def onehot(df):
= pd.get_dummies(df)
X return X
= pd.read_csv('data/mushrooms.csv')
df = onehot(df) hotted
We also add an equivalent benchmark in our Rust based implementation. Here are the numbers run on M1 Macbook (8 cores). You can try running the benchmarks using cargo bench
command in the GitHub repo.
To keep things apples-to-apples, we only benchmark the one-hot encoding, including loading the CSV.
DataFusion | pandas | |
---|---|---|
one-hot | 208.14 µs | 21.7 ms |
This is a 100x speed-up. We observe similar speed-ups for predict method and we will compare benchmarks for it in the future. One thing to note is that pandas outputs each column as a separate columns, while DataFusion outputs a single array of structs. Moreover, in the pandas case, the whole CSV is loaded in-memory while DataFusion streams the CSV from disk at run-time and only registers its schema beforehand.
Takeaways
This journey with DataFusion and XGBoost opens doors to numerous optimizations yet to be explored. From specialized scoring implementations to intelligent plan rewriting, the possibilities are vast and promising.
Future work: LETSQL
At LETSQL, we are building an open-source python package that makes it easy for data scientists, machine learning engineers and data analysts to train, test and deploy Machine Learning models, all with SQL. In future blog posts, We will explore how to use the UDAF machinery for ML training and extend it to beyond inference use-cases. Please sign-up for our newsletter to stay up-to-date with our latest developments here. Together, we’re not just coding; we’re crafting the future of data-driven decision-making.