Data deanonymization (#10093)

### Description

The feature for pseudonymizing data with ability to retrieve original
text (deanonymization) has been implemented. In order to protect private
data, such as when querying external APIs (OpenAI), it is worth
pseudonymizing sensitive data to maintain full privacy. But then, after
the model response, it would be good to have the data in the original
form.

I implemented the `PresidioReversibleAnonymizer`, which consists of two
parts:

1. anonymization - it works the same way as `PresidioAnonymizer`, plus
the object itself stores a mapping of made-up values to original ones,
for example:
```
    {
        "PERSON": {
            "<anonymized>": "<original>",
            "John Doe": "Slim Shady"
        },
        "PHONE_NUMBER": {
            "111-111-1111": "555-555-5555"
        }
        ...
    }
```

2. deanonymization - using the mapping described above, it matches fake
data with original data and then substitutes it.

Between anonymization and deanonymization user can perform different
operations, for example, passing the output to LLM.

### Future works

- **instance anonymization** - at this point, each occurrence of PII is
treated as a separate entity and separately anonymized. Therefore, two
occurrences of the name John Doe in the text will be changed to two
different names. It is therefore worth introducing support for full
instance detection, so that repeated occurrences are treated as a single
object.
- **better matching and substitution of fake values for real ones** -
currently the strategy is based on matching full strings and then
substituting them. Due to the indeterminism of language models, it may
happen that the value in the answer is slightly changed (e.g. *John Doe*
-> *John* or *Main St, New York* -> *New York*) and such a substitution
is then no longer possible. Therefore, it is worth adjusting the
matching for your needs.
- **Q&A with anonymization** - when I'm done writing all the
functionality, I thought it would be a cool resource in documentation to
write a notebook about retrieval from documents using anonymization. An
iterative process, adding new recognizers to fit the data, lessons
learned and what to look out for

### Twitter handle
@deepsense_ai / @MaksOpp

---------

Co-authored-by: MaksOpp <maks.operlejn@gmail.com>
Co-authored-by: Bagatur <baskaryan@gmail.com>
This commit is contained in:
maks-operlejn-ds
2023-09-07 06:33:24 +02:00
committed by GitHub
parent 67696fe3ba
commit 4cc4534d81
9 changed files with 989 additions and 81 deletions

View File

@@ -1,4 +1,7 @@
"""Data anonymizer package"""
from langchain_experimental.data_anonymizer.presidio import PresidioAnonymizer
from langchain_experimental.data_anonymizer.presidio import (
PresidioAnonymizer,
PresidioReversibleAnonymizer,
)
__all__ = ["PresidioAnonymizer"]
__all__ = ["PresidioAnonymizer", "PresidioReversibleAnonymizer"]

View File

@@ -15,3 +15,17 @@ class AnonymizerBase(ABC):
@abstractmethod
def _anonymize(self, text: str) -> str:
"""Abstract method to anonymize text"""
class ReversibleAnonymizerBase(AnonymizerBase):
"""
Base abstract class for reversible anonymizers.
"""
def deanonymize(self, text: str) -> str:
"""Deanonymize text"""
return self._deanonymize(text)
@abstractmethod
def _deanonymize(self, text: str) -> str:
"""Abstract method to deanonymize text"""

View File

@@ -0,0 +1,21 @@
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict
MappingDataType = Dict[str, Dict[str, str]]
@dataclass
class DeanonymizerMapping:
mapping: MappingDataType = field(
default_factory=lambda: defaultdict(lambda: defaultdict(str))
)
@property
def data(self) -> MappingDataType:
"""Return the deanonymizer mapping"""
return {k: dict(v) for k, v in self.mapping.items()}
def update(self, new_mapping: MappingDataType) -> None:
for entity_type, values in new_mapping.items():
self.mapping[entity_type].update(values)

View File

@@ -0,0 +1,17 @@
from langchain_experimental.data_anonymizer.presidio import MappingDataType
def default_matching_strategy(text: str, deanonymizer_mapping: MappingDataType) -> str:
"""
Default matching strategy for deanonymization.
It replaces all the anonymized entities with the original ones.
Args:
text: text to deanonymize
deanonymizer_mapping: mapping between anonymized entities and original ones"""
# Iterate over all the entities (PERSON, EMAIL_ADDRESS, etc.)
for entity_type in deanonymizer_mapping:
for anonymized, original in deanonymizer_mapping[entity_type].items():
text = text.replace(anonymized, original)
return text

View File

@@ -1,8 +1,8 @@
import string
from typing import Callable, Dict
from typing import Callable, Dict, Optional
def get_pseudoanonymizer_mapping() -> Dict[str, Callable]:
def get_pseudoanonymizer_mapping(seed: Optional[int] = None) -> Dict[str, Callable]:
try:
from faker import Faker
except ImportError as e:
@@ -11,6 +11,7 @@ def get_pseudoanonymizer_mapping() -> Dict[str, Callable]:
) from e
fake = Faker()
fake.seed_instance(seed)
# Listed entities supported by Microsoft Presidio (for now, global and US only)
# Source: https://microsoft.github.io/presidio/supported_entities/

View File

@@ -1,24 +1,56 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Dict, List, Optional
import json
from collections import defaultdict
from pathlib import Path
from typing import TYPE_CHECKING, Callable, Dict, List, Optional, Union
from langchain_experimental.data_anonymizer.base import AnonymizerBase
import yaml
from langchain_experimental.data_anonymizer.base import (
AnonymizerBase,
ReversibleAnonymizerBase,
)
from langchain_experimental.data_anonymizer.deanonymizer_mapping import (
DeanonymizerMapping,
MappingDataType,
)
from langchain_experimental.data_anonymizer.deanonymizer_matching_strategies import (
default_matching_strategy,
)
from langchain_experimental.data_anonymizer.faker_presidio_mapping import (
get_pseudoanonymizer_mapping,
)
if TYPE_CHECKING:
from presidio_analyzer import EntityRecognizer
try:
from presidio_analyzer import AnalyzerEngine
except ImportError as e:
raise ImportError(
"Could not import presidio_analyzer, please install with "
"`pip install presidio-analyzer`. You will also need to download a "
"spaCy model to use the analyzer, e.g. "
"`python -m spacy download en_core_web_lg`."
) from e
try:
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig
except ImportError as e:
raise ImportError(
"Could not import presidio_anonymizer, please install with "
"`pip install presidio-anonymizer`."
) from e
if TYPE_CHECKING:
from presidio_analyzer import EntityRecognizer, RecognizerResult
from presidio_anonymizer.entities import EngineResult
class PresidioAnonymizer(AnonymizerBase):
"""Anonymizer using Microsoft Presidio."""
class PresidioAnonymizerBase(AnonymizerBase):
def __init__(
self,
analyzed_fields: Optional[List[str]] = None,
operators: Optional[Dict[str, OperatorConfig]] = None,
faker_seed: Optional[int] = None,
):
"""
Args:
@@ -28,25 +60,10 @@ class PresidioAnonymizer(AnonymizerBase):
Operators allow for custom anonymization of detected PII.
Learn more:
https://microsoft.github.io/presidio/tutorial/10_simple_anonymization/
faker_seed: Seed used to initialize faker.
Defaults to None, in which case faker will be seeded randomly
and provide random values.
"""
try:
from presidio_analyzer import AnalyzerEngine
except ImportError as e:
raise ImportError(
"Could not import presidio_analyzer, please install with "
"`pip install presidio-analyzer`. You will also need to download a "
"spaCy model to use the analyzer, e.g. "
"`python -m spacy download en_core_web_lg`."
) from e
try:
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig
except ImportError as e:
raise ImportError(
"Could not import presidio_anonymizer, please install with "
"`pip install presidio-anonymizer`."
) from e
self.analyzed_fields = (
analyzed_fields
if analyzed_fields is not None
@@ -59,13 +76,41 @@ class PresidioAnonymizer(AnonymizerBase):
field: OperatorConfig(
operator_name="custom", params={"lambda": faker_function}
)
for field, faker_function in get_pseudoanonymizer_mapping().items()
for field, faker_function in get_pseudoanonymizer_mapping(
faker_seed
).items()
}
)
self._analyzer = AnalyzerEngine()
self._anonymizer = AnonymizerEngine()
def add_recognizer(self, recognizer: EntityRecognizer) -> None:
"""Add a recognizer to the analyzer
Args:
recognizer: Recognizer to add to the analyzer.
"""
self._analyzer.registry.add_recognizer(recognizer)
self.analyzed_fields.extend(recognizer.supported_entities)
def add_operators(self, operators: Dict[str, OperatorConfig]) -> None:
"""Add operators to the anonymizer
Args:
operators: Operators to add to the anonymizer.
"""
self.operators.update(operators)
class PresidioAnonymizer(PresidioAnonymizerBase):
def _anonymize(self, text: str) -> str:
"""Anonymize text.
Each PII entity is replaced with a fake value.
Each time fake values will be different, as they are generated randomly.
Args:
text: text to anonymize
"""
results = self._analyzer.analyze(
text,
entities=self.analyzed_fields,
@@ -78,11 +123,185 @@ class PresidioAnonymizer(AnonymizerBase):
operators=self.operators,
).text
def add_recognizer(self, recognizer: EntityRecognizer) -> None:
"""Add a recognizer to the analyzer"""
self._analyzer.registry.add_recognizer(recognizer)
self.analyzed_fields.extend(recognizer.supported_entities)
def add_operators(self, operators: Dict[str, OperatorConfig]) -> None:
"""Add operators to the anonymizer"""
self.operators.update(operators)
class PresidioReversibleAnonymizer(PresidioAnonymizerBase, ReversibleAnonymizerBase):
def __init__(
self,
analyzed_fields: Optional[List[str]] = None,
operators: Optional[Dict[str, OperatorConfig]] = None,
faker_seed: Optional[int] = None,
):
super().__init__(analyzed_fields, operators, faker_seed)
self._deanonymizer_mapping = DeanonymizerMapping()
@property
def deanonymizer_mapping(self) -> MappingDataType:
"""Return the deanonymizer mapping"""
return self._deanonymizer_mapping.data
def _update_deanonymizer_mapping(
self,
original_text: str,
analyzer_results: List[RecognizerResult],
anonymizer_results: EngineResult,
) -> None:
"""Creates or updates the mapping used to de-anonymize text.
This method exploits the results returned by the
analysis and anonymization processes.
It constructs a mapping from each anonymized entity
back to its original text value.
Mapping will be stored as "deanonymizer_mapping" property.
Example of "deanonymizer_mapping":
{
"PERSON": {
"<anonymized>": "<original>",
"John Doe": "Slim Shady"
},
"PHONE_NUMBER": {
"111-111-1111": "555-555-5555"
}
...
}
"""
# We are able to zip and loop through both lists because we expect
# them to return corresponding entities for each identified piece
# of analyzable data from our input.
# We sort them by their 'start' attribute because it allows us to
# match corresponding entities by their position in the input text.
analyzer_results = sorted(analyzer_results, key=lambda d: d.start)
anonymizer_results.items = sorted(
anonymizer_results.items, key=lambda d: d.start
)
new_deanonymizer_mapping: MappingDataType = defaultdict(dict)
for analyzed_entity, anonymized_entity in zip(
analyzer_results, anonymizer_results.items
):
original_value = original_text[analyzed_entity.start : analyzed_entity.end]
new_deanonymizer_mapping[anonymized_entity.entity_type][
anonymized_entity.text
] = original_value
self._deanonymizer_mapping.update(new_deanonymizer_mapping)
def _anonymize(self, text: str) -> str:
"""Anonymize text.
Each PII entity is replaced with a fake value.
Each time fake values will be different, as they are generated randomly.
At the same time, we will create a mapping from each anonymized entity
back to its original text value.
Args:
text: text to anonymize
"""
analyzer_results = self._analyzer.analyze(
text,
entities=self.analyzed_fields,
language="en",
)
filtered_analyzer_results = (
self._anonymizer._remove_conflicts_and_get_text_manipulation_data(
analyzer_results
)
)
anonymizer_results = self._anonymizer.anonymize(
text,
analyzer_results=analyzer_results,
operators=self.operators,
)
self._update_deanonymizer_mapping(
text, filtered_analyzer_results, anonymizer_results
)
return anonymizer_results.text
def _deanonymize(
self,
text_to_deanonymize: str,
deanonymizer_matching_strategy: Callable[
[str, MappingDataType], str
] = default_matching_strategy,
) -> str:
"""Deanonymize text.
Each anonymized entity is replaced with its original value.
This method exploits the mapping created during the anonymization process.
Args:
text_to_deanonymize: text to deanonymize
deanonymizer_matching_strategy: function to use to match
anonymized entities with their original values and replace them.
"""
if not self._deanonymizer_mapping:
raise ValueError(
"Deanonymizer mapping is empty.",
"Please call anonymize() and anonymize some text first.",
)
text_to_deanonymize = deanonymizer_matching_strategy(
text_to_deanonymize, self.deanonymizer_mapping
)
return text_to_deanonymize
def save_deanonymizer_mapping(self, file_path: Union[Path, str]) -> None:
"""Save the deanonymizer mapping to a JSON or YAML file.
Args:
file_path: Path to file to save the mapping to.
Example:
.. code-block:: python
anonymizer.save_deanonymizer_mapping(file_path="path/mapping.json")
"""
save_path = Path(file_path)
if save_path.suffix not in [".json", ".yaml"]:
raise ValueError(f"{save_path} must have an extension of .json or .yaml")
# Make sure parent directories exist
save_path.parent.mkdir(parents=True, exist_ok=True)
if save_path.suffix == ".json":
with open(save_path, "w") as f:
json.dump(self.deanonymizer_mapping, f, indent=2)
elif save_path.suffix == ".yaml":
with open(save_path, "w") as f:
yaml.dump(self.deanonymizer_mapping, f, default_flow_style=False)
def load_deanonymizer_mapping(self, file_path: Union[Path, str]) -> None:
"""Load the deanonymizer mapping from a JSON or YAML file.
Args:
file_path: Path to file to load the mapping from.
Example:
.. code-block:: python
anonymizer.load_deanonymizer_mapping(file_path="path/mapping.json")
"""
load_path = Path(file_path)
if load_path.suffix not in [".json", ".yaml"]:
raise ValueError(f"{load_path} must have an extension of .json or .yaml")
if load_path.suffix == ".json":
with open(load_path, "r") as f:
loaded_mapping = json.load(f)
elif load_path.suffix == ".yaml":
with open(load_path, "r") as f:
loaded_mapping = yaml.load(f, Loader=yaml.FullLoader)
self._deanonymizer_mapping.update(loaded_mapping)

View File

@@ -0,0 +1,154 @@
import os
from typing import Iterator, List
import pytest
@pytest.fixture(scope="module", autouse=True)
def check_spacy_model() -> Iterator[None]:
import spacy
if not spacy.util.is_package("en_core_web_lg"):
pytest.skip(reason="Spacy model 'en_core_web_lg' not installed")
yield
@pytest.mark.requires("presidio_analyzer", "presidio_anonymizer", "faker")
@pytest.mark.parametrize(
"analyzed_fields,should_contain",
[(["PERSON"], False), (["PHONE_NUMBER"], True), (None, False)],
)
def test_anonymize(analyzed_fields: List[str], should_contain: bool) -> None:
"""Test anonymizing a name in a simple sentence"""
from langchain_experimental.data_anonymizer import PresidioReversibleAnonymizer
text = "Hello, my name is John Doe."
anonymizer = PresidioReversibleAnonymizer(analyzed_fields=analyzed_fields)
anonymized_text = anonymizer.anonymize(text)
assert ("John Doe" in anonymized_text) == should_contain
@pytest.mark.requires("presidio_analyzer", "presidio_anonymizer", "faker")
def test_anonymize_multiple() -> None:
"""Test anonymizing multiple items in a sentence"""
from langchain_experimental.data_anonymizer import PresidioReversibleAnonymizer
text = "John Smith's phone number is 313-666-7440 and email is johnsmith@gmail.com"
anonymizer = PresidioReversibleAnonymizer()
anonymized_text = anonymizer.anonymize(text)
for phrase in ["John Smith", "313-666-7440", "johnsmith@gmail.com"]:
assert phrase not in anonymized_text
@pytest.mark.requires("presidio_analyzer", "presidio_anonymizer", "faker")
def test_anonymize_with_custom_operator() -> None:
"""Test anonymize a name with a custom operator"""
from presidio_anonymizer.entities import OperatorConfig
from langchain_experimental.data_anonymizer import PresidioReversibleAnonymizer
custom_operator = {"PERSON": OperatorConfig("replace", {"new_value": "<name>"})}
anonymizer = PresidioReversibleAnonymizer(operators=custom_operator)
text = "Jane Doe was here."
anonymized_text = anonymizer.anonymize(text)
assert anonymized_text == "<name> was here."
@pytest.mark.requires("presidio_analyzer", "presidio_anonymizer", "faker")
def test_add_recognizer_operator() -> None:
"""
Test add recognizer and anonymize a new type of entity and with a custom operator
"""
from presidio_analyzer import PatternRecognizer
from presidio_anonymizer.entities import OperatorConfig
from langchain_experimental.data_anonymizer import PresidioReversibleAnonymizer
anonymizer = PresidioReversibleAnonymizer(analyzed_fields=[])
titles_list = ["Sir", "Madam", "Professor"]
custom_recognizer = PatternRecognizer(
supported_entity="TITLE", deny_list=titles_list
)
anonymizer.add_recognizer(custom_recognizer)
# anonymizing with custom recognizer
text = "Madam Jane Doe was here."
anonymized_text = anonymizer.anonymize(text)
assert anonymized_text == "<TITLE> Jane Doe was here."
# anonymizing with custom recognizer and operator
custom_operator = {"TITLE": OperatorConfig("replace", {"new_value": "Dear"})}
anonymizer.add_operators(custom_operator)
anonymized_text = anonymizer.anonymize(text)
assert anonymized_text == "Dear Jane Doe was here."
@pytest.mark.requires("presidio_analyzer", "presidio_anonymizer", "faker")
def test_deanonymizer_mapping() -> None:
"""Test if deanonymizer mapping is correctly populated"""
from langchain_experimental.data_anonymizer import PresidioReversibleAnonymizer
anonymizer = PresidioReversibleAnonymizer(
analyzed_fields=["PERSON", "PHONE_NUMBER", "EMAIL_ADDRESS", "CREDIT_CARD"]
)
anonymizer.anonymize("Hello, my name is John Doe and my number is 444 555 6666.")
# ["PERSON", "PHONE_NUMBER"]
assert len(anonymizer.deanonymizer_mapping.keys()) == 2
assert "John Doe" in anonymizer.deanonymizer_mapping.get("PERSON", {}).values()
assert (
"444 555 6666"
in anonymizer.deanonymizer_mapping.get("PHONE_NUMBER", {}).values()
)
text_to_anonymize = (
"And my name is Jane Doe, my email is jane@gmail.com and "
"my credit card is 4929 5319 6292 5362."
)
anonymizer.anonymize(text_to_anonymize)
# ["PERSON", "PHONE_NUMBER", "EMAIL_ADDRESS", "CREDIT_CARD"]
assert len(anonymizer.deanonymizer_mapping.keys()) == 4
assert "Jane Doe" in anonymizer.deanonymizer_mapping.get("PERSON", {}).values()
assert (
"jane@gmail.com"
in anonymizer.deanonymizer_mapping.get("EMAIL_ADDRESS", {}).values()
)
assert (
"4929 5319 6292 5362"
in anonymizer.deanonymizer_mapping.get("CREDIT_CARD", {}).values()
)
@pytest.mark.requires("presidio_analyzer", "presidio_anonymizer", "faker")
def test_deanonymize() -> None:
"""Test deanonymizing a name in a simple sentence"""
from langchain_experimental.data_anonymizer import PresidioReversibleAnonymizer
text = "Hello, my name is John Doe."
anonymizer = PresidioReversibleAnonymizer(analyzed_fields=["PERSON"])
anonymized_text = anonymizer.anonymize(text)
deanonymized_text = anonymizer.deanonymize(anonymized_text)
assert deanonymized_text == text
@pytest.mark.requires("presidio_analyzer", "presidio_anonymizer", "faker")
def test_save_load_deanonymizer_mapping() -> None:
from langchain_experimental.data_anonymizer import PresidioReversibleAnonymizer
anonymizer = PresidioReversibleAnonymizer(analyzed_fields=["PERSON"])
anonymizer.anonymize("Hello, my name is John Doe.")
try:
anonymizer.save_deanonymizer_mapping("test_file.json")
assert os.path.isfile("test_file.json")
anonymizer = PresidioReversibleAnonymizer()
anonymizer.load_deanonymizer_mapping("test_file.json")
assert "John Doe" in anonymizer.deanonymizer_mapping.get("PERSON", {}).values()
finally:
os.remove("test_file.json")