Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add duckplyr #95

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/regression.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
strategy:
fail-fast: false
matrix:
solution: [data.table, collapse, dplyr, pandas, pydatatable, spark, juliadf, juliads, polars, R-arrow, duckdb, datafusion, dask, clickhouse]
solution: [data.table, collapse, dplyr, duckplyr, pandas, pydatatable, spark, juliadf, juliads, polars, R-arrow, duckdb, datafusion, dask, clickhouse]
name: Regression Tests solo solutions
runs-on: ubuntu-20.04
env:
Expand Down
22 changes: 22 additions & 0 deletions _benchplot/benchplot-dict.R
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ solution.dict = {list(
"collapse" = list(name=c(short="collapse", long="collapse"), color=c(strong="darkturquoise", light="turquoise")),
"data.table" = list(name=c(short="data.table", long="data.table"), color=c(strong="blue", light="#7777FF")),
"dplyr" = list(name=c(short="dplyr", long="dplyr"), color=c(strong="red", light="#FF7777")),
"duckplyr" = list(name=c(short="duckplyr", long="duckplyr"), color=c(strong="#ddcd07", light="#fff100")),
"pandas" = list(name=c(short="pandas", long="pandas"), color=c(strong="green4", light="#77FF77")),
"pydatatable" = list(name=c(short="pydatatable", long="(py)datatable"), color=c(strong="darkorange", light="orange")),
"spark" = list(name=c(short="spark", long="spark"), color=c(strong="#8000FFFF", light="#CC66FF")),
Expand Down Expand Up @@ -103,6 +104,18 @@ groupby.syntax.dict = {list(
"regression v1 v2 by id2 id4" = "DF %>% group_by(id2, id4) %>% summarise(r2=cor(v1, v2, use=\"na.or.complete\")^2)",
"sum v3 count by id1:id6" = "DF %>% group_by(id1, id2, id3, id4, id5, id6) %>% summarise(v3=sum(v3, na.rm=TRUE), count=n())"
)},
"duckplyr" = {c(
"sum v1 by id1" = "DF %>% group_by(id1) %>% summarise(v1=sum(v1, na.rm=TRUE))",
"sum v1 by id1:id2" = "DF %>% group_by(id1, id2) %>% summarise(v1=sum(v1, na.rm=TRUE))",
"sum v1 mean v3 by id3" = "DF %>% group_by(id3) %>% summarise(v1=sum(v1, na.rm=TRUE), v3=mean(v3, na.rm=TRUE))",
"mean v1:v3 by id4" = "DF %>% group_by(id4) %>% summarise_at(.funs=\"mean\", .vars=c(\"v1\",\"v2\",\"v3\"), na.rm=TRUE)",
"sum v1:v3 by id6" = "DF %>% group_by(id6) %>% summarise_at(.funs=\"sum\", .vars=c(\"v1\",\"v2\",\"v3\"), na.rm=TRUE)",
"median v3 sd v3 by id4 id5" = "DF %>% group_by(id4, id5) %>% summarise(median_v3=median(v3, na.rm=TRUE), sd_v3=sd(v3, na.rm=TRUE))",
"max v1 - min v2 by id3" = "DF %>% group_by(id3) %>% summarise(range_v1_v2=max(v1, na.rm=TRUE)-min(v2, na.rm=TRUE))",
"largest two v3 by id6" = "DF %>% select(id6, largest2_v3=v3) %>% filter(!is.na(largest2_v3)) %>% arrange(desc(largest2_v3)) %>% group_by(id6) %>% filter(row_number() <= 2L)",
"regression v1 v2 by id2 id4" = "DF %>% group_by(id2, id4) %>% summarise(r2=cor(v1, v2, use=\"na.or.complete\")^2)",
"sum v3 count by id1:id6" = "DF %>% group_by(id1, id2, id3, id4, id5, id6) %>% summarise(v3=sum(v3, na.rm=TRUE), count=n())"
)},
"pandas" = {c(
"sum v1 by id1" = "DF.groupby('id1', as_index=False, sort=False, observed=True, dropna=False).agg({'v1':'sum'})",
"sum v1 by id1:id2" = "DF.groupby(['id1','id2'], as_index=False, sort=False, observed=True, dropna=False).agg({'v1':'sum'})",
Expand Down Expand Up @@ -252,6 +265,7 @@ groupby.syntax.dict = {list(
"collapse" = list(),
"data.table" = list(),
"dplyr" = list(),
"duckplyr" = list(),
"pandas" = list(),
"pydatatable" = list(),
"spark" = list("not yet implemented: SPARK-26589" = "median v3 sd v3 by id4 id5"),
Expand Down Expand Up @@ -364,6 +378,13 @@ join.syntax.dict = {list(
"medium inner on factor" = "inner_join(DF, medium, by='id5')",
"big inner on int" = "inner_join(DF, big, by='id3')"
)},
"duckplyr" = {c(
"small inner on int" = "inner_join(DF, small, by='id1')",
"medium inner on int" = "inner_join(DF, medium, by='id2')",
"medium outer on int" = "left_join(DF, medium, by='id2')",
"medium inner on factor" = "inner_join(DF, medium, by='id5')",
"big inner on int" = "inner_join(DF, big, by='id3')"
)},
"juliadf" = {c(
"small inner on int" = "innerjoin(DF, small, on = :id1, makeunique=true, matchmissing=:equal)",
"medium inner on int" = "innerjoin(DF, medium, on = :id2, makeunique=true, matchmissing=:equal)",
Expand Down Expand Up @@ -446,6 +467,7 @@ join.query.exceptions = {list(
"collapse" = list(),
"data.table" = list(),
"dplyr" = list(),
"duckplyr" = list(),
"pandas" = list(),
"pydatatable" = list(),
"spark" = list(),
Expand Down
2 changes: 2 additions & 0 deletions _control/solutions.csv
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ data.table,groupby2014
dplyr,groupby
dplyr,join
dplyr,groupby2014
duckplyr,groupby
duckplyr,join
pandas,groupby
pandas,join
pandas,groupby2014
Expand Down
2 changes: 1 addition & 1 deletion _launcher/launcher.R
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ readret = function(x) {
file.ext = function(x) {
ans = switch(
x,
"collapse"=, "data.table"=, "dplyr"=, "h2o"=, "R-arrow"=, "duckdb"="R", "duckdb-latest"="R",
"collapse"=, "data.table"=, "dplyr"=, "duckplyr"=, "h2o"=, "R-arrow"=, "duckdb"="R", "duckdb-latest"="R",
"pandas"=, "spark"=, "pydatatable"=, "modin"=, "dask"=, "datafusion"=, "polars"="py",
"clickhouse"="sh", "juliadf"="jl", "juliads"="jl",
)
Expand Down
2 changes: 1 addition & 1 deletion _launcher/solution.R
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ if ("quiet" %in% names(args)) {
file.ext = function(x) {
ans = switch(
x,
"collapse"=, "data.table"=, "dplyr"=, "h2o"=, "R-arrow"=, "duckdb"="R", "duckdb-latest"="R",
"collapse"=, "data.table"=, "dplyr"=, "duckplyr"=, "h2o"=, "R-arrow"=, "duckdb"="R", "duckdb-latest"="R",
"pandas"="py", "spark"=, "pydatatable"=, "modin"=, "dask"=, "datafusion"=, "polars"="py",
"clickhouse"="sh", "juliadf"="jl", "juliads"="jl"
)
Expand Down
2 changes: 1 addition & 1 deletion _report/report.R
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ get_report_status_file = function(path=getwd()) {
file.path(path, "report-done")
}
get_report_solutions = function() {
c("collapse", "data.table", "dplyr", "pandas", "pydatatable", "spark", "dask", "juliadf", "juliads", "clickhouse", "cudf", "polars", "duckdb", "datafusion", "arrow", "R-arrow")
c("collapse", "data.table", "dplyr", "duckplyr", "pandas", "pydatatable", "spark", "dask", "juliadf", "juliads", "clickhouse", "cudf", "polars", "duckdb", "datafusion", "arrow", "R-arrow")
}
get_data_levels = function() {
## groupby
Expand Down
1 change: 1 addition & 0 deletions duckplyr/VERSION
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
0.4.1
170 changes: 170 additions & 0 deletions duckplyr/groupby-duckplyr.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
#!/usr/bin/env Rscript

# Group-by benchmark driver for the duckplyr solution: loads the dataset named by
# the SRC_DATANAME environment variable, then runs and logs the grouping questions below.
cat("# groupby-duckplyr.R\n")

# Project helpers; write.log(), memory_usage() and make_chk() used later are not defined
# in this script and presumably come from here.
source("./_helpers/helpers.R")

# bit64 and data.table are used in chk to sum numeric columns and for data loading.
# requireNamespace() only looks at the first element of its `package` argument, so a
# two-element vector would silently skip the data.table check; test each package separately.
stopifnot(vapply(c("bit64","data.table"), requireNamespace, logical(1L), quietly=TRUE))
# Use the solution-local library for package lookup.
# NOTE(review): the issue reference "tidyverse/duckplyr#4641" looks like it was
# search-replaced from the dplyr script (tidyverse/dplyr#4641) -- confirm.
.libPaths("./duckplyr/r-duckplyr")
suppressPackageStartupMessages(library("duckplyr", lib.loc="./duckplyr/r-duckplyr", warn.conflicts=FALSE))

# Metadata written with every write.log() record.
ver = packageVersion("duckplyr")
git = "" # uses stable version now #124
task = "groupby"
solution = "duckplyr"
fun = "group_by"
cache = TRUE
on_disk = FALSE

# Input CSV is data/<SRC_DATANAME>.csv
data_name = Sys.getenv("SRC_DATANAME")
src_grp = file.path("data", paste(data_name, "csv", sep="."))
cat(sprintf("loading dataset %s\n", data_name))

# Read with data.table::fread as a plain data.frame (empty strings become NA),
# then convert to a duckplyr tibble.
x = as_duckplyr_tibble(data.table::fread(src_grp, showProgress=FALSE, na.strings="", data.table=FALSE))
print(nrow(x))

# Start of the overall task timer reported at the end of the script.
task_init = proc.time()[["elapsed"]]
cat("grouping...\n")

# q1 -- sum of v1 per id1.
# The query is executed twice; each run logs elapsed time, memory_usage() and a
# checksum (make_chk of the summed result columns) through write.log().
# NOTE(review): the benchplot-dict.R syntax entry for this question passes na.rm=TRUE
# to the aggregate, while the query here does not -- confirm which is intended.
question = "sum v1 by id1"

# run 1
t = system.time(
  print(dim(
    ans <- x %>%
      summarise(.by = id1, v1=sum(v1))
  ))
)[["elapsed"]]
m = memory_usage()
chkt = system.time(
  chk <- summarise(ans, v1=sum(v1))
)[["elapsed"]]
write.log(
  run=1L, task=task, data=data_name, in_rows=nrow(x), question=question,
  out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun,
  time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk
)
rm(ans)

# run 2
t = system.time(
  print(dim(
    ans <- x %>%
      summarise(.by = id1, v1=sum(v1))
  ))
)[["elapsed"]]
m = memory_usage()
chkt = system.time(
  chk <- summarise(ans, v1=sum(v1))
)[["elapsed"]]
write.log(
  run=2L, task=task, data=data_name, in_rows=nrow(x), question=question,
  out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun,
  time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk
)

# Show the first and last rows of the second run's result, then drop it.
print(head(ans, n=3))
print(tail(ans, n=3))
rm(ans)

# q2 -- sum of v1 per (id1, id2).
# The query is executed twice; each run logs elapsed time, memory_usage() and a
# checksum (make_chk of the summed result columns) through write.log().
# NOTE(review): the benchplot-dict.R syntax entry for this question passes na.rm=TRUE
# to the aggregate, while the query here does not -- confirm which is intended.
question = "sum v1 by id1:id2"

# run 1
t = system.time(
  print(dim(
    ans <- x %>%
      summarise(.by = c(id1, id2), v1=sum(v1))
  ))
)[["elapsed"]]
m = memory_usage()
chkt = system.time(
  chk <- summarise(ans, v1=sum(v1))
)[["elapsed"]]
write.log(
  run=1L, task=task, data=data_name, in_rows=nrow(x), question=question,
  out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun,
  time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk
)
rm(ans)

# run 2
t = system.time(
  print(dim(
    ans <- x %>%
      summarise(.by = c(id1, id2), v1=sum(v1))
  ))
)[["elapsed"]]
m = memory_usage()
chkt = system.time(
  chk <- summarise(ans, v1=sum(v1))
)[["elapsed"]]
write.log(
  run=2L, task=task, data=data_name, in_rows=nrow(x), question=question,
  out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun,
  time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk
)

# Show the first and last rows of the second run's result, then drop it.
print(head(ans, n=3))
print(tail(ans, n=3))
rm(ans)

# q3 -- sum of v1 and mean of v3 per id3.
# The query is executed twice; each run logs elapsed time, memory_usage() and a
# checksum (make_chk of the summed result columns) through write.log().
# NOTE(review): the benchplot-dict.R syntax entry for this question passes na.rm=TRUE
# to the aggregates, while the query here does not -- confirm which is intended.
question = "sum v1 mean v3 by id3"

# run 1
t = system.time(
  print(dim(
    ans <- x %>%
      summarise(.by = id3, v1=sum(v1), v3=mean(v3))
  ))
)[["elapsed"]]
m = memory_usage()
chkt = system.time(
  chk <- summarise(ans, v1=sum(v1), v3=sum(v3))
)[["elapsed"]]
write.log(
  run=1L, task=task, data=data_name, in_rows=nrow(x), question=question,
  out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun,
  time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk
)
rm(ans)

# run 2
t = system.time(
  print(dim(
    ans <- x %>%
      summarise(.by = id3, v1=sum(v1), v3=mean(v3))
  ))
)[["elapsed"]]
m = memory_usage()
chkt = system.time(
  chk <- summarise(ans, v1=sum(v1), v3=sum(v3))
)[["elapsed"]]
write.log(
  run=2L, task=task, data=data_name, in_rows=nrow(x), question=question,
  out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun,
  time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk
)

# Show the first and last rows of the second run's result, then drop it.
print(head(ans, n=3))
print(tail(ans, n=3))
rm(ans)

# q4 -- mean of v1, v2 and v3 per id4.
# The query is executed twice; each run logs elapsed time, memory_usage() and a
# checksum (make_chk of the summed result columns) through write.log().
# NOTE(review): the benchplot-dict.R syntax entry for this question passes na.rm=TRUE
# to the aggregates, while the query here does not -- confirm which is intended.
question = "mean v1:v3 by id4"

# run 1
t = system.time(
  print(dim(
    ans <- x %>%
      summarise(.by = id4, v1 = mean(v1), v2 = mean(v2), v3 = mean(v3))
  ))
)[["elapsed"]]
m = memory_usage()
chkt = system.time(
  chk <- summarise(ans, v1=sum(v1), v2=sum(v2), v3=sum(v3))
)[["elapsed"]]
write.log(
  run=1L, task=task, data=data_name, in_rows=nrow(x), question=question,
  out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun,
  time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk
)
rm(ans)

# run 2
t = system.time(
  print(dim(
    ans <- x %>%
      summarise(.by = id4, v1 = mean(v1), v2 = mean(v2), v3 = mean(v3))
  ))
)[["elapsed"]]
m = memory_usage()
chkt = system.time(
  chk <- summarise(ans, v1=sum(v1), v2=sum(v2), v3=sum(v3))
)[["elapsed"]]
write.log(
  run=2L, task=task, data=data_name, in_rows=nrow(x), question=question,
  out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun,
  time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk
)

# Show the first and last rows of the second run's result, then drop it.
print(head(ans, n=3))
print(tail(ans, n=3))
rm(ans)

# q5 -- sum of v1, v2 and v3 per id6.
# The query is executed twice; each run logs elapsed time, memory_usage() and a
# checksum (make_chk of the summed result columns) through write.log().
# NOTE(review): the benchplot-dict.R syntax entry for this question passes na.rm=TRUE
# to the aggregates, while the query here does not -- confirm which is intended.
question = "sum v1:v3 by id6"

# run 1
t = system.time(
  print(dim(
    ans <- x %>%
      summarise(.by = id6, v1 = sum(v1), v2 = sum(v2), v3 = sum(v3))
  ))
)[["elapsed"]]
m = memory_usage()
chkt = system.time(
  chk <- summarise(ans, v1=sum(v1), v2=sum(v2), v3=sum(v3))
)[["elapsed"]]
write.log(
  run=1L, task=task, data=data_name, in_rows=nrow(x), question=question,
  out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun,
  time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk
)
rm(ans)

# run 2
t = system.time(
  print(dim(
    ans <- x %>%
      summarise(.by = id6, v1 = sum(v1), v2 = sum(v2), v3 = sum(v3))
  ))
)[["elapsed"]]
m = memory_usage()
chkt = system.time(
  chk <- summarise(ans, v1=sum(v1), v2=sum(v2), v3=sum(v3))
)[["elapsed"]]
write.log(
  run=2L, task=task, data=data_name, in_rows=nrow(x), question=question,
  out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun,
  time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk
)

# Show the first and last rows of the second run's result, then drop it.
print(head(ans, n=3))
print(tail(ans, n=3))
rm(ans)

# q6 -- median and standard deviation of v3 per (id4, id5).
# The query is executed twice; each run logs elapsed time, memory_usage() and a
# checksum (make_chk of the summed result columns) through write.log().
# NOTE(review): the benchplot-dict.R syntax entry for this question passes na.rm=TRUE
# to the aggregates, while the query here does not -- confirm which is intended.
question = "median v3 sd v3 by id4 id5"

# run 1
t = system.time(
  print(dim(
    ans <- x %>%
      summarise(.by = c(id4, id5), median_v3=median(v3), sd_v3=sd(v3))
  ))
)[["elapsed"]]
m = memory_usage()
chkt = system.time(
  chk <- summarise(ans, median_v3=sum(median_v3), sd_v3=sum(sd_v3))
)[["elapsed"]]
write.log(
  run=1L, task=task, data=data_name, in_rows=nrow(x), question=question,
  out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun,
  time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk
)
rm(ans)

# run 2
t = system.time(
  print(dim(
    ans <- x %>%
      summarise(.by = c(id4, id5), median_v3=median(v3), sd_v3=sd(v3))
  ))
)[["elapsed"]]
m = memory_usage()
chkt = system.time(
  chk <- summarise(ans, median_v3=sum(median_v3), sd_v3=sum(sd_v3))
)[["elapsed"]]
write.log(
  run=2L, task=task, data=data_name, in_rows=nrow(x), question=question,
  out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun,
  time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk
)

# Show the first and last rows of the second run's result, then drop it.
print(head(ans, n=3))
print(tail(ans, n=3))
rm(ans)

# q7 -- max(v1) minus min(v2) per id3.
# The query is executed twice; each run logs elapsed time, memory_usage() and a
# checksum (make_chk of the summed result columns) through write.log().
# NOTE(review): the benchplot-dict.R syntax entry for this question passes na.rm=TRUE
# to the aggregates, while the query here does not -- confirm which is intended.
question = "max v1 - min v2 by id3"

# run 1
t = system.time(
  print(dim(
    ans <- x %>%
      summarise(.by = id3, range_v1_v2=max(v1)-min(v2))
  ))
)[["elapsed"]]
m = memory_usage()
chkt = system.time(
  chk <- summarise(ans, range_v1_v2=sum(range_v1_v2))
)[["elapsed"]]
write.log(
  run=1L, task=task, data=data_name, in_rows=nrow(x), question=question,
  out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun,
  time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk
)
rm(ans)

# run 2
t = system.time(
  print(dim(
    ans <- x %>%
      summarise(.by = id3, range_v1_v2=max(v1)-min(v2))
  ))
)[["elapsed"]]
m = memory_usage()
chkt = system.time(
  chk <- summarise(ans, range_v1_v2=sum(range_v1_v2))
)[["elapsed"]]
write.log(
  run=2L, task=task, data=data_name, in_rows=nrow(x), question=question,
  out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun,
  time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk
)

# Show the first and last rows of the second run's result, then drop it.
print(head(ans, n=3))
print(tail(ans, n=3))
rm(ans)

# q8 -- the two largest non-NA v3 values per id6: keep id6 and v3 (renamed to
# largest2_v3), drop NA values, sort descending, then keep rows whose per-id6
# row_number() is at most 2.
# The query is executed twice; each run logs elapsed time, memory_usage() and a
# checksum (make_chk of the summed result column) through write.log().
question = "largest two v3 by id6"

# run 1
t = system.time(
  print(dim(
    ans <- x %>%
      select(id6, largest2_v3=v3) %>%
      filter(!is.na(largest2_v3)) %>%
      arrange(desc(largest2_v3)) %>%
      filter(.by = id6, row_number() <= 2L)
  ))
)[["elapsed"]]
m = memory_usage()
chkt = system.time(
  chk <- summarise(ans, largest2_v3=sum(largest2_v3))
)[["elapsed"]]
write.log(
  run=1L, task=task, data=data_name, in_rows=nrow(x), question=question,
  out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun,
  time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk
)
rm(ans)

# run 2
t = system.time(
  print(dim(
    ans <- x %>%
      select(id6, largest2_v3=v3) %>%
      filter(!is.na(largest2_v3)) %>%
      arrange(desc(largest2_v3)) %>%
      filter(.by = id6, row_number() <= 2L)
  ))
)[["elapsed"]]
m = memory_usage()
chkt = system.time(
  chk <- summarise(ans, largest2_v3=sum(largest2_v3))
)[["elapsed"]]
write.log(
  run=2L, task=task, data=data_name, in_rows=nrow(x), question=question,
  out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun,
  time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk
)

# Show the first and last rows of the second run's result, then drop it.
print(head(ans, n=3))
print(tail(ans, n=3))
rm(ans)

# q9 -- squared correlation of v1 and v2 (cor with use="na.or.complete") per (id2, id4).
# The query is executed twice; each run logs elapsed time, memory_usage() and a
# checksum (make_chk of the summed result column) through write.log().
question = "regression v1 v2 by id2 id4"

# run 1
t = system.time(
  print(dim(
    ans <- x %>%
      summarise(.by = c(id2, id4), r2=cor(v1, v2, use="na.or.complete")^2)
  ))
)[["elapsed"]]
m = memory_usage()
chkt = system.time(
  chk <- summarise(ans, r2=sum(r2))
)[["elapsed"]]
write.log(
  run=1L, task=task, data=data_name, in_rows=nrow(x), question=question,
  out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun,
  time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk
)
rm(ans)

# run 2
t = system.time(
  print(dim(
    ans <- x %>%
      summarise(.by = c(id2, id4), r2=cor(v1, v2, use="na.or.complete")^2)
  ))
)[["elapsed"]]
m = memory_usage()
chkt = system.time(
  chk <- summarise(ans, r2=sum(r2))
)[["elapsed"]]
write.log(
  run=2L, task=task, data=data_name, in_rows=nrow(x), question=question,
  out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun,
  time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk
)

# Show the first and last rows of the second run's result, then drop it.
print(head(ans, n=3))
print(tail(ans, n=3))
rm(ans)

# q10 -- sum of v3 and row count per (id1, id2, id3, id4, id5, id6).
# The query is executed twice; each run logs elapsed time, memory_usage() and a
# checksum (make_chk of the summed result columns) through write.log().
# NOTE(review): the benchplot-dict.R syntax entry for this question passes na.rm=TRUE
# to sum(), while the query here does not -- confirm which is intended.
question = "sum v3 count by id1:id6"

# run 1
t = system.time(
  print(dim(
    ans <- x %>%
      summarise(.by = c(id1, id2, id3, id4, id5, id6), v3=sum(v3), count=n())
  ))
)[["elapsed"]]
m = memory_usage()
chkt = system.time(
  chk <- summarise(ans, v3=sum(v3), count=sum(count))
)[["elapsed"]]
write.log(
  run=1L, task=task, data=data_name, in_rows=nrow(x), question=question,
  out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun,
  time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk
)
rm(ans)

# run 2
t = system.time(
  print(dim(
    ans <- x %>%
      summarise(.by = c(id1, id2, id3, id4, id5, id6), v3=sum(v3), count=n())
  ))
)[["elapsed"]]
m = memory_usage()
chkt = system.time(
  chk <- summarise(ans, v3=sum(v3), count=sum(count))
)[["elapsed"]]
write.log(
  run=2L, task=task, data=data_name, in_rows=nrow(x), question=question,
  out_rows=nrow(ans), out_cols=ncol(ans), solution=solution, version=ver, git=git, fun=fun,
  time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk
)

# Show the first and last rows of the second run's result, then drop it.
print(head(ans, n=3))
print(tail(ans, n=3))
rm(ans)

# Report the wall-clock seconds elapsed since task_init was recorded.
cat(sprintf(
  "grouping finished, took %.0fs\n",
  proc.time()[["elapsed"]] - task_init
))

# When run non-interactively, exit with status 0 without saving the workspace.
if (!interactive()) {
  q("no", status=0)
}
Loading
Loading