mastodon-bridge-bot/bridge.py

68 lines
2.9 KiB
Python

import asyncio
import os
import uuid
from io import BytesIO
import toml
import argparse
from mastodon import Mastodon
from telethon import TelegramClient, events
from telethon.events.common import EventBuilder
from telethon.tl.functions.channels import JoinChannelRequest, GetMessagesRequest
from telethon.tl.functions.contacts import ResolveUsernameRequest
from telethon.tl.types import InputChannel
class BridgeBot:
def __init__(self, cfg: dict):
self.config = cfg
self.mastodon_client = Mastodon(
client_id=cfg["mastodon"]["client_id"],
client_secret=cfg["mastodon"]["client_secret"],
access_token=cfg["mastodon"]["access_token"],
api_base_url=cfg["mastodon"]["api_base_url"]
)
self.tg_client = TelegramClient(cfg["telegram"]["session_file"], cfg["telegram"]["api_id"],
cfg["telegram"]["api_hash"])
async def run(self):
await self.tg_client.connect()
await self.tg_client.start()
for ch_id in self.config["telegram"]["channels"]:
result = await self.tg_client(ResolveUsernameRequest(ch_id))
channel = InputChannel(result.peer.channel_id, result.chats[0].access_hash)
await self.tg_client(JoinChannelRequest(channel))
self.tg_client.add_event_handler(self._tg_event_handler)
await self.tg_client.run_until_disconnected()
@events.register(events.NewMessage())
async def _tg_event_handler(self, event: events.NewMessage.Event):
if event.message.post:
channel = await event.get_chat()
if channel.broadcast:
if channel.username in self.config["telegram"]["channels"]:
print("dobbry vechur")
text: str = event.message.text
if event.message.forward:
text = f"[from {event.message.forward.chat.title} (https://t.me/{event.message.forward.chat.username})]\n\n" + text
if event.message.photo:
temp_image_name = uuid.uuid4()
await self.tg_client.download_media(event.message.photo, f"/tmp/{temp_image_name}.jpg")
mstdn_media_meta = self.mastodon_client.media_post(f"/tmp/{temp_image_name}.jpg")
self.mastodon_client.status_post(text, media_ids=[mstdn_media_meta])
os.remove(f"/tmp/{temp_image_name}.jpg")
return
self.mastodon_client.toot(text)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--config', type=str,
help='A path to bot configuration')
args = parser.parse_args()
config: dict = toml.loads(open(args.config, "r").read())
print(config)
bot = BridgeBot(config)
asyncio.get_event_loop().run_until_complete(bot.run())