cashflow_analyse/cashflow_analyser.py

49 lines
1.7 KiB
Python
Raw Normal View History

2023-08-09 13:10:24 +00:00
import asyncio
import kanboard
from time import sleep
2023-08-09 13:15:13 +00:00
# Shared Kanboard JSON-RPC client used by the task helpers in this module.
# The Kanboard Python client takes the API token through its ``password``
# argument (it has no ``api_token`` keyword). Replace the "add" placeholders
# with the real endpoint URL, user name and token before running.
kb = kanboard.Client(url="add", username="add", password="add")
2023-08-09 13:10:24 +00:00
async def fetch_tasks_from_column(project_id, column_name):
    """Return the tasks of a project that sit in the column titled *column_name*.

    Kanboard's ``getAllTasks`` procedure is documented with ``project_id`` and
    ``status_id`` parameters only, so a ``column_name`` argument cannot narrow
    the result on the server. The column title is therefore resolved to its
    id(s) with ``getColumns`` and the task list is filtered locally on each
    task's ``column_id``.

    Args:
        project_id: Identifier of the Kanboard project to query.
        column_name: Exact title of the column whose tasks are wanted.

    Returns:
        A list of task dicts located in the matching column. The list is empty
        when no column has that title or the API returns no tasks.
    """
    columns = await kb.get_columns_async(project_id=project_id)
    # Ids are compared as strings so that numeric and string ids match.
    # `or []` guards against a non-list (e.g. false) failure result.
    wanted_ids = {
        str(column.get("id"))
        for column in columns or []
        if column.get("title") == column_name
    }
    if not wanted_ids:
        return []

    # status_id is left at the server default, as in the original call.
    all_tasks = await kb.get_all_tasks_async(project_id=project_id)
    return [
        task
        for task in all_tasks or []
        if str(task.get("column_id")) in wanted_ids
    ]
async def create_new_task(project_id, column_name, title, description):
    """Create a task in the column titled *column_name* and return its id.

    Kanboard's ``createTask`` procedure is documented with a ``column_id``
    parameter, not a column title, so the title is first resolved to an id
    with ``getColumns``. If no column carries that title, the task is still
    created without an explicit column (the server then picks its default
    column) and a warning is printed, so a misspelled title does not stop
    task creation.

    Args:
        project_id: Identifier of the Kanboard project receiving the task.
        column_name: Exact title of the destination column.
        title: Title of the new task.
        description: Description text of the new task.

    Returns:
        Whatever ``createTask`` returns (documented as the new task id, or a
        false value on failure).
    """
    columns = await kb.get_columns_async(project_id=project_id)
    # `or []` guards against a non-list (e.g. false) failure result.
    column_id = next(
        (
            column.get("id")
            for column in columns or []
            if column.get("title") == column_name
        ),
        None,
    )

    params = {
        "project_id": project_id,
        "title": title,
        "description": description,
    }
    if column_id is None:
        print(
            f"Column {column_name!r} not found in project {project_id}; "
            "creating the task in the default column."
        )
    else:
        params["column_id"] = column_id

    task_id = await kb.create_task_async(**params)
    return task_id
def _parse_cost(description):
    """Return the number that follows the first "Стоимость: " marker.

    Only the first whitespace-delimited token after the marker is converted,
    so trailing text (a currency, further lines of the description) does not
    make the conversion fail.

    Raises:
        ValueError: If nothing follows the marker or the token is not a valid
            float literal.
    """
    tokens = description.split("Стоимость: ")[1].split()
    if not tokens:
        raise ValueError("no value after the cost marker")
    return float(tokens[0])


async def sum_costs_from_tasks(project_id, column_names):
    """Sum the costs declared in task descriptions across several columns.

    For every column in *column_names* the tasks are fetched with
    ``fetch_tasks_from_column``. A task contributes when its description
    contains the marker "Стоимость: " followed by a number. Tasks whose cost
    cannot be parsed are reported with ``print`` and skipped.

    Args:
        project_id: Identifier of the Kanboard project to inspect.
        column_names: Iterable of column titles whose tasks are summed.

    Returns:
        The total of all parsed costs (``0`` when nothing was found).
    """
    total_cost = 0
    for column in column_names:
        tasks = await fetch_tasks_from_column(project_id, column)
        # `or []` tolerates a falsy, non-list result from the fetch.
        for task in tasks or []:
            # A missing or None description is treated as "no cost declared".
            description = task.get("description") or ""
            if "Стоимость: " not in description:
                continue
            try:
                total_cost += _parse_cost(description)
            except ValueError as e:
                print(f"Failed to extract cost from task {task.get('id')}. Error: {e}")
    return total_cost
async def main_loop(project_id, interval_seconds=1200):
    """Periodically publish the summed contract cost as a new Kanboard task.

    Each cycle sums the costs of the tasks in the "Отгрузка" and "Оплата"
    columns and creates a task titled "Стоимость договоров" whose description
    holds the total. The coroutine then waits *interval_seconds* before the
    next cycle. The wait also happens after a failed cycle, so a persistent
    error does not turn into a tight retry loop.

    The wait uses ``asyncio.sleep`` so the event loop stays responsive; a
    blocking ``time.sleep`` would stall it and delay interruption.

    Args:
        project_id: Identifier of the Kanboard project to analyse.
        interval_seconds: Pause between two cycles, in seconds (default 1200,
            i.e. 20 minutes).
    """
    while True:
        try:
            try:
                total_cost = await sum_costs_from_tasks(project_id, ["Отгрузка", "Оплата"])
                # NOTE(review): "атистика" looks like a truncated "Статистика";
                # confirm the real column title on the board.
                await create_new_task(project_id, "атистика", "Стоимость договоров", f"Total: {total_cost}")
            except Exception as e:
                # Best-effort: report the failure and try again next cycle.
                print(f"Unexpected error: {e}")
            await asyncio.sleep(interval_seconds)
        except (KeyboardInterrupt, asyncio.CancelledError):
            # This is the program's top-level coroutine, so cancellation
            # (how newer asyncio.run delivers Ctrl-C) is treated as a request
            # to stop, just like a direct KeyboardInterrupt.
            print("Script interrupted by user.")
            break
# Identifier of the Kanboard project that is analysed.
PROJECT_ID = "2"

if __name__ == "__main__":
    # Start the endless polling loop only when run as a script, so that
    # importing this module does not block forever.
    asyncio.run(main_loop(PROJECT_ID))