Source code for libertem_blobfinder.common.fullmatch

'''
Fullmatch
~~~~~~~~~

Code to extract parallelogram grids from point clouds. It can guess
lattices from a set of diffraction spots using clustering.
'''
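
# A minimal usage sketch (assumption: `peaks` is an (n, 2) numpy array of
# (y, x) peak coordinates, e.g. from
# libertem_blobfinder.common.correlation.get_peaks):
#
#   matcher = FullMatcher()
#   matches, unmatched, weak = matcher.full_match(peaks)
#   a, b = matches[0].a, matches[0].b  # guessed lattice vectors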

import logging

import numpy as np
try:
    import hdbscan
except ModuleNotFoundError:
    raise ModuleNotFoundError(
        "The fullmatch module requires the hdbscan extra, "
        "please install -blobfinder with [hdbscan]."
    ) from None


import libertem_blobfinder.common.gridmatching as grm
from libertem_blobfinder.base.utils import make_polar, make_cartesian


log = logging.getLogger(__name__)


class NotFoundException(Exception):
    pass


class FullMatcher(grm.Matcher):
    '''
    Extension of :class:`~libertem_blobfinder.common.gridmatching.Matcher`
    with full matching

    Include the ability to guess grid parameters from a point cloud. This is
    separated from the other code since it currently only works with
    :class:`~hdbscan.HDBSCAN`, which can be problematic to install on some
    platforms. For that reason it is an optional dependency.

    Parameters
    ----------
    tolerance : float
        Position tolerance in px for peaks to be considered matches
    min_weight : float
        Minimum peak elevation of a peak to be considered for matching
    min_match : int
        Minimum number of matching peaks to be considered a match
    min_angle : float
        Minimum angle in radians between two vectors to be considered candidates
    min_points : int
        Minimum number of points to try clustering matching. Otherwise, match directly
    min_delta : float
        Minimum length of a potential grid vector
    max_delta : float
        Maximum length of a potential grid vector
    min_candidates : int
        Minimum number of candidates to consider clustering matching successful.
        If not enough are found, the algorithm uses a brute-force search with
        all pairwise vectors between points
    max_candidates : int
        Maximum number of candidates to return from clustering matching
    clusterer
        Instance of an sklearn.cluster compatible clusterer. Default is
        :class:`~hdbscan.HDBSCAN`.
    min_cluster_size_fraction : float
        Tuning parameter for clustering matching with :class:`~hdbscan.HDBSCAN`.
        Larger values allow smaller or fuzzier clusters. This is used to adapt
        the :code:`min_cluster_size` parameter of :class:`~hdbscan.HDBSCAN`
        dynamically to the number of points to be matched. Set this to
        :code:`None` to disable dynamic adjustment of :code:`min_cluster_size`.
        If you'd like to set :code:`min_cluster_size` to a constant value, set
        this to :code:`None` and additionally set the :code:`clusterer`
        parameter with your own clusterer object to have direct control over
        all parameters.
    min_samples_fraction : float
        Tuning parameter for clustering matching with :class:`~hdbscan.HDBSCAN`.
        Larger values allow smaller or fuzzier clusters. This is used to adapt
        the :code:`min_samples` parameter of :class:`~hdbscan.HDBSCAN`
        dynamically to the number of points to be matched. Set this to
        :code:`None` to disable dynamic adjustment of :code:`min_samples`.
        If you'd like to set :code:`min_samples` to a constant value, set this
        to :code:`None` and additionally set the :code:`clusterer` parameter
        with your own clusterer object to have direct control over all
        parameters.
    '''
    def __init__(
            self, tolerance=3, min_weight=0.1, min_match=3, min_angle=np.pi/10,
            min_points=10, min_delta=0, max_delta=np.inf, min_candidates=3,
            max_candidates=7, clusterer=None, min_cluster_size_fraction=4,
            min_samples_fraction=20):
        super().__init__(tolerance=tolerance, min_weight=min_weight, min_match=min_match)
        if clusterer is None:
            clusterer = hdbscan.HDBSCAN()
        self.min_angle = min_angle
        self.min_points = min_points
        self.min_delta = min_delta
        self.max_delta = max_delta
        self.min_candidates = min_candidates
        self.max_candidates = max_candidates
        self.clusterer = clusterer
        self.min_cluster_size_fraction = min_cluster_size_fraction
        self.min_samples_fraction = min_samples_fraction
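
    # A hedged usage sketch (illustration, not from the original module):
    # restricting candidate vector lengths can speed up matching and suppress
    # higher-order or spurious candidates, for example:
    #
    #   matcher = FullMatcher(min_delta=10, max_delta=50)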

    def full_match(
            self, centers, zero=None, cand=None, refineds=None,
            peak_values=None, peak_elevations=None):
        '''
        This function extracts a list of Match objects as well as two
        PointSelection objects for unmatched and weak points from the
        correlation result and zero point. The zero point is included in each
        of the matches because it is shared between all grids.

        Parameters
        ----------
        centers : numpy.ndarray
            numpy.ndarray of shape (n, 2) with integer centers (y, x) of peaks. This would
            typically be extracted with :meth:`libertem_blobfinder.common.correlation.get_peaks`
        zero : numpy.ndarray
            Zero point as numpy array (y, x).
        cand : list or numpy.ndarray
            Optional list of candidate vectors (y, x) to use in a first matching round
            before guessing.
        refineds : numpy.ndarray
            numpy.ndarray of shape (n, 2) with float centers (y, x) of peaks
            (subpixel refinement)
        peak_values : numpy.ndarray
            numpy.ndarray of shape (n,) with float maxima of correlation map of peaks
        peak_elevations : numpy.ndarray
            numpy.ndarray of shape (n,) with float elevation of correlation map of peaks.
            See :meth:`libertem_blobfinder.base.correlation.peak_elevation` for details.

        Returns
        -------
        Tuple[List[libertem_blobfinder.common.gridmatching.Match, ...],\
        libertem_blobfinder.common.gridmatching.PointSelection,\
        libertem_blobfinder.common.gridmatching.PointSelection]
            matches: list of :class:`~libertem_blobfinder.common.gridmatching.Match` instances,

            unmatched: instance of
            :class:`~libertem_blobfinder.common.gridmatching.PointSelection`,

            weak: instance of :class:`~libertem_blobfinder.common.gridmatching.PointSelection`

        Example
        -------

        >>> peaks = np.array([
        ...     # First peak is zero if not specified otherwise
        ...     # Base lattice vectors (32, 0) and (0, 32)
        ...     (64, 64),
        ...     (32, 32), (32, 64), (32, 96),
        ...     (64, 32), (64, 96),
        ...     (96, 32), (96, 64), (96, 96),
        ... ])
        >>> matcher = FullMatcher()
        >>> (matches, unmatched, weak) = matcher.full_match(peaks)
        >>> m = matches[0]
        >>> assert np.allclose(m.zero, (64, 64))
        >>> assert np.allclose(m.a, (32, 0))
        >>> assert np.allclose(m.b, (0, 32))
        '''
        class ExitException(Exception):
            pass

        if zero is None:
            zero = centers[0]

        corr = grm.CorrelationResult(
            centers=centers,
            refineds=refineds,
            peak_values=peak_values,
            peak_elevations=peak_elevations,
        )

        matches = []
        filt = corr.peak_elevations >= self.min_weight

        working_set = grm.PointSelection(corr, selector=filt)

        zero_selector = np.array([
            np.allclose(corr.centers[i], zero)
            + np.allclose(corr.refineds[i], zero)
            for i in range(len(corr))
        ], dtype=bool)

        def listed(working_set, polar_cand):
            return polar_cand

        def guess(working_set, polar_cand):
            return self._candidates(working_set.refineds)

        if cand is not None:
            polar_cand = size_filter(
                make_polar(np.array(cand)),
                min_delta=self.min_delta,
                max_delta=self.max_delta
            )
            candidate_methods = [listed, guess]
        else:
            polar_cand = None
            candidate_methods = [guess]

        while True:
            new_selector = np.copy(working_set.selector)
            # First, find a good candidate.
            # This is an expensive operation that should be done on a smaller
            # sample or sum frame result, at least for the first passes, to
            # match the majority of peaks.
            polar_candidate_vectors = candidate_methods[0](working_set, polar_cand)

            match = self._find_best_vector_match(
                point_selection=working_set, zero=zero,
                candidates=polar_candidate_vectors)

            if match is None:
                candidate_methods = candidate_methods[1:]
                if len(candidate_methods) == 0:
                    break
                else:
                    continue

            matches.append(match)
            # remove the points that have been matched
            new_selector[match.selector] = False

            if np.count_nonzero(new_selector) >= self.min_match:
                # Add the zero point that is shared by all patterns
                new_selector[zero_selector] = True
                working_set = working_set.derive(selector=new_selector)
            else:
                break

        if matches:
            new_selector[zero_selector] = False

        unmatched = working_set.derive(selector=new_selector)
        weak = grm.PointSelection(corr, selector=np.logical_not(filt))
        return (matches, unmatched, weak)

    def make_polar_vectors(self, coords):
        '''
        Calculate all unique pairwise connecting polar vectors between the
        points in coords.

        The pairwise connecting vectors are converted to polar coordinates and
        filtered with the parameters :py:attr:`~min_delta` and
        :py:attr:`~max_delta` to avoid calculating unwanted higher-order or
        spurious smaller vectors.

        All calculated vectors have a positive or zero x direction.
        '''
        # sort by x coordinate so that we always have a non-negative x difference
        sort_indices = np.argsort(coords[:, 1])
        coords = coords[sort_indices]
        i, j = np.mgrid[0:len(coords), 0:len(coords)]
        selector = j > i
        deltas = coords[j[selector]] - coords[i[selector]]
        polar = make_polar(deltas)
        return size_filter(polar, self.min_delta, self.max_delta)
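
    # Illustrative sketch (not from the original module): for three points
    # spaced 8 px apart along x, the pairwise vectors have radii 8, 8 and
    # 16 px, and min_delta/max_delta prune the longer one:
    #
    #   m = FullMatcher(min_delta=1, max_delta=10)
    #   polar = m.make_polar_vectors(np.array([(0, 0), (0, 8), (0, 16)]))
    #   polar[:, 0]  # -> array([8., 8.])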

    def check(self, match):
        if len(match) < self.min_match:
            return False
        papb = make_polar(np.array([match.a, match.b]))
        if len(size_filter(papb, self.min_delta, self.max_delta)) != 2:
            return False
        return angle_check(papb[0:1], papb[1:2], self.min_angle)

    def _tumble(self, point_selection, match):
        if not self.check(match):
            return None
        match = match.weighted_optimize()
        if not self.check(match):
            return None
        match = self._match_all(
            point_selection=point_selection, zero=match.zero, a=match.a, b=match.b)
        if not self.check(match):
            return None
        match = match.weighted_optimize()
        if not self.check(match):
            return None
        else:
            return match

    def _do_match(self, point_selection: grm.PointSelection, zero, polar_vectors):
        '''
        Return a list with matches of all pairwise combinations of polar_vectors
        '''
        match_list = []
        # we test all pairs of candidate vectors
        # and populate match_matrix
        for i in range(len(polar_vectors)):
            for j in range(i + 1, len(polar_vectors)):
                a = polar_vectors[i]
                b = polar_vectors[j]
                # too parallel, not good lattice vectors
                if not angle_check(np.array([a]), np.array([b]), self.min_angle):
                    continue
                if a[0] > b[0]:
                    bb = a
                    aa = b
                else:
                    aa = a
                    bb = b
                aa, bb = make_cartesian(np.array([aa, bb]))
                try:
                    match = self._match_all(
                        point_selection=point_selection, zero=zero, a=aa, b=bb)
                    match = self._tumble(point_selection, match)
                except np.linalg.LinAlgError:
                    continue
                if match is not None:
                    match_list.append(match)
        return match_list

    def _find_best_vector_match(self, point_selection: grm.PointSelection, zero, candidates):
        '''
        Return the match that matches with the best figure of merit

        Good properties for vectors are

        * Matching many points in the result
        * Orthogonal
        * Equal length
        * Short

        The function implements a heuristic to calculate a figure of merit
        that boosts candidates for each of the criteria that they fulfill or
        nearly fulfill.

        FIXME improve this on more real-world examples; define test cases.
        FIXME The figure of merit function (fom) could be a parameter,
        implement if the need arises.
        '''
        def fom(m):
            na = np.linalg.norm(m.a)
            nb = np.linalg.norm(m.b)
            # Matching many high-quality points is good
            res = np.sum(m.peak_elevations)**2
            # favor orthogonality
            res *= (np.abs(np.cross(m.a, m.b)) / (na * nb))
            # favor equal length
            res *= ((na * nb) / (na**2 + nb**2))
            return res

        match_list = self._do_match(point_selection, zero, candidates)
        if match_list:
            return max(match_list, key=fom)
        else:
            return None

    def _make_hdbscan_config(self, points):
        # This is handled here because the defaults depend on the number of points
        defaults = {}
        if self.min_cluster_size_fraction is not None:
            defaults['min_cluster_size'] = max(
                len(points) // self.min_cluster_size_fraction, 2)
        if self.min_samples_fraction is not None:
            defaults['min_samples'] = max(len(points) // self.min_samples_fraction, 1)
        return defaults

    def _hdbscan_candidates(self, points):
        '''
        Use hdbscan clustering to find potential candidates for lattice vectors.

        We rely on the clusterer and its settings to give us tight and
        well-populated clusters. Then we calculate a weighted mean for each
        cluster. In the end we return the shortest matches.
        '''
        # We have special tuning parameters for the default :class:`~hdbscan.HDBSCAN`
        if isinstance(self.clusterer, hdbscan.HDBSCAN):
            defaults = self._make_hdbscan_config(points)
            for key, value in defaults.items():
                setattr(self.clusterer, key, value)
        vectors = self.make_polar_vectors(points)
        self.clusterer.fit(vectors)
        labels = self.clusterer.labels_
        cand = []
        for cluster in range(max(labels) + 1):
            selector = labels == cluster
            v = vectors[selector]
            weights = self.clusterer.probabilities_[selector]
            std = v.std(axis=0)
            mean = np.average(v, axis=0, weights=weights)
            fom = np.linalg.norm(std)
            if fom > self.tolerance:
                # cluster is too fuzzy to be a reliable candidate
                continue
            cand.append(mean)
        # return the shortest candidate vectors
        return np.array(sorted(cand, key=lambda d: d[0])[:self.max_candidates])

    def _candidates(self, points):
        polar_vectors = []
        # Enough "flesh" to cluster
        if len(points) > self.min_points:
            # Get some candidates
            polar_vectors = self._hdbscan_candidates(points)
        # Not enough candidates found, use all pairwise vectors as candidates.
        # Tighter vector limits mean fewer candidates from clustering.
        # Adjust as needed because the full match is slow for too many points.
        if len(polar_vectors) < self.min_candidates:
            if len(points) > self.min_points:
                log.warning(
                    "Matching many points directly, might be computationally intensive: %s",
                    len(points)
                )
            polar_vectors = self.make_polar_vectors(points)
        return polar_vectors
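
# A hedged configuration sketch (illustration, not from the original module):
# for direct control over clustering, pass a pre-configured clusterer and
# disable the dynamic parameter adjustment, as described in the FullMatcher
# docstring:
#
#   clusterer = hdbscan.HDBSCAN(min_cluster_size=5, min_samples=2)
#   matcher = FullMatcher(
#       clusterer=clusterer,
#       min_cluster_size_fraction=None,
#       min_samples_fraction=None,
#   )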


def size_filter(polar, min_delta, max_delta):
    '''
    Accept an array of polar vectors and return those with a length between
    min_delta and max_delta.
    '''
    select = (polar[:, 0] >= min_delta) & (polar[:, 0] <= max_delta)
    return polar[select]
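
# Example sketch (not from the original module): keep only vectors with a
# radius between 1 and 20 px:
#
#   polar = make_polar(np.array([(0., 8.), (0., 40.)]))  # radii 8 and 40
#   size_filter(polar, min_delta=1, max_delta=20)  # keeps only the 8 px vector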


def angle_check(p1, p2, limit):
    '''
    Check if p1 and p2 have an angle difference of at least limit, where
    parallel and antiparallel directions count as equivalent.
    '''
    diff = np.absolute(p1[:, 1] - p2[:, 1]) % np.pi
    return (diff > limit) & (diff < (np.pi - limit))
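
# Example sketch (not from the original module): orthogonal vectors pass a
# pi/10 limit, while parallel (or antiparallel) vectors fail:
#
#   a = np.array([[1., 0.]])         # polar (radius, angle)
#   b = np.array([[1., np.pi / 2]])
#   angle_check(a, b, np.pi / 10)    # -> array([ True])
#   angle_check(a, a, np.pi / 10)    # -> array([False])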