Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 27 additions & 41 deletions slack_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,38 +27,39 @@

def _compact_attachment(a: dict) -> dict:
"""Strip a Slack attachment to LLM-essential fields."""
ca = {}
if a.get("text"):
ca["text"] = a["text"]
if a.get("fallback"):
ca["fallback"] = a["fallback"]
if a.get("author_name"):
ca["author_name"] = a["author_name"]
if a.get("title"):
ca["title"] = a["title"]
return ca
return {k: a[k] for k in ("text", "fallback", "author_name", "title") if a.get(k)}


def _extract_block_text(blocks: list) -> str:
"""Extract plain text from Slack Block Kit blocks.

When a message uses Block Kit, the top-level `text` field is often empty
and all content lives in `blocks`. This extracts a readable text fallback.

Rich-text blocks nest arbitrarily — rich_text_list, rich_text_quote, and
rich_text_section wrap their content in further `elements` arrays — so the
inner walker recurses into any container that exposes `elements`.
"""
parts = []

def _walk_rich_text(elements):
for item in elements:
item_type = item.get("type", "")
if item_type == "text":
parts.append(item.get("text", ""))
elif item_type == "link":
parts.append(item.get("url", ""))
elif item_type == "user":
parts.append(f"<@{item.get('user_id', '')}>")
elif item_type == "channel":
parts.append(f"<#{item.get('channel_id', '')}>")
elif item.get("elements"):
_walk_rich_text(item["elements"])

for block in blocks:
block_type = block.get("type", "")
if block_type == "rich_text":
for element in block.get("elements", []):
for item in element.get("elements", []):
if item.get("type") == "text":
parts.append(item.get("text", ""))
elif item.get("type") == "link":
parts.append(item.get("url", ""))
elif item.get("type") == "user":
parts.append(f"<@{item.get('user_id', '')}>")
elif item.get("type") == "channel":
parts.append(f"<#{item.get('channel_id', '')}>")
_walk_rich_text(block.get("elements", []))
elif block_type == "section":
text_obj = block.get("text", {})
if isinstance(text_obj, dict) and text_obj.get("text"):
Expand All @@ -84,14 +85,9 @@ def _compact_message(msg: dict) -> dict:
"user": msg.get("user", msg.get("bot_id", "")),
"ts": msg.get("ts", ""),
}
if msg.get("username"):
result["username"] = msg["username"]
if msg.get("thread_ts"):
result["thread_ts"] = msg["thread_ts"]
if msg.get("reply_count"):
result["reply_count"] = msg["reply_count"]
if msg.get("subtype"):
result["subtype"] = msg["subtype"]
for key in ("username", "thread_ts", "reply_count", "subtype"):
if msg.get(key):
result[key] = msg[key]
if msg.get("edited"):
result["edited"] = True
if msg.get("reactions"):
Expand Down Expand Up @@ -120,19 +116,9 @@ def _compact_user(user: dict) -> dict:
"deleted": user.get("deleted", False),
}
profile = user.get("profile", {})
if profile:
display_name = profile.get("display_name", "")
if display_name:
result["display_name"] = display_name
title = profile.get("title", "")
if title:
result["title"] = title
status_text = profile.get("status_text", "")
if status_text:
result["status_text"] = status_text
email = profile.get("email", "")
if email:
result["email"] = email
for field in ("display_name", "title", "status_text", "email"):
if val := profile.get(field, ""):
result[field] = val
Comment thread
cursor[bot] marked this conversation as resolved.
return result


Expand Down
Loading