If you don’t want to change your whole stack, ClickHouse’s Materialized Views do something extraordinarily similar, where computations are run on inserts to the source table in an online/streaming manner. I’m curious how this solution compares in its set of features/guarantees.
ClickHouse's materialized views are wonderful, but they do require very careful design up front.
In particular, aggregations need to be defined using the special AggregateFunction data types, which must be paired with the corresponding aggregation functions such as countMerge(). Joins are possible in CH views, but they operate in a specific way (against the insert batch) that you must know about; joins against other tables are generally a bad idea for performance, and you should use dictionaries as much as possible for fast in-memory lookup. Lastly, it's hard to update MVs because their entire source query has to be modified as a whole. Adding a column requires redeclaring the whole MV, which introduces the possibility of making mistakes in your migrations.
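To make that concrete, here's a rough (untested) sketch of the AggregateFunction plumbing; table and column names are made up:

CREATE TABLE events (user_id UInt64, ts DateTime) ENGINE = MergeTree ORDER BY ts;

CREATE TABLE daily_counts (
    day Date,
    hits AggregateFunction(count)
) ENGINE = AggregatingMergeTree ORDER BY day;

-- The view fires per insert batch into events and writes partial aggregation states.
CREATE MATERIALIZED VIEW daily_counts_mv TO daily_counts AS
SELECT toDate(ts) AS day, countState() AS hits
FROM events
GROUP BY day;

-- Reads have to use the matching -Merge combinator to combine the partial states.
SELECT day, countMerge(hits) AS hits
FROM daily_counts
GROUP BY day;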
CH views are really more like triggers, and so they're a little misleadingly named. Very powerful, of course. In short, a lot more "manual" than this other system.
For incremental computation, Feldera is just way more powerful and general. It can evaluate arbitrarily sophisticated SQL programs incrementally (tables and deeply nested layers of views). For example, it can compute rolling aggregates over joins, handle late and out-of-order arrivals, compute over infinite streams with finite state (via automatic garbage collection), and it's strongly consistent. ClickHouse's materialized views are much simpler and more restricted in comparison.
That said, we are /not/ ever a replacement for ClickHouse or any other historical warehouse. In fact, we pair best with one of them. Have data flow through, or get teed into, Feldera to maintain sophisticated standing queries -- and maintain historical data in your warehouse.
I would advise everybody to steer clear of anything that isn't Feldera or Materialize. Nobody aside from these two has an IVM product that is grounded in proper theory.
If you are interested in trying out the theory (DBSP) underneath Feldera, but in Python, then check this out: https://github.com/brurucy/pydbsp
It's based on Z-Sets - a generalization of relational algebra.
Many of the aggregations, projections, and filters from SQL are associative and can be implemented in Z-Sets. Z-Sets support incremental operations (adding one value to a set while computing the 'max' is just the max of the two arguments, rather than requiring recomputing the 'max' over the entire set).
dumb question: how do z-sets or feldera deal with updates to values that were incorporated into the max already?
For example - max over {4, 5} is 5. Now I update the 5 to a 3, so the set becomes {4, 3} with a max of 4. This seems to imply that the z-sets would need to store ALL the values - again, in their internal state.
Also there needs to be some logic somewhere that says that the data structure for updating values in a max aggregation needs to be a heap. Is that all happening somewhere?
We use monotonicity detection for various things. I believe (can double check) that it's used for max as well. But you're correct that in the general case, max is non-linear, so will need to maintain state.
Update from Leonid on current implementation: each group is ordered by the column on which we compute max, so it's O(1) to pick the last value from the index.
Just a guess... would like to hear the answer as well.
they probably have a monotonicity detector somewhere, which can decide whether to keep all the values or discard them. If they keep them, they probably use something like a segment tree to index.
That's right, we perform static dataflow analysis to determine what data can get discarded. GC itself is done lazily as part of LSM tree maintenance. For MAX specifically, we don't have this optimization yet. In the general case, incrementally maintaining the MAX aggregate in the presence of insertions and deletions requires tracking the entire contents of the group, which is what we do. If the collection can be proved to be append-only, then it's sufficient to store only the current max element. This optimization is yet to come to Feldera.
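For concreteness, the kind of view being discussed looks something like this (hypothetical table and column names):

CREATE TABLE measurements (sensor INT NOT NULL, reading DOUBLE NOT NULL);

-- Deleting the row holding a sensor's current max forces the engine to find the
-- runner-up, so in general the whole group has to stay indexed by reading.
-- If the input is provably append-only, keeping just the current max would suffice.
CREATE MATERIALIZED VIEW peak_reading AS
SELECT sensor, MAX(reading) AS peak
FROM measurements
GROUP BY sensor;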
This is pretty neat but I'm wondering how well this implementation obeys dataframe algebra. Ponder goes into detail about how dataframes and relations aren't the same, but your dataframe zset seems to be more or less the exact same thing as the relation zset?
I tried the demo and it looks quite promising. Right now it seems to be focused on handling a few queries over high throughput streams. I wonder if it could also work for the following scenario.
It behaves almost like a normal SQL database, i.e. most data is cold on disk, queries are low latency, there's no need to predefine them, it's ACID compliant, etc.
Except you can subscribe to queries. That means the initial response needs to be as fast as in traditional databases, and the database needs to be able to scale to a large number of concurrent subscriptions.
If this were possible, it could alleviate the common problem that web apps which need to keep the UI up to date have to constantly poll the database, creating a lot of overhead and making anything but trivial queries a non-starter.
hi YmiYugy. We have a REST API to subscribe to a change stream (basically what the UI uses underneath).
Our ad-hoc query interface is about to receive snapshot-and-follow support as well which should do what you want. That way, you don't need to poll for changes.
I’ve been following the Feldera/DBSP/Differential Datalog team for a while and am happy to see y’all stable-ish with your own venture and settling on a model more approachable than DDlog for most developers :)
This seems much more adoptable to me in my org than DDlog was, even if I really liked DDlog much more than SQL :-(
I remember seeing a VMware-internal presentation on the DDlog work which led to Feldera and being absolutely blown away. They took a stream processing problem that had grown to an hours-deep backlog and reduced it to sub second processing times. Lalith & co are the real deal.
Incredible… I hadn’t even noticed, and people found the holy grail and open-sourced it!
By the way, I was wondering about a related question. Do streaming engines typically store a copy of the data streamed to them? For instance, if I had a view to get the maximum value of a table, and the maximum value was removed, the streaming engine surely needs to get the next value from somewhere. It seems clear that the streaming engine needs at least its own snapshot of the data to have a consistent state of the computation, but duplicating the persisted data seems somewhat wasteful.
The state Feldera maintains depends on the queries you write and the working set or windows you're computing over. Any time there are joins, distinct or non-linear aggregations, we need to maintain state as you've guessed.
A cool feature in Feldera is that it can compute over infinite streams with finite state because we automate garbage collection. The user specifies lateness over data sources or even views, and with some static analysis, Feldera determines when it is safe to forget old state such that it won't affect the output of any views.
This looks extremely cool. This is basically incremental view maintenance in databases, a problem that almost everybody (I think) has when using SQL databases and wanting to do some derived views for more performant access patterns. Importantly, they seem to support a wide breadth of SQL operators, support spilling computation state to disk, and it's open-source! Interestingly, it compiles queries to Rust, so an approach similar to Redshift (which compiles queries to C++ programs).
There's already a bunch of tools in this area:
1. Materialize[0], which afaik is more big-data oriented, and doesn't pipe the results back to your database, instead storing results in S3 and serving them.
2. Epsio[1], which I've never used, seems to be very similar to this product, but is closed-source only.
3. When building OctoSQL[2], this capability was also important to me and it was designed from the ground up to support it. Though in practice in a tool like OctoSQL it's pretty useless (was a fun problem to solve though).
There are some things I'm curious about:
- Does it handle queries that involve complex combinations of ordering with limits in subqueries? If, due to a change in an underlying table, a top-n row is added, resulting in moving other rows around (and removing the current n'th), will the subsequent query parts behave as though the order was maintained when computing it, or will it fall apart (imagine a select with limit from a select with a bigger limit)?
- Is it internally consistent[3]? They say it's "strongly consistent" and "It also guarantees that the state of the views always corresponds to what you'd get if you ran the queries in a batch system for the same input." so I think the answer is yes, but this one's really important.
Either way, will have to play with this, and dig into the paper (the link in the repo doesn't work, here's an arXiv link[4]). Wishing the creators good luck, this looks great!
"Incremental view maintenance has been for a long time a central
problem in database theory. Many solutions have been proposed for restricted classes of database languages, such as the relational algebra, or Datalog. These techniques do not naturally generalize to richer languages. In this paper we give a general solution to this problem in 3 steps: (1) we describe a simple but expressive language called DBSP for describing computations over data
streams; (2) we give a general algorithm for solving the incremental view maintenance problem for arbitrary DBSP programs, and
(3) we show how to model many rich database query languages
(including the full relational queries, grouping and aggregation,
monotonic and non-monotonic recursion, and streaming aggregation) using DBSP. As a consequence, we obtain efficient incremental view maintenance techniques for all these rich languages."
- We evaluate top-k queries incrementally and the nesting shouldn't be a problem for the engine (or it'd be a bug). If you have an example of a query, we can try it out at our end.
Our guarantee is that we always produce the same answer as if you'd run the queries in a batch system. All views update together. You can see the computation model here: https://www.feldera.com/blog/synchronous-streaming/
Make a table with one column, x, and insert into it rows with values 1-5, and then 8-20.
Then query it using more or less `SELECT x FROM (SELECT x FROM xs ORDER BY x LIMIT 15) LIMIT 10`, and then insert 6 into the table. Output should be 1-6, 8-11. Of course, if the limits are merged together during optimisation, that would make the test case moot.
CREATE TABLE foo (x INTEGER NOT NULL PRIMARY KEY) WITH ('materialized' = 'true') ;
CREATE MATERIALIZED VIEW bar AS SELECT x FROM (SELECT x FROM foo ORDER BY x LIMIT 15) LIMIT 10;
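And filling in the data from your description, the rest of the test would be roughly (untested):

INSERT INTO foo VALUES (1), (2), (3), (4), (5), (8), (9), (10), (11), (12), (13), (14), (15), (16), (17), (18), (19), (20);
-- bar should now contain 1-5 and 8-12.
INSERT INTO foo VALUES (6);
-- bar should update to 1-6 and 8-11 if the inner ORDER BY/LIMIT semantics are preserved.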
I think Rama [1] (by Nathan Marz, the creator of Apache Storm) is interesting as a "NoSQL" solution for a similar problem space, as I understand it. Impressive if this can support similar scale using only SQL.
We are based on DBSP (https://www.vldb.org/pvldb/vol16/p1601-budiu.pdf), which is an evolution of DD (differential dataflow). DBSP gives us an algorithm to take an arbitrarily complex query and generate an incremental version of it.
As a consequence, we evaluate everything incrementally. For example, we are the only engine that can perform rolling aggregates incrementally. In general, with DBSP, we can incrementalize "the full relational algebra, queries over sets and multisets, arbitrarily nested relations, aggregation, flatmap (unnest), monotonic and non-monotonic recursion, streaming aggregation, and arbitrary compositions of all of these". DBSP is a much simpler and cleaner foundation.
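As a rough sketch of what a rolling aggregate over a time-based window looks like (made-up schema; the exact syntax we accept may differ slightly):

CREATE TABLE transactions (
    account_id BIGINT NOT NULL,
    amount DOUBLE NOT NULL,
    ts TIMESTAMP NOT NULL
);

-- A per-event rolling sum over the preceding hour, maintained incrementally
-- as new transactions arrive rather than recomputed from scratch.
CREATE MATERIALIZED VIEW rolling_spend AS
SELECT
    account_id,
    ts,
    SUM(amount) OVER (
        PARTITION BY account_id
        ORDER BY ts
        RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
    ) AS spend_last_hour
FROM transactions;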
As a product, both our enterprise and open-source (MIT licensed) offerings let you run it anywhere you want including your laptop.
Positioning wise, we are a query engine with a unified way to compute over both bounded and unbounded data sources with perfect consistency, with an integrated storage layer. Materialize is going for building an operational warehouse.
I wonder what guarantees can be made wrt resource consumption. I suppose it'd be reasonable to assume that in most (all?) cases an update is cheaper than a recompute in terms of CPU cycles, but what about RAM? Intuitively it seems like there must be cases that would force you to store an unbounded amount of data indefinitely in RAM.
(Feldera co-founder here.) There are some cases where Feldera needs to index data indefinitely, yes. For those cases, Feldera can put those indexes on storage rather than keeping them entirely in RAM.
In a lot of cases where one might initially think that data needs to stay around indefinitely, people actually want the results from the last hour or day or month, etc. For those cases, Feldera supports a concept called "lateness" that allows it to drop older data: https://docs.feldera.com/sql/streaming/#lateness-expressions.
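Roughly, it looks like this (see the linked docs for the exact syntax; table and columns here are made up):

CREATE TABLE page_views (
    url VARCHAR NOT NULL,
    ts TIMESTAMP NOT NULL LATENESS INTERVAL '1' HOUR
);

-- Once arrivals are more than an hour behind the newest ts seen, the older
-- hourly buckets can no longer change, so the state backing them can be
-- garbage-collected.
CREATE MATERIALIZED VIEW hourly_views AS
SELECT FLOOR(ts TO HOUR) AS hour, COUNT(*) AS views
FROM page_views
GROUP BY FLOOR(ts TO HOUR);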
Your intuition is correct. Incremental computation is fundamentally a time-space tradeoff. Depending on the views you write, you might end up maintaining large amounts of state. We've written about it here: https://www.feldera.com/blog/streaming-needs-storage
That said, Feldera has several features to keep state bounded even when computing on infinite streams. For example, we do automatic garbage collection (GC) where with some static analysis, we can figure out when it is safe to forget inputs that will no longer affect the output of views.
We recently ported a community member's warehouse workload to Feldera, where with these features we were evaluating As-Of joins and streaming aggregations at more than a million events/sec with 1.2 GB of RAM on a laptop.
would something like dbsp support spreadsheet style computations? Most of the financial world is stuck behind spreadsheets and the entire process of productionizing spreadsheets is broken:
* Engineers don't have time to understand the spreadsheet logic and translate everything into an incremental version for production.
* Analysts don't understand the challenges with stream processing.
* SQL is still too awkward of a language for finance.
* Excel is a batch environment, which makes it hard to codify it as a streaming calculation.
If I understand correctly, your paper implies that as long as there is a way to describe spreadsheets as a Z-Set, some incremental version of the program can be derived? Spreadsheets are pretty close to a relational table, but it would be a Z-Set algebra on cells, not rows, similar to functional reactive programming. So dbsp on cells would be incremental UDFs, not just UDAFs?
Great question. DBSP should work here -- spreadsheets are by definition incremental (and there's even recursive queries there with cells depending on each other).
Note that we use Z-Sets to bridge SQL/tables with DBSP, but Z-Sets aren't general enough for spreadsheets.
Does anybody have a good resource to learn about the differences between things like Feldera, Materialize, Adapton, and other developments in the incremental computation space? Where are the experts hanging out? What are they reading?
We have a small community over at the Feldera Slack channel you could join. We often have deeply technical discussions about the incremental computation space, papers, concepts, etc. over there.
I would love it if something like this exposed C bindings so that every language with an FFI could use the library. I’d love to be able to define pipelines and queries in .NET instead of having to use SQL.
Hi Nelkins. We do have a Rust crate you could consider using directly: https://docs.rs/dbsp/latest/dbsp/. Our SQL compiler puts together a pipeline by generating a Rust program that uses this crate.
They're quite different from DBSP. Given a program execution, Salsa/Adapton seem to reuse prior steps from that execution when the input changes.
In contrast, DBSP has built-in knowledge of incremental versions of operations and composes them. Here's a blurb from a recent paper we submitted:
DBSP is not tied to databases in any way; it is in fact a Turing-complete language that can be used for many other purposes. But it works particularly well in the area of databases, for two reasons:
– DBSP operates on values from a commutative group. Databases can be modeled as a commutative group.
– DBSP reduces the problem of incrementalizing a complex program to the problem of incrementalizing each primitive operation that appears in the program. For databases there are known efficient incremental implementations for all primitive operations.
As to whether DBSP could be used for an incremental compiler backend, we think not (at least not with our current understanding). Would be cool to explore though.
The previous implementation we built at VMware went from datalog -> Rust, and we supported other language bindings using C bindings and FFI. The same ought to work here too.