Compare commits

...

9 Commits

Author SHA1 Message Date
Patrick Nisble 7c6c3d7223 rename 2026-01-27 11:06:23 +01:00
Patrick Nisble 518f2df5c8 start lsp proj 2026-01-21 08:39:51 +01:00
Patrick Nisble 975dd39437 add notifications 2026-01-20 08:59:28 +01:00
Patrick Nisble 8dc1d080d8 update install 2026-01-19 15:56:11 +01:00
Patrick Nisble 82afdef873 add lua only version 2026-01-19 08:34:25 +01:00
Patrick Nisble 14e757dd77 more type support 2026-01-12 08:40:46 +01:00
Patrick Nisble 91fcb96619 simplify visualization 2026-01-12 08:37:51 +01:00
Patrick Nisble d6fc41fe05 extend possible in-/output 2026-01-12 08:37:31 +01:00
Patrick Nisble 93a39ad889 update requests for DRY 2026-01-12 08:37:16 +01:00
16 changed files with 1177 additions and 77 deletions
+25 -3
View File
@@ -4,7 +4,7 @@ NeoVim plugin, integrating a clickup to gitlab workflow (1 task to 1 issue to 1
## Install
### Layzy.nvim
### Lazy.nvim
#### From Source
@@ -13,8 +13,30 @@ return {
dir = "<path/to/cloned/repo>",
dev = true,
dependencies = {
"MunifTanjim/nui.nvim",
"nvim-lua/plenary.nvim",
},
build = function(_) vim.cmd "UpdateRemotePlugins" end,
}
```
## Setup
### Shell Configuration
#### Environment Variables
CLICKUP_AUTH
: user api token for clickup
CLICKUP_USER_ID
: user id for clickup
CLICKUP_WORKSPACE_ID
: workspace id for clickup
## Commands
### `:MrPy`
- start task selector of user's tasks (not done/closed)
View File
+68
View File
@@ -0,0 +1,68 @@
from dataclasses import dataclass, field
from json import loads
from pydantic import BaseModel
from requests import HTTPError, get
from .common import EnvVar, JSONDataMap, JSONData, JSONDataList, JSONDataScalar
class ClickupTask(BaseModel):
id: str
name: str
@dataclass
class ClickupSession:
auth_key: str = field(
default_factory=EnvVar(
"CLICKUP_AUTH",
"clickup auth token is required to be set",
)
)
workspace_id: str = field(
default_factory=EnvVar(
"CLICKUP_WORKSPACE_ID",
"clickup workspace id is required to be set",
)
)
user_id: str = field(
default_factory=EnvVar(
"CLICKUP_USER_ID",
"clickup user id is required to be set",
)
)
base_url: str = "https://api.clickup.com/api/v2"
def _get(self, endpoint: str, **query_params: str) -> JSONDataMap:
with get(
self.base_url + endpoint,
query_params,
headers={
"accept": "application/json",
"Authorization": self.auth_key,
},
) as resp:
return resp.json()
def get_tasks(self, **filters: str) -> list[ClickupTask]:
data = self._get(
f"/team/{self.workspace_id}/task",
**{
"subtasks": "true",
"include_markdown_description": "true",
"assignees[]": self.user_id,
}
| filters,
).get("tasks", [])
if isinstance(data, list):
return [ClickupTask.model_validate(t) for t in data if isinstance(t, dict)]
return []
def get_task(self, task_id: str) -> ClickupTask:
return ClickupTask.model_validate(
self._get(
f"/task/{task_id}",
include_markdown_description="true",
),
)
+32
View File
@@ -0,0 +1,32 @@
from __future__ import annotations
from dataclasses import dataclass
from os import environ
from typing import TypeAlias
JSONDataScalar: TypeAlias = str | None | float | bool
JSONDataList: TypeAlias = list["JSONDataScalar | JSONDataMap | JSONDataList"]
JSONDataMap: TypeAlias = dict[str, "JSONDataScalar | JSONDataList | JSONDataMap"]
JSONData: TypeAlias = "JSONDataMap | JSONDataList"
@dataclass
class EnvVar:
"""
Environment Variable fetcher for use in dataclass ``field(default_factory=...)``
>>> @dataclass
>>> class SomeDataclass:
... field_name: str = field(default_factory=EnvVar("SOME_VAR_NAME", "err msg"))
"""
var_name: str
err_msg: str = ""
def __call__(self) -> str:
try:
return environ[self.var_name]
except KeyError as e:
e.add_note(self.err_msg)
raise
+90
View File
@@ -0,0 +1,90 @@
from lsprotocol.types import (
TEXT_DOCUMENT_COMPLETION,
TEXT_DOCUMENT_DOCUMENT_SYMBOL,
CompletionItem,
CompletionItemKind,
CompletionItemLabelDetails,
CompletionOptions,
CompletionParams,
DocumentSymbol,
DocumentSymbolParams,
NotebookDocumentSyncOptions,
Position,
Range,
SymbolKind,
TextDocumentSyncKind,
WorkDoneProgressBegin,
WorkDoneProgressEnd,
WorkDoneProgressReport,
)
from pygls.lsp.server import LanguageServer
from .clickup import ClickupSession, ClickupTask
class CustomServer(LanguageServer):
cache: dict[str, ClickupTask]
cu_session: ClickupSession
def __init__(
self,
name: str,
version: str,
text_document_sync_kind: TextDocumentSyncKind = TextDocumentSyncKind.Incremental,
notebook_document_sync: NotebookDocumentSyncOptions | None = None,
) -> None:
super().__init__(name, version, text_document_sync_kind, notebook_document_sync)
self.cache = {}
self.cu_session = ClickupSession()
self.update_task_cache()
def update_task_cache(self) -> None:
self.protocol.progress.begin(
"startup", WorkDoneProgressBegin("Fetching Cache ...", percentage=0, cancellable=True)
)
self.cache = {}
tasks = self.cu_session.get_ta()
for ti, t in enumerate(tasks):
self.cache[t.id] = t
self.protocol.progress.report(
"startup",
WorkDoneProgressReport(
message="Fetched Cache", percentage=int(100 * (1 + ti) / len(tasks))
),
)
self.protocol.progress.end("startup", WorkDoneProgressEnd(message="Done Caching"))
server = CustomServer("mrpy-server", "0.1.0")
@server.feature(TEXT_DOCUMENT_DOCUMENT_SYMBOL)
async def list_ids(params: DocumentSymbolParams) -> list[DocumentSymbol]:
return [
DocumentSymbol(
t.id,
SymbolKind.Enum,
Range(Position(i, 0), Position(i, 0)),
Range(Position(i, 0), Position(i, 0)),
detail=t.name,
)
for i, t in enumerate(server.cache.values())
]
@server.feature(TEXT_DOCUMENT_COMPLETION)
async def complete_cu_ids(params: CompletionParams) -> list[CompletionItem]:
return [
CompletionItem(
t.name,
CompletionItemLabelDetails(detail=f" #{t.id}"),
kind=CompletionItemKind.Constant,
insert_text=f"[{t.name} #{t.id}](https://app.clickup.com/t/{t.id})",
)
for t in server.cache.values()
]
def main() -> None:
server.start_io()
+103
View File
@@ -0,0 +1,103 @@
local curl = require("plenary.curl")
local helpers = require("eta.helpers")
local M = {}
---@class eta.clickup.Session: eta.Session
---@field user string
---@field workspace string
---@class eta.clickup.Ref
---@field name string
---@field id string
---@class eta.clickup.Dep
---@field task_id string
---@field depends_on string
---@class eta.clickup.Task
---@field id string
---@field name string
---@field tags? table[]
---@field locations? table[]
---@field list? eta.clickup.Ref
---@field parent? string | nil
---@field dependencies? eta.clickup.Dep[]
---@field [string] string
---@param self eta.clickup.Session
---@return eta.clickup.Task[]
M.latest_tasks = function(self)
local ret = helpers.request("get", self, "/team/" .. self.workspace .. "/task",{subtasks="true", include_markdown_description="true", ['assignees[]']= self.user})
if ret then
return ret.tasks
end
return {}
end
---@param self eta.clickup.Session
---@param id string
---@return eta.clickup.Task
M.task = function(self, id)
local ret = helpers.request("get", self, "/task/" .. id,{include_markdown_description="true"}) or {}
return ret
end
---@param self eta.clickup.Session
---@param id string
M.task_relations = function(self, id)
local ret = helpers.request("get", self, "/task/" .. id .. "/dependency", {}) or {}
return ret
end
M.insert_ref = function()
local pos = vim.api.nvim_win_get_cursor(0)
local row = pos[1] - 1
local col = pos[2]
local tasks = M.latest_tasks(require("plugin.eta").clickup_session)
---@type SelectionItem[]
local items = {}
for tix, t in ipairs(tasks) do
if string.sub(t.name, -7, -1) ~= "Absence" then
local preview_frontmatter = ""
---@type SelectionItem
local prepared = {
idx = tix,
id = t.id,
text = t.name .. t.id .. t.markdown_description,
name = t.name,
tags = require("plugin.eta").retrieve_subkeys(t.tags, { "name" }),
status = t.status.status,
parent = t.parent,
list = t.list.name,
description = t.markdown_description,
preview = {
ft = "markdown",
},
action = M._on_select_task
}
for _, k in ipairs({ "id", "name", "status", "tags", "parent" }) do
preview_frontmatter = preview_frontmatter .. "\n" .. k .. ": " .. vim.json.encode(prepared[k])
end
prepared.preview.text = "---" .. preview_frontmatter .. "\n---\n" .. t.markdown_description
table.insert(items, #items + 1, prepared)
end
end
require("snacks.picker").pick({
title = "Select Task",
format = require("plugin.eta")._item_format,
preview = "preview",
confirm = function(picker, item)
picker:close()
vim.api.nvim_buf_set_text(0, row, col, row, col,
{ "[#" .. item.id .. "](https://app.clickup.com/t/" .. item.id .. ")" })
end,
items = items
})
end
return M
+149
View File
@@ -0,0 +1,149 @@
local notify = require("snacks.notifier").notify
local helpers = require("eta.helpers")
local M = {}
---@class eta.gitlab.Session: eta.Session
---@field auth string personal access token `PRIVATE_TOKEN: <auth_token>`
---@class eta.gitlab.Namespace
---@field full_path string
---@class eta.gitlab.Project
---@field id number
---@field name string
---@field path_with_namespace string
---@field tag_list string[]
---@field text? string
---@field namespace eta.gitlab.Namespace
---@field preview? {ft: string, text: string}
---@class eta.gitlab.Milestone
---@field id number
---@field title string
---@class eta.gitlab.Assignee
---@field username string
---@field id number
---@class eta.gitlab.Issue
---@field id number
---@field milestone eta.gitlab.Milestone
---@field title string
---@field assignees eta.gitlab.Assignee[]
---@field description string
---@field labels string[]
---@param session eta.gitlab.Session
---@return eta.gitlab.Project[]
M.possible_projects = function(session)
return helpers.request("get", session, "/projects", {simple="true", per_page="100"}) or {}
end
---@param prj eta.gitlab.Project
M._project_format = function(prj)
local ret = {}
ret[#ret + 1] = { prj.namespace.full_path .. "/", "SnacksPickerComment" }
ret[#ret + 1] = { prj.name }
return ret
end
---@param self eta.gitlab.Session
---@return eta.gitlab.Project | nil
M.update_current_project = function(self)
local cmd = "git remote -v | grep fetch | cut -f2 | cut -d' ' -f1 | cut -d':' -f2"
local handle, _ = io.popen(cmd, 'r')
if not handle then return nil end
local repo = handle:read("*a")
handle:close()
local prjs = M.possible_projects(self)
for _, prj in ipairs(prjs) do
if prj.path_with_namespace == repo then
M.active_project = prj
notify("selected " .. prj.path_with_namespace, "info", { title = "ETA", style = 'fancy' })
return prj
end
end
end
---@type eta.gitlab.Project
M.active_project = nil
---@param picker snacks.Picker
---@param item eta.gitlab.Project
M._on_project_select = function(picker, item)
M.active_project = item
picker:close()
notify("selected " .. item.path_with_namespace, "info", { title = "ETA", style = 'fancy' })
end
---@param self eta.gitlab.Session
M.select_project = function(self)
local prjs = M.possible_projects(self)
require("snacks.picker").pick({
title = "Select Project",
format = M._project_format,
preview = "preview",
confirm = M._on_project_select,
items = prjs
})
end
---@param self eta.gitlab.Session
---@return eta.gitlab.Issue[]
M.active_issues = function(self)
if not M.active_project then
if not M.update_current_project(self) then
M.select_project(self)
end
end
return request("get", self, "/projects/" .. tostring(M.active_project.id) .. "/issues", {})
end
---@param self eta.gitlab.Session
---@param name string
---@param clickup_task_id string
---@param tags string[]
M.new_issue = function(self, name, clickup_task_id, tags)
if not M.active_project then
if not M.update_current_project(self) then
M.select_project(self)
end
end
local description = "%23" .. clickup_task_id
local title = name:gsub("%s", "%20")
return request("post", self, "/projects/" ..
tostring(M.active_project.id) ..
"/issues", {
title=title, description=description, labels=table.concat(tags, ",")
})
end
---@param self eta.gitlab.Session
---@param name string
---@param issue number
---@param clickup_task_id string
---@param tags string[]
M.new_mr = function(self, name, issue, clickup_task_id, tags)
if not M.active_project then
if not M.update_current_project(self) then
M.select_project(self)
end
end
local description = "Closes %23" .. tostring(issue) .. " | %23" .. clickup_task_id
local title = name:gsub("%s", "%20")
return request("post", self, "/projects/" .. tostring(M.active_project.id) .. "/merge_requests", {
title=title, description=description, labels=table.concat(tags, ",")
})
end
return M
+63
View File
@@ -0,0 +1,63 @@
local curl = require("plenary.curl")
local M = {}
---@class eta.Session
---@field auth string
---@field base_url string
---@param method `get` | `post`
---@param session eta.Session
---@param endpoint string
---@param params {[string]: string}
---@returns table | nil
M.request = function(method, session, endpoint, params)
local param_list = {}
for param_name, param_value in pairs(params) do
table.insert(param_list, param_name .. "=" .. param_value)
end
local url = session.base_url .. endpoint
if #param_list then
url = url .. "?" .. table.concat(param_list, "&")
end
local resp = nil
if method == "get" then
resp = curl.get(url, {headers = {
-- ['PRIVATE-TOKEN'] = session.auth,
['Authorization'] = session.auth,
["content-type"] = "application/json",
}})
elseif method == "post" then
resp = curl.post(url, {headers = {
-- ['PRIVATE-TOKEN'] = session.auth,
['Authorization'] = session.auth,
["content-type"] = "application/json",
}})
else
return nil
end
if resp.status == 200 then
local prjs = vim.json.decode(resp.body)
for _, prj in ipairs(prjs) do
local fcmd = "echo '" .. vim.json.encode(prj) .. "' | jq"
local fhandle, _ = io.popen(fcmd, 'r')
if not fhandle then goto continue end
local formatted = fhandle:read("*a")
fhandle:close()
prj.text = prj.path_with_namespace
prj.preview = {
ft = "json",
text = formatted
}
::continue::
end
return prjs
end
print("failed http request: " .. tostring(resp.status) .. " (" .. resp.body .. ", " .. url .. ")")
return nil
end
return M
+319
View File
@@ -0,0 +1,319 @@
local curl = require("plenary.curl")
local notify = require("snacks.notifier").notify
local gitlab = require("eta.gitlab")
local clickup = require("eta.clickup")
local M = {}
---@param data table
---@return string | nil
M.table_to_yaml = function(data)
local json = vim.json.encode(data)
-- Nutzt 'yq' um JSON zu YAML zu konvertieren
local handle = io.popen("echo '" .. json .. "' | yq -y")
local result = handle:read("*a")
handle:close()
if not result then
print("Error: yq not installed or got invalid json data")
return nil
end
return result
end
---@param yaml_string string
---@return table | nil
M.yaml_to_table = function(yaml_string)
-- 1. YAML String in yq einspeisen und JSON ausgeben lassen
-- Das Flag -o=json sorgt für die Konvertierung
local temp_file = os.tmpname()
local temp_file_handle = io.open(temp_file, "w+b")
if not temp_file_handle then
print("Error: could not open temp file")
print(" temp_file: " .. temp_file)
return nil
end
temp_file_handle:write(yaml_string)
temp_file_handle:flush()
temp_file_handle:close()
local cmd = "cat " .. temp_file .. " | yq"
local handle, errres = io.popen(cmd, 'r')
local json_result = handle:read("*a")
handle:close()
-- 2. Fehlerprüfung
if json_result == "" then
print("Error: yq not installed or got invalid yaml data")
print(" cmd: " .. cmd)
print(" err: " .. errres)
return nil
end
-- 3. JSON in Lua-Table umwandeln
local ok, table_result = pcall(vim.json.decode, json_result)
if not ok then
print("Error: failed to decode json")
print(" cmd: " .. cmd)
print(" json: " .. json_result)
return nil
end
os.remove(temp_file)
return table_result
end
---@type eta.clickup.Session
M.clickup_session = nil
---@type eta.gitlab.Session
M.gitlab_session = nil
---@param task eta.clickup.Task
---@return eta.clickup.Task
M._update_task = function(task)
local update_table = {}
for _, k in ipairs({ "name", "markdown_content", "status" }) do
if task[k] then
update_table[k] = task[k]
end
end
local resp = curl.put(M.clickup_session.base_url .. "/task/" .. task.id, {
headers = {
['Authorization'] = M.clickup_session.auth,
["accept"] = "application/json",
["content-type"] = "application/json",
},
body = vim.json.encode(update_table)
})
if resp.status ~= 200 then
error("failed to update " .. task.id .. "\n(" .. resp.body .. ")")
end
return vim.json.decode(resp.body)
end
---@param args vim.api.keyset.create_autocmd.callback_args
M._on_tempbuf_write = function(args)
local lines = vim.api.nvim_buf_get_lines(args.buf, 0, -1, false)
---@type string[]
local parts = {}
---@type string[]
local current = {}
for _, line in ipairs(lines) do
if line == "---" then
if #current then
table.insert(parts, table.concat(current, "\n"))
current = {}
end
else
table.insert(current, line)
end
end
table.insert(parts, table.concat(current, "\n"))
---@type eta.clickup.Task
local data = M.yaml_to_table(parts[2]) or {}
data['markdown_content'] = string.gsub(parts[3], "%-%-%-\n", "")
M._update_task(data)
vim.api.nvim_set_option_value("modified", false, { buf = args.buf })
local notif_id = notify("updated task #" .. data.id, "info", { title = "ETA", style = 'fancy' })
end
---@class SelectionItem
---@field status string
---@field parent string | nil
---@field list string
---@field name string
---@field description string
---@field tags string[]
---@field id string
---@param picker snacks.Picker | nil
---@param item SelectionItem
M._on_select_task = function(picker, item)
local notif_id = notify("opening task #" .. item.id, "info", { title = "ETA", style = 'fancy' })
local new_name = "[ClickUp] " .. item.name
local new_buf_no = vim.api.nvim_create_buf(true, false)
local status, _ = pcall(vim.api.nvim_buf_set_name, new_buf_no, new_name)
if status then
-- vim.api.nvim_set_option_value("buftype", "nofile", { buf = new_buf_no })
vim.api.nvim_set_option_value("filetype", "markdown", { buf = new_buf_no })
vim.api.nvim_create_autocmd({ "BufWriteCmd" }, { buffer = new_buf_no, callback = M._on_tempbuf_write })
local content = { "---"}
local to_dump = {}
for _, k in ipairs({ "id", "name", "status", "tags", "list", "parent", "dependencies" }) do
to_dump[k] = item[k]
end
local yaml_string = M.table_to_yaml(to_dump) or ""
for line in yaml_string:gmatch("([^\n]*)\n?") do
table.insert(content, line)
end
-- table.insert(content, "}")
table.insert(content, "---")
for line in string.gmatch(item.description, "[^\r\n]+") do
table.insert(content, line)
end
vim.api.nvim_buf_set_lines(new_buf_no, 0, 0, false, content)
vim.api.nvim_set_option_value("modified", false, { buf = new_buf_no })
vim.api.nvim_win_set_buf(0, new_buf_no)
else
vim.api.nvim_buf_delete(new_buf_no, {})
if picker then
picker:close()
end
local to_open = vim.fn.bufnr(new_name)
vim.api.nvim_win_set_buf(0, to_open)
end
end
---@param item SelectionItem
M._item_format = function(item, _)
local ret = {}
local hl = "SnacksPickerComment"
if item.status == "in progress" then
hl = "@method"
elseif item.status == "selected for development" then
hl = "@constant.builtin"
elseif item.status == "in review" then
hl = "@keyword"
elseif item.status == "done" then
hl = "@variable.builtin"
end
if item.parent ~= vim.NIL then
ret[#ret + 1] = { "󰘍 ", "SnacksPickerComment" }
end
ret[#ret + 1] = { item.name, hl }
for _, v in ipairs(item.tags) do
ret[#ret + 1] = { " #" .. v, "SnacksPickerComment" }
end
ret[#ret + 1] = {
" 󰻾" .. item.id,
"SnacksPickerComment"
}
return ret
end
---@param list table[]
---@param keys string[]
M.retrieve_subkeys = function(list, keys)
local ret = {}
local interm = list
for _, key in ipairs(keys) do
ret = {}
for _, item in ipairs(interm) do
table.insert(ret, item[key])
end
interm = ret
end
return ret
end
---@param id string
M.open_task = function(id)
if id:match("[a-z0-9]+") then
local notif_id = notify("opening #".. id, "info", { timeout = 1000, title = "ETA", style = 'fancy' })
local t = clickup.task(M.clickup_session, id)
---@type SelectionItem
local item = {
idx = nil,
id = t.id,
text = t.name .. t.id .. t.markdown_description,
name = t.name,
tags = M.retrieve_subkeys(t.tags, { "name" }),
status = t.status.status,
parent = t.parent,
list = t.list.name,
description = t.markdown_description,
preview = {
ft = "markdown",
},
action = nil
}
pcall(M._on_select_task,nil, item)
else
error("invalid id format")
end
end
---@param data vim.api.keyset.create_user_command.command_args
M.select_task = function(data)
local notif_id = notify("fetching tasks", "warn", { timeout = 10000, title = "ETA", style = 'fancy' })
if not pcall(M.open_task,vim.fn.expand("<cword>")) then
local tasks = clickup.latest_tasks(M.clickup_session)
---@type SelectionItem[]
local items = {}
for tix, t in ipairs(tasks) do
if string.sub(t.name, -7, -1) ~= "Absence" then
local preview_frontmatter = ""
local deps = {}
for _, dependency in ipairs(t.dependencies) do
table.insert(deps, dependency.depends_on)
end
---@type SelectionItem
local prepared = {
idx = tix,
id = t.id,
text = t.name .. t.id .. t.markdown_description,
name = t.name,
tags = M.retrieve_subkeys(t.tags, { "name" }),
status = t.status.status,
parent = t.parent,
list = t.list.name,
description = t.markdown_description,
dependencies = deps,
preview = {
ft = "markdown",
},
action = M._on_select_task
}
for _, k in ipairs({ "id", "name", "status", "tags", "parent"}) do
preview_frontmatter = preview_frontmatter .. "\n" .. k .. ": " .. vim.json.encode(prepared[k])
end
prepared.preview.text = "---" .. preview_frontmatter .. "\n---\n" .. t.markdown_description
table.insert(items, #items + 1, prepared)
end
end
require("snacks.notifier").hide(notif_id)
require("snacks.picker").pick({
title = "Select Task",
format = M._item_format,
preview = "preview",
confirm = M._on_select_task,
items = items
})
end
end
---@param data vim.api.keyset.create_user_command.command_args
M.mr_from_task = function(data)
if data.args then
notify(data.args, "info", { title = "ETA" })
end
end
return M
+18
View File
@@ -0,0 +1,18 @@
local eta = require("eta")
local eta_clickup = require("eta.clickup")
vim.api.nvim_create_user_command("ETA", eta.select_task, {})
---@type eta.clickup.Session
eta.clickup_session = {
auth = os.getenv("CLICKUP_AUTH") or "",
user = os.getenv("CLICKUP_USER_ID") or "",
workspace = os.getenv("CLICKUP_WORKSPACE_ID") or "",
base_url = "https://api.clickup.com/api/v2",
}
---@type eta.gitlab.Session
eta.gitlab_session = {
auth = os.getenv("GITLAB_AUTH") or "",
base_url = "https://git.extoll.de/api/v4",
}
+19 -4
View File
@@ -1,10 +1,25 @@
[build-system]
requires = ['setuptools>=57', "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "mrpy-nvim"
name = "eta-lsp"
version = "0.1.0"
description = "Add your description here"
description = "EXTOLL Task Access"
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
"pynvim~=0.6.0",
"requests>=2.32.5",
"pygls~=2.0",
"pydantic~=2.12",
"requests~=2.32",
]
authors = [
{ name = "Patrick Nisble", email = "acereca@outlook.de"},
]
[project.scripts]
eta-lsp = "eta.main:main"
[tool.setuptools.packages.find]
include = ['mrpy']
namespaces = false
+78 -20
View File
@@ -6,10 +6,20 @@ from typing import Any, Callable, Self, TypedDict
from urllib.error import HTTPError
from .hints import JSONDataMap
from .requests import get, put
from .requests import delete, get, put, post
from .env import EnvVar
class TaskRespCheckItemDict(TypedDict):
name: str
resolved: int
class TaskRespCheckDict(TypedDict):
name: str
items: list[TaskRespCheckItemDict]
class TaskRespDict(TypedDict):
name: str
markdown_description: str
@@ -20,7 +30,7 @@ class TaskRespDict(TypedDict):
tags: list[dict[str, str]]
parent: str | None
locations: list[dict[str, str]]
checklists: dict[str, bool]
checklists: list[TaskRespCheckDict]
list: dict[str, str]
@@ -29,24 +39,25 @@ class ClickupTask:
"""fields marked with ``repr=False`` will not be pushed for updates"""
name: str
markdown_description: str
status: str
id: str = field(repr=False)
assignees: list[str] = field(repr=False)
tags: list[str] = field(repr=False)
id: str | None = field(default=None, repr=False)
assignees: list[str] | None = field(default=None, repr=False)
tags: list[str] | None = field(default=None, repr=False)
parent_list: str = field(repr=False)
locations: list[str] = field(repr=False)
checklists: dict[str, bool] = field(repr=False)
locations: list[str] | None = field(default=None, repr=False)
checklists: dict[str, dict[str, bool]] | None = field(default=None, repr=False)
parent: str | None = field(default=None, repr=False)
markdown_content: str
@classmethod
def from_resp_data(cls, resp_data_raw: dict[str, Any]) -> Self:
resp_data = TaskRespDict(**resp_data_raw)
return cls(
name=resp_data["name"],
markdown_description=resp_data["markdown_description"],
markdown_content=resp_data["markdown_description"],
status=resp_data["status"]["status"],
id=resp_data["id"],
assignees=cls.convert_assignees(resp_data["assignees"]),
@@ -54,7 +65,10 @@ class ClickupTask:
parent=resp_data.get("parent"),
parent_list=resp_data["list"]["id"],
locations=cls.convert_simple_list_with_names(resp_data["locations"]),
checklists=resp_data["checklists"],
checklists={
tlist["name"]: {t["name"]: bool(t["resolved"]) for t in tlist["items"]}
for tlist in resp_data["checklists"]
},
)
@classmethod
@@ -78,16 +92,14 @@ class ClickupTask:
@property
def updateables(self) -> dict[str, Any]:
return {
f.name: getattr(self, f.name, None) for f in fields(type(self)) if f.repr
}
return {f.name: getattr(self, f.name, None) for f in fields(type(self)) if f.repr}
@property
def showables(self) -> dict[str, Any]:
return {
f.name: getattr(self, f.name, None)
for f in fields(type(self))
if f.name != "markdown_description"
dcf.name: getattr(self, dcf.name, ...)
for dcf in fields(self)
if getattr(self, dcf.name, ...) is not None
}
@property
@@ -97,7 +109,12 @@ class ClickupTask:
if self.parent:
ret += " \033[32m 󰙅 "
ret += f"{self.name} (#{self.id})"
ret += self.name
if self.tags:
ret += "".join(f" #{t}" for t in self.tags)
ret += f" (#{self.id})"
return ret
@@ -151,9 +168,38 @@ class ClickupSession:
)
return loads(raw_data)
def get_tasks(self) -> list[ClickupTask]:
def _post(self, endpoint: str, **body_params: str) -> JSONDataMap:
raw_data = post(
self.base_url + endpoint,
body_params,
headers={
"accept": "application/json",
"content-type": "application/json",
"Authorization": self.auth_key,
},
)
return loads(raw_data)
def _delete(self, endpoint: str) -> JSONDataMap:
raw_data = delete(
self.base_url + endpoint,
headers={
"accept": "application/json",
"content-type": "application/json",
"Authorization": self.auth_key,
},
)
return loads(raw_data)
def get_tasks(self, **filters: str) -> list[ClickupTask]:
data = self._get(
f"/team/{self.workspace_id}/task?subtasks=true&include_markdown_description=true&assignees[]={self.user_id}"
f"/team/{self.workspace_id}/task",
**{
"subtask": "true",
"include_markdown_description": "true",
"assignees[]": self.user_id,
}
| filters,
).get("tasks", [])
if isinstance(data, list):
return [ClickupTask.from_resp_data(t) for t in data if isinstance(t, dict)]
@@ -169,4 +215,16 @@ class ClickupSession:
)
def update(self, data: ClickupTask) -> None:
_ = self._put(f"/task/{data.id}", **data.updateables)
ret_task = self._put(f"/task/{data.id}", **data.updateables)
current_tags: set[str] = set(ret_task["tags"])
new_tags = set(data.tags)
for del_tag in current_tags - new_tags:
self._delete(f"/task/{data.id}/tag/{del_tag}")
for add_tag in new_tags - current_tags:
self._post(f"/task/{data.id}/tag/{add_tag}")
def create(self, data: ClickupTask) -> str:
ret_task = self._post(f"/list/{data.parent_list}/task", **data.updateables)
return str(ret_task["id"])
+75 -19
View File
@@ -1,9 +1,11 @@
from collections.abc import Callable, Sequence
from pathlib import Path
from typing import Any
from pynvim import Nvim, command, plugin
from pynvim.api import Buffer
from .clickup import ClickupSession
from .clickup import ClickupSession, ClickupTask
from .hints import JSONData
from .yaml import load, dump
@@ -28,18 +30,20 @@ class MrPyPlugin:
"assignees": usernames_from_objs,
}
def select_task_id(self) -> None:
tasks = self.clickup.get_tasks()
def select_task_id(self, tags: set[str] | None = None) -> None:
tasks = self.clickup.get_tasks(**({"tags[]": "&tags[]=".join(tags)} if tags else {}))
task_names_by_id = [
{
"idx": tix + 1,
"id": t.id,
"text": t.short,
"name": t.name,
"tags": t.tags,
"status": t.status,
"is_child": bool(t.parent),
"preview": {
"text": f"---\n{dump(t.showables)}---\n{t.markdown_description}",
"ft": "markdown",
"text": dump(t.showables),
"ft": "yaml",
},
"action": f":Mrpy {t.id}",
}
@@ -70,8 +74,12 @@ class MrPyPlugin:
ret[#ret + 1] = { item.name, hl }
for _, v in ipairs(item.tags) do
ret[#ret + 1] = { " #" .. v, "SnacksPickerComment" }
end
ret[#ret + 1] = {
" (#" .. item.id .. ")",
" 󰻾" .. item.id,
"SnacksPickerComment"
}
return ret
@@ -91,34 +99,36 @@ class MrPyPlugin:
task = self.clickup.get_task(task_id)
temp_buf: Buffer = self.nvim.api.create_buf(True, False)
self.nvim.api.buf_set_name(temp_buf, f"[ClickUp] {task.name}")
self.nvim.buffers[temp_buf.number].options["filetype"] = "markdown"
self.nvim.buffers[temp_buf.number].options["filetype"] = "yaml"
self.nvim.api.create_autocmd(
["BufWriteCmd"],
{"buffer": temp_buf.number, "command": "MrpyPush " + str(temp_buf.number)},
)
content = ["---"]
content = [
f"# yaml-language-server: $schema={Path(__file__).parent.parent.parent.parent}/schema.json ",
"---",
]
content.extend(
dump(
task.showables,
).splitlines()
)
content.append("---")
content.extend(task.markdown_description.splitlines())
self.nvim.api.buf_set_lines(temp_buf, 0, 0, False, content)
self.nvim.buffers[temp_buf.number].options["modified"] = False
self.nvim.api.win_set_buf(0, temp_buf)
@command("Mrpy", nargs="?")
@command("Mrpy", nargs="*")
def entry(self, args: Sequence[str] = ()) -> None:
match args:
case (str() as task_id,):
try:
self.open_task_buffer(task_id)
case ():
self.select_task_id()
case _:
pass
except:
self.select_task_id({task_id})
case tags:
self.select_task_id(set(tags))
@command("MrpyPush", nargs="?")
def on_vbuf_write(self, args: Sequence[str] = ()) -> None:
@@ -132,14 +142,60 @@ class MrPyPlugin:
try:
a = self.nvim.buffers[buf_no][0:-1]
_, fm, *text = "\n".join(a).split("---\n")
fm = "\n".join(a)
data = load(fm)
assert isinstance(data, dict)
data["markdown_description"] = "\n\n".join(text)
data["markdown_content"] = data["markdown_content"].strip() + "\n"
self.nvim.err_write(str(data) + "\n")
# self.clickup.update(data)
self.clickup.update(ClickupTask(**data))
self.nvim.buffers[buf_no].options["modified"] = False
except ValueError:
pass
@command("MrpyNew", nargs=0)
def on_new_task(self) -> None:
task = ClickupTask(name="", status="backlog", markdown_content="", parent_list="")
temp_buf: Buffer = self.nvim.api.create_buf(True, False)
self.nvim.api.buf_set_name(temp_buf, f"[ClickUp] New Task")
self.nvim.buffers[temp_buf.number].options["filetype"] = "yaml"
self.nvim.api.create_autocmd(
["BufWriteCmd"],
{"buffer": temp_buf.number, "command": "MrpyPushNew " + str(temp_buf.number)},
)
content = [
f"# yaml-language-server: $schema={Path(__file__).parent.parent.parent.parent}/schema.json ",
"---",
]
content.extend(
dump(
task.showables,
).splitlines()
)
self.nvim.api.buf_set_lines(temp_buf, 0, 0, False, content)
self.nvim.buffers[temp_buf.number].options["modified"] = False
self.nvim.api.win_set_buf(0, temp_buf)
@command("MrpyPushNew", nargs=1)
def on_new_vbuf_write(self, buf_no: Sequence[Any] = ()) -> None:
buf_no = int(buf_no[0])
try:
a = self.nvim.buffers[buf_no][0:-1]
fm = "\n".join(a)
data = load(fm)
assert isinstance(data, dict)
data["markdown_content"] = data["markdown_content"].strip() + "\n"
new_task = ClickupTask(**data)
new_task_id = self.clickup.create(new_task)
self.nvim.buffers[buf_no].options["modified"] = False
self.nvim.command("bdelete " + str(buf_no))
self.entry([new_task_id])
except ValueError:
pass
+36 -12
View File
@@ -1,12 +1,18 @@
from json import dumps
from ssl import SSLContext, create_default_context
from urllib.error import HTTPError
from urllib.request import Request, urlopen
from urllib.parse import urlencode
def get(
def request(
url: str,
method: str,
query_params: dict[str, str] | None = None,
data: dict[str, str] | None = None,
headers: dict[str, str] | None = None,
) -> str:
try:
with urlopen(
Request(
url
@@ -15,12 +21,32 @@ def get(
if query_params
else ""
),
dumps(data).encode() if data else None,
headers=headers or {},
method="GET",
method=method,
),
context=create_default_context(),
) as resp:
return resp.read().decode("utf-8")
except HTTPError as e:
e.add_note(e.url)
raise
def get(
url: str,
query_params: dict[str, str] | None = None,
headers: dict[str, str] | None = None,
) -> str:
return request(url, "GET", query_params=query_params, headers=headers)
def post(
url: str,
data: dict[str, str] | None = None,
headers: dict[str, str] | None = None,
) -> str:
return request(url, "POST", data=data, headers=headers)
def put(
@@ -28,13 +54,11 @@ def put(
data: dict[str, str] | None = None,
headers: dict[str, str] | None = None,
) -> str:
with urlopen(
Request(
url,
str(data).encode(),
headers=headers or {},
method="PUT",
),
context=create_default_context(),
) as resp:
return resp.read().decode("utf-8")
return request(url, "PUT", data=data, headers=headers)
def delete(
url: str,
headers: dict[str, str] | None = None,
) -> str:
return request(url, "DELETE", headers=headers)
+44 -5
View File
@@ -17,7 +17,7 @@ def dump_scalar(entry: hints.JSONDataScalar) -> str:
return f"{entry}\n"
def dump(obj: dict[str, hints.JSONDataScalar | list[hints.JSONDataScalar]]) -> str:
def dump(obj: hints.JSONDataMap) -> str:
ret = ""
for key, value in obj.items():
ret += key
@@ -26,11 +26,32 @@ def dump(obj: dict[str, hints.JSONDataScalar | list[hints.JSONDataScalar]]) -> s
match value:
case []:
ret += " []\n"
case {}:
ret += " {}\n"
case "":
ret += ' ""\n'
case list() as entries:
ret += "\n"
for entry in entries:
match entry:
case dict():
subdump = dump(entry)
ret += "\n".join(" " + l for l in subdump.splitlines())
case list():
pass
case _:
ret += f" - {dump_scalar(entry)}"
case dict() as substruct:
ret += "\n"
subdump = dump(substruct)
ret += "\n".join(" " + l for l in subdump.splitlines()) + "\n"
case str() as mlstring if key == "markdown_content":
if mlstring.strip() == "" or dump_scalar(mlstring).strip() == "null":
ret += ' ""\n'
else:
indented = "\n".join((" " + l) for l in dump_scalar(mlstring).splitlines())
ret += f" >\n{indented}\n"
case entry:
ret += f" {dump_scalar(entry)}"
@@ -56,6 +77,10 @@ def load(content: str) -> hints.JSONData:
... key10:
... - str
... - "str"
... key11: >
... * a
... * b:
... * c
... '''
>>> yaml_parsed = load(yaml)
>>> compare = {
@@ -69,8 +94,8 @@ def load(content: str) -> hints.JSONData:
... 'key8': False,
... 'key9': [1, 23.2],
... 'key10': ['str', 'str'],
... 'key11': "* a\\n* b:\\n * c",
... }
>>> {k: yaml_parsed.get(k, None) for k in compare if compare[k] != yaml_parsed[k]}
{}
"""
@@ -78,20 +103,34 @@ def load(content: str) -> hints.JSONData:
ret = {}
key = None
value = None
for line in sub(r":[\s\n]*", ":\n ", dedent(content).strip()).splitlines():
if line.endswith(":"):
mlstring = False
for line in sub(r"(:( >)?)[\s\n]*", r"\1\n ", dedent(content).strip()).splitlines():
if line.startswith("#") or line.strip() == "---":
continue
elif line.endswith(":"):
if key:
ret[key] = value
value = None
mlstring = False
key = line.removesuffix(":")
elif line.startswith(" -"):
elif line.endswith(": >"):
if key:
ret[key] = value
value = None
mlstring = True
key = line.removesuffix(": >")
elif line.startswith(" -") and not mlstring:
value = value or []
try:
parsed = loads(line.removeprefix(" -").strip())
except:
parsed = loads('"' + line.removeprefix(" -").strip() + '"')
value.append(parsed)
elif line.startswith(" ") and mlstring:
value = value or ""
value += line.removeprefix(" ") + "\n"
else:
try:
value = loads(line.strip())
+44
View File
@@ -0,0 +1,44 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"description": "ETA yaml frontmatter",
"properties": {
"name": {
"type": "string"
},
"status": {
"type": "string",
"enum": [
"backlog",
"selected for development",
"in progress",
"in review",
"on hold",
"done",
"closed"
]
},
"markdown_content": {
"type": "string"
},
"id": {
"type": "string",
"pattern": "^[a-z0-9]+$"
},
"parent_list": {
"anyOf": [
{
"const": 900400316794,
"description": "OPE Tooling"
}
]
}
},
"required": [
"name",
"status",
"markdown_content",
"parent_list"
],
"title": "clickup task",
"type": "object"
}