"""Cyrillic-to-Latin transliteration used before sending text to MeshCore."""

# Full transliteration table, keyed by the *lowercase* Russian letter.
# The hard and soft signs map to an empty string, i.e. they are dropped.
FULL_CYR2LAT = {
    'а': 'a',
    'б': 'b',
    'в': 'v',
    'г': 'g',
    'д': 'd',
    'е': 'e',
    'ё': 'yo',
    'ж': 'zh',
    'з': 'z',
    'и': 'i',
    'й': 'y',
    'к': 'k',
    'л': 'l',
    'м': 'm',
    'н': 'n',
    'о': 'o',
    'п': 'p',
    'р': 'r',
    'с': 's',
    'т': 't',
    'у': 'u',
    'ф': 'f',
    'х': 'h',
    'ц': 'ts',
    'ч': 'ch',
    'ш': 'sh',
    'щ': 'sch',
    'ъ': '',
    'ы': 'y',
    'ь': '',
    'э': 'e',
    'ю': 'yu',
    'я': 'ya',
}

# "Soft" table: only Cyrillic letters that have a look-alike Latin glyph are
# replaced, case-sensitively; every other character is left untouched.
# NOTE(review): presumably this exists to save bytes (a Latin letter is one
# UTF-8 byte, a Cyrillic letter two) while keeping the text readable - confirm.
SOFT_CYR2LAT = {
    'а': 'a',
    'е': 'e',
    'о': 'o',
    'р': 'p',
    'с': 'c',
    'у': 'y',
    'х': 'x',
    'А': 'A',
    'В': 'B',
    'Е': 'E',
    'К': 'K',
    'М': 'M',
    'Н': 'H',
    'О': 'O',
    'Р': 'P',
    'С': 'C',
    'Т': 'T',
    'Х': 'X',
}

# Supported modes and their lookup table; 'off' has no table at all.
CYR2LAT_MODES = {
    'full': FULL_CYR2LAT,
    'off': None,
    'soft': SOFT_CYR2LAT,
}


def apply_case(replacement, char, all_caps=False):
    """Return *replacement* with the letter case of the source *char*.

    Args:
        replacement: Lowercase Latin spelling taken from ``FULL_CYR2LAT``.
        char: The original character the replacement stands for.
        all_caps: When true and *char* is uppercase, uppercase the whole
            replacement ("SCH") instead of only its first letter ("Sch").

    Returns:
        The replacement unchanged for a non-uppercase *char*, otherwise its
        capitalised or fully uppercased form.
    """
    if not char.isupper():
        return replacement

    if all_caps:
        return replacement.upper()

    return replacement.capitalize()


def _in_caps_context(message, index):
    """Tell whether ``message[index]`` sits inside an all-caps run.

    The following character decides when it is a letter; otherwise (end of
    text, space, punctuation, digit) the preceding character decides. A lone
    capital such as the pronoun "Я" therefore is not treated as all-caps.
    """
    following = message[index + 1] if index + 1 < len(message) else ''
    if following.isalpha():
        return following.isupper()

    preceding = message[index - 1] if index > 0 else ''
    return preceding.isupper()


def cyr2lat(message, mode='full'):
    """Transliterate Russian text in *message* according to *mode*.

    Args:
        message: Text to convert.
        mode: ``'full'`` replaces every letter found in ``FULL_CYR2LAT``,
            ``'soft'`` replaces only the look-alike letters of
            ``SOFT_CYR2LAT``, ``'off'`` returns *message* unchanged.

    Returns:
        The converted string. Characters without a table entry are kept.

    Raises:
        ValueError: If *mode* is not a key of ``CYR2LAT_MODES``.
    """
    if mode not in CYR2LAT_MODES:
        # str() keeps the message identical for string modes and avoids a
        # TypeError masking the real problem for anything else (e.g. None).
        raise ValueError("Unknown cyr2lat mode: " + str(mode))

    if mode == 'off':
        return message

    mapping = CYR2LAT_MODES[mode]
    if mode == 'soft':
        # The soft table is case-sensitive, so a plain lookup is enough.
        return ''.join(mapping.get(char, char) for char in message)

    result = []
    for index, char in enumerate(message):
        replacement = mapping.get(char.lower())
        if replacement is None:
            result.append(char)
            continue

        # Inside an all-caps word a digraph must be fully uppercased,
        # otherwise "ЧЕХОВА" would come out as "ChEHOVA".
        result.append(
            apply_case(replacement, char, all_caps=_in_caps_context(message, index))
        )

    return ''.join(result)
# Names of the environment variables that configure MeshCore delivery.
MESHCORE_MESSAGE_LIMIT_BYTES_ENV = 'MESHCORE_MESSAGE_LIMIT_BYTES'
MESHCORE_CYR2LAT_MODE_ENV = 'MESHCORE_CYR2LAT_MODE'
MESHCORE_CHUNK_DELAY_MS_ENV = 'MESHCORE_CHUNK_DELAY_MS'
# Values used when the corresponding variable is unset or blank.
DEFAULT_MESHCORE_MESSAGE_LIMIT_BYTES = 133
DEFAULT_MESHCORE_CYR2LAT_MODE = 'soft'
DEFAULT_MESHCORE_CHUNK_DELAY_MS = 0
# Smallest per-message byte limit accepted from the environment.
MIN_MESHCORE_MESSAGE_LIMIT_BYTES = 50


def byte_length(text):
    """Return the size of *text* in bytes once encoded as UTF-8."""
    return len(text.encode("utf-8"))


def _read_env(name, default):
    """Return env var *name* stripped of whitespace, or ``str(default)``.

    A variable that is set but blank is treated like an unset one, so an
    empty entry in a compose/env file falls back to the default.
    """
    raw_value = os.environ.get(name, '').strip()
    return raw_value if raw_value else str(default)


def parse_meshcore_message_limit():
    """Read the per-message byte limit from the environment.

    Returns:
        The limit as an int (``DEFAULT_MESHCORE_MESSAGE_LIMIT_BYTES`` when
        the variable is unset or blank).

    Raises:
        ValueError: If the value is not an integer or is smaller than
            ``MIN_MESHCORE_MESSAGE_LIMIT_BYTES``.
    """
    raw_limit = _read_env(
        MESHCORE_MESSAGE_LIMIT_BYTES_ENV,
        DEFAULT_MESHCORE_MESSAGE_LIMIT_BYTES,
    )
    try:
        limit = int(raw_limit)
    except ValueError:
        # "from None": the int() traceback adds nothing to the user-facing text.
        raise ValueError(
            MESHCORE_MESSAGE_LIMIT_BYTES_ENV + " must be an integer"
        ) from None

    if limit < MIN_MESHCORE_MESSAGE_LIMIT_BYTES:
        raise ValueError(
            MESHCORE_MESSAGE_LIMIT_BYTES_ENV
            + " must be at least "
            + str(MIN_MESHCORE_MESSAGE_LIMIT_BYTES)
            + " bytes"
        )

    return limit


def parse_meshcore_cyr2lat_mode():
    """Read the transliteration mode from the environment.

    The value is matched case-insensitively and ignoring surrounding
    whitespace.

    Returns:
        ``'full'``, ``'off'`` or ``'soft'``.

    Raises:
        ValueError: If the value is none of the three modes.
    """
    mode = _read_env(MESHCORE_CYR2LAT_MODE_ENV, DEFAULT_MESHCORE_CYR2LAT_MODE).lower()
    if mode not in ('full', 'off', 'soft'):
        raise ValueError(MESHCORE_CYR2LAT_MODE_ENV + " must be 'full', 'off' or 'soft'")

    return mode


def parse_meshcore_chunk_delay_ms():
    """Read the pause between message parts, in milliseconds.

    Returns:
        A non-negative int (``DEFAULT_MESHCORE_CHUNK_DELAY_MS`` when the
        variable is unset or blank).

    Raises:
        ValueError: If the value is not an integer or is negative.
    """
    raw_delay = _read_env(
        MESHCORE_CHUNK_DELAY_MS_ENV,
        DEFAULT_MESHCORE_CHUNK_DELAY_MS,
    )
    try:
        delay = int(raw_delay)
    except ValueError:
        raise ValueError(MESHCORE_CHUNK_DELAY_MS_ENV + " must be an integer") from None

    if delay < 0:
        raise ValueError(MESHCORE_CHUNK_DELAY_MS_ENV + " must be zero or greater")

    return delay


def split_long_word(word, limit):
    """Cut *word* into pieces of at most *limit* UTF-8 bytes each.

    The cut is made between characters, never inside a multi-byte one.

    Raises:
        ValueError: If a single character is larger than *limit*.
    """
    chunks = []
    chunk = ''

    for char in word:
        if byte_length(char) > limit:
            raise ValueError("Single character does not fit MeshCore message limit")

        candidate = chunk + char
        if byte_length(candidate) <= limit:
            chunk = candidate
            continue

        # The current piece is full: store it and start the next with char.
        chunks.append(chunk)
        chunk = char

    if chunk:
        chunks.append(chunk)

    return chunks


def split_text_by_byte_limit(message, limit):
    """Split *message* on whitespace into chunks of at most *limit* bytes.

    Words are re-joined with single spaces, so runs of whitespace (including
    newlines) collapse. A word that alone exceeds *limit* is cut with
    ``split_long_word`` and its pieces become chunks of their own.

    Returns:
        A list of chunk strings; for a message without any words this is
        ``[message]`` when it fits, otherwise its character-level pieces.
    """
    words = message.split()
    if not words:
        return [message] if byte_length(message) <= limit else split_long_word(message, limit)

    chunks = []
    chunk = ''

    for word in words:
        if byte_length(word) > limit:
            # Flush what was collected; the pieces are not merged with
            # neighbouring words.
            if chunk:
                chunks.append(chunk)
                chunk = ''
            chunks.extend(split_long_word(word, limit))
            continue

        candidate = word if not chunk else chunk + ' ' + word
        if byte_length(candidate) <= limit:
            chunk = candidate
            continue

        chunks.append(chunk)
        chunk = word

    if chunk:
        chunks.append(chunk)

    return chunks


def max_chunk_prefix_length(total_digits):
    """Return the byte size of the widest "[i/n] " prefix for *total_digits*-digit counts."""
    max_number = '9' * total_digits
    return byte_length("[" + max_number + "/" + max_number + "] ")


def split_meshcore_message(message, limit):
    """Split *message* into numbered parts that each fit into *limit* bytes.

    A message that already fits is returned alone and unnumbered. Otherwise
    every part is prefixed with "[index/total] ", and the prefix is counted
    inside the limit.

    Raises:
        ValueError: If the prefix alone would use up the whole limit, or a
            single character does not fit into the remaining space.
    """
    if byte_length(message) <= limit:
        return [message]

    # The prefix width depends on how many parts there are, and the number of
    # parts depends on the room left after the prefix. Start by assuming a
    # one-digit total and retry until the assumed digit count matches.
    total_digits = 1
    while True:
        prefix_length = max_chunk_prefix_length(total_digits)
        if prefix_length >= limit:
            raise ValueError("MeshCore message limit is too small for chunk prefixes")

        chunks = split_text_by_byte_limit(message, limit - prefix_length)
        next_total_digits = len(str(len(chunks)))
        if next_total_digits == total_digits:
            total = len(chunks)
            return [
                "[" + str(index) + "/" + str(total) + "] " + chunk
                for index, chunk in enumerate(chunks, start=1)
            ]

        total_digits = next_total_digits


def prepare_meshcore_messages(message, limit, cyr2lat_mode=DEFAULT_MESHCORE_CYR2LAT_MODE):
    """Transliterate *message* with ``cyr2lat`` and split the result by *limit* bytes."""
    return split_meshcore_message(cyr2lat(message, mode=cyr2lat_mode), limit)


def send_meshcore_message(
    session,
    message,
    limit,
    cyr2lat_mode=DEFAULT_MESHCORE_CYR2LAT_MODE,
    chunk_delay_ms=DEFAULT_MESHCORE_CHUNK_DELAY_MS,
):
    """Send *message* through ``send_webhook_message``, one call per part.

    Args:
        session: Session object handed on to ``send_webhook_message``.
        message: Original text.
        limit: Maximum size of one part in UTF-8 bytes.
        cyr2lat_mode: Transliteration mode passed to ``cyr2lat``.
        chunk_delay_ms: Pause between consecutive parts in milliseconds;
            no pause follows the last part.
    """
    parts = prepare_meshcore_messages(message, limit, cyr2lat_mode)
    for index, part in enumerate(parts):
        send_webhook_message(session, part)
        if chunk_delay_ms > 0 and index < len(parts) - 1:
            time.sleep(chunk_delay_ms / 1000)
# Unit tests for the MeshCore message pipeline: transliteration (cyr2lat),
# byte-limited splitting with "[i/n] " prefixes, the delay between parts,
# and the parsing of the three MESHCORE_* environment variables.
import contextlib
import io
import unittest
from unittest.mock import patch

from cyr2lat import cyr2lat
from start_meshcore_ha import (
    parse_meshcore_cyr2lat_mode,
    parse_meshcore_chunk_delay_ms,
    parse_meshcore_message_limit,
    prepare_meshcore_messages,
    send_meshcore_message,
)


# Minimal stand-in for the object returned by FakeSession.get().
# NOTE(review): presumably status_code is what send_webhook_message checks;
# that function is not visible here - confirm.
class FakeResponse:
    status_code = 200


# Session double: instead of doing any network I/O it records the 'message'
# entry of every params dict passed to get().
class FakeSession:
    def __init__(self):
        # Messages "sent" so far, in call order.
        self.messages = []

    def get(self, url, params):
        self.messages.append(params['message'])
        return FakeResponse()


class MeshCoreMessageTest(unittest.TestCase):
    # --- cyr2lat modes -----------------------------------------------------

    def test_cyr2lat_transliterates_russian_text(self):
        self.assertEqual(
            cyr2lat("Авария: улица Южная", mode='full'),
            "Avariya: ulitsa Yuzhnaya",
        )

    def test_cyr2lat_soft_mode_replaces_only_similar_letters(self):
        # The expected string deliberately mixes Latin look-alikes with the
        # Cyrillic letters that have no look-alike.
        self.assertEqual(
            cyr2lat("Авария: улица Южная", mode='soft'),
            "Aвapия: yлицa Южнaя",
        )

    def test_cyr2lat_off_mode_keeps_original_text(self):
        self.assertEqual(
            cyr2lat("Авария: улица Южная", mode='off'),
            "Авария: улица Южная",
        )

    # --- sending and splitting ---------------------------------------------

    def test_short_message_is_sent_as_single_payload(self):
        session = FakeSession()

        # stdout is captured so that anything printed while sending does not
        # clutter the test output.
        with contextlib.redirect_stdout(io.StringIO()):
            send_meshcore_message(session, "Авария", 133)

        # Default mode is 'soft', and a message within the limit gets no prefix.
        self.assertEqual(session.messages, ["Aвapия"])

    def test_long_message_is_split_by_byte_limit(self):
        limit = 30
        messages = prepare_meshcore_messages("улица Южная " * 10, limit)

        self.assertGreater(len(messages), 1)
        for message in messages:
            self.assertLessEqual(len(message.encode("utf-8")), limit)

    def test_prepare_messages_can_skip_cyr2lat(self):
        self.assertEqual(
            prepare_meshcore_messages("Авария", 133, cyr2lat_mode='off'),
            ["Авария"],
        )

    def test_chunk_prefix_is_counted_inside_limit(self):
        limit = 12
        messages = prepare_meshcore_messages("aa bb cc dd ee ff gg hh", limit)

        self.assertGreater(len(messages), 1)
        for index, message in enumerate(messages, start=1):
            # Every part starts with "[index/total] " ...
            self.assertTrue(
                message.startswith("[" + str(index) + "/" + str(len(messages)) + "] ")
            )
            # ... and still fits the limit with the prefix included.
            self.assertLessEqual(len(message.encode("utf-8")), limit)

    def test_delay_is_applied_between_chunks_only(self):
        session = FakeSession()

        with contextlib.redirect_stdout(io.StringIO()):
            # time.sleep is patched so the test neither waits nor depends on timing.
            with patch('start_meshcore_ha.time.sleep') as sleep:
                send_meshcore_message(session, "aa bb cc dd ee ff gg hh", 12, chunk_delay_ms=250)

        self.assertGreater(len(session.messages), 1)
        # One pause per gap between parts, none after the last part.
        self.assertEqual(sleep.call_count, len(session.messages) - 1)
        # 250 ms are handed to time.sleep as seconds.
        sleep.assert_called_with(0.25)

    def test_long_word_is_split_without_breaking_utf8_characters(self):
        limit = 15
        # Ten 4-byte characters with no whitespace: forces character-level cuts.
        messages = prepare_meshcore_messages("😀" * 10, limit)

        self.assertGreater(len(messages), 1)
        for message in messages:
            self.assertLessEqual(len(message.encode("utf-8")), limit)

    # --- MESHCORE_MESSAGE_LIMIT_BYTES --------------------------------------

    def test_env_limit_defaults_to_133_bytes(self):
        # clear=True empties os.environ for the duration of the block.
        with patch.dict('os.environ', {}, clear=True):
            self.assertEqual(parse_meshcore_message_limit(), 133)

    def test_env_limit_rejects_too_small_value(self):
        with patch.dict('os.environ', {'MESHCORE_MESSAGE_LIMIT_BYTES': '49'}):
            with self.assertRaises(ValueError):
                parse_meshcore_message_limit()

    def test_env_limit_rejects_non_integer_value(self):
        with patch.dict('os.environ', {'MESHCORE_MESSAGE_LIMIT_BYTES': 'small'}):
            with self.assertRaises(ValueError):
                parse_meshcore_message_limit()

    # --- MESHCORE_CYR2LAT_MODE ---------------------------------------------

    def test_env_cyr2lat_mode_defaults_to_soft(self):
        with patch.dict('os.environ', {}, clear=True):
            self.assertEqual(parse_meshcore_cyr2lat_mode(), 'soft')

    def test_env_cyr2lat_mode_accepts_full(self):
        with patch.dict('os.environ', {'MESHCORE_CYR2LAT_MODE': 'full'}):
            self.assertEqual(parse_meshcore_cyr2lat_mode(), 'full')

    def test_env_cyr2lat_mode_accepts_off(self):
        with patch.dict('os.environ', {'MESHCORE_CYR2LAT_MODE': 'off'}):
            self.assertEqual(parse_meshcore_cyr2lat_mode(), 'off')

    def test_env_cyr2lat_mode_rejects_unknown_value(self):
        with patch.dict('os.environ', {'MESHCORE_CYR2LAT_MODE': 'mixed'}):
            with self.assertRaises(ValueError):
                parse_meshcore_cyr2lat_mode()

    # --- MESHCORE_CHUNK_DELAY_MS -------------------------------------------

    def test_env_chunk_delay_defaults_to_zero_ms(self):
        with patch.dict('os.environ', {}, clear=True):
            self.assertEqual(parse_meshcore_chunk_delay_ms(), 0)

    def test_env_chunk_delay_accepts_positive_integer(self):
        with patch.dict('os.environ', {'MESHCORE_CHUNK_DELAY_MS': '250'}):
            self.assertEqual(parse_meshcore_chunk_delay_ms(), 250)

    def test_env_chunk_delay_rejects_negative_value(self):
        with patch.dict('os.environ', {'MESHCORE_CHUNK_DELAY_MS': '-1'}):
            with self.assertRaises(ValueError):
                parse_meshcore_chunk_delay_ms()

    def test_env_chunk_delay_rejects_non_integer_value(self):
        with patch.dict('os.environ', {'MESHCORE_CHUNK_DELAY_MS': 'slow'}):
            with self.assertRaises(ValueError):
                parse_meshcore_chunk_delay_ms()


if __name__ == '__main__':
    unittest.main()