Compare commits

...

5 Commits

Author SHA1 Message Date
Patrick Nisble 82afdef873 add lua only version 2026-01-19 08:34:25 +01:00
Patrick Nisble 14e757dd77 more type support 2026-01-12 08:40:46 +01:00
Patrick Nisble 91fcb96619 simplify visualization 2026-01-12 08:37:51 +01:00
Patrick Nisble d6fc41fe05 extend possible in-/output 2026-01-12 08:37:31 +01:00
Patrick Nisble 93a39ad889 update requests for DRY 2026-01-12 08:37:16 +01:00
7 changed files with 574 additions and 70 deletions

View File

@ -18,3 +18,25 @@ return {
build = function(_) vim.cmd "UpdateRemotePlugins" end,
}
```
## Setup
### Shell Configuration
#### Environment Variables
CLICKUP_AUTH
: user api token for clickup
CLICKUP_USER_ID
: user id for clickup
CLICKUP_WORKSPACE_ID
: workspace id for clickup
## Commands
### `:MrPy`
- start task selector of user's tasks (not done/closed)

295
lua/mrpy.lua Normal file
View File

@ -0,0 +1,295 @@
local curl = require("plenary.curl")
local M = {}
---@param data table
---@return string | nil
M.table_to_yaml = function(data)
local json = vim.json.encode(data)
-- Nutzt 'yq' um JSON zu YAML zu konvertieren
local handle = io.popen("echo '" .. json .. "' | yq -P e '. ' -")
local result = handle:read("*a")
handle:close()
if not result then
print("Error: yq not installed or got invalid json data")
return nil
end
return result
end
---@param yaml_string string
---@return table | nil
M.yaml_to_table = function(yaml_string)
-- 1. YAML String in yq einspeisen und JSON ausgeben lassen
-- Das Flag -o=json sorgt für die Konvertierung
local temp_file = os.tmpname()
local temp_file_handle = io.open(temp_file, "w+b")
if not temp_file_handle then
print("Error: could not open temp file")
print(" temp_file: " .. temp_file)
return nil
end
temp_file_handle:write(yaml_string)
temp_file_handle:flush()
temp_file_handle:close()
local cmd = "yq '.' " .. temp_file
local handle, errres = io.popen(cmd, 'r')
local json_result = handle:read("*a")
handle:close()
os.remove(temp_file)
-- 2. Fehlerprüfung
if json_result == "" then
print("Error: yq not installed or got invalid yaml data")
print(" cmd: " .. cmd)
print(" err: " .. errres)
return nil
end
-- 3. JSON in Lua-Table umwandeln
local ok, table_result = pcall(vim.json.decode, json_result)
if not ok then
print("Error: failed to decode json")
print(" cmd: " .. cmd)
print(" json: " .. json_result)
return nil
end
return table_result
end
---@class ClickupSession
---@field auth string
---@field user string
---@field workspace string
---@field base_url string
---@type ClickupSession
M.session = nil
---@class ClickupRef
---@field name string
---@field id string
---@class ClickupTask
---@field id string
---@field name string
---@field tags? table[]
---@field locations? table[]
---@field list? ClickupRef
---@field parent? string | nil
---@field [string] string
---@return ClickupTask[]
M.latest_tasks = function()
local resp = curl.get(M.session.base_url ..
"/team/" .. M.session.workspace .. "/task?subtasks=true&include_markdown_description=true&assignees[]=" ..
M.session.user, {
headers = {
['Authorization'] = M.session.auth,
["accept"] = "application/json",
["content-type"] = "application/json",
}
})
if resp.status == 200 then
return vim.json.decode(resp.body).tasks
end
print("failed http request: " .. tostring(resp.status) .. " (" .. resp.body .. ")")
return {}
end
---@param task ClickupTask
---@return ClickupTask
M._update_task = function(task)
local update_table = {}
for _, k in ipairs({ "name", "markdown_content", "status" }) do
if task[k] then
update_table[k] = task[k]
end
end
local resp = curl.put(M.session.base_url .. "/task/" .. task.id, {
headers = {
['Authorization'] = M.session.auth,
["accept"] = "application/json",
["content-type"] = "application/json",
},
body = vim.json.encode(update_table)
})
if resp.status ~= 200 then
error("failed to update " .. task.id .. "\n(" .. resp.body .. ")")
end
return vim.json.decode(resp.body)
end
---@param args vim.api.keyset.create_autocmd.callback_args
M._on_tempbuf_write = function(args)
local lines = vim.api.nvim_buf_get_lines(args.buf, 0, -1, false)
---@type string[]
local parts = {}
---@type string[]
local current = {}
for _, line in ipairs(lines) do
if line == "---" then
if #current then
table.insert(parts, table.concat(current, "\n"))
current = {}
end
else
table.insert(current, line)
end
end
table.insert(parts, table.concat(current, "\n"))
---@type ClickupTask
local data = vim.json.decode(parts[2])
data['markdown_content'] = string.gsub(parts[3], "%-%-%-\n", "")
M._update_task(data)
vim.api.nvim_set_option_value("modified", false, { buf = args.buf })
end
---@class SelectionItem
---@field status string
---@field parent string | nil
---@field list string
---@field name string
---@field description string
---@field tags string[]
---@field id string
---@param item SelectionItem
M._on_select_task = function(picker, item)
local new_name = "[ClickUp] " .. item.name
local new_buf_no = vim.api.nvim_create_buf(true, false)
local status, _ = pcall(vim.api.nvim_buf_set_name, new_buf_no, new_name)
if status then
-- vim.api.nvim_set_option_value("buftype", "nofile", { buf = new_buf_no })
vim.api.nvim_set_option_value("filetype", "markdown", { buf = new_buf_no })
vim.api.nvim_create_autocmd({ "BufWriteCmd" }, { buffer = new_buf_no, callback = M._on_tempbuf_write })
local content = { "---", "{" }
for _, k in ipairs({ "id", "name", "status", "tags", "list", "parent" }) do
if content[#content] ~= "{" then
content[#content] = content[#content] .. ","
end
table.insert(content, ' "' .. k .. '": ' .. vim.json.encode(item[k]))
end
table.insert(content, "}")
table.insert(content, "---")
for line in string.gmatch(item.description, "[^\r\n]+") do
table.insert(content, line)
end
vim.api.nvim_buf_set_lines(new_buf_no, 0, 0, false, content)
vim.api.nvim_set_option_value("modified", false, { buf = new_buf_no })
vim.api.nvim_win_set_buf(0, new_buf_no)
else
vim.api.nvim_buf_delete(new_buf_no, {})
picker:close()
end
end
---@param item SelectionItem
M._item_format = function(item, _)
local ret = {}
local hl = "SnacksPickerComment"
if item.status == "in progress" then
hl = "@method"
elseif item.status == "selected for development" then
hl = "@constant.builtin"
elseif item.status == "in review" then
hl = "@keyword"
elseif item.status == "done" then
hl = "@variable.builtin"
end
if item.parent ~= vim.NIL then
ret[#ret + 1] = { "󰘍 ", "SnacksPickerComment" }
end
ret[#ret + 1] = { item.name, hl }
for _, v in ipairs(item.tags) do
ret[#ret + 1] = { " #" .. v, "SnacksPickerComment" }
end
ret[#ret + 1] = {
" 󰻾" .. item.id,
"SnacksPickerComment"
}
return ret
end
---@param list table[]
---@param keys string[]
M.retrieve_subkeys = function(list, keys)
local ret = {}
local interm = list
for _, key in ipairs(keys) do
ret = {}
for _, item in ipairs(interm) do
table.insert(ret, item[key])
end
interm = ret
end
return ret
end
---@param data vim.api.keyset.create_user_command.command_args
M.select_task = function(data)
local tasks = M.latest_tasks()
---@type SelectionItem[]
local items = {}
for tix, t in ipairs(tasks) do
if string.sub(t.name, -7, -1) ~= "Absence" then
local preview_frontmatter = ""
---@type SelectionItem
local prepared = {
idx = tix,
id = t.id,
text = t.name .. t.id .. t.markdown_description,
name = t.name,
tags = M.retrieve_subkeys(t.tags, { "name" }),
status = t.status.status,
parent = t.parent,
list = t.list.name,
description = t.markdown_description,
preview = {
ft = "markdown",
},
action = M._on_select_task
}
for _, k in ipairs({ "id", "name", "status", "tags", "parent" }) do
preview_frontmatter = preview_frontmatter .. "\n" .. k .. ": " .. vim.json.encode(prepared[k])
end
prepared.preview.text = "---" .. preview_frontmatter .. "\n---\n" .. t.markdown_description
table.insert(items, #items + 1, prepared)
end
end
require("snacks.picker").pick({
title = "Select Task",
format = M._item_format,
preview = "preview",
confirm = M._on_select_task,
items = items
})
end
return M

10
plugin/mrpy.lua Normal file
View File

@ -0,0 +1,10 @@
local mrpy = require("mrpy")
vim.api.nvim_create_user_command("MrPy", mrpy.select_task, {})
mrpy.session = {
auth = os.getenv("CLICKUP_AUTH") or "",
user = os.getenv("CLICKUP_USER_ID") or "",
workspace = os.getenv("CLICKUP_WORKSPACE_ID") or "",
base_url = "https://api.clickup.com/api/v2",
}

View File

@ -6,10 +6,20 @@ from typing import Any, Callable, Self, TypedDict
from urllib.error import HTTPError
from .hints import JSONDataMap
from .requests import get, put
from .requests import delete, get, put, post
from .env import EnvVar
class TaskRespCheckItemDict(TypedDict):
name: str
resolved: int
class TaskRespCheckDict(TypedDict):
name: str
items: list[TaskRespCheckItemDict]
class TaskRespDict(TypedDict):
name: str
markdown_description: str
@ -20,7 +30,7 @@ class TaskRespDict(TypedDict):
tags: list[dict[str, str]]
parent: str | None
locations: list[dict[str, str]]
checklists: dict[str, bool]
checklists: list[TaskRespCheckDict]
list: dict[str, str]
@ -29,24 +39,25 @@ class ClickupTask:
"""fields marked with ``repr=False`` will not be pushed for updates"""
name: str
markdown_description: str
status: str
id: str = field(repr=False)
assignees: list[str] = field(repr=False)
tags: list[str] = field(repr=False)
id: str | None = field(default=None, repr=False)
assignees: list[str] | None = field(default=None, repr=False)
tags: list[str] | None = field(default=None, repr=False)
parent_list: str = field(repr=False)
locations: list[str] = field(repr=False)
checklists: dict[str, bool] = field(repr=False)
locations: list[str] | None = field(default=None, repr=False)
checklists: dict[str, dict[str, bool]] | None = field(default=None, repr=False)
parent: str | None = field(default=None, repr=False)
markdown_content: str
@classmethod
def from_resp_data(cls, resp_data_raw: dict[str, Any]) -> Self:
resp_data = TaskRespDict(**resp_data_raw)
return cls(
name=resp_data["name"],
markdown_description=resp_data["markdown_description"],
markdown_content=resp_data["markdown_description"],
status=resp_data["status"]["status"],
id=resp_data["id"],
assignees=cls.convert_assignees(resp_data["assignees"]),
@ -54,7 +65,10 @@ class ClickupTask:
parent=resp_data.get("parent"),
parent_list=resp_data["list"]["id"],
locations=cls.convert_simple_list_with_names(resp_data["locations"]),
checklists=resp_data["checklists"],
checklists={
tlist["name"]: {t["name"]: bool(t["resolved"]) for t in tlist["items"]}
for tlist in resp_data["checklists"]
},
)
@classmethod
@ -78,16 +92,14 @@ class ClickupTask:
@property
def updateables(self) -> dict[str, Any]:
return {
f.name: getattr(self, f.name, None) for f in fields(type(self)) if f.repr
}
return {f.name: getattr(self, f.name, None) for f in fields(type(self)) if f.repr}
@property
def showables(self) -> dict[str, Any]:
return {
f.name: getattr(self, f.name, None)
for f in fields(type(self))
if f.name != "markdown_description"
dcf.name: getattr(self, dcf.name, ...)
for dcf in fields(self)
if getattr(self, dcf.name, ...) is not None
}
@property
@ -97,7 +109,12 @@ class ClickupTask:
if self.parent:
ret += " \033[32m 󰙅 "
ret += f"{self.name} (#{self.id})"
ret += self.name
if self.tags:
ret += "".join(f" #{t}" for t in self.tags)
ret += f" (#{self.id})"
return ret
@ -151,9 +168,38 @@ class ClickupSession:
)
return loads(raw_data)
def get_tasks(self) -> list[ClickupTask]:
def _post(self, endpoint: str, **body_params: str) -> JSONDataMap:
raw_data = post(
self.base_url + endpoint,
body_params,
headers={
"accept": "application/json",
"content-type": "application/json",
"Authorization": self.auth_key,
},
)
return loads(raw_data)
def _delete(self, endpoint: str) -> JSONDataMap:
raw_data = delete(
self.base_url + endpoint,
headers={
"accept": "application/json",
"content-type": "application/json",
"Authorization": self.auth_key,
},
)
return loads(raw_data)
def get_tasks(self, **filters: str) -> list[ClickupTask]:
data = self._get(
f"/team/{self.workspace_id}/task?subtasks=true&include_markdown_description=true&assignees[]={self.user_id}"
f"/team/{self.workspace_id}/task",
**{
"subtask": "true",
"include_markdown_description": "true",
"assignees[]": self.user_id,
}
| filters,
).get("tasks", [])
if isinstance(data, list):
return [ClickupTask.from_resp_data(t) for t in data if isinstance(t, dict)]
@ -169,4 +215,16 @@ class ClickupSession:
)
def update(self, data: ClickupTask) -> None:
_ = self._put(f"/task/{data.id}", **data.updateables)
ret_task = self._put(f"/task/{data.id}", **data.updateables)
current_tags: set[str] = set(ret_task["tags"])
new_tags = set(data.tags)
for del_tag in current_tags - new_tags:
self._delete(f"/task/{data.id}/tag/{del_tag}")
for add_tag in new_tags - current_tags:
self._post(f"/task/{data.id}/tag/{add_tag}")
def create(self, data: ClickupTask) -> str:
ret_task = self._post(f"/list/{data.parent_list}/task", **data.updateables)
return str(ret_task["id"])

View File

@ -1,9 +1,11 @@
from collections.abc import Callable, Sequence
from pathlib import Path
from typing import Any
from pynvim import Nvim, command, plugin
from pynvim.api import Buffer
from .clickup import ClickupSession
from .clickup import ClickupSession, ClickupTask
from .hints import JSONData
from .yaml import load, dump
@ -28,18 +30,20 @@ class MrPyPlugin:
"assignees": usernames_from_objs,
}
def select_task_id(self) -> None:
tasks = self.clickup.get_tasks()
def select_task_id(self, tags: set[str] | None = None) -> None:
tasks = self.clickup.get_tasks(**({"tags[]": "&tags[]=".join(tags)} if tags else {}))
task_names_by_id = [
{
"idx": tix + 1,
"id": t.id,
"text": t.short,
"name": t.name,
"tags": t.tags,
"status": t.status,
"is_child": bool(t.parent),
"preview": {
"text": f"---\n{dump(t.showables)}---\n{t.markdown_description}",
"ft": "markdown",
"text": dump(t.showables),
"ft": "yaml",
},
"action": f":Mrpy {t.id}",
}
@ -70,8 +74,12 @@ class MrPyPlugin:
ret[#ret + 1] = { item.name, hl }
for _, v in ipairs(item.tags) do
ret[#ret + 1] = { " #" .. v, "SnacksPickerComment" }
end
ret[#ret + 1] = {
" (#" .. item.id .. ")",
" 󰻾" .. item.id,
"SnacksPickerComment"
}
return ret
@ -91,34 +99,36 @@ class MrPyPlugin:
task = self.clickup.get_task(task_id)
temp_buf: Buffer = self.nvim.api.create_buf(True, False)
self.nvim.api.buf_set_name(temp_buf, f"[ClickUp] {task.name}")
self.nvim.buffers[temp_buf.number].options["filetype"] = "markdown"
self.nvim.buffers[temp_buf.number].options["filetype"] = "yaml"
self.nvim.api.create_autocmd(
["BufWriteCmd"],
{"buffer": temp_buf.number, "command": "MrpyPush " + str(temp_buf.number)},
)
content = ["---"]
content = [
f"# yaml-language-server: $schema={Path(__file__).parent.parent.parent.parent}/schema.json ",
"---",
]
content.extend(
dump(
task.showables,
).splitlines()
)
content.append("---")
content.extend(task.markdown_description.splitlines())
self.nvim.api.buf_set_lines(temp_buf, 0, 0, False, content)
self.nvim.buffers[temp_buf.number].options["modified"] = False
self.nvim.api.win_set_buf(0, temp_buf)
@command("Mrpy", nargs="?")
@command("Mrpy", nargs="*")
def entry(self, args: Sequence[str] = ()) -> None:
match args:
case (str() as task_id,):
self.open_task_buffer(task_id)
case ():
self.select_task_id()
case _:
pass
try:
self.open_task_buffer(task_id)
except:
self.select_task_id({task_id})
case tags:
self.select_task_id(set(tags))
@command("MrpyPush", nargs="?")
def on_vbuf_write(self, args: Sequence[str] = ()) -> None:
@ -132,14 +142,60 @@ class MrPyPlugin:
try:
a = self.nvim.buffers[buf_no][0:-1]
_, fm, *text = "\n".join(a).split("---\n")
fm = "\n".join(a)
data = load(fm)
assert isinstance(data, dict)
data["markdown_description"] = "\n\n".join(text)
data["markdown_content"] = data["markdown_content"].strip() + "\n"
self.nvim.err_write(str(data) + "\n")
# self.clickup.update(data)
self.clickup.update(ClickupTask(**data))
self.nvim.buffers[buf_no].options["modified"] = False
except ValueError:
pass
@command("MrpyNew", nargs=0)
def on_new_task(self) -> None:
task = ClickupTask(name="", status="backlog", markdown_content="", parent_list="")
temp_buf: Buffer = self.nvim.api.create_buf(True, False)
self.nvim.api.buf_set_name(temp_buf, f"[ClickUp] New Task")
self.nvim.buffers[temp_buf.number].options["filetype"] = "yaml"
self.nvim.api.create_autocmd(
["BufWriteCmd"],
{"buffer": temp_buf.number, "command": "MrpyPushNew " + str(temp_buf.number)},
)
content = [
f"# yaml-language-server: $schema={Path(__file__).parent.parent.parent.parent}/schema.json ",
"---",
]
content.extend(
dump(
task.showables,
).splitlines()
)
self.nvim.api.buf_set_lines(temp_buf, 0, 0, False, content)
self.nvim.buffers[temp_buf.number].options["modified"] = False
self.nvim.api.win_set_buf(0, temp_buf)
@command("MrpyPushNew", nargs=1)
def on_new_vbuf_write(self, buf_no: Sequence[Any] = ()) -> None:
buf_no = int(buf_no[0])
try:
a = self.nvim.buffers[buf_no][0:-1]
fm = "\n".join(a)
data = load(fm)
assert isinstance(data, dict)
data["markdown_content"] = data["markdown_content"].strip() + "\n"
new_task = ClickupTask(**data)
new_task_id = self.clickup.create(new_task)
self.nvim.buffers[buf_no].options["modified"] = False
self.nvim.command("bdelete " + str(buf_no))
self.entry([new_task_id])
except ValueError:
pass

View File

@ -1,5 +1,36 @@
from json import dumps
from ssl import SSLContext, create_default_context
from urllib.error import HTTPError
from urllib.request import Request, urlopen
from urllib.parse import urlencode
def request(
url: str,
method: str,
query_params: dict[str, str] | None = None,
data: dict[str, str] | None = None,
headers: dict[str, str] | None = None,
) -> str:
try:
with urlopen(
Request(
url
+ (
("?" + "&".join(f"{k}={v}" for k, v in query_params.items()))
if query_params
else ""
),
dumps(data).encode() if data else None,
headers=headers or {},
method=method,
),
context=create_default_context(),
) as resp:
return resp.read().decode("utf-8")
except HTTPError as e:
e.add_note(e.url)
raise
def get(
@ -7,20 +38,15 @@ def get(
query_params: dict[str, str] | None = None,
headers: dict[str, str] | None = None,
) -> str:
with urlopen(
Request(
url
+ (
("?" + "&".join(f"{k}={v}" for k, v in query_params.items()))
if query_params
else ""
),
headers=headers or {},
method="GET",
),
context=create_default_context(),
) as resp:
return resp.read().decode("utf-8")
return request(url, "GET", query_params=query_params, headers=headers)
def post(
url: str,
data: dict[str, str] | None = None,
headers: dict[str, str] | None = None,
) -> str:
return request(url, "POST", data=data, headers=headers)
def put(
@ -28,13 +54,11 @@ def put(
data: dict[str, str] | None = None,
headers: dict[str, str] | None = None,
) -> str:
with urlopen(
Request(
url,
str(data).encode(),
headers=headers or {},
method="PUT",
),
context=create_default_context(),
) as resp:
return resp.read().decode("utf-8")
return request(url, "PUT", data=data, headers=headers)
def delete(
url: str,
headers: dict[str, str] | None = None,
) -> str:
return request(url, "DELETE", headers=headers)

View File

@ -17,7 +17,7 @@ def dump_scalar(entry: hints.JSONDataScalar) -> str:
return f"{entry}\n"
def dump(obj: dict[str, hints.JSONDataScalar | list[hints.JSONDataScalar]]) -> str:
def dump(obj: hints.JSONDataMap) -> str:
ret = ""
for key, value in obj.items():
ret += key
@ -26,11 +26,32 @@ def dump(obj: dict[str, hints.JSONDataScalar | list[hints.JSONDataScalar]]) -> s
match value:
case []:
ret += " []\n"
case {}:
ret += " {}\n"
case "":
ret += ' ""\n'
case list() as entries:
ret += "\n"
for entry in entries:
ret += f" - {dump_scalar(entry)}"
match entry:
case dict():
subdump = dump(entry)
ret += "\n".join(" " + l for l in subdump.splitlines())
case list():
pass
case _:
ret += f" - {dump_scalar(entry)}"
case dict() as substruct:
ret += "\n"
subdump = dump(substruct)
ret += "\n".join(" " + l for l in subdump.splitlines()) + "\n"
case str() as mlstring if key == "markdown_content":
if mlstring.strip() == "" or dump_scalar(mlstring).strip() == "null":
ret += ' ""\n'
else:
indented = "\n".join((" " + l) for l in dump_scalar(mlstring).splitlines())
ret += f" >\n{indented}\n"
case entry:
ret += f" {dump_scalar(entry)}"
@ -56,6 +77,10 @@ def load(content: str) -> hints.JSONData:
... key10:
... - str
... - "str"
... key11: >
... * a
... * b:
... * c
... '''
>>> yaml_parsed = load(yaml)
>>> compare = {
@ -69,8 +94,8 @@ def load(content: str) -> hints.JSONData:
... 'key8': False,
... 'key9': [1, 23.2],
... 'key10': ['str', 'str'],
... 'key11': "* a\\n* b:\\n * c",
... }
>>> {k: yaml_parsed.get(k, None) for k in compare if compare[k] != yaml_parsed[k]}
{}
"""
@ -78,20 +103,34 @@ def load(content: str) -> hints.JSONData:
ret = {}
key = None
value = None
for line in sub(r":[\s\n]*", ":\n ", dedent(content).strip()).splitlines():
if line.endswith(":"):
mlstring = False
for line in sub(r"(:( >)?)[\s\n]*", r"\1\n ", dedent(content).strip()).splitlines():
if line.startswith("#") or line.strip() == "---":
continue
elif line.endswith(":"):
if key:
ret[key] = value
value = None
mlstring = False
key = line.removesuffix(":")
elif line.startswith(" -"):
elif line.endswith(": >"):
if key:
ret[key] = value
value = None
mlstring = True
key = line.removesuffix(": >")
elif line.startswith(" -") and not mlstring:
value = value or []
try:
parsed = loads(line.removeprefix(" -").strip())
except:
parsed = loads('"' + line.removeprefix(" -").strip() + '"')
value.append(parsed)
elif line.startswith(" ") and mlstring:
value = value or ""
value += line.removeprefix(" ") + "\n"
else:
try:
value = loads(line.strip())