= pd.read_parquet("landing/orders.parquet")
df = df.drop_duplicates(["order_id", "dt"]) deduped
TL;DR
I recently came across the Ju Data Engineering Newsletter by Julien Hurault on the multi-engine data stack. The idea is simple; we’d like to easily port our code across any backend while retaining the flexibility to grow our pipeline as new backends and features are developed. This entails in at least the following high-level workflows:
- Offloading part of a SQL query to serverless engines with DuckDB, polars, DataFusion, chdb etc.
- Right-size pipeline for various development and deployment scenarios. For example, developers can work locally and ship to production with confidence.
- Apply database style optimizations to your pipelines automatically.
In this post, we dive into how we can implement the multi-engine pipeline from a programming language; Instead of SQL, we use propose using a Dataframe API that can be used for both interactive and batch use-cases. Specifically, we show how to break up our pipeline into smaller pieces and execute them across DuckDB, pandas, and Snowflake. We also discuss the advantages of a multi-engine data stack and highlight emerging trends in the field.
The code implemented in this post is available on GitHub1. The reference work in the newsletter with original implementation is here.
Overview
Multi-engine data stack pipeline works as follows: Some data lands in an S3 bucket, gets preprocessed to remove any duplicates and then loaded into a Snowflake table, where it is transformed further with ML or Snowflake specific functions2. The pipeline takes orders as parquet files that get saved into landing
location, are preprocessed and then stored at the staging
location in an S3 bucket. The staging data is then loaded in Snowflake to connect downstream BI tools to it. The pipeline is tied together by SQL dbt
with one model for each backend and the newsletter chooses Dagster as the orchestration tool.
Today, we are going to dive into how we can convert our pandas code to Ibis expressions, reproducing the complete example for Julien Hurault’s multi engine stack example 3. Instead of using dbt
Models and SQL, we use ibis
and some Python to compile and orchestrate SQL engines from a shell. By rewriting our code as Ibis expressions, we can declaratively build our data pipelines with deferred execution. Moreover, Ibis supports over 20 backends, so we can write code once and port our ibis.expr
s to multiple backends. To further simplify, we leave scheduling and task orchestration4 provided by Dagster, up to the reader.
Core Concept of Multi-Engine Data Stack
Here are the core concepts of the multi-engine data stack as outlined in Julien’s newsletter:
- Multi-Engine Data Stack: The concept involves combining different data engines like Snowflake, Spark, DuckDB, and BigQuery. This approach aims to reduce costs, limit vendor lock-in, and increase flexibility. Julien mentions that for certain benchmark queries, using DuckDB could achieve a significant cost reduction compared to Snowflake.
- Development of a Cross-Engine Query Layer: The newsletter highlights advancements in technology that allow data teams to transpile their SQL or Dataframe code from one engine to another seamlessly. This development is crucial for maintaining efficiency across different engines.
- Use of Apache Iceberg and Alternatives: While Apache Iceberg is seen as a potential unified storage layer, its integration is not yet mature to be used in a
dbt
project. Instead, Julien has opted to use Parquet files stored in S3, accessed by both DuckDB and Snowflake, in his Proof of Concept (PoC). - Orchestration and Engines in PoC: For the project, Julien used Dagster as the orchestrator, which simplifies the job scheduling of different engines within a
dbt
project. The engines combined in this PoC were DuckDB and Snowflake.
Why DataFrames and Ibis?
While the pipeline above is nice for ETL and ELT, sometimes we want the power of a full programming language instead of a Query Language like SQL e.g. debugging, testing, complex UDFs etc. For scientific exploration, interactive computing is essential as data scientists need to quickly iterate on their code, visualize the results, and make decisions based on the data.
DataFrames are such a data structure: DataFrames are used to process ordered data and apply compute operations on it in an interactive manner. They provide the flexibility to be able to process large data with SQL style operations, but also provides lower level control to edit cell level changes ala Excel Sheets. Typically, the expectation is that all data is processed in-memory and typically fits in-memory. Moreover, DataFrames make it easy to go back and forth between deferred/batch and interactive modes.
DataFrames excel5 at enabling folks to apply user-defined functions and releases a user from the limitations of SQL i.e. You can now re-use code, test your operations, easily extend relational machinery for complex operations. DataFrames also make it easy to quickly go from Tabular representation of data into Arrays and Tensors expected by Machine Learning libraries.
Specialized and in-process databases e.g. DuckDB for OLAP6, are blurring the boundary between a remote heavy weight database like Snowflake and an ergonomic library like pandas. We believe this is an opportunity for allowing DataFrames to process larger than memory data while maintaining the interactivity expectations and developer feel of a local Python shell, making larger than memory data feel small.
Technical Deep Dive
Our implementation focuses on the 4 concepts presented earlier:
- Multi-Engine Data Stack: We will use DuckDB, pandas, and Snowflake as our engines.
- Cross-Engine Query Layer: We will use Ibis to write our expressions and compile them to run on DuckDB, pandas, and Snowflake.
- Apache Iceberg and Alternatives: We will use Parquet files stored locally as our storage layer with the expectation that its trivial to extend to S3 using
s3fs
package. - Orchestration and Engines in PoC: We will focus on fine-grained scheduling for engines and leave orchestration to the reader. Fine-grained scheduling is more aligned with Ray, Dask, PySpark as compared to orchestration frameworks e.g. Dagster, Airflow etc.
Implementing with pandas
pandas is the quintessential DataFrame library and perhaps provides the simplest way to implement the above workflow. First, we generate random data borrowing from the implementation in the newsletter.
The pandas implementation is imperative in style and is designed so the data that can fit in memory. The pandas API is hard to compile down to SQL with all its nuances and largely sits in its own special place bringing together Python visualization, plotting, machine learning, AI and complex processing libraries.
pt.write_pandas(
conn,
deduped,="T_ORDERS",
table_name=True,
auto_create_table=False,
quote_identifiers="temporary"
table_type )
After de-duplicating using pandas operators, we are ready to send the data to Snowflake. Snowflake has a method called write_pandas
that comes in handy for our use-case.
Implementing with Ibis
aka Ibisify
One pandas limitation is that it has its own API that does not quite map back to relational algebra. Ibis is such a library that’s literally built by people who built pandas to provide a sane expressions system that can be mapped back to multiple SQL backends. Ibis takes inspiration from the dplyr R package to build a new expression system that can easily map back to relational algebra and thus compile to SQL. It also is declarative in style, enabling us to apply database style optimizations on the complete logical plan or the expression. Ibis is a key component for enabling composability as highlighted in the excellent composable codex.
= (
expr
ibis.read_parquet(p_landing)
.mutate(=ibis.row_number().over(group_by=[_.order_id], order_by=[_.dt]))
row_numberfilter(_.row_number == 0)
.
.checkpoint_parquet(p_staging)"T_ORDERS")
.create_table_snowflake(
) expr
r0 := DatabaseTable: ibis_read_parquet_czhemmj76faelb4rib4bxbt3u4 user_id int64 dt timestamp order_id string quantity int64 purchase_price float64 sku string r1 := Project[r0] user_id: r0.user_id dt: r0.dt order_id: r0.order_id quantity: r0.quantity purchase_price: r0.purchase_price sku: r0.sku row_number: WindowFunction(func=RowNumber(), how='rows', group_by=[r0.order_id], order_by=[asc r0.dt]) r2 := Filter[r1] r1.row_number == 0 r3 := CheckpointParquet[r2] path: staging/staging.parquet schema: user_id int64 dt timestamp order_id string quantity int64 purchase_price float64 sku string row_number int64 CreateTableSnowflake[r3] table: T_ORDERS schema: user_id int64 dt timestamp order_id string quantity int64 purchase_price float64 sku string row_number int64
Ibis expression prints itself as a plan that is akin to traditional Logical Plan in databases. A Logical Plan is a tree of relational algebra operators that describes the computation that needs to be performed. This plan is then optimized by the query optimizer and converted into a physical plan that is executed by the query executor. Ibis expressions are similar to Logical Plans in that they describe the computation that needs to be performed, but they are not executed immediately. Instead, they are compiled into SQL and executed on the backend when needed. Logical Plan is generally at a higher level of granularity than a DAG produced by a task scheduling framework like Dask. In theory, this plan could be compiled down to Dask’s DAG.
While pandas is embedded and is just a pip install away, it still has much documented limitations with plenty of performance improvements left on the table. This is where the recent embedded databases like DuckDB fill the gap of packing the full punch of a SQL engine, with all of its optimizations and benefiting from years of research that is as easy to import as is pandas. In this world, at minimum we can delegate all relational and SQL parts of our pipeline in pandas to DuckDB and only get the processed data ready for complex user defined Python.
Now, we are ready to take our Ibisified code and compile our expression above to execute on arbitrary engines, to truly realize the write-once-run-anywhere paradigm: We have successfully decoupled our compute engine with the expression system describing our computation.
Multi-Engine Data Stack w/ Ibis
DuckDB + pandas + Snowflake
Let’s break our expression above into smaller parts and have them run across DuckDB, pandas and Snowflake. Note that we are not doing anything once the data lands in Snowflake and just show that we can select the data. Instead, we are leaving that up to the user’s imagination what is possible with Snowflake native features.
Notice our expression above is bound to the pandas backend. First, lets create an UnboundTable expression to not have to depend on a backend when writing our expressions.
= {
schema "user_id": "int64",
"dt": "timestamp",
"order_id": "string",
"quantity": "int64",
"purchase_price": "float64",
"sku": "string",
"row_number": "int64",
}
= (
first_expr_for ="orders")
ibis.table(schema, name
.mutate(=ibis.row_number().over(group_by=[_.order_id], order_by=[_.dt])
row_number
)filter(_.row_number == 0)
.
) first_expr_for
r0 := UnboundTable: orders user_id int64 dt timestamp order_id string quantity int64 purchase_price float64 sku string row_number int64 r1 := Project[r0] user_id: r0.user_id dt: r0.dt order_id: r0.order_id quantity: r0.quantity purchase_price: r0.purchase_price sku: r0.sku row_number: WindowFunction(func=RowNumber(), how='rows', group_by=[r0.order_id], order_by=[asc r0.dt]) Filter[r1] r1.row_number == 0
Next, we replace the UnboundTable expression with the DuckDB backend and execute it with to_parquet
method7. This step is covered by the checkpoint_parquet
operator that we added to pandas backend above. Here is an excellent blog that discusses inserting data into Snowflake from any Ibis backend with to_pyarrow
functionality.
= pd.read_parquet("landing/orders.parquet")
data = ibis.duckdb.connect()
duck_backend "CREATE TABLE orders as SELECT * from data")
duck_backend.con.execute(
= replace_unbound(first_expr_for, duck_backend)
bind_to_duckdb
bind_to_duckdb.to_parquet(p_staging)= ibis.to_sql(bind_to_duckdb)
to_sql print(to_sql)
SELECT
"t1"."user_id",
"t1"."dt",
"t1"."order_id",
"t1"."quantity",
"t1"."purchase_price",
"t1"."sku",
"t1"."row_number"
FROM (
SELECT
"t0"."user_id",
"t0"."dt",
"t0"."order_id",
"t0"."quantity",
"t0"."purchase_price",
"t0"."sku",
ROW_NUMBER() OVER (PARTITION BY "t0"."order_id" ORDER BY "t0"."dt" ASC ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) - CAST(1 AS TINYINT) AS "row_number"
FROM "orders" AS "t0"
) AS "t1"
WHERE
"t1"."row_number" = CAST(0 AS TINYINT)
Once the above step creates the de-duplicated table, we can then send data to Snowflake using the pandas backend. This functionality is covered by create_table_snowflake
operator that we added to pandas backend above.
= ibis.table(schema, name="T_ORDERS") # nothing special just a reading the data from orders table
second_expr_for "T_ORDERS", schema=second_expr_for.schema(), temp=True)
snow_backend.create_table(= ibis.pandas.connect({"T_ORDERS": pd.read_parquet(p_staging)})
pandas_backend "T_ORDERS", pandas_backend.to_pyarrow(second_expr_for)) snow_backend.insert(
Finally, we can select the data from the Snowflake table to verify that the data has been loaded successfully.
= ibis.table(schema, name="T_ORDERS") # add you Snowflake ML functions here
third_expr_for third_expr_for
UnboundTable: T_ORDERS user_id int64 dt timestamp order_id string quantity int64 purchase_price float64 sku string row_number int64
We successfully broke up our computation in pieces, albeit manually, and executed them across DuckDB, pandas, and Snowflake. This demonstrates the flexibility and power of a multi-engine data stack, allowing users to leverage the strengths of different engines to optimize their data processing pipelines.
Acknowledgments
I’d like to thank Neal Richardson, Dan Lovell and Daniel Mesejo for providing the initial feedback on the post. I highly appreciate the early review and encouragement by Wes McKinney.
Resources
- The Road to Composable Data Systems
- The Composable Codex
- Apache Arrow
- Multi-Engine Data Stack Newsleter v0 v1
- Ibis, the portable dataframe library
- dbt Docs
- Dagster Docs
- LanceDB
- KuzuDB
- DuckDB
Footnotes
In order to quickly try out repo, I also provide a nix flake↩︎
Please note we do not go into implementing the types of things that might be possible in Snowflake and assume that as a requirement for the workflow↩︎
In this post, we have primarily focused on v0 of the multi-engine data stack. In the latest version, Apache Iceberg is included as a storage and data format layer. NYC Taxi data is used instead of the random Orders data treated in this and v0 of the posts.↩︎
Orchestration Vs fine-grained scheduling: - The orchestration is left to the reader. The orchestration can be done using a tool like Dagster, Prefect, or Apache Airflow. - The fine-grained scheduling can be done using a tool like Dask, Ray, or Spark.↩︎
no pun intended↩︎
Some of the examples of in-process databases is described in this post extending DuckDB example above to newer purpose built databases like LanceDB and KuzuDB.↩︎
The Ibis docs use
backend.to_pandas(expr)
commands to bind and run the expression in the same go. Instead, we usereplace_unbound
method to show a generic way to just compile the expression and not execute it to said backend. This is just for illustration purposes. All the code below, uses thebackend.to_pyarrow
methods from here on.↩︎