Compare commits

..

1 Commits

Author SHA1 Message Date
acereca b874441f06 replace use of pydantic_settings w/ dataclass 2026-01-02 12:52:08 +01:00
18 changed files with 123 additions and 1327 deletions
-39
View File
@@ -1,42 +1,3 @@
# mrpy.nvim # mrpy.nvim
NeoVim plugin, integrating a clickup to gitlab workflow (1 task to 1 issue to 1 merge request) NeoVim plugin, integrating a clickup to gitlab workflow (1 task to 1 issue to 1 merge request)
## Install
### Lazy.nvim
#### From Source
```lua
return {
dir = "<path/to/cloned/repo>",
dev = true,
dependencies = {
"nvim-lua/plenary.nvim",
},
}
```
## Setup
### Shell Configuration
#### Environment Variables
CLICKUP_AUTH
: user api token for clickup
CLICKUP_USER_ID
: user id for clickup
CLICKUP_WORKSPACE_ID
: workspace id for clickup
## Commands
### `:MrPy`
- start task selector of user's tasks (not done/closed)
View File
-68
View File
@@ -1,68 +0,0 @@
from dataclasses import dataclass, field
from json import loads
from pydantic import BaseModel
from requests import HTTPError, get
from .common import EnvVar, JSONDataMap, JSONData, JSONDataList, JSONDataScalar
class ClickupTask(BaseModel):
id: str
name: str
@dataclass
class ClickupSession:
auth_key: str = field(
default_factory=EnvVar(
"CLICKUP_AUTH",
"clickup auth token is required to be set",
)
)
workspace_id: str = field(
default_factory=EnvVar(
"CLICKUP_WORKSPACE_ID",
"clickup workspace id is required to be set",
)
)
user_id: str = field(
default_factory=EnvVar(
"CLICKUP_USER_ID",
"clickup user id is required to be set",
)
)
base_url: str = "https://api.clickup.com/api/v2"
def _get(self, endpoint: str, **query_params: str) -> JSONDataMap:
with get(
self.base_url + endpoint,
query_params,
headers={
"accept": "application/json",
"Authorization": self.auth_key,
},
) as resp:
return resp.json()
def get_tasks(self, **filters: str) -> list[ClickupTask]:
data = self._get(
f"/team/{self.workspace_id}/task",
**{
"subtasks": "true",
"include_markdown_description": "true",
"assignees[]": self.user_id,
}
| filters,
).get("tasks", [])
if isinstance(data, list):
return [ClickupTask.model_validate(t) for t in data if isinstance(t, dict)]
return []
def get_task(self, task_id: str) -> ClickupTask:
return ClickupTask.model_validate(
self._get(
f"/task/{task_id}",
include_markdown_description="true",
),
)
-32
View File
@@ -1,32 +0,0 @@
from __future__ import annotations
from dataclasses import dataclass
from os import environ
from typing import TypeAlias
JSONDataScalar: TypeAlias = str | None | float | bool
JSONDataList: TypeAlias = list["JSONDataScalar | JSONDataMap | JSONDataList"]
JSONDataMap: TypeAlias = dict[str, "JSONDataScalar | JSONDataList | JSONDataMap"]
JSONData: TypeAlias = "JSONDataMap | JSONDataList"
@dataclass
class EnvVar:
"""
Environment Variable fetcher for use in dataclass ``field(default_factory=...)``
>>> @dataclass
>>> class SomeDataclass:
... field_name: str = field(default_factory=EnvVar("SOME_VAR_NAME", "err msg"))
"""
var_name: str
err_msg: str = ""
def __call__(self) -> str:
try:
return environ[self.var_name]
except KeyError as e:
e.add_note(self.err_msg)
raise
-90
View File
@@ -1,90 +0,0 @@
from lsprotocol.types import (
TEXT_DOCUMENT_COMPLETION,
TEXT_DOCUMENT_DOCUMENT_SYMBOL,
CompletionItem,
CompletionItemKind,
CompletionItemLabelDetails,
CompletionOptions,
CompletionParams,
DocumentSymbol,
DocumentSymbolParams,
NotebookDocumentSyncOptions,
Position,
Range,
SymbolKind,
TextDocumentSyncKind,
WorkDoneProgressBegin,
WorkDoneProgressEnd,
WorkDoneProgressReport,
)
from pygls.lsp.server import LanguageServer
from .clickup import ClickupSession, ClickupTask
class CustomServer(LanguageServer):
cache: dict[str, ClickupTask]
cu_session: ClickupSession
def __init__(
self,
name: str,
version: str,
text_document_sync_kind: TextDocumentSyncKind = TextDocumentSyncKind.Incremental,
notebook_document_sync: NotebookDocumentSyncOptions | None = None,
) -> None:
super().__init__(name, version, text_document_sync_kind, notebook_document_sync)
self.cache = {}
self.cu_session = ClickupSession()
self.update_task_cache()
def update_task_cache(self) -> None:
self.protocol.progress.begin(
"startup", WorkDoneProgressBegin("Fetching Cache ...", percentage=0, cancellable=True)
)
self.cache = {}
tasks = self.cu_session.get_ta()
for ti, t in enumerate(tasks):
self.cache[t.id] = t
self.protocol.progress.report(
"startup",
WorkDoneProgressReport(
message="Fetched Cache", percentage=int(100 * (1 + ti) / len(tasks))
),
)
self.protocol.progress.end("startup", WorkDoneProgressEnd(message="Done Caching"))
server = CustomServer("mrpy-server", "0.1.0")
@server.feature(TEXT_DOCUMENT_DOCUMENT_SYMBOL)
async def list_ids(params: DocumentSymbolParams) -> list[DocumentSymbol]:
return [
DocumentSymbol(
t.id,
SymbolKind.Enum,
Range(Position(i, 0), Position(i, 0)),
Range(Position(i, 0), Position(i, 0)),
detail=t.name,
)
for i, t in enumerate(server.cache.values())
]
@server.feature(TEXT_DOCUMENT_COMPLETION)
async def complete_cu_ids(params: CompletionParams) -> list[CompletionItem]:
return [
CompletionItem(
t.name,
CompletionItemLabelDetails(detail=f" #{t.id}"),
kind=CompletionItemKind.Constant,
insert_text=f"[{t.name} #{t.id}](https://app.clickup.com/t/{t.id})",
)
for t in server.cache.values()
]
def main() -> None:
server.start_io()
-103
View File
@@ -1,103 +0,0 @@
local curl = require("plenary.curl")
local helpers = require("eta.helpers")
local M = {}
---@class eta.clickup.Session: eta.Session
---@field user string
---@field workspace string
---@class eta.clickup.Ref
---@field name string
---@field id string
---@class eta.clickup.Dep
---@field task_id string
---@field depends_on string
---@class eta.clickup.Task
---@field id string
---@field name string
---@field tags? table[]
---@field locations? table[]
---@field list? eta.clickup.Ref
---@field parent? string | nil
---@field dependencies? eta.clickup.Dep[]
---@field [string] string
---@param self eta.clickup.Session
---@return eta.clickup.Task[]
M.latest_tasks = function(self)
local ret = helpers.request("get", self, "/team/" .. self.workspace .. "/task",{subtasks="true", include_markdown_description="true", ['assignees[]']= self.user})
if ret then
return ret.tasks
end
return {}
end
---@param self eta.clickup.Session
---@param id string
---@return eta.clickup.Task
M.task = function(self, id)
local ret = helpers.request("get", self, "/task/" .. id,{include_markdown_description="true"}) or {}
return ret
end
---@param self eta.clickup.Session
---@param id string
M.task_relations = function(self, id)
local ret = helpers.request("get", self, "/task/" .. id .. "/dependency", {}) or {}
return ret
end
M.insert_ref = function()
local pos = vim.api.nvim_win_get_cursor(0)
local row = pos[1] - 1
local col = pos[2]
local tasks = M.latest_tasks(require("plugin.eta").clickup_session)
---@type SelectionItem[]
local items = {}
for tix, t in ipairs(tasks) do
if string.sub(t.name, -7, -1) ~= "Absence" then
local preview_frontmatter = ""
---@type SelectionItem
local prepared = {
idx = tix,
id = t.id,
text = t.name .. t.id .. t.markdown_description,
name = t.name,
tags = require("plugin.eta").retrieve_subkeys(t.tags, { "name" }),
status = t.status.status,
parent = t.parent,
list = t.list.name,
description = t.markdown_description,
preview = {
ft = "markdown",
},
action = M._on_select_task
}
for _, k in ipairs({ "id", "name", "status", "tags", "parent" }) do
preview_frontmatter = preview_frontmatter .. "\n" .. k .. ": " .. vim.json.encode(prepared[k])
end
prepared.preview.text = "---" .. preview_frontmatter .. "\n---\n" .. t.markdown_description
table.insert(items, #items + 1, prepared)
end
end
require("snacks.picker").pick({
title = "Select Task",
format = require("plugin.eta")._item_format,
preview = "preview",
confirm = function(picker, item)
picker:close()
vim.api.nvim_buf_set_text(0, row, col, row, col,
{ "[#" .. item.id .. "](https://app.clickup.com/t/" .. item.id .. ")" })
end,
items = items
})
end
return M
-149
View File
@@ -1,149 +0,0 @@
local notify = require("snacks.notifier").notify
local helpers = require("eta.helpers")
local M = {}
---@class eta.gitlab.Session: eta.Session
---@field auth string personal access token `PRIVATE_TOKEN: <auth_token>`
---@class eta.gitlab.Namespace
---@field full_path string
---@class eta.gitlab.Project
---@field id number
---@field name string
---@field path_with_namespace string
---@field tag_list string[]
---@field text? string
---@field namespace eta.gitlab.Namespace
---@field preview? {ft: string, text: string}
---@class eta.gitlab.Milestone
---@field id number
---@field title string
---@class eta.gitlab.Assignee
---@field username string
---@field id number
---@class eta.gitlab.Issue
---@field id number
---@field milestone eta.gitlab.Milestone
---@field title string
---@field assignees eta.gitlab.Assignee[]
---@field description string
---@field labels string[]
---@param session eta.gitlab.Session
---@return eta.gitlab.Project[]
M.possible_projects = function(session)
return helpers.request("get", session, "/projects", {simple="true", per_page="100"}) or {}
end
---@param prj eta.gitlab.Project
M._project_format = function(prj)
local ret = {}
ret[#ret + 1] = { prj.namespace.full_path .. "/", "SnacksPickerComment" }
ret[#ret + 1] = { prj.name }
return ret
end
---@param self eta.gitlab.Session
---@return eta.gitlab.Project | nil
M.update_current_project = function(self)
local cmd = "git remote -v | grep fetch | cut -f2 | cut -d' ' -f1 | cut -d':' -f2"
local handle, _ = io.popen(cmd, 'r')
if not handle then return nil end
local repo = handle:read("*a")
handle:close()
local prjs = M.possible_projects(self)
for _, prj in ipairs(prjs) do
if prj.path_with_namespace == repo then
M.active_project = prj
notify("selected " .. prj.path_with_namespace, "info", { title = "ETA", style = 'fancy' })
return prj
end
end
end
---@type eta.gitlab.Project
M.active_project = nil
---@param picker snacks.Picker
---@param item eta.gitlab.Project
M._on_project_select = function(picker, item)
M.active_project = item
picker:close()
notify("selected " .. item.path_with_namespace, "info", { title = "ETA", style = 'fancy' })
end
---@param self eta.gitlab.Session
M.select_project = function(self)
local prjs = M.possible_projects(self)
require("snacks.picker").pick({
title = "Select Project",
format = M._project_format,
preview = "preview",
confirm = M._on_project_select,
items = prjs
})
end
---@param self eta.gitlab.Session
---@return eta.gitlab.Issue[]
M.active_issues = function(self)
if not M.active_project then
if not M.update_current_project(self) then
M.select_project(self)
end
end
return request("get", self, "/projects/" .. tostring(M.active_project.id) .. "/issues", {})
end
---@param self eta.gitlab.Session
---@param name string
---@param clickup_task_id string
---@param tags string[]
M.new_issue = function(self, name, clickup_task_id, tags)
if not M.active_project then
if not M.update_current_project(self) then
M.select_project(self)
end
end
local description = "%23" .. clickup_task_id
local title = name:gsub("%s", "%20")
return request("post", self, "/projects/" ..
tostring(M.active_project.id) ..
"/issues", {
title=title, description=description, labels=table.concat(tags, ",")
})
end
---@param self eta.gitlab.Session
---@param name string
---@param issue number
---@param clickup_task_id string
---@param tags string[]
M.new_mr = function(self, name, issue, clickup_task_id, tags)
if not M.active_project then
if not M.update_current_project(self) then
M.select_project(self)
end
end
local description = "Closes %23" .. tostring(issue) .. " | %23" .. clickup_task_id
local title = name:gsub("%s", "%20")
return request("post", self, "/projects/" .. tostring(M.active_project.id) .. "/merge_requests", {
title=title, description=description, labels=table.concat(tags, ",")
})
end
return M
-63
View File
@@ -1,63 +0,0 @@
local curl = require("plenary.curl")
local M = {}
---@class eta.Session
---@field auth string
---@field base_url string
---@param method `get` | `post`
---@param session eta.Session
---@param endpoint string
---@param params {[string]: string}
---@returns table | nil
M.request = function(method, session, endpoint, params)
local param_list = {}
for param_name, param_value in pairs(params) do
table.insert(param_list, param_name .. "=" .. param_value)
end
local url = session.base_url .. endpoint
if #param_list then
url = url .. "?" .. table.concat(param_list, "&")
end
local resp = nil
if method == "get" then
resp = curl.get(url, {headers = {
-- ['PRIVATE-TOKEN'] = session.auth,
['Authorization'] = session.auth,
["content-type"] = "application/json",
}})
elseif method == "post" then
resp = curl.post(url, {headers = {
-- ['PRIVATE-TOKEN'] = session.auth,
['Authorization'] = session.auth,
["content-type"] = "application/json",
}})
else
return nil
end
if resp.status == 200 then
local prjs = vim.json.decode(resp.body)
for _, prj in ipairs(prjs) do
local fcmd = "echo '" .. vim.json.encode(prj) .. "' | jq"
local fhandle, _ = io.popen(fcmd, 'r')
if not fhandle then goto continue end
local formatted = fhandle:read("*a")
fhandle:close()
prj.text = prj.path_with_namespace
prj.preview = {
ft = "json",
text = formatted
}
::continue::
end
return prjs
end
print("failed http request: " .. tostring(resp.status) .. " (" .. resp.body .. ", " .. url .. ")")
return nil
end
return M
-319
View File
@@ -1,319 +0,0 @@
local curl = require("plenary.curl")
local notify = require("snacks.notifier").notify
local gitlab = require("eta.gitlab")
local clickup = require("eta.clickup")
local M = {}
---@param data table
---@return string | nil
M.table_to_yaml = function(data)
local json = vim.json.encode(data)
-- Nutzt 'yq' um JSON zu YAML zu konvertieren
local handle = io.popen("echo '" .. json .. "' | yq -y")
local result = handle:read("*a")
handle:close()
if not result then
print("Error: yq not installed or got invalid json data")
return nil
end
return result
end
---@param yaml_string string
---@return table | nil
M.yaml_to_table = function(yaml_string)
-- 1. YAML String in yq einspeisen und JSON ausgeben lassen
-- Das Flag -o=json sorgt für die Konvertierung
local temp_file = os.tmpname()
local temp_file_handle = io.open(temp_file, "w+b")
if not temp_file_handle then
print("Error: could not open temp file")
print(" temp_file: " .. temp_file)
return nil
end
temp_file_handle:write(yaml_string)
temp_file_handle:flush()
temp_file_handle:close()
local cmd = "cat " .. temp_file .. " | yq"
local handle, errres = io.popen(cmd, 'r')
local json_result = handle:read("*a")
handle:close()
-- 2. Fehlerprüfung
if json_result == "" then
print("Error: yq not installed or got invalid yaml data")
print(" cmd: " .. cmd)
print(" err: " .. errres)
return nil
end
-- 3. JSON in Lua-Table umwandeln
local ok, table_result = pcall(vim.json.decode, json_result)
if not ok then
print("Error: failed to decode json")
print(" cmd: " .. cmd)
print(" json: " .. json_result)
return nil
end
os.remove(temp_file)
return table_result
end
---@type eta.clickup.Session
M.clickup_session = nil
---@type eta.gitlab.Session
M.gitlab_session = nil
---@param task eta.clickup.Task
---@return eta.clickup.Task
M._update_task = function(task)
local update_table = {}
for _, k in ipairs({ "name", "markdown_content", "status" }) do
if task[k] then
update_table[k] = task[k]
end
end
local resp = curl.put(M.clickup_session.base_url .. "/task/" .. task.id, {
headers = {
['Authorization'] = M.clickup_session.auth,
["accept"] = "application/json",
["content-type"] = "application/json",
},
body = vim.json.encode(update_table)
})
if resp.status ~= 200 then
error("failed to update " .. task.id .. "\n(" .. resp.body .. ")")
end
return vim.json.decode(resp.body)
end
---@param args vim.api.keyset.create_autocmd.callback_args
M._on_tempbuf_write = function(args)
local lines = vim.api.nvim_buf_get_lines(args.buf, 0, -1, false)
---@type string[]
local parts = {}
---@type string[]
local current = {}
for _, line in ipairs(lines) do
if line == "---" then
if #current then
table.insert(parts, table.concat(current, "\n"))
current = {}
end
else
table.insert(current, line)
end
end
table.insert(parts, table.concat(current, "\n"))
---@type eta.clickup.Task
local data = M.yaml_to_table(parts[2]) or {}
data['markdown_content'] = string.gsub(parts[3], "%-%-%-\n", "")
M._update_task(data)
vim.api.nvim_set_option_value("modified", false, { buf = args.buf })
local notif_id = notify("updated task #" .. data.id, "info", { title = "ETA", style = 'fancy' })
end
---@class SelectionItem
---@field status string
---@field parent string | nil
---@field list string
---@field name string
---@field description string
---@field tags string[]
---@field id string
---@param picker snacks.Picker | nil
---@param item SelectionItem
M._on_select_task = function(picker, item)
local notif_id = notify("opening task #" .. item.id, "info", { title = "ETA", style = 'fancy' })
local new_name = "[ClickUp] " .. item.name
local new_buf_no = vim.api.nvim_create_buf(true, false)
local status, _ = pcall(vim.api.nvim_buf_set_name, new_buf_no, new_name)
if status then
-- vim.api.nvim_set_option_value("buftype", "nofile", { buf = new_buf_no })
vim.api.nvim_set_option_value("filetype", "markdown", { buf = new_buf_no })
vim.api.nvim_create_autocmd({ "BufWriteCmd" }, { buffer = new_buf_no, callback = M._on_tempbuf_write })
local content = { "---"}
local to_dump = {}
for _, k in ipairs({ "id", "name", "status", "tags", "list", "parent", "dependencies" }) do
to_dump[k] = item[k]
end
local yaml_string = M.table_to_yaml(to_dump) or ""
for line in yaml_string:gmatch("([^\n]*)\n?") do
table.insert(content, line)
end
-- table.insert(content, "}")
table.insert(content, "---")
for line in string.gmatch(item.description, "[^\r\n]+") do
table.insert(content, line)
end
vim.api.nvim_buf_set_lines(new_buf_no, 0, 0, false, content)
vim.api.nvim_set_option_value("modified", false, { buf = new_buf_no })
vim.api.nvim_win_set_buf(0, new_buf_no)
else
vim.api.nvim_buf_delete(new_buf_no, {})
if picker then
picker:close()
end
local to_open = vim.fn.bufnr(new_name)
vim.api.nvim_win_set_buf(0, to_open)
end
end
---@param item SelectionItem
M._item_format = function(item, _)
local ret = {}
local hl = "SnacksPickerComment"
if item.status == "in progress" then
hl = "@method"
elseif item.status == "selected for development" then
hl = "@constant.builtin"
elseif item.status == "in review" then
hl = "@keyword"
elseif item.status == "done" then
hl = "@variable.builtin"
end
if item.parent ~= vim.NIL then
ret[#ret + 1] = { "󰘍 ", "SnacksPickerComment" }
end
ret[#ret + 1] = { item.name, hl }
for _, v in ipairs(item.tags) do
ret[#ret + 1] = { " #" .. v, "SnacksPickerComment" }
end
ret[#ret + 1] = {
" 󰻾" .. item.id,
"SnacksPickerComment"
}
return ret
end
---@param list table[]
---@param keys string[]
M.retrieve_subkeys = function(list, keys)
local ret = {}
local interm = list
for _, key in ipairs(keys) do
ret = {}
for _, item in ipairs(interm) do
table.insert(ret, item[key])
end
interm = ret
end
return ret
end
---@param id string
M.open_task = function(id)
if id:match("[a-z0-9]+") then
local notif_id = notify("opening #".. id, "info", { timeout = 1000, title = "ETA", style = 'fancy' })
local t = clickup.task(M.clickup_session, id)
---@type SelectionItem
local item = {
idx = nil,
id = t.id,
text = t.name .. t.id .. t.markdown_description,
name = t.name,
tags = M.retrieve_subkeys(t.tags, { "name" }),
status = t.status.status,
parent = t.parent,
list = t.list.name,
description = t.markdown_description,
preview = {
ft = "markdown",
},
action = nil
}
pcall(M._on_select_task,nil, item)
else
error("invalid id format")
end
end
---@param data vim.api.keyset.create_user_command.command_args
M.select_task = function(data)
local notif_id = notify("fetching tasks", "warn", { timeout = 10000, title = "ETA", style = 'fancy' })
if not pcall(M.open_task,vim.fn.expand("<cword>")) then
local tasks = clickup.latest_tasks(M.clickup_session)
---@type SelectionItem[]
local items = {}
for tix, t in ipairs(tasks) do
if string.sub(t.name, -7, -1) ~= "Absence" then
local preview_frontmatter = ""
local deps = {}
for _, dependency in ipairs(t.dependencies) do
table.insert(deps, dependency.depends_on)
end
---@type SelectionItem
local prepared = {
idx = tix,
id = t.id,
text = t.name .. t.id .. t.markdown_description,
name = t.name,
tags = M.retrieve_subkeys(t.tags, { "name" }),
status = t.status.status,
parent = t.parent,
list = t.list.name,
description = t.markdown_description,
dependencies = deps,
preview = {
ft = "markdown",
},
action = M._on_select_task
}
for _, k in ipairs({ "id", "name", "status", "tags", "parent"}) do
preview_frontmatter = preview_frontmatter .. "\n" .. k .. ": " .. vim.json.encode(prepared[k])
end
prepared.preview.text = "---" .. preview_frontmatter .. "\n---\n" .. t.markdown_description
table.insert(items, #items + 1, prepared)
end
end
require("snacks.notifier").hide(notif_id)
require("snacks.picker").pick({
title = "Select Task",
format = M._item_format,
preview = "preview",
confirm = M._on_select_task,
items = items
})
end
end
---@param data vim.api.keyset.create_user_command.command_args
M.mr_from_task = function(data)
if data.args then
notify(data.args, "info", { title = "ETA" })
end
end
return M
-18
View File
@@ -1,18 +0,0 @@
local eta = require("eta")
local eta_clickup = require("eta.clickup")
vim.api.nvim_create_user_command("ETA", eta.select_task, {})
---@type eta.clickup.Session
eta.clickup_session = {
auth = os.getenv("CLICKUP_AUTH") or "",
user = os.getenv("CLICKUP_USER_ID") or "",
workspace = os.getenv("CLICKUP_WORKSPACE_ID") or "",
base_url = "https://api.clickup.com/api/v2",
}
---@type eta.gitlab.Session
eta.gitlab_session = {
auth = os.getenv("GITLAB_AUTH") or "",
base_url = "https://git.extoll.de/api/v4",
}
+6 -20
View File
@@ -1,25 +1,11 @@
[build-system]
requires = ['setuptools>=57', "wheel"]
build-backend = "setuptools.build_meta"
[project] [project]
name = "eta-lsp" name = "mrpy-nvim"
version = "0.1.0" version = "0.1.0"
description = "EXTOLL Task Access" description = "Add your description here"
readme = "README.md" readme = "README.md"
requires-python = ">=3.11" requires-python = ">=3.12"
dependencies = [ dependencies = [
"pygls~=2.0", "pydantic-settings~=2.12",
"pydantic~=2.12", "pynvim~=0.6.0",
"requests~=2.32", "requests>=2.32.5",
] ]
authors = [
{ name = "Patrick Nisble", email = "acereca@outlook.de"},
]
[project.scripts]
eta-lsp = "eta.main:main"
[tool.setuptools.packages.find]
include = ['mrpy']
namespaces = false
+85 -160
View File
@@ -1,105 +1,97 @@
from collections.abc import Sequence from dataclasses import dataclass, field
from dataclasses import dataclass, field, fields
from json import loads from json import loads
from os import environ from os import environ
from typing import Any, Callable, Self, TypedDict from typing import Any, Callable
from urllib.error import HTTPError
from .hints import JSONDataMap from pydantic import AliasPath, BaseModel, Field, field_validator
from .requests import delete, get, put, post
from .env import EnvVar from urllib.request import Request, urlopen
class TaskRespCheckItemDict(TypedDict): def get(
name: str url: str,
resolved: int query_params: dict[str, str] | None = None,
headers: dict[str, str] | None = None,
) -> str:
with urlopen(
Request(
url + ("?" + "&".join(f"{k}={v}" for k, v in query_params.items()))
if query_params
else "",
headers=headers or {},
method="GET",
),
) as resp:
return resp.read().decode("utf-8")
class TaskRespCheckDict(TypedDict): def put(
name: str url: str,
items: list[TaskRespCheckItemDict] data: dict[str, str] | None = None,
headers: dict[str, str] | None = None,
) -> str:
with urlopen(
Request(
url,
str(data).encode(),
headers=headers or {},
method="PUT",
),
) as resp:
return resp.read().decode("utf-8")
class TaskRespDict(TypedDict): class ClickupTask(BaseModel):
"""fields marked with `exclude=True` will not be pushed for updates"""
name: str name: str
markdown_description: str markdown_description: str
status: dict[str, str] status: str = Field(validation_alias=AliasPath("status", "status"))
id: str id: str = Field(exclude=True)
assignees: list[dict[str, str]] assignees: list[str] = Field(exclude=True)
tags: list[dict[str, str]] tags: list[str] = Field(exclude=True)
parent: str | None parent: str | None = Field(None, exclude=True)
locations: list[dict[str, str]] parent_list: str = Field(validation_alias=AliasPath("list", "id"), exclude=True)
checklists: list[TaskRespCheckDict] locations: list[str] = Field(exclude=True)
list: dict[str, str] checklists: dict[str, bool] = Field(exclude=True)
@dataclass(kw_only=True)
class ClickupTask:
"""fields marked with ``repr=False`` will not be pushed for updates"""
name: str
status: str
id: str | None = field(default=None, repr=False)
assignees: list[str] | None = field(default=None, repr=False)
tags: list[str] | None = field(default=None, repr=False)
parent_list: str = field(repr=False)
locations: list[str] | None = field(default=None, repr=False)
checklists: dict[str, dict[str, bool]] | None = field(default=None, repr=False)
parent: str | None = field(default=None, repr=False)
markdown_content: str
@field_validator("checklists", mode="before")
@classmethod @classmethod
def from_resp_data(cls, resp_data_raw: dict[str, Any]) -> Self: def convert_checklists(cls, content: list[Any]) -> dict[str, bool]:
resp_data = TaskRespDict(**resp_data_raw)
return cls(
name=resp_data["name"],
markdown_content=resp_data["markdown_description"],
status=resp_data["status"]["status"],
id=resp_data["id"],
assignees=cls.convert_assignees(resp_data["assignees"]),
tags=cls.convert_simple_list_with_names(resp_data["tags"]),
parent=resp_data.get("parent"),
parent_list=resp_data["list"]["id"],
locations=cls.convert_simple_list_with_names(resp_data["locations"]),
checklists={
tlist["name"]: {t["name"]: bool(t["resolved"]) for t in tlist["items"]}
for tlist in resp_data["checklists"]
},
)
@classmethod
def convert_checklists(cls, content: Sequence[Any]) -> dict[str, bool]:
return { return {
entry["name"]: entry["resolved"] entry["name"]: entry["resolved"]
for checklist in content for checklist in content
for entry in checklist["items"] for entry in checklist["items"]
} }
@field_validator("assignees", mode="before")
@classmethod @classmethod
def convert_assignees(cls, content: Sequence[str | dict[str, str]]) -> list[str]: def convert_assignees(cls, content: list[str | dict[str, Any]]) -> list[str]:
return [(e if isinstance(e, str) else e["username"]) for e in content] return [(e if isinstance(e, str) else e["username"]) for e in content]
@field_validator("tags", "locations", mode="before")
@classmethod @classmethod
def convert_simple_list_with_names( def convert_simple_list_with_names(
cls, cls,
content: Sequence[str | dict[str, str]], content: list[str | dict[str, Any]],
) -> list[str]: ) -> list[str]:
return [(e if isinstance(e, str) else e["name"]) for e in content] return [(e if isinstance(e, str) else e["name"]) for e in content]
@property @property
def updateables(self) -> dict[str, Any]: def updateables(self) -> dict[str, Any]:
return {f.name: getattr(self, f.name, None) for f in fields(type(self)) if f.repr} return {
k: getattr(self, k, None)
for k, v in type(self).model_fields.items()
if not v.exclude
}
@property @property
def showables(self) -> dict[str, Any]: def showables(self) -> dict[str, Any]:
return { return {
dcf.name: getattr(self, dcf.name, ...) k: getattr(self, k, None)
for dcf in fields(self) for k in type(self).model_fields
if getattr(self, dcf.name, ...) is not None if k != "markdown_description"
} }
@property @property
@@ -109,54 +101,34 @@ class ClickupTask:
if self.parent: if self.parent:
ret += " \033[32m 󰙅 " ret += " \033[32m 󰙅 "
ret += self.name ret += f"{self.name} (#{self.id})"
if self.tags:
ret += "".join(f" #{t}" for t in self.tags)
ret += f" (#{self.id})"
return ret return ret
def get_env_var(var_name: str) -> Callable[[], str]:
return lambda var=var_name: environ[var]
@dataclass @dataclass
class ClickupSession: class ClickupSession:
auth_key: str = field( auth_key: str = field(default_factory=get_env_var("CLICKUP_AUTH"))
default_factory=EnvVar( workspace_id: str = field(default_factory=get_env_var("CLICKUP_WORKSPACE_ID"))
"CLICKUP_AUTH", user_id: str = field(default_factory=get_env_var("CLICKUP_USER_ID"))
"clickup auth token is required to be set",
)
)
workspace_id: str = field(
default_factory=EnvVar(
"CLICKUP_WORKSPACE_ID",
"clickup workspace id is required to be set",
)
)
user_id: str = field(
default_factory=EnvVar(
"CLICKUP_USER_ID",
"clickup user id is required to be set",
)
)
base_url: str = "https://api.clickup.com/api/v2" base_url: str = "https://api.clickup.com/api/v2"
def _get(self, endpoint: str, **query_params: str) -> JSONDataMap: def _get(self, endpoint: str, **query_params: Any) -> dict[str, Any]:
try: raw_data = get(
raw_data = get( self.base_url + endpoint,
self.base_url + endpoint, query_params,
query_params, headers={
headers={ "accept": "application/json",
"accept": "application/json", "Authorization": self.auth_key,
"Authorization": self.auth_key, },
}, )
) return loads(raw_data)
return loads(raw_data)
except HTTPError as e:
e.add_note(e.url)
raise
def _put(self, endpoint: str, **body_params: str) -> JSONDataMap: def _put(self, endpoint: str, **body_params: Any) -> dict[str, Any]:
raw_data = put( raw_data = put(
self.base_url + endpoint, self.base_url + endpoint,
body_params, body_params,
@@ -168,63 +140,16 @@ class ClickupSession:
) )
return loads(raw_data) return loads(raw_data)
def _post(self, endpoint: str, **body_params: str) -> JSONDataMap: def get_tasks(self) -> list[ClickupTask]:
raw_data = post(
self.base_url + endpoint,
body_params,
headers={
"accept": "application/json",
"content-type": "application/json",
"Authorization": self.auth_key,
},
)
return loads(raw_data)
def _delete(self, endpoint: str) -> JSONDataMap:
raw_data = delete(
self.base_url + endpoint,
headers={
"accept": "application/json",
"content-type": "application/json",
"Authorization": self.auth_key,
},
)
return loads(raw_data)
def get_tasks(self, **filters: str) -> list[ClickupTask]:
data = self._get( data = self._get(
f"/team/{self.workspace_id}/task", f"/team/{self.workspace_id}/task?subtasks=true&include_markdown_description=true&assignees[]={self.user_id}"
**{ )
"subtask": "true", return [ClickupTask.model_validate(t) for t in data["tasks"]]
"include_markdown_description": "true",
"assignees[]": self.user_id,
}
| filters,
).get("tasks", [])
if isinstance(data, list):
return [ClickupTask.from_resp_data(t) for t in data if isinstance(t, dict)]
return []
def get_task(self, task_id: str) -> ClickupTask: def get_task(self, task_id: str) -> ClickupTask:
return ClickupTask.from_resp_data( return ClickupTask.model_validate(
self._get( self._get(f"/task/{task_id}?include_markdown_description=true")
f"/task/{task_id}",
include_markdown_description="true",
),
) )
def update(self, data: ClickupTask) -> None: def update(self, data: ClickupTask) -> None:
ret_task = self._put(f"/task/{data.id}", **data.updateables) _ = self._put(f"/task/{data.id}", **data.updateables)
current_tags: set[str] = set(ret_task["tags"])
new_tags = set(data.tags)
for del_tag in current_tags - new_tags:
self._delete(f"/task/{data.id}/tag/{del_tag}")
for add_tag in new_tags - current_tags:
self._post(f"/task/{data.id}/tag/{add_tag}")
def create(self, data: ClickupTask) -> str:
ret_task = self._post(f"/list/{data.parent_list}/task", **data.updateables)
return str(ret_task["id"])
-24
View File
@@ -1,24 +0,0 @@
from dataclasses import dataclass
from os import environ
@dataclass
class EnvVar:
"""
Environment Variable fetcher for use in dataclass ``field(default_factory=...)``
>>> @dataclass
>>> class SomeDataclass:
... field_name: str = field(default_factory=EnvVar("SOME_VAR_NAME", "err msg"))
"""
var_name: str
err_msg: str = ""
def __call__(self) -> str:
try:
return environ[self.var_name]
except KeyError as e:
e.add_note(self.err_msg)
raise
+4 -8
View File
@@ -1,8 +1,4 @@
from __future__ import annotations type JSONDataScalar = str | None | float | bool
from typing import TypeAlias type JSONDataList = list[JSONDataScalar | "JSONDataMap" | "JSONDataList"]
type JSONDataMap = dict[str, JSONDataScalar | JSONDataList | "JSONDataMap"]
type JSONData = JSONDataMap | JSONDataList
JSONDataScalar: TypeAlias = str | None | float | bool
JSONDataList: TypeAlias = list["JSONDataScalar | JSONDataMap | JSONDataList"]
JSONDataMap: TypeAlias = dict[str, "JSONDataScalar | JSONDataList | JSONDataMap"]
JSONData: TypeAlias = "JSONDataMap | JSONDataList"
+20 -76
View File
@@ -1,11 +1,9 @@
from collections.abc import Callable, Sequence from collections.abc import Callable, Sequence
from pathlib import Path
from typing import Any
from pynvim import Nvim, command, plugin from pynvim import Nvim, command, plugin
from pynvim.api import Buffer from pynvim.api import Buffer
from .clickup import ClickupSession, ClickupTask from .clickup import ClickupSession
from .hints import JSONData from .hints import JSONData
from .yaml import load, dump from .yaml import load, dump
@@ -30,20 +28,18 @@ class MrPyPlugin:
"assignees": usernames_from_objs, "assignees": usernames_from_objs,
} }
def select_task_id(self, tags: set[str] | None = None) -> None: def select_task_id(self) -> None:
tasks = self.clickup.get_tasks(**({"tags[]": "&tags[]=".join(tags)} if tags else {})) tasks = self.clickup.get_tasks()
task_names_by_id = [ task_names_by_id = [
{ {
"idx": tix + 1, "idx": tix + 1,
"id": t.id, "id": t.id,
"text": t.short,
"name": t.name, "name": t.name,
"tags": t.tags,
"status": t.status, "status": t.status,
"is_child": bool(t.parent), "is_child": bool(t.parent),
"preview": { "preview": {
"text": dump(t.showables), "text": f"---\n{dump(t.showables)}---\n{t.markdown_description}",
"ft": "yaml", "ft": "markdown",
}, },
"action": f":Mrpy {t.id}", "action": f":Mrpy {t.id}",
} }
@@ -74,12 +70,8 @@ class MrPyPlugin:
ret[#ret + 1] = { item.name, hl } ret[#ret + 1] = { item.name, hl }
for _, v in ipairs(item.tags) do
ret[#ret + 1] = { " #" .. v, "SnacksPickerComment" }
end
ret[#ret + 1] = { ret[#ret + 1] = {
" 󰻾" .. item.id, " (#" .. item.id .. ")",
"SnacksPickerComment" "SnacksPickerComment"
} }
return ret return ret
@@ -99,36 +91,34 @@ class MrPyPlugin:
task = self.clickup.get_task(task_id) task = self.clickup.get_task(task_id)
temp_buf: Buffer = self.nvim.api.create_buf(True, False) temp_buf: Buffer = self.nvim.api.create_buf(True, False)
self.nvim.api.buf_set_name(temp_buf, f"[ClickUp] {task.name}") self.nvim.api.buf_set_name(temp_buf, f"[ClickUp] {task.name}")
self.nvim.buffers[temp_buf.number].options["filetype"] = "yaml" self.nvim.buffers[temp_buf.number].options["filetype"] = "markdown"
self.nvim.api.create_autocmd( self.nvim.api.create_autocmd(
["BufWriteCmd"], ["BufWriteCmd"],
{"buffer": temp_buf.number, "command": "MrpyPush " + str(temp_buf.number)}, {"buffer": temp_buf.number, "command": "MrpyPush " + str(temp_buf.number)},
) )
content = [ content = ["---"]
f"# yaml-language-server: $schema={Path(__file__).parent.parent.parent.parent}/schema.json ",
"---",
]
content.extend( content.extend(
dump( dump(
task.showables, task.showables,
).splitlines() ).splitlines()
) )
content.append("---")
content.extend(task.markdown_description.splitlines())
self.nvim.api.buf_set_lines(temp_buf, 0, 0, False, content) self.nvim.api.buf_set_lines(temp_buf, 0, 0, False, content)
self.nvim.buffers[temp_buf.number].options["modified"] = False self.nvim.buffers[temp_buf.number].options["modified"] = False
self.nvim.api.win_set_buf(0, temp_buf) self.nvim.api.win_set_buf(0, temp_buf)
@command("Mrpy", nargs="*") @command("Mrpy", nargs="?")
def entry(self, args: Sequence[str] = ()) -> None: def entry(self, args: Sequence[str] = ()) -> None:
match args: match args:
case (str() as task_id,): case (str() as task_id,):
try: self.open_task_buffer(task_id)
self.open_task_buffer(task_id) case ():
except: self.select_task_id()
self.select_task_id({task_id}) case _:
case tags: pass
self.select_task_id(set(tags))
@command("MrpyPush", nargs="?") @command("MrpyPush", nargs="?")
def on_vbuf_write(self, args: Sequence[str] = ()) -> None: def on_vbuf_write(self, args: Sequence[str] = ()) -> None:
@@ -142,60 +132,14 @@ class MrPyPlugin:
try: try:
a = self.nvim.buffers[buf_no][0:-1] a = self.nvim.buffers[buf_no][0:-1]
fm = "\n".join(a) _, fm, *text = "\n".join(a).split("---\n")
data = load(fm) data = load(fm)
assert isinstance(data, dict) assert isinstance(data, dict)
data["markdown_content"] = data["markdown_content"].strip() + "\n" data["markdown_description"] = "\n\n".join(text)
self.clickup.update(ClickupTask(**data)) self.nvim.err_write(str(data) + "\n")
# self.clickup.update(data)
self.nvim.buffers[buf_no].options["modified"] = False self.nvim.buffers[buf_no].options["modified"] = False
except ValueError: except ValueError:
pass pass
@command("MrpyNew", nargs=0)
def on_new_task(self) -> None:
task = ClickupTask(name="", status="backlog", markdown_content="", parent_list="")
temp_buf: Buffer = self.nvim.api.create_buf(True, False)
self.nvim.api.buf_set_name(temp_buf, f"[ClickUp] New Task")
self.nvim.buffers[temp_buf.number].options["filetype"] = "yaml"
self.nvim.api.create_autocmd(
["BufWriteCmd"],
{"buffer": temp_buf.number, "command": "MrpyPushNew " + str(temp_buf.number)},
)
content = [
f"# yaml-language-server: $schema={Path(__file__).parent.parent.parent.parent}/schema.json ",
"---",
]
content.extend(
dump(
task.showables,
).splitlines()
)
self.nvim.api.buf_set_lines(temp_buf, 0, 0, False, content)
self.nvim.buffers[temp_buf.number].options["modified"] = False
self.nvim.api.win_set_buf(0, temp_buf)
@command("MrpyPushNew", nargs=1)
def on_new_vbuf_write(self, buf_no: Sequence[Any] = ()) -> None:
buf_no = int(buf_no[0])
try:
a = self.nvim.buffers[buf_no][0:-1]
fm = "\n".join(a)
data = load(fm)
assert isinstance(data, dict)
data["markdown_content"] = data["markdown_content"].strip() + "\n"
new_task = ClickupTask(**data)
new_task_id = self.clickup.create(new_task)
self.nvim.buffers[buf_no].options["modified"] = False
self.nvim.command("bdelete " + str(buf_no))
self.entry([new_task_id])
except ValueError:
pass
-64
View File
@@ -1,64 +0,0 @@
from json import dumps
from ssl import SSLContext, create_default_context
from urllib.error import HTTPError
from urllib.request import Request, urlopen
from urllib.parse import urlencode
def request(
url: str,
method: str,
query_params: dict[str, str] | None = None,
data: dict[str, str] | None = None,
headers: dict[str, str] | None = None,
) -> str:
try:
with urlopen(
Request(
url
+ (
("?" + "&".join(f"{k}={v}" for k, v in query_params.items()))
if query_params
else ""
),
dumps(data).encode() if data else None,
headers=headers or {},
method=method,
),
context=create_default_context(),
) as resp:
return resp.read().decode("utf-8")
except HTTPError as e:
e.add_note(e.url)
raise
def get(
url: str,
query_params: dict[str, str] | None = None,
headers: dict[str, str] | None = None,
) -> str:
return request(url, "GET", query_params=query_params, headers=headers)
def post(
url: str,
data: dict[str, str] | None = None,
headers: dict[str, str] | None = None,
) -> str:
return request(url, "POST", data=data, headers=headers)
def put(
url: str,
data: dict[str, str] | None = None,
headers: dict[str, str] | None = None,
) -> str:
return request(url, "PUT", data=data, headers=headers)
def delete(
url: str,
headers: dict[str, str] | None = None,
) -> str:
return request(url, "DELETE", headers=headers)
+7 -49
View File
@@ -14,44 +14,20 @@ def dump_scalar(entry: hints.JSONDataScalar) -> str:
case False: case False:
return "false\n" return "false\n"
case _: case _:
return f"{entry}\n" return f"{entry:s}\n"
def dump(obj: hints.JSONDataMap) -> str: def dump(obj: dict[str, hints.JSONDataScalar | list[hints.JSONDataScalar]]) -> str:
ret = "" ret = ""
for key, value in obj.items(): for key, value in obj.items():
ret += key ret += key
ret += ":" ret += ":"
match value: match value:
case []:
ret += " []\n"
case {}:
ret += " {}\n"
case "":
ret += ' ""\n'
case list() as entries: case list() as entries:
ret += "\n" ret += "\n"
for entry in entries: for entry in entries:
match entry: ret += f" - {dump_scalar(entry)}"
case dict():
subdump = dump(entry)
ret += "\n".join(" " + l for l in subdump.splitlines())
case list():
pass
case _:
ret += f" - {dump_scalar(entry)}"
case dict() as substruct:
ret += "\n"
subdump = dump(substruct)
ret += "\n".join(" " + l for l in subdump.splitlines()) + "\n"
case str() as mlstring if key == "markdown_content":
if mlstring.strip() == "" or dump_scalar(mlstring).strip() == "null":
ret += ' ""\n'
else:
indented = "\n".join((" " + l) for l in dump_scalar(mlstring).splitlines())
ret += f" >\n{indented}\n"
case entry: case entry:
ret += f" {dump_scalar(entry)}" ret += f" {dump_scalar(entry)}"
@@ -77,10 +53,6 @@ def load(content: str) -> hints.JSONData:
... key10: ... key10:
... - str ... - str
... - "str" ... - "str"
... key11: >
... * a
... * b:
... * c
... ''' ... '''
>>> yaml_parsed = load(yaml) >>> yaml_parsed = load(yaml)
>>> compare = { >>> compare = {
@@ -94,8 +66,8 @@ def load(content: str) -> hints.JSONData:
... 'key8': False, ... 'key8': False,
... 'key9': [1, 23.2], ... 'key9': [1, 23.2],
... 'key10': ['str', 'str'], ... 'key10': ['str', 'str'],
... 'key11': "* a\\n* b:\\n * c",
... } ... }
>>> {k: yaml_parsed.get(k, None) for k in compare if compare[k] != yaml_parsed[k]} >>> {k: yaml_parsed.get(k, None) for k in compare if compare[k] != yaml_parsed[k]}
{} {}
""" """
@@ -103,34 +75,20 @@ def load(content: str) -> hints.JSONData:
ret = {} ret = {}
key = None key = None
value = None value = None
mlstring = False for line in sub(r":[\s\n]*", ":\n ", dedent(content).strip()).splitlines():
for line in sub(r"(:( >)?)[\s\n]*", r"\1\n ", dedent(content).strip()).splitlines(): if line.endswith(":"):
if line.startswith("#") or line.strip() == "---":
continue
elif line.endswith(":"):
if key: if key:
ret[key] = value ret[key] = value
value = None value = None
mlstring = False
key = line.removesuffix(":") key = line.removesuffix(":")
elif line.endswith(": >"): elif line.startswith(" -"):
if key:
ret[key] = value
value = None
mlstring = True
key = line.removesuffix(": >")
elif line.startswith(" -") and not mlstring:
value = value or [] value = value or []
try: try:
parsed = loads(line.removeprefix(" -").strip()) parsed = loads(line.removeprefix(" -").strip())
except: except:
parsed = loads('"' + line.removeprefix(" -").strip() + '"') parsed = loads('"' + line.removeprefix(" -").strip() + '"')
value.append(parsed) value.append(parsed)
elif line.startswith(" ") and mlstring:
value = value or ""
value += line.removeprefix(" ") + "\n"
else: else:
try: try:
value = loads(line.strip()) value = loads(line.strip())
-44
View File
@@ -1,44 +0,0 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"description": "ETA yaml frontmatter",
"properties": {
"name": {
"type": "string"
},
"status": {
"type": "string",
"enum": [
"backlog",
"selected for development",
"in progress",
"in review",
"on hold",
"done",
"closed"
]
},
"markdown_content": {
"type": "string"
},
"id": {
"type": "string",
"pattern": "^[a-z0-9]+$"
},
"parent_list": {
"anyOf": [
{
"const": 900400316794,
"description": "OPE Tooling"
}
]
}
},
"required": [
"name",
"status",
"markdown_content",
"parent_list"
],
"title": "clickup task",
"type": "object"
}