add files
This commit is contained in:
@@ -0,0 +1,3 @@
|
||||
from .main import MrPyPlugin
|
||||
|
||||
__all__ = ["MrPyPlugin"]
|
||||
@@ -0,0 +1,114 @@
|
||||
from typing import Any
|
||||
from pydantic import AliasChoices, AliasPath, BaseModel, Field, field_validator
|
||||
from pydantic_settings import BaseSettings
|
||||
from requests import get, put
|
||||
from ruamel.yaml import Node, Representer, ScalarNode
|
||||
|
||||
|
||||
class ClickupTask(BaseModel):
|
||||
"""fields marked with `exclude=True` will not be pushed for updates"""
|
||||
|
||||
name: str
|
||||
markdown_description: str
|
||||
status: str = Field(validation_alias=AliasPath("status", "status"))
|
||||
|
||||
id: str = Field(exclude=True)
|
||||
assignees: list[str] = Field(exclude=True)
|
||||
tags: list[str] = Field(exclude=True)
|
||||
parent: str | None = Field(None, exclude=True)
|
||||
parent_list: str = Field(validation_alias=AliasPath("list", "id"), exclude=True)
|
||||
locations: list[str] = Field(exclude=True)
|
||||
checklists: dict[str, bool] = Field(exclude=True)
|
||||
|
||||
@field_validator("checklists", mode="before")
|
||||
@classmethod
|
||||
def convert_checklists(cls, content: list[Any]) -> dict[str, bool]:
|
||||
return {
|
||||
entry["name"]: entry["resolved"]
|
||||
for checklist in content
|
||||
for entry in checklist["items"]
|
||||
}
|
||||
|
||||
@field_validator("assignees", mode="before")
|
||||
@classmethod
|
||||
def convert_assignees(cls, content: list[str | dict[str, Any]]) -> list[str]:
|
||||
return [(e if isinstance(e, str) else e["username"]) for e in content]
|
||||
|
||||
@field_validator("tags", "locations", mode="before")
|
||||
@classmethod
|
||||
def convert_simple_list_with_names(
|
||||
cls,
|
||||
content: list[str | dict[str, Any]],
|
||||
) -> list[str]:
|
||||
return [(e if isinstance(e, str) else e["name"]) for e in content]
|
||||
|
||||
@property
|
||||
def updateables(self) -> dict[str, Any]:
|
||||
return {
|
||||
k: getattr(self, k, None)
|
||||
for k, v in type(self).model_fields.items()
|
||||
if not v.exclude
|
||||
}
|
||||
|
||||
@property
|
||||
def showables(self) -> dict[str, Any]:
|
||||
return {
|
||||
k: getattr(self, k, None)
|
||||
for k in type(self).model_fields
|
||||
if k != "markdown_description"
|
||||
}
|
||||
|
||||
@property
|
||||
def short(self) -> str:
|
||||
ret = ""
|
||||
|
||||
if self.parent:
|
||||
ret += " \033[32m "
|
||||
|
||||
ret += f"{self.name} (#{self.id})"
|
||||
|
||||
return ret
|
||||
|
||||
|
||||
class ClickupSession(BaseSettings):
|
||||
auth_key: str = Field(alias="CLICKUP_AUTH", default=...)
|
||||
workspace_id: str = Field(alias="CLICKUP_WORKSPACE_ID", default=...)
|
||||
user_id: str = Field(alias="CLICKUP_USER_ID", default=...)
|
||||
base_url: str = "https://api.clickup.com/api/v2"
|
||||
|
||||
def _get(self, endpoint: str, **query_params: Any) -> dict[str, Any]:
|
||||
with get(
|
||||
self.base_url + endpoint,
|
||||
query_params,
|
||||
headers={
|
||||
"accept": "application/json",
|
||||
"Authorization": self.auth_key,
|
||||
},
|
||||
) as resp:
|
||||
return resp.json()
|
||||
|
||||
def _put(self, endpoint: str, **body_params: Any) -> dict[str, Any]:
|
||||
with put(
|
||||
self.base_url + endpoint,
|
||||
body_params,
|
||||
headers={
|
||||
"accept": "application/json",
|
||||
"content-type": "application/json",
|
||||
"Authorization": self.auth_key,
|
||||
},
|
||||
) as resp:
|
||||
return resp.json()
|
||||
|
||||
def get_tasks(self) -> list[ClickupTask]:
|
||||
data = self._get(
|
||||
f"/team/{self.workspace_id}/task?subtasks=true&include_markdown_description=true&assignees[]={self.user_id}"
|
||||
)
|
||||
return [ClickupTask.model_validate(t) for t in data["tasks"]]
|
||||
|
||||
def get_task(self, task_id: str) -> ClickupTask:
|
||||
return ClickupTask.model_validate(
|
||||
self._get(f"/task/{task_id}?include_markdown_description=true")
|
||||
)
|
||||
|
||||
def update(self, data: ClickupTask) -> None:
|
||||
_ = self._put(f"/task/{data.id}", **data.updateables)
|
||||
@@ -0,0 +1,145 @@
|
||||
from collections.abc import Callable, Sequence
|
||||
|
||||
from pynvim import Nvim, command, plugin
|
||||
from pynvim.api import Buffer
|
||||
|
||||
from .clickup import ClickupSession
|
||||
from .types import JSONData
|
||||
from .yaml import load, dump
|
||||
|
||||
|
||||
def usernames_from_objs(objs: JSONData) -> JSONData:
|
||||
assert isinstance(objs, list)
|
||||
return [k["username"] for k in objs if isinstance(k, dict)]
|
||||
|
||||
|
||||
@plugin
|
||||
class MrPyPlugin:
|
||||
nvim: Nvim
|
||||
clickup: ClickupSession
|
||||
frontmatter_keys: dict[str, Callable[[JSONData], JSONData]]
|
||||
|
||||
def __init__(self, nvim: Nvim) -> None:
|
||||
self.nvim = nvim
|
||||
self.clickup = ClickupSession()
|
||||
self.frontmatter_keys = {
|
||||
"id": str,
|
||||
"name": str,
|
||||
"assignees": usernames_from_objs,
|
||||
}
|
||||
|
||||
def select_task_id(self) -> None:
|
||||
tasks = self.clickup.get_tasks()
|
||||
task_names_by_id = [
|
||||
{
|
||||
"idx": tix + 1,
|
||||
"id": t.id,
|
||||
"name": t.name,
|
||||
"status": t.status,
|
||||
"is_child": bool(t.parent),
|
||||
"preview": {
|
||||
"text": f"---\n{dump(t.showables)}---\n{t.markdown_description}",
|
||||
"ft": "markdown",
|
||||
},
|
||||
"action": f":Mrpy {t.id}",
|
||||
}
|
||||
for tix, t in enumerate(tasks)
|
||||
if not t.name.endswith("Absence")
|
||||
]
|
||||
|
||||
self.nvim.exec_lua(
|
||||
"""require('snacks').picker.pick({
|
||||
title="Select Task",
|
||||
format = function(item, _)
|
||||
local ret = {}
|
||||
|
||||
local hl = "SnacksPickerComment"
|
||||
if item.status == "in progress" then
|
||||
hl = "@method"
|
||||
elseif item.status == "selected for development" then
|
||||
hl = "@constant.builtin"
|
||||
elseif item.status == "in review" then
|
||||
hl = "@keyword"
|
||||
elseif item.status == "done" then
|
||||
hl = "@variable.builtin"
|
||||
end
|
||||
|
||||
if item.is_child == true then
|
||||
ret[#ret + 1] = { " ", "SnacksPickerComment" }
|
||||
end
|
||||
|
||||
ret[#ret + 1] = { item.name, hl }
|
||||
|
||||
ret[#ret + 1] = {
|
||||
" (#" .. item.id .. ")",
|
||||
"SnacksPickerComment"
|
||||
}
|
||||
return ret
|
||||
end,
|
||||
preview="preview",
|
||||
confirm=function(_, item) vim.cmd(("Mrpy %s"):format(item.id)) end,
|
||||
items=...
|
||||
})""",
|
||||
task_names_by_id,
|
||||
)
|
||||
|
||||
def open_task_buffer(self, task_id: str) -> None:
|
||||
if " " in task_id:
|
||||
*_, last = task_id.split(" ")
|
||||
task_id = last.removeprefix("(#").removesuffix(")")
|
||||
|
||||
task = self.clickup.get_task(task_id)
|
||||
temp_buf: Buffer = self.nvim.api.create_buf(True, False)
|
||||
self.nvim.api.buf_set_name(temp_buf, f"[ClickUp] {task.name}")
|
||||
self.nvim.buffers[temp_buf.number].options["filetype"] = "markdown"
|
||||
self.nvim.api.create_autocmd(
|
||||
["BufWriteCmd"],
|
||||
{"buffer": temp_buf.number, "command": "MrpyPush " + str(temp_buf.number)},
|
||||
)
|
||||
|
||||
content = ["---"]
|
||||
content.extend(
|
||||
dump(
|
||||
task.showables,
|
||||
).splitlines()
|
||||
)
|
||||
content.append("---")
|
||||
content.extend(task.markdown_description.splitlines())
|
||||
|
||||
self.nvim.api.buf_set_lines(temp_buf, 0, 0, False, content)
|
||||
self.nvim.buffers[temp_buf.number].options["modified"] = False
|
||||
self.nvim.api.win_set_buf(0, temp_buf)
|
||||
|
||||
@command("Mrpy", nargs="?")
|
||||
def entry(self, args: Sequence[str] = ()) -> None:
|
||||
match args:
|
||||
case (str() as task_id,):
|
||||
self.open_task_buffer(task_id)
|
||||
case ():
|
||||
self.select_task_id()
|
||||
case _:
|
||||
pass
|
||||
|
||||
@command("MrpyPush", nargs="?")
|
||||
def on_vbuf_write(self, args: Sequence[str] = ()) -> None:
|
||||
match args:
|
||||
case "0", *_:
|
||||
return
|
||||
case buf_no, *_:
|
||||
buf_no = int(buf_no)
|
||||
case _:
|
||||
return
|
||||
|
||||
try:
|
||||
a = self.nvim.buffers[buf_no][0:-1]
|
||||
_, fm, *text = "\n".join(a).split("---\n")
|
||||
data = load(fm)
|
||||
assert isinstance(data, dict)
|
||||
|
||||
data["markdown_description"] = "\n\n".join(text)
|
||||
|
||||
self.nvim.err_write(str(data) + "\n")
|
||||
# self.clickup.update(data)
|
||||
self.nvim.buffers[buf_no].options["modified"] = False
|
||||
except ValueError:
|
||||
pass
|
||||
@@ -0,0 +1,4 @@
|
||||
type JSONDataScalar = str | None | float | bool
|
||||
type JSONDataList = list[JSONDataScalar | "JSONDataMap" | "JSONDataList"]
|
||||
type JSONDataMap = dict[str, JSONDataScalar | JSONDataList | "JSONDataMap"]
|
||||
type JSONData = JSONDataMap | JSONDataList
|
||||
@@ -0,0 +1,40 @@
|
||||
from io import StringIO
|
||||
from ruamel.yaml import YAML
|
||||
|
||||
from .types import JSONData, JSONDataScalar
|
||||
|
||||
yaml = YAML(typ="safe")
|
||||
|
||||
|
||||
def dump_scalar(entry: JSONDataScalar) -> str:
|
||||
match entry:
|
||||
case None:
|
||||
return "null\n"
|
||||
case True:
|
||||
return "true\n"
|
||||
case False:
|
||||
return "false\n"
|
||||
case _:
|
||||
return f"{entry:s}\n"
|
||||
|
||||
|
||||
def dump(obj: dict[str, JSONDataScalar | list[JSONDataScalar]]) -> str:
|
||||
ret = ""
|
||||
for key, value in obj.items():
|
||||
ret += key
|
||||
ret += ":"
|
||||
|
||||
match value:
|
||||
case list() as entries:
|
||||
ret += "\n"
|
||||
for entry in entries:
|
||||
ret += f" - {dump_scalar(entry)}"
|
||||
case entry:
|
||||
ret += f" {dump_scalar(entry)}"
|
||||
|
||||
return ret
|
||||
|
||||
|
||||
def load(content: str) -> JSONData:
|
||||
with StringIO(content) as sio:
|
||||
return yaml.load(sio) # pyright: ignore[reportAny, reportUnknownMemberType]
|
||||
Reference in New Issue
Block a user