Use pyarrow dtype_backend #1781

Merged · 22 commits · Sep 26, 2024
12 changes: 6 additions & 6 deletions core/src/schema.jl
@@ -286,7 +286,7 @@ end
demand::Union{Missing, Float64}
return_factor::Float64
min_level::Float64
-priority::Int32
+priority::Union{Missing, Int32}
end

@version UserDemandTimeV1 begin
@@ -295,33 +295,33 @@ end
demand::Float64
return_factor::Float64
min_level::Float64
-priority::Int32
+priority::Union{Missing, Int32}
end

@version LevelDemandStaticV1 begin
node_id::Int32
min_level::Union{Missing, Float64}
max_level::Union{Missing, Float64}
-priority::Int32
+priority::Union{Missing, Int32}
end

@version LevelDemandTimeV1 begin
node_id::Int32
time::DateTime
min_level::Union{Missing, Float64}
max_level::Union{Missing, Float64}
-priority::Int32
+priority::Union{Missing, Int32}
end

@version FlowDemandStaticV1 begin
node_id::Int
demand::Float64
-priority::Int32
+priority::Union{Missing, Int32}
end

@version FlowDemandTimeV1 begin
node_id::Int
time::DateTime
demand::Float64
-priority::Int32
+priority::Union{Missing, Int32}
end
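Background on the schema change above: with pandas' pyarrow dtype backend, an integer column containing nulls stays a nullable Arrow integer instead of being upcast to float64 with NaN, so a priority that was left unset reaches the Julia side as a genuine missing value; hence `priority::Union{Missing, Int32}`. A minimal sketch of how such a column arises (table contents hypothetical; assumes pandas >= 2.0 with pyarrow installed):

```python
import pandas as pd

# Hypothetical static UserDemand table with one priority left unset.
df = pd.DataFrame({"node_id": [1, 2], "priority": [1, None]})
df = df.convert_dtypes(dtype_backend="pyarrow")

print(df["priority"].dtype)  # int64[pyarrow]: the gap stays <NA>, no float upcast
df.to_feather("user_demand.arrow")  # the null survives the Arrow round trip
```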
9 changes: 4 additions & 5 deletions core/src/util.jl
@@ -446,11 +446,10 @@ function get_all_priorities(db::DB, config::Config)::Vector{Int32}
(FlowDemandStaticV1, "FlowDemand / static"),
(FlowDemandTimeV1, "FlowDemand / time"),
]
-if valid_priorities(
-    load_structvector(db, config, type).priority,
-    config.allocation.use_allocation,
-)
-    union!(priorities, load_structvector(db, config, type).priority)
+priority_col = load_structvector(db, config, type).priority
+priority_col = Int32.(coalesce.(priority_col, Int32(0)))
+if valid_priorities(priority_col, config.allocation.use_allocation)
+    union!(priorities, priority_col)
else
is_valid = false
@error "Missing priority parameter(s) for a $name node in the allocation problem."
10 changes: 5 additions & 5 deletions core/src/validation.jl
@@ -673,10 +673,10 @@ function valid_sources(
return !errors
end

-function valid_priorities(priorities::Vector, use_allocation::Bool)::Bool
-    errors = false
-    if 0 in priorities && use_allocation
-        errors = true
+function valid_priorities(priorities::Vector{Int32}, use_allocation::Bool)::Bool
+    if use_allocation && any(iszero, priorities)
+        return false
+    else
+        return true
    end
-    return !errors
end
4 changes: 1 addition & 3 deletions core/test/validation_test.jl
@@ -460,11 +460,9 @@ end
normpath(@__DIR__, "../../generated_testmodels/invalid_priorities/ribasim.toml")
@test ispath(toml_path)

-config = Ribasim.Config(toml_path; allocation_use_allocation = true)
-
logger = TestLogger()
with_logger(logger) do
-@test_throws "Priority parameter is missing" Ribasim.run(config)
+@test_throws "Priority parameter is missing" Ribasim.run(toml_path)
end
@test length(logger.logs) == 3
@test logger.logs[1].level == Error
38 changes: 27 additions & 11 deletions docs/guide/examples.ipynb
@@ -414,7 +414,9 @@
"metadata": {},
"outputs": [],
"source": [
"df_basin = pd.read_feather(datadir / \"basic/results/basin.arrow\")\n",
"df_basin = pd.read_feather(\n",
" datadir / \"basic/results/basin.arrow\", dtype_backend=\"pyarrow\"\n",
")\n",
"df_basin_wide = df_basin.pivot_table(\n",
" index=\"time\", columns=\"node_id\", values=[\"storage\", \"level\"]\n",
")\n",
@@ -428,7 +430,7 @@
"metadata": {},
"outputs": [],
"source": [
"df_flow = pd.read_feather(datadir / \"basic/results/flow.arrow\")\n",
"df_flow = pd.read_feather(datadir / \"basic/results/flow.arrow\", dtype_backend=\"pyarrow\")\n",
"df_flow[\"edge\"] = list(zip(df_flow.from_node_id, df_flow.to_node_id))\n",
"df_flow[\"flow_m3d\"] = df_flow.flow_rate * 86400\n",
"ax = df_flow.pivot_table(index=\"time\", columns=\"edge\", values=\"flow_m3d\").plot()\n",
@@ -709,7 +711,9 @@
"metadata": {},
"outputs": [],
"source": [
"df_basin = pd.read_feather(datadir / \"level_range/results/basin.arrow\")\n",
"df_basin = pd.read_feather(\n",
" datadir / \"level_range/results/basin.arrow\", dtype_backend=\"pyarrow\"\n",
")\n",
"df_basin_wide = df_basin.pivot_table(\n",
" index=\"time\", columns=\"node_id\", values=[\"storage\", \"level\"]\n",
")\n",
@@ -991,7 +995,9 @@
"source": [
"from matplotlib.dates import date2num\n",
"\n",
"df_basin = pd.read_feather(datadir / \"pid_control/results/basin.arrow\")\n",
"df_basin = pd.read_feather(\n",
" datadir / \"pid_control/results/basin.arrow\", dtype_backend=\"pyarrow\"\n",
")\n",
"df_basin_wide = df_basin.pivot_table(\n",
" index=\"time\", columns=\"node_id\", values=[\"storage\", \"level\"]\n",
")\n",
@@ -1272,7 +1278,9 @@
"source": [
"import matplotlib.ticker as plticker\n",
"\n",
"df_allocation = pd.read_feather(datadir / \"allocation_example/results/allocation.arrow\")\n",
"df_allocation = pd.read_feather(\n",
" datadir / \"allocation_example/results/allocation.arrow\", dtype_backend=\"pyarrow\"\n",
")\n",
"df_allocation_wide = df_allocation.pivot_table(\n",
" index=\"time\",\n",
" columns=[\"node_type\", \"node_id\", \"priority\"],\n",
@@ -1318,7 +1326,9 @@
"metadata": {},
"outputs": [],
"source": [
"df_basin = pd.read_feather(datadir / \"allocation_example/results/basin.arrow\")\n",
"df_basin = pd.read_feather(\n",
" datadir / \"allocation_example/results/basin.arrow\", dtype_backend=\"pyarrow\"\n",
")\n",
"df_basin_wide = df_basin.pivot_table(\n",
" index=\"time\", columns=\"node_id\", values=[\"storage\", \"level\"]\n",
")\n",
@@ -1557,7 +1567,9 @@
"metadata": {},
"outputs": [],
"source": [
"df_basin = pd.read_feather(datadir / \"level_demand/results/basin.arrow\")\n",
"df_basin = pd.read_feather(\n",
" datadir / \"level_demand/results/basin.arrow\", dtype_backend=\"pyarrow\"\n",
")\n",
"df_basin = df_basin[df_basin.node_id == 2]\n",
"df_basin_wide = df_basin.pivot_table(\n",
" index=\"time\", columns=\"node_id\", values=[\"storage\", \"level\"]\n",
@@ -1953,7 +1965,7 @@
"outputs": [],
"source": [
"datadir_flow = datadir / \"local_pidcontrolled_cascade/results/flow.arrow\"\n",
"df_flow = pd.read_feather(datadir_flow)\n",
"df_flow = pd.read_feather(datadir_flow, dtype_backend=\"pyarrow\")\n",
"df_flow[\"edge\"] = list(zip(df_flow.from_node_id, df_flow.to_node_id))\n",
"df_flow[\"flow_m3d\"] = df_flow.flow_rate * 86400\n",
"\n",
@@ -1995,7 +2007,7 @@
"outputs": [],
"source": [
"datadir_basin = datadir / \"local_pidcontrolled_cascade/results/basin.arrow\"\n",
"df_basin = pd.read_feather(datadir_basin)\n",
"df_basin = pd.read_feather(datadir_basin, dtype_backend=\"pyarrow\")\n",
"df_basin[\"vertical_flux\"] = (\n",
" df_basin[\"precipitation\"]\n",
" - df_basin[\"evaporation\"]\n",
@@ -2060,7 +2072,9 @@
" Node(1, Point(0, 0)),\n",
" [\n",
" level_boundary.Time(\n",
" time=pd.date_range(start=\"2020-01-01\", end=\"2021-01-01\", periods=100),\n",
" time=pd.date_range(\n",
" start=\"2020-01-01\", end=\"2021-01-01\", periods=100, unit=\"ms\"\n",
" ),\n",
" level=6.0 + np.sin(np.linspace(0, 6 * np.pi, 100)),\n",
" )\n",
" ],\n",
@@ -2286,7 +2300,9 @@
"metadata": {},
"outputs": [],
"source": [
"df_flow = pd.read_feather(datadir / \"outlet_continuous_control/results/flow.arrow\")\n",
"df_flow = pd.read_feather(\n",
" datadir / \"outlet_continuous_control/results/flow.arrow\", dtype_backend=\"pyarrow\"\n",
")\n",
"fig, ax = plt.subplots()\n",
"\n",
"\n",
8 changes: 6 additions & 2 deletions docs/tutorial/irrigation-demand.ipynb
@@ -309,7 +309,9 @@
"metadata": {},
"outputs": [],
"source": [
"df_basin = pd.read_feather(base_dir / \"Crystal-2/results/basin.arrow\")\n",
"df_basin = pd.read_feather(\n",
" base_dir / \"Crystal-2/results/basin.arrow\", dtype_backend=\"pyarrow\"\n",
")\n",
"\n",
"# Create pivot tables and plot for basin data\n",
"df_basin_wide = df_basin.pivot_table(\n",
@@ -393,7 +395,9 @@
"metadata": {},
"outputs": [],
"source": [
"df_flow = pd.read_feather(base_dir / \"Crystal-2/results/flow.arrow\")\n",
"df_flow = pd.read_feather(\n",
" base_dir / \"Crystal-2/results/flow.arrow\", dtype_backend=\"pyarrow\"\n",
")\n",
"# Add the edge names and then remove unnamed edges\n",
"df_flow[\"name\"] = model.edge.df[\"name\"].loc[df_flow[\"edge_id\"]].to_numpy()\n",
"df_flow = df_flow[df_flow[\"name\"].astype(bool)]\n",
8 changes: 6 additions & 2 deletions docs/tutorial/natural-flow.ipynb
@@ -404,7 +404,9 @@
"metadata": {},
"outputs": [],
"source": [
"df_basin = pd.read_feather(base_dir / \"Crystal-1/results/basin.arrow\")\n",
"df_basin = pd.read_feather(\n",
" base_dir / \"Crystal-1/results/basin.arrow\", dtype_backend=\"pyarrow\"\n",
")\n",
"\n",
"# Create pivot tables and plot for Basin data\n",
"df_basin_wide = df_basin.pivot_table(\n",
@@ -453,7 +455,9 @@
"source": [
"# Plot flow data\n",
"# Read the flow results\n",
"df_flow = pd.read_feather(base_dir / \"Crystal-1/results/flow.arrow\")\n",
"df_flow = pd.read_feather(\n",
" base_dir / \"Crystal-1/results/flow.arrow\", dtype_backend=\"pyarrow\"\n",
")\n",
"# Add the edge names and then remove unnamed edges\n",
"df_flow[\"name\"] = model.edge.df[\"name\"].loc[df_flow[\"edge_id\"]].to_numpy()\n",
"df_flow = df_flow[df_flow[\"name\"].astype(bool)]\n",
8 changes: 6 additions & 2 deletions docs/tutorial/reservoir.ipynb
@@ -283,7 +283,9 @@
"metadata": {},
"outputs": [],
"source": [
"df_basin = pd.read_feather(base_dir / \"Crystal-3/results/basin.arrow\")\n",
"df_basin = pd.read_feather(\n",
" base_dir / \"Crystal-3/results/basin.arrow\", dtype_backend=\"pyarrow\"\n",
")\n",
"\n",
"# Create pivot tables and plot for Basin data\n",
"df_basin_wide = df_basin.pivot_table(\n",
@@ -329,7 +331,9 @@
"metadata": {},
"outputs": [],
"source": [
"df_flow = pd.read_feather(base_dir / \"Crystal-3/results/flow.arrow\")\n",
"df_flow = pd.read_feather(\n",
" base_dir / \"Crystal-3/results/flow.arrow\", dtype_backend=\"pyarrow\"\n",
")\n",
"# Add the edge names and then remove unnamed edges\n",
"df_flow[\"name\"] = model.edge.df[\"name\"].loc[df_flow[\"edge_id\"]].to_numpy()\n",
"df_flow = df_flow[df_flow[\"name\"].astype(bool)]\n",
2 changes: 2 additions & 0 deletions pixi.toml
@@ -88,6 +88,7 @@ test-ribasim-regression = { cmd = "julia --project=core --eval 'using Pkg; Pkg.t
generate-testmodels = { cmd = "python utils/generate-testmodels.py", inputs = [
"python/ribasim",
"python/ribasim_testmodels",
"utils/generate-testmodels.py",
], outputs = [
"generated_testmodels",
] }
@@ -99,6 +100,7 @@ codegen = { cmd = "julia --project utils/gen_python.jl && ruff format python/rib
"initialize-julia",
], inputs = [
"core",
"utils",
], outputs = [
"python/ribasim/ribasim/schemas.py",
"python/ribasim/ribasim/validation.py",
13 changes: 6 additions & 7 deletions python/ribasim/ribasim/config.py
@@ -51,7 +51,7 @@
UserDemandStaticSchema,
UserDemandTimeSchema,
)
-from ribasim.utils import _pascal_to_snake
+from ribasim.utils import _concat, _pascal_to_snake


class Allocation(ChildModel):
@@ -242,11 +242,10 @@ def add(
)
assert table.df is not None
table_to_append = table.df.assign(node_id=node_id)
-setattr(
-    self,
-    member_name,
-    pd.concat([existing_table, table_to_append], ignore_index=True),
-)
+if isinstance(table_to_append, GeoDataFrame):
+    table_to_append.set_crs(self._parent.crs, inplace=True)
+new_table = _concat([existing_table, table_to_append], ignore_index=True)
+setattr(self, member_name, new_table)

node_table = node.into_geodataframe(
node_type=self.__class__.__name__, node_id=node_id
@@ -255,7 +254,7 @@
if self.node.df is None:
self.node.df = node_table
else:
-df = pd.concat([self.node.df, node_table])
+df = _concat([self.node.df, node_table])
self.node.df = df

self._parent._used_node_ids.add(node_id)
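The `pd.concat` to `_concat` swap here (and in the delwaq changes below) routes concatenation through a helper in `ribasim.utils` that this diff only imports. A hypothetical sketch of what such a wrapper could look like (the real implementation is not shown in this diff):

```python
import geopandas as gpd
import pandas as pd

def _concat(dfs, **kwargs):
    # Hypothetical: plain pd.concat, re-wrapped as a GeoDataFrame whenever any
    # input is one, so geometry (and the CRS set above) is not silently dropped.
    result = pd.concat(dfs, **kwargs)
    if any(isinstance(df, gpd.GeoDataFrame) for df in dfs):
        result = gpd.GeoDataFrame(result)
    return result
```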
25 changes: 15 additions & 10 deletions python/ribasim/ribasim/delwaq/generate.py
@@ -5,7 +5,7 @@
from datetime import timedelta
from pathlib import Path

-from ribasim.utils import MissingOptionalModule
+from ribasim.utils import MissingOptionalModule, _concat

try:
import networkx as nx
@@ -70,17 +70,18 @@
"""
bid = _boundary_name(data.node_id.iloc[0], boundary_type)
piv = (
-data.pivot_table(index="time", columns="substance", values="concentration")
+data.pivot_table(
+    index="time", columns="substance", values="concentration", fill_value=-999
+)
.reset_index()
.reset_index(drop=True)
)
-piv.time = piv.time.dt.strftime("%Y/%m/%d-%H:%M:%S")
+# Convert Arrow time to Numpy to avoid needing tzdata somehow
+piv.time = piv.time.astype("datetime64[ns]").dt.strftime("%Y/%m/%d-%H:%M:%S")

boundary = {
"name": bid,
"substances": list(map(_quote, piv.columns[1:])),
"df": piv.to_string(
formatters={"time": _quote}, header=False, index=False, na_rep=-999
),
"df": piv.to_string(formatters={"time": _quote}, header=False, index=False),
}
substances = data.substance.unique()
return boundary, substances
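Two details in the hunk above: `fill_value=-999` now fills the gaps during the pivot itself, so `to_string` no longer needs `na_rep`; and the `astype("datetime64[ns]")` hop converts the pyarrow timestamp to NumPy before formatting, sidestepping the tzdata requirement mentioned in the comment. A small sketch of the time conversion (sample value hypothetical):

```python
import pandas as pd

# timestamp[ms][pyarrow] -> datetime64[ns] -> formatted string; the NumPy hop
# means strftime needs no timezone database.
t = pd.Series(pd.to_datetime(["2020-01-01 12:00:00"]))
t = t.astype("timestamp[ms][pyarrow]")
print(t.astype("datetime64[ns]").dt.strftime("%Y/%m/%d-%H:%M:%S")[0])
# -> 2020/01/01-12:00:00
```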
@@ -181,7 +182,7 @@
boundary_id -= 1
node_mapping[node_id] = boundary_id
else:
raise Exception("Found unexpected node $node_id in delwaq graph.")
raise Exception(f"Found unexpected node {node_id} in delwaq graph.")


nx.relabel_nodes(G, node_mapping, copy=False)

@@ -281,8 +282,12 @@

# Read in model and results
model = ribasim.Model.read(toml_path)
-basins = pd.read_feather(toml_path.parent / "results" / "basin.arrow")
-flows = pd.read_feather(toml_path.parent / "results" / "flow.arrow")
+basins = pd.read_feather(
+    toml_path.parent / "results" / "basin.arrow", dtype_backend="pyarrow"
+)
+flows = pd.read_feather(
+    toml_path.parent / "results" / "flow.arrow", dtype_backend="pyarrow"
+)

output_folder.mkdir(exist_ok=True)

@@ -359,7 +364,7 @@
columns={boundary_type: "flow_rate"}
)
df["edge_id"] = edge_id
-nflows = pd.concat([nflows, df], ignore_index=True)
+nflows = _concat([nflows, df], ignore_index=True)

Check warning on line 367 in python/ribasim/ribasim/delwaq/generate.py

View check run for this annotation

Codecov / codecov/patch

python/ribasim/ribasim/delwaq/generate.py#L367

Added line #L367 was not covered by tests

# Save flows to Delwaq format
nflows.sort_values(by=["time", "edge_id"], inplace=True)