__author__ = "Vanessa Sochat"
__copyright__ = "Copyright 2016-2022, Vanessa Sochat"
__license__ = "MIT"
# pylint: skip-file
import os
import re
import sys
from collections import OrderedDict
from deid.config.standards import (
actions,
filters,
formats,
group_actions,
groups,
sections,
)
from deid.data import data_base
from deid.logger import bot
from deid.utils import get_installdir, read_file
[docs]def load_combined_deid(deids):
"""load one or more deids, either based on a path or a tag
Parameters
==========
deids: should be a custom list of deids
"""
if not isinstance(deids, list):
bot.exit("load_combined_deids expects a list.")
found_format = None
deid = None
for single_deid in deids:
# If not a tag or path, returns None
next_deid = get_deid(tag=single_deid, exit_on_fail=False, quiet=True, load=True)
if next_deid is not None:
# Formats must match
if found_format is None:
found_format = next_deid["format"]
else:
if found_format != next_deid["format"]:
bot.exis(
"Mismatch in deid formats, %s and %s"
% (found_format, next_deid["format"])
)
# If it's the first one, use as starter template
if deid is None:
deid = next_deid
else:
# Update filter, appending to end to give first preference
if "filter" in next_deid:
if "filter" not in deid:
deid["filter"] = next_deid["filter"]
else:
for name, group in next_deid["filter"].items():
deid["filter"][name] = (
deid["filter"].get("name", []) + group
)
if "header" in next_deid:
deid["header"] = deid.get("header", []) + next_deid["header"]
else:
bot.warning("Problem loading %s, skipping." % single_deid)
return deid
[docs]def load_deid(path=None):
"""Load_deid will return a loaded in (user) deid configuration file.
This can be used to update a default config.json. If a file path is
specified, it is loaded directly. If a folder is specified, we look
for a deid file in the folder. If nothing is specified, we assume
the user wants to load a deid file in the present working directory.
If the user wants to have multiple deid files in a directory, this
can be done with an extension that specifies the module, eg;
deid.dicom
deid.nifti
Parameters
==========
path: a path to a deid file
Returns
=======
config: a parsed deid (dictionary) with valid sections
"""
path = find_deid(path)
# Read in spec, clean up extra spaces and newlines
spec = [
x.strip("\n").strip(" ")
for x in read_file(path)
if x.strip("\n").strip(" ") not in [""]
]
spec = [x for x in spec if x not in ["", None]]
config = OrderedDict()
section = None
while spec:
# Clean up white trailing/leading space
line = spec.pop(0).strip()
# Comment
if line.startswith("#"):
continue
# Set format
elif bool(re.match("^format", line, re.I)):
config["format"] = parse_format(line)
# A new section?
elif line.startswith("%"):
# Remove any comments
line = line.split("#", 1)[0].strip()
# Is there a section name?
section_name = None
parts = line.split(" ")
if len(parts) > 1:
section_name = " ".join(parts[1:])
section = re.sub("[%]|(\s+)", "", parts[0]).lower() # noqa
if section not in sections:
bot.exit("%s is not a valid section." % section)
config = add_section(
config=config, section=section, section_name=section_name
)
# A %fields action (only field allowed), %values allows split
elif line.upper().startswith(group_actions) and section in groups:
config = parse_group_action(
section=section, section_name=section_name, line=line, config=config
)
# An action (ADD, BLANK, JITTER, KEEP, REPLACE, REMOVE, LABEL)
elif line.upper().startswith(actions):
# Start of a filter group
if line.upper().startswith("LABEL") and section == "filter":
members = parse_filter_group(spec)
# Add the filter label to the config
config = parse_label(
config=config,
section=section,
label=line,
section_name=section_name,
members=members,
)
# Parse the action
else:
config = parse_config_action(
section=section, section_name=section_name, line=line, config=config
)
else:
bot.warning("%s not recognized to be in valid format, skipping." % line)
return config
[docs]def find_deid(path=None):
"""find_deid is a helper function to load_deid to find a deid file.
It can be in a folder, or return the path provided if it is the file.
Parameters
==========
path: a path on the filesystem. If not provided, will assume PWD.
"""
# A default deid will be loaded if all else fails
default_deid = os.path.join(get_installdir(), "data", "deid.dicom")
if path is None:
path = os.getcwd()
# The user has provided a directory
if os.path.isdir(path):
contenders = [
"%s/%s" % (path, x) for x in os.listdir(path) if x.startswith("deid")
]
if len(contenders) == 0:
bot.warning(
"No deid settings files found in %s, will use default dicom.deid."
% path
)
contenders.append(default_deid)
elif len(contenders) > 1:
bot.warning("Multiple deid files found in %s, will use first." % (path))
path = contenders[0]
# We have a file path at this point
if not os.path.exists(path):
bot.exit("Cannot find deid file %s, exiting." % (path))
return path
[docs]def parse_filter_group(spec):
"""given the specification (a list of lines) continue parsing lines
until the filter group ends, as indicated by the start of a new LABEL,
(case 1), the start of a new section (case 2) or the end of the spec
file (case 3). Returns a list of members (lines) that belong to the
filter group. The list (by way of using pop) is updated in the calling
function.
Parameters
==========
spec: unparsed lines of the deid recipe file
"""
members = []
keep_going = True
while keep_going and spec:
next_line = spec[0]
if next_line.upper().strip().startswith("LABEL"):
keep_going = False
elif next_line.upper().strip().startswith("%"):
keep_going = False
else:
new_member = spec.pop(0)
members.append(new_member)
return members
def _derive_ctp_coordinate(raw):
"""
Derive a ctp coordinate from a raw (comma separated) string.
A ctp coordinate is:
- the horizontal component of the left side
- the vertical component of the top
- the width
- the height
And we need to translate that into (xmin, ymin, xmax, ymax)
This largely means given that we have:
(--> (xmin), ^ (ymin), width, height)
This translates to (xmin, ymin, xmin+width, ymin+height)
Note that the first two values have no change, and the latter two are
derived by adding the width or height to the appropriate dimension.
"""
# Cut out early if we have an "all" to indicate the entire image
if raw == "all":
return raw
new_coordinate = [int(x) for x in raw.split(",") if x]
# Cut out early for malformed coordinate
if len(new_coordinate) != 4:
bot.exit("Coordinates are expected to have length of 4, found %s" % raw)
xmin, ymin, width, height = new_coordinate
new_coordinate[2] = xmin + width
new_coordinate[3] = ymin + height
# Translate CTP coordinate to the convention we use
return ",".join([str(i) for i in new_coordinate])
[docs]def parse_label(section, config, section_name, members, label=None):
"""
Add a named label to the filter section, including one or more criteria
Parameters
==========
section: the section name (e.g., header) must be one in sections
config: the config (dictionary) parsed thus far
section_name: an optional name for a section
members: the lines belonging to the section/section_name
label: an optional name for the group of commands
"""
criteria = {"filters": [], "coordinates": []}
if label is not None:
label = label.replace("label", "", 1).split("#")[0].strip()
criteria["name"] = label
while len(members) > 0:
member = members.pop(0).strip()
if member.lower().startswith("ctpcoordinates"):
coordinate = _derive_ctp_coordinate(
member.replace("ctpcoordinates", "").strip()
)
criteria["coordinates"].append([0, coordinate])
continue
elif member.lower().startswith("ctpkeepcoordinates"):
coordinate = _derive_ctp_coordinate(
member.replace("ctpkeepcoordinates", "").strip()
)
criteria["coordinates"].append([1, coordinate])
continue
# We have a coordinate line (coordinates to remove, mask 0)
elif member.lower().startswith("coordinates"):
coordinate = member.replace("coordinates", "").strip()
criteria["coordinates"].append([0, coordinate])
continue
# Coordinates to keep (mask 1)
elif member.lower().startswith("keepcoordinates"):
coordinate = member.replace("keepcoordinates", "").strip()
criteria["coordinates"].append([1, coordinate])
continue
operator = None
entry = None
if member.startswith("+"):
operator = "and"
member = member.replace("+", "", 1).strip()
elif member.startswith("||"):
operator = "or"
member = member.replace("||", "", 1).strip()
# Skip over comments
if member.startswith("#"):
continue
# Now that operators removed, parse member
if not member.lower().startswith(filters):
bot.warning("%s filter is not valid, skipping." % member.lower())
else:
# Returns single member with field, values, operator,
# Or if multiple or/and in statement, a list
entry = parse_member(member, operator)
if entry is not None:
criteria["filters"].append(entry.copy())
config[section][section_name].append(criteria)
return config
[docs]def parse_member(members, operator=None):
"""a parsing function for a filter member. Will return a single member
with fields, values, and an operator. In the case of multiple and/or
statements that are chained, will instead return a list.
"""
main_operator = operator
actions = []
values = []
fields = []
operators = []
members = [members]
while len(members) > 0:
operator = None
value = None
member = members.pop(0).strip()
# Find the first || or +
match_or = re.search("\|\|", member) # noqa
match_and = re.search("\+", member) # noqa
if match_or is not None:
operator = "||"
if match_and is not None:
if match_or is not None:
if match_or.start() >= match_and.start():
operator = "+"
else:
operator = "+"
if operator is not None:
member, rest = member.split(operator, 1)
# The rest is only valid if contains a filter statement
if any(word in rest for word in filters):
members.append(rest.strip())
# Split the statement based on found operator
operator = operator.replace("||", "or").replace("+", "and")
operators.append(operator)
else:
member = operator.join([member, rest])
# Parse the member
action, member = member.split(" ", 1)
action = action.lower().strip()
# Contains, notcontains, equals, not equals expects FieldName Values
if action in ["contains", "notcontains", "equals", "notequals"]:
try:
field, value = member.split(" ", 1)
except ValueError:
bot.exit(
"%s for line %s must have field and values, exiting."
% (action, member)
)
# Missing, empty, expect only a field
elif action in ["missing", "empty", "present"]:
field = member.strip()
else:
bot.exit("%s is not a valid filter action." % action)
actions.append(action)
fields.append(field.strip())
if value is not None:
values.append(value.strip())
entry = {
"action": actions,
"field": fields,
"operator": main_operator,
"InnerOperators": operators,
"value": values,
}
return entry
[docs]def add_section(config, section, section_name=None):
"""add section will add a section (and optionally)
section name to a config
Parameters
==========
config: the config (dict) parsed thus far
section: the section name to add
section_name: an optional name, added as a level
"""
if section is None:
bot.exit("You must define a section (e.g. %header) before any action.")
if section in ["filter", "values", "fields"] and section_name is None:
bot.exit("You must provide a name for a filter section.")
if section not in sections:
bot.exit("%s is not a valid section." % section)
if section not in config:
# If a section is named, we have more one level (dict)
if section_name is not None:
config[section] = OrderedDict()
config[section][section_name] = []
bot.debug("Adding section %s %s" % (section, section_name))
else:
config[section] = []
bot.debug("Adding section %s" % section)
return config
# Section is in config
if section_name is not None and section_name not in config[section]:
config[section][section_name] = []
return config
def _remove_comments(parts):
"""given a list of parts, and that the action and field are removed,
get the remainder of the line and clean up any trailing comments.
"""
value = " ".join(parts[0:]) # get remained of line
return value.split("#")[0] # remove comments
[docs]def parse_group_action(section, line, config, section_name):
"""parse a group action, either FIELD or SPLIT, which must belong to
either a fields or values section.
Parameters
=========
section: a valid section name from the deid config file
line: the line content to parse for the section/action
config: the growing/current config dictionary
section_name: optionally, a section name
"""
if not line.upper().startswith(group_actions):
bot.exit("%s is not a valid group action." % line)
if not line.upper().startswith("FIELD") and section == "fields":
bot.exit("%fields only supports FIELD actions.")
# We may have to deal with cases of spaces
bot.debug("%s: adding %s" % (section, line))
parts = line.split(" ")
action = parts.pop(0).replace(" ", "")
# Both require some parts
if not parts:
bot.exit("%s action %s requires additional arguments" % (section, action))
# For both, the second is always a field or field expander
field = parts.pop(0)
# Fields supports one or more fields with expanders (no third arguments)
if section == "fields":
config[section][section_name].append({"action": action, "field": field})
# Values supports FIELD or SPLIT
elif section == "values":
# If we have a third set of arguments
if parts:
value = _remove_comments(parts)
config[section][section_name].append(
{"action": action, "field": field, "value": value}
)
else:
config[section][section_name].append({"action": action, "field": field})
return config
[docs]def parse_config_action(section, line, config, section_name=None):
"""add action will take a line from a deid config file, a config (dictionary), and
an active section name (eg header) and add an entry to the config file to perform
the action.
Parameters
=========
section: a valid section name from the deid config file
line: the line content to parse for the section/action
config: the growing/current config dictionary
section_name: optionally, a section name
"""
if not line.upper().startswith(actions):
bot.exit("%s is not a valid action line." % line)
# We may have to deal with cases of spaces
parts = line.split(" ")
action = parts.pop(0).replace(" ", "")
# What field is the action for?
if len(parts) < 1:
bot.exit("%s requires a FIELD value, but not found." % action)
field = parts.pop(0)
# Actions that require a value
if action in ["ADD", "REPLACE", "JITTER"]:
if len(parts) == 0:
bot.exit("%s requires a VALUE, but not found" % action)
value = _remove_comments(parts)
bot.debug("%s: adding %s" % (section, line))
config[section].append({"action": action, "field": field, "value": value})
# Actions that can optionally have a value
elif action in ["REMOVE"]:
bot.debug("%s: adding %s" % (section, line))
# Case 1: removing without any criteria
if len(parts) == 0:
config[section].append({"action": action, "field": field})
# Case 2: REMOVE can have a func:is_thing to return boolean
else:
value = _remove_comments(parts)
config[section].append({"action": action, "field": field, "value": value})
# Actions that don't require a value
elif action in ["BLANK", "KEEP"]:
bot.debug("%s: adding %s" % (section, line))
config[section].append({"action": action, "field": field})
return config
[docs]def get_deid(tag=None, exit_on_fail=True, quiet=False, load=False):
"""get deid is intended to retrieve the full path of a deid file provided with
the software, based on a tag. For example, under deid/data if a file is called
"deid.dicom", the tag would be "dicom".
Parameters
==========
tag: the text that comes after deid to indicate the tag of the file in deid/data
exit_on_fail: if None is an acceptable return value, this should be set to False
(default is True).
quiet: Default False. If None is acceptable, quiet can be set to True
load: also load the deid, if resulting path (from path or tag) is not None
"""
# no tag/path means load default
if tag is None:
tag = "dicom"
# If it's already loaded
if isinstance(tag, dict):
bot.debug("deid is already loaded.")
return tag
# If it's a path, get full path
if os.path.exists(tag):
deid = os.path.abspath(tag)
else:
deid = "%s/deid.%s" % (data_base, tag)
if not os.path.exists(deid):
if quiet is False:
bot.error("Cannot find %s" % (deid))
if exit_on_fail is True:
sys.exit(1)
else:
return None
if load is True:
return load_deid(deid)
return deid