epicure.laptrack_centroids

  1"""
  2Tracking with laptrack package
  3
  4Inspired from example https://github.com/yfukai/laptrack/blob/main/docs/examples/cell_segmentation.ipynb
  5"""
  6
  7import numpy as np
  8import pandas as pd
  9
 10import laptrack
 11from laptrack import LapTrack
 12import epicure.Utils as ut
 13from packaging.version import Version
 14    
 15
 16def squared_difference(a1, a2):
 17    """ Squared difference, normalized """
 18    return ((a1-a2)**2)/(max(a1, a2)**2)
 19
 20def squared_distance(x1, y1, x2, y2):
 21    """ Return squared distance """
 22    return (x1-x2)**2 + (y1-y2)**2
 23
 24
 25class LaptrackCentroids():
 26
 27    def __init__(self, track, epic):
 28        self.max_distance = 15
 29        self.splitting_cost = 1
 30        self.merging_cost = 1
 31        self.penal_area = 0
 32        self.penal_solidity = 0
 33        self.track = track
 34        self.epicure = epic
 35        self.inspecting = False
 36        self.suggesting = False
 37        self.region_properties = ["label", "frame", "centroid-0", "centroid-1", "area", "solidity"]
 38        ## to handle difference in laptrack versions
 39        self.version_over = Version(laptrack.__version__) >= Version("0.17.0")
 40
 41        if self.version_over:
 42            self.lt = LapTrack( metric=self.tracking_metric,
 43                                cutoff=1, 
 44                                splitting_metric=self.tracking_nofeat_metric, 
 45                                splitting_cutoff=self.splitting_cost, 
 46                                merging_metric=self.tracking_nofeat_metric,
 47                                merging_cutoff=self.merging_cost, )
 48        else:
 49            self.lt = LapTrack( track_dist_metric=self.tracking_metric,
 50                                track_cost_cutoff=1, 
 51                                splitting_dist_metric=self.tracking_nofeat_metric, 
 52                                splitting_cost_cutoff=self.splitting_cost, 
 53                                merging_dist_metric=self.tracking_nofeat_metric,
 54                                merging_cost_cutoff=self.merging_cost, )
 55        
 56    def set_region_properties(self, with_extra=False):
 57        """ define the region properties used in the tracker """
 58        if with_extra:
 59            self.region_properties = ["label", "frame", "centroid-0", "centroid-1", "area", "solidity"]
 60        else:
 61            self.region_properties = ["label", "frame", "centroid-0", "centroid-1" ]
 62
 63    def twoframes_track(self, df, labels):
 64        """ Do track between two frames, only track label """
 65        #start_time = time.time()
 66        track_df, split_df, merge_df = self.perform_track( df )
 67        #show_info("Performed in "+"{:.3f}".format((time.time()-start_time)/60)+" min")
 68        
 69        track_ids = [None]*len(labels)
 70        ## look for track id associated with label
 71        frame_df = track_df[track_df["frame"] == 1]
 72        label_to_tid = {int(row["label"]): int(row["track_id"]) for i, row in frame_df.iterrows()}
 73        for i, curlabel in enumerate(labels):
 74            track_ids[i] = label_to_tid.get(curlabel, None)
 75
 76        tracklabels = [None]*len(labels)
 77        ## look for cell label associated with track_id in the first frame, if any
 78        frame_df = track_df[track_df["frame"]==0]
 79        frame_df = frame_df[frame_df["track_id"].isin(track_ids)]
 80        label_to_tid = {int(row["track_id"]): int(row["label"]) for i, row in frame_df.iterrows()}
 81        for i, tid in enumerate(track_ids):
 82            tracklabels[i] = label_to_tid.get(tid, None)
 83        #show_info("Finished in "+"{:.3f}".format((time.time()-start_time)/60)+" min")
 84        return tracklabels
 85
 86    def tracking_metric(self, c1, c2):
 87        """ Tracking cost metric: distance + feature penalties """
 88        dist = squared_distance(c1[2], c1[3], c2[2], c2[3])
 89        if dist > self.max_distance**2:
 90            return dist
 91        ## normalize all distances to combine them
 92        dist = dist / (self.max_distance**2)
 93        if self.penal_area > 0:
 94            dist += self.penal_area * squared_difference(c1[4], c2[4])
 95        if self.penal_solidity > 0:
 96            dist += self.penal_solidity * squared_difference(c1[5], c2[5])
 97        return dist
 98    
 99    def tracking_nofeat_metric(self, c1, c2):
100        """ Tracking cost metric: distance, no penalties """
101        dist = squared_distance(c1[2], c1[3], c2[2], c2[3])
102        if dist > self.max_distance**2:
103            return dist
104        ## normalize all distances to combine them
105        dist = dist / (self.max_distance**2)
106        return dist
107
108    def perform_track(self, regionprops_df):
109        """ Do tracking with laptrack module """
110        split_cost = False
111        merg_cost = False
112        #from laptrack import ParallelBackend
113        #self.lt.parallel_backend = ParallelBackend.ray
114        if self.splitting_cost > 0:
115            split_cost = self.splitting_cost**2
116        if self.version_over:
117            self.lt.splitting_cutoff = split_cost
118        else:
119            self.lt.splitting_cost_cutoff = split_cost
120        if self.merging_cost > 0:
121            merg_cost = self.merging_cost**2
122        if self.version_over:
123            self.lt.merging_cutoff = merg_cost
124        else:
125            self.lt.merging_cost_cutoff = merg_cost
126        track_df, split_df, merge_df = self.lt.predict_dataframe(
127            regionprops_df,
128            coordinate_cols=self.region_properties,
129            only_coordinate_cols=False,
130        )    
131        track_df = track_df.reset_index()
132        return track_df, split_df, merge_df
133
134    def track_centroids(self, regionprops_df):
135        """ Track cells based on their centroids positions + features penalties """
136        ut.napari_info("Starting tracking with LapTrack centroids metrics...")
137        return self.perform_track( regionprops_df )
138
139    def inspect_oneframe(self, graph, trackdf):
140        for track in np.unique(trackdf["track_id"]):
141            tr = trackdf[trackdf["track_id"] == track]
142            ## track is only on one frame, suspect
143            if len(np.unique(tr["frame"])) == 1:
144                # trackid + 1 as trackid starts as 0
145                pos = (tr.iloc[0]["frame"], int(tr.iloc[0]["centroid-0"]), int(tr.iloc[0]["centroid-1"]))
146                self.epicure.inspecting.add_event( pos, track+1, "tracking" )
147                if self.track.suggesting:
148                    if track in graph.keys():
149                        sisters = []
150                        refval = graph[track][0]
151                        for key, val in graph.items():
152                            if val[0] == refval:
153                                sisters.append( key )
154                        if len(sisters) == 2:
155                            for sis in sisters:
156                                self.epicure.add_suggestion( sis+1, refval+1 )
def squared_difference(a1, a2):
17def squared_difference(a1, a2):
18    """ Squared difference, normalized """
19    return ((a1-a2)**2)/(max(a1, a2)**2)

Squared difference, normalized

def squared_distance(x1, y1, x2, y2):
21def squared_distance(x1, y1, x2, y2):
22    """ Return squared distance """
23    return (x1-x2)**2 + (y1-y2)**2

Return squared distance

class LaptrackCentroids:
 26class LaptrackCentroids():
 27
 28    def __init__(self, track, epic):
 29        self.max_distance = 15
 30        self.splitting_cost = 1
 31        self.merging_cost = 1
 32        self.penal_area = 0
 33        self.penal_solidity = 0
 34        self.track = track
 35        self.epicure = epic
 36        self.inspecting = False
 37        self.suggesting = False
 38        self.region_properties = ["label", "frame", "centroid-0", "centroid-1", "area", "solidity"]
 39        ## to handle difference in laptrack versions
 40        self.version_over = Version(laptrack.__version__) >= Version("0.17.0")
 41
 42        if self.version_over:
 43            self.lt = LapTrack( metric=self.tracking_metric,
 44                                cutoff=1, 
 45                                splitting_metric=self.tracking_nofeat_metric, 
 46                                splitting_cutoff=self.splitting_cost, 
 47                                merging_metric=self.tracking_nofeat_metric,
 48                                merging_cutoff=self.merging_cost, )
 49        else:
 50            self.lt = LapTrack( track_dist_metric=self.tracking_metric,
 51                                track_cost_cutoff=1, 
 52                                splitting_dist_metric=self.tracking_nofeat_metric, 
 53                                splitting_cost_cutoff=self.splitting_cost, 
 54                                merging_dist_metric=self.tracking_nofeat_metric,
 55                                merging_cost_cutoff=self.merging_cost, )
 56        
 57    def set_region_properties(self, with_extra=False):
 58        """ define the region properties used in the tracker """
 59        if with_extra:
 60            self.region_properties = ["label", "frame", "centroid-0", "centroid-1", "area", "solidity"]
 61        else:
 62            self.region_properties = ["label", "frame", "centroid-0", "centroid-1" ]
 63
 64    def twoframes_track(self, df, labels):
 65        """ Do track between two frames, only track label """
 66        #start_time = time.time()
 67        track_df, split_df, merge_df = self.perform_track( df )
 68        #show_info("Performed in "+"{:.3f}".format((time.time()-start_time)/60)+" min")
 69        
 70        track_ids = [None]*len(labels)
 71        ## look for track id associated with label
 72        frame_df = track_df[track_df["frame"] == 1]
 73        label_to_tid = {int(row["label"]): int(row["track_id"]) for i, row in frame_df.iterrows()}
 74        for i, curlabel in enumerate(labels):
 75            track_ids[i] = label_to_tid.get(curlabel, None)
 76
 77        tracklabels = [None]*len(labels)
 78        ## look for cell label associated with track_id in the first frame, if any
 79        frame_df = track_df[track_df["frame"]==0]
 80        frame_df = frame_df[frame_df["track_id"].isin(track_ids)]
 81        label_to_tid = {int(row["track_id"]): int(row["label"]) for i, row in frame_df.iterrows()}
 82        for i, tid in enumerate(track_ids):
 83            tracklabels[i] = label_to_tid.get(tid, None)
 84        #show_info("Finished in "+"{:.3f}".format((time.time()-start_time)/60)+" min")
 85        return tracklabels
 86
 87    def tracking_metric(self, c1, c2):
 88        """ Tracking cost metric: distance + feature penalties """
 89        dist = squared_distance(c1[2], c1[3], c2[2], c2[3])
 90        if dist > self.max_distance**2:
 91            return dist
 92        ## normalize all distances to combine them
 93        dist = dist / (self.max_distance**2)
 94        if self.penal_area > 0:
 95            dist += self.penal_area * squared_difference(c1[4], c2[4])
 96        if self.penal_solidity > 0:
 97            dist += self.penal_solidity * squared_difference(c1[5], c2[5])
 98        return dist
 99    
100    def tracking_nofeat_metric(self, c1, c2):
101        """ Tracking cost metric: distance, no penalties """
102        dist = squared_distance(c1[2], c1[3], c2[2], c2[3])
103        if dist > self.max_distance**2:
104            return dist
105        ## normalize all distances to combine them
106        dist = dist / (self.max_distance**2)
107        return dist
108
109    def perform_track(self, regionprops_df):
110        """ Do tracking with laptrack module """
111        split_cost = False
112        merg_cost = False
113        #from laptrack import ParallelBackend
114        #self.lt.parallel_backend = ParallelBackend.ray
115        if self.splitting_cost > 0:
116            split_cost = self.splitting_cost**2
117        if self.version_over:
118            self.lt.splitting_cutoff = split_cost
119        else:
120            self.lt.splitting_cost_cutoff = split_cost
121        if self.merging_cost > 0:
122            merg_cost = self.merging_cost**2
123        if self.version_over:
124            self.lt.merging_cutoff = merg_cost
125        else:
126            self.lt.merging_cost_cutoff = merg_cost
127        track_df, split_df, merge_df = self.lt.predict_dataframe(
128            regionprops_df,
129            coordinate_cols=self.region_properties,
130            only_coordinate_cols=False,
131        )    
132        track_df = track_df.reset_index()
133        return track_df, split_df, merge_df
134
135    def track_centroids(self, regionprops_df):
136        """ Track cells based on their centroids positions + features penalties """
137        ut.napari_info("Starting tracking with LapTrack centroids metrics...")
138        return self.perform_track( regionprops_df )
139
140    def inspect_oneframe(self, graph, trackdf):
141        for track in np.unique(trackdf["track_id"]):
142            tr = trackdf[trackdf["track_id"] == track]
143            ## track is only on one frame, suspect
144            if len(np.unique(tr["frame"])) == 1:
145                # trackid + 1 as trackid starts as 0
146                pos = (tr.iloc[0]["frame"], int(tr.iloc[0]["centroid-0"]), int(tr.iloc[0]["centroid-1"]))
147                self.epicure.inspecting.add_event( pos, track+1, "tracking" )
148                if self.track.suggesting:
149                    if track in graph.keys():
150                        sisters = []
151                        refval = graph[track][0]
152                        for key, val in graph.items():
153                            if val[0] == refval:
154                                sisters.append( key )
155                        if len(sisters) == 2:
156                            for sis in sisters:
157                                self.epicure.add_suggestion( sis+1, refval+1 )
LaptrackCentroids(track, epic)
28    def __init__(self, track, epic):
29        self.max_distance = 15
30        self.splitting_cost = 1
31        self.merging_cost = 1
32        self.penal_area = 0
33        self.penal_solidity = 0
34        self.track = track
35        self.epicure = epic
36        self.inspecting = False
37        self.suggesting = False
38        self.region_properties = ["label", "frame", "centroid-0", "centroid-1", "area", "solidity"]
39        ## to handle difference in laptrack versions
40        self.version_over = Version(laptrack.__version__) >= Version("0.17.0")
41
42        if self.version_over:
43            self.lt = LapTrack( metric=self.tracking_metric,
44                                cutoff=1, 
45                                splitting_metric=self.tracking_nofeat_metric, 
46                                splitting_cutoff=self.splitting_cost, 
47                                merging_metric=self.tracking_nofeat_metric,
48                                merging_cutoff=self.merging_cost, )
49        else:
50            self.lt = LapTrack( track_dist_metric=self.tracking_metric,
51                                track_cost_cutoff=1, 
52                                splitting_dist_metric=self.tracking_nofeat_metric, 
53                                splitting_cost_cutoff=self.splitting_cost, 
54                                merging_dist_metric=self.tracking_nofeat_metric,
55                                merging_cost_cutoff=self.merging_cost, )
max_distance
splitting_cost
merging_cost
penal_area
penal_solidity
track
epicure
inspecting
suggesting
region_properties
version_over
def set_region_properties(self, with_extra=False):
57    def set_region_properties(self, with_extra=False):
58        """ define the region properties used in the tracker """
59        if with_extra:
60            self.region_properties = ["label", "frame", "centroid-0", "centroid-1", "area", "solidity"]
61        else:
62            self.region_properties = ["label", "frame", "centroid-0", "centroid-1" ]

define the region properties used in the tracker

def twoframes_track(self, df, labels):
64    def twoframes_track(self, df, labels):
65        """ Do track between two frames, only track label """
66        #start_time = time.time()
67        track_df, split_df, merge_df = self.perform_track( df )
68        #show_info("Performed in "+"{:.3f}".format((time.time()-start_time)/60)+" min")
69        
70        track_ids = [None]*len(labels)
71        ## look for track id associated with label
72        frame_df = track_df[track_df["frame"] == 1]
73        label_to_tid = {int(row["label"]): int(row["track_id"]) for i, row in frame_df.iterrows()}
74        for i, curlabel in enumerate(labels):
75            track_ids[i] = label_to_tid.get(curlabel, None)
76
77        tracklabels = [None]*len(labels)
78        ## look for cell label associated with track_id in the first frame, if any
79        frame_df = track_df[track_df["frame"]==0]
80        frame_df = frame_df[frame_df["track_id"].isin(track_ids)]
81        label_to_tid = {int(row["track_id"]): int(row["label"]) for i, row in frame_df.iterrows()}
82        for i, tid in enumerate(track_ids):
83            tracklabels[i] = label_to_tid.get(tid, None)
84        #show_info("Finished in "+"{:.3f}".format((time.time()-start_time)/60)+" min")
85        return tracklabels

Do track between two frames, only track label

def tracking_metric(self, c1, c2):
87    def tracking_metric(self, c1, c2):
88        """ Tracking cost metric: distance + feature penalties """
89        dist = squared_distance(c1[2], c1[3], c2[2], c2[3])
90        if dist > self.max_distance**2:
91            return dist
92        ## normalize all distances to combine them
93        dist = dist / (self.max_distance**2)
94        if self.penal_area > 0:
95            dist += self.penal_area * squared_difference(c1[4], c2[4])
96        if self.penal_solidity > 0:
97            dist += self.penal_solidity * squared_difference(c1[5], c2[5])
98        return dist

Tracking cost metric: distance + feature penalties

def tracking_nofeat_metric(self, c1, c2):
100    def tracking_nofeat_metric(self, c1, c2):
101        """ Tracking cost metric: distance, no penalties """
102        dist = squared_distance(c1[2], c1[3], c2[2], c2[3])
103        if dist > self.max_distance**2:
104            return dist
105        ## normalize all distances to combine them
106        dist = dist / (self.max_distance**2)
107        return dist

Tracking cost metric: distance, no penalties

def perform_track(self, regionprops_df):
109    def perform_track(self, regionprops_df):
110        """ Do tracking with laptrack module """
111        split_cost = False
112        merg_cost = False
113        #from laptrack import ParallelBackend
114        #self.lt.parallel_backend = ParallelBackend.ray
115        if self.splitting_cost > 0:
116            split_cost = self.splitting_cost**2
117        if self.version_over:
118            self.lt.splitting_cutoff = split_cost
119        else:
120            self.lt.splitting_cost_cutoff = split_cost
121        if self.merging_cost > 0:
122            merg_cost = self.merging_cost**2
123        if self.version_over:
124            self.lt.merging_cutoff = merg_cost
125        else:
126            self.lt.merging_cost_cutoff = merg_cost
127        track_df, split_df, merge_df = self.lt.predict_dataframe(
128            regionprops_df,
129            coordinate_cols=self.region_properties,
130            only_coordinate_cols=False,
131        )    
132        track_df = track_df.reset_index()
133        return track_df, split_df, merge_df

Do tracking with laptrack module

def track_centroids(self, regionprops_df):
135    def track_centroids(self, regionprops_df):
136        """ Track cells based on their centroids positions + features penalties """
137        ut.napari_info("Starting tracking with LapTrack centroids metrics...")
138        return self.perform_track( regionprops_df )

Track cells based on their centroids positions + features penalties

def inspect_oneframe(self, graph, trackdf):
140    def inspect_oneframe(self, graph, trackdf):
141        for track in np.unique(trackdf["track_id"]):
142            tr = trackdf[trackdf["track_id"] == track]
143            ## track is only on one frame, suspect
144            if len(np.unique(tr["frame"])) == 1:
145                # trackid + 1 as trackid starts as 0
146                pos = (tr.iloc[0]["frame"], int(tr.iloc[0]["centroid-0"]), int(tr.iloc[0]["centroid-1"]))
147                self.epicure.inspecting.add_event( pos, track+1, "tracking" )
148                if self.track.suggesting:
149                    if track in graph.keys():
150                        sisters = []
151                        refval = graph[track][0]
152                        for key, val in graph.items():
153                            if val[0] == refval:
154                                sisters.append( key )
155                        if len(sisters) == 2:
156                            for sis in sisters:
157                                self.epicure.add_suggestion( sis+1, refval+1 )