'''
Fullmatch
~~~~~~~~~
Code to extract parallelogram grids from point clouds. This
can guess lattices from a set of diffraction spots using clustering.
'''
import logging
import numpy as np
try:
import hdbscan
except ModuleNotFoundError:
raise ModuleNotFoundError(
"The fullmatch module requires the hdbscan extra, "
"please install -blobfinder with [hdbscan]."
) from None
import libertem_blobfinder.common.gridmatching as grm
from libertem_blobfinder.base.utils import make_polar, make_cartesian
log = logging.getLogger(__name__)
class NotFoundException(Exception):
pass
class FullMatcher(grm.Matcher):
'''
    Extension of :class:`~libertem_blobfinder.common.gridmatching.Matcher` with full matching.

    It adds the ability to guess grid parameters from a point cloud. This is
    separated from the rest of the code since it currently only works with
    :class:`~hdbscan.HDBSCAN`, which can be problematic to install on some
    platforms. For that reason it is an optional dependency.
Parameters
----------
tolerance : float
Position tolerance in px for peaks to be considered matches
min_weight : float
Minimum peak elevation of a peak to be considered for matching
min_match : int
Minimum number of matching peaks to be considered a match.
min_angle : float
Minimum angle in radians between two vectors to be considered candidates
min_points : int
        Minimum number of points to attempt clustering-based matching.
        Below this, all pairwise vectors between points are matched directly.
min_delta : float
Minimum length of a potential grid vector
max_delta : float
Maximum length of a potential grid vector
min_candidates : int
Minimum number of candidates to consider clustering matching successful.
If not enough are found, the algorithm uses a brute-force search with all
pairwise vectors between points
max_candidates : int
Maximum number of candidates to return from clustering matching
clusterer
        Instance of an sklearn.cluster compatible clusterer that also provides
        :code:`labels_` and :code:`probabilities_` after fitting, as the default
        :class:`~hdbscan.HDBSCAN` does.
min_cluster_size_fraction : float
Tuning parameter for clustering matching with :class:`~hdbscan.HDBSCAN`.
Larger values allow
smaller or fuzzier clusters. This is used to adapt the :code:`min_cluster_size`
parameter of :class:`~hdbscan.HDBSCAN` dynamically to the number of points to be
matched.
Set this to :code:`None` to disable dynamic adjustment of :code:`min_cluster_size`.
        If you would like to set :code:`min_cluster_size` to a constant value, you
        can set this to :code:`None` and additionally set the :code:`clusterer`
        parameter with your own clusterer object to have direct control over all
        parameters.
min_samples_fraction : float
Tuning parameter for clustering matching with :class:`~hdbscan.HDBSCAN`.
Larger values allow
smaller or fuzzier clusters. This is used to adapt the :code:`min_samples`
parameter of :class:`~hdbscan.HDBSCAN` dynamically to the number of points to be
matched.
Set this to :code:`None` to disable dynamic adjustment of :code:`min_samples`.
        If you would like to set :code:`min_samples` to a constant value, you
        can set this to :code:`None` and additionally set the :code:`clusterer`
        parameter with your own clusterer object to have direct control over all
        parameters, as sketched in the example below.
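
    Example
    -------
    A minimal sketch of taking direct control over the clustering. The
    parameter values are illustrative only, not tuned recommendations:

    >>> import hdbscan
    >>> matcher = FullMatcher(
    ...     min_cluster_size_fraction=None,
    ...     min_samples_fraction=None,
    ...     clusterer=hdbscan.HDBSCAN(min_cluster_size=4, min_samples=2),
    ... )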
'''
def __init__(
self, tolerance=3, min_weight=0.1, min_match=3, min_angle=np.pi/10,
min_points=10, min_delta=0, max_delta=np.inf, min_candidates=3,
max_candidates=7, clusterer=None, min_cluster_size_fraction=4,
min_samples_fraction=20):
super().__init__(tolerance=tolerance, min_weight=min_weight, min_match=min_match)
if clusterer is None:
clusterer = hdbscan.HDBSCAN()
self.min_angle = min_angle
self.min_points = min_points
self.min_delta = min_delta
self.max_delta = max_delta
self.min_candidates = min_candidates
self.max_candidates = max_candidates
self.clusterer = clusterer
self.min_cluster_size_fraction = min_cluster_size_fraction
self.min_samples_fraction = min_samples_fraction
    def full_match(
self, centers, zero=None, cand=None,
refineds=None, peak_values=None, peak_elevations=None):
'''
        This function extracts a list of Match objects as well as two PointSelection
        objects for unmatched and weak points from the given peaks and zero point.
The zero point is included in each of the matches because it is shared between all grids.
Parameters
----------
centers : numpy.ndarray
numpy.ndarray of shape (n, 2) with integer centers (y, x) of peaks. This would typically
be extracted with :meth:`libertem_blobfinder.common.correlation.get_peaks`
zero : numpy.ndarray
Zero point as numpy array (y, x).
cand : list or numpy.ndarray
Optional list of candidate vectors (y, x) to use in a first matching round before
guessing.
refineds : numpy.ndarray
numpy.ndarray of shape (n, 2) with float centers (y, x) of peaks (subpixel refinement)
peak_values : numpy.ndarray
numpy.ndarray of shape (n,) with float maxima of correlation map of peaks
peak_elevations : numpy.ndarray
numpy.ndarray of shape (n,) with float elevation of correlation map of peaks.
See :meth:`libertem_blobfinder.base.correlation.peak_elevation` for details.
Returns
-------
Tuple[List[libertem_blobfinder.common.gridmatching.Match, ...],\
libertem_blobfinder.common.gridmatching.PointSelection,\
libertem_blobfinder.common.gridmatching.PointSelection]
matches: list of :class:`~libertem_blobfinder.common.gridmatching.Match` instances,
unmatched: instance of :class:`~libertem_blobfinder.common.gridmatching.PointSelection`,
weak: instance of :class:`~libertem_blobfinder.common.gridmatching.PointSelection`
Example
-------
>>> peaks = np.array([
... # First peak is zero if not specified otherwise
... # Base lattice vectors (32, 0) and (0, 32)
... (64, 64),
... (32, 32), (32, 64), (32, 96),
... (64, 32), (64, 96),
... (96, 32), (96, 64), (96, 96),
... ])
>>> matcher = FullMatcher()
>>> (matches, unmatched, weak) = matcher.full_match(peaks)
>>> m = matches[0]
>>> assert np.allclose(m.zero, (64, 64))
>>> assert np.allclose(m.a, (32, 0))
>>> assert np.allclose(m.b, (0, 32))
'''
class ExitException(Exception):
pass
if zero is None:
zero = centers[0]
corr = grm.CorrelationResult(
centers=centers,
refineds=refineds,
peak_values=peak_values,
peak_elevations=peak_elevations,
)
matches = []
filt = corr.peak_elevations >= self.min_weight
working_set = grm.PointSelection(corr, selector=filt)
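        # Mark the points that coincide with the zero point, either by integer
        # or refined position. The zero point is shared between all grids and
        # is re-added to each working set below.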
zero_selector = np.array([
np.allclose(corr.centers[i], zero)
+ np.allclose(corr.refineds[i], zero)
for i in range(len(corr))
], dtype=bool)
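        # Candidate strategies, tried in order: first the user-supplied
        # candidate list (if given), then guessing by clustering.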
def listed(working_set, polar_cand):
return polar_cand
def guess(working_set, polar_cand):
return self._candidates(working_set.refineds)
if cand is not None:
polar_cand = size_filter(
make_polar(np.array(cand)),
min_delta=self.min_delta,
max_delta=self.max_delta
)
candidate_methods = [listed, guess]
else:
polar_cand = None
candidate_methods = [guess]
while True:
new_selector = np.copy(working_set.selector)
# First, find good candidate
# Expensive operation, should be done on smaller sample
# or sum frame result, at least for first passes to match majority
# of peaks
polar_candidate_vectors = candidate_methods[0](working_set, polar_cand)
match = self._find_best_vector_match(
point_selection=working_set, zero=zero,
candidates=polar_candidate_vectors)
if match is None:
candidate_methods = candidate_methods[1:]
if len(candidate_methods) == 0:
break
else:
continue
matches.append(match)
# remove the ones that have been matched
new_selector[match.selector] = False
if np.count_nonzero(new_selector) >= self.min_match:
# Add zero point that is shared by all patterns
new_selector[zero_selector] = True
working_set = working_set.derive(selector=new_selector)
else:
break
if matches:
new_selector[zero_selector] = False
unmatched = working_set.derive(selector=new_selector)
weak = grm.PointSelection(corr, selector=np.logical_not(filt))
return (matches, unmatched, weak)
    def make_polar_vectors(self, coords):
'''
Calculate all unique pairwise connecting polar vectors between points in coords.
The pairwise connecting vectors are converted to polar coordinates and
filtered with parameters :py:attr:`~min_delta` and :py:attr:`~max_delta`
        to exclude unwanted higher-order or spurious smaller vectors.
All calculated vectors have a positive or zero x direction.
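
        Example
        -------
        A minimal sketch with three points; the values are illustrative only:

        >>> matcher = FullMatcher()
        >>> coords = np.array([(0, 0), (0, 8), (8, 0)])
        >>> polar = matcher.make_polar_vectors(coords)
        >>> polar.shape  # three unique pairwise vectors as (length, angle)
        (3, 2)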
'''
# sort by x coordinate so that we have always positive x difference vectors
sort_indices = np.argsort(coords[:, 1])
coords = coords[sort_indices]
i, j = np.mgrid[0: len(coords), 0: len(coords)]
selector = j > i
deltas = coords[j[selector]] - coords[i[selector]]
polar = make_polar(deltas)
return size_filter(polar, self.min_delta, self.max_delta)
def check(self, match):
if len(match) < self.min_match:
return False
papb = make_polar(np.array([match.a, match.b]))
if len(size_filter(papb, self.min_delta, self.max_delta)) != 2:
return False
return angle_check(papb[0:1], papb[1:2], self.min_angle)
def _tumble(self, point_selection, match):
if not self.check(match):
return None
match = match.weighted_optimize()
if not self.check(match):
return None
match = self._match_all(
point_selection=point_selection, zero=match.zero, a=match.a, b=match.b)
if not self.check(match):
return None
match = match.weighted_optimize()
if not self.check(match):
return None
else:
return match
def _do_match(self, point_selection: grm.PointSelection, zero, polar_vectors):
'''
Return a list with matches of all pairwise combinations of polar_vectors
'''
match_list = []
# we test all pairs of candidate vectors
# and populate match_matrix
for i in range(len(polar_vectors)):
for j in range(i + 1, len(polar_vectors)):
a = polar_vectors[i]
b = polar_vectors[j]
# too parallel, not good lattice vectors
if not angle_check(np.array([a]), np.array([b]), self.min_angle):
continue
if a[0] > b[0]:
bb = a
aa = b
else:
aa = a
bb = b
aa, bb = make_cartesian(np.array([aa, bb]))
try:
match = self._match_all(
point_selection=point_selection, zero=zero, a=aa, b=bb)
match = self._tumble(point_selection, match)
except np.linalg.LinAlgError:
continue
if match is not None:
match_list.append(match)
return match_list
def _find_best_vector_match(self, point_selection: grm.PointSelection, zero, candidates):
'''
Return the match that matches with the best figure of merit
Good properties for vectors are
* Matching many points in the result
* Orthogonal
* Equal length
* Short
The function implements a heuristic to calculate a figure of merit that boosts
candidates for each of the criteria that they fulfill or nearly fulfill.
FIXME improve this on more real-world examples; define test cases.
FIXME The figure of merit function (fom) could be a parameter, implement if need arises.
'''
def fom(m):
na = np.linalg.norm(m.a)
nb = np.linalg.norm(m.b)
# Matching many high-quality points is good
res = np.sum(m.peak_elevations)**2
# favor orthogonality
res *= (np.abs(np.cross(m.a, m.b)) / (na * nb))
# favor equal length
res *= ((na * nb) / (na**2 + nb**2))
return res
match_list = self._do_match(point_selection, zero, candidates)
if match_list:
return max(match_list, key=fom)
else:
return None
def _make_hdbscan_config(self, points):
# This is handled here because the defaults depend on the number of points
defaults = {}
if self.min_cluster_size_fraction is not None:
defaults['min_cluster_size'] = max(len(points) // self.min_cluster_size_fraction, 2)
if self.min_samples_fraction is not None:
defaults['min_samples'] = max(len(points) // self.min_samples_fraction, 1)
return defaults
def _hdbscan_candidates(self, points):
'''
Use hdbscan clustering to find potential candidates for lattice vectors.
We rely on the clusterer and its settings to give us tight and well-populated clusters.
Then we calculate a weighted mean for each cluster.
In the end we return the shortest matches.
'''
# We have special tuning parameters for the default :class:`~hdbscan.HDBSCAN`
if isinstance(self.clusterer, hdbscan.HDBSCAN):
defaults = self._make_hdbscan_config(points)
for key, value in defaults.items():
setattr(self.clusterer, key, value)
vectors = self.make_polar_vectors(points)
self.clusterer.fit(vectors)
labels = self.clusterer.labels_
cand = []
for cluster in range(max(labels) + 1):
selector = labels == cluster
v = vectors[selector]
weights = self.clusterer.probabilities_[selector]
std = v.std(axis=0)
mean = np.average(v, axis=0, weights=weights)
fom = np.linalg.norm(std)
            if fom > self.tolerance:
                # Cluster is too spread out to give a reliable candidate vector
                continue
cand.append(mean)
# return the shortest candidate vectors
return np.array(sorted(cand, key=lambda d: d[0])[:self.max_candidates])
def _candidates(self, points):
polar_vectors = []
# Enough "flesh" to cluster
if len(points) > self.min_points:
# Get some candidates
polar_vectors = self._hdbscan_candidates(points)
# Not enough candidates found, use all pairwise vectors as candidates
        # Tighter vector limits mean fewer candidates from clustering.
# Adjust as needed because the full match is slow for too many points
if len(polar_vectors) < self.min_candidates:
if len(points) > self.min_points:
                log.warning(
                    "Matching many points directly, might be computationally intensive: %s",
                    len(points)
                )
polar_vectors = self.make_polar_vectors(points)
return polar_vectors
def size_filter(polar, min_delta, max_delta):
'''
    Accept an array of polar vectors and return only those with length
    between min_delta and max_delta.
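
    Example
    -------
    A minimal sketch; the values are illustrative only:

    >>> polar = np.array([(1., 0.), (5., 0.), (20., 0.)])
    >>> size_filter(polar, min_delta=2, max_delta=10)
    array([[5., 0.]])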
'''
    select = (polar[:, 0] >= min_delta) & (polar[:, 0] <= max_delta)
return polar[select]
def angle_check(p1, p2, limit):
'''
    Check whether p1 and p2 have an angle difference of at least limit,
    treating parallel and antiparallel directions as equivalent.
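
    Example
    -------
    A minimal sketch with two perpendicular unit vectors in polar
    (length, angle) form; the values are illustrative only:

    >>> a = np.array([(1., 0.)])
    >>> b = np.array([(1., np.pi / 2)])
    >>> angle_check(a, b, limit=np.pi / 10)
    array([ True])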
'''
diff = np.absolute(p1[:, 1] - p2[:, 1]) % np.pi
    return (diff > limit) & (diff < (np.pi - limit))