TL;DR
LETSQL is a high-level, declarative data processing library focused on multi-engine orchestration. Here we present an experimental caching feature in letsql for sub-expressions representing an upstream query. This feature lets you cache the results of a SQL query in the upstream engine or on local disk for rapid iteration, and it is available in beta. Moreover, we are actively working to improve the performance and usability of this feature and to develop a stable API. Your feedback is very welcome.
Drop into an interactive IPython shell with the experimental feature simply by running:
nix run github:letsql/letsql
Introduction
Please note that this is an experimental feature. The API is unstable and is expected to change.
At LETSQL, we are introducing an experimental feature that allows you to cache the results of an upstream engine, while using the high-level, familiar DataFrame API. Under the hood, the DataFrame expression is compiled down to the source engine (e.g. SQL for duckdb and datafusion, or supported remote backends) and the results are cached as a table in either a remote engine, a local in-memory engine, or as Parquet files on local disk.
Here is what it looks like:
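A condensed sketch is shown below; it uses the same API that the rest of this post walks through in detail, and the remote table and column names are placeholders:

import letsql as ls
from letsql.common.caching import SourceStorage

con = ls.connect()                 # in-process datafusion backend from letsql
cache = SourceStorage(source=con)  # cache lives in that backend

expr = (
    remote_table                   # placeholder: an ibis table expression on a remote backend
    .cache(storage=cache)          # everything upstream of this point is cached
    .mutate(new_column=lambda x: x['column'] + 1)
)
expr.execute()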
Why it matters
Oftentimes, Data Scientists need to cache data to a local disk instead of pulling it from its source or re-computing it on every invocation. The iterative nature of building machine learning models makes caching an integral part of the workflow. Caching makes iterating on ML models faster, reducing developer cognitive load as well as stress on source systems and network bandwidth. Usually, this is done manually, possibly in a different language from the model development process (often in SQL), resulting in disjointed workflows. This inherent break between source data and model development can be painful when the time comes to finally test or deploy the model against the refreshed source instead of the manually built local cache. It can also turn model deployment into a mystery hunt for all the possible model inputs that live only on the modeler’s workstation.
How do people solve for it today?
SQL as Strings, SQL as ORM
In our experience working in the enterprise, most data scientists and engineers write raw SQL strings and stick them in their pd.read_sql or ibis.sql calls. While not popular with Data Scientists, sqlalchemy is a slightly better solution, as it allows you to build the query more programmatically. However, SQLAlchemy is geared towards developers with transactional use-cases and is not very intuitive for representing analytical operations; it is still a manual process and does not provide a way to cache the results.
Manual Caching
Besides the SQL strings being hard to read, maintain, and test, caching the source data on local disk or object storage is a highly manual process, relying on pickling, saving to Parquet, or using an in-memory cache like functools.cache, cachey, or similar.
import pandas as pd

df = pd.read_sql('SELECT * FROM table', 'sqlite:///data.db')
df.to_parquet('data.parquet')
# or
df.to_pickle('data.pkl')
While this works, it is not a scalable solution. Parquet may be more portable than pickle, but it is still a manual process, i.e. the data scientist has to manually update the cache every time the source data changes. And since we are mixing a query language (SQL) and a programming language (Python), there is no guarantee that your subsequent Python process will still run when you refresh the cache, since the source data may have changed, resulting in a disjointed workflow.
In-Memory
functools.cache is straightforward to use and, though it requires its arguments to be hashable, is applicable here because our argument is a string. Cachey offers a better solution that has a notion of the cost of a computation, but both of these options only provide in-memory persistence. What if we could transparently add a cache operator anywhere of our choosing in our computation expression and have the cache both persist and be automatically invalidated when the source data changes?
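For illustration, a minimal sketch of the in-memory approach, wrapping the earlier pandas snippet with functools.cache (the query and connection string are just the ones from the example above):

import functools

import pandas as pd

@functools.cache
def fetch(query: str) -> pd.DataFrame:
    # the string argument is hashable, so functools.cache can key on it
    return pd.read_sql(query, 'sqlite:///data.db')

df = fetch('SELECT * FROM table')  # hits the database
df = fetch('SELECT * FROM table')  # served from the in-memory cache

The cache disappears when the process exits, and nothing invalidates it if the underlying table changes, which is exactly the gap the rest of this post addresses.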
How it works
The caching operator should be able to cache any ibis.expr, not just the final result. This is because the expression can be a complex graph of operations, parts of which can be expensive to recompute but change infrequently, and (hopefully downstream) parts of which might be cheap to compute but change frequently. Additionally, we need to be able to specify how the cache is invalidated. Maybe the data changes, but infrequently enough that the developer is minimally impacted (consider modelling on data with nightly updates). In order to do the above, we want to simply chain our expression with the cache operator, and it should do the right thing.
In the latest main branch on GitHub (4ae449dc as of this writing), we added a cache operator. This operator can be chained anywhere in the ibis.expr and takes storage as an input.
| | SourceStorage | SnapshotStorage |
|---|---|---|
| Cache Invalidation | yes | no |
| Persistence | Source dependent; use ParquetCacheStorage to ensure durable persistence to disk | Source dependent |
We have two main types of Storage: SourceStorage and SnapshotStorage. The source input can be a remote (snowflake, postgres) or in-process (pandas, duckdb, datafusion) backend. If the source used for the cache is in-memory, the cache is in-memory, e.g. pandas. Moreover, SourceStorage will invalidate the hash when the upstream data changes - depending on the source above - while SnapshotStorage will not. More on this later.
Similar to SourceStorage, ParquetCacheStorage caches the results of the SQL query in Parquet files on the local disk, using the source backend to do the writing.
import letsql as ls

con = ls.connect()

from letsql.common.caching import SourceStorage
from letsql.common.utils.snowflake_utils import make_connection

snow_backend = make_connection(database=..., schema=..., warehouse=...)
cache = SourceStorage(source=con)  # datafusion backend from letsql

t = (
    snow_backend.table('table')
    .cache(storage=cache)
    .mutate(new_column=lambda x: x['column'] + 1)
)

t.execute()
The snow_backend.table('table').cache(storage=cache) call will cache the results of the table query in the SourceStorage cache, and subsequent computation will read from the cache rather than making a request to Snowflake.
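For comparison, here is a hedged sketch of the same pipeline with ParquetCacheStorage, reusing con and snow_backend from above; we are assuming it takes a source backend and a path for the Parquet files, and the exact keyword names may differ:

from letsql.common.caching import ParquetCacheStorage

parquet_cache = ParquetCacheStorage(source=con, path="letsql-cache")  # hypothetical signature

t = (
    snow_backend.table('table')
    .cache(storage=parquet_cache)  # results persisted as Parquet files under letsql-cache/
    .mutate(new_column=lambda x: x['column'] + 1)
)
t.execute()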
Hashing Strategies
Our strategy for hashing the data depends on where the data is stored. For in-memory sources (currently pandas is always in-memory), the schema and the in-memory bytes of the data are hashed. For duckdb and datafusion backends, if the data is on disk or in an object store, the query plan (capturing the source file name) as well as the schema is hashed. For remote backends, i.e. Snowflake and Postgres, the name, source, schema, namespace, and optionally the last modified date of the table are hashed.
| | In-Memory | Disk | Remote |
|---|---|---|---|
| Hash | Bytes, schema | Query plan, schema | Name, schema, source, namespace, last modification time |
| Example | pandas, duckdb, datafusion | duckdb, datafusion | Snowflake, Postgres |
Cache Busting (Invalidation)
For SourceStorage and ParquetCacheStorage, the cache is invalidated when the upstream data changes. We do this by using the last update time of the source table as an input into the hash.
For SnapshotStorage, no automatic invalidation is implemented. This is useful in the case of a one-off analysis where you don’t need to worry about the source data changing.
Differences with ibis.cache
Ibis’s caching works with a single backend, i.e. it creates a temporary table with the cached result in the underlying backend, similar to using SourceStorage with the same source as the backend. Most importantly, it is not deferred and executes immediately. letsql caching is more flexible and allows for more granular cache invalidation strategies as well as chaining caches using multiple engines. For example, it should be fairly easy to use duckdb’s S3 Parquet functionality to build the same cache that we built on local disk in the example above.
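As a rough illustration of chaining, reusing the names from the examples above, one could (hypothetically) pin the remote query in the local datafusion backend and persist a cheap downstream step to Parquet; again, the ParquetCacheStorage arguments are an assumption:

t = (
    snow_backend.table('table')
    .cache(storage=SourceStorage(source=con))        # first cache: table held in the datafusion backend
    .mutate(new_column=lambda x: x['column'] + 1)
    .cache(storage=ParquetCacheStorage(source=con, path="letsql-cache"))  # second cache: Parquet on disk (hypothetical signature)
)
t.execute()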
Conclusion
We would love to hear your feedback on this feature. Please try it out and let us know what you think. In the future, we’d like to experiment with remote cache storage and more granular cache invalidation strategies.