__author__ = "Vanessa Sochat"
__copyright__ = "Copyright 2016-2022, Vanessa Sochat"
__license__ = "MIT"
import math
import os
import random
import re
import sys
from typing import Optional
import matplotlib
import numpy
from numpy.typing import NDArray
from pydicom import read_file
from pydicom.pixel_data_handlers.util import get_expected_length
from deid.config import DeidRecipe
from deid.dicom import utils
from deid.logger import bot
from deid.utils import get_temporary_name
matplotlib.use("pdf")
from matplotlib import pyplot as plt # noqa
bot.level = 3
[docs]class DicomCleaner:
"""
Clean a dicom file of burned pixels.
take an input dicom file, check for burned pixels, and then clean,
with option to save / output in multiple formats. This object should
map to one dicom file, and the usage flow is the following:
cleaner = DicomCleaner()
summary = cleaner.detect(dicom_file)
cleaner.clean()
"""
def __init__(
self,
output_folder=None,
add_padding=False,
margin=3,
deid=None,
font=None,
force=True,
):
if output_folder is None:
output_folder = get_temporary_name(prefix="clean")
if font is None:
font = self.default_font()
self.font = font
self.cmap = "gray"
self.output_folder = output_folder
self.recipe = DeidRecipe(deid)
self.results = None
self.force = force
self.dicom_file: Optional[str] = None
self.cleaned: Optional[NDArray] = None
[docs] def default_font(self):
"""
Get the default font to use for a title.
define the font style for saving png figures
if a title is provided
"""
return {"family": "serif", "color": "darkred", "weight": "normal", "size": 16}
[docs] def detect(self, dicom_file):
"""
Initiate the cleaner for a new dicom file.
"""
from deid.dicom.pixels.detect import has_burned_pixels
self.results = has_burned_pixels(
dicom_file, deid=self.recipe.deid, force=self.force
)
self.dicom_file = dicom_file
return self.results
[docs] def clean(
self, fix_interpretation: bool = True, pixel_data_attribute: str = "PixelData"
) -> Optional[NDArray]:
if not self.results:
bot.warning(
"Use %s.detect() with a dicom file to find coordinates first." % self
)
return
bot.info("Scrubbing %s." % self.dicom_file)
self.cleaned = clean_pixel_data(
dicom_file=self.dicom_file,
results=self.results,
fix_interpretation=fix_interpretation,
pixel_data_attribute=pixel_data_attribute,
)
return self.cleaned
def _get_clean_name(self, output_folder, extension="dcm"):
"""
Get path to a cleaned output file.
Return a full path to an output file, with custom folder and
extension. If the output folder isn't yet created, make it.
Parameters
==========
output_folder: the output folder to create, will be created if doesn't
exist.
extension: the extension of the file to create a name for, should
not start with "."
"""
if output_folder is None:
output_folder = self.output_folder
if not os.path.exists(output_folder):
bot.debug("Creating output folder %s" % output_folder)
os.makedirs(output_folder)
basename = re.sub("[.]dicom|[.]dcm", "", os.path.basename(self.dicom_file))
return "%s/cleaned-%s.%s" % (output_folder, basename, extension)
[docs] def save_png(self, output_folder=None, image_type="cleaned", title=None):
"""
Save an original or cleaned dicom as png to disk.
Default image_format is "cleaned" and can be set to "original." If the
image was already clean (not flagged) the cleaned image is just a
copy of original. If a 4d image is provided, we save the dimension
specified (or if not provided, a randomly chosen dimension).
"""
if hasattr(self, image_type):
png_file = self._get_clean_name(output_folder, "png")
plt = self.get_figure(image_type=image_type, title=title)
plt.savefig(png_file)
plt.close()
return png_file
else:
bot.warning("use detect() --> clean() before saving is possible.")
[docs] def save_animation(self, output_folder=None, image_type="cleaned", title=None):
"""
Save an original or cleaned animation of a dicom.
If there are not enough frames, then save_png should be used instead.
"""
if hasattr(self, image_type):
from matplotlib import animation
animation.rcParams["animation.writer"] = "ffmpeg"
image = getattr(self, image_type)
# If we have rgb, choose a channel
if len(image.shape) == 4:
channel = random.choice(range(image.shape[3]))
bot.warning("Selecting channel %s for rendering" % channel)
image = image[:, :, :, channel]
# Now we expect 3D, we can animate one dimension over time
if len(image.shape) == 3:
movie_file = self._get_clean_name(output_folder, "mp4")
# First set up the figure, the axis, and the plot element we want to animate
fig, ax = plt.subplots(nrows=1, ncols=1, figsize=(10, 6))
plt.close()
ax.xlim = (0, image.shape[1])
ax.ylim = (0, image.shape[2])
ax.set_xticks([])
ax.set_yticks([])
img = ax.imshow(image[0, :, :].T, cmap="gray")
img.set_interpolation("nearest")
# The animation function should take an index i
def animate(i):
img.set_data(image[i, :, :].T)
sys.stdout.flush()
return (img,)
bot.info("Generating animation...")
anim = animation.FuncAnimation(
fig, animate, frames=image.shape[0], interval=50, blit=True
)
anim.save(
movie_file,
writer="ffmpeg",
fps=10,
dpi=100,
metadata={"title": title or "deid-animation"},
)
return movie_file
else:
bot.warning(
"save_animation() is only for 4D data. Use save_png instead."
)
else:
bot.warning("use detect() --> clean() before saving is possible.")
[docs] def save_dicom(self, output_folder=None, image_type="cleaned"):
"""
Save a cleaned dicom to disk.
We expose an option to save an original (change image_type to "original"
to be consistent, although this is not incredibly useful given it would
duplicate the original data.
"""
# Having clean also means has dicom image
if hasattr(self, image_type):
dicom_name = self._get_clean_name(output_folder)
dicom = read_file(self.dicom_file, force=True)
# If going from compressed, change TransferSyntax
if dicom.file_meta.TransferSyntaxUID.is_compressed is True:
dicom.decompress()
dicom.PixelData = self.cleaned.tobytes()
dicom.save_as(dicom_name)
return dicom_name
else:
bot.warning("use detect() --> clean() before saving is possible.")
[docs]def clean_pixel_data(
dicom_file,
results: dict,
fix_interpretation: bool = True,
pixel_data_attribute: str = "PixelData",
):
"""
Clean a dicom file.
take a dicom image and a list of pixel coordinates, and return
a cleaned file (if output file is specified) or simply plot
the cleaned result (if no file is specified)
Parameters
==========
dicom_file: (str or FileDataset instance) Dicom file to clean
results: Result of the .has_burned_pixels() method
fix_interpretation: fix the photometric interpretation if found off
pixel_data_attribute: PixelData attribute name in the dicom file
"""
cleaned = None
# Load in dicom file, and image data
dicom = utils.load_dicom(dicom_file)
pixel_data = getattr(dicom, pixel_data_attribute)
# Get expected and actual length of the pixel data (bytes, expected does not include trailing null byte)
expected_length = get_expected_length(dicom)
actual_length = len(pixel_data)
full_length = expected_length / 2 * 3 # upsampled data is a third larger
full_length += 1 if full_length % 2 else 0 # trailing padding byte if even length
# If we have YBR_FULL_2, must be RGB to obtain pixel data
if (
not dicom.file_meta.TransferSyntaxUID.is_compressed
and dicom.PhotometricInterpretation == "YBR_FULL_422"
and fix_interpretation
and actual_length >= full_length
):
bot.warning(
"Updating dicom.PhotometricInterpretation to RGB, set fix_interpretation to False to skip."
)
photometric_original = dicom.PhotometricInterpretation
dicom.PhotometricInterpretation = "RGB"
original = dicom.pixel_array
dicom.PhotometricInterpretation = photometric_original
else:
original = dicom.pixel_array
# Compile coordinates from result, generate list of tuples with coordinate and value
# keepcoordinates == 1 (included in mask) and coordinates == 0 (remove).
coordinates = []
for item in results["results"]:
# We iterate through coordinates in order specified in file
for coordinate_set in item.get("coordinates", []):
# Each is a list with [value, coordinate]
mask_value, new_coordinates = coordinate_set
if not isinstance(new_coordinates, list):
new_coordinates = [new_coordinates]
for new_coordinate in new_coordinates:
# Case 1: an "all" indicates applying to entire image
if new_coordinate.lower() == "all":
# 2D - Greyscale Image - Shape = (X, Y) OR 3D - RGB Image - Shape = (X, Y, Channel)
if len(original.shape) == 2 or (
len(original.shape) == 3 and dicom.SamplesPerPixel == 3
):
# minr, minc, maxr, maxc = [0, 0, Y, X]
new_coordinate = [
0,
0,
original.shape[1],
original.shape[0],
]
# 4D - RGB Cine Clip - Shape = (frames, X, Y, channel) OR 3D - Greyscale Cine Clip - Shape = (frames, X, Y)
if len(original.shape) == 4 or (
len(original.shape) == 3 and dicom.SamplesPerPixel == 1
):
new_coordinate = [
0,
0,
original.shape[2],
original.shape[1],
]
else:
new_coordinate = [int(x) for x in new_coordinate.split(",")]
coordinates.append(
(mask_value, new_coordinate)
) # [(1, [1,2,3,4]),...(0, [1,2,3,4])]
# Instead of writing directly to data, create a mask of 1s (start keeping all)
# For 4D RGB Cine - (frames, X, Y, channel) or 3D Greyscale Cine - (frames, X, Y)
if len(original.shape) == 4 or (
len(original.shape) == 3 and dicom.SamplesPerPixel == 1
):
mask = numpy.ones(original.shape[1:3], dtype=numpy.uint8)
# For 2D Greyscale image (X, Y) or 3D RGB Image (X, Y channel)
else:
mask = numpy.ones(original.shape[0:2], dtype=numpy.uint8)
# Here we apply the coordinates to the mask, 1==keep, 0==clean
for coordinate_value, coordinate in coordinates:
minr, minc, maxr, maxc = coordinate
# Update the mask: values set to 0 to be black
mask[minc:maxc, minr:maxr] = coordinate_value
# Now apply finished mask to the data
# RGB cine clip
if len(original.shape) == 4:
# np.tile does the copying and stacking of masks into the channel dim to produce 3D masks
# transposition to convert tile output (channel, X, Y) into (X, Y, channel)
# see: https://github.com/nquach/anonymize/blob/master/anonymize.py#L154
channel3mask = numpy.transpose(numpy.tile(mask, (3, 1, 1)), (1, 2, 0))
# use numpy.tile to copy and stack the 3D masks into 4D array to apply to 4D pixel data
# tile converts (X, Y, channels) -> (frames, X, Y, channels), presumed ordering for 4D pixel data
final_mask = numpy.tile(channel3mask, (original.shape[0], 1, 1, 1))
# apply final 4D mask to 4D pixel data
cleaned = final_mask * original
# RGB image or Greyscale cine clip
elif len(original.shape) == 3:
# This condition is ambiguous. If the image shape is 3, we may have a single frame RGB image: size (X, Y, channel)
# or a multiframe greyscale image: size (frames, X, Y). Interrogate the SamplesPerPixel field.
if dicom.SamplesPerPixel == 3:
# RGB Image
# Convert (X, Y) -> (X, Y, channel)
final_mask = numpy.transpose(
numpy.tile(mask, (original.shape[2], 1, 1)), (1, 2, 0)
)
else:
# Greyscale cine clip
# Convert (X, Y) -> (frames, X, Y)
final_mask = numpy.tile(mask, (original.shape[0], 1, 1))
# apply final 3D mask to 3D pixel data
cleaned = final_mask * original
# greyscale image: no need to stack into the channel dim since it doesn't exist
elif len(original.shape) == 2:
cleaned = mask * original
else:
bot.warning(
"Pixel array dimension %s is not recognized." % (str(original.shape))
)
return cleaned