__author__ = "Vanessa Sochat"
__copyright__ = "Copyright 2016-2022, Vanessa Sochat"
__license__ = "MIT"
import os
import re
from copy import deepcopy
from pydicom import read_file
from pydicom.dataelem import DataElement
from pydicom.dataset import Dataset
from pydicom.tag import Tag
from deid.config import DeidRecipe
from deid.config.standards import actions as valid_actions
from deid.dicom.actions import deid_funcs, jitter_timestamp
from deid.dicom.fields import DicomField, expand_field_expression, get_fields
from deid.dicom.groups import extract_fields_list, extract_values_list
from deid.dicom.tags import add_tag, get_private, get_tag, remove_sequences
from deid.dicom.utils import save_dicom
from deid.logger import bot
from deid.utils import parse_value, read_json
here = os.path.dirname(os.path.abspath(__file__))
[docs]class DicomParser:
"""
Parse a dicom, performing one or more actions on fields.
A dicom parser serves as a cache to read in all fields from a dicom
file. For each, we store the element and child elements
"""
"""
list of fields to exclude from remove because they are replaced and jittered
"""
_excluded_fields = None
def __init__(
self, dicom_file, recipe=None, config=None, force=True, disable_skip=False
):
"""
Create new instance of DicomParser
:param dicom_file: Path to a dicom file or instance of a pydicom.Dataset
:param recipe: a deid recipe, defaults to None
:param config: deid config, defaults to None
:param force: ignore errors when reading a dicom file, defaults to True
:param disable_skip: _description_, defaults to False
"""
# Lookup for the dicom
self.lookup = {}
# Will be a list of DicomField
self.fields = {}
# Disable skip will load ALL fields, even those protected
self.disable_skip = disable_skip
# Load default configuration, or a custom one
config = config or os.path.join(here, "config.json")
if not os.path.exists(config):
bot.error("Cannot find config %s, exiting" % (config))
self.config = read_json(config, ordered_dict=True)
# Keep a lookup of deid provided functions
self.deid_funcs = deid_funcs
# Deid can be a recipe or filename
if not isinstance(recipe, DeidRecipe):
recipe = DeidRecipe(recipe)
self.load(dicom_file, force=force)
self.recipe = recipe
def __str__(self):
return "[dicom-parser:%s]" % self.dicom_name
def __repr__(self):
return self.__str__()
[docs] def load(self, dicom_file, force=True):
"""
Load the dicom file.
Ensure that the dicom file exists, and use full path. Here
we load the file, and save the dicom, dicom_file, and dicom_name.
"""
# Reset seen, which is generated when we parse
self.seen = []
# The user might already have provided a dataset
if isinstance(dicom_file, Dataset):
self.dicom = dicom_file
else:
# If we must read the file, the path must exist
if not os.path.exists(dicom_file):
bot.exit("%s does not exist." % dicom_file)
self.dicom = read_file(dicom_file, force=force)
# Set class variables that might be helpful later
df = self.dicom.get("filename")
self.dicom_file = None if not df else os.path.abspath(df)
self.dicom_name = None if not df else os.path.basename(self.dicom_file)
[docs] def define(self, name, value):
"""
Add a function or variable to the lookup for later usage.
This can be used for functions, lists, or variables.
"""
self.lookup[name] = value
[docs] def reset_preamble(self):
"""reset the preamble"""
# We aren't including preamble, we will reset to be empty 128 bytes
self.dicom.preamble = b"\0" * 128
[docs] def get_nested_field(self, field, return_parent=False):
"""
Retrieve a nested field.
Based on a DicomField, return the one referenced in self.dicom.
If a delete is needed, then the parent should be returned as well.
"""
# The field provided will be last in the list, the one we want
# It is not be nested because fields are stored flat
uids = field.uid.split("__")
# Keep a reference to where we are in dicom (can nest)
if field.is_filemeta:
parent = self.dicom.file_meta
else:
parent = self.dicom # dicom is of type Dataset
desired = field.element.tag
while uids:
uid = uids.pop(0)
# If if's a uid for a tag, has parens
if re.search("[(]|[)]", uid):
group, element = [
x.strip() for x in re.sub("[(]|[)]", "", uid).split(",")
]
group = int(group, 16)
element = int(element, 16)
tag = Tag(group, element)
# We keep going until we find the desired tag
if tag != desired:
# If the parent has been removed, we can't continue
if tag not in parent:
return None, desired
parent = parent[tag]
# Otherwise it's an index into a sequence
else:
# If the parent has been removed, we can't continue
if not int(uid) in parent:
return None, desired
parent = parent[int(uid)]
if return_parent:
return parent, desired
return desired
[docs] def delete_field(self, field):
"""
Delete a field from the dicom.
We do this by way of parsing all nested levels of a tag into actual tags,
and deleting the child node.
"""
# Returns the parent, and a DataElement (indexes into parent by tag)
parent, desired = self.get_nested_field(field, return_parent=True)
if parent and desired in parent:
del parent[desired]
del self.fields[field.uid]
[docs] def blank_field(self, field):
"""
Blank a field
"""
element = self.get_nested_field(field)
# Assert we have a data element, and can blank a string
if element:
if not isinstance(element, DataElement):
bot.warning("Issue parsing %s as a DataElement, not blanked." % field)
elif element.VR in ["US", "SS"]:
element.value = ""
else:
bot.warning("Unrecognized VR for %s, skipping blank." % field)
[docs] def replace_field(self, field, value):
"""
Replace a value in a field.
This uses the same function as ADD, but likely the dicom has the value.
"""
self.add_field(field, value)
[docs] def parse(self, strip_sequences=False, remove_private=False):
"""
Parse the dicom.
The parse action corresponds to iterating through fields, and
for each one, saving a data structure with the full element,
the string (with nested representation of the keywords)
and the tag. We want to save all three in a flat list that is
easy to search over, and also build up actions for the lookup
on the first parsing.
"""
# Remove sequences first, maintained in DataStore
if strip_sequences is True:
remove_sequences(self.dicom)
# Remove private tags at the onset, if requested
if remove_private:
self.remove_private()
# In the parsing, we generate a list of DicomField objects.
fields = self.get_fields(expand_sequences=True)
# if we loaded a deid recipe
if self.recipe.deid is not None:
# Prepare additional lists of values and lookup fields (index by nested uid)
if self.recipe.has_values_lists():
for group, actions in self.recipe.get_values_lists().items():
self.lookup[group] = extract_values_list(
dicom=self.dicom, actions=actions, fields=fields
)
if self.recipe.has_fields_lists():
for group, actions in self.recipe.get_fields_lists().items():
self.lookup[group] = extract_fields_list(
dicom=self.dicom, actions=actions, fields=fields
)
# actions on the header
for action in self.recipe.get_actions():
self.perform_action(
field=action.get("field"),
value=action.get("value"),
action=action.get("action"),
)
# Next perform actions in default config, only if not done
for action in self.config["put"]["actions"]:
self.perform_action(
field=action.get("field"),
value=action.get("value"),
action=action.get("action"),
)
# At this point the self.dicom should be updated fully
# The user can save, or take other action
[docs] def save(self, filename, overwrite=False):
"""
Save a dicom to file.
"""
filename = filename or self.dicom_file
ds = save_dicom(
dicom=self.dicom,
dicom_file=os.path.basename(filename),
output_folder=os.path.dirname(filename),
overwrite=overwrite,
)
return ds
@property
def skip(self):
"""
Return a list of fields to skip, as defined in the self.config
"""
skips = []
if self.config and not self.disable_skip:
skips = self.config.get("get", {}).get("skip", {})
return skips
@property
def keep(self):
"""
Return a list of fields to keep original, as defined by all KEEP actions in recipe
Those fields are not impacted by REPLACE/JITTER actions
"""
keeps = []
if self.recipe.deid is not None:
keeps = [
action.get("field")
for action in self.recipe.get_actions(action="KEEP")
if action and action.get("field")
]
return keeps
@property
def excluded_from_deletion(self):
"""
Return once-evaluated list of fields that are not removed by REMOVE ALL or REMOVE SomeField,
as they later have to be changed by REPLACE / JITTER
That allows whitelisting fields from REMOVE ALL/SomeField to change them if needed (i.e. obfuscation)
"""
if self._excluded_fields is None:
self._excluded_fields = []
if self.recipe.deid is not None:
self._excluded_fields = [
action.get("field")
for action in (
self.recipe.get_actions(action="JITTER")
+ self.recipe.get_actions(action="REPLACE")
)
if action and action.get("field")
]
return self._excluded_fields
[docs] def get_fields(self, expand_sequences=True):
"""expand all dicom fields into a list, where each entry is
a DicomField. If we find a sequence, we unwrap it and
represent the location with the name (e.g., Sequence__Child)
"""
if not self.fields:
self.fields = get_fields(
dicom=self.dicom,
expand_sequences=expand_sequences,
seen=self.seen,
skip=self.skip + self.keep,
)
return self.fields
[docs] def find_by_values(self, values):
"""
Find fields by values.
Given a list of values, find fields in the dicom that contain any
of those values, as determined by a regular expression search.
"""
# Values must be strings
values = [str(x) for x in values]
fields = {}
if values:
# Create single regular expression to search by
regexp = "(%s)" % "|".join(values)
for uid, field in self.fields.items():
if field.value_contains(regexp):
fields[uid] = field
else:
bot.warning("Empty values list encountered. No fields will be identified.")
return fields
[docs] def find_by_name(self, name):
"""
Find fields by name.
Given a string, find all field objects that contain the name.
Name can correspond to:
- a string of the tag, with or without the parens and comma/space
- a keyword
- a field name
"""
fields = {}
# Create single regular expression to search by
for uid, field in self.fields.items():
if field.name_contains(name):
fields[uid] = field
return fields
# Actions
[docs] def add_field(self, field, value):
"""
Add a field to the dicom.
If it's already present, update the value.
"""
value = parse_value(
item=self.lookup,
value=value,
field=field,
dicom=self.dicom,
funcs=self.deid_funcs,
)
# The addition will be different depending on if we have filemeta
is_filemeta = False
# Helper function to update dicom
def update_dicom(element, is_filemeta):
if is_filemeta:
self.dicom.file_meta.add(element)
else:
self.dicom.add(element)
# Assume we don't want to add an empty value
if value is not None:
# If provided a field object, create based on keyword or tag identifier
name = field
if isinstance(field, DicomField):
name = field.element.keyword or field.stripped_tag
# Generate a tag item, add if it's a name found in the dicom dictionary
tag = get_tag(name)
# Second try, it might be a private (or other numerical) string identifier
if not tag:
tag = add_tag(name)
if tag:
uid = getattr(field, "uid", None) or str(tag["tag"])
# For a replacement, this is likely
if uid in self.fields:
element = self.fields[uid]
if element.is_filemeta:
is_filemeta = True
# Nested fields
while not hasattr(element, "value"):
element = element.element
element.value = value
# Add either to file meta or dicom directly
update_dicom(element, is_filemeta)
else:
element = DataElement(tag["tag"], tag["VR"], value)
is_filemeta = str(element.tag).startswith("(0002")
update_dicom(element, is_filemeta)
self.fields[uid] = DicomField(element, name, uid, is_filemeta)
else:
bot.warning("Cannot find tag for field %s, skipping." % name)
def _run_action(self, field, action, value=None):
"""
Underlying function to run an action.
perform_action (above) typically is called using a loaded deid,
and _run_addition is typically done via an addition in a config
Both result in a call to this function. If an action fails or is not
done, None is returned, and the calling function should handle this.
"""
# Blank the value
if action == "BLANK":
self.blank_field(field)
# Unlikely to be called from perform_action
elif action == "ADD":
self.add_field(field, value)
# Code the value with something in the response
elif action == "REPLACE":
self.replace_field(field, value)
# Code the value with something in the response
elif action == "JITTER":
value = parse_value(
item=self.lookup,
dicom=self.dicom,
value=value,
field=field,
funcs=self.deid_funcs,
)
if value is not None:
# Jitter the field by the supplied value
new_val = jitter_timestamp(field=field, value=value)
if new_val not in [None, ""]:
self.replace_field(field, new_val)
else:
bot.warning("JITTER %s unsuccessful" % field)
# elif "KEEP" --> Do nothing. Keep the original
# Remove the field entirely
elif action == "REMOVE":
# If a value is defined, parse it (could be filter)
do_removal = True
if value is not None:
do_removal = parse_value(
item=self.lookup,
dicom=self.dicom,
value=value,
field=field,
funcs=self.deid_funcs,
)
if do_removal is True and field.name not in self.excluded_from_deletion:
self.delete_field(field)
[docs] def remove_private(self):
"""
Remove private tags from the loaded dicom
"""
try:
self.dicom.remove_private_tags()
except Exception:
bot.error(
"""Private tags for %s could not be completely removed, usually
this is due to invalid data type. Removing others."""
% self.dicom_name
)
for ptag in get_private(self.dicom):
del self.dicom[ptag.tag]