First Commit

This commit is contained in:
Mattia Mascarello 2024-05-16 00:35:28 +02:00
commit d08713c359
5 changed files with 877 additions and 0 deletions

21
LICENSE Normal file
View File

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2021 Mattia Mascarello
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

42
README.md Normal file
View File

@ -0,0 +1,42 @@
# Birthday Reminder Bot
A simple telegram bot that reminds you of your friends' birthdays.
- Birthday list (current age, time to birthday, sorted)
- Monthly, weekly, daily reminders
> I made this bot for my personal use, but you can use it too. Just follow the instructions below.
## Installation
1. Clone the repository
```bash
git clone git@github.com:MatMasIt/birthdaybot.git
```
2. Create a virtual environment
```bash
python3 -m venv venv
```
3. Activate the virtual environment
```bash
source venv/bin/activate
```
4. Install the requirements
```bash
pip install -r requirements.txt
```
5. Create a `.env` file in the root directory and add the following variables
```bash
TOKEN=YOUR_TELEGRAM_BOT_TOKEN
```
6. Run the bot
```bash
python bot.py
```

66
dataTypes.py Normal file
View File

@ -0,0 +1,66 @@
from __future__ import annotations
import datetime
from typing import List
from sqlalchemy import String, Column, Integer, DateTime, create_engine, ForeignKey, Boolean
from sqlalchemy.orm import declarative_base, Mapped, relationship, mapped_column
from telegram import User
Base = declarative_base()
class TelegramUser(Base):
__tablename__ = "user"
id: Mapped[int] = Column(Integer, primary_key=True)
username: Mapped[str | None] = Column(String)
first_name: Mapped[str] = Column(String)
last_name: Mapped[str | None] = Column(String)
language_code: Mapped[str | None] = Column(String)
last_seen: Mapped[datetime.datetime] = Column(DateTime)
birthdays: Mapped[List["Birthday"]] = relationship("Birthday", back_populates="user")
monthly: Mapped[bool] = Column(Boolean, default=True)
weekly: Mapped[bool] = Column(Boolean, default=True)
dailiy: Mapped[bool] = Column(Boolean, default=True)
@staticmethod
def from_user(user: User) -> TelegramUser:
return TelegramUser(
id=user.id,
username=user.username,
first_name=user.first_name,
last_name=user.last_name,
language_code=user.language_code,
last_seen=datetime.datetime.now(),
monthly = True,
weekly = True,
dailiy = True,
)
def update_user(self, user: User) -> None:
self.username = user.username
self.first_name = user.first_name
self.last_name = user.last_name
self.language_code = user.language_code
self.last_seen = datetime.datetime.now()
def __repr__(self) -> str:
return f"<TelegramUser(id={self.id}, username={self.username}, first_name={self.first_name}, last_name={self.last_name}, language_code={self.language_code}, last_seen={self.last_seen})>"
def __str__(self) -> str:
return f"{self.first_name} {self.last_name} (@{self.username})"
class Birthday(Base):
__tablename__ = "birthday"
id: Mapped[int] = Column(Integer, primary_key=True, autoincrement=True)
first_name: Mapped[str] = Column(String)
last_name: Mapped[str] = Column(String)
birth: Mapped[datetime.datetime] = Column(DateTime)
user_id: Mapped[int] = Column(Integer, ForeignKey("user.id"))
user: Mapped["TelegramUser"] = relationship("TelegramUser", back_populates="birthdays")
is_anniversary: Mapped[bool] = Column(Boolean, default=False)

745
main.py Normal file
View File

@ -0,0 +1,745 @@
#!/usr/bin/env python
# pylint: disable=unused-argument
import datetime
import logging
from typing import Final
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from dataTypes import TelegramUser, Birthday
import threading
import asyncio
# load token from env
from dotenv import load_dotenv
import os
load_dotenv()
TOKEN: Final = os.getenv("TOKEN")
async def sleep_until(hour: int, minute: int, second: int):
"""Asynchronous wait until specific hour, minute and second
Args:
hour (int): Hour
minute (int): Minute
second (int): Second
"""
t = datetime.datetime.today()
future = datetime.datetime(t.year, t.month, t.day, hour, minute, second)
if t.timestamp() > future.timestamp():
future += datetime.timedelta(days=1)
await asyncio.sleep((future - t).total_seconds())
engine = create_engine("sqlite+pysqlite:///database.db", echo=True)
#create_database(engine.url)
session = Session(engine)
from telegram import ReplyKeyboardMarkup, ReplyKeyboardRemove, Update, InlineKeyboardMarkup, InlineKeyboardButton
from telegram.ext import (
Application,
CommandHandler,
ContextTypes,
ConversationHandler,
MessageHandler,
filters, PicklePersistence, CallbackQueryHandler,
)
# Enable logging
logging.basicConfig(
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.DEBUG
)
# set higher logging level for httpx to avoid all GET and POST requests being logged
#logging.getLogger("httpx").setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
NAME, SURNAME, DATETIME = range(3)
def remaining_months_and_days(birth: datetime.datetime) -> tuple[int, int]:
"""Returns the remaining months and days until the next birthday"""
today = datetime.date.today()
# if the birthday has already passed this year, calculate the remaining time until next year's birthday
if today.month > birth.month or (today.month == birth.month and today.day > birth.day):
next_birthday = datetime.date(today.year + 1, birth.month, birth.day)
else:
# otherwise, calculate the remaining time until this year's birthday
next_birthday = datetime.date(today.year, birth.month, birth.day)
# calculate the difference between the two dates and return the remaining months and days
remaining = next_birthday - today
return remaining.days // 30, remaining.days % 30
def calculate_age(born):
"""Returns the age of a person given the date of birth"""
today = datetime.date.today()
return today.year - born.year - ((today.month, today.day) < (born.month, born.day))
def main_menu(t: TelegramUser) -> list[list[InlineKeyboardButton]]:
"""The main menu keyboard, with the options to add a birthday, list all birthdays and set reminders"""
global session # this is needed to access the database
final = []
first_row = [
InlineKeyboardButton("🎂➕ Add Birthday", callback_data="main_menu_add_birthday")
]
# if there are birthdays in the database, add the option to list them, but not if they are anniversaries (not yet implemented)
if session.query(Birthday).filter(Birthday.user_id == t.id, Birthday.is_anniversary == False).count() > 0:
first_row.append(InlineKeyboardButton("🎂📒 List Birthdays", callback_data="main_menu_list_birthday"))
final.append(first_row)
final.append([InlineKeyboardButton("🔔 Set reminders", callback_data="main_menu_set_reminders")])
final.append([InlineKeyboardButton(" About", callback_data="main_menu_info")])
return final
def db_user_ping(update: Update) -> list[bool, TelegramUser]:
"""This is for updating the user in the database, whatever action he does, and creating it if it doesn't exist"""
global session
id_t: int = update.message.from_user.id
t = session.query(TelegramUser).filter(
TelegramUser.id == id_t).first() # check if the user exists, and if it does, update it
new_user = t is None
if new_user:
t: TelegramUser = TelegramUser.from_user(update.message.from_user)
session.add(t)
else:
t.update_user(update.message.from_user)
session.commit()
return [new_user, t]
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle the /start command, which is the first command that is sent when the user opens the bot"""
new_user, t = db_user_ping(update) # update the user in the database
reply_keyboard = main_menu(t) # get the main menu keyboard
# send the welcome message
await update.message.reply_text(
("Welcome!" if new_user else "Welcome back!") +
"\n\nThis bot will help you keep track of your friends' birthdays and remind you when they are coming up.\n\n",
reply_markup=ReplyKeyboardMarkup(
reply_keyboard, one_time_keyboard=True, input_field_placeholder="Action"
),
)
async def add_birthday(update: Update, context: ContextTypes.DEFAULT_TYPE):
db_user_ping(update)
"""Start the birthday adding conversation, asking for the name of the person"""
reply_keyboard = [[
InlineKeyboardButton("❌ Cancel", callback_data="main_menu")
]]
await update.message.reply_text(
"Type the first name of the person you want to add\n\n",
reply_markup=ReplyKeyboardMarkup(
reply_keyboard, one_time_keyboard=True
),
parse_mode="Markdown"
)
# go to the next step of the conversation
return NAME
async def name(update: Update, context: ContextTypes.DEFAULT_TYPE):
db_user_ping(update)
"""Ask for the name of the person"""
context.user_data["name"] = update.message.text.strip()
reply_keyboard = [[
InlineKeyboardButton("❌ Cancel", callback_data="main_menu")
]]
await update.message.reply_text( # ask for the surname
"Add the last name of *" + context.user_data["name"] + "*\n\n",
reply_markup=ReplyKeyboardMarkup(
reply_keyboard, one_time_keyboard=True
),
parse_mode="Markdown"
)
return SURNAME
async def surname(update: Update, context: ContextTypes.DEFAULT_TYPE):
db_user_ping(update)
"""Ask for the last name of the person"""
global session
context.user_data["surname"] = update.message.text.strip()
reply_keyboard = [[
InlineKeyboardButton("❌ Cancel", callback_data="main_menu")
]]
# check if the person is already in the database, and if it is, ask for the name again
if session.query(Birthday).filter(Birthday.user_id == update.message.from_user.id,
Birthday.first_name == context.user_data["name"],
Birthday.last_name == context.user_data["surname"]).count() > 0:
await update.message.reply_text(
"*" + context.user_data["name"] + " " + context.user_data[
"surname"] + "* is already in your database, set the name again or cancel the operation\n\n",
reply_markup=ReplyKeyboardMarkup(
reply_keyboard, one_time_keyboard=True
),
parse_mode="Markdown"
)
return NAME
# otherwise, ask for the date of birth
await update.message.reply_text(
"Add the date of birth of *" + context.user_data["name"] + " " + context.user_data["surname"] +
"* in the day/month/year format \n\n",
reply_markup=ReplyKeyboardMarkup(
reply_keyboard, one_time_keyboard=True
),
parse_mode="Markdown"
)
return DATETIME # advance to the next step of the conversation
async def datetime_p(update: Update, context: ContextTypes.DEFAULT_TYPE):
db_user_ping(update)
"""Ask for the date of birth"""
context.user_data["datetime"] = update.message.text.strip()
reply_keyboard = [[
InlineKeyboardButton("❌ Cancel", callback_data="main_menu")
]]
# gather saved information from cotnext
name = context.user_data["name"].strip()
surname = context.user_data["surname"].strip()
dt = context.user_data["datetime"].strip()
try:
datetime_object = datetime.datetime.strptime(dt, '%d/%m/%Y')
except ValueError: # malformed
await update.message.reply_text(
"Wrong date format, please try again\n\n",
reply_markup=ReplyKeyboardMarkup(
reply_keyboard, one_time_keyboard=True
)
)
return DATETIME
if datetime_object > datetime.datetime.now(): # future date?
await update.message.reply_text(
"The date you entered is in the future, please try again\n\n",
reply_markup=ReplyKeyboardMarkup(
reply_keyboard, one_time_keyboard=True
)
)
return DATETIME
await update.message.reply_text(
"*Information saved*\n\nName: _" + name + "_ \nLast name: _" + surname + "_ \nDate of birth: _" + dt + "_\n\n",
reply_markup=ReplyKeyboardMarkup(
reply_keyboard, one_time_keyboard=True
),
parse_mode="Markdown"
)
# create and save
b = Birthday(first_name=name, last_name=surname, birth=datetime_object, user_id=update.message.from_user.id,
is_anniversary=False)
session.add(b)
session.commit()
await start(update, context)
return ConversationHandler.END
async def end(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
db_user_ping(update)
context.user_data.clear()
await start(update, context)
return ConversationHandler.END
async def list_birthday(update: Update, context: ContextTypes.DEFAULT_TYPE):
db_user_ping(update)
"""List all the birthdays in the database"""
global session
birthdays = session.query(Birthday).filter(Birthday.user_id == update.message.from_user.id).order_by(
Birthday.last_name).all()
reply_keyboard = [[
InlineKeyboardButton("🏠 Home", callback_data="home") # go back to the main menu
]]
messages = []
list = ""
for b in birthdays:
# calculate the remaining time until the next birthday
nmonths, ndays = remaining_months_and_days(b.birth)
nextin = ""
# pretty formatting of the remaining time
if nmonths > 0:
nextin += str(nmonths) + " months"
if ndays > 0 and nmonths > 0:
nextin += " and "
if ndays > 0:
nextin += str(ndays) + " days"
# add the birthday to the list
row = "" + b.first_name + " " + b.last_name + " " + b.birth.strftime("%d/%m/%Y") + "\n *" + str(
calculate_age(b.birth)) + "* years old\n next in *" + nextin + "\n /view_bd_" + str(b.id) + "*\n"
if len(list + row) > 3000: # Telegram has a limit of 4096 characters per message, so we split the list in multiple messages
messages.append(list) # add the current message to the list of messages
list = row
else:
list += row
if len(list) != 0: # if there is some remaining text, add it to the list of messages as the last message
messages.append(list)
for m in messages: # send all the messages
await update.message.reply_text(
"*Birthdays* 🎂\n\n" + m,
reply_markup=ReplyKeyboardMarkup(
reply_keyboard, one_time_keyboard=True
),
parse_mode="Markdown"
)
async def view_birthday(update: Update, context: ContextTypes.DEFAULT_TYPE, birthday_id: int = None):
db_user_ping(update)
"""View a birthday in detail, with the possibility to edit or delete it"""
reply_keyboard = [[
InlineKeyboardButton("🏠 Home"),
InlineKeyboardButton("🗑️ Delete"),
InlineKeyboardButton("📝 Edit")
]]
if birthday_id is None: # if the id is empty, return to the main menu
birthday_id = update.message.text.strip().split("_")[-1]
b = session.query(Birthday).filter(Birthday.id == birthday_id).first()
if b is not None and b.user_id == update.message.from_user.id: # check if the birthday exists and belongs to the user
context.user_data["view_bd_id"] = birthday_id
nmonths, ndays = remaining_months_and_days(b.birth)
nextin = ""
if nmonths > 0:
nextin += str(nmonths) + " months"
if ndays > 0 and nmonths > 0:
nextin += " and "
if ndays > 0:
nextin += str(ndays) + " days"
await update.message.reply_text(
b.first_name + " " + b.last_name + " " + b.birth.strftime("%d/%m/%Y") + "\n *" + str(
calculate_age(b.birth)) + "* years old\n next in *" + nextin + "\n*\n",
reply_markup=ReplyKeyboardMarkup(
reply_keyboard, one_time_keyboard=True
),
parse_mode="Markdown")
async def delete_birthday(update: Update, context: ContextTypes.DEFAULT_TYPE):
db_user_ping(update)
"""ask for confirmation for the deletion of a birthday"""
bdid = context.user_data["view_bd_id"]
b = session.query(Birthday).filter(Birthday.id == bdid).first()
if b is not None and b.user_id == update.message.from_user.id:
# are you sure?
keyboard = [
[InlineKeyboardButton("✅ Yes, delete")],
[InlineKeyboardButton("🏠 Home")]
]
await update.message.reply_text(
"Are you sure you want to delete the entry for *" + b.first_name + " " + b.last_name + "*?",
reply_markup=ReplyKeyboardMarkup(keyboard, one_time_keyboard=True, input_field_placeholder="Action"),
parse_mode="Markdown")
async def edit_birthday(update: Update, context: ContextTypes.DEFAULT_TYPE):
db_user_ping(update)
"""ask for confirmation for the deletion of a birthday"""
bdid = context.user_data["view_bd_id"]
b = session.query(Birthday).filter(Birthday.id == bdid).first()
if b is not None and b.user_id == update.message.from_user.id:
# which field do you want to edit?
keyboard = [
[InlineKeyboardButton("📅 Date of birth")],
[InlineKeyboardButton("👤 First name"), InlineKeyboardButton("👤 Last name")],
[InlineKeyboardButton("🏠 Home")]
]
await update.message.reply_text("Which one to edit?",
reply_markup=ReplyKeyboardMarkup(keyboard, one_time_keyboard=True,
input_field_placeholder="Action"),
parse_mode="Markdown")
async def edit_date(update: Update, context: ContextTypes.DEFAULT_TYPE):
db_user_ping(update)
"""allow to edit the date of birth of a birthday"""
keyboard = [
[InlineKeyboardButton("❌ Cancel")]
]
bdid = context.user_data["view_bd_id"]
b = session.query(Birthday).filter(Birthday.id == bdid).first()
if b is not None and b.user_id == update.message.from_user.id:
# ask for the new date of birth
await update.message.reply_text("Enter the new date of birth in the format DD/MM/YYYY",
reply_markup=ReplyKeyboardMarkup(keyboard, one_time_keyboard=True,
input_field_placeholder="DD/MM/YYYY"),
parse_mode="Markdown")
return DATETIME
else:
await start(update, context)
await update.message.reply_text("⚠️ Birthday not found",
reply_markup=ReplyKeyboardMarkup(keyboard, one_time_keyboard=True,
input_field_placeholder="Action"))
return ConversationHandler.END
async def edit_name(update: Update, context: ContextTypes.DEFAULT_TYPE):
db_user_ping(update)
"""allow to edit the first name of a birthday"""
keyboard = [
[InlineKeyboardButton("❌ Cancel")]
]
bdid = context.user_data["view_bd_id"]
b = session.query(Birthday).filter(Birthday.id == bdid).first()
if b is not None and b.user_id == update.message.from_user.id:
# ask for the new first name
await update.message.reply_text("Enter the new first name",
reply_markup=ReplyKeyboardMarkup(keyboard, one_time_keyboard=True,
input_field_placeholder="First name"),
parse_mode="Markdown")
return NAME
else:
await start(update, context)
await update.message.reply_text("⚠️ Birthday not found",
reply_markup=ReplyKeyboardMarkup(keyboard, one_time_keyboard=True,
input_field_placeholder="Action"))
return ConversationHandler.END
async def edit_surname(update: Update, context: ContextTypes.DEFAULT_TYPE):
db_user_ping(update)
"""allow to edit the last name of a birthday"""
keyboard = [
[InlineKeyboardButton("❌ Cancel")]
]
bdid = context.user_data["view_bd_id"]
b = session.query(Birthday).filter(Birthday.id == bdid).first()
if b is not None and b.user_id == update.message.from_user.id:
# ask for the new last name
await update.message.reply_text("Enter the new last name",
reply_markup=ReplyKeyboardMarkup(keyboard, one_time_keyboard=True,
input_field_placeholder="Last name"),
parse_mode="Markdown")
return SURNAME
else:
await start(update, context)
await update.message.reply_text("⚠️ Birthday not found",
reply_markup=ReplyKeyboardMarkup(keyboard, one_time_keyboard=True,
input_field_placeholder="Action"))
return ConversationHandler.END
async def edit_date_data(update: Update, context: ContextTypes):
db_user_ping(update)
"""edit the date of birth of a birthday"""
bdid = context.user_data["view_bd_id"]
b = session.query(Birthday).filter(Birthday.id == bdid).first()
if b is not None and b.user_id == update.message.from_user.id:
try:
date = datetime.datetime.strptime(update.message.text, "%d/%m/%Y")
b.birth = date
session.commit()
await start(update, context)
if datetime.datetime.now() < date:
await update.message.reply_text("⚠️ The date of birth is in the future, type it again or cancel")
return DATETIME
else:
await update.message.reply_text("✅ Date of birth updated")
await view_birthday(update, context, bdid)
return ConversationHandler.END
except ValueError:
await update.message.reply_text("⚠️ Invalid date format")
await edit_date(update, context)
return DATETIME
else:
await start(update, context)
await update.message.reply_text("⚠️ Birthday not found")
return ConversationHandler.END
async def edit_name_data(update: Update, context: ContextTypes):
db_user_ping(update)
"""edit the first name of a birthday"""
bdid = context.user_data["view_bd_id"]
b = session.query(Birthday).filter(Birthday.id == bdid).first()
if b is not None and b.user_id == update.message.from_user.id:
b.first_name = update.message.text
session.commit()
await start(update, context)
await update.message.reply_text("✅ First name updated")
await view_birthday(update, context, bdid)
return ConversationHandler.END
else:
await start(update, context)
await update.message.reply_text("⚠️ Birthday not found")
return ConversationHandler.END
async def edit_surname_data(update: Update, context: ContextTypes):
db_user_ping(update)
"""edit the last name of a birthday"""
bdid = context.user_data["view_bd_id"]
b = session.query(Birthday).filter(Birthday.id == bdid).first()
if b is not None and b.user_id == update.message.from_user.id:
b.last_name = update.message.text
session.commit()
await start(update, context)
await update.message.reply_text("✅ Last name updated")
await view_birthday(update, context, bdid)
return ConversationHandler.END
else:
await start(update, context)
await update.message.reply_text("⚠️ Birthday not found")
return ConversationHandler.END
async def delete_birthday_confirmed(update: Update, context: ContextTypes.DEFAULT_TYPE):
db_user_ping(update)
"""delete a birthday"""
bdid = context.user_data["view_bd_id"]
b = session.query(Birthday).filter(Birthday.id == bdid).first()
if b is not None and b.user_id == update.message.from_user.id:
session.delete(b)
session.commit()
await update.message.reply_text("✅ Birthday deleted")
await start(update, context)
else:
await start(update, context)
await update.message.reply_text("⚠️ Birthday not found")
async def about(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""show info about the bot"""
await update.message.reply_text("This bot is made by @matmasak.\n"
"The source code is available on [GitHub](https://github.com/MatMasIt/birthdaybot)", parse_mode="Markdown")
async def reminders(update: Update, context: ContextTypes.DEFAULT_TYPE):
keyboard = [
]
_, t = db_user_ping(update)
if t.weekly:
keyboard.append([InlineKeyboardButton("✅ Weekly")])
else:
keyboard.append([InlineKeyboardButton("❌ Weekly")])
if t.monthly:
keyboard.append([InlineKeyboardButton("✅ Monthly")])
else:
keyboard.append([InlineKeyboardButton("❌ Monthly")])
if t.dailiy:
keyboard.append([InlineKeyboardButton("✅ Daily")])
else:
keyboard.append([InlineKeyboardButton("❌ Daily")])
keyboard.append([InlineKeyboardButton("🏠 Home")])
await update.message.reply_text("Choose which reminders you want to receive",
reply_markup=ReplyKeyboardMarkup(keyboard))
async def weekly_on(update: Update, context: ContextTypes.DEFAULT_TYPE):
_, t = db_user_ping(update)
t.weekly = True
session.commit()
await reminders(update, context)
async def weekly_off(update: Update, context: ContextTypes.DEFAULT_TYPE):
_, t = db_user_ping(update)
t.weekly = False
session.commit()
await reminders(update, context)
async def monthly_on(update: Update, context: ContextTypes.DEFAULT_TYPE):
_, t = db_user_ping(update)
t.monthly = True
session.commit()
await reminders(update, context)
async def monthly_off(update: Update, context: ContextTypes.DEFAULT_TYPE):
_, t = db_user_ping(update)
t.monthly = False
session.commit()
await reminders(update, context)
async def daily_on(update: Update, context: ContextTypes.DEFAULT_TYPE):
_, t = db_user_ping(update)
t.dailiy = True
session.commit()
await reminders(update, context)
async def daily_off(update: Update, context: ContextTypes.DEFAULT_TYPE):
_, t = db_user_ping(update)
t.dailiy = False
session.commit()
await reminders(update, context)
async def report(application: Application):
global session
while True:
for user in session.query(TelegramUser).all():
if datetime.datetime.now().day == 1 and user.monthly:
messages = []
text = ""
bds = session.query(Birthday).filter(Birthday.user_id == user.id).all()
if len(bds) == 0:
bds = sorted(bds, key=lambda x: x.birth.month + x.birth.day / 100)
for b in session.query(Birthday).filter(Birthday.user_id == user.id).all():
if b.birth.month == datetime.datetime.now().month:
tt = f"{b.first_name} {b.last_name} - {b.birth.day}/{b.birth.month}/{b.birth.year}, {calculate_age(b.birth)} years\n"
if len(text) + len(tt) > 4096:
messages.append(text)
text = tt
else:
text += tt
if len(text) > 0:
messages.append(text)
if len(messages) > 0:
await application.bot.send_message(chat_id=user.id, text="*Birthdays in this month*\n",
parse_mode="Markdown")
for m in messages:
await application.bot.send_message(chat_id=user.id, text=m)
elif datetime.datetime.now().weekday() == 0 and user.weekly:
messages = []
text = ""
bds = session.query(Birthday).filter(Birthday.user_id == user.id).all()
if len(bds):
bds = sorted(bds, key=lambda x: x.birth.month + x.birth.day / 100)
for b in session.query(Birthday).filter(Birthday.user_id == user.id).all():
# check if date falls in this week
if b.birth.month == datetime.datetime.now().month and datetime.datetime.now().day <= b.birth.day < datetime.datetime.now().day + 7:
tt = f"{b.first_name} {b.last_name} - {b.birth.day}/{b.birth.month}/{b.birth.year}, {calculate_age(b.birth)} anni\n"
if len(text) + len(tt) > 4096:
messages.append(text)
text = tt
else:
text += tt
if len(text) > 0:
messages.append(text)
if len(messages) > 0:
await application.bot.send_message(chat_id=user.id, text="*Birthdays in this week*\n",
parse_mode="Markdown")
for m in messages:
await application.bot.send_message(chat_id=user.id, text=m)
else:
# daily
messages = []
text = ""
bds = session.query(Birthday).filter(Birthday.user_id == user.id).all()
if len(bds):
bds = sorted(bds, key=lambda x: x.birth.month + x.birth.day / 100)
for b in bds:
if b.birth.month == datetime.datetime.now().month and b.birth.day == datetime.datetime.now().day:
tt = f"{b.first_name} {b.last_name} - {b.birth.day}/{b.birth.month}/{b.birth.year}, {calculate_age(b.birth)} anni\n"
if len(text) + len(tt) > 4096:
messages.append(text)
text = tt
else:
text += tt
if len(text) > 0:
messages.append(text)
if len(messages) > 0:
await application.bot.send_message(chat_id=user.id, text="*Birthdays today*\n",
parse_mode="Markdown")
for m in messages:
await application.bot.send_message(chat_id=user.id, text=m)
await sleep_until(0, 0, 0)
def main() -> None:
"""Run the bot."""
# Create the Application and pass it your bot's token.
persistence = PicklePersistence(filepath="conversationbot")
application = Application.builder().token(TOKEN).persistence(persistence).build()
start_handler = CommandHandler("start", start)
birthday_conversation = ConversationHandler(
entry_points=[MessageHandler(filters.Regex("🎂➕ Add Birthday"), add_birthday)],
states={
NAME: [
MessageHandler(filters.Regex("❌ Cancel"), end),
MessageHandler(filters.TEXT, name),
],
SURNAME: [
MessageHandler(filters.Regex("❌ Cancel"), end),
MessageHandler(filters.TEXT, surname),
],
DATETIME: [
MessageHandler(filters.Regex("❌ Cancel"), end),
MessageHandler(filters.TEXT, datetime_p),
]
},
fallbacks=[MessageHandler(filters.Regex("❌ Cancel"), end)],
)
edit_conversation = ConversationHandler(
entry_points=[
MessageHandler(filters.Regex("📅 Date of birth"), edit_date),
MessageHandler(filters.Regex("👤 First name"), edit_name),
MessageHandler(filters.Regex("👤 Last name"), edit_surname)
],
states={
NAME: [
MessageHandler(filters.Regex("❌ Cancel"), end),
MessageHandler(filters.TEXT, edit_name_data),
],
SURNAME: [
MessageHandler(filters.Regex("❌ Cancel"), end),
MessageHandler(filters.TEXT, edit_surname_data),
],
DATETIME: [
MessageHandler(filters.Regex("❌ Cancel"), end),
MessageHandler(filters.TEXT, edit_date_data),
]
},
fallbacks=[MessageHandler(filters.Regex("❌ Cancel"), end)],
)
application.add_handlers([start_handler,
birthday_conversation,
MessageHandler(filters.Regex("🎂📒 List Birthdays"), list_birthday),
MessageHandler(filters.Regex("🏠 Home"), start),
MessageHandler(filters.Regex("🗑️ Delete"), delete_birthday),
MessageHandler(filters.Regex("📝 Edit"), edit_birthday),
MessageHandler(filters.Regex("✅ Yes, delete"), delete_birthday_confirmed),
MessageHandler(filters.Regex("^(/view_bd_[\d]+)$"), view_birthday),
edit_conversation,
MessageHandler(filters.Regex("👤 First name"), edit_name),
MessageHandler(filters.Regex("👤 Last name"), edit_surname),
MessageHandler(filters.Regex("🔔 Set reminders"), reminders),
MessageHandler(filters.Regex(" About"), about),
MessageHandler(filters.Regex("✅ Weekly"), weekly_off),
MessageHandler(filters.Regex("❌ Weekly"), weekly_on),
MessageHandler(filters.Regex("✅ Daily"), daily_off),
MessageHandler(filters.Regex("❌ Daily"), daily_on),
MessageHandler(filters.Regex("✅ Monthly"), monthly_off),
MessageHandler(filters.Regex("❌ Monthly"), monthly_on),
])
threading.Thread(target=asyncio.run, args=(report(application),)).start()
# Run the bot until the user presses Ctrl-C
application.run_polling(allowed_updates=Update.ALL_TYPES)
if __name__ == "__main__":
main()

3
requirements.txt Normal file
View File

@ -0,0 +1,3 @@
SQLAlchemy==1.4.47
python-telegram-bot==21.0.1
python-dotenv==1.0.1