Instance anonymization (#10501)

### Description

Add instance anonymization: if `John Doe` appears twice in the
text, it will be treated as the same entity.
The difference between `PresidioAnonymizer` and
`PresidioReversibleAnonymizer` is that only the latter has
built-in memory, so it will remember the anonymization mapping
across multiple texts:

```
>>> anonymizer = PresidioAnonymizer()
>>> anonymizer.anonymize("My name is John Doe. Hi John Doe!")
'My name is Noah Rhodes. Hi Noah Rhodes!'
>>> anonymizer.anonymize("My name is John Doe. Hi John Doe!")
'My name is Brett Russell. Hi Brett Russell!'
```
```
>>> anonymizer = PresidioReversibleAnonymizer()
>>> anonymizer.anonymize("My name is John Doe. Hi John Doe!")
'My name is Noah Rhodes. Hi Noah Rhodes!'
>>> anonymizer.anonymize("My name is John Doe. Hi John Doe!")
'My name is Noah Rhodes. Hi Noah Rhodes!'
```
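Because `PresidioReversibleAnonymizer` stores this mapping, the original values can later be restored with `deanonymize` (illustrative continuation of the session above; the generated fake names depend on the faker seed):

```
>>> anonymizer.deanonymize('My name is Noah Rhodes. Hi Noah Rhodes!')
'My name is John Doe. Hi John Doe!'
>>> anonymizer.deanonymizer_mapping
{'PERSON': {'Noah Rhodes': 'John Doe'}}
```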

### Twitter handle
@deepsense_ai / @MaksOpp

### Tag maintainer
@baskaryan @hwchase17 @hinthornw

---------

Co-authored-by: Bagatur <baskaryan@gmail.com>
Commit 2aae1102b0 (parent 203258b4d6), authored by maks-operlejn-ds and committed via GitHub on 2023-10-05 20:23:02 +02:00.
6 changed files with 462 additions and 150 deletions

View File: langchain_experimental/data_anonymizer/deanonymizer_mapping.py

@@ -1,10 +1,26 @@
import re
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict
from typing import Dict, List
from presidio_analyzer import RecognizerResult
from presidio_anonymizer.entities import EngineResult
MappingDataType = Dict[str, Dict[str, str]]
def format_duplicated_operator(operator_name: str, count: int) -> str:
"""Format the operator name with the count"""
clean_operator_name = re.sub(r"[<>]", "", operator_name)
clean_operator_name = re.sub(r"_\d+$", "", clean_operator_name)
if operator_name.startswith("<") and operator_name.endswith(">"):
return f"<{clean_operator_name}_{count}>"
else:
return f"{clean_operator_name}_{count}"
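# Illustrative calls (hypothetical):
#   format_duplicated_operator("<PERSON>", 2)      -> "<PERSON_2>"
#   format_duplicated_operator("PERSON_1", 2)      -> "PERSON_2"
#   format_duplicated_operator("Brett Russell", 3) -> "Brett Russell_3"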
@dataclass
class DeanonymizerMapping:
mapping: MappingDataType = field(
@@ -17,5 +33,107 @@ class DeanonymizerMapping:
return {k: dict(v) for k, v in self.mapping.items()}
def update(self, new_mapping: MappingDataType) -> None:
"""Update the deanonymizer mapping with new values
Duplicated values will not be added
If there are multiple entities of the same type, the mapping will
include a count to differentiate them. For example, if there are
two names in the input text, the mapping will include NAME_1 and NAME_2.
"""
seen_values = set()
for entity_type, values in new_mapping.items():
self.mapping[entity_type].update(values)
count = len(self.mapping[entity_type]) + 1
for key, value in values.items():
if (
value not in seen_values
and value not in self.mapping[entity_type].values()
):
new_key = (
format_duplicated_operator(key, count)
if key in self.mapping[entity_type]
else key
)
self.mapping[entity_type][new_key] = value
seen_values.add(value)
count += 1
def create_anonymizer_mapping(
original_text: str,
analyzer_results: List[RecognizerResult],
anonymizer_results: EngineResult,
is_reversed: bool = False,
) -> MappingDataType:
"""Creates or updates the mapping used to anonymize and/or deanonymize text.
This method exploits the results returned by the
analysis and anonymization processes.
If is_reversed is True, it constructs a mapping from each original
entity to its anonymized value.
If is_reversed is False, it constructs a mapping from each
anonymized entity back to its original text value.
If there are multiple entities of the same type, the mapping will
include a count to differentiate them. For example, if there are
two names in the input text, the mapping will include NAME_1 and NAME_2.
Example of mapping:
{
"PERSON": {
"<original>": "<anonymized>",
"John Doe": "Slim Shady"
},
"PHONE_NUMBER": {
"111-111-1111": "555-555-5555"
}
...
}
"""
# We are able to zip and loop through both lists because we expect
# them to return corresponding entities for each identified piece
# of analyzable data from our input.
# We sort them by their 'start' attribute because it allows us to
# match corresponding entities by their position in the input text.
analyzer_results.sort(key=lambda d: d.start)
anonymizer_results.items.sort(key=lambda d: d.start)
mapping: MappingDataType = defaultdict(dict)
count: dict = defaultdict(int)
for analyzed, anonymized in zip(analyzer_results, anonymizer_results.items):
original_value = original_text[analyzed.start : analyzed.end]
entity_type = anonymized.entity_type
if is_reversed:
cond = original_value in mapping[entity_type].values()
else:
cond = original_value in mapping[entity_type]
if cond:
continue
if (
anonymized.text in mapping[entity_type].values()
or anonymized.text in mapping[entity_type]
):
anonymized_value = format_duplicated_operator(
anonymized.text, count[entity_type] + 2
)
count[entity_type] += 1
else:
anonymized_value = anonymized.text
mapping_key, mapping_value = (
(anonymized_value, original_value)
if is_reversed
else (original_value, anonymized_value)
)
mapping[entity_type][mapping_key] = mapping_value
return mapping
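Taken together, `format_duplicated_operator` and `DeanonymizerMapping.update` number entities of the same type so that nothing is overwritten. A minimal sketch of the intended behaviour (hypothetical placeholder values, assuming the `langchain_experimental.data_anonymizer.deanonymizer_mapping` module path):

```
from langchain_experimental.data_anonymizer.deanonymizer_mapping import (
    DeanonymizerMapping,
)

mapping = DeanonymizerMapping()

# First text: one person, anonymized to a placeholder (illustrative values).
mapping.update({"PERSON": {"<PERSON>": "John Doe"}})

# Second text: a different person receives the same placeholder; update()
# keeps the existing entry and numbers the newcomer instead of overwriting it.
mapping.update({"PERSON": {"<PERSON>": "Jane Smith"}})

print(mapping.data)
# {'PERSON': {'<PERSON>': 'John Doe', '<PERSON_2>': 'Jane Smith'}}
```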

View File: langchain_experimental/data_anonymizer/presidio.py

@@ -1,7 +1,6 @@
from __future__ import annotations
import json
from collections import defaultdict
from pathlib import Path
from typing import TYPE_CHECKING, Callable, Dict, List, Optional, Union
@@ -14,6 +13,7 @@ from langchain_experimental.data_anonymizer.base import (
from langchain_experimental.data_anonymizer.deanonymizer_mapping import (
DeanonymizerMapping,
MappingDataType,
create_anonymizer_mapping,
)
from langchain_experimental.data_anonymizer.deanonymizer_matching_strategies import (
default_matching_strategy,
@@ -43,8 +43,7 @@ except ImportError as e:
) from e
if TYPE_CHECKING:
from presidio_analyzer import EntityRecognizer, RecognizerResult
from presidio_anonymizer.entities import EngineResult
from presidio_analyzer import EntityRecognizer
# Configuring Anonymizer for multiple languages
# Detailed description and examples can be found here:
@@ -69,6 +68,7 @@ class PresidioAnonymizerBase(AnonymizerBase):
analyzed_fields: Optional[List[str]] = None,
operators: Optional[Dict[str, OperatorConfig]] = None,
languages_config: Dict = DEFAULT_LANGUAGES_CONFIG,
add_default_faker_operators: bool = True,
faker_seed: Optional[int] = None,
):
"""
@@ -93,10 +93,9 @@ class PresidioAnonymizerBase(AnonymizerBase):
if analyzed_fields is not None
else list(get_pseudoanonymizer_mapping().keys())
)
self.operators = (
operators
if operators is not None
else {
if add_default_faker_operators:
self.operators = {
field: OperatorConfig(
operator_name="custom", params={"lambda": faker_function}
)
@@ -104,7 +103,11 @@ class PresidioAnonymizerBase(AnonymizerBase):
faker_seed
).items()
}
)
else:
self.operators = {}
if operators:
self.add_operators(operators)
provider = NlpEngineProvider(nlp_configuration=languages_config)
nlp_engine = provider.create_engine()
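The new `add_default_faker_operators` flag makes it possible to skip the faker-based defaults and rely only on explicitly passed operators. A minimal sketch of how this could be used (hypothetical configuration, assuming presidio's `OperatorConfig` and the public `PresidioReversibleAnonymizer` import):

```
from presidio_anonymizer.entities import OperatorConfig

from langchain_experimental.data_anonymizer import PresidioReversibleAnonymizer

# Disable the default faker operators and replace every PERSON with a plain
# placeholder instead of a generated fake name (hypothetical setup).
anonymizer = PresidioReversibleAnonymizer(
    analyzed_fields=["PERSON"],
    add_default_faker_operators=False,
    operators={"PERSON": OperatorConfig("replace", {"new_value": "<PERSON>"})},
)

print(anonymizer.anonymize("John Doe met Jane Smith."))
# With placeholder operators, repeated entities of the same type get numbered
# placeholders, e.g. '<PERSON> met <PERSON_2>.'
```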
@@ -140,109 +143,13 @@ class PresidioAnonymizer(PresidioAnonymizerBase):
Each PII entity is replaced with a fake value.
Each time fake values will be different, as they are generated randomly.
Args:
text: text to anonymize
language: language to use for analysis of PII
If None, the first (main) language in the list
of languages specified in the configuration will be used.
"""
if language is None:
language = self.supported_languages[0]
if language not in self.supported_languages:
raise ValueError(
f"Language '{language}' is not supported. "
f"Supported languages are: {self.supported_languages}. "
"Change your language configuration file to add more languages."
)
results = self._analyzer.analyze(
text,
entities=self.analyzed_fields,
language=language,
)
return self._anonymizer.anonymize(
text,
analyzer_results=results,
operators=self.operators,
).text
class PresidioReversibleAnonymizer(PresidioAnonymizerBase, ReversibleAnonymizerBase):
def __init__(
self,
analyzed_fields: Optional[List[str]] = None,
operators: Optional[Dict[str, OperatorConfig]] = None,
languages_config: Dict = DEFAULT_LANGUAGES_CONFIG,
faker_seed: Optional[int] = None,
):
super().__init__(analyzed_fields, operators, languages_config, faker_seed)
self._deanonymizer_mapping = DeanonymizerMapping()
@property
def deanonymizer_mapping(self) -> MappingDataType:
"""Return the deanonymizer mapping"""
return self._deanonymizer_mapping.data
def _update_deanonymizer_mapping(
self,
original_text: str,
analyzer_results: List[RecognizerResult],
anonymizer_results: EngineResult,
) -> None:
"""Creates or updates the mapping used to de-anonymize text.
This method exploits the results returned by the
analysis and anonymization processes.
It constructs a mapping from each anonymized entity
back to its original text value.
Mapping will be stored as "deanonymizer_mapping" property.
Example of "deanonymizer_mapping":
{
"PERSON": {
"<anonymized>": "<original>",
"John Doe": "Slim Shady"
},
"PHONE_NUMBER": {
"111-111-1111": "555-555-5555"
}
...
}
"""
# We are able to zip and loop through both lists because we expect
# them to return corresponding entities for each identified piece
# of analyzable data from our input.
# We sort them by their 'start' attribute because it allows us to
# match corresponding entities by their position in the input text.
analyzer_results = sorted(analyzer_results, key=lambda d: d.start)
anonymizer_results.items = sorted(
anonymizer_results.items, key=lambda d: d.start
)
new_deanonymizer_mapping: MappingDataType = defaultdict(dict)
for analyzed_entity, anonymized_entity in zip(
analyzer_results, anonymizer_results.items
):
original_value = original_text[analyzed_entity.start : analyzed_entity.end]
new_deanonymizer_mapping[anonymized_entity.entity_type][
anonymized_entity.text
] = original_value
self._deanonymizer_mapping.update(new_deanonymizer_mapping)
def _anonymize(self, text: str, language: Optional[str] = None) -> str:
"""Anonymize text.
Each PII entity is replaced with a fake value.
Each time fake values will be different, as they are generated randomly.
At the same time, we will create a mapping from each anonymized entity
back to its original text value.
PresidioAnonymizer has no built-in memory -
so it will not remember the effects of anonymizing previous texts.
>>> anonymizer = PresidioAnonymizer()
>>> anonymizer.anonymize("My name is John Doe. Hi John Doe!")
'My name is Noah Rhodes. Hi Noah Rhodes!'
>>> anonymizer.anonymize("My name is John Doe. Hi John Doe!")
'My name is Brett Russell. Hi Brett Russell!'
Args:
text: text to anonymize
@@ -278,11 +185,104 @@ class PresidioReversibleAnonymizer(PresidioAnonymizerBase, ReversibleAnonymizerB
operators=self.operators,
)
self._update_deanonymizer_mapping(
text, filtered_analyzer_results, anonymizer_results
anonymizer_mapping = create_anonymizer_mapping(
text,
filtered_analyzer_results,
anonymizer_results,
)
return default_matching_strategy(text, anonymizer_mapping)
class PresidioReversibleAnonymizer(PresidioAnonymizerBase, ReversibleAnonymizerBase):
def __init__(
self,
analyzed_fields: Optional[List[str]] = None,
operators: Optional[Dict[str, OperatorConfig]] = None,
languages_config: Dict = DEFAULT_LANGUAGES_CONFIG,
add_default_faker_operators: bool = True,
faker_seed: Optional[int] = None,
):
super().__init__(
analyzed_fields,
operators,
languages_config,
add_default_faker_operators,
faker_seed,
)
self._deanonymizer_mapping = DeanonymizerMapping()
@property
def deanonymizer_mapping(self) -> MappingDataType:
"""Return the deanonymizer mapping"""
return self._deanonymizer_mapping.data
@property
def anonymizer_mapping(self) -> MappingDataType:
"""Return the anonymizer mapping
This is just the reverse version of the deanonymizer mapping."""
return {
key: {v: k for k, v in inner_dict.items()}
for key, inner_dict in self.deanonymizer_mapping.items()
}
def _anonymize(self, text: str, language: Optional[str] = None) -> str:
"""Anonymize text.
Each PII entity is replaced with a fake value.
Each time fake values will be different, as they are generated randomly.
At the same time, we will create a mapping from each anonymized entity
back to its original text value.
Thanks to the built-in memory, all previously anonymised entities
will be remembered and replaced by the same fake values:
>>> anonymizer = PresidioReversibleAnonymizer()
>>> anonymizer.anonymize("My name is John Doe. Hi John Doe!")
'My name is Noah Rhodes. Hi Noah Rhodes!'
>>> anonymizer.anonymize("My name is John Doe. Hi John Doe!")
'My name is Noah Rhodes. Hi Noah Rhodes!'
Args:
text: text to anonymize
language: language to use for analysis of PII
If None, the first (main) language in the list
of languages specified in the configuration will be used.
"""
if language is None:
language = self.supported_languages[0]
if language not in self.supported_languages:
raise ValueError(
f"Language '{language}' is not supported. "
f"Supported languages are: {self.supported_languages}. "
"Change your language configuration file to add more languages."
)
analyzer_results = self._analyzer.analyze(
text,
entities=self.analyzed_fields,
language=language,
)
return anonymizer_results.text
filtered_analyzer_results = (
self._anonymizer._remove_conflicts_and_get_text_manipulation_data(
analyzer_results
)
)
anonymizer_results = self._anonymizer.anonymize(
text,
analyzer_results=analyzer_results,
operators=self.operators,
)
new_deanonymizer_mapping = create_anonymizer_mapping(
text,
filtered_analyzer_results,
anonymizer_results,
is_reversed=True,
)
self._deanonymizer_mapping.update(new_deanonymizer_mapping)
return default_matching_strategy(text, self.anonymizer_mapping)
def _deanonymize(
self,