As a sanity check I just cloned https://github.com/h2oai/db-benchmark, ran the data generation script, and ran the following on a 64-core AMD EPYC (AWS c7a.16xlarge):
import polars as pl
lf = pl.scan_csv("G1_1e9_1e2_0_0.csv")
print(lf.select(pl.col.v1.sum()).collect())
The above script ran in 7.58 seconds.
If I change the collect() to collect(new_streaming=True) to use the new streaming engine I've been working on, it runs in 6.90 seconds.
I can't realistically time the full "read CSV to memory" with this 50 GB file on this machine, as we start swapping (this machine has 128 GiB of memory) and/or evicting data from the disk cache (this machine has a slow EC2 SSD attached to it). So we do have a blow-up of memory usage (which could be as simple as loading small integers into an 8-byte UInt64 column). I think it's likely that on K's machine the "read full CSV to memory" approach also started swapping, giving the large runtime. However, in Polars you'd typically write your query using LazyFrames, which means we don't actually have to load the full CSV into memory.
EDIT: running on an m7a.16xlarge with twice the memory (256 GiB), once the CSV file is in the disk cache, Polars can parse the full CSV file into an in-memory DataFrame in 7.68 seconds.
K's claim that it parses the full 50 GB CSV in 1.6 seconds is, if true, very impressive regardless.
Honestly, 7 seconds even just to parse the CSV is already pretty impressive; 7 GB/s would be simdjson speeds if you did it on a single core. Do you have a single-threaded parser with really well-tuned SIMD, or a speculative parallel one, or...?
We have a single-threaded chunker that scans serially over the file. This chunker exclusively finds unquoted newlines (using SIMD) to establish clean parallelization boundaries; it doesn't do any further parsing. Those parallelization boundaries are then used to feed worker threads chunks of data to properly parse into our in-memory representation (which mostly follows Arrow).
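Roughly, in scalar Python (the real chunker is SIMD-accelerated Rust inside Polars, so everything below is just an illustrative sketch assuming RFC 4180-style doubled-quote escaping), the idea is:

def find_unquoted_newlines(buf: bytes) -> list[int]:
    # Track quote parity while scanning; a newline only counts as a record
    # boundary when we're outside a quoted field.
    in_quotes = False
    boundaries = []
    for i, b in enumerate(buf):
        if b == ord('"'):
            in_quotes = not in_quotes  # doubled quotes flip twice, so parity stays correct
        elif b == ord('\n') and not in_quotes:
            boundaries.append(i)
    return boundaries

def split_into_chunks(buf: bytes, n_workers: int) -> list[tuple[int, int]]:
    # Cut the file into roughly equal pieces, but only at unquoted newlines,
    # so each worker thread can parse its chunk independently.
    newlines = find_unquoted_newlines(buf)
    target = max(1, len(buf) // n_workers)
    chunks, start = [], 0
    for pos in newlines:
        if pos + 1 - start >= target:
            chunks.append((start, pos + 1))
            start = pos + 1
    if start < len(buf):
        chunks.append((start, len(buf)))
    return chunks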
Would you know how much of the total runtime is devoted to the initial chunking process? Amdahl's law would prefer an entirely speculative approach in the limit, but I could imagine that the 2x overhead might not be worth it for reasonable file sizes and core counts.
(But even then, 1.6 s would be quite a feat. It makes me wonder if the K implementation is partially lazy, as you say typical Polars usage is.)
It seems from a profile that on the eager engine the serial scanner is able to feed ~32 threads' worth of decoding: https://share.firefox.dev/4hS1eJa.
It might be worth speculating, or at least optimizing the serial chunker more. You could theoretically start a second serial chunker from the end, working backwards, but that would not be wise with our ordered streams, as the decoded data would have to be buffered for a long time.
Similarly on the new streaming engine, each thread is active ~half of the time, except the thread running the chunking task: https://share.firefox.dev/3WQV9og.
Note that in a lot of realistic workloads on the streaming engine, compute can happen in between decodes, completely hiding the bottleneck. Also, all of the above is with the file completely in the file cache; if fed from a slow SSD, the chunker is not a bottleneck whatsoever.
Seems easy enough to use a parallel scan if you're willing to accept a little work inefficiency, right? Assign each scanner thread a block; first each one counts/XORs how many quotes are in its block, then an exclusive scan over those (the last thread's result is unused) gives you the quoting state at the start of each block. And hopefully that block is still in the core's cache.
Or, since newlines in strings should be rare, maybe it works to save the index of every newline and tag it with the parity of the preceding quotes in the block. Then you get the true parity once each thread has finished its block and filter with that, which is faster than going back over the block unless there were tons of newlines.
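Something like this, as a rough Python sketch of the last two paragraphs (purely illustrative; a real implementation would run SIMD over raw bytes, and this assumes the file starts outside a quoted field with RFC 4180-style doubled-quote escaping):

from concurrent.futures import ThreadPoolExecutor

def scan_block(buf: bytes, start: int, end: int):
    # Phase 1 (parallel): per block, record the quote parity and every newline
    # tagged with the local parity seen so far within the block.
    parity = 0
    tagged_newlines = []
    for i in range(start, end):
        b = buf[i]
        if b == ord('"'):
            parity ^= 1
        elif b == ord('\n'):
            tagged_newlines.append((i, parity))
    return parity, tagged_newlines

def parallel_unquoted_newlines(buf: bytes, n_blocks: int) -> list[int]:
    size = (len(buf) + n_blocks - 1) // n_blocks
    ranges = [(i * size, min((i + 1) * size, len(buf))) for i in range(n_blocks)]
    with ThreadPoolExecutor(max_workers=n_blocks) as pool:
        results = list(pool.map(lambda r: scan_block(buf, *r), ranges))

    # Phase 2 (serial, but over blocks rather than bytes): an exclusive XOR-scan
    # over the block parities gives the quoting state at the start of each block;
    # keep only the newlines whose tag says they are outside a quoted field.
    newlines, state = [], 0
    for block_parity, tagged in results:
        newlines.extend(pos for pos, local in tagged if (local ^ state) == 0)
        state ^= block_parity
    return newlines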
Yes, I did already propose (at the office) a parity-agnostic chunker (we only need the number of lines + a split point from the chunker) that can do the work in parallel and only needs a small moment of synchronization to find out which of the two parities holds before locking in a final answer. There would still be a global serial dependency, but on blocks rather than on bytes.
But we only have a finite amount of time and tons and tons of work, so no one has gotten around to it yet. At least now we know that it might be worthwhile for >= ~32-core machines. PRs welcome :)
All right, it just threw me off a little that you'd consider speculating or backwards decoding, as I wouldn't expect them to be easier or significantly faster (or maybe you consider parity-independence to be speculation? I can see it).
Yes, I meant parity-independence with speculation. Essentially you assume that you either are or are not within a string at the start, do your computation under each assumption, then throw away the result from the unsound one. Both assumptions can share most of their computation, I believe, so I can understand that one might see it from the other perspective, where you'd start by calling it parity-independence rather than speculation with shared computation.
There might also be the option of just optimistically assuming that a point in the file preceded by a run of, say, >4K bytes of proper newlines with proper comma counts in each line probably isn't in the middle of a multiline string, and parsing it as such (of course with proper fallback if this turns out to be false; but then you'll at least know that this whole run is inside a multiline string).
Also, if you encounter a double-quote character anywhere with a comma on one side and neither a newline, double-quote nor comma on the other, you immediately know 100% whether it starts or ends a string.
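That local rule could look something like this in Python (purely illustrative; it assumes valid RFC 4180-style CSV where embedded quotes are escaped by doubling):

SPECIAL = {ord('"'), ord(','), ord('\n'), ord('\r')}

def classify_quote(buf: bytes, i: int):
    # Return 'opens', 'closes', or None if the quote at offset i is locally ambiguous.
    if buf[i] != ord('"'):
        return None
    prev = buf[i - 1] if i > 0 else None
    nxt = buf[i + 1] if i + 1 < len(buf) else None
    if prev == ord(',') and nxt is not None and nxt not in SPECIAL:
        return 'opens'   # ...,"x can only start a quoted field in valid CSV
    if nxt == ord(',') and prev is not None and prev not in SPECIAL:
        return 'closes'  # x",... can only end a quoted field in valid CSV
    return None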