TL;DR
Are you frustrated that Trino doesn’t support AsOf joins but you don’t want to migrate your data? We have a solution that composes the best of multiple query engines. By leveraging Trino’s enterprise features, DuckDB’s AsOf join capabilities, and efficiently connecting them to PyTorch using DataFusion, we provide a powerful multi-engine setup.
To showcase this approach, we demonstrate training an LSTM model to analyze sensor data using NASA’s Avionics dataset. Our LetSQL tooling makes it easy to compose these engines together for maximum flexibility and performance. Check out the LetSQL repo on GitHub to learn more: https://github.com/letsql/letsql
With LetSQL, you can have your cake and eat it too - keeping Trino while adding missing join and ML capabilities.
Introduction
No single query engine has it all figured out
One common multi-engine data flow use case arises when a query engine you're already using does not support features that you suddenly need. For example, a database operator may be missing, or you may need to interface with Python for machine learning. We came across this recently at one of our customers, who had a large Trino implementation in place but required AsOf join processing, which Trino does not support, for feeding data into PyTorch training and inference utilities.
In this blog post we show how to solve this problem in a performant way, without migrating off of Trino, using multi-engine composition. This is enabled in part by Apache Arrow emerging as the de-facto standard for data transfer, and in part by the rise of performant in-process engines like DuckDB and DataFusion. Each of these query engines offers something unique, and understanding how to augment your data warehouse's capabilities is paramount.
As of when?
AsOf joins make time-series data science and ML engineering easier by providing a declarative workflow for filtering out data that should not be used in building ML models. This guards against data leakage (look-ahead bias), a common pitfall in building ML models.
Here are a few reasons why AsOf joins are critical in time-series ML:
- Time Alignment: AsOf joins help match records based on the nearest previous timestamp, ensuring you’re only using data that would have been available at prediction time, thus preventing data leakage, all while providing this functionality in a declarative manner.
- Handling Different Sampling Rates: Various data sources often have different update frequencies. AsOf joins allow storing the data compactly and "fully normalized" in per-sample-rate tables while also easily producing the denormalized representation necessary for modeling.
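To make the mechanics concrete, here is a minimal, self-contained sketch using DuckDB's ASOF JOIN syntax on two toy sensor streams sampled at different rates (the table names and values are made up for illustration):

```python
import duckdb

con = duckdb.connect()
# a fast stream and a slow stream, stored "fully normalized" per sample rate
con.execute("CREATE TABLE fast AS SELECT * FROM (VALUES (1, 10.0), (2, 11.0), (3, 12.0)) AS t(ts, a)")
con.execute("CREATE TABLE slow AS SELECT * FROM (VALUES (0, 100.0), (2, 101.0)) AS t(ts, b)")
# each fast row picks up the most recent slow reading at or before its timestamp,
# so no future value can leak into the joined row
rows = con.execute(
    "SELECT f.ts, f.a, s.b FROM fast f ASOF JOIN slow s ON f.ts >= s.ts"
).fetchall()
print(rows)
```

The inequality in the join condition is what encodes "nearest previous timestamp": it is both the time-alignment rule and the leakage guard.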
Peeking ahead
One real-world use case is IoT sensor data. As part of our design partnership with Team Raft, a leading federal consultancy, we got a chance to implement this augmentation with Trino (which lacks AsOf joins; check #21759 for details and current status) and DuckDB, which has a performant implementation of AsOf joins. To show a similar capability on a public dataset, we used NASA's Avionics sensor dataset to build an LSTM model using PyTorch.
Use-Case Highlights:
- Declarative specification of how to combine data
- A single expression works with multiple engines (by way of Ibis)
- Portable, pandas-first UDxFs for complex workflows
Combining all of this leads us to multi-engine composition, leveraging each engine's and library's capabilities to the fullest.
Multi-Engine Queries: Fully Generalizable Solution
Our approach
We believe that SQL is much better suited as a lower-level language and should be compiled from high-level DSLs¹. We needed a better abstraction on top of SQL, i.e. one that is Python-native and capable of generating the right SQL for each backend. Ibis is one such project, abstracting away 20+ backends with a pandas-like (but better!) DSL. However, Ibis can only work with one backend at a time and does not support multi-engine workflows out of the box. At LetSQL, we added the goodies required to enable this workflow, starting with the into_backend operator. Let's dive into the details.
1. into_backend operator
In order to make sure that the Arrow RecordBatch stream coming from DuckDB or Trino does not get exhausted on first read, we implemented a tee'ing mechanism that ensures the same RecordBatch stream can be reused across as many engines as we want. This is fully deferred, i.e. work doesn't happen until we call ls.execute. Here is a small example of what into_backend looks like:
```python
import functools

import letsql as ls
from letsql.expr.relations import into_backend


@functools.cache
def get_trino_t():
    return (
        ls.trino.connect(database="tpch", schema="sf1")
        .table("orders")
        .cast({"orderdate": "date"})
    )


db_con = ls.duckdb.connect()
df = (
    get_trino_t()
    .head(10_000)
    # multi-engine operator
    .pipe(into_backend, db_con)
    .pipe(ls.execute)
)
```
2. UD(X)F machinery
Types of UD(X)F:
- Scalar UDFs: Feature engineering, model inference
- Aggregate UDFs (UDAF): Model training
- Window UDFs (UDWF): Time-series predictions, incremental training
The key insight here was that we can use User-Defined Functions (scalar and aggregate variants) to do ML training and inference. For example, we can use the standard aggregation mechanism to apply a UDAF whose output is the trained model.
The DataFusion engine provides a great way to stream RecordBatches to the UDFs and UDAFs defined in Python land. In the code snippet below (replicated from our previous blog post, Using DataFusion's UDAFs to do ML Training), instead of returning a trained model, curried_calc_best_features trains a model and extracts the name and relative score of the top features in each evaluated group. We did this to show characteristics of the trained model, not just an opaque model binary.
```python
agg_udf = udf.agg.pandas_df(
    curried_calc_best_features,
    t[cols].schema(),
    ibis_output_type,
    name="calc_best_features",
)
```
3. Expression segmentation
Our operators construct a high-level expression that is segmented, with each piece targeting the engine that we need.
On execution, we compile each single-backend expression for the target backend's SQL dialect² and coordinate sending the record batches between engines.
In the future, we can add planning and optimization to our segmented execution logic, identifying opportunities to push filters, projections, and predicates down into the engines.
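To illustrate the per-dialect compilation step, here is a minimal sketch using plain Ibis with a made-up sensors schema (the table and column names are illustrative, not from the NASA example):

```python
import ibis

t = ibis.table({"time": "timestamp", "flight": "string", "GS": "float64"}, name="sensors")
f = t.filter(t.GS != 0)  # airborne rows only
expr = f.group_by("flight").agg(avg_gs=f.GS.mean())

# one deferred expression, two dialects: segmentation picks the right one per engine
print(ibis.to_sql(expr, dialect="duckdb"))
print(ibis.to_sql(expr, dialect="trino"))
```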
Hidden Challenges:
- Memory vs durability trade-off
- Connection management
- Push down into UDXFs
- Per-engine peculiarities (e.g. streaming record batches into Postgres)
Real World Example With NASA Data
💡 Check out the working example here: https://github.com/letsql/nasa_avionics_data_ml, leveraging data from NASA DASHlink.
To demonstrate these concepts in action, we'll work with NASA's Avionics dataset, which provides rich sensor data perfect for time-series analysis and machine learning. This dataset is publicly available through NASA's DASHlink offering.
Data processing pipeline
AsOf join implementation
The first challenge we tackle is implementing AsOf joins for flight data streams coming in at different rates. The asof_join_flight_data function handles this by:
- Extracting the data on a per-data-rate basis to Parquet for efficient representation
- Streaming data from source-engine to DuckDB for performing the AsOf joins (here, we substitute our LetSQL connection in place of Trino)
- Joining multiple time series data streams while maintaining temporal alignment
```python
def asof_join_flight_data(flight_data, airborne_only=True):
    """Create an expression for a particular flight's data"""
    rate_to_parquet = make_rate_to_parquet(flight_data)
    con = ls.connect()  # substituted for Trino
    ts = (
        deferred_read_parquet(con, parquet_path, ZD.rate_to_rate_str(rate))
        .mutate(flight=ls.literal(flight_data.flight))
        for rate, parquet_path in sorted(
            rate_to_parquet.items(),
            key=operator.itemgetter(0),
            reverse=True,
        )
    )
    db_con = ls.duckdb.connect()
    (expr, *others) = (
        into_backend(t, db_con, name=f"flight-{flight_data.flight}-{t.op().parent.name}")
        for t in ts
    )
    for other in others:
        expr = expr.asof_join(other, on="time").drop(["time_right", "flight_right"])
    if airborne_only:
        # airborne when GS (ground speed) != 0
        expr = expr[lambda t: t.GS != 0]
    return expr
```
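A hypothetical usage sketch (the flight_data objects come from the repo's ZD.TailData helpers, shown in the full script below):

```python
# build the deferred, multi-engine expression for one flight and materialize a sample
flight_data = next(tail_data.gen_parquet_exists())
expr = asof_join_flight_data(flight_data)
sample_df = ls.execute(expr.head(5))
```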
Machine Learning integration
PyTorch LSTM training
We implement a User-Defined Aggregate Function (UDAF) for training LSTM models on the joined flight data.
```python
def make_training_udaf(schema, return_type, config, scaleX, scaleT):
    """Create a udaf for running training of a particular model"""
    if set(schema) != set(scaleX.feature_names_in_).union(scaleT.feature_names_in_):
        raise ValueError
    model = config.get_model()
    loss_func = torch.nn.MSELoss()
    opt = torch.optim.Adam(model.parameters(), lr=config.l_rate)
    error_trace = []
    astype = np.float32

    training_udaf = agg.pandas_df(
        schema=schema,
        fn=train_batch(
            config=config,
            scaleX=scaleX,
            scaleT=scaleT,
            model=model,
            loss_func=loss_func,
            opt=opt,
            error_trace=error_trace,
            astype=astype,
            return_type=return_type,
        ),
        return_type=return_type,
        name="train_batch",
    )
    return training_udaf, model, loss_func, opt, error_trace
```
The UDAF implementation includes:
- Model initialization and configuration
- Loss function and optimizer setup
- Data scaling and preprocessing
- Metrics tracking across training batches
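The curried train_batch helper carries this state and is applied per group; it isn't shown above. A minimal hypothetical sketch of what such a pandas-batch trainer could look like, assuming the group arrives as a pandas DataFrame, that config exposes x_names, t_names, and an assumed n_epochs field, and that the model accepts the scaled feature matrix directly:

```python
import pickle

import pandas as pd
import torch


def train_batch(config, scaleX, scaleT, model, loss_func, opt, error_trace, astype, return_type):
    """Return a pandas-batch trainer usable as the body of a UDAF (hypothetical sketch)."""

    def fn(df: pd.DataFrame) -> bytes:
        # scale features and targets with the pre-fit scalers
        X = torch.from_numpy(scaleX.transform(df[list(config.x_names)]).astype(astype))
        T = torch.from_numpy(scaleT.transform(df[list(config.t_names)]).astype(astype))
        # one optimization pass per epoch over this group's batch
        for _ in range(config.n_epochs):  # n_epochs is an assumed config field
            opt.zero_grad()
            loss = loss_func(model(X), T)
            loss.backward()
            opt.step()
            error_trace.append(loss.item())
        # the aggregate's output is the pickled trained model
        return pickle.dumps(model)

    return fn
```

In the full example the UDAF actually returns a struct carrying an iteration index alongside the pickled model (which splat_struct later unpacks); this sketch returns just the pickled model for brevity.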
Model inference
For inference, we implement a User-Defined Window Function (UDWF) that allows us to make predictions on new flight data:
```python
def make_inference_udwf(schema, return_type, model, seq_length, scaleX, scaleT):
    """Create a udwf for running inference of a particular model"""

    @pyarrow_udwf(
        schema=schema,
        return_type=ibis.dtype(return_type),
    )
    def inference_udwf(self, values, num_rows):
        return predict_flight(
            values,
            model,
            seq_length,
            scaleX,
            scaleT,
            return_type,
        )

    return inference_udwf
```
The UDWF:
- Takes preprocessed flight data as input
- Applies the trained model to make predictions
- Handles data scaling and sequence formatting
- Returns predictions in the specified format
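The predict_flight helper does the heavy lifting and isn't shown above. A minimal hypothetical sketch, assuming values arrives as a list of pyarrow arrays (one per input column) and that the model accepts a (batch, sequence, features) tensor:

```python
import numpy as np
import pyarrow as pa
import torch


def predict_flight(values, model, seq_length, scaleX, scaleT, return_type):
    """Hypothetical sketch: scale inputs, run the model over trailing windows,
    and return one prediction per input row."""
    # stack the per-column pyarrow arrays into a 2-D feature matrix
    X = np.column_stack([v.to_numpy(zero_copy_only=False) for v in values])
    X = scaleX.transform(X).astype(np.float32)
    preds = np.full(len(X), np.nan)
    with torch.no_grad():
        for i in range(seq_length - 1, len(X)):
            # trailing window of seq_length rows ending at row i
            window = torch.from_numpy(X[None, i - seq_length + 1 : i + 1, :])
            y = model(window).numpy().reshape(1, -1)
            # undo target scaling before emitting the prediction
            preds[i] = scaleT.inverse_transform(y).ravel()[-1]
    return pa.array(preds, type=return_type)
```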
Putting it all together
Here is the above functionality working together:
```python
import itertools
import pickle
import warnings

import ibis
import pyarrow as pa
import torch
from codetiming import Timer

import letsql as ls
import nasa_avionics_data_ml.zip_data as ZD
from nasa_avionics_data_ml.lib import (
    Config,
    read_scales,
)
from nasa_avionics_data_ml.letsql_udwf_inference import (
    make_inference_udwf,
)
from nasa_avionics_data_ml.letsql_udaf_training import (
    make_training_udaf,
    union_cached_asof_joined_flight_data,
    splat_struct,
    return_type,
)

tail = "Tail_652_1"
n_training_flights = 64
seq_length = 8
(order_by, group_by) = ("time", "flight")

(config, *_) = Config.get_debug_configs()
(scaleX, scaleT) = read_scales()
(training_udaf, model, *rest) = make_training_udaf(
    ibis.schema({name: float for name in (*config.x_names, *config.t_names)}),
    return_type,
    config,
    scaleX,
    scaleT,
)
tail_data = ZD.TailData.ensure_zip(tail + ".zip")
all_flight_datas = (*training_flight_datas, inference_flight_data) = tuple(itertools.islice(
    tail_data.gen_parquet_exists(),
    n_training_flights + 1,
))
for flight_data in all_flight_datas:
    flight_data.ensure_rate_parquets()
training_data_expr = union_cached_asof_joined_flight_data(*training_flight_datas)
inference_data_expr = union_cached_asof_joined_flight_data(inference_flight_data)

model_expr = (
    training_data_expr
    .pipe(
        lambda t, col="col": (
            t
            .group_by(group_by)
            .agg(**{col: training_udaf.on_expr(t)})
            .pipe(splat_struct, col)
        )
    )
)
if not any(p for storage in training_data_expr.ls.storages for p in storage.path.iterdir()):
    with Timer("training_data_expr caching"):
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            print(f"row count is {ls.execute(training_data_expr.count())}")
with Timer("training_data_expr cached read"):
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        print(f"row count is {ls.execute(training_data_expr.count())}")
with Timer("model training"):
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        training_df = ls.execute(model_expr)
device = torch.device("cpu")
model = pickle.loads(training_df.sort_values("iter_idx").model.iloc[-1]).to(device)
model.device = device

inference_udwf = make_inference_udwf(
    ibis.schema({name: float for name in config.x_names}),
    pa.float64(),
    model, seq_length, scaleX, scaleT,
)
window = ibis.window(
    preceding=config.seq_length - 1,
    following=0,
    order_by=order_by,
    group_by=group_by,
)
inference_expr = (
    inference_data_expr
    .mutate(predicted=inference_udwf.on_expr(inference_data_expr).over(window))
)
with Timer("model inference"):
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        inference_df = ls.execute(inference_expr)
```
Key implementation details
Here are the key points to note from the implementation:
- Data flow
- Raw sensor data is first processed and converted to Parquet format
- Data streams with different sampling rates are aligned using AsOf joins
- Processed data is fed into the LSTM model for training
- Model architecture
- Uses PyTorch LSTM for sequence prediction
- Handles variable-length input sequences
- Incorporates proper scaling and normalization
- Performance considerations
- Leverages DuckDB’s efficient AsOf join implementation rather than manual / custom SQL inside Trino
- Uses batch processing for training
- Maintains data locality through proper engine selection
Conclusion
As we’ve seen, the future of data infrastructure isn’t about picking the perfect monolithic solution – it’s about intelligent composition. By leveraging Apache Arrow as our zero-copy foundation and embracing multi-engine queries, we can tap into specialized capabilities like DuckDB’s AsOf joins while preserving enterprise features like Trino’s security and authentication. This architectural pattern unlocks new possibilities without requiring massive migrations or compromises.
The NASA Avionics example demonstrates how this approach moves beyond theory into practical application. Whether you’re dealing with time-series ML, sensor data, or other specialized workloads, seamlessly flowing between engines opens up powerful new patterns for data processing. As data infrastructure continues to evolve, the winners will not be those who build the biggest engines but those who master the art of composition.
Key takeaways:
- High-level Python expressions emit lower-level SQL
- Multi-engine queries have tangible benefits with specialized per-engine capabilities
- Zero-copy data exchange via Arrow makes engine-composition performant
- The future is about composition, not monoliths
Special Acknowledgments
Big thanks to the Team Raft crew for being great partners and helping push the multi-engine future forward! And thanks to Tom Booth for the inspiration and for building the base repo for data processing: https://github.com/boothtm/nasa_avionics_data_ml
If you’re dealing with similar challenges in the Federal and Avionics space, definitely check out what they’re doing at teamraft.com.
References
- The Original Paper: A Gradient Descent Multi-Algorithm Grid Search Optimization of Deep Learning for Sensor Fusion
- Trino - Distributed, Cloud-Native Query Engine
- DuckDB - Fast In-process OLAP Engine with AsOf Joins
- Ibis Project - the underlying deferred execution engine supporting 20+ backends
- Apache Arrow
- Apache DataFusion
Footnotes
1. Analogy: SQL is akin to Assembly in many ways. In the past, developers had to meticulously craft Assembly code for each Instruction Set Architecture (ISA). However, as compiler infrastructure evolved, it became possible to easily generate Assembly without delving into the intricate details. Similarly, we believe that SQL is undergoing a transformation, becoming more like Assembly as we develop higher-level interfaces. These abstractions will progressively reduce the need for developers to handcraft SQL queries for each specific database engine, just as compilers have done for Assembly. By providing a layer of abstraction, developers can focus on the logic and semantics of their data operations, while the underlying system handles the translation to optimized SQL for the target engine. This shift will make working with data more efficient and accessible to a wider range of developers.↩︎
2. Ibis, which is built on the SQLGlot library, is what makes this happen under the hood.↩︎