-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Anisotropic regularization parameter #214
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,8 +11,8 @@ | |
|
||
|
||
def split_bregman_tvd( | ||
img: np.ndarray, | ||
mu: Union[float, np.ndarray] = 1.0, | ||
img: Union[np.ndarray, da.Image], | ||
mu: Union[float, np.ndarray, list] = 1.0, | ||
omega: Union[float, np.ndarray] = 1.0, | ||
ell: Optional[Union[float, np.ndarray]] = None, | ||
dim: int = 2, | ||
|
@@ -31,8 +31,10 @@ def split_bregman_tvd( | |
shrinkage step. The regularization term is weighted by the parameter ell. | ||
|
||
Args: | ||
img (array): image array | ||
mu (float or array): TV penalization parameter | ||
img (array, da.Image): image | ||
mu (float or array or list): inverse TV penalization parameter, if list | ||
it must have the lenght of the dimension and each entry corresponds | ||
to the penalization in the respective direction. | ||
omega (float or array): mass penalization parameter | ||
ell (float or array): regularization parameter | ||
dim (int): spatial dimension of the image | ||
|
@@ -48,16 +50,26 @@ def split_bregman_tvd( | |
array: denoised image | ||
|
||
""" | ||
|
||
# Keep track of input type and convert input image to float for further calculations | ||
img_dtype = img.dtype | ||
|
||
# Store input image and its norm for convergence check | ||
img_nrm = np.linalg.norm(img) | ||
|
||
# Controll that mu has correct length if its a list | ||
if isinstance(mu, list): | ||
assert len(mu) == dim, "mu must be a list of length dim" | ||
|
||
# Feed the solver with parameters, follow the suggestion to use double the weight | ||
# for the diffusion coefficient if no value is provided. | ||
# for the mass coefficient if no value is provided. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would suggest to enhance the comment to explain why additionally ell is set to 1 if it is zero. Furthermore, I would use an inexact check whether ell is (almost) zero. |
||
if ell is None: | ||
ell = 2 * mu | ||
if isinstance(omega, float): | ||
ell = 2 * omega | ||
elif isinstance(omega, np.ndarray): | ||
ell = 2 * omega.copy() | ||
ell[ell == 0] = 1 | ||
|
||
solver.update_params( | ||
mass_coeff=omega, | ||
diffusion_coeff=ell, | ||
|
@@ -66,12 +78,20 @@ def split_bregman_tvd( | |
|
||
# Define energy functional for verbose | ||
def _functional(x: np.ndarray) -> float: | ||
return 0.5 * np.linalg.norm(omega * (x - img)) ** 2 + sum( | ||
[ | ||
np.sum(np.abs(mu * da.backward_diff(img=x, axis=j, dim=dim))) | ||
for j in range(dim) | ||
] | ||
) | ||
if isinstance(mu, list): | ||
return 0.5 * np.linalg.norm(omega * (x - img)) ** 2 + sum( | ||
[ | ||
np.sum(np.abs(mu[j] * da.backward_diff(img=x, axis=j, dim=dim))) | ||
for j in range(dim) | ||
] | ||
) | ||
else: | ||
return 0.5 * np.linalg.norm(omega * (x - img)) ** 2 + sum( | ||
[ | ||
np.sum(np.abs(mu * da.backward_diff(img=x, axis=j, dim=dim))) | ||
for j in range(dim) | ||
] | ||
) | ||
|
||
# Define shrinkage operator (shrinks element-wise by k) | ||
def _shrink(x: np.ndarray, k: Union[float, np.ndarray]) -> np.ndarray: | ||
|
@@ -86,7 +106,7 @@ def _rhs_function(dt: np.ndarray, bt: np.ndarray) -> np.ndarray: | |
] | ||
) | ||
|
||
# Define initial guess if provided, otherwise start with input image and allovate | ||
# Define initial guess if provided, otherwise start with input image and allocate | ||
# zero arrays for the split Bregman variables. | ||
if x0 is not None: | ||
img0, d0, b0 = x0 | ||
|
@@ -115,6 +135,11 @@ def _rhs_function(dt: np.ndarray, bt: np.ndarray) -> np.ndarray: | |
shrinkage_factor = np.maximum(s - mu / ell, 0) / (s + 1e-18) | ||
d = dub * shrinkage_factor[..., None] | ||
b = dub - d | ||
elif isinstance(mu, list): | ||
for j in range(dim): | ||
dub = da.backward_diff(img=img_new, axis=j, dim=dim) + b[..., j] | ||
d[..., j] = _shrink(dub, mu[j] / ell) | ||
b[..., j] = dub - d[..., j] | ||
else: | ||
for j in range(dim): | ||
dub = da.backward_diff(img=img_new, axis=j, dim=dim) + b[..., j] | ||
|
@@ -123,16 +148,17 @@ def _rhs_function(dt: np.ndarray, bt: np.ndarray) -> np.ndarray: | |
|
||
# Monitor performance | ||
relative_increment = np.linalg.norm(img_new - img_iter) / img_nrm | ||
|
||
# Update of result | ||
img_iter = img_new.copy() | ||
|
||
if verbose if isinstance(verbose, bool) else iter % verbose == 0: | ||
print( | ||
f"""Split Bregman iteration {iter} - """ | ||
f"""relative increment: {relative_increment}, """ | ||
f"""relative increment: {round(relative_increment,5)}, """ | ||
f"""energy functional: {_functional(img_iter)}""" | ||
) | ||
|
||
# Update of result | ||
img_iter = img_new.copy() | ||
|
||
# Convergence check | ||
if eps is not None: | ||
if relative_increment < eps: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,7 +3,7 @@ | |
""" | ||
from __future__ import annotations | ||
|
||
from typing import Union | ||
from typing import Optional, Union | ||
|
||
import numpy as np | ||
|
||
|
@@ -21,7 +21,9 @@ class Jacobi(da.Solver): | |
|
||
""" | ||
|
||
def _neighbor_accumulation(self, im: np.ndarray) -> np.ndarray: | ||
def _neighbor_accumulation( | ||
self, im: np.ndarray, h: Optional[Union[float, list]] = None | ||
) -> np.ndarray: | ||
"""Accumulation of neighbor pixels. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Better documentation would be good here. What is "accumulation" and how does it depend on the grid size provided? |
||
|
||
Accumulates for each entry, the entries of neighbors, regardless of dimension. | ||
|
@@ -37,44 +39,72 @@ def _neighbor_accumulation(self, im: np.ndarray) -> np.ndarray: | |
|
||
""" | ||
im_av: np.ndarray = np.zeros_like(im) | ||
for ax in range(im.ndim): | ||
im_av += np.concatenate( | ||
(da.array_slice(im, ax, 0, 1), da.array_slice(im, ax, 0, -1)), axis=ax | ||
) + np.concatenate( | ||
(da.array_slice(im, ax, 1, None), da.array_slice(im, ax, -1, None)), | ||
axis=ax, | ||
) | ||
if h is None: | ||
for ax in range(self.dim): | ||
im_av += np.concatenate( | ||
(da.array_slice(im, ax, 0, 1), da.array_slice(im, ax, 0, -1)), | ||
axis=ax, | ||
) + np.concatenate( | ||
(da.array_slice(im, ax, 1, None), da.array_slice(im, ax, -1, None)), | ||
axis=ax, | ||
) | ||
else: | ||
if isinstance(h, float): | ||
h = [h] * self.dim | ||
for ax in range(self.dim): | ||
im_av += ( | ||
np.concatenate( | ||
(da.array_slice(im, ax, 0, 1), da.array_slice(im, ax, 0, -1)), | ||
axis=ax, | ||
) | ||
+ np.concatenate( | ||
( | ||
da.array_slice(im, ax, 1, None), | ||
da.array_slice(im, ax, -1, None), | ||
), | ||
axis=ax, | ||
) | ||
) / h[ax] ** 2 | ||
return im_av | ||
|
||
def _diag(self, h: float = 1) -> Union[float, np.ndarray]: | ||
def _diag(self, h: Optional[Union[float, list]] = None) -> Union[float, np.ndarray]: | ||
""" | ||
Compute diagonal of the stiffness matrix. | ||
|
||
The stiffness matrix is understood in finite difference sense, with the | ||
possibility to identify pixel sizes with phyiscal mesh sizes. This is a helper | ||
possibility to identify pixel sizes with phyiscal voxel sizes. This is a helper | ||
function for the main Jacobi solver. | ||
|
||
Args: | ||
h (float): mesh diameter | ||
h (float): voxel size | ||
|
||
Returns: | ||
Union[float, np.ndarray]: diagonal of the stiffness matrix | ||
|
||
""" | ||
return self.mass_coeff + self.diffusion_coeff * 2 * self.dim / h**2 | ||
if h is None: | ||
voxel_weights = self.dim | ||
else: | ||
if isinstance(h, float): | ||
voxel_weights = self.dim / (h**2) | ||
else: | ||
assert len(h) == self.dim, "h must be a float or a list of length dim" | ||
voxel_weights = sum([1 / h[i] ** 2 for i in range(self.dim)]) | ||
|
||
return self.mass_coeff + 2 * self.diffusion_coeff * voxel_weights | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should the 2 be moved into the definition of voxel_weights in line 89? |
||
|
||
def __call__( | ||
self, | ||
x0: np.ndarray, | ||
rhs: np.ndarray, | ||
h: float = 1.0, | ||
h: Optional[Union[float, list]] = None, | ||
) -> np.ndarray: | ||
"""One iteration of a Jacobi solver for linear systems. | ||
|
||
Args: | ||
x0 (np.ndarray): initial guess | ||
rhs (np.ndarray): right hand side of the linear system | ||
h (float): mesh diameter | ||
h (optional[float, list]): voxel size | ||
|
||
Returns: | ||
np.ndarray: solution to the linear system | ||
|
@@ -88,14 +118,14 @@ def __call__( | |
if self.tol is None: | ||
for _ in range(self.maxiter): | ||
x = ( | ||
rhs + self.diffusion_coeff * self._neighbor_accumulation(x) / h**2 | ||
rhs + self.diffusion_coeff * self._neighbor_accumulation(x, h) | ||
) / diag | ||
if self.verbose: | ||
print(f"Jacobi iteration {_} of {self.maxiter} completed.") | ||
else: | ||
for _ in range(self.maxiter): | ||
x_new = ( | ||
rhs + self.diffusion_coeff * self._neighbor_accumulation(x) / h**2 | ||
rhs + self.diffusion_coeff * self._neighbor_accumulation(x, h) | ||
) / diag | ||
err = np.linalg.norm(x_new - x) / np.linalg.norm(x0) | ||
if err < self.tol: | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add comment like # Regularize image using isotropic tvd