1 #2

Merged
rejnronuzz merged 6 commits from main into dev 2026-03-09 06:19:03 +00:00
12 changed files with 539 additions and 651 deletions
Showing only changes of commit 7b45fdd8f1 - Show all commits

31
.gitignore vendored Normal file
View File

@@ -0,0 +1,31 @@
.env
bot.log
__pycache__/
*.pyc
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
.vscode/
.idea/
*.swp
*.swo
*.log

View File

@@ -1,88 +1,120 @@
import discord
from discord.ext import commands, tasks
from discord.ext import commands
from discord import app_commands
import asyncio
import logging
from datetime import datetime, timezone
from datetime import datetime
from typing import Optional
import config
from utils.database import FunchosaDatabase
logger = logging.getLogger(__name__)
def build_funchosa_embed(message_data: dict) -> discord.Embed:
embed = discord.Embed(
description=message_data['content'] or "*[без текста]*",
color=discord.Color.blue(),
timestamp=datetime.fromisoformat(message_data['timestamp'])
)
embed.set_author(name='random фунчоза of the day')
embed.add_field(
name="info",
value=(
f"автор: <@{message_data['author_id']}>\n"
f"дата: {message_data['timestamp'].replace('T', ' ')[:19]}\n"
f"номер в базе: {message_data['id']}"
),
inline=False
)
if message_data.get('attachments'):
links = [
f"[вложение {i}]({att['url']})"
for i, att in enumerate(message_data['attachments'][:3], 1)
]
embed.add_field(name="вложения", value="\n".join(links), inline=False)
return embed
def build_funchosa_view() -> discord.ui.View:
view = discord.ui.View()
view.add_item(discord.ui.Button(
label="подавай еще, раб",
custom_id="another_random",
style=discord.ButtonStyle.secondary
))
return view
class FunchosaView(discord.ui.View):
def __init__(self, db: FunchosaDatabase, message_url: str):
super().__init__(timeout=None)
self.db = db
self.add_item(discord.ui.Button(
label="перейти к сообщению",
url=message_url,
style=discord.ButtonStyle.link
))
@discord.ui.button(label="подавай еще, раб", custom_id="another_random", style=discord.ButtonStyle.secondary)
async def another_random(self, interaction: discord.Interaction, button: discord.ui.Button):
await interaction.response.defer()
message_data = await self.db.get_random_message()
if not message_data:
await interaction.followup.send("помоему чет поломалось. меня пингани", ephemeral=True)
return
embed = build_funchosa_embed(message_data)
view = FunchosaView(self.db, message_data['message_url'])
await interaction.followup.send(embed=embed, view=view)
class FunchosaParser(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.db = FunchosaDatabase()
self.target_channel_id = config.FUNCHOSA_CHANNEL_ID
self.is_parsing = False
self.parsed_count = 0
self.rate_limit_delay = 0.5
async def cog_load(self):
await self.db.init_db()
logger.info("[FunchosaParser] cog initialized")
@commands.Cog.listener()
async def on_ready(self):
await asyncio.sleep(10)
if not self.is_parsing:
logger.info("FunchosaParser initialized")
await self.bot.wait_until_ready()
await self.auto_parse_on_startup()
async def auto_parse_on_startup(self):
try:
if not self.target_channel_id:
logger.warning("[FunchosaParser] no id channel ")
if self.is_parsing:
logger.warning("Parsing already in progress, skipping startup parse")
return
channel = self.bot.get_channel(self.target_channel_id)
if not channel:
logger.warning(f"[FunchosaParser] no channel with id {self.target_channel_id} found")
logger.warning("Channel with id %s not found", self.target_channel_id)
return
try:
status = await self.db.get_parsing_status()
logger.info(f"[FunchosaParser] parsing status; firsttry = {not status['first_parse_done']}")
is_first = not status['first_parse_done']
limit = None if is_first else 250
logger.info("Starting %s parse", "full" if is_first else "incremental")
if self.is_parsing:
logger.warning("[FunchosaParser] parsing already in progress")
return
count = await self._parse_history(channel, limit=limit)
logger.info("[FunchosaParser] starting to parse")
if not status['first_parse_done']:
logger.info("[FunchosaParser] first try, parse all")
count = await self._parse_history(channel, limit=None)
last_message_id = await self.db.get_last_message_in_db()
await self.db.update_parsing_status(
first_parse_done=True,
last_parsed_message_id=last_message_id
)
logger.info(f"[FunchosaParser] parsing finished, total msg count: {count}")
else:
logger.info("[FunchosaParser] NOTfirst try, parse first 250")
count = await self._parse_history(channel, limit=250)
if count > 0:
new_last_message_id = await self.db.get_last_message_in_db()
await self.db.update_parsing_status(
first_parse_done=True,
last_parsed_message_id=new_last_message_id
)
logger.info(f"[FunchosaParser] parsing finished, total msg count: {count}")
last_id = await self.db.get_last_message_in_db()
await self.db.update_parsing_status(first_parse_done=True, last_parsed_message_id=last_id)
logger.info("Parsing finished, %d new messages", count)
except Exception as e:
logger.error(f"[FunchosaParser] err when parsing: {e}", exc_info=True)
logger.error("Error during startup parse: %s", e, exc_info=True)
@commands.Cog.listener()
async def on_message(self, message):
if message.author.bot:
return
if self.target_channel_id and message.channel.id == self.target_channel_id:
await self._save_message(message)
@@ -91,16 +123,11 @@ class FunchosaParser(commands.Cog):
if await self.db.message_exists(message.id):
return
attachments_data = []
attachment_urls = []
for attachment in message.attachments:
if attachment.url.lower().endswith(('png', 'jpg', 'jpeg', 'gif', 'webp')):
attachments_data.append({
'url': attachment.url,
'filename': attachment.filename
})
attachment_urls.append(attachment.url)
attachments_data = [
{'url': a.url, 'filename': a.filename}
for a in message.attachments
if a.url.lower().endswith(('png', 'jpg', 'jpeg', 'gif', 'webp'))
]
message_data = {
'message_id': message.id,
@@ -110,83 +137,58 @@ class FunchosaParser(commands.Cog):
'content': message.content,
'timestamp': message.created_at.isoformat(),
'message_url': message.jump_url,
'has_attachments': len(message.attachments) > 0,
'attachment_urls': ','.join(attachment_urls),
'attachments': attachments_data
'has_attachments': bool(message.attachments),
'attachments': attachments_data,
}
saved = await self.db.save_message(message_data)
if saved:
self.parsed_count += 1
if self.parsed_count % 50 == 0:
logger.info(f"[FunchosaParser] saved messages total: {self.parsed_count}")
logger.info("Saved %d messages so far", self.parsed_count)
except Exception as e:
logger.error(f"[FunchosaParser] err when saving msg: {e}")
logger.error("Error saving message: %s", e)
async def _parse_history(self, channel, limit=None, after_message=None):
try:
async def _parse_history(self, channel, limit=None):
self.is_parsing = True
count = 0
skipped = 0
batch_size = 0
logger.info(f"[FunchosaParser] starting to parse {channel.name}")
try:
logger.info("Parsing history of #%s (limit=%s)", channel.name, limit)
oldest_first = not limit or limit < 0
parse_kwargs = {
'limit': abs(limit) if limit else None,
'oldest_first': oldest_first,
}
if after_message:
parse_kwargs['after'] = after_message
async for message in channel.history(**parse_kwargs):
async for message in channel.history(limit=limit, oldest_first=True):
if message.author.bot:
continue
if await self.db.message_exists(message.id):
skipped += 1
batch_size += 1
if batch_size >= 100:
logger.info(f"[FunchosaParser] batch: +{count} новых, -{skipped} skipped")
batch_size = 0
continue
await self._save_message(message)
count += 1
batch_size += 1
await asyncio.sleep(self.rate_limit_delay)
if (count + skipped) % 100 == 0:
logger.info("Progress: +%d new, -%d skipped", count, skipped)
if batch_size >= 100:
logger.info(f"[FunchosaParser] batch: +{count} новых, -{skipped} skipped")
batch_size = 0
logger.info(f"[FunchosaParser] parsing done. total new: {count}, total skipped: {skipped}")
logger.info("Parse done: %d new, %d skipped", count, skipped)
return count
except Exception as e:
logger.error(f"[FunchosaParser] err when parsing history: {e}", exc_info=True)
logger.error("Error parsing history: %s", e, exc_info=True)
return 0
finally:
self.is_parsing = False
@commands.hybrid_command()
@app_commands.describe(
number="номер сообщения из базы; optional"
)
@app_commands.describe(number="номер сообщения из базы; optional")
async def funchosarand(self, ctx, number: Optional[int] = None):
await ctx.defer()
if number:
message_data = await self.db.get_message_by_number(number)
if not message_data:
await ctx.send(f"сообщение с номером {number} не найдено в базе ||соси черт||")
await ctx.send(f"сообщение с номером {number} не найдено в базе")
return
else:
message_data = await self.db.get_random_message()
@@ -194,119 +196,16 @@ class FunchosaParser(commands.Cog):
await ctx.send("помоему чет поломалось. меня пингани")
return
embed = discord.Embed(
description=message_data['content'] or "*[без текста]*",
color=discord.Color.blue(),
timestamp=datetime.fromisoformat(message_data['timestamp'])
)
embed.set_author(
name='random фунчоза of the day'
)
attachments_text = []
if message_data.get('attachments'):
for i, att in enumerate(message_data['attachments'][:3], 1):
attachments_text.append(f"[вложение {i}]({att['url']})")
embed.add_field(
name="info",
value=(
f"автор: <@{message_data['author_id']}>\n"
f"дата: {message_data['timestamp'].replace('T', ' ')[:19]}\n"
f"номер в базе: {message_data['id']}"
),
inline=False
)
if attachments_text:
embed.add_field(
name="вложения",
value="\n".join(attachments_text),
inline=False
)
view = discord.ui.View()
view.add_item(
discord.ui.Button(
label="перейти к сообщению",
url=message_data['message_url'],
style=discord.ButtonStyle.link
)
)
view.add_item(
discord.ui.Button(
label="подавай еще, раб",
custom_id="another_random",
style=discord.ButtonStyle.secondary
)
)
embed = build_funchosa_embed(message_data)
view = FunchosaView(self.db, message_data['message_url'])
await ctx.send(embed=embed, view=view)
@commands.Cog.listener()
async def on_interaction(self, interaction: discord.Interaction):
if not interaction.data or 'custom_id' not in interaction.data:
return
if interaction.data['custom_id'] == "another_random":
await interaction.response.defer()
message_data = await self.db.get_random_message()
if not message_data:
await interaction.followup.send("помоему чет поломалось. меня пингани", ephemeral=True)
return
embed = discord.Embed(
description=message_data['content'] or "*[без текста]*",
color=discord.Color.blue(),
timestamp=datetime.fromisoformat(message_data['timestamp'])
)
embed.set_author(
name='random фунчоза of the day'
)
embed.add_field(
name="info",
value=(
f"автор: <@{message_data['author_id']}>\n"
f"дата: {message_data['timestamp'].replace('T', ' ')[:19]}\n"
f"номер в базе: {message_data['id']}"
),
inline=False
)
view = discord.ui.View()
view.add_item(
discord.ui.Button(
label="перейти к сообщению",
url=message_data['message_url'],
style=discord.ButtonStyle.link
)
)
view.add_item(
discord.ui.Button(
label="подавай еще, раб",
custom_id="another_random",
style=discord.ButtonStyle.secondary
)
)
await interaction.followup.send(embed=embed, view=view)
@commands.hybrid_command()
async def funchosainfo(self, ctx):
total = await self.db.get_total_count()
status = await self.db.get_parsing_status()
embed = discord.Embed(
title="фунчоза.статы",
color=discord.Color.green()
)
embed = discord.Embed(title="фунчоза.статы", color=discord.Color.green())
embed.add_field(name="сообщений в базе", value=f"**{total}**", inline=True)
if status['last_parsed_message_id']:
@@ -318,5 +217,6 @@ class FunchosaParser(commands.Cog):
await ctx.send(embed=embed)
async def setup(bot):
await bot.add_cog(FunchosaParser(bot))

View File

@@ -1,5 +1,9 @@
import discord
import logging
from discord.ext import commands
import config
logger = logging.getLogger(__name__)
class HelpCog(commands.Cog):
def __init__(self, bot):
@@ -7,21 +11,9 @@ class HelpCog(commands.Cog):
@commands.command(name="help")
async def help(self, ctx):
help_text = """
комманды:
await ctx.send(config.HELP_TEXT)
logger.debug("Help requested by %s", ctx.author.name)
```
uptime : сколько времени прошло с запуска бота
funchosarand <id> : рандомная пикча из фунчозы либо по айдишнику в базе
funchosainfo : фунчоза.статы
kitty : рандомная пикча кошечки из [thecatapi](https://thecatapi.com/)
```
префикс: `!`
в лс отпишите по предложениям че в бота докинуть
"""
await ctx.send(help_text)
async def setup(bot):
await bot.add_cog(HelpCog(bot))

View File

@@ -6,82 +6,83 @@ import logging
logger = logging.getLogger(__name__)
class Kitty(commands.Cog, name="Котики"):
def __init__(self, bot):
self.bot = bot
self.api_key = os.environ.get('CAT_API_KEY')
self.search_url = "https://api.thecatapi.com/v1/images/search"
self.session: aiohttp.ClientSession | None = None
if not self.api_key:
logger.warning("[Kitty] no api key found")
logger.warning("No CAT_API_KEY found, using unauthenticated requests")
async def cog_load(self):
headers = {"Content-Type": "application/json"}
if self.api_key:
headers["x-api-key"] = self.api_key
self.session = aiohttp.ClientSession(headers=headers)
async def cog_unload(self):
if self.session:
await self.session.close()
async def _fetch_random_cat(self):
headers = {"Content-Type": "application/json"}
if self.api_key and self.api_key != "DEMO-API-KEY":
headers["x-api-key"] = self.api_key
else:
headers["x-api-key"] = "DEMO-API-KEY"
params = {
'size': 'med',
'mime_types': 'jpg,png',
'format': 'json',
'has_breeds': 'true',
'order': 'RANDOM',
'limit': 1
'limit': 1,
}
try:
async with aiohttp.ClientSession() as session:
async with session.get(self.search_url, headers=headers, params=params) as response:
if response.status == 200:
data = await response.json()
if data and isinstance(data, list):
logger.info(f"[Kitty] API response received")
return data[0]
else:
logger.error(f"[Kitty] api err: {response.status}")
try:
async with self.session.get(self.search_url, params=params) as response:
if response.status != 200:
error_text = await response.text()
logger.error(f"[Kitty] api error text: {error_text}")
except:
pass
logger.error("API error %s: %s", response.status, error_text)
return None
data = await response.json()
if not data or not isinstance(data, list):
logger.error("Unexpected API response format: %s", data)
return None
return data[0]
except aiohttp.ClientError as e:
logger.error(f"[Kitty] client error when contacting api: {e}")
logger.error("HTTP client error: %s", e)
except Exception as e:
logger.error(f"[Kitty] err: {e}")
logger.error("Unexpected error fetching cat: %s", e)
return None
@commands.hybrid_command(name="kitty", description="kitty")
async def kitty(self, ctx):
await ctx.defer()
logger.info(f"[Kitty] kitty request from {ctx.author.name} ({ctx.author.id})")
logger.info("Kitty request from %s (%s)", ctx.author.name, ctx.author.id)
cat_data = await self._fetch_random_cat()
if not cat_data:
logger.warning("[Kitty] cat_data = null")
await ctx.send("помоему чет поломалось. меня пингани ||not cat_data||")
return
image_url = cat_data.get('url')
if not image_url:
logger.error("[Kitty] no image url")
await ctx.send("помоему чет поломалось. меня пингани ||no image url||")
return
breeds = cat_data.get('breeds')
breed_name = breeds[0].get('name') if breeds else None
breeds_info = cat_data.get('breeds')
if breeds_info and len(breeds_info) > 0:
breed = breeds_info[0]
if breed.get('name'):
caption = f"{breed['name']}"
logger.info(f"[Kitty] Breed found: {breed['name']}")
if breed_name:
logger.info("Breed found: %s", breed_name)
await ctx.send(f"random kitty of the day\n[{breed_name}]({image_url})")
else:
await ctx.send(f"random kitty of the day\n{image_url}")
logger.info("Successfully sent kitty to %s", ctx.author.name)
await ctx.send(f"random kitty of the day\n[{caption}]({image_url})")
logger.info(f"[Kitty] succesfully send kitty to {ctx.author.name}")
async def setup(bot):
await bot.add_cog(Kitty(bot))

View File

@@ -1,33 +1,29 @@
import discord
from discord.ext import commands
from discord import app_commands
import config
from utils.data_manager import save_message_id, load_message_id
import logging
# Создаем логгер для этого модуля
logger = logging.getLogger(__name__)
class RoleManager(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.role_message_id = None
self.CHANNEL_ID = config.CHANNEL_ID
self.REACTION_ROLES = config.REACTION_ROLES
self._ready = False
async def cog_load(self):
self.role_message_id = load_message_id()
if self.role_message_id:
logger.info(f"[RoleManager] initialized role msg with id: '{self.role_message_id}'")
logger.info("Initialized role message with id: %s", self.role_message_id)
else:
logger.info('[RoleManager] no role msg found')
logger.info("No role message found")
@commands.Cog.listener()
async def on_ready(self):
if not self._ready and self.role_message_id:
await self.bot.wait_until_ready()
if self.role_message_id:
await self.check_and_sync_roles()
self._ready = True
@commands.Cog.listener()
async def on_raw_reaction_add(self, payload):
@@ -55,22 +51,21 @@ class RoleManager(commands.Cog):
role_id = self.REACTION_ROLES[emoji]
role = guild.get_role(role_id)
if not role:
logger.warning(f"[RoleManager] role with id '{role_id}' not found")
logger.warning("Role with id %s not found", role_id)
return
try:
if add_role:
await member.add_roles(role)
logger.info(f"[RoleManager] gave role '{role.name}' to '{member.name}'")
logger.info("Gave role '%s' to '%s'", role.name, member.name)
else:
await member.remove_roles(role)
logger.info(f"[RoleManager] removed role '{role.name}' from user '{member.name}'")
logger.info("Removed role '%s' from '%s'", role.name, member.name)
except discord.Forbidden:
logger.error(f"[RoleManager] not enough rights to give role '{role.name}'")
logger.error("Missing permissions to manage role '%s'", role.name)
except Exception as e:
logger.error(f"[RoleManager] err: '{e}'")
logger.error("Unexpected error in handle_reaction: %s", e)
async def check_and_sync_roles(self):
if not self.role_message_id:
@@ -78,70 +73,68 @@ class RoleManager(commands.Cog):
try:
channel = await self.bot.fetch_channel(self.CHANNEL_ID)
if not channel:
logger.warning(f"[RoleManager] channel with id '{self.CHANNEL_ID}' not found")
return
message = await channel.fetch_message(self.role_message_id)
for reaction in message.reactions:
emoji = str(reaction.emoji)
if emoji not in self.REACTION_ROLES:
continue
if emoji in self.REACTION_ROLES:
role_id = self.REACTION_ROLES[emoji]
role = message.guild.get_role(role_id)
role = message.guild.get_role(self.REACTION_ROLES[emoji])
if not role:
logger.warning(f"[RoleManager] role with id '{role_id}' not found")
logger.warning("Role with id %s not found during sync", self.REACTION_ROLES[emoji])
continue
async for user in reaction.users():
if user.bot:
continue
member = message.guild.get_member(user.id)
if member and role not in member.roles:
await member.add_roles(role)
logger.info(f"[RoleManager] gave role '{role.name}' to user '{member.name}'")
logger.info("Sync gave role '%s' to '%s'", role.name, member.name)
except discord.NotFound:
logger.warning("[RoleManager] role msg not found")
logger.warning("Role message not found during sync")
except discord.Forbidden:
logger.error("[RoleManager] no rights to get channel or message")
logger.error("Missing permissions to fetch channel or message")
except Exception as e:
logger.error(f"[RoleManager] sync err: '{e}'")
logger.error("Unexpected sync error: %s", e)
@commands.hybrid_command()
@commands.has_permissions(administrator=True)
async def create_role_message(self, ctx):
embed = discord.Embed(
title="ле",
description="пикните там роли снизу по реактам,\nшоб если куда то шли играть с фаршем >3 человек то сразу можно было ее пингануть\n\n"
"react_index = {\n"
" '💩': гревдиггер\n"
" '🤙': бара наверн хз\n"
" '🤕': пох ваще за любой движ\n"
" '🇺🇦': мтк\n"
"}\n\n"
"естессно кто будет спамить пингом ролей тот будет опущен и закинут в таймаут\n"
"если бот в оффе роли выдадутся когда я врублю его снова",
color=0x00ff00
)
message = await ctx.send(embed=embed)
for emoji in self.REACTION_ROLES.keys():
message = await ctx.send(config.ROLE_MESSAGE_TEXT)
for emoji in self.REACTION_ROLES:
await message.add_reaction(emoji)
self.role_message_id = message.id
save_message_id(message.id)
logger.info(f"[RoleManager] created new role message with id: '{message.id}'")
logger.info("Created new role message with id: %s", message.id)
@commands.hybrid_command()
@commands.has_permissions(administrator=True)
async def sync_roles(self, ctx):
await self.check_and_sync_roles()
logger.info("[RoleManager] manual sync triggered by admin")
async def update_role_message(self, ctx):
if not self.role_message_id:
return
try:
channel = await self.bot.fetch_channel(self.CHANNEL_ID)
message = await channel.fetch_message(self.role_message_id)
await message.edit(content=config.ROLE_MESSAGE_TEXT)
existing = [str(r.emoji) for r in message.reactions]
for emoji in self.REACTION_ROLES:
if emoji not in existing:
await message.add_reaction(emoji)
logger.info("Role message updated by %s", ctx.author.name)
except discord.NotFound:
logger.warning("Role message not found during update")
except discord.Forbidden:
logger.error("Missing permissions to edit role message")
async def setup(bot):
await bot.add_cog(RoleManager(bot))

View File

@@ -7,50 +7,59 @@ from typing import Optional
logger = logging.getLogger(__name__)
class StatusRotator(commands.Cog):
def __init__(self, bot):
def __init__(self, bot, *, status_file: str = 'data/statuses.json', interval: float = 1.0):
self.bot = bot
self.statuses: list[str] = []
self.current_index = 0
self.status_file = 'data/statuses.json'
self.status_file = status_file
self.rotate_status.change_interval(minutes=interval)
async def cog_load(self):
await self.load_statuses()
self.rotate_status.start()
logger.info("[StatusRotator] status rotator initialized")
logger.info("Status rotator initialized with %d statuses", len(self.statuses))
async def cog_unload(self):
self.rotate_status.cancel()
async def load_statuses(self):
try:
with open(self.status_file, 'r', encoding='utf-8') as f:
data = json.load(f)
self.statuses = data.get('statuses', [])
logger.info(f"[StatusRotator] loaded {len(self.statuses)} statuses")
logger.info("Loaded %d statuses", len(self.statuses))
except FileNotFoundError:
logger.error(f"[StatusRotator] file {self.status_file} notfound")
logger.error("Status file not found: %s", self.status_file)
except json.JSONDecodeError:
logger.error(f"[StatusRotaror] err while parsing JSON")
logger.error("Failed to parse JSON in: %s", self.status_file)
def get_random_status(self) -> str:
return random.choice(self.statuses)
def get_next_status(self) -> str:
def get_next_status(self) -> Optional[str]:
if not self.statuses:
return None
status = self.statuses[self.current_index]
self.current_index = (self.current_index + 1) % len(self.statuses)
return status
async def update_status(self, status_text: Optional[str] = None):
if status_text is None:
status_text = self.get_random_status()
def get_random_status(self) -> Optional[str]:
if not self.statuses:
return None
return random.choice(self.statuses)
activity = discord.Game(name=status_text)
async def update_status(self, status_text: Optional[str] = None):
if not self.statuses:
logger.warning("No statuses loaded, skipping update")
return
if status_text is None:
status_text = self.get_next_status()
try:
await self.bot.change_presence(activity=activity)
logger.debug(f"[StatusRotator] status updated: {status_text}")
await self.bot.change_presence(activity=discord.Game(name=status_text))
logger.debug("Status updated: %s", status_text)
except Exception as e:
logger.error(f"[StatusRotator] err while updating status: {e}")
logger.error("Failed to update status: %s", e)
@tasks.loop(minutes=1.0)
async def rotate_status(self):

View File

@@ -1,46 +1,47 @@
import discord
from discord.ext import commands
import datetime
def pluralize(n: int, one: str, few: str, many: str) -> str:
if 11 <= n % 100 <= 14:
return f"{n} {many}"
r = n % 10
if r == 1:
return f"{n} {one}"
if 2 <= r <= 4:
return f"{n} {few}"
return f"{n} {many}"
class UptimeSimple(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.start_time = None
@commands.Cog.listener()
async def on_ready(self):
if self.start_time is None:
self.start_time = datetime.datetime.now(datetime.timezone.utc)
self.start_time = discord.utils.utcnow()
@commands.command(name="uptime")
async def uptime(self, ctx):
if self.start_time is None:
await ctx.send("ебать у тебя тайминги кнш")
return
delta = discord.utils.utcnow() - self.start_time
seconds = int(delta.total_seconds())
minutes, secs = divmod(seconds, 60)
hours, minutes = divmod(minutes, 60)
days, hours = divmod(hours, 24)
current_time = datetime.datetime.now(datetime.timezone.utc)
uptime = current_time - self.start_time
seconds = int(uptime.total_seconds())
days = seconds // 86400
hours = (seconds % 86400) // 3600
minutes = (seconds % 3600) // 60
secs = seconds % 60
result = "бот работает уже: "
parts = []
if days:
parts.append(pluralize(days, "день", "дня", "дней"))
if hours:
parts.append(pluralize(hours, "час", "часа", "часов"))
if minutes:
parts.append(pluralize(minutes, "минуту", "минуты", "минут"))
if secs or not parts:
parts.append(pluralize(secs, "секунду", "секунды", "секунд"))
if days > 0:
parts.append(f"{days} дня")
if hours > 0:
parts.append(f"{hours} часа")
if minutes > 0:
parts.append(f"{minutes} минут")
if secs > 0 or not parts:
parts.append(f"{secs} секунд")
embed = discord.Embed(
description="бот работает уже: " + " ".join(parts),
color=discord.Color.green()
)
await ctx.send(embed=embed)
result += " ".join(parts)
await ctx.send(result)
async def setup(bot):
await bot.add_cog(UptimeSimple(bot))

View File

@@ -5,6 +5,7 @@ load_dotenv()
TOKEN = os.getenv('DISCORD_TOKEN')
CHANNEL_ID = 1454107749028855971 # roles channel
roles_message_id = 1454128857102680187
FUNCHOSA_CHANNEL_ID = 1379127661095551048
REACTION_ROLES = {

View File

@@ -1 +0,0 @@
{"message_id": 1454128857102680187}

39
main.py
View File

@@ -13,13 +13,25 @@ logging.basicConfig(
]
)
logger = logging.getLogger(__name__)
intents = discord.Intents.default()
intents.message_content = True
intents.reactions = True
intents.members = True
intents.guilds = True
intents.messages = True
intents.voice_states = True
COGS = [
#! cogs to load
'cogs.role_manager',
'cogs.status_rotator',
'cogs.funchosa_parser',
'cogs.uptime',
'cogs.help',
'cogs.kitty',
]
class Bot(commands.Bot):
def __init__(self):
@@ -30,28 +42,25 @@ class Bot(commands.Bot):
)
async def setup_hook(self):
# ! load cogs
await self.load_extension('cogs.role_manager')
await self.load_extension('cogs.status_rotator')
await self.load_extension('cogs.funchosa_parser')
await self.load_extension('cogs.uptime')
await self.load_extension('cogs.help')
await self.load_extension('cogs.kitty')
#await self.load_extension('cogs.muter') # ass
# adding new modules:
# await self.load_extension('cogs.whyrureadingts')
for cog in COGS:
try:
await self.load_extension(cog)
logger.info("Loaded cog: %s", cog)
except Exception as e:
logger.error("Failed to load cog %s: %s", cog, e, exc_info=True)
await self.tree.sync()
async def on_ready(self):
print(f"bot initialized succesfully with user '{self.user}'")
print(f"user.id: '{self.user.id}'")
print('initialization (probably) complete; further is logs.')
print('\n*------*\n')
if not hasattr(self, '_ready'):
self._ready = True
logger.info("Bot ready: %s (id: %s)", self.user, self.user.id)
async def main():
bot = Bot()
await bot.start(config.TOKEN)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,16 +1,15 @@
import json
import os
MESSAGE_ID_FILE = 'data/message_id.json' #ts for roles
from config import roles_message_id
def save_message_id(message_id):
os.makedirs('data', exist_ok=True)
with open(MESSAGE_ID_FILE, 'w') as f:
with open(roles_message_id, 'w') as f:
json.dump({'message_id': message_id}, f)
def load_message_id():
try:
with open(MESSAGE_ID_FILE, 'r') as f:
with open(roles_message_id, 'r') as f:
data = json.load(f)
return data.get('message_id')
except FileNotFoundError:

View File

@@ -1,17 +1,19 @@
import aiosqlite
import asyncio
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
class FunchosaDatabase:
def __init__(self, db_path='data/funchosa.db'):
self.db_path = db_path
self._conn: aiosqlite.Connection | None = None
async def init_db(self):
async with aiosqlite.connect(self.db_path) as db:
await db.execute('''
self._conn = await aiosqlite.connect(self.db_path)
self._conn.row_factory = aiosqlite.Row
await self._conn.executescript('''
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
message_id BIGINT UNIQUE NOT NULL,
@@ -22,85 +24,85 @@ class FunchosaDatabase:
timestamp TIMESTAMP NOT NULL,
message_url TEXT NOT NULL,
has_attachments BOOLEAN DEFAULT 0,
attachment_urls TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
);
await db.execute('''
CREATE TABLE IF NOT EXISTS attachments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
message_id INTEGER,
url TEXT UNIQUE NOT NULL,
filename TEXT,
FOREIGN KEY (message_id) REFERENCES messages (id)
)
''')
);
await db.execute('''
CREATE TABLE IF NOT EXISTS parsing_status (
id INTEGER PRIMARY KEY CHECK (id = 1),
first_parse_done BOOLEAN DEFAULT 0,
last_parsed_message_id BIGINT,
last_parse_time TIMESTAMP
)
);
CREATE INDEX IF NOT EXISTS idx_message_id ON messages(message_id);
CREATE INDEX IF NOT EXISTS idx_author_id ON messages(author_id);
CREATE INDEX IF NOT EXISTS idx_timestamp ON messages(timestamp);
''')
await self._conn.commit()
logger.info("Database initialized")
await db.execute('CREATE INDEX IF NOT EXISTS idx_message_id ON messages(message_id)')
await db.execute('CREATE INDEX IF NOT EXISTS idx_author_id ON messages(author_id)')
await db.execute('CREATE INDEX IF NOT EXISTS idx_timestamp ON messages(timestamp)')
async def close(self):
if self._conn:
await self._conn.close()
await db.commit()
logger.info("[FunchosaDatabase] funchosa db initialized")
def _parse_message_row(self, row) -> dict:
message = dict(row)
if message.get('attachment_urls_list'):
urls = message['attachment_urls_list'].split(',')
filenames = (message['attachment_filenames'] or '').split(',')
message['attachments'] = [
{'url': url, 'filename': filename}
for url, filename in zip(urls, filenames)
]
else:
message['attachments'] = []
return message
async def get_parsing_status(self):
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute(
async def get_parsing_status(self) -> dict:
cursor = await self._conn.execute(
'SELECT first_parse_done, last_parsed_message_id FROM parsing_status WHERE id = 1'
)
result = await cursor.fetchone()
if result:
return {
'first_parse_done': bool(result[0]),
'last_parsed_message_id': result[1]
}
else:
await db.execute(
return {'first_parse_done': bool(result[0]), 'last_parsed_message_id': result[1]}
await self._conn.execute(
'INSERT INTO parsing_status (id, first_parse_done, last_parsed_message_id) VALUES (1, 0, NULL)'
)
await db.commit()
return {
'first_parse_done': False,
'last_parsed_message_id': None
}
await self._conn.commit()
return {'first_parse_done': False, 'last_parsed_message_id': None}
async def update_parsing_status(self, first_parse_done=False, last_parsed_message_id=None):
async with aiosqlite.connect(self.db_path) as db:
await db.execute('''
await self._conn.execute('''
UPDATE parsing_status
SET first_parse_done = ?,
last_parsed_message_id = ?,
last_parse_time = CURRENT_TIMESTAMP
WHERE id = 1
''', (first_parse_done, last_parsed_message_id))
await db.commit()
await self._conn.commit()
async def get_last_message_in_db(self):
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute(
cursor = await self._conn.execute(
'SELECT message_id FROM messages ORDER BY message_id DESC LIMIT 1'
)
result = await cursor.fetchone()
return result[0] if result else None
async def save_message(self, message_data):
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute('''
async def save_message(self, message_data: dict) -> bool:
cursor = await self._conn.execute('''
INSERT OR IGNORE INTO messages
(message_id, channel_id, author_id, author_name, content,
timestamp, message_url, has_attachments, attachment_urls)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
timestamp, message_url, has_attachments)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', (
message_data['message_id'],
message_data['channel_id'],
@@ -110,40 +112,27 @@ class FunchosaDatabase:
message_data['timestamp'],
message_data['message_url'],
message_data['has_attachments'],
message_data['attachment_urls']
))
if cursor.rowcount > 0:
message_db_id = cursor.lastrowid
if message_data['attachments']:
for attachment in message_data['attachments']:
await db.execute('''
INSERT OR IGNORE INTO attachments
(message_id, url, filename)
for attachment in message_data.get('attachments', []):
await self._conn.execute('''
INSERT OR IGNORE INTO attachments (message_id, url, filename)
VALUES (?, ?, ?)
''', (
message_db_id,
attachment['url'],
attachment['filename']
))
await db.commit()
''', (message_db_id, attachment['url'], attachment['filename']))
await self._conn.commit()
return True
return False
async def message_exists(self, message_id):
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute(
'SELECT 1 FROM messages WHERE message_id = ? LIMIT 1',
(message_id,)
async def message_exists(self, message_id: int) -> bool:
cursor = await self._conn.execute(
'SELECT 1 FROM messages WHERE message_id = ? LIMIT 1', (message_id,)
)
result = await cursor.fetchone()
return result is not None
return await cursor.fetchone() is not None
async def get_random_message(self):
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute('''
async def get_random_message(self) -> dict | None:
cursor = await self._conn.execute('''
SELECT m.*,
GROUP_CONCAT(a.url) as attachment_urls_list,
GROUP_CONCAT(a.filename) as attachment_filenames
@@ -153,35 +142,11 @@ class FunchosaDatabase:
ORDER BY RANDOM()
LIMIT 1
''')
row = await cursor.fetchone()
if not row:
return None
return self._parse_message_row(row) if row else None
columns = [description[0] for description in cursor.description]
message = dict(zip(columns, row))
if message['attachment_urls_list']:
urls = message['attachment_urls_list'].split(',')
filenames = message['attachment_filenames'].split(',') if message['attachment_filenames'] else []
message['attachments'] = [
{'url': url, 'filename': filename}
for url, filename in zip(urls, filenames)
]
else:
message['attachments'] = []
return message
async def get_total_count(self):
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute('SELECT COUNT(*) FROM messages')
result = await cursor.fetchone()
return result[0] if result else 0
async def get_message_by_number(self, number):
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute('''
async def get_message_by_number(self, number: int) -> dict | None:
cursor = await self._conn.execute('''
SELECT m.*,
GROUP_CONCAT(a.url) as attachment_urls_list,
GROUP_CONCAT(a.filename) as attachment_filenames
@@ -190,22 +155,10 @@ class FunchosaDatabase:
WHERE m.id = ?
GROUP BY m.id
''', (number,))
row = await cursor.fetchone()
if not row:
return None
return self._parse_message_row(row) if row else None
columns = [description[0] for description in cursor.description]
message = dict(zip(columns, row))
if message.get('attachment_urls_list'):
urls = message['attachment_urls_list'].split(',')
filenames = message['attachment_filenames'].split(',') if message['attachment_filenames'] else []
message['attachments'] = [
{'url': url, 'filename': filename}
for url, filename in zip(urls, filenames)
]
else:
message['attachments'] = []
return message
async def get_total_count(self) -> int:
cursor = await self._conn.execute('SELECT COUNT(*) FROM messages')
result = await cursor.fetchone()
return result[0] if result else 0