231 lines
6.8 KiB
Python
231 lines
6.8 KiB
Python
from collections.abc import Sequence
|
|
from dataclasses import dataclass, field, fields
|
|
from json import loads
|
|
from os import environ
|
|
from typing import Any, Callable, Self, TypedDict
|
|
from urllib.error import HTTPError
|
|
|
|
from .hints import JSONDataMap
|
|
from .requests import delete, get, put, post
|
|
from .env import EnvVar
|
|
|
|
|
|
class TaskRespCheckItemDict(TypedDict):
|
|
name: str
|
|
resolved: int
|
|
|
|
|
|
class TaskRespCheckDict(TypedDict):
|
|
name: str
|
|
items: list[TaskRespCheckItemDict]
|
|
|
|
|
|
class TaskRespDict(TypedDict):
|
|
name: str
|
|
markdown_description: str
|
|
status: dict[str, str]
|
|
|
|
id: str
|
|
assignees: list[dict[str, str]]
|
|
tags: list[dict[str, str]]
|
|
parent: str | None
|
|
locations: list[dict[str, str]]
|
|
checklists: list[TaskRespCheckDict]
|
|
list: dict[str, str]
|
|
|
|
|
|
@dataclass(kw_only=True)
|
|
class ClickupTask:
|
|
"""fields marked with ``repr=False`` will not be pushed for updates"""
|
|
|
|
name: str
|
|
status: str
|
|
|
|
id: str | None = field(default=None, repr=False)
|
|
assignees: list[str] | None = field(default=None, repr=False)
|
|
tags: list[str] | None = field(default=None, repr=False)
|
|
parent_list: str = field(repr=False)
|
|
locations: list[str] | None = field(default=None, repr=False)
|
|
checklists: dict[str, dict[str, bool]] | None = field(default=None, repr=False)
|
|
parent: str | None = field(default=None, repr=False)
|
|
|
|
markdown_content: str
|
|
|
|
@classmethod
|
|
def from_resp_data(cls, resp_data_raw: dict[str, Any]) -> Self:
|
|
resp_data = TaskRespDict(**resp_data_raw)
|
|
|
|
return cls(
|
|
name=resp_data["name"],
|
|
markdown_content=resp_data["markdown_description"],
|
|
status=resp_data["status"]["status"],
|
|
id=resp_data["id"],
|
|
assignees=cls.convert_assignees(resp_data["assignees"]),
|
|
tags=cls.convert_simple_list_with_names(resp_data["tags"]),
|
|
parent=resp_data.get("parent"),
|
|
parent_list=resp_data["list"]["id"],
|
|
locations=cls.convert_simple_list_with_names(resp_data["locations"]),
|
|
checklists={
|
|
tlist["name"]: {t["name"]: bool(t["resolved"]) for t in tlist["items"]}
|
|
for tlist in resp_data["checklists"]
|
|
},
|
|
)
|
|
|
|
@classmethod
|
|
def convert_checklists(cls, content: Sequence[Any]) -> dict[str, bool]:
|
|
return {
|
|
entry["name"]: entry["resolved"]
|
|
for checklist in content
|
|
for entry in checklist["items"]
|
|
}
|
|
|
|
@classmethod
|
|
def convert_assignees(cls, content: Sequence[str | dict[str, str]]) -> list[str]:
|
|
return [(e if isinstance(e, str) else e["username"]) for e in content]
|
|
|
|
@classmethod
|
|
def convert_simple_list_with_names(
|
|
cls,
|
|
content: Sequence[str | dict[str, str]],
|
|
) -> list[str]:
|
|
return [(e if isinstance(e, str) else e["name"]) for e in content]
|
|
|
|
@property
|
|
def updateables(self) -> dict[str, Any]:
|
|
return {f.name: getattr(self, f.name, None) for f in fields(type(self)) if f.repr}
|
|
|
|
@property
|
|
def showables(self) -> dict[str, Any]:
|
|
return {
|
|
dcf.name: getattr(self, dcf.name, ...)
|
|
for dcf in fields(self)
|
|
if getattr(self, dcf.name, ...) is not None
|
|
}
|
|
|
|
@property
|
|
def short(self) -> str:
|
|
ret = ""
|
|
|
|
if self.parent:
|
|
ret += " \033[32m "
|
|
|
|
ret += self.name
|
|
|
|
if self.tags:
|
|
ret += "".join(f" #{t}" for t in self.tags)
|
|
|
|
ret += f" (#{self.id})"
|
|
|
|
return ret
|
|
|
|
|
|
@dataclass
|
|
class ClickupSession:
|
|
auth_key: str = field(
|
|
default_factory=EnvVar(
|
|
"CLICKUP_AUTH",
|
|
"clickup auth token is required to be set",
|
|
)
|
|
)
|
|
workspace_id: str = field(
|
|
default_factory=EnvVar(
|
|
"CLICKUP_WORKSPACE_ID",
|
|
"clickup workspace id is required to be set",
|
|
)
|
|
)
|
|
user_id: str = field(
|
|
default_factory=EnvVar(
|
|
"CLICKUP_USER_ID",
|
|
"clickup user id is required to be set",
|
|
)
|
|
)
|
|
base_url: str = "https://api.clickup.com/api/v2"
|
|
|
|
def _get(self, endpoint: str, **query_params: str) -> JSONDataMap:
|
|
try:
|
|
raw_data = get(
|
|
self.base_url + endpoint,
|
|
query_params,
|
|
headers={
|
|
"accept": "application/json",
|
|
"Authorization": self.auth_key,
|
|
},
|
|
)
|
|
return loads(raw_data)
|
|
except HTTPError as e:
|
|
e.add_note(e.url)
|
|
raise
|
|
|
|
def _put(self, endpoint: str, **body_params: str) -> JSONDataMap:
|
|
raw_data = put(
|
|
self.base_url + endpoint,
|
|
body_params,
|
|
headers={
|
|
"accept": "application/json",
|
|
"content-type": "application/json",
|
|
"Authorization": self.auth_key,
|
|
},
|
|
)
|
|
return loads(raw_data)
|
|
|
|
def _post(self, endpoint: str, **body_params: str) -> JSONDataMap:
|
|
raw_data = post(
|
|
self.base_url + endpoint,
|
|
body_params,
|
|
headers={
|
|
"accept": "application/json",
|
|
"content-type": "application/json",
|
|
"Authorization": self.auth_key,
|
|
},
|
|
)
|
|
return loads(raw_data)
|
|
|
|
def _delete(self, endpoint: str) -> JSONDataMap:
|
|
raw_data = delete(
|
|
self.base_url + endpoint,
|
|
headers={
|
|
"accept": "application/json",
|
|
"content-type": "application/json",
|
|
"Authorization": self.auth_key,
|
|
},
|
|
)
|
|
return loads(raw_data)
|
|
|
|
def get_tasks(self, **filters: str) -> list[ClickupTask]:
|
|
data = self._get(
|
|
f"/team/{self.workspace_id}/task",
|
|
**{
|
|
"subtask": "true",
|
|
"include_markdown_description": "true",
|
|
"assignees[]": self.user_id,
|
|
}
|
|
| filters,
|
|
).get("tasks", [])
|
|
if isinstance(data, list):
|
|
return [ClickupTask.from_resp_data(t) for t in data if isinstance(t, dict)]
|
|
|
|
return []
|
|
|
|
def get_task(self, task_id: str) -> ClickupTask:
|
|
return ClickupTask.from_resp_data(
|
|
self._get(
|
|
f"/task/{task_id}",
|
|
include_markdown_description="true",
|
|
),
|
|
)
|
|
|
|
def update(self, data: ClickupTask) -> None:
|
|
ret_task = self._put(f"/task/{data.id}", **data.updateables)
|
|
current_tags: set[str] = set(ret_task["tags"])
|
|
new_tags = set(data.tags)
|
|
|
|
for del_tag in current_tags - new_tags:
|
|
self._delete(f"/task/{data.id}/tag/{del_tag}")
|
|
|
|
for add_tag in new_tags - current_tags:
|
|
self._post(f"/task/{data.id}/tag/{add_tag}")
|
|
|
|
def create(self, data: ClickupTask) -> str:
|
|
ret_task = self._post(f"/list/{data.parent_list}/task", **data.updateables)
|
|
return str(ret_task["id"])
|