Skip to content

NGI Autoscaling

C pynasonde.vipir.ngi.scaleNoiseProfile (background estimation) and AutoScaler (ionogram autoscaling).

pynasonde.vipir.ngi.scale

Automatic ionogram scaling utilities for VIPIR NGI datasets.

NoiseProfile

Bases: object

Encapsulate the noise amplification profile applied during scaling.

Attributes:

Name Type Description
type

Descriptive name of the profile (currently unused).

profile

Scalar or array multiplier applied to the dataset noise floor.

Source code in pynasonde/vipir/ngi/scale.py
class NoiseProfile(object):
    """Encapsulate the noise amplification profile applied during scaling.

    Attributes:
        type: Descriptive name of the profile (currently unused).
        profile: Scalar or array multiplier applied to the dataset noise floor.
    """

    def __init__(self, type="exp", constant=1.5):
        self.type = type
        self.profile = constant
        return

    def get_exp_profile(self, x: np.array, a0: float, b0: float, x0: float):
        """Evaluate an exponential decay profile and store it in `profile`."""
        self.profile = a0 * np.exp(-b0 * x / x0)
        return self.profile

get_exp_profile(x, a0, b0, x0)

Evaluate an exponential decay profile and store it in profile.

Source code in pynasonde/vipir/ngi/scale.py
def get_exp_profile(self, x: np.array, a0: float, b0: float, x0: float):
    """Evaluate an exponential decay profile and store it in `profile`."""
    self.profile = a0 * np.exp(-b0 * x / x0)
    return self.profile

AutoScaler

Bases: object

Pipeline that filters, segments, and extracts traces from an ionogram.

Attributes:

Name Type Description
ds

Source dataset containing power/noise arrays and metadata.

noise_profile

Instance describing how to scale the noise floor.

mode

Mode (O, X, etc.) currently processed.

filter

Bounds applied during preprocessing (frequency/height).

apply_filter

Whether the filter mask is used.

segmentation_method

Name of the segmentation routine.

image2d

Working copy of the mode power array.

filtered_2D_image

Median-filtered version of image2d.

binary_image

Binary segmentation mask.

traces

Mapping of cluster labels to sampled trace points.

trace_params

Metadata derived from fitted parabolic traces.

Source code in pynasonde/vipir/ngi/scale.py
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
class AutoScaler(object):
    """Pipeline that filters, segments, and extracts traces from an ionogram.

    Attributes:
        ds: Source dataset containing power/noise arrays and metadata.
        noise_profile: Instance describing how to scale the noise floor.
        mode: Mode (O, X, etc.) currently processed.
        filter: Bounds applied during preprocessing (frequency/height).
        apply_filter: Whether the filter mask is used.
        segmentation_method: Name of the segmentation routine.
        image2d: Working copy of the mode power array.
        filtered_2D_image: Median-filtered version of `image2d`.
        binary_image: Binary segmentation mask.
        traces: Mapping of cluster labels to sampled trace points.
        trace_params: Metadata derived from fitted parabolic traces.
    """

    def __init__(
        self,
        ds: Dataset,
        noise_profile: NoiseProfile = NoiseProfile(),
        mode: str = "O",
        filter: dict = dict(
            frequency=[0, 10],
            height=[50, 400],
        ),
        apply_filter: bool = True,
        segmentation_method: str = "k-means",
    ):
        """Instantiate the scaler and immediately extract the mode-specific view."""
        self.ds = ds
        self.noise_profile = noise_profile
        self.mode = mode
        self.apply_filter = apply_filter
        self.filter = filter
        self.segmentation_method = segmentation_method
        self.extract()
        return

    def extract(self, mode: str = None):
        """Prepare filtered power arrays and metadata for the requested mode."""
        mode = mode if mode else self.mode
        logger.info(f"Run {mode}-Mode Scaler")
        self.param_name = f"{mode}_mode_power"
        self.param_noise_name = f"{mode}_mode_noise"
        self.noise_profile = self.noise_profile.profile
        self.noise = np.copy(
            getattr(self.ds, self.param_noise_name) * self.noise_profile
        )
        self.image2d = np.copy(getattr(self.ds, self.param_name))
        self.frequency, self.height = (
            np.copy(self.ds.Frequency / 1e3),
            np.copy(self.ds.Range),
        )
        if self.apply_filter:
            self.image2d[self.image2d < self.noise[:, np.newaxis]] = 0
            self.image2d[
                :,
                (self.height <= self.filter["height"][0])
                | (self.height >= self.filter["height"][1]),
            ] = 0
            self.image2d[
                (self.frequency <= self.filter["frequency"][0])
                | (self.frequency >= self.filter["frequency"][1]),
                :,
            ] = 0
        return

    def mdeian_filter(
        self,
        tau: int = 4,
        kernel: np.array = np.array([[1, 2, 1], [2, 5, 2], [1, 2, 1]]),
    ):
        """Apply a weighted median filter to suppress sparse noise."""
        self.filtered_2D_image = np.zeros_like(self.image2d)
        med_filt_data = np.copy(self.image2d)
        shape = med_filt_data.shape
        logger.info(f"Inside median filter {shape}")
        for i in range(1, shape[0] - 1):
            for j in range(1, shape[1] - 1):
                local_window = med_filt_data[i - 1 : i + 2, j - 1 : j + 2]
                valid_count = np.count_nonzero(local_window)
                if valid_count >= tau:
                    self.filtered_2D_image[i, j] = np.nanmean(
                        np.repeat(local_window.ravel(), kernel.ravel())
                    )
                else:
                    self.filtered_2D_image[i, j] = 0.0
        return

    def image_segmentation(
        self, segmentation_method: str = None, data: np.array = None, **kwargs
    ):
        """Run the configured image segmentation routine on the provided data."""
        if data is None:
            data = np.copy(self.filtered_2D_image)
        segmentation_method = (
            segmentation_method if segmentation_method else self.segmentation_method
        )
        logger.info(f"Running {segmentation_method} image segmentation...")
        if segmentation_method == "k-means":
            self.kmeans_image_segmentation(data, *kwargs)
        return

    def kmeans_image_segmentation(
        self,
        image: np.array,
        K: int = 3,
        criteria: Tuple = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 10, 0.1),
        center_selection: int = cv2.KMEANS_PP_CENTERS,
    ):
        """Cluster the ionogram using OpenCV's K-means implementation."""
        pixels = np.float32(image)
        pixels = pixels.reshape((-1, 3))
        attempts = criteria[1]
        self.compactness, self.label, self.center = cv2.kmeans(
            pixels,
            K,
            None,
            criteria,
            attempts,
            center_selection,
        )
        self.center = np.uint8(self.center)
        self.segmented_image = self.center[self.label.flatten()]
        self.segmented_image = self.segmented_image.reshape(image.shape)
        return

    def fit_parabola(self, tr, label):
        """Fit a parabolic trace model to the provided cluster samples."""
        if len(tr) > 10:
            try:
                logger.info("Identified region, fitting parabola")
                # Fit the curve
                popt, _ = curve_fit(
                    parabola,
                    np.array(tr.frequency),
                    np.array(tr.height),
                    maxfev=10000,
                )
                self.traces[label] = tr.copy()
                self.trace_params[label] = {
                    "hs": tr.height.min(),
                    "fs": tr.frequency.max(),
                    "popt": popt,
                    "function": parabola,
                    "frequency": np.array(tr.frequency),
                    "height": parabola(
                        np.array(tr.frequency), popt[0], popt[1], popt[2]
                    ),
                }
            except Exception:
                logger.error(f"Issue in long traces: {traceback.format_exc()}")
        else:
            logger.warning(f"Issue in len of dataset < 10")
        return

    def to_binary_traces(
        self,
        nbins: int = 1000,
        thresh: float = 2,
        eps: int = 7,
        min_samples: int = 40,
        trace_params: dict = dict(
            region_thick_thresh=100,
            sza_thresh=90.0,
        ),
    ):
        """Convert segmented ionogram data into clustered traces and fits."""
        import pandas as pd
        from skimage.filters import threshold_otsu
        from sklearn.cluster import DBSCAN

        thresh = thresh * threshold_otsu(np.copy(self.image2d), nbins=nbins)
        logger.info(f"Otsu's thresh {thresh}")
        self.binary_image = self.segmented_image > thresh

        vertices = np.where(self.binary_image == True)
        self.indices, self.traces, self.trace_params, self.fitted_trace_params = (
            [],
            dict(),
            dict(),
            dict(),
        )
        for i, j in zip(vertices[0], vertices[1]):
            self.indices.append(
                dict(frequency=self.frequency[i], height=self.height[j])
            )
        self.indices = pd.DataFrame.from_records(self.indices)
        self.indices.dropna(inplace=True)

        if len(self.indices) > 0:
            self.indices.sort_values(by=["frequency"], inplace=True)
            freqs, dfreqs = self.indices.frequency.unique(), []
            for i, f in enumerate(freqs):
                dfreqs.extend([i] * len(self.indices[self.indices.frequency == f]))
            self.indices["dfrequency"] = dfreqs
            # Run DBScan to identify individual regions
            dbscan = DBSCAN(eps=eps, min_samples=min_samples).fit(
                self.indices[["dfrequency", "height"]]
            )
            self.indices["labels"] = dbscan.labels_
            self.indices = self.indices[self.indices.labels != -1]
            # Create new labels for F1/F2
            new_label_start = dbscan.labels_.max() + 1

            for label in np.unique(self.indices["labels"]):
                trace = self.indices[self.indices.labels == label]

                # TODO: Add more than one curves
                if (
                    trace.height.max() - trace.height.min()
                    > trace_params["region_thick_thresh"]
                ) & (self.ds.sza < trace_params["sza_thresh"]):
                    logger.info(f"SZA: {self.ds.sza}")
                    # Fit the duble gaussian curve
                    logger.info("Identified region(s), fitting 2-parabola(s)")
                    try:
                        logger.info("Identified region is too thick")
                        # Extract F1/F2 Properties
                        # Estimated theoritical values of median F1/2 height
                        med_hs = np.nanmedian(trace.height)
                        trace.loc[trace.height >= med_hs, "labels"] = new_label_start
                        for i, l in enumerate(trace.labels.unique()):
                            o = trace[trace.labels == l]
                            self.fit_parabola(o, l)
                        new_label_start += 1
                    except Exception:
                        self.fit_parabola(trace, label)
                        logger.error(f"Issue in long traces: {traceback.format_exc()}")
                # Fit a parabolic curve
                else:
                    self.fit_parabola(trace, label)
            if len(self.traces) > 0:
                self.ds.set_traces(self.traces, self.trace_params)
        return

    def draw_sanity_check_images(
        self,
        fname: str,
        cmap: str = "Greens",
        xticks: List[float] = [1.5, 2.0, 3.0, 5.0, 7.0, 10.0, 15.0, 20.0],
        xlabel: str = "Frequency, MHz",
        ylabel: str = "Virtual Height, km",
        prange: List[float] = [5, 70],
        ylim: List[float] = [50, 800],
        xlim: List[float] = [1, 22],
        font_size: float = 10,
        figsize: tuple = (6, 3),
        txt_color: str = "w",
    ):
        """Generate a multi-panel figure showing intermediate scaling outputs."""
        ion = Ionogram(
            fig_title=f"{self.ds.StationName.strip()} / {self.ds.time.strftime('%H:%M:%S UT %d %b %Y')} / {self.mode}-Mode",
            font_size=font_size,
            figsize=figsize,
        )
        ion.add_ionogram(
            self.frequency,
            self.height,
            getattr(self.ds, self.param_name),
            mode=self.mode,
            text="(a) Raw ionogram",
            xlabel="",
            ylabel=ylabel,
            cmap=cmap,
            del_ticks=False,
            ylim=ylim,
            xlim=xlim,
            prange=prange,
            xticks=xticks,
            txt_color=txt_color,
        )
        ion.add_ionogram(
            self.frequency,
            self.height,
            self.image2d,
            mode=self.mode,
            text="(b) Filtered ionogram",
            xlabel="",
            ylabel="",
            cmap=cmap,
            ylim=ylim,
            xlim=xlim,
            prange=prange,
            xticks=xticks,
            txt_color=txt_color,
        )
        ion.add_ionogram(
            self.frequency,
            self.height,
            self.filtered_2D_image,
            mode=self.mode,
            text="(c) Med-filtered ionogram",
            xlabel="",
            ylabel="",
            cmap=cmap,
            ylim=ylim,
            xlim=xlim,
            prange=prange,
            xticks=xticks,
            txt_color=txt_color,
        )
        ion.add_ionogram(
            self.frequency,
            self.height,
            self.binary_image,
            mode=self.mode,
            text="(d) Binary ionogram",
            xlabel=xlabel,
            ylabel=ylabel,
            cmap=cmap,
            ylim=ylim,
            xlim=xlim,
            xticks=xticks,
            del_ticks=False,
            prange=[0, 1],
            txt_color=txt_color,
        )

        ax = ion.add_ionogram(
            self.frequency,
            self.height,
            self.binary_image,
            mode=self.mode,
            text="(e) Trace",
            xlabel="",
            ylabel="",
            cmap=cmap,
            ylim=ylim,
            xlim=xlim,
            xticks=xticks,
            prange=[0, 1],
            txt_color=txt_color,
        )
        for key in list(self.traces.keys()):
            trace = self.traces[key]
            ax.plot(
                np.log10(trace["frequency"]),
                trace["height"],
                marker="o",
                ls="None",
                color=utils.get_color_by_index(key, len(self.traces.keys())),
                ms=0.3,
                zorder=5,
                alpha=0.6,
            )

        ax = ion.add_ionogram(
            self.frequency,
            self.height,
            self.binary_image,
            mode=self.mode,
            text="(f) Scaled",
            xlabel="",
            ylabel="",
            cmap=cmap,
            ylim=ylim,
            xlim=xlim,
            xticks=xticks,
            prange=[0, 1],
            txt_color=txt_color,
        )
        for key in list(self.trace_params.keys()):
            trace_info = self.trace_params[key]
            color = utils.get_color_by_index(key, len(self.traces.keys()))
            ax.axvline(
                np.log10(trace_info["fs"]),
                color=color,
                ls="--",
                lw=0.6,
                alpha=0.8,
                zorder=5,
            )
            ax.axhline(
                trace_info["hs"], color=color, ls="--", lw=0.6, alpha=0.8, zorder=5
            )
            trace = self.traces[key]
            ax.plot(
                np.log10(trace["frequency"]),
                trace["height"],
                marker="o",
                ls="None",
                color=utils.get_color_by_index(key, len(self.traces.keys())),
                ms=0.3,
                zorder=5,
                alpha=0.6,
            )
            ax.plot(
                np.log10(trace_info["frequency"]),
                trace_info["height"],
                color="yellow",
                ls="-",
                lw=1.2,
                zorder=8,
            )
        ion.save(fname)
        ion.close()
        logger.info(f"Save file to: {fname}")
        return

    @staticmethod
    def median_filter_fos_hms(x: np.array, method: str = "", window: int = 11):
        # This method is used to conduct a 1D median filter on the final hms and fos.
        if x.ndim != 1:
            raise ValueError("smooth only accepts 1 dimension arrays.")
        if x.size < window:
            raise ValueError("Input vector needs to be bigger than window size.")
        if window < 3:
            return x
        if not window in ["flat", "hanning", "hamming", "bartlett", "blackman"]:
            raise ValueError(
                "Window is on of 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'"
            )
        s = np.r_[x[window_len - 1 : 0 : -1], x, x[-2 : -window_len - 1 : -1]]
        if window == "flat":
            w = numpy.ones(window_len, "d")
        else:
            w = eval("np." + window + "(window_len)")
        y = np.convolve(w / w.sum(), s, mode="valid")
        d = window_len - 1
        y = y[int(d / 2) : -int(d / 2)]
        return y

__init__(ds, noise_profile=NoiseProfile(), mode='O', filter=dict(frequency=[0, 10], height=[50, 400]), apply_filter=True, segmentation_method='k-means')

Instantiate the scaler and immediately extract the mode-specific view.

Source code in pynasonde/vipir/ngi/scale.py
def __init__(
    self,
    ds: Dataset,
    noise_profile: NoiseProfile = NoiseProfile(),
    mode: str = "O",
    filter: dict = dict(
        frequency=[0, 10],
        height=[50, 400],
    ),
    apply_filter: bool = True,
    segmentation_method: str = "k-means",
):
    """Instantiate the scaler and immediately extract the mode-specific view."""
    self.ds = ds
    self.noise_profile = noise_profile
    self.mode = mode
    self.apply_filter = apply_filter
    self.filter = filter
    self.segmentation_method = segmentation_method
    self.extract()
    return

extract(mode=None)

Prepare filtered power arrays and metadata for the requested mode.

Source code in pynasonde/vipir/ngi/scale.py
def extract(self, mode: str = None):
    """Prepare filtered power arrays and metadata for the requested mode."""
    mode = mode if mode else self.mode
    logger.info(f"Run {mode}-Mode Scaler")
    self.param_name = f"{mode}_mode_power"
    self.param_noise_name = f"{mode}_mode_noise"
    self.noise_profile = self.noise_profile.profile
    self.noise = np.copy(
        getattr(self.ds, self.param_noise_name) * self.noise_profile
    )
    self.image2d = np.copy(getattr(self.ds, self.param_name))
    self.frequency, self.height = (
        np.copy(self.ds.Frequency / 1e3),
        np.copy(self.ds.Range),
    )
    if self.apply_filter:
        self.image2d[self.image2d < self.noise[:, np.newaxis]] = 0
        self.image2d[
            :,
            (self.height <= self.filter["height"][0])
            | (self.height >= self.filter["height"][1]),
        ] = 0
        self.image2d[
            (self.frequency <= self.filter["frequency"][0])
            | (self.frequency >= self.filter["frequency"][1]),
            :,
        ] = 0
    return

mdeian_filter(tau=4, kernel=np.array([[1, 2, 1], [2, 5, 2], [1, 2, 1]]))

Apply a weighted median filter to suppress sparse noise.

Source code in pynasonde/vipir/ngi/scale.py
def mdeian_filter(
    self,
    tau: int = 4,
    kernel: np.array = np.array([[1, 2, 1], [2, 5, 2], [1, 2, 1]]),
):
    """Apply a weighted median filter to suppress sparse noise."""
    self.filtered_2D_image = np.zeros_like(self.image2d)
    med_filt_data = np.copy(self.image2d)
    shape = med_filt_data.shape
    logger.info(f"Inside median filter {shape}")
    for i in range(1, shape[0] - 1):
        for j in range(1, shape[1] - 1):
            local_window = med_filt_data[i - 1 : i + 2, j - 1 : j + 2]
            valid_count = np.count_nonzero(local_window)
            if valid_count >= tau:
                self.filtered_2D_image[i, j] = np.nanmean(
                    np.repeat(local_window.ravel(), kernel.ravel())
                )
            else:
                self.filtered_2D_image[i, j] = 0.0
    return

image_segmentation(segmentation_method=None, data=None, **kwargs)

Run the configured image segmentation routine on the provided data.

Source code in pynasonde/vipir/ngi/scale.py
def image_segmentation(
    self, segmentation_method: str = None, data: np.array = None, **kwargs
):
    """Run the configured image segmentation routine on the provided data."""
    if data is None:
        data = np.copy(self.filtered_2D_image)
    segmentation_method = (
        segmentation_method if segmentation_method else self.segmentation_method
    )
    logger.info(f"Running {segmentation_method} image segmentation...")
    if segmentation_method == "k-means":
        self.kmeans_image_segmentation(data, *kwargs)
    return

kmeans_image_segmentation(image, K=3, criteria=(cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 10, 0.1), center_selection=cv2.KMEANS_PP_CENTERS)

Cluster the ionogram using OpenCV's K-means implementation.

Source code in pynasonde/vipir/ngi/scale.py
def kmeans_image_segmentation(
    self,
    image: np.array,
    K: int = 3,
    criteria: Tuple = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 10, 0.1),
    center_selection: int = cv2.KMEANS_PP_CENTERS,
):
    """Cluster the ionogram using OpenCV's K-means implementation."""
    pixels = np.float32(image)
    pixels = pixels.reshape((-1, 3))
    attempts = criteria[1]
    self.compactness, self.label, self.center = cv2.kmeans(
        pixels,
        K,
        None,
        criteria,
        attempts,
        center_selection,
    )
    self.center = np.uint8(self.center)
    self.segmented_image = self.center[self.label.flatten()]
    self.segmented_image = self.segmented_image.reshape(image.shape)
    return

fit_parabola(tr, label)

Fit a parabolic trace model to the provided cluster samples.

Source code in pynasonde/vipir/ngi/scale.py
def fit_parabola(self, tr, label):
    """Fit a parabolic trace model to the provided cluster samples."""
    if len(tr) > 10:
        try:
            logger.info("Identified region, fitting parabola")
            # Fit the curve
            popt, _ = curve_fit(
                parabola,
                np.array(tr.frequency),
                np.array(tr.height),
                maxfev=10000,
            )
            self.traces[label] = tr.copy()
            self.trace_params[label] = {
                "hs": tr.height.min(),
                "fs": tr.frequency.max(),
                "popt": popt,
                "function": parabola,
                "frequency": np.array(tr.frequency),
                "height": parabola(
                    np.array(tr.frequency), popt[0], popt[1], popt[2]
                ),
            }
        except Exception:
            logger.error(f"Issue in long traces: {traceback.format_exc()}")
    else:
        logger.warning(f"Issue in len of dataset < 10")
    return

to_binary_traces(nbins=1000, thresh=2, eps=7, min_samples=40, trace_params=dict(region_thick_thresh=100, sza_thresh=90.0))

Convert segmented ionogram data into clustered traces and fits.

Source code in pynasonde/vipir/ngi/scale.py
def to_binary_traces(
    self,
    nbins: int = 1000,
    thresh: float = 2,
    eps: int = 7,
    min_samples: int = 40,
    trace_params: dict = dict(
        region_thick_thresh=100,
        sza_thresh=90.0,
    ),
):
    """Convert segmented ionogram data into clustered traces and fits."""
    import pandas as pd
    from skimage.filters import threshold_otsu
    from sklearn.cluster import DBSCAN

    thresh = thresh * threshold_otsu(np.copy(self.image2d), nbins=nbins)
    logger.info(f"Otsu's thresh {thresh}")
    self.binary_image = self.segmented_image > thresh

    vertices = np.where(self.binary_image == True)
    self.indices, self.traces, self.trace_params, self.fitted_trace_params = (
        [],
        dict(),
        dict(),
        dict(),
    )
    for i, j in zip(vertices[0], vertices[1]):
        self.indices.append(
            dict(frequency=self.frequency[i], height=self.height[j])
        )
    self.indices = pd.DataFrame.from_records(self.indices)
    self.indices.dropna(inplace=True)

    if len(self.indices) > 0:
        self.indices.sort_values(by=["frequency"], inplace=True)
        freqs, dfreqs = self.indices.frequency.unique(), []
        for i, f in enumerate(freqs):
            dfreqs.extend([i] * len(self.indices[self.indices.frequency == f]))
        self.indices["dfrequency"] = dfreqs
        # Run DBScan to identify individual regions
        dbscan = DBSCAN(eps=eps, min_samples=min_samples).fit(
            self.indices[["dfrequency", "height"]]
        )
        self.indices["labels"] = dbscan.labels_
        self.indices = self.indices[self.indices.labels != -1]
        # Create new labels for F1/F2
        new_label_start = dbscan.labels_.max() + 1

        for label in np.unique(self.indices["labels"]):
            trace = self.indices[self.indices.labels == label]

            # TODO: Add more than one curves
            if (
                trace.height.max() - trace.height.min()
                > trace_params["region_thick_thresh"]
            ) & (self.ds.sza < trace_params["sza_thresh"]):
                logger.info(f"SZA: {self.ds.sza}")
                # Fit the duble gaussian curve
                logger.info("Identified region(s), fitting 2-parabola(s)")
                try:
                    logger.info("Identified region is too thick")
                    # Extract F1/F2 Properties
                    # Estimated theoritical values of median F1/2 height
                    med_hs = np.nanmedian(trace.height)
                    trace.loc[trace.height >= med_hs, "labels"] = new_label_start
                    for i, l in enumerate(trace.labels.unique()):
                        o = trace[trace.labels == l]
                        self.fit_parabola(o, l)
                    new_label_start += 1
                except Exception:
                    self.fit_parabola(trace, label)
                    logger.error(f"Issue in long traces: {traceback.format_exc()}")
            # Fit a parabolic curve
            else:
                self.fit_parabola(trace, label)
        if len(self.traces) > 0:
            self.ds.set_traces(self.traces, self.trace_params)
    return

draw_sanity_check_images(fname, cmap='Greens', xticks=[1.5, 2.0, 3.0, 5.0, 7.0, 10.0, 15.0, 20.0], xlabel='Frequency, MHz', ylabel='Virtual Height, km', prange=[5, 70], ylim=[50, 800], xlim=[1, 22], font_size=10, figsize=(6, 3), txt_color='w')

Generate a multi-panel figure showing intermediate scaling outputs.

Source code in pynasonde/vipir/ngi/scale.py
def draw_sanity_check_images(
    self,
    fname: str,
    cmap: str = "Greens",
    xticks: List[float] = [1.5, 2.0, 3.0, 5.0, 7.0, 10.0, 15.0, 20.0],
    xlabel: str = "Frequency, MHz",
    ylabel: str = "Virtual Height, km",
    prange: List[float] = [5, 70],
    ylim: List[float] = [50, 800],
    xlim: List[float] = [1, 22],
    font_size: float = 10,
    figsize: tuple = (6, 3),
    txt_color: str = "w",
):
    """Generate a multi-panel figure showing intermediate scaling outputs."""
    ion = Ionogram(
        fig_title=f"{self.ds.StationName.strip()} / {self.ds.time.strftime('%H:%M:%S UT %d %b %Y')} / {self.mode}-Mode",
        font_size=font_size,
        figsize=figsize,
    )
    ion.add_ionogram(
        self.frequency,
        self.height,
        getattr(self.ds, self.param_name),
        mode=self.mode,
        text="(a) Raw ionogram",
        xlabel="",
        ylabel=ylabel,
        cmap=cmap,
        del_ticks=False,
        ylim=ylim,
        xlim=xlim,
        prange=prange,
        xticks=xticks,
        txt_color=txt_color,
    )
    ion.add_ionogram(
        self.frequency,
        self.height,
        self.image2d,
        mode=self.mode,
        text="(b) Filtered ionogram",
        xlabel="",
        ylabel="",
        cmap=cmap,
        ylim=ylim,
        xlim=xlim,
        prange=prange,
        xticks=xticks,
        txt_color=txt_color,
    )
    ion.add_ionogram(
        self.frequency,
        self.height,
        self.filtered_2D_image,
        mode=self.mode,
        text="(c) Med-filtered ionogram",
        xlabel="",
        ylabel="",
        cmap=cmap,
        ylim=ylim,
        xlim=xlim,
        prange=prange,
        xticks=xticks,
        txt_color=txt_color,
    )
    ion.add_ionogram(
        self.frequency,
        self.height,
        self.binary_image,
        mode=self.mode,
        text="(d) Binary ionogram",
        xlabel=xlabel,
        ylabel=ylabel,
        cmap=cmap,
        ylim=ylim,
        xlim=xlim,
        xticks=xticks,
        del_ticks=False,
        prange=[0, 1],
        txt_color=txt_color,
    )

    ax = ion.add_ionogram(
        self.frequency,
        self.height,
        self.binary_image,
        mode=self.mode,
        text="(e) Trace",
        xlabel="",
        ylabel="",
        cmap=cmap,
        ylim=ylim,
        xlim=xlim,
        xticks=xticks,
        prange=[0, 1],
        txt_color=txt_color,
    )
    for key in list(self.traces.keys()):
        trace = self.traces[key]
        ax.plot(
            np.log10(trace["frequency"]),
            trace["height"],
            marker="o",
            ls="None",
            color=utils.get_color_by_index(key, len(self.traces.keys())),
            ms=0.3,
            zorder=5,
            alpha=0.6,
        )

    ax = ion.add_ionogram(
        self.frequency,
        self.height,
        self.binary_image,
        mode=self.mode,
        text="(f) Scaled",
        xlabel="",
        ylabel="",
        cmap=cmap,
        ylim=ylim,
        xlim=xlim,
        xticks=xticks,
        prange=[0, 1],
        txt_color=txt_color,
    )
    for key in list(self.trace_params.keys()):
        trace_info = self.trace_params[key]
        color = utils.get_color_by_index(key, len(self.traces.keys()))
        ax.axvline(
            np.log10(trace_info["fs"]),
            color=color,
            ls="--",
            lw=0.6,
            alpha=0.8,
            zorder=5,
        )
        ax.axhline(
            trace_info["hs"], color=color, ls="--", lw=0.6, alpha=0.8, zorder=5
        )
        trace = self.traces[key]
        ax.plot(
            np.log10(trace["frequency"]),
            trace["height"],
            marker="o",
            ls="None",
            color=utils.get_color_by_index(key, len(self.traces.keys())),
            ms=0.3,
            zorder=5,
            alpha=0.6,
        )
        ax.plot(
            np.log10(trace_info["frequency"]),
            trace_info["height"],
            color="yellow",
            ls="-",
            lw=1.2,
            zorder=8,
        )
    ion.save(fname)
    ion.close()
    logger.info(f"Save file to: {fname}")
    return