Compare commits

...

4 Commits

Author SHA1 Message Date
Eugene Yurtsev
cd9e7d330b x 2023-10-29 21:55:12 -04:00
Guy Halfon
cbf0cdacdd fixed formatting / linting issues 2023-10-26 18:38:26 +03:00
Guy Halfon
aa3dea0b1c Refactor GmailGetMessage for linting and encoding support
- Broke long lines to adhere to the 88-character limit.
- Integrated decode_payload function to handle various encodings.
- Improved readability and error handling for email decoding.
2023-10-26 18:19:36 +03:00
halfag
f7d416fe89 Update search.py
Enhance email decoding to handle multiple charsets

- Integrated a decode_payload function to handle various encodings.
- Gracefully handle decoding errors by trying a list of common encodings.
- Default to UTF-8 with character replacement for unknown characters.
2023-10-26 17:34:04 +03:00
2 changed files with 114 additions and 6 deletions

View File

@@ -1,6 +1,6 @@
import base64
import email
from typing import Dict, Optional, Type
from typing import Dict, Optional, Sequence, Type
from langchain.callbacks.manager import CallbackManagerForToolRun
from langchain.pydantic_v1 import BaseModel, Field
@@ -17,6 +17,48 @@ class SearchArgsSchema(BaseModel):
)
def _decode_payload(
payload: bytes,
charset: Optional[str],
fallback_encodings: Sequence[str],
*,
allow_utf8_replace: bool = True
) -> str:
"""Decode a payload using a list of encodings.
Args:
payload: The payload to decode.
charset: The charset to use for decoding. If None, will try utf-8 first.
fallback_encodings: A list of encodings to try in order.
allow_utf8_replace: The error handling scheme to use for decoding. For example,
"replace" will replace unknown characters with a question mark.
Decodes using utf-8 with replacement for unknown characters.
This is used as a fallback when all other encodings fail.
Returns:
The decoded payload if decoding was successful, otherwise will attempt
to decode using utf-8 with replacement for unknown characters.
"""
if charset:
all_encodings = [charset] + list(fallback_encodings)
else:
all_encodings = list(fallback_encodings)
for encoding in all_encodings:
try:
return payload.decode(encoding=encoding)
except UnicodeDecodeError:
pass
# Default to utf-8 with replacement for unknown characters
if allow_utf8_replace:
return payload.decode("utf-8", errors="ignore")
else:
raise UnicodeDecodeError(
"Unable to decode payload using any of the specified encodings."
)
class GmailGetMessage(GmailBaseTool):
"""Tool that gets a message by ID from Gmail."""
@@ -26,6 +68,22 @@ class GmailGetMessage(GmailBaseTool):
" Returns the thread ID, snippet, body, subject, and sender."
)
args_schema: Type[SearchArgsSchema] = SearchArgsSchema
fallback_encodings: Sequence[str] = (
"utf-8",
"latin1",
"iso-8859-1",
"cp1252",
)
"""Encodings to try when decoding the email body if the charset is not specified.
The decoder will first try the charset specified in the email. If that fails
it'll try the encodings specified in this tuple.
Finally, if all else fails, it'll try utf-8 with replacement for unknown characters.
"""
allow_utf8_replace: bool = True
"""If true, and decoding using fallback encodings fails, try utf-8 with replacement
for unknown characters."""
def _run(
self,
@@ -33,6 +91,7 @@ class GmailGetMessage(GmailBaseTool):
run_manager: Optional[CallbackManagerForToolRun] = None,
) -> Dict:
"""Run the tool."""
query = (
self.api_resource.users()
.messages()
@@ -49,13 +108,37 @@ class GmailGetMessage(GmailBaseTool):
message_body = ""
if email_msg.is_multipart():
for part in email_msg.walk():
# Get the content type of the current email part
# This could be "text/plain" or "text/html" or in theory,
# any other MIME type.
# If the email is not plain text, we ignore it.
ctype = part.get_content_type()
# Get the content disposition
# The content disposition indicates whether the part is an inline
# part or whether it is an attachment.
cdispo = str(part.get("Content-Disposition"))
# Currently this code does not handle non text/plain content types
# and attachments.
if ctype == "text/plain" and "attachment" not in cdispo:
message_body = part.get_payload(decode=True).decode("utf-8")
charset = part.get_content_charset() # Extract charset
message_body = _decode_payload(
part.get_payload(decode=True),
charset,
# Try the current charset, then utf-8, then latin1, then cp1252
self.fallback_encodings,
allow_utf8_replace=self.allow_utf8_replace,
)
break
else:
message_body = email_msg.get_payload(decode=True).decode("utf-8")
# The email message is not multipart
# Get the charset and payload
charset = email_msg.get_content_charset()
message_body = _decode_payload(
email_msg.get_payload(decode=True),
charset,
self.fallback_encodings,
allow_utf8_replace=self.allow_utf8_replace,
)
body = clean_email_body(message_body)

View File

@@ -74,6 +74,26 @@ class GmailSearch(GmailBaseTool):
return results
def _parse_messages(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
def decode_payload(payload: bytes, charset: Optional[str] = None) -> str:
    """Decode an email payload, preferring its declared charset.

    Args:
        payload: The raw bytes to decode.
        charset: The charset declared for the payload, if any. When given
            (and non-empty) it is tried before the built-in candidates.

    Returns:
        The text from the first encoding that decodes the payload without
        error; if none succeeds, the utf-8 decoding with undecodable bytes
        replaced by U+FFFD.
    """
    # NOTE: "latin1" maps every byte value, so the entries after it (and
    # the final "replace" fallback) are only reached if it is removed.
    encodings_to_try = [
        "utf-8",
        "latin1",
        "iso-8859-1",
        "cp1252",
    ]  # Add more if needed
    if charset:
        encodings_to_try.insert(0, charset)

    for encoding in encodings_to_try:
        try:
            return payload.decode(encoding)
        except (UnicodeDecodeError, LookupError):
            # LookupError: the declared charset is not a codec Python
            # knows; fall through to the next candidate instead of
            # aborting the whole message parse.
            continue

    # Default to utf-8 with replacement for unknown characters
    return payload.decode("utf-8", "replace")
results = []
for message in messages:
message_id = message["id"]
@@ -85,7 +105,6 @@ class GmailSearch(GmailBaseTool):
)
raw_message = base64.urlsafe_b64decode(message_data["raw"])
email_msg = email.message_from_bytes(raw_message)
subject = email_msg["Subject"]
@@ -97,10 +116,16 @@ class GmailSearch(GmailBaseTool):
ctype = part.get_content_type()
cdispo = str(part.get("Content-Disposition"))
if ctype == "text/plain" and "attachment" not in cdispo:
message_body = part.get_payload(decode=True).decode("utf-8")
charset = part.get_content_charset() # Extract charset
message_body = decode_payload(
part.get_payload(decode=True), charset
)
break
else:
message_body = email_msg.get_payload(decode=True).decode("utf-8")
charset = email_msg.get_content_charset()
message_body = decode_payload(
email_msg.get_payload(decode=True), charset
)
body = clean_email_body(message_body)