Compare commits

..

No commits in common. "82afdef873221569f0935f43c293663951742cc7" and "2dca8c833cf66085145ba8a52144fcc0a24958fb" have entirely different histories.

7 changed files with 70 additions and 574 deletions

View File

@ -18,25 +18,3 @@ return {
build = function(_) vim.cmd "UpdateRemotePlugins" end,
}
```
## Setup
### Shell Configuration
#### Environment Variables
CLICKUP_AUTH
: user api token for clickup
CLICKUP_USER_ID
: user id for clickup
CLICKUP_WORKSPACE_ID
: workspace id for clickup
## Commands
### `:MrPy`
- start task selector of user's tasks (not done/closed)

View File

@ -1,295 +0,0 @@
local curl = require("plenary.curl")
local M = {}
---@param data table
---@return string | nil
M.table_to_yaml = function(data)
local json = vim.json.encode(data)
-- Nutzt 'yq' um JSON zu YAML zu konvertieren
local handle = io.popen("echo '" .. json .. "' | yq -P e '. ' -")
local result = handle:read("*a")
handle:close()
if not result then
print("Error: yq not installed or got invalid json data")
return nil
end
return result
end
---@param yaml_string string
---@return table | nil
M.yaml_to_table = function(yaml_string)
-- 1. YAML String in yq einspeisen und JSON ausgeben lassen
-- Das Flag -o=json sorgt für die Konvertierung
local temp_file = os.tmpname()
local temp_file_handle = io.open(temp_file, "w+b")
if not temp_file_handle then
print("Error: could not open temp file")
print(" temp_file: " .. temp_file)
return nil
end
temp_file_handle:write(yaml_string)
temp_file_handle:flush()
temp_file_handle:close()
local cmd = "yq '.' " .. temp_file
local handle, errres = io.popen(cmd, 'r')
local json_result = handle:read("*a")
handle:close()
os.remove(temp_file)
-- 2. Fehlerprüfung
if json_result == "" then
print("Error: yq not installed or got invalid yaml data")
print(" cmd: " .. cmd)
print(" err: " .. errres)
return nil
end
-- 3. JSON in Lua-Table umwandeln
local ok, table_result = pcall(vim.json.decode, json_result)
if not ok then
print("Error: failed to decode json")
print(" cmd: " .. cmd)
print(" json: " .. json_result)
return nil
end
return table_result
end
---@class ClickupSession
---@field auth string
---@field user string
---@field workspace string
---@field base_url string
---@type ClickupSession
M.session = nil
---@class ClickupRef
---@field name string
---@field id string
---@class ClickupTask
---@field id string
---@field name string
---@field tags? table[]
---@field locations? table[]
---@field list? ClickupRef
---@field parent? string | nil
---@field [string] string
---@return ClickupTask[]
M.latest_tasks = function()
local resp = curl.get(M.session.base_url ..
"/team/" .. M.session.workspace .. "/task?subtasks=true&include_markdown_description=true&assignees[]=" ..
M.session.user, {
headers = {
['Authorization'] = M.session.auth,
["accept"] = "application/json",
["content-type"] = "application/json",
}
})
if resp.status == 200 then
return vim.json.decode(resp.body).tasks
end
print("failed http request: " .. tostring(resp.status) .. " (" .. resp.body .. ")")
return {}
end
---@param task ClickupTask
---@return ClickupTask
M._update_task = function(task)
local update_table = {}
for _, k in ipairs({ "name", "markdown_content", "status" }) do
if task[k] then
update_table[k] = task[k]
end
end
local resp = curl.put(M.session.base_url .. "/task/" .. task.id, {
headers = {
['Authorization'] = M.session.auth,
["accept"] = "application/json",
["content-type"] = "application/json",
},
body = vim.json.encode(update_table)
})
if resp.status ~= 200 then
error("failed to update " .. task.id .. "\n(" .. resp.body .. ")")
end
return vim.json.decode(resp.body)
end
---@param args vim.api.keyset.create_autocmd.callback_args
M._on_tempbuf_write = function(args)
local lines = vim.api.nvim_buf_get_lines(args.buf, 0, -1, false)
---@type string[]
local parts = {}
---@type string[]
local current = {}
for _, line in ipairs(lines) do
if line == "---" then
if #current then
table.insert(parts, table.concat(current, "\n"))
current = {}
end
else
table.insert(current, line)
end
end
table.insert(parts, table.concat(current, "\n"))
---@type ClickupTask
local data = vim.json.decode(parts[2])
data['markdown_content'] = string.gsub(parts[3], "%-%-%-\n", "")
M._update_task(data)
vim.api.nvim_set_option_value("modified", false, { buf = args.buf })
end
---@class SelectionItem
---@field status string
---@field parent string | nil
---@field list string
---@field name string
---@field description string
---@field tags string[]
---@field id string
---@param item SelectionItem
M._on_select_task = function(picker, item)
local new_name = "[ClickUp] " .. item.name
local new_buf_no = vim.api.nvim_create_buf(true, false)
local status, _ = pcall(vim.api.nvim_buf_set_name, new_buf_no, new_name)
if status then
-- vim.api.nvim_set_option_value("buftype", "nofile", { buf = new_buf_no })
vim.api.nvim_set_option_value("filetype", "markdown", { buf = new_buf_no })
vim.api.nvim_create_autocmd({ "BufWriteCmd" }, { buffer = new_buf_no, callback = M._on_tempbuf_write })
local content = { "---", "{" }
for _, k in ipairs({ "id", "name", "status", "tags", "list", "parent" }) do
if content[#content] ~= "{" then
content[#content] = content[#content] .. ","
end
table.insert(content, ' "' .. k .. '": ' .. vim.json.encode(item[k]))
end
table.insert(content, "}")
table.insert(content, "---")
for line in string.gmatch(item.description, "[^\r\n]+") do
table.insert(content, line)
end
vim.api.nvim_buf_set_lines(new_buf_no, 0, 0, false, content)
vim.api.nvim_set_option_value("modified", false, { buf = new_buf_no })
vim.api.nvim_win_set_buf(0, new_buf_no)
else
vim.api.nvim_buf_delete(new_buf_no, {})
picker:close()
end
end
---@param item SelectionItem
M._item_format = function(item, _)
local ret = {}
local hl = "SnacksPickerComment"
if item.status == "in progress" then
hl = "@method"
elseif item.status == "selected for development" then
hl = "@constant.builtin"
elseif item.status == "in review" then
hl = "@keyword"
elseif item.status == "done" then
hl = "@variable.builtin"
end
if item.parent ~= vim.NIL then
ret[#ret + 1] = { "󰘍 ", "SnacksPickerComment" }
end
ret[#ret + 1] = { item.name, hl }
for _, v in ipairs(item.tags) do
ret[#ret + 1] = { " #" .. v, "SnacksPickerComment" }
end
ret[#ret + 1] = {
" 󰻾" .. item.id,
"SnacksPickerComment"
}
return ret
end
---@param list table[]
---@param keys string[]
M.retrieve_subkeys = function(list, keys)
local ret = {}
local interm = list
for _, key in ipairs(keys) do
ret = {}
for _, item in ipairs(interm) do
table.insert(ret, item[key])
end
interm = ret
end
return ret
end
---@param data vim.api.keyset.create_user_command.command_args
M.select_task = function(data)
local tasks = M.latest_tasks()
---@type SelectionItem[]
local items = {}
for tix, t in ipairs(tasks) do
if string.sub(t.name, -7, -1) ~= "Absence" then
local preview_frontmatter = ""
---@type SelectionItem
local prepared = {
idx = tix,
id = t.id,
text = t.name .. t.id .. t.markdown_description,
name = t.name,
tags = M.retrieve_subkeys(t.tags, { "name" }),
status = t.status.status,
parent = t.parent,
list = t.list.name,
description = t.markdown_description,
preview = {
ft = "markdown",
},
action = M._on_select_task
}
for _, k in ipairs({ "id", "name", "status", "tags", "parent" }) do
preview_frontmatter = preview_frontmatter .. "\n" .. k .. ": " .. vim.json.encode(prepared[k])
end
prepared.preview.text = "---" .. preview_frontmatter .. "\n---\n" .. t.markdown_description
table.insert(items, #items + 1, prepared)
end
end
require("snacks.picker").pick({
title = "Select Task",
format = M._item_format,
preview = "preview",
confirm = M._on_select_task,
items = items
})
end
return M

View File

@ -1,10 +0,0 @@
local mrpy = require("mrpy")
vim.api.nvim_create_user_command("MrPy", mrpy.select_task, {})
mrpy.session = {
auth = os.getenv("CLICKUP_AUTH") or "",
user = os.getenv("CLICKUP_USER_ID") or "",
workspace = os.getenv("CLICKUP_WORKSPACE_ID") or "",
base_url = "https://api.clickup.com/api/v2",
}

View File

@ -6,20 +6,10 @@ from typing import Any, Callable, Self, TypedDict
from urllib.error import HTTPError
from .hints import JSONDataMap
from .requests import delete, get, put, post
from .requests import get, put
from .env import EnvVar
class TaskRespCheckItemDict(TypedDict):
name: str
resolved: int
class TaskRespCheckDict(TypedDict):
name: str
items: list[TaskRespCheckItemDict]
class TaskRespDict(TypedDict):
name: str
markdown_description: str
@ -30,7 +20,7 @@ class TaskRespDict(TypedDict):
tags: list[dict[str, str]]
parent: str | None
locations: list[dict[str, str]]
checklists: list[TaskRespCheckDict]
checklists: dict[str, bool]
list: dict[str, str]
@ -39,25 +29,24 @@ class ClickupTask:
"""fields marked with ``repr=False`` will not be pushed for updates"""
name: str
markdown_description: str
status: str
id: str | None = field(default=None, repr=False)
assignees: list[str] | None = field(default=None, repr=False)
tags: list[str] | None = field(default=None, repr=False)
id: str = field(repr=False)
assignees: list[str] = field(repr=False)
tags: list[str] = field(repr=False)
parent_list: str = field(repr=False)
locations: list[str] | None = field(default=None, repr=False)
checklists: dict[str, dict[str, bool]] | None = field(default=None, repr=False)
locations: list[str] = field(repr=False)
checklists: dict[str, bool] = field(repr=False)
parent: str | None = field(default=None, repr=False)
markdown_content: str
@classmethod
def from_resp_data(cls, resp_data_raw: dict[str, Any]) -> Self:
resp_data = TaskRespDict(**resp_data_raw)
return cls(
name=resp_data["name"],
markdown_content=resp_data["markdown_description"],
markdown_description=resp_data["markdown_description"],
status=resp_data["status"]["status"],
id=resp_data["id"],
assignees=cls.convert_assignees(resp_data["assignees"]),
@ -65,10 +54,7 @@ class ClickupTask:
parent=resp_data.get("parent"),
parent_list=resp_data["list"]["id"],
locations=cls.convert_simple_list_with_names(resp_data["locations"]),
checklists={
tlist["name"]: {t["name"]: bool(t["resolved"]) for t in tlist["items"]}
for tlist in resp_data["checklists"]
},
checklists=resp_data["checklists"],
)
@classmethod
@ -92,14 +78,16 @@ class ClickupTask:
@property
def updateables(self) -> dict[str, Any]:
return {f.name: getattr(self, f.name, None) for f in fields(type(self)) if f.repr}
return {
f.name: getattr(self, f.name, None) for f in fields(type(self)) if f.repr
}
@property
def showables(self) -> dict[str, Any]:
return {
dcf.name: getattr(self, dcf.name, ...)
for dcf in fields(self)
if getattr(self, dcf.name, ...) is not None
f.name: getattr(self, f.name, None)
for f in fields(type(self))
if f.name != "markdown_description"
}
@property
@ -109,12 +97,7 @@ class ClickupTask:
if self.parent:
ret += " \033[32m 󰙅 "
ret += self.name
if self.tags:
ret += "".join(f" #{t}" for t in self.tags)
ret += f" (#{self.id})"
ret += f"{self.name} (#{self.id})"
return ret
@ -168,38 +151,9 @@ class ClickupSession:
)
return loads(raw_data)
def _post(self, endpoint: str, **body_params: str) -> JSONDataMap:
raw_data = post(
self.base_url + endpoint,
body_params,
headers={
"accept": "application/json",
"content-type": "application/json",
"Authorization": self.auth_key,
},
)
return loads(raw_data)
def _delete(self, endpoint: str) -> JSONDataMap:
raw_data = delete(
self.base_url + endpoint,
headers={
"accept": "application/json",
"content-type": "application/json",
"Authorization": self.auth_key,
},
)
return loads(raw_data)
def get_tasks(self, **filters: str) -> list[ClickupTask]:
def get_tasks(self) -> list[ClickupTask]:
data = self._get(
f"/team/{self.workspace_id}/task",
**{
"subtask": "true",
"include_markdown_description": "true",
"assignees[]": self.user_id,
}
| filters,
f"/team/{self.workspace_id}/task?subtasks=true&include_markdown_description=true&assignees[]={self.user_id}"
).get("tasks", [])
if isinstance(data, list):
return [ClickupTask.from_resp_data(t) for t in data if isinstance(t, dict)]
@ -215,16 +169,4 @@ class ClickupSession:
)
def update(self, data: ClickupTask) -> None:
ret_task = self._put(f"/task/{data.id}", **data.updateables)
current_tags: set[str] = set(ret_task["tags"])
new_tags = set(data.tags)
for del_tag in current_tags - new_tags:
self._delete(f"/task/{data.id}/tag/{del_tag}")
for add_tag in new_tags - current_tags:
self._post(f"/task/{data.id}/tag/{add_tag}")
def create(self, data: ClickupTask) -> str:
ret_task = self._post(f"/list/{data.parent_list}/task", **data.updateables)
return str(ret_task["id"])
_ = self._put(f"/task/{data.id}", **data.updateables)

View File

@ -1,11 +1,9 @@
from collections.abc import Callable, Sequence
from pathlib import Path
from typing import Any
from pynvim import Nvim, command, plugin
from pynvim.api import Buffer
from .clickup import ClickupSession, ClickupTask
from .clickup import ClickupSession
from .hints import JSONData
from .yaml import load, dump
@ -30,20 +28,18 @@ class MrPyPlugin:
"assignees": usernames_from_objs,
}
def select_task_id(self, tags: set[str] | None = None) -> None:
tasks = self.clickup.get_tasks(**({"tags[]": "&tags[]=".join(tags)} if tags else {}))
def select_task_id(self) -> None:
tasks = self.clickup.get_tasks()
task_names_by_id = [
{
"idx": tix + 1,
"id": t.id,
"text": t.short,
"name": t.name,
"tags": t.tags,
"status": t.status,
"is_child": bool(t.parent),
"preview": {
"text": dump(t.showables),
"ft": "yaml",
"text": f"---\n{dump(t.showables)}---\n{t.markdown_description}",
"ft": "markdown",
},
"action": f":Mrpy {t.id}",
}
@ -74,12 +70,8 @@ class MrPyPlugin:
ret[#ret + 1] = { item.name, hl }
for _, v in ipairs(item.tags) do
ret[#ret + 1] = { " #" .. v, "SnacksPickerComment" }
end
ret[#ret + 1] = {
" 󰻾" .. item.id,
" (#" .. item.id .. ")",
"SnacksPickerComment"
}
return ret
@ -99,36 +91,34 @@ class MrPyPlugin:
task = self.clickup.get_task(task_id)
temp_buf: Buffer = self.nvim.api.create_buf(True, False)
self.nvim.api.buf_set_name(temp_buf, f"[ClickUp] {task.name}")
self.nvim.buffers[temp_buf.number].options["filetype"] = "yaml"
self.nvim.buffers[temp_buf.number].options["filetype"] = "markdown"
self.nvim.api.create_autocmd(
["BufWriteCmd"],
{"buffer": temp_buf.number, "command": "MrpyPush " + str(temp_buf.number)},
)
content = [
f"# yaml-language-server: $schema={Path(__file__).parent.parent.parent.parent}/schema.json ",
"---",
]
content = ["---"]
content.extend(
dump(
task.showables,
).splitlines()
)
content.append("---")
content.extend(task.markdown_description.splitlines())
self.nvim.api.buf_set_lines(temp_buf, 0, 0, False, content)
self.nvim.buffers[temp_buf.number].options["modified"] = False
self.nvim.api.win_set_buf(0, temp_buf)
@command("Mrpy", nargs="*")
@command("Mrpy", nargs="?")
def entry(self, args: Sequence[str] = ()) -> None:
match args:
case (str() as task_id,):
try:
self.open_task_buffer(task_id)
except:
self.select_task_id({task_id})
case tags:
self.select_task_id(set(tags))
case ():
self.select_task_id()
case _:
pass
@command("MrpyPush", nargs="?")
def on_vbuf_write(self, args: Sequence[str] = ()) -> None:
@ -142,60 +132,14 @@ class MrPyPlugin:
try:
a = self.nvim.buffers[buf_no][0:-1]
fm = "\n".join(a)
_, fm, *text = "\n".join(a).split("---\n")
data = load(fm)
assert isinstance(data, dict)
data["markdown_content"] = data["markdown_content"].strip() + "\n"
data["markdown_description"] = "\n\n".join(text)
self.clickup.update(ClickupTask(**data))
self.nvim.err_write(str(data) + "\n")
# self.clickup.update(data)
self.nvim.buffers[buf_no].options["modified"] = False
except ValueError:
pass
@command("MrpyNew", nargs=0)
def on_new_task(self) -> None:
task = ClickupTask(name="", status="backlog", markdown_content="", parent_list="")
temp_buf: Buffer = self.nvim.api.create_buf(True, False)
self.nvim.api.buf_set_name(temp_buf, f"[ClickUp] New Task")
self.nvim.buffers[temp_buf.number].options["filetype"] = "yaml"
self.nvim.api.create_autocmd(
["BufWriteCmd"],
{"buffer": temp_buf.number, "command": "MrpyPushNew " + str(temp_buf.number)},
)
content = [
f"# yaml-language-server: $schema={Path(__file__).parent.parent.parent.parent}/schema.json ",
"---",
]
content.extend(
dump(
task.showables,
).splitlines()
)
self.nvim.api.buf_set_lines(temp_buf, 0, 0, False, content)
self.nvim.buffers[temp_buf.number].options["modified"] = False
self.nvim.api.win_set_buf(0, temp_buf)
@command("MrpyPushNew", nargs=1)
def on_new_vbuf_write(self, buf_no: Sequence[Any] = ()) -> None:
buf_no = int(buf_no[0])
try:
a = self.nvim.buffers[buf_no][0:-1]
fm = "\n".join(a)
data = load(fm)
assert isinstance(data, dict)
data["markdown_content"] = data["markdown_content"].strip() + "\n"
new_task = ClickupTask(**data)
new_task_id = self.clickup.create(new_task)
self.nvim.buffers[buf_no].options["modified"] = False
self.nvim.command("bdelete " + str(buf_no))
self.entry([new_task_id])
except ValueError:
pass

View File

@ -1,18 +1,12 @@
from json import dumps
from ssl import SSLContext, create_default_context
from urllib.error import HTTPError
from urllib.request import Request, urlopen
from urllib.parse import urlencode
def request(
def get(
url: str,
method: str,
query_params: dict[str, str] | None = None,
data: dict[str, str] | None = None,
headers: dict[str, str] | None = None,
) -> str:
try:
with urlopen(
Request(
url
@ -21,32 +15,12 @@ def request(
if query_params
else ""
),
dumps(data).encode() if data else None,
headers=headers or {},
method=method,
method="GET",
),
context=create_default_context(),
) as resp:
return resp.read().decode("utf-8")
except HTTPError as e:
e.add_note(e.url)
raise
def get(
url: str,
query_params: dict[str, str] | None = None,
headers: dict[str, str] | None = None,
) -> str:
return request(url, "GET", query_params=query_params, headers=headers)
def post(
url: str,
data: dict[str, str] | None = None,
headers: dict[str, str] | None = None,
) -> str:
return request(url, "POST", data=data, headers=headers)
def put(
@ -54,11 +28,13 @@ def put(
data: dict[str, str] | None = None,
headers: dict[str, str] | None = None,
) -> str:
return request(url, "PUT", data=data, headers=headers)
def delete(
url: str,
headers: dict[str, str] | None = None,
) -> str:
return request(url, "DELETE", headers=headers)
with urlopen(
Request(
url,
str(data).encode(),
headers=headers or {},
method="PUT",
),
context=create_default_context(),
) as resp:
return resp.read().decode("utf-8")

View File

@ -17,7 +17,7 @@ def dump_scalar(entry: hints.JSONDataScalar) -> str:
return f"{entry}\n"
def dump(obj: hints.JSONDataMap) -> str:
def dump(obj: dict[str, hints.JSONDataScalar | list[hints.JSONDataScalar]]) -> str:
ret = ""
for key, value in obj.items():
ret += key
@ -26,32 +26,11 @@ def dump(obj: hints.JSONDataMap) -> str:
match value:
case []:
ret += " []\n"
case {}:
ret += " {}\n"
case "":
ret += ' ""\n'
case list() as entries:
ret += "\n"
for entry in entries:
match entry:
case dict():
subdump = dump(entry)
ret += "\n".join(" " + l for l in subdump.splitlines())
case list():
pass
case _:
ret += f" - {dump_scalar(entry)}"
case dict() as substruct:
ret += "\n"
subdump = dump(substruct)
ret += "\n".join(" " + l for l in subdump.splitlines()) + "\n"
case str() as mlstring if key == "markdown_content":
if mlstring.strip() == "" or dump_scalar(mlstring).strip() == "null":
ret += ' ""\n'
else:
indented = "\n".join((" " + l) for l in dump_scalar(mlstring).splitlines())
ret += f" >\n{indented}\n"
case entry:
ret += f" {dump_scalar(entry)}"
@ -77,10 +56,6 @@ def load(content: str) -> hints.JSONData:
... key10:
... - str
... - "str"
... key11: >
... * a
... * b:
... * c
... '''
>>> yaml_parsed = load(yaml)
>>> compare = {
@ -94,8 +69,8 @@ def load(content: str) -> hints.JSONData:
... 'key8': False,
... 'key9': [1, 23.2],
... 'key10': ['str', 'str'],
... 'key11': "* a\\n* b:\\n * c",
... }
>>> {k: yaml_parsed.get(k, None) for k in compare if compare[k] != yaml_parsed[k]}
{}
"""
@ -103,34 +78,20 @@ def load(content: str) -> hints.JSONData:
ret = {}
key = None
value = None
mlstring = False
for line in sub(r"(:( >)?)[\s\n]*", r"\1\n ", dedent(content).strip()).splitlines():
if line.startswith("#") or line.strip() == "---":
continue
elif line.endswith(":"):
for line in sub(r":[\s\n]*", ":\n ", dedent(content).strip()).splitlines():
if line.endswith(":"):
if key:
ret[key] = value
value = None
mlstring = False
key = line.removesuffix(":")
elif line.endswith(": >"):
if key:
ret[key] = value
value = None
mlstring = True
key = line.removesuffix(": >")
elif line.startswith(" -") and not mlstring:
elif line.startswith(" -"):
value = value or []
try:
parsed = loads(line.removeprefix(" -").strip())
except:
parsed = loads('"' + line.removeprefix(" -").strip() + '"')
value.append(parsed)
elif line.startswith(" ") and mlstring:
value = value or ""
value += line.removeprefix(" ") + "\n"
else:
try:
value = loads(line.strip())