# mrpy.nvim/rplugin/python3/mrpy/clickup.py
#
# 231 lines
# 6.8 KiB
# Python

from collections.abc import Sequence
from dataclasses import dataclass, field, fields
from json import loads
from os import environ
from typing import Any, Callable, Self, TypedDict
from urllib.error import HTTPError
from .hints import JSONDataMap
from .requests import delete, get, put, post
from .env import EnvVar
# One entry of a checklist inside a task response; both keys are required.
# ``resolved`` is declared as an int and is truth-tested by the consumers
# in this module.
TaskRespCheckItemDict = TypedDict(
    "TaskRespCheckItemDict",
    {
        "name": str,
        "resolved": int,
    },
)
# A named checklist inside a task response together with its items;
# both keys are required.
TaskRespCheckDict = TypedDict(
    "TaskRespCheckDict",
    {
        "name": str,
        "items": list[TaskRespCheckItemDict],
    },
)
# Typed view of the task payload keys this module reads; all keys are
# required. Written in the functional form, so the ``list`` key is a plain
# string and never a name inside a class body.
TaskRespDict = TypedDict(
    "TaskRespDict",
    {
        "name": str,
        "markdown_description": str,
        "status": dict[str, str],
        "id": str,
        "assignees": list[dict[str, str]],
        "tags": list[dict[str, str]],
        "parent": str | None,
        "locations": list[dict[str, str]],
        "checklists": list[TaskRespCheckDict],
        "list": dict[str, str],
    },
)
@dataclass(kw_only=True)
class ClickupTask:
"""fields marked with ``repr=False`` will not be pushed for updates"""
name: str
status: str
id: str | None = field(default=None, repr=False)
assignees: list[str] | None = field(default=None, repr=False)
tags: list[str] | None = field(default=None, repr=False)
parent_list: str = field(repr=False)
locations: list[str] | None = field(default=None, repr=False)
checklists: dict[str, dict[str, bool]] | None = field(default=None, repr=False)
parent: str | None = field(default=None, repr=False)
markdown_content: str
@classmethod
def from_resp_data(cls, resp_data_raw: dict[str, Any]) -> Self:
resp_data = TaskRespDict(**resp_data_raw)
return cls(
name=resp_data["name"],
markdown_content=resp_data["markdown_description"],
status=resp_data["status"]["status"],
id=resp_data["id"],
assignees=cls.convert_assignees(resp_data["assignees"]),
tags=cls.convert_simple_list_with_names(resp_data["tags"]),
parent=resp_data.get("parent"),
parent_list=resp_data["list"]["id"],
locations=cls.convert_simple_list_with_names(resp_data["locations"]),
checklists={
tlist["name"]: {t["name"]: bool(t["resolved"]) for t in tlist["items"]}
for tlist in resp_data["checklists"]
},
)
@classmethod
def convert_checklists(cls, content: Sequence[Any]) -> dict[str, bool]:
return {
entry["name"]: entry["resolved"]
for checklist in content
for entry in checklist["items"]
}
@classmethod
def convert_assignees(cls, content: Sequence[str | dict[str, str]]) -> list[str]:
return [(e if isinstance(e, str) else e["username"]) for e in content]
@classmethod
def convert_simple_list_with_names(
cls,
content: Sequence[str | dict[str, str]],
) -> list[str]:
return [(e if isinstance(e, str) else e["name"]) for e in content]
@property
def updateables(self) -> dict[str, Any]:
return {f.name: getattr(self, f.name, None) for f in fields(type(self)) if f.repr}
@property
def showables(self) -> dict[str, Any]:
return {
dcf.name: getattr(self, dcf.name, ...)
for dcf in fields(self)
if getattr(self, dcf.name, ...) is not None
}
@property
def short(self) -> str:
ret = ""
if self.parent:
ret += " \033[32m 󰙅 "
ret += self.name
if self.tags:
ret += "".join(f" #{t}" for t in self.tags)
ret += f" (#{self.id})"
return ret
@dataclass
class ClickupSession:
    """Small client for the ClickUp REST API.

    Credentials default to the values produced by the ``EnvVar`` factories
    (presumably read from the named environment variables, failing with the
    given message when unset - confirm against ``.env.EnvVar``).
    """

    auth_key: str = field(
        default_factory=EnvVar(
            "CLICKUP_AUTH",
            "clickup auth token is required to be set",
        )
    )
    workspace_id: str = field(
        default_factory=EnvVar(
            "CLICKUP_WORKSPACE_ID",
            "clickup workspace id is required to be set",
        )
    )
    user_id: str = field(
        default_factory=EnvVar(
            "CLICKUP_USER_ID",
            "clickup user id is required to be set",
        )
    )
    base_url: str = "https://api.clickup.com/api/v2"

    def _headers(self, *, json_body: bool) -> dict[str, str]:
        """Return the request headers, optionally declaring a JSON body."""
        headers = {"accept": "application/json"}
        if json_body:
            headers["content-type"] = "application/json"
        headers["Authorization"] = self.auth_key
        return headers

    def _call(
        self,
        send: Callable[..., Any],
        endpoint: str,
        *payload: Any,
        json_body: bool = True,
    ) -> JSONDataMap:
        """Run one request helper against ``endpoint`` and decode the JSON reply.

        Every verb goes through here so that an ``HTTPError`` always carries
        the failing URL as a note before it propagates.
        """
        try:
            raw_data = send(
                self.base_url + endpoint,
                *payload,
                headers=self._headers(json_body=json_body),
            )
        except HTTPError as e:
            e.add_note(e.url)
            raise
        return loads(raw_data)

    def _get(self, endpoint: str, **query_params: str) -> JSONDataMap:
        """GET ``endpoint`` with ``query_params`` and return the decoded body."""
        return self._call(get, endpoint, query_params, json_body=False)

    def _put(self, endpoint: str, **body_params: str) -> JSONDataMap:
        """PUT ``body_params`` to ``endpoint`` and return the decoded body."""
        return self._call(put, endpoint, body_params)

    def _post(self, endpoint: str, **body_params: str) -> JSONDataMap:
        """POST ``body_params`` to ``endpoint`` and return the decoded body."""
        return self._call(post, endpoint, body_params)

    def _delete(self, endpoint: str) -> JSONDataMap:
        """DELETE ``endpoint`` and return the decoded body."""
        return self._call(delete, endpoint)

    def get_tasks(self, **filters: str) -> list[ClickupTask]:
        """Return the workspace tasks assigned to ``user_id``.

        Subtasks and markdown descriptions are requested by default;
        ``filters`` are merged last and therefore override the defaults.
        An unexpected (non-list) ``tasks`` payload yields an empty list.
        """
        query = {
            "subtask": "true",
            "include_markdown_description": "true",
            "assignees[]": self.user_id,
        } | filters
        data = self._get(f"/team/{self.workspace_id}/task", **query).get("tasks", [])
        if isinstance(data, list):
            return [ClickupTask.from_resp_data(t) for t in data if isinstance(t, dict)]
        return []

    def get_task(self, task_id: str) -> ClickupTask:
        """Fetch a single task, including its markdown description."""
        return ClickupTask.from_resp_data(
            self._get(
                f"/task/{task_id}",
                include_markdown_description="true",
            ),
        )

    def update(self, data: ClickupTask) -> None:
        """Push the updateable fields of ``data`` and reconcile its tags.

        Tags are not part of the PUT body; they are added and removed one by
        one so that the remote tag set ends up equal to ``data.tags``.
        ``data.tags is None`` means "tags unknown" and leaves the remote
        tags untouched.
        """
        ret_task = self._put(f"/task/{data.id}", **data.updateables)
        if data.tags is None:
            return
        # The response lists tags as objects; reduce them to their names
        # before building a set (dicts are unhashable).
        current_tags: set[str] = set(
            ClickupTask.convert_simple_list_with_names(ret_task.get("tags") or [])
        )
        new_tags = set(data.tags)
        for del_tag in sorted(current_tags - new_tags):
            self._delete(f"/task/{data.id}/tag/{del_tag}")
        for add_tag in sorted(new_tags - current_tags):
            self._post(f"/task/{data.id}/tag/{add_tag}")

    def create(self, data: ClickupTask) -> str:
        """Create ``data`` in its ``parent_list`` and return the new task id."""
        ret_task = self._post(f"/list/{data.parent_list}/task", **data.updateables)
        return str(ret_task["id"])