Source code for eclypse.builders.application.deathstarbench.social_network.mpi_services.text
"""MPI workflow for text parsing and enrichment."""
import re
from eclypse.remote.communication import mpi
from eclypse.remote.service import Service
from eclypse.utils import format_log_kv
# Matches "@handle" mentions; the single capture group holds the handle
# without the leading "@", so ``findall`` returns bare handles.
_MENTION_RE = re.compile(r"@([a-zA-Z0-9_]+)")
# Matches an http(s) URL and consumes everything up to the next whitespace.
# ``\S`` is used deliberately: inside a raw string ``[^\\s]`` would exclude a
# literal backslash and the letter "s" rather than whitespace, truncating
# URLs at their first "s".
_URL_RE = re.compile(r"https?://\S+")
[docs]
class TextService(Service):
    """Service that pulls ``@mentions`` and URLs out of a post's text."""

    async def step(self):
        """Process one incoming text-enrichment request."""
        # No arguments are passed here; presumably the ``mpi.exchange``
        # decorator supplies ``_sender_id`` and ``body`` from the received
        # message -- confirm against the decorator's documentation.
        await self.compose_request()  # pylint: disable=no-value-for-parameter

    @mpi.exchange(receive=True, send=True)
    def compose_request(self, _sender_id, body):
        """Enrich the request with parsed mentions and URLs.

        Args:
            _sender_id: Identifier of the sender; not used by this method.
            body: Request mapping; must contain a ``"text"`` entry.

        Returns:
            A ``("UserMentionService", payload)`` pair where ``payload`` is a
            copy of ``body`` extended with ``"mentions"`` and ``"urls"``
            lists. Presumably the decorator forwards ``payload`` to the named
            service -- verify against ``mpi.exchange``.

        Raises:
            KeyError: If ``body`` has no ``"text"`` entry.
        """
        self.logger.info("Received request | " + format_log_kv(request=body))

        # Work on a shallow copy so the incoming mapping is left untouched.
        enriched = {**body}
        post_text = body["text"]
        enriched["mentions"] = _MENTION_RE.findall(post_text)
        enriched["urls"] = _URL_RE.findall(post_text)
        return "UserMentionService", enriched