Built-in Inference for DataFrames

We are previewing a built-in predict UDF for XGBoost. This feature allows you to easily score XGBoost models using a performant Rust-based inference engine.

Authors

Hussain Sultan

Dan Lovell

Published

August 22, 2024

Modified

August 24, 2024

TL;DR

Users often build custom machinery for their common ML inference tasks, e.g., online inference and batch scoring. To address the batch scoring use case, we are previewing a new built-in UDF for XGBoost models in letsql that can be used to score data row-wise via a DataFrame API. Moreover, with letsql’s multi-engine support, data practitioners do not have to leave their DataFrame API to connect to SQL sources from a familiar high-level, pandas-like DSL.

You can drop into an IPython shell with the latest to try out the built-in UDFs.

nix run github:letsql/letsql

Introduction

Scoring1 is a fundamental task in data workflows, yet it often presents significant challenges. In our experience at enterprises, we encountered these difficulties firsthand, having to build custom infrastructure to make scoring accessible for downstream analysts. The complexity stemmed not only from the absence of a high-level API but also from the need to score networks of interconnected models, interwoven with preprocessing and ETL operations. These workflows often involved feeding the output of one trained model pipeline into another, ultimately aggregating results into meaningful buckets. These pipelines can be conceptualized as Directed Acyclic Graphs (DAGs) of operators, including ML models and preprocessing steps. The more flexibility we provided, allowing analysts to experiment with different assumptions, the better the outcomes for the business.

However, this flexibility came at a cost. Efforts to optimize performance often introduced additional complexity and maintenance overhead. Despite our best efforts, the limitations of our tools frequently led to inefficiencies and challenges that hindered composability and clarity. Model developers had to rewrite trained model pipelines in a performant manner, often relying on code translation tools like numba or cython, which posed a significant barrier for many data scientists unfamiliar with these tools. Model consumers needed sufficient knowledge of SQL and the underlying models to build the scoring dataset and then pass it to the trained pipeline in Python, along with the necessary preprocessing steps and features. Infrastructure engineers were tasked with maintaining the infrastructure that enabled the seamless execution of these pipelines, often creating custom tooling on platforms like dask or spark.

This experience prompted us to question the status quo. What if you could register all your models directly with your backend, right from within XGBoost? What if you could achieve near-native performance without relying on specialists? And what if you didn’t have to navigate between the often disparate worlds of SQL and Python and the myriad pipeline infrastructures?

These questions have shaped our understanding of data scoring pipelines and fueled our desire to streamline and improve this process. We previously explored datafusion-based UDFs here.

Why it matters

Batch scoring pipelines are prevalent, often serving as the primary inference workload in enterprises. Batch inference is frequently preferred over online inference in most enterprise scenarios due to its ability to efficiently process large volumes of data and leverage techniques like vectorization. This approach allows for better resource utilization and scalability (see the Raven paper by Microsoft).
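
To make the vectorization point concrete, here is a minimal sketch (the model path and feature frame are placeholders): scoring an entire batch in a single predict call amortizes model-invocation overhead across all rows, which is what makes batch inference efficient.

import pandas as pd
import xgboost as xgb

# placeholders: any trained booster and prepared feature frame will do
model = xgb.Booster(model_file="model.xgb")
features = pd.read_parquet("features.parquet")

# one vectorized call scores every row at once, instead of
# invoking the model once per row in a Python loop
scores = model.predict(xgb.DMatrix(features))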

Batch scoring pipelines are integral to enterprise forecasting processes and large-scale business scenario simulations. These pipelines predict future business states based on current and historical data, making them essential in industries like finance, supply chain, and retail. Scoring is more frequently utilized than training, necessitating performance and scalability optimizations for scoring pipelines.

Given that most historical and current data resides in SQL databases across various vendors with unique SQL dialects, there is a need for a pandas-like API2 to connect to these sources for preprocessing and ETL tasks, such as joining and filtering. Preparing the appropriate source variables for the model is often challenging, as the data is rarely clean or ready for use. This adds significant overhead for model consumers, who must maintain a SQL model that outputs data suitable for the model’s requirements.

By providing a built-in UDF for XGBoost model scoring, we aim to streamline this process, making it more accessible and efficient for data practitioners.

How do people solve it today?

In today’s landscape, data scientists and engineers tackle the challenges of scoring using a variety of tools and practices. Each approach has its strengths, but they often come with significant trade-offs that can lead to inefficiencies, complexity, and maintenance challenges. Let’s explore the common methods:

1. SQL and Python: Two Languages, Two Worlds Far Apart

One of the most prevalent approaches involves using SQL for data management and Python for model development and scoring.

SQL is commonly used to store, manipulate, and prepare data for analysis. Analysts and data engineers often create complex queries to join, filter, and aggregate data, making it ready for model input.

SQL Example: Preparing data in a SQL database (e.g., Postgres).

WITH diamonds_encoded AS (
    SELECT
        carat,
        depth,
        table,
        price,
        x,
        y,
        z,
        CASE WHEN cut = 'Fair' THEN 1 ELSE 0 END AS cut_Fair,
        CASE WHEN cut = 'Good' THEN 1 ELSE 0 END AS cut_Good,
        CASE WHEN cut = 'Very Good' THEN 1 ELSE 0 END AS cut_Very_Good,
        CASE WHEN cut = 'Premium' THEN 1 ELSE 0 END AS cut_Premium,
        CASE WHEN cut = 'Ideal' THEN 1 ELSE 0 END AS cut_Ideal,
        CASE WHEN color = 'D' THEN 1 ELSE 0 END AS color_D,
        CASE WHEN color = 'E' THEN 1 ELSE 0 END AS color_E,
        CASE WHEN color = 'F' THEN 1 ELSE 0 END AS color_F,
        CASE WHEN color = 'G' THEN 1 ELSE 0 END AS color_G,
        CASE WHEN color = 'H' THEN 1 ELSE 0 END AS color_H,
        CASE WHEN color = 'I' THEN 1 ELSE 0 END AS color_I,
        CASE WHEN color = 'J' THEN 1 ELSE 0 END AS color_J,
        CASE WHEN clarity = 'I1' THEN 1 ELSE 0 END AS clarity_I1,
        CASE WHEN clarity = 'SI2' THEN 1 ELSE 0 END AS clarity_SI2,
        CASE WHEN clarity = 'SI1' THEN 1 ELSE 0 END AS clarity_SI1,
        CASE WHEN clarity = 'VS2' THEN 1 ELSE 0 END AS clarity_VS2,
        CASE WHEN clarity = 'VS1' THEN 1 ELSE 0 END AS clarity_VS1,
        CASE WHEN clarity = 'VVS2' THEN 1 ELSE 0 END AS clarity_VVS2,
        CASE WHEN clarity = 'VVS1' THEN 1 ELSE 0 END AS clarity_VVS1,
        CASE WHEN clarity = 'IF' THEN 1 ELSE 0 END AS clarity_IF
        
    FROM
        Diamonds
)

SELECT * FROM diamonds_encoded;

Python is then used to develop models and score data, leveraging libraries like Pandas, NumPy, and machine learning frameworks such as XGBoost or scikit-learn.

Python Example: Fetching data from SQL and scoring a model.
import pandas as pd
import psycopg2
import xgboost as xgb

conn = psycopg2.connect(
    host="examples.letsql.com",
    user="letsql",
    password="letsql",
    database="letsql")

# fetch the raw table and one-hot encode the categorical columns
df = pd.read_sql_query("SELECT * FROM Diamonds;", conn)
df = pd.get_dummies(df, columns=['cut', 'color', 'clarity'])

model = xgb.Booster(model_file='model.xgb')

# get_dummies has already replaced 'cut', 'color', and 'clarity',
# so only the label column needs to be dropped here
features = df.drop(columns=['price'])
dmat = xgb.DMatrix(features)

predictions = model.predict(dmat)
df['predicted_price'] = predictions

print(df.head())

Moving between SQL and Python incurs cognitive load: practitioners must be fluent in both languages, manage the data transition between environments, and maintain consistency between code and generated data products.

Data Transfer Bottlenecks: Extracting large datasets from SQL databases into Python for processing can introduce significant overhead and inefficiencies, especially at scale. This dual-language approach can lead to fragmented workflows where inefficiencies and errors creep in due to the disjointed nature of the tools. A very common example is SQL query results saved to disk in an ad hoc manner, resulting in hard-coded paths: hidden dependencies that pose a constant risk of inconsistency.

2. Code Translation: Optimizing with Numba

To address performance bottlenecks in Python, especially when dealing with large datasets, many teams turn to optimization libraries like Numba.

Numba allows developers to compile Python code into machine code at runtime, significantly speeding up numerical computations. It is often used to optimize critical sections of code, such as model scoring functions, to improve performance.

Numba Example: Optimizing a model scoring function. This is a somewhat contrived example, but it shows how these technologies are typically applied.
import numpy as np
from numba import jit

@jit(nopython=True)
def calculate_volume(x_values, y_values, z_values):
    # compile the loop to machine code; volume = x * y * z per row
    volume = np.empty(len(x_values))
    for i in range(len(x_values)):
        volume[i] = x_values[i] * y_values[i] * z_values[i]
    return volume

df['volume'] = calculate_volume(df['x'].values, df['y'].values, df['z'].values)

While Numba can provide substantial performance improvements, it introduces complexity into the codebase. Developers need to ensure that their code is “Numba-friendly,” which can involve rewriting parts of the code and dealing with Numba-specific limitations.

3. Enabling Parallelism: Leveraging Tools Like Dask

To manage large-scale data processing and distribute workloads across multiple machines, some teams turn to multiprocessing or libraries like dask.

Dask is a flexible library for parallel computing in Python, designed to scale from a single laptop to a cluster of machines. It allows data scientists to work with larger-than-memory datasets and parallelize computations. Here is an example of using Dask to first execute a SQL query from Postgres, then apply one-hot encoding to the categorical columns, and finally score the model using XGBoost.3

import dask
from dask import delayed
import dask.dataframe as dd
import psycopg2
import pandas as pd
import xgboost as xgb
from numba import jit

@jit(nopython=True)
def calculate_volume(x_values, y_values, z_values):
    # numba compiles this elementwise product over numpy arrays
    volume = x_values * y_values * z_values
    return volume

@delayed
def fetch_data():
    conn = psycopg2.connect(
        host="your_postgres_host",
        database="your_database",
        user="your_username",
        password="your_password"
    )

    sql_query = "SELECT * FROM Diamonds;"
    return pd.read_sql_query(sql_query, conn)

# fetch_data is already decorated with @delayed, so calling it
# directly yields a Delayed object
df = fetch_data()
ddf = dd.from_delayed([df])

# dask's get_dummies requires categorical dtypes
ddf = ddf.categorize(columns=['cut', 'color', 'clarity'])
ddf = dd.get_dummies(ddf, columns=['cut', 'color', 'clarity'])

# numba operates on numpy arrays, so apply it per partition
ddf['volume'] = ddf.map_partitions(
    lambda pdf: pd.Series(
        calculate_volume(pdf['x'].values, pdf['y'].values, pdf['z'].values),
        index=pdf.index,
    )
)

@delayed
def score_model(df, model):
    dmatrix = xgb.DMatrix(df.drop(columns=['price']))
    df['score'] = model.predict(dmatrix)
    return df

model = xgb.Booster(model_file='model.xgb')
# materialize the dask frame, then score it in a delayed task
scored_df = score_model(ddf.compute(), model)

result = scored_df.compute()

Implementing a Dask-based infrastructure requires setting up and managing a distributed system, which can be complex and resource-intensive. It often necessitates infrastructure engineers who can handle the deployment, scaling and monitoring of these systems. While Dask offers a powerful solution for handling large-scale data processing, it introduces additional layers of complexity that may not be necessary or practical for all teams.

LETSQL’s solution

How does it work?

Predict UDF

letsql combines SQL transpilation (via ibis) with a Rust-based execution engine (via datafusion) to provide a seamless experience for scoring XGBoost models. The letsql library comes batteries-included with a UDF for XGBoost model scoring, allowing you to score models as a scalar UDF (one that operates on each row and returns a single-value output) from the DataFrame API. The UDF is built-in, i.e. registered with the backend, enabling you to score models directly from the DataFrame API, without switching between SQL and Python or rewriting code for performance.

Under the hood, we use gbdt-rs, an open-source library optimized for tree-based scoring. gbdt-rs achieves this by packing the trees into contiguous memory (using Rust’s Vec type) to increase their locality. Moreover, the gbdt-rs based UDF works on an Arrow table instead of scalar values, so we can enable parallelism by executing the same model on many chunks of data. This approach eliminates the need for code translation libraries like numba or cython, providing a streamlined and efficient way to score models directly from the DataFrame API.
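
To illustrate batch-at-a-time execution (a Python sketch of the idea, not the actual gbdt-rs internals), a UDF that receives Arrow record batches can score each chunk in a single call, and independent chunks can be processed in parallel:

import pyarrow as pa
import xgboost as xgb

model = xgb.Booster(model_file="model.xgb")  # placeholder model path

def predict_batch(batch: pa.RecordBatch) -> pa.Array:
    # the UDF sees a whole Arrow batch, not scalar values:
    # convert once, score once, emit one output column
    features = batch.to_pandas()
    return pa.array(model.predict(xgb.DMatrix(features)))

# an engine can map predict_batch over many record batches in
# parallel, since the model is read-only during scoring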

Further, since we are able to express the full pipeline from a high-level API, we have an opportunity to push down pre-processing tasks to the SQL source, instead of chaining them in Python. This allows us to leverage the SQL engine’s capabilities for data manipulation, filtering, and aggregation, reducing the data transfer overhead and improving performance.
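
As a sketch of what pushdown buys us (using the ibis API that letsql builds on; the connection parameters mirror the example database used later):

import ibis

con = ibis.postgres.connect(
    host="examples.letsql.com",
    user="letsql",
    password="letsql",
    database="letsql",
)
t = con.table("diamonds")

# the filter and projection compile to SQL and run inside Postgres,
# so only the matching rows ever leave the database
expr = t.filter(t.carat > 1.0)[["carat", "cut", "price"]]
print(ibis.to_sql(expr))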

How do you use it?

import letsql as ls

import ibis.selectors as s
from letsql.common.caching import ParquetCacheStorage

con = ls.connect()

pg = ls.postgres.connect_examples()
cache = ParquetCacheStorage(source=con, path="letsql-tmp")

t = pg.table("diamonds").pipe(con.register, "diamonds").cache(storage=cache)
model_path = ls.options.pins.get_path("pricing-model-one-hot")

categorical = t.select(s.of_type("str"))
distinct_categories = categorical.distinct().cache(storage=cache).execute()
onehot = lambda t, cat: t.case().when(cat, 1.0).else_(0.0).end()

predict_xgb = con.register_xgb_model("pricing", model_path)

mapping = {
    feature: {
        cat: lambda t, cat=cat: onehot(t, cat) for cat in distinct_categories[feature]
    }
    for feature in categorical.columns
}

t = (
    t
    .mutate([s.across(s.c(cur_column), mapping[cur_column]) for cur_column in mapping])
    .drop(mapping)
    .rename(lambda c: c.lower().replace(" ", "_"))
    .mutate(predicted_price=predict_xgb.on_expr)
)
  1. Connect to the letsql backend.
  2. Connect to the examples database in Postgres. This database is public, hosted by our team, and contains the diamonds table described in the appendix.
  3. Register the diamonds table in the letsql backend and cache it using ParquetCacheStorage for faster access.
  4. Register the XGBoost model pricing with the backend. The model training code is in the Appendix.
  5. Define the one-hot encoding mapping for each categorical column in the diamonds table. Computing distinct_categories takes a pass over the database, executing a query to get the unique values for each column; the result is a dictionary of the form column_name: {unique_value: onehot_function}.
  6. Apply the one-hot encoding to the categorical columns in the diamonds table, rename the columns, and score the model using the built-in UDF for XGBoost.
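
Execution is deferred until explicitly requested; as a sketch (assuming the expression t built above), materializing the result triggers the cached reads, the one-hot encoding, and the built-in predict UDF in one go:

scored = t.execute()  # returns a pandas DataFrame
print(scored[["carat", "predicted_price"]].head())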

Feature Selector

Another nice feature of letsql is the ability to intelligently filter for the features required by the scoring pipeline. This is especially useful when the model is trained on a subset of features and the scoring pipeline needs to be optimized for performance. The feature selector removes the need to manually splat out the exact feature subset and is indifferent to column order.

Feature Selector:

t.mutate(prediction=predict_xgb.on_expr)

Manual Splatting:

t.mutate(prediction=predict_xgb(t.carat, t.depth, t.table, t.x, t.y, t.z, t.cut_good, t.cut_ideal, t.cut_premium, t.cut_very_good, t.color_e, t.color_f, t.color_g, t.color_h, t.color_i, t.color_j, t.clarity_if, t.clarity_si1, t.clarity_si2, t.clarity_vs1, t.clarity_vs2, t.clarity_vvs1, t.clarity_vvs2))

Future Work

We are excited about the potential of several follow-on features:

  1. More Models: We plan to extend the built-in UDFs to support other popular machine learning models, such as Random Forest, LightGBM, and CatBoost. This will provide users with a wide range of options for scoring their models directly from the DataFrame API.
  2. Model Versioning: We plan to add support for model versioning and model management to the built-in UDFs. This will allow users to easily manage and track different versions of their models.
  3. Pushdowns into the SQL Source: We plan to push down predicates to the SQL source to reduce data transfer overhead and improve performance. This will allow us to leverage the SQL engine’s capabilities for data manipulation, filtering, and aggregation. Constant folding, where a column projection that never reaches the model is replaced by a constant, is another technique that has been explored here.
  4. Benchmarks: We plan to benchmark for performance and scale. This will involve benchmarking at various scales and across model types, varying tree depth and the number of trees, to inform our choice of libraries for scoring XGBoost models.
  5. Further Optimization: We plan to further optimize the built-in UDFs to improve performance and efficiency. One common optimization is introspecting the model to pipe in only the features actually used in the trees. This will involve exploring various optimization techniques and strategies to make scoring faster and more scalable.

Conclusion

In this post, we previewed a new feature: built-in UDFs for scoring XGBoost models. We introduced the idea of native execution by registering the UDFs with our Rust-based execution engine, and showed how this feature can streamline the scoring process, eliminating the need for code translation libraries and enabling data practitioners to score models directly from the DataFrame API. We also discussed the potential for pushing down preprocessing tasks to the SQL source, reducing data transfer overhead and improving performance. We believe this feature can make scoring more accessible and efficient for data practitioners, letting them focus on building and deploying models without getting bogged down by the complexities of pipeline orchestration.

Get Involved

You can get involved by engaging with us on GitHub or chatting with us on Zulip. We are looking for feedback on the built-in UDFs and how they can be improved to better serve your needs. We are also looking for contributors who are interested in helping us build out the ecosystem around the built-in UDFs. We are excited about the potential of this feature and the impact it can have on the data community.

Appendix

Diamonds Dataset

This classic dataset contains physical attributes and prices for 53,940 diamonds. The dataset includes the following columns:

  • carat: weight of the diamond (0.2–5.01)
  • cut: quality of the cut (Fair, Good, Very Good, Premium, Ideal)
  • color: diamond color, from J (worst) to D (best)
  • clarity: a measurement of how clear the diamond is (I1 (worst), SI2, SI1, VS2, VS1, VVS2, VVS1, IF (best))
  • depth: total depth percentage = z / mean(x, y) = 2 * z / (x + y) (43–79)
  • table: width of top of diamond relative to widest point (43–95)
  • price: price in US dollars ($326–$18,823)
  • x: length in mm (0–10.74)
  • y: width in mm (0–58.9)
  • z: depth in mm (0–31.8)

The dataset comes from the ggplot2 R package and can be downloaded from its homepage.
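
For convenience, a minimal way to pull the dataset into Python (the raw-file URL below is our assumption about the ggplot2 repository layout):

import pandas as pd

# assumed location of the CSV inside the ggplot2 GitHub repository
url = "https://raw.githubusercontent.com/tidyverse/ggplot2/main/data-raw/diamonds.csv"
diamonds = pd.read_csv(url)
print(diamonds.shape)  # expect (53940, 10)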

Model Training Code

import pandas as pd
import xgboost as xgb

from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error

diamonds = pd.read_csv("ci/ibis-testing-data/csv/diamonds.csv")

diamonds = diamonds.dropna()

X = diamonds.drop(columns=['price'])
y = diamonds['price']
X_encoded = pd.get_dummies(X, drop_first=True)
X_encoded.columns = [col.replace(' ', '_').lower() for col in X_encoded.columns]
X_train, X_test, y_train, y_test = train_test_split(X_encoded, y, test_size=0.2, random_state=42)

dtrain = xgb.DMatrix(X_train, label=y_train)
dtest = xgb.DMatrix(X_test, label=y_test)

params = {
    'objective': 'reg:squarederror',
    'eval_metric': 'rmse',
    'max_depth': 6,
    'learning_rate': 0.1,
}

bst = xgb.train(params, dtrain, num_boost_round=100)

y_pred = bst.predict(dtest)
rmse = mean_squared_error(y_test, y_pred, squared=False)
print(f"RMSE: {rmse}")

bst.save_model("pricing-model.json")

Footnotes

  1. We use the words inference and scoring interchangeably; they mean the same thing.↩︎

  2. Modin, BigQuery DataFrames, Snowpark, and Ibis are some of the libraries that provide a high-level API for connecting to SQL sources from within Python. Modin and BigQuery DataFrames strive to support the expansive pandas API in their respective ecosystems, e.g. Snowflake and Google Cloud. Ibis provides a pandas-like API, heavily inspired by dplyr, a mature, tried-and-tested grammar in the R community, and prioritizes SQL backends.

    Architecturally, the world that we want to build for is not just SQL. It is a multi-engine world where data practitioners connect to SQL sources, run complex processing in Python, and move data between ML frameworks and even file systems, all from a relational DSL. Multi-engine implies multi-modality of data and of the ways to express computation over it. This is what we are building with letsql.↩︎

  3. In the script, we are just using a simplified SQL query and not pushing down the one-hot encoding operation. If we did, we would replace the SQL string with the long query shown in section 1 above.↩︎