Merge pull request #6 from schroeme/Mansour/alignment_eval
 📐 Introducing Alignment Evaluation Support
mansouralawi authored Jul 28, 2023
2 parents 7540e3c + 0837191 commit d3c9fc5
Showing 3 changed files with 259 additions and 1 deletion.
3 changes: 2 additions & 1 deletion exr/align/__init__.py
@@ -1 +1,2 @@
from exr.align.align import *
from exr.align.align import *
from exr.align.align_eval import *
63 changes: 63 additions & 0 deletions exr/align/align_eval.py
@@ -0,0 +1,63 @@
import h5py
import numpy as np
from exr.align.align_utils import alignment_NCC
from exr.utils import configure_logger
from exr.config import Config
from typing import List

logger = configure_logger('ExR-Tools')


def measure_round_alignment_NCC(config: Config, round: int, roi: int) -> List[float]:
r"""
    Measures the alignment of a specific round and ROI (Region Of Interest) against a reference round using Normalized Cross-Correlation (NCC).

    :param config: Configuration options.
:type config: Config
:param round: The round to measure alignment for.
:type round: int
:param roi: The ROI to measure alignment for.
:type roi: int
:return: List of distance errors after alignment.
:rtype: List[float]
"""
distance_errors = []
logger.info(
f"Alignment Evaluation: Analyzing alignment between ref round:{config.ref_round} and round:{round} - ROI:{roi}")

try:
with h5py.File(config.h5_path.format(config.ref_round, roi), "r") as f:
ref_vol = f[config.ref_channel][()]

with h5py.File(config.h5_path.format(round, roi), "r") as f:
aligned_vol = f[config.ref_channel][()]

if np.count_nonzero(aligned_vol) > config.nonzero_thresh:
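            # Min-max normalize both volumes to [0, 1] so intensities are comparable between rounds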
ref_vol = (ref_vol - np.min(ref_vol)) / \
(np.max(ref_vol) - np.min(ref_vol))
aligned_vol = (aligned_vol - np.min(aligned_vol)) / \
(np.max(aligned_vol) - np.min(aligned_vol))
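            # Collect the indices of z-slices that contain signal in the aligned volume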
keepers = []

for zz in range(aligned_vol.shape[0]):
if np.count_nonzero(aligned_vol[zz, :, :]) > 0:
keepers.append(zz)

logger.info(
f"Alignment Evaluation: Round:{round} - ROI:{roi}, {len(keepers)} slices of {aligned_vol.shape[0]} kept.")

if len(keepers) < 10:
logger.info(
f"Alignment Evaluation: Round:{round} - ROI:{roi}, fewer than 10 slices. Skipping evaluation...")
else:
ref_vol = ref_vol[keepers, :, :]
aligned_vol = aligned_vol[keepers, :, :]

distance_errors = alignment_NCC(config, ref_vol, aligned_vol)

return distance_errors

except Exception as e:
logger.error(
f"Error during NCC alignment measurement for Round: {round}, ROI: {roi}, Error: {e}")
raise
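
For context, here is a minimal usage sketch of the new entry point. It is illustrative only: it assumes a `Config` object has already been populated (via the project's usual configuration step) with the attributes the function reads (`h5_path`, `ref_round`, `ref_channel`, `nonzero_thresh`, plus the NCC parameters `subvol_dim`, `xystep`, `zstep`, `pct_thresh`, `N`); the round and ROI numbers below are placeholders.

import numpy as np
from exr.config import Config
from exr.align import measure_round_alignment_NCC

# Assumption: Config supports no-argument construction and has been filled in elsewhere;
# in practice, use the project's normal configuration workflow.
config = Config()

errors = measure_round_alignment_NCC(config, round=2, roi=1)  # round 2, ROI 1 are placeholders
if len(errors) > 0:
    print(f"Median residual alignment error: {np.median(errors):.3f}")
else:
    print("Evaluation skipped: volume below nonzero_thresh or fewer than 10 usable slices")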
194 changes: 194 additions & 0 deletions exr/align/align_utils.py
@@ -0,0 +1,194 @@
import numpy as np
from skimage.filters import threshold_otsu
from scipy.fftpack import fftn, ifftn
from exr.config import Config
from typing import List, Dict

from exr.utils import configure_logger

logger = configure_logger('ExR-Tools')

def template_matching(T: np.ndarray, I: np.ndarray, IdataIn: Dict = None) -> np.ndarray:
"""
    Implements template matching between two images using the Fourier transform.

    :param T: The template image.
:type T: np.ndarray
:param I: The image to be matched.
:type I: np.ndarray
:param IdataIn: A dictionary containing additional image data (optional).
:type IdataIn: Dict, optional
:return: The Normalized Cross-Correlation (NCC) between the template and the image.
:rtype: np.ndarray
"""
def unpadarray(A: np.ndarray, Bsize: np.ndarray) -> np.ndarray:
Bsize = np.array(Bsize)
Bstart = np.ceil((np.array(A.shape) - Bsize) / 2).astype(int)
Bend = Bstart + Bsize
if len(A.shape) == 2:
B = A[Bstart[0]:Bend[0], Bstart[1]:Bend[1]]
elif len(A.shape) == 3:
B = A[Bstart[0]:Bend[0], Bstart[1]:Bend[1], Bstart[2]:Bend[2]]
return B

def local_sum(I: np.ndarray, T_size: np.ndarray) -> np.ndarray:
T_size = np.array(T_size)
# Add padding to the image
B = np.pad(I, ((T_size[0], T_size[0]), (T_size[1], T_size[1]), (T_size[2], T_size[2])), mode='constant')

s = np.cumsum(B, axis=0)
c = s[T_size[0]:-1, :, :] - s[:-T_size[0]-1, :, :]
s = np.cumsum(c, axis=1)
c = s[:, T_size[1]:-1, :] - s[:, :-T_size[1]-1, :]
s = np.cumsum(c, axis=2)
local_sum_I = s[:, :, T_size[2]:-1] - s[:, :, :-T_size[2]-1]

return local_sum_I

try:
# Convert images to double
T = T.astype(float)
I = I.astype(float)

T_size = np.array(T.shape)
I_size = np.array(I.shape)
outsize = I_size + T_size - 1
Idata = {}

# calculate correlation in frequency domain
FT = fftn(np.flip(T, axis=(0, 1, 2)), outsize)
FI = fftn(I, outsize)
Icorr = np.real(ifftn(FI * FT))

# Calculate Local Quadratic sum of Image and Template
if IdataIn is None or 'LocalQSumI' not in IdataIn:
Idata['LocalQSumI'] = local_sum(I * I, T_size)
else:
Idata['LocalQSumI'] = IdataIn['LocalQSumI']

QSumT = np.sum(T**2)

# SSD between template and image
I_SSD = Idata['LocalQSumI'] + QSumT - 2 * Icorr

# Normalize to range 0..1
I_SSD = I_SSD - np.min(I_SSD)
I_SSD = 1 - I_SSD / np.max(I_SSD)

# Remove padding
I_SSD = unpadarray(I_SSD, I_size)

if len(Idata) > 0:
# Normalized cross correlation STD
            if IdataIn is None or 'LocalSumI' not in IdataIn:
Idata['LocalSumI'] = local_sum(I, T_size)
else:
Idata['LocalSumI'] = IdataIn['LocalSumI']

# Standard deviation
            if IdataIn is None or 'stdI' not in IdataIn:
Idata['stdI'] = np.sqrt(np.maximum(Idata['LocalQSumI'] - (Idata['LocalSumI']**2) / np.prod(T_size), 0))
else:
                Idata['stdI'] = IdataIn['stdI']
stdT = np.sqrt(T.size - 1) * np.std(T, ddof=1)

# Mean compensation
meanIT = Idata['LocalSumI'] * np.sum(T) / np.prod(T_size)
I_NCC = 0.5 + (Icorr - meanIT) / (2 * stdT * np.maximum(Idata['stdI'], stdT / 100000))

# Remove padding
I_NCC = unpadarray(I_NCC, I_size)

return I_NCC

except Exception as e:
logger.error(f"Error during template matching, Error: {e}")
raise


def alignment_NCC(config: Config, vol1: np.ndarray, vol2: np.ndarray) -> List[float]:
r"""
    Measures the alignment of two volumes using Normalized Cross-Correlation (NCC). Both volumes are expected to have shape `[Z, Y, X]`.

    :param config: Configuration options.
:type config: Config
:param vol1: The first volume (reference volume) for alignment comparison.
:type vol1: np.ndarray
:param vol2: The second volume (aligned volume) for alignment comparison.
:type vol2: np.ndarray
:return: List of distance errors after alignment.
:rtype: List[float]
"""
xy_vol_half = int(config.subvol_dim/2)
z_vol_half = int(min(np.floor(vol1.shape[0]/2)-1,np.floor(xy_vol_half*(config.xystep/config.zstep))))

offsets_total = np.full((config.N, 3), -30)

# Calculate the percentile of the values in vol1 and vol2 specified by config.pct_thresh
thresh_vol1 = np.percentile(vol1, config.pct_thresh)
thresh_vol2 = np.percentile(vol2, config.pct_thresh)

xpos = np.random.randint(0, vol1.shape[2], config.N)
ypos = np.random.randint(0, vol1.shape[1], config.N)
zpos = np.random.randint(0, vol1.shape[0], config.N)

i = 0
while i < config.N:
try:
# Generate new random positions for xpos and ypos
xpos[i] = np.random.randint(0, vol1.shape[2], 1)[0]
ypos[i] = np.random.randint(0, vol1.shape[1], 1)[0]

# If the first dimension of vol1 is less than 50, set zpos[i] to the middle
# Otherwise, generate a new random position for zpos
if vol1.shape[0] < 50:
zpos[i] = vol1.shape[0] // 2
else:
zpos[i] = np.random.randint(0, vol1.shape[0],1)[0]

# Check that the random position is within bounds
if not (xpos[i] > xy_vol_half and xpos[i] < vol1.shape[2] - xy_vol_half):
continue
elif not (ypos[i] > xy_vol_half and ypos[i] < vol1.shape[1] - xy_vol_half):
continue
elif not (zpos[i] > z_vol_half and zpos[i] < vol1.shape[0] - z_vol_half):
continue

# Create the subvolumes
subvolume1 = vol1[zpos[i]-z_vol_half:zpos[i]+z_vol_half+1,
xpos[i]-xy_vol_half:xpos[i]+xy_vol_half+1,
ypos[i]-xy_vol_half:ypos[i]+xy_vol_half+1
]

subvolume2 = vol2[zpos[i]-z_vol_half:zpos[i]+z_vol_half+1,
xpos[i]-xy_vol_half:xpos[i]+xy_vol_half+1,
ypos[i]-xy_vol_half:ypos[i]+xy_vol_half+1
]

# Flatten the subvolumes
subvec1 = subvolume1.flatten()
subvec2 = subvolume2.flatten()
# Binarize the subvolumes using Otsu's method
thresh_vol1 = threshold_otsu(subvec1)
thresh_vol2 = threshold_otsu(subvec2)
subvec1_bin = subvec1 > thresh_vol1
subvec2_bin = subvec2 > thresh_vol2

            # Skip subvolumes where 1% or fewer of the voxels exceed the Otsu threshold
if np.mean(subvec1_bin) <= 0.01 or np.mean(subvec2_bin) <= 0.01:
continue

I_NCC = template_matching(subvolume1,subvolume2)
idx = np.unravel_index(np.argmax(I_NCC, axis=None), I_NCC.shape)
offsets_total[i, :] = np.array(idx) - np.ceil(np.array(I_NCC.shape) / 2)

i += 1

except Exception as e:
logger.error(
f"Error during NCC alignment calculation, Error: {e}")
raise

distance_errors = np.sqrt((config.xystep * offsets_total[:, 0]) ** 2 + (config.xystep * offsets_total[:, 1]) ** 2 + (config.zstep * offsets_total[:, 2]) ** 2)

return distance_errors
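
A quick way to sanity-check `alignment_NCC` in isolation is to pass it two copies of the same volume: the only residual error should then be a voxel-scale offset coming from the centring convention of the NCC map. The sketch below does this with synthetic data; since the function only reads plain attributes from `config`, a `SimpleNamespace` stand-in with illustrative values is used instead of a real `Config`.

import numpy as np
from types import SimpleNamespace
from exr.align.align_utils import alignment_NCC

# Stand-in for Config: only the attributes alignment_NCC reads, with illustrative values.
config = SimpleNamespace(
    subvol_dim=20,   # XY size (in voxels) of each random subvolume
    xystep=0.1625,   # illustrative in-plane step size
    zstep=0.25,      # illustrative axial step size
    pct_thresh=95,   # percentile threshold (superseded by per-subvolume Otsu thresholds)
    N=10,            # number of random subvolumes to sample
)

rng = np.random.default_rng(0)
vol = rng.random((60, 256, 256))                 # synthetic [Z, Y, X] volume
errors = alignment_NCC(config, vol, vol.copy())
print(np.round(errors, 3))                       # identical volumes -> at most a voxel-scale offset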
