epicure.laptrack_centroids
Tracking with laptrack package
Inspired from example https://github.com/yfukai/laptrack/blob/main/docs/examples/cell_segmentation.ipynb
1""" 2Tracking with laptrack package 3 4Inspired from example https://github.com/yfukai/laptrack/blob/main/docs/examples/cell_segmentation.ipynb 5""" 6 7import numpy as np 8import pandas as pd 9 10import laptrack 11from laptrack import LapTrack 12import epicure.Utils as ut 13from packaging.version import Version 14 15 16def squared_difference(a1, a2): 17 """ Squared difference, normalized """ 18 return ((a1-a2)**2)/(max(a1, a2)**2) 19 20def squared_distance(x1, y1, x2, y2): 21 """ Return squared distance """ 22 return (x1-x2)**2 + (y1-y2)**2 23 24 25class LaptrackCentroids(): 26 27 def __init__(self, track, epic): 28 self.max_distance = 15 29 self.splitting_cost = 1 30 self.merging_cost = 1 31 self.penal_area = 0 32 self.penal_solidity = 0 33 self.track = track 34 self.epicure = epic 35 self.inspecting = False 36 self.suggesting = False 37 self.region_properties = ["label", "frame", "centroid-0", "centroid-1", "area", "solidity"] 38 ## to handle difference in laptrack versions 39 self.version_over = Version(laptrack.__version__) >= Version("0.17.0") 40 41 if self.version_over: 42 self.lt = LapTrack( metric=self.tracking_metric, 43 cutoff=1, 44 splitting_metric=self.tracking_nofeat_metric, 45 splitting_cutoff=self.splitting_cost, 46 merging_metric=self.tracking_nofeat_metric, 47 merging_cutoff=self.merging_cost, ) 48 else: 49 self.lt = LapTrack( track_dist_metric=self.tracking_metric, 50 track_cost_cutoff=1, 51 splitting_dist_metric=self.tracking_nofeat_metric, 52 splitting_cost_cutoff=self.splitting_cost, 53 merging_dist_metric=self.tracking_nofeat_metric, 54 merging_cost_cutoff=self.merging_cost, ) 55 56 def set_region_properties(self, with_extra=False): 57 """ define the region properties used in the tracker """ 58 if with_extra: 59 self.region_properties = ["label", "frame", "centroid-0", "centroid-1", "area", "solidity"] 60 else: 61 self.region_properties = ["label", "frame", "centroid-0", "centroid-1" ] 62 63 def twoframes_track(self, df, labels): 64 """ Do track between two frames, only track label """ 65 #start_time = time.time() 66 track_df, split_df, merge_df = self.perform_track( df ) 67 #show_info("Performed in "+"{:.3f}".format((time.time()-start_time)/60)+" min") 68 69 track_ids = [None]*len(labels) 70 ## look for track id associated with label 71 frame_df = track_df[track_df["frame"] == 1] 72 label_to_tid = {int(row["label"]): int(row["track_id"]) for i, row in frame_df.iterrows()} 73 for i, curlabel in enumerate(labels): 74 track_ids[i] = label_to_tid.get(curlabel, None) 75 76 tracklabels = [None]*len(labels) 77 ## look for cell label associated with track_id in the first frame, if any 78 frame_df = track_df[track_df["frame"]==0] 79 frame_df = frame_df[frame_df["track_id"].isin(track_ids)] 80 label_to_tid = {int(row["track_id"]): int(row["label"]) for i, row in frame_df.iterrows()} 81 for i, tid in enumerate(track_ids): 82 tracklabels[i] = label_to_tid.get(tid, None) 83 #show_info("Finished in "+"{:.3f}".format((time.time()-start_time)/60)+" min") 84 return tracklabels 85 86 def tracking_metric(self, c1, c2): 87 """ Tracking cost metric: distance + feature penalties """ 88 dist = squared_distance(c1[2], c1[3], c2[2], c2[3]) 89 if dist > self.max_distance**2: 90 return dist 91 ## normalize all distances to combine them 92 dist = dist / (self.max_distance**2) 93 if self.penal_area > 0: 94 dist += self.penal_area * squared_difference(c1[4], c2[4]) 95 if self.penal_solidity > 0: 96 dist += self.penal_solidity * squared_difference(c1[5], c2[5]) 97 return dist 98 99 def tracking_nofeat_metric(self, c1, c2): 100 """ Tracking cost metric: distance, no penalties """ 101 dist = squared_distance(c1[2], c1[3], c2[2], c2[3]) 102 if dist > self.max_distance**2: 103 return dist 104 ## normalize all distances to combine them 105 dist = dist / (self.max_distance**2) 106 return dist 107 108 def perform_track(self, regionprops_df): 109 """ Do tracking with laptrack module """ 110 split_cost = False 111 merg_cost = False 112 #from laptrack import ParallelBackend 113 #self.lt.parallel_backend = ParallelBackend.ray 114 if self.splitting_cost > 0: 115 split_cost = self.splitting_cost**2 116 if self.version_over: 117 self.lt.splitting_cutoff = split_cost 118 else: 119 self.lt.splitting_cost_cutoff = split_cost 120 if self.merging_cost > 0: 121 merg_cost = self.merging_cost**2 122 if self.version_over: 123 self.lt.merging_cutoff = merg_cost 124 else: 125 self.lt.merging_cost_cutoff = merg_cost 126 track_df, split_df, merge_df = self.lt.predict_dataframe( 127 regionprops_df, 128 coordinate_cols=self.region_properties, 129 only_coordinate_cols=False, 130 ) 131 track_df = track_df.reset_index() 132 return track_df, split_df, merge_df 133 134 def track_centroids(self, regionprops_df): 135 """ Track cells based on their centroids positions + features penalties """ 136 ut.napari_info("Starting tracking with LapTrack centroids metrics...") 137 return self.perform_track( regionprops_df ) 138 139 def inspect_oneframe(self, graph, trackdf): 140 for track in np.unique(trackdf["track_id"]): 141 tr = trackdf[trackdf["track_id"] == track] 142 ## track is only on one frame, suspect 143 if len(np.unique(tr["frame"])) == 1: 144 # trackid + 1 as trackid starts as 0 145 pos = (tr.iloc[0]["frame"], int(tr.iloc[0]["centroid-0"]), int(tr.iloc[0]["centroid-1"])) 146 self.epicure.inspecting.add_event( pos, track+1, "tracking" ) 147 if self.track.suggesting: 148 if track in graph.keys(): 149 sisters = [] 150 refval = graph[track][0] 151 for key, val in graph.items(): 152 if val[0] == refval: 153 sisters.append( key ) 154 if len(sisters) == 2: 155 for sis in sisters: 156 self.epicure.add_suggestion( sis+1, refval+1 )
def
squared_difference(a1, a2):
17def squared_difference(a1, a2): 18 """ Squared difference, normalized """ 19 return ((a1-a2)**2)/(max(a1, a2)**2)
Squared difference, normalized
def
squared_distance(x1, y1, x2, y2):
21def squared_distance(x1, y1, x2, y2): 22 """ Return squared distance """ 23 return (x1-x2)**2 + (y1-y2)**2
Return squared distance
class
LaptrackCentroids:
26class LaptrackCentroids(): 27 28 def __init__(self, track, epic): 29 self.max_distance = 15 30 self.splitting_cost = 1 31 self.merging_cost = 1 32 self.penal_area = 0 33 self.penal_solidity = 0 34 self.track = track 35 self.epicure = epic 36 self.inspecting = False 37 self.suggesting = False 38 self.region_properties = ["label", "frame", "centroid-0", "centroid-1", "area", "solidity"] 39 ## to handle difference in laptrack versions 40 self.version_over = Version(laptrack.__version__) >= Version("0.17.0") 41 42 if self.version_over: 43 self.lt = LapTrack( metric=self.tracking_metric, 44 cutoff=1, 45 splitting_metric=self.tracking_nofeat_metric, 46 splitting_cutoff=self.splitting_cost, 47 merging_metric=self.tracking_nofeat_metric, 48 merging_cutoff=self.merging_cost, ) 49 else: 50 self.lt = LapTrack( track_dist_metric=self.tracking_metric, 51 track_cost_cutoff=1, 52 splitting_dist_metric=self.tracking_nofeat_metric, 53 splitting_cost_cutoff=self.splitting_cost, 54 merging_dist_metric=self.tracking_nofeat_metric, 55 merging_cost_cutoff=self.merging_cost, ) 56 57 def set_region_properties(self, with_extra=False): 58 """ define the region properties used in the tracker """ 59 if with_extra: 60 self.region_properties = ["label", "frame", "centroid-0", "centroid-1", "area", "solidity"] 61 else: 62 self.region_properties = ["label", "frame", "centroid-0", "centroid-1" ] 63 64 def twoframes_track(self, df, labels): 65 """ Do track between two frames, only track label """ 66 #start_time = time.time() 67 track_df, split_df, merge_df = self.perform_track( df ) 68 #show_info("Performed in "+"{:.3f}".format((time.time()-start_time)/60)+" min") 69 70 track_ids = [None]*len(labels) 71 ## look for track id associated with label 72 frame_df = track_df[track_df["frame"] == 1] 73 label_to_tid = {int(row["label"]): int(row["track_id"]) for i, row in frame_df.iterrows()} 74 for i, curlabel in enumerate(labels): 75 track_ids[i] = label_to_tid.get(curlabel, None) 76 77 tracklabels = [None]*len(labels) 78 ## look for cell label associated with track_id in the first frame, if any 79 frame_df = track_df[track_df["frame"]==0] 80 frame_df = frame_df[frame_df["track_id"].isin(track_ids)] 81 label_to_tid = {int(row["track_id"]): int(row["label"]) for i, row in frame_df.iterrows()} 82 for i, tid in enumerate(track_ids): 83 tracklabels[i] = label_to_tid.get(tid, None) 84 #show_info("Finished in "+"{:.3f}".format((time.time()-start_time)/60)+" min") 85 return tracklabels 86 87 def tracking_metric(self, c1, c2): 88 """ Tracking cost metric: distance + feature penalties """ 89 dist = squared_distance(c1[2], c1[3], c2[2], c2[3]) 90 if dist > self.max_distance**2: 91 return dist 92 ## normalize all distances to combine them 93 dist = dist / (self.max_distance**2) 94 if self.penal_area > 0: 95 dist += self.penal_area * squared_difference(c1[4], c2[4]) 96 if self.penal_solidity > 0: 97 dist += self.penal_solidity * squared_difference(c1[5], c2[5]) 98 return dist 99 100 def tracking_nofeat_metric(self, c1, c2): 101 """ Tracking cost metric: distance, no penalties """ 102 dist = squared_distance(c1[2], c1[3], c2[2], c2[3]) 103 if dist > self.max_distance**2: 104 return dist 105 ## normalize all distances to combine them 106 dist = dist / (self.max_distance**2) 107 return dist 108 109 def perform_track(self, regionprops_df): 110 """ Do tracking with laptrack module """ 111 split_cost = False 112 merg_cost = False 113 #from laptrack import ParallelBackend 114 #self.lt.parallel_backend = ParallelBackend.ray 115 if self.splitting_cost > 0: 116 split_cost = self.splitting_cost**2 117 if self.version_over: 118 self.lt.splitting_cutoff = split_cost 119 else: 120 self.lt.splitting_cost_cutoff = split_cost 121 if self.merging_cost > 0: 122 merg_cost = self.merging_cost**2 123 if self.version_over: 124 self.lt.merging_cutoff = merg_cost 125 else: 126 self.lt.merging_cost_cutoff = merg_cost 127 track_df, split_df, merge_df = self.lt.predict_dataframe( 128 regionprops_df, 129 coordinate_cols=self.region_properties, 130 only_coordinate_cols=False, 131 ) 132 track_df = track_df.reset_index() 133 return track_df, split_df, merge_df 134 135 def track_centroids(self, regionprops_df): 136 """ Track cells based on their centroids positions + features penalties """ 137 ut.napari_info("Starting tracking with LapTrack centroids metrics...") 138 return self.perform_track( regionprops_df ) 139 140 def inspect_oneframe(self, graph, trackdf): 141 for track in np.unique(trackdf["track_id"]): 142 tr = trackdf[trackdf["track_id"] == track] 143 ## track is only on one frame, suspect 144 if len(np.unique(tr["frame"])) == 1: 145 # trackid + 1 as trackid starts as 0 146 pos = (tr.iloc[0]["frame"], int(tr.iloc[0]["centroid-0"]), int(tr.iloc[0]["centroid-1"])) 147 self.epicure.inspecting.add_event( pos, track+1, "tracking" ) 148 if self.track.suggesting: 149 if track in graph.keys(): 150 sisters = [] 151 refval = graph[track][0] 152 for key, val in graph.items(): 153 if val[0] == refval: 154 sisters.append( key ) 155 if len(sisters) == 2: 156 for sis in sisters: 157 self.epicure.add_suggestion( sis+1, refval+1 )
LaptrackCentroids(track, epic)
28 def __init__(self, track, epic): 29 self.max_distance = 15 30 self.splitting_cost = 1 31 self.merging_cost = 1 32 self.penal_area = 0 33 self.penal_solidity = 0 34 self.track = track 35 self.epicure = epic 36 self.inspecting = False 37 self.suggesting = False 38 self.region_properties = ["label", "frame", "centroid-0", "centroid-1", "area", "solidity"] 39 ## to handle difference in laptrack versions 40 self.version_over = Version(laptrack.__version__) >= Version("0.17.0") 41 42 if self.version_over: 43 self.lt = LapTrack( metric=self.tracking_metric, 44 cutoff=1, 45 splitting_metric=self.tracking_nofeat_metric, 46 splitting_cutoff=self.splitting_cost, 47 merging_metric=self.tracking_nofeat_metric, 48 merging_cutoff=self.merging_cost, ) 49 else: 50 self.lt = LapTrack( track_dist_metric=self.tracking_metric, 51 track_cost_cutoff=1, 52 splitting_dist_metric=self.tracking_nofeat_metric, 53 splitting_cost_cutoff=self.splitting_cost, 54 merging_dist_metric=self.tracking_nofeat_metric, 55 merging_cost_cutoff=self.merging_cost, )
def
set_region_properties(self, with_extra=False):
57 def set_region_properties(self, with_extra=False): 58 """ define the region properties used in the tracker """ 59 if with_extra: 60 self.region_properties = ["label", "frame", "centroid-0", "centroid-1", "area", "solidity"] 61 else: 62 self.region_properties = ["label", "frame", "centroid-0", "centroid-1" ]
define the region properties used in the tracker
def
twoframes_track(self, df, labels):
64 def twoframes_track(self, df, labels): 65 """ Do track between two frames, only track label """ 66 #start_time = time.time() 67 track_df, split_df, merge_df = self.perform_track( df ) 68 #show_info("Performed in "+"{:.3f}".format((time.time()-start_time)/60)+" min") 69 70 track_ids = [None]*len(labels) 71 ## look for track id associated with label 72 frame_df = track_df[track_df["frame"] == 1] 73 label_to_tid = {int(row["label"]): int(row["track_id"]) for i, row in frame_df.iterrows()} 74 for i, curlabel in enumerate(labels): 75 track_ids[i] = label_to_tid.get(curlabel, None) 76 77 tracklabels = [None]*len(labels) 78 ## look for cell label associated with track_id in the first frame, if any 79 frame_df = track_df[track_df["frame"]==0] 80 frame_df = frame_df[frame_df["track_id"].isin(track_ids)] 81 label_to_tid = {int(row["track_id"]): int(row["label"]) for i, row in frame_df.iterrows()} 82 for i, tid in enumerate(track_ids): 83 tracklabels[i] = label_to_tid.get(tid, None) 84 #show_info("Finished in "+"{:.3f}".format((time.time()-start_time)/60)+" min") 85 return tracklabels
Do track between two frames, only track label
def
tracking_metric(self, c1, c2):
87 def tracking_metric(self, c1, c2): 88 """ Tracking cost metric: distance + feature penalties """ 89 dist = squared_distance(c1[2], c1[3], c2[2], c2[3]) 90 if dist > self.max_distance**2: 91 return dist 92 ## normalize all distances to combine them 93 dist = dist / (self.max_distance**2) 94 if self.penal_area > 0: 95 dist += self.penal_area * squared_difference(c1[4], c2[4]) 96 if self.penal_solidity > 0: 97 dist += self.penal_solidity * squared_difference(c1[5], c2[5]) 98 return dist
Tracking cost metric: distance + feature penalties
def
tracking_nofeat_metric(self, c1, c2):
100 def tracking_nofeat_metric(self, c1, c2): 101 """ Tracking cost metric: distance, no penalties """ 102 dist = squared_distance(c1[2], c1[3], c2[2], c2[3]) 103 if dist > self.max_distance**2: 104 return dist 105 ## normalize all distances to combine them 106 dist = dist / (self.max_distance**2) 107 return dist
Tracking cost metric: distance, no penalties
def
perform_track(self, regionprops_df):
109 def perform_track(self, regionprops_df): 110 """ Do tracking with laptrack module """ 111 split_cost = False 112 merg_cost = False 113 #from laptrack import ParallelBackend 114 #self.lt.parallel_backend = ParallelBackend.ray 115 if self.splitting_cost > 0: 116 split_cost = self.splitting_cost**2 117 if self.version_over: 118 self.lt.splitting_cutoff = split_cost 119 else: 120 self.lt.splitting_cost_cutoff = split_cost 121 if self.merging_cost > 0: 122 merg_cost = self.merging_cost**2 123 if self.version_over: 124 self.lt.merging_cutoff = merg_cost 125 else: 126 self.lt.merging_cost_cutoff = merg_cost 127 track_df, split_df, merge_df = self.lt.predict_dataframe( 128 regionprops_df, 129 coordinate_cols=self.region_properties, 130 only_coordinate_cols=False, 131 ) 132 track_df = track_df.reset_index() 133 return track_df, split_df, merge_df
Do tracking with laptrack module
def
track_centroids(self, regionprops_df):
135 def track_centroids(self, regionprops_df): 136 """ Track cells based on their centroids positions + features penalties """ 137 ut.napari_info("Starting tracking with LapTrack centroids metrics...") 138 return self.perform_track( regionprops_df )
Track cells based on their centroids positions + features penalties
def
inspect_oneframe(self, graph, trackdf):
140 def inspect_oneframe(self, graph, trackdf): 141 for track in np.unique(trackdf["track_id"]): 142 tr = trackdf[trackdf["track_id"] == track] 143 ## track is only on one frame, suspect 144 if len(np.unique(tr["frame"])) == 1: 145 # trackid + 1 as trackid starts as 0 146 pos = (tr.iloc[0]["frame"], int(tr.iloc[0]["centroid-0"]), int(tr.iloc[0]["centroid-1"])) 147 self.epicure.inspecting.add_event( pos, track+1, "tracking" ) 148 if self.track.suggesting: 149 if track in graph.keys(): 150 sisters = [] 151 refval = graph[track][0] 152 for key, val in graph.items(): 153 if val[0] == refval: 154 sisters.append( key ) 155 if len(sisters) == 2: 156 for sis in sisters: 157 self.epicure.add_suggestion( sis+1, refval+1 )