mirror of
https://github.com/ChronosX88/mastodon-bridge-bot.git
synced 2024-11-08 16:21:01 +00:00
166 lines
9.5 KiB
Python
166 lines
9.5 KiB
Python
import argparse
|
||
import asyncio
|
||
import logging
|
||
import os
|
||
import uuid
|
||
import re
|
||
|
||
import toml
|
||
# from mastodon import Mastodon
|
||
import atoot
|
||
from telethon import TelegramClient, events
|
||
from telethon.tl.functions.channels import JoinChannelRequest
|
||
from telethon.tl.functions.contacts import ResolveUsernameRequest
|
||
from telethon.tl.types import InputChannel
|
||
from telethon.utils import get_extension
|
||
|
||
logging.basicConfig(level=logging.INFO)
|
||
|
||
class BridgeBot:
|
||
async def __new__(cls, cfg: dict):
|
||
self = super().__new__(cls)
|
||
|
||
# RegExps
|
||
self.re_md_links = re.compile(r'\[(.*?)\]\((https?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+)\)')
|
||
# Config init
|
||
self.config = cfg
|
||
self.mastodon_clients = {}
|
||
self.mastodon_clients_visibility = {}
|
||
self.tg_mstdn_mappings = {}
|
||
for acc in cfg["mastodon"]["accounts"]:
|
||
mastodon_client = await atoot.MastodonAPI.create(
|
||
client_id=acc["client_id"],
|
||
client_secret=acc["client_secret"],
|
||
access_token=acc["access_token"],
|
||
instance=acc["api_base_url"]
|
||
)
|
||
self.mastodon_clients[acc["name"]] = mastodon_client
|
||
try:
|
||
self.mastodon_clients_visibility[acc["name"]] = acc["visibility"]
|
||
except KeyError:
|
||
self.mastodon_clients_visibility[acc["name"]] = None
|
||
for m in cfg["mastodon"]["mappings"]:
|
||
if self.tg_mstdn_mappings.get("tg_channel_handle", None) is None:
|
||
self.tg_mstdn_mappings[m["tg_channel_handle"]] = []
|
||
self.tg_mstdn_mappings[m["tg_channel_handle"]].append(m["account_name"])
|
||
|
||
self.tg_client = TelegramClient(cfg["telegram"]["session_file"], cfg["telegram"]["api_id"],
|
||
cfg["telegram"]["api_hash"])
|
||
return self
|
||
|
||
async def run(self):
|
||
await self.tg_client.connect()
|
||
await self.tg_client.start()
|
||
for ch_id in self.config["telegram"]["channels"]:
|
||
result = await self.tg_client(ResolveUsernameRequest(ch_id))
|
||
channel = InputChannel(result.peer.channel_id, result.chats[0].access_hash)
|
||
await self.tg_client(JoinChannelRequest(channel))
|
||
self.tg_client.add_event_handler(self._tg_event_handler)
|
||
logging.info("Bot has been started")
|
||
await self.tg_client.run_until_disconnected()
|
||
|
||
@events.register(events.NewMessage())
|
||
async def _tg_event_handler(self, event: events.NewMessage.Event):
|
||
if event.message.post:
|
||
channel = await event.get_chat()
|
||
if channel.broadcast:
|
||
if channel.username in self.tg_mstdn_mappings.keys():
|
||
if event.message.grouped_id is not None:
|
||
logging.warning("Albums isn't supported yet")
|
||
return
|
||
logging.info(f"Catched new post from telegram channel {channel.username}")
|
||
# Common Mastodon message limit size. Change if you increased this limit.
|
||
mstdn_post_limit = 500
|
||
full_text = event.message.text
|
||
# Uncomment for debug
|
||
#print(full_text)
|
||
full_text = full_text.replace('**', '')
|
||
full_text = full_text.replace('__', '')
|
||
full_text = full_text.replace('~~', '')
|
||
full_text = full_text.replace('`', '')
|
||
full_text = re.sub(self.re_md_links, r'\g<1> \g<2> ', full_text)
|
||
# URL of Telegram message
|
||
tg_message_url = f"[https://t.me/{channel.username}/" + str(event.message.id) + "]\n\n"
|
||
if event.message.file and not (event.message.photo or event.message.video or event.message.gif):
|
||
full_text = full_text + "\n\n[К оригинальному посту приложен файл " + event.message.file.name + "]"
|
||
# Size of full text
|
||
full_text_size = len(full_text)
|
||
# Mastodon max post size with TG message URL
|
||
mstdn_post_size = mstdn_post_limit - len(tg_message_url)
|
||
# Initial vars
|
||
long_post_tail = ''
|
||
reply_start = 0
|
||
reply_end = 0
|
||
# Post text if tg message lenght is lt mstdn post limit
|
||
post_text: str = tg_message_url + full_text[reply_start:mstdn_post_size]
|
||
# Set reply_start to non zero for future chunking and make mstdn post with continuaniton note
|
||
logging.debug("full_text_size: " + str(full_text_size))
|
||
if full_text_size > mstdn_post_size:
|
||
long_post_tail = "\n\n[Откройте пост по ссылке или прочитайте продолжение в обсуждении]"
|
||
mstdn_post_size = mstdn_post_size - len(long_post_tail)
|
||
reply_start = full_text.rfind(' ', reply_start, mstdn_post_size)
|
||
post_text = tg_message_url + full_text[0:reply_start] + long_post_tail
|
||
logging.debug("start reply_start: " + str(reply_start))
|
||
temp_file_path: str = ""
|
||
# Downloading media if tg post contains it
|
||
if (event.message.photo or event.message.video or event.message.gif) and not hasattr(event.message.media, "webpage"):
|
||
logging.info("Post contains the media, downloading it...")
|
||
temp_file_name = uuid.uuid4()
|
||
temp_file_path = f"/tmp/{temp_file_name}{get_extension(event.message.media)}"
|
||
await self.tg_client.download_media(event.message.media, temp_file_path)
|
||
# Starting to post messages
|
||
for mstdn_acc_name in self.tg_mstdn_mappings[channel.username]:
|
||
if self.mastodon_clients.get(mstdn_acc_name, None) is None:
|
||
logging.error(f"{mstdn_acc_name} doesn't exists in mastodon.accounts section of config!")
|
||
return
|
||
# Make current client with config
|
||
current_mastodon_client = self.mastodon_clients[mstdn_acc_name]
|
||
current_mstdn_acc_visibility = self.mastodon_clients_visibility[mstdn_acc_name]
|
||
# Attach media if tg post contains it
|
||
if temp_file_path != "":
|
||
mstdn_media_meta = await current_mastodon_client.upload_attachment(open(temp_file_path, 'rb'))
|
||
else:
|
||
mstdn_media_meta = None
|
||
# First root mstdn post
|
||
if mstdn_media_meta != None:
|
||
media_ids = [mstdn_media_meta["id"]]
|
||
else:
|
||
media_ids = None
|
||
reply_to = await current_mastodon_client.create_status(status=post_text, media_ids=media_ids, visibility=current_mstdn_acc_visibility)
|
||
tg_message_url = f"[Продолжение https://t.me/{channel.username}/" + str(event.message.id) + "]\n\n"
|
||
# Chunking post into mstdn limit chunks and reply to root post
|
||
emergency_break = 0
|
||
emergency_break_limit = 14
|
||
while reply_start + mstdn_post_limit < full_text_size:
|
||
logging.debug("while reply_start:" + str(reply_start))
|
||
reply_end = full_text.rfind(' ', reply_start, reply_start + mstdn_post_limit - len(tg_message_url)*2)
|
||
if reply_end == reply_start:
|
||
reply_end = reply_start + mstdn_post_limit - len(tg_message_url)*2
|
||
logging.debug("while reply_end:" + str(reply_end))
|
||
post_text = tg_message_url + full_text[reply_start+1:reply_end]
|
||
reply_to = await current_mastodon_client.create_status(status=post_text, in_reply_to_id=reply_to["id"], visibility=current_mstdn_acc_visibility)
|
||
reply_start = reply_end
|
||
# Emergency break for long or endlessly looped posts
|
||
emergency_break = emergency_break + 1
|
||
if emergency_break >= emergency_break_limit:
|
||
logging.debug("Breaking very long reply thread")
|
||
break
|
||
# Final chunk to reply to root post
|
||
if reply_start > 0:
|
||
logging.debug("final reply_start: " + str(reply_start))
|
||
post_text = tg_message_url + full_text[reply_start+1:full_text_size]
|
||
reply_to = await current_mastodon_client.create_status(status=post_text, in_reply_to_id=reply_to["id"], visibility=current_mstdn_acc_visibility)
|
||
# Delete media attach
|
||
if temp_file_path != "":
|
||
os.remove(temp_file_path)
|
||
|
||
if __name__ == '__main__':
|
||
parser = argparse.ArgumentParser()
|
||
parser.add_argument('--config', type=str,
|
||
help='A path to bot configuration')
|
||
args = parser.parse_args()
|
||
|
||
config: dict = toml.loads(open(args.config, "r").read())
|
||
bot = asyncio.get_event_loop().run_until_complete(BridgeBot(config))
|
||
asyncio.get_event_loop().run_until_complete(bot.run())
|