mastodon-bridge-bot/bridge.py

159 lines
9.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import argparse
import asyncio
import logging
import os
import uuid
import re
import toml
from mastodon import Mastodon
from telethon import TelegramClient, events
from telethon.tl.functions.channels import JoinChannelRequest
from telethon.tl.functions.contacts import ResolveUsernameRequest
from telethon.tl.types import InputChannel
from telethon.utils import get_extension
# Configure the root logger once at import time so INFO-level records
# (and above) emitted via the module-level `logging.*` calls below are shown.
logging.basicConfig(level=logging.INFO)
class BridgeBot:
    """Re-post new messages from Telegram broadcast channels to Mastodon.

    The configuration dict is expected to provide:

    * ``cfg["mastodon"]["accounts"]`` - entries with ``name``, ``client_id``,
      ``client_secret``, ``access_token``, ``api_base_url`` and an optional
      ``visibility``;
    * ``cfg["mastodon"]["mappings"]`` - entries with ``tg_channel_handle``
      and ``account_name``; one channel may be mapped to several accounts;
    * ``cfg["telegram"]`` - ``session_file``, ``api_id``, ``api_hash`` and
      the list of ``channels`` to join.
    """

    # Common Mastodon message limit size. Change if you increased this limit.
    MSTDN_POST_LIMIT = 500
    # Safety cap on the number of continuation replies per Telegram post.
    MAX_REPLY_POSTS = 15
    # Telegram markdown markers that are removed from the text before posting.
    MARKDOWN_MARKERS = ('**', '__', '~~', '`')
    # Note appended to the root post when the text continues in replies.
    LONG_POST_TAIL = "\n\n[Откройте пост по ссылке или прочитайте продолжение в обсуждении]"

    def __init__(self, cfg: dict):
        """Create the Mastodon clients, the channel mappings and the Telegram client.

        :param cfg: parsed bot configuration (see the class docstring).
        :raises KeyError: if a mandatory configuration key is missing.
        """
        # Markdown links "[title](http://url)"; group 1 is the title,
        # group 2 is the URL.
        self.re_md_links = re.compile(r'\[(.*?)\]\((https?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+)\)')

        self.config = cfg
        self.mastodon_clients = {}
        self.mastodon_clients_visibility = {}
        self.tg_mstdn_mappings = {}

        for acc in cfg["mastodon"]["accounts"]:
            self.mastodon_clients[acc["name"]] = Mastodon(
                client_id=acc["client_id"],
                client_secret=acc["client_secret"],
                access_token=acc["access_token"],
                api_base_url=acc["api_base_url"]
            )
            # Visibility is optional; None is passed through to status_post().
            self.mastodon_clients_visibility[acc["name"]] = acc.get("visibility")

        for m in cfg["mastodon"]["mappings"]:
            # Key on the actual channel handle so that several accounts mapped
            # to the same channel accumulate in one list instead of the list
            # being reset on every mapping entry.
            self.tg_mstdn_mappings.setdefault(m["tg_channel_handle"], []).append(m["account_name"])

        self.tg_client = TelegramClient(cfg["telegram"]["session_file"], cfg["telegram"]["api_id"],
                                        cfg["telegram"]["api_hash"])

    async def run(self):
        """Connect to Telegram, join the configured channels and serve events.

        Returns only when the Telegram client gets disconnected.
        """
        await self.tg_client.connect()
        await self.tg_client.start()
        for ch_id in self.config["telegram"]["channels"]:
            result = await self.tg_client(ResolveUsernameRequest(ch_id))
            channel = InputChannel(result.peer.channel_id, result.chats[0].access_hash)
            await self.tg_client(JoinChannelRequest(channel))
        # Register the handler once, after all channels have been joined.
        self.tg_client.add_event_handler(self._tg_event_handler)
        logging.info("Bot has been started")
        await self.tg_client.run_until_disconnected()

    def _strip_markdown(self, text: str) -> str:
        """Remove Telegram markdown markers and flatten ``[title](url)`` links to ``title url ``."""
        for marker in self.MARKDOWN_MARKERS:
            text = text.replace(marker, '')
        return self.re_md_links.sub(r'\g<1> \g<2> ', text)

    @staticmethod
    def _chunk_end(text: str, start: int, budget: int) -> int:
        """Return the exclusive end index of a chunk of ``text`` starting at ``start``.

        The chunk holds at most ``budget`` characters. The cut is made at the
        last space inside that window when there is one, otherwise the text
        is cut hard at the budget. The result is always greater than
        ``start``, so repeated calls always make progress.
        """
        hard_end = start + max(budget, 1)
        if hard_end >= len(text):
            return len(text)
        # The space itself may sit right at hard_end: it is not part of the chunk.
        soft_end = text.rfind(' ', start + 1, hard_end + 1)
        return soft_end if soft_end != -1 else hard_end

    def _build_posts(self, full_text: str, root_prefix: str, reply_prefix: str) -> list:
        """Split ``full_text`` into the texts of a Mastodon thread.

        :param full_text: cleaned text of the Telegram message.
        :param root_prefix: text put in front of the root post.
        :param reply_prefix: text put in front of every continuation reply.
        :return: non-empty list of post texts; item 0 is the root post, the
            rest are replies in thread order. As long as the prefixes leave
            room for at least one character, every item is at most
            ``MSTDN_POST_LIMIT`` characters long.
        """
        limit = self.MSTDN_POST_LIMIT
        text_size = len(full_text)
        if text_size <= limit - len(root_prefix):
            return [root_prefix + full_text]

        end = self._chunk_end(full_text, 0, limit - len(root_prefix) - len(self.LONG_POST_TAIL))
        posts = [root_prefix + full_text[:end] + self.LONG_POST_TAIL]
        reply_budget = limit - len(reply_prefix)
        while True:
            # Skip the space the text was split at, but never a real character
            # after a hard cut.
            pos = end + 1 if full_text[end:end + 1] == ' ' else end
            if pos >= text_size:
                break
            if len(posts) > self.MAX_REPLY_POSTS:
                logging.warning("Post is too long, the rest of the text is dropped")
                break
            end = self._chunk_end(full_text, pos, reply_budget)
            posts.append(reply_prefix + full_text[pos:end])
        return posts

    @events.register(events.NewMessage())
    async def _tg_event_handler(self, event: events.NewMessage.Event):
        """Mirror one new Telegram channel post to every mapped Mastodon account.

        Messages that are not posts of a mapped broadcast channel are ignored;
        albums (grouped messages) are skipped with a warning. Text longer than
        the Mastodon limit is posted as a thread of replies under the root
        post. Photo/video/gif media is downloaded to a temporary file, attached
        to the root post and removed afterwards.
        """
        message = event.message
        if not message.post:
            return
        channel = await event.get_chat()
        if not channel.broadcast:
            return
        if channel.username not in self.tg_mstdn_mappings:
            return
        if message.grouped_id is not None:
            logging.warning("Albums isn't supported yet")
            return
        logging.info(f"Catched new post from telegram channel {channel.username}")

        full_text = self._strip_markdown(message.text or "")
        has_media = bool(message.photo or message.video or message.gif)
        if message.file and not has_media:
            # The file name may be missing; do not fail on string concatenation.
            full_text = full_text + "\n\n[К оригинальному посту приложен файл " + (message.file.name or "") + "]"

        # URL of Telegram message
        tg_message_link = f"https://t.me/{channel.username}/" + str(message.id)
        # The thread is built once, up front, so every mapped account receives
        # exactly the same sequence of posts.
        posts = self._build_posts(
            full_text,
            "[" + tg_message_link + "]\n\n",
            "[Продолжение " + tg_message_link + "]\n\n",
        )
        logging.debug("full_text_size: " + str(len(full_text)) + ", posts: " + str(len(posts)))

        temp_file_path: str = ""
        try:
            # Downloading media if tg post contains it
            if has_media and not hasattr(message.media, "webpage"):
                logging.info("Post contains the media, downloading it...")
                temp_file_path = f"/tmp/{uuid.uuid4()}{get_extension(message.media)}"
                await self.tg_client.download_media(message.media, temp_file_path)

            # NOTE(review): the Mastodon client calls below are not awaited, so
            # they presumably block the event loop while posting - confirm.
            for mstdn_acc_name in self.tg_mstdn_mappings[channel.username]:
                current_mastodon_client = self.mastodon_clients.get(mstdn_acc_name)
                if current_mastodon_client is None:
                    logging.error(f"{mstdn_acc_name} doesn't exists in mastodon.accounts section of config!")
                    # A misconfigured mapping must not block the other accounts.
                    continue
                current_mstdn_acc_visibility = self.mastodon_clients_visibility[mstdn_acc_name]

                # Attach media if tg post contains it; pass None (not [None])
                # when there is nothing to attach.
                media_ids = None
                if temp_file_path != "":
                    media_ids = [current_mastodon_client.media_post(temp_file_path)]

                # Root post first, then every continuation as a reply to the
                # previously created status.
                reply_to = current_mastodon_client.status_post(
                    posts[0], media_ids=media_ids, visibility=current_mstdn_acc_visibility)
                for post_text in posts[1:]:
                    reply_to = current_mastodon_client.status_post(
                        post_text, in_reply_to_id=reply_to, visibility=current_mstdn_acc_visibility)
        finally:
            # Delete media attach, also when downloading or posting failed.
            if temp_file_path != "" and os.path.exists(temp_file_path):
                os.remove(temp_file_path)
if __name__ == '__main__':
    # Command-line entry point: load the TOML configuration given by --config
    # and run the bridge until the Telegram client disconnects.
    parser = argparse.ArgumentParser()
    # Without a config path there is nothing to run, so let argparse report a
    # clear usage error instead of failing later on open(None).
    parser.add_argument('--config', type=str, required=True,
                        help='A path to bot configuration')
    args = parser.parse_args()
    # Close the file deterministically and decode it as UTF-8 regardless of
    # the platform default encoding.
    with open(args.config, "r", encoding="utf-8") as config_file:
        config: dict = toml.loads(config_file.read())
    bot = BridgeBot(config)
    # NOTE(review): the Telegram client is created in BridgeBot.__init__ before
    # any loop is running; presumably it binds to the current event loop, so
    # verify that before replacing this call with asyncio.run().
    asyncio.get_event_loop().run_until_complete(bot.run())