Caching++ for DataFrames

Introducing letsql's caching feature for upstream source data. This feature allows you to cache the results of a SQL query in a DataFrame for rapid iteration.

Authors

Hussain Sultan

Dan Lovell

Published

August 10, 2024

Modified

August 11, 2024

TL;DR

LETSQL is a high-level, declarative data processing library focused on multi-engine orchestration. Here we present an experimental caching feature in letsql for sub-expressions that represent an upstream query. This feature allows you to cache the results of a SQL query in the upstream engine or on local disk for rapid iteration, and it is available in beta. Moreover, we are actively working to improve the performance and usability of this feature and to develop a stable API. Your feedback is very welcome.

Drop into an interactive IPython shell with the experimental feature simply by running:

nix run github:letsql/letsql

Introduction

Warning

Please note that this is an experimental feature. The API is unstable and is expected to change.

At LETSQL, we are introducing an experimental feature that allows you to cache results from an upstream engine while using the high-level, familiar DataFrame API. Under the hood, the DataFrame expression is compiled down to the source engine (e.g. SQL for duckdb, datafusion, or supported remote backends) and the results are cached as a table in either a remote engine, a local in-memory engine, or as Parquet files on local disk.
The How it works section below shows what this looks like in code.

Why it matters

Oftentimes, Data Scientists need to cache data to local disk instead of pulling it from its source, or re-computing it, on every invocation. The iterative nature of building machine learning models makes caching an integral part of the workflow. Caching makes iterating on ML models faster, reduces developer cognitive load, and reduces stress on the source systems and network bandwidth. Usually, this is done manually, possibly in a different language from the model development process (often in SQL), resulting in disjointed workflows. This inherent break between source data and model development can be painful when the time comes to finally test or deploy the model against the refreshed source instead of the manually built local cache. It can also turn deployment into a mystery hunt for all of the possible model inputs that live only on the modeler’s workstation.

How do people solve it today?

SQL as Strings, SQL as ORM

In our experience working in the enterprise, most data scientists and engineers use raw SQL strings and stick them in their pd.read_sql or ibis.sql calls. While not popular with Data Scientists, SQLAlchemy is a slightly better solution, as it allows you to build the query in a more programmatic way. However, SQLAlchemy is geared towards developers with transactional use-cases and is not very intuitive for representing analytical operations. Either way, this is still a manual process and does not provide a way to cache the results.
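For illustration, here is roughly what the programmatic route looks like with SQLAlchemy (a minimal sketch; the table and column names are made up):

import sqlalchemy as sa

# define the table we want to query; names here are illustrative
metadata = sa.MetaData()
events = sa.Table("events", metadata, sa.Column("value", sa.Integer))

# build the query programmatically instead of concatenating strings
query = sa.select(events.c.value).where(events.c.value > 0)
print(query)  # SELECT events.value FROM events WHERE events.value > :value_1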

Manual Caching

Besides being hard to read, maintain, and test, raw SQL strings offer no caching of their own: getting the source data onto local disk or object storage is a highly manual process that relies on pickling, saving to Parquet, or using an in-memory cache like functools.cache, cachey, or similar.

import pandas as pd

# pull the full table from the source database
df = pd.read_sql('SELECT * FROM table', 'sqlite:///data.db')

# persist it locally so later runs skip the source round-trip
df.to_parquet('data.parquet')
# or
df.to_pickle('data.pkl')

While this works, it is not a scalable solution. Parquet may be more portable than pickle, but this is still a manual process, i.e. the data scientist has to manually update the cache every time the source data changes. And since we are mixing a query language (SQL) and a programming language (Python), there is no guarantee that the subsequent Python steps will still run after you refresh the cache, since the source data may have changed, resulting in a disjointed workflow.

In-Memory

functools.cache is straightforward to use and, though it requires its arguments to be hashable, is applicable here because our argument is a string. Cachey offers a better solution with a notion of the cost of a computation, but both of these options only provide in-memory persistence. What if we could transparently add a cache operator anywhere of our choosing in our computation expression and have the cache both persist and be automatically invalidated when the source data changes?
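As a minimal sketch (reusing the SQLite example above), in-memory caching with functools.cache looks like this:

import functools

import pandas as pd


@functools.cache
def fetch(query: str) -> pd.DataFrame:
    # the query string is the hashable cache key
    return pd.read_sql(query, 'sqlite:///data.db')


df = fetch('SELECT * FROM table')  # first call hits the database
df = fetch('SELECT * FROM table')  # second call is served from memory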

How it works

Caching SQL Source


The caching operator should be able to cache any ibis.expr, not just the final result. This is because the expression can be a complex graph of operations, parts of which can be expensive to recompute but change infrequently and (hopefully downstream) parts of which might be cheap to compute but change frequently. Additionally, we need to be able to specify how the cache is invalidated. Maybe the data changes, but infrequently enough that the developer is minimally impacted (consider modelling on data with nightly updates). In order to do the above, we want to simply chain our expression with the cache operator, and it should do the right thing.

In the latest main branch on GitHub (4ae449dc as of this writing), we added a cache operator. This operator can be chained anywhere in the ibis.expr and takes storage as an input.

Types of Cache Storage

|                    | SourceStorage | SnapshotStorage |
|--------------------|---------------|-----------------|
| Cache Invalidation | yes           | no              |
| Persistence        | Source dependent; use ParquetCacheStorage to ensure durable persistence to disk | Source dependent |

We have two main types of Storage: SourceStorage and SnapshotStorage. The source input can be a remote (Snowflake, Postgres) or in-process (pandas, duckdb, datafusion) backend. If the source used for the cache is in-memory (e.g. pandas), the cache is in-memory as well. Moreover, SourceStorage will invalidate the cache when the upstream data changes - depending on the source, as in the table above - while SnapshotStorage will not. More on this later.

Similar to SourceStorage, ParquetCacheStorage caches the results of the SQL query in Parquet files on the local disk using the source backend to do the writing.

import letsql as ls
from letsql.common.caching import SourceStorage
from letsql.common.utils.snowflake_utils import make_connection

con = ls.connect()  # in-process datafusion backend from letsql

snow_backend = make_connection(database=..., schema=..., warehouse=...)
cache = SourceStorage(source=con)  # cache tables in the letsql backend

t = (
    snow_backend.table('table')
    .cache(storage=cache)
    .mutate(new_column=lambda x: x['column'] + 1)
)

t.execute()

The snow_backend.table('table').cache(storage=cache) call will cache the results of the table query in the SourceStorage cache, and subsequent computation will read from the cache rather than making a request to Snowflake.
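Swapping in ParquetCacheStorage looks much the same. Here is a minimal sketch, reusing con and snow_backend from above; the path argument name is an assumption for illustration:

from letsql.common.caching import ParquetCacheStorage

# cache the upstream results as Parquet files on local disk;
# the `path` argument name is illustrative
parquet_cache = ParquetCacheStorage(source=con, path="parquet-cache")

t = (
    snow_backend.table('table')
    .cache(storage=parquet_cache)
    .mutate(new_column=lambda x: x['column'] + 1)
)
t.execute()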

Hashing Strategies

Our strategy for hashing the data depends on where the data is stored. For in-memory sources (currently pandas is always in-memory), the schema and the in-memory bytes of the data are hashed. For duckdb and datafusion backends, if the data is on disk or in an object store, the query plan (capturing the source file name) is hashed along with the schema. For remote backends, i.e. Snowflake and Postgres, the name, source, schema, namespace, and optionally the last modified time of the table are hashed. A conceptual sketch follows the table below.

Hashing Strategies

|         | In-Memory                  | Disk               | Remote |
|---------|----------------------------|--------------------|--------|
| Hash    | bytes, schema              | query plan, schema | name, schema, source, namespace, last modification time |
| Example | pandas, duckdb, datafusion | duckdb, datafusion | Snowflake, Postgres |
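To make the in-memory strategy concrete, here is a conceptual sketch (not letsql's actual implementation) of deriving a cache key from a pandas DataFrame's schema and in-memory bytes:

import hashlib

import pandas as pd


def in_memory_cache_key(df: pd.DataFrame) -> str:
    hasher = hashlib.sha256()
    # hash the schema: column names and dtypes
    hasher.update(repr(list(df.dtypes.items())).encode())
    # hash the in-memory data itself
    hasher.update(pd.util.hash_pandas_object(df, index=True).to_numpy().tobytes())
    return hasher.hexdigest()


df = pd.DataFrame({"column": [1, 2, 3]})
key = in_memory_cache_key(df)  # changes whenever the schema or data changes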

Cache Busting (Invalidation)

For SourceStorage and ParquetCacheStorage, the cache is invalidated when the upstream data changes. We do this by using the last update time of the source table as an input into a hash.

For SnapshotStorage, no automatic invalidation is implemented. This is useful for one-off analyses where you don’t need to worry about the source data changing.
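A minimal sketch of such a one-off snapshot, reusing con and snow_backend from the example above and assuming SnapshotStorage takes the caching backend as its source argument (the exact signature may differ):

from letsql.common.caching import SnapshotStorage

snapshot = SnapshotStorage(source=con)

t_once = (
    snow_backend.table('table')
    .cache(storage=snapshot)
)
t_once.execute()  # re-running reuses the snapshot even if the source table changes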

Differences with ibis.cache

Ibis’s caching works with a single backend, i.e. it creates a temporary table with the cached results in the underlying backend, similar to using SourceStorage with the same source as the backend. Most importantly, it is not deferred and executes immediately. letsql caching is more flexible and allows for more granular cache invalidation strategies, as well as chaining caches across multiple engines. For example, it should be fairly easy to use duckdb’s S3 Parquet functionality to build the same cache that we built on local disk in the example above.
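For instance, chaining caches across engines might look like the following sketch, again reusing the connections from above (the ParquetCacheStorage path argument is illustrative):

from letsql.common.caching import ParquetCacheStorage, SourceStorage

expr = (
    snow_backend.table('table')
    .cache(storage=SourceStorage(source=con))                      # first cache: a table in the letsql backend
    .mutate(new_column=lambda x: x['column'] + 1)
    .cache(storage=ParquetCacheStorage(source=con, path="cache"))  # second cache: Parquet files on disk
)
expr.execute()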

Conclusion

We would love to hear your feedback on this feature. Please try it out and let us know what you think. In the future, we’d like to experiment with remote cache storage and more granular cache invalidation strategies.