Compare commits
15 Commits
b874441f06
..
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 7c6c3d7223 | |||
| 518f2df5c8 | |||
| 975dd39437 | |||
| 8dc1d080d8 | |||
| 82afdef873 | |||
| 14e757dd77 | |||
| 91fcb96619 | |||
| d6fc41fe05 | |||
| 93a39ad889 | |||
| 2dca8c833c | |||
| e2431f2b21 | |||
| 0407393eda | |||
| da494835cb | |||
| f82086b2e5 | |||
| 615b696fda |
@@ -1,3 +1,42 @@
|
|||||||
# mrpy.nvim
|
# mrpy.nvim
|
||||||
|
|
||||||
NeoVim plugin, integrating a clickup to gitlab workflow (1 task to 1 issue to 1 merge request)
|
NeoVim plugin, integrating a clickup to gitlab workflow (1 task to 1 issue to 1 merge request)
|
||||||
|
|
||||||
|
## Install
|
||||||
|
|
||||||
|
### Lazy.nvim
|
||||||
|
|
||||||
|
#### From Source
|
||||||
|
|
||||||
|
```lua
|
||||||
|
return {
|
||||||
|
dir = "<path/to/cloned/repo>",
|
||||||
|
dev = true,
|
||||||
|
dependencies = {
|
||||||
|
"nvim-lua/plenary.nvim",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
## Setup
|
||||||
|
|
||||||
|
### Shell Configuration
|
||||||
|
|
||||||
|
#### Environment Variables
|
||||||
|
|
||||||
|
CLICKUP_AUTH
|
||||||
|
: user api token for clickup
|
||||||
|
|
||||||
|
CLICKUP_USER_ID
|
||||||
|
: user id for clickup
|
||||||
|
|
||||||
|
CLICKUP_WORKSPACE_ID
|
||||||
|
: workspace id for clickup
|
||||||
|
|
||||||
|
|
||||||
|
## Commands
|
||||||
|
|
||||||
|
### `:MrPy`
|
||||||
|
|
||||||
|
- start task selector of user's tasks (not done/closed)
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,68 @@
|
|||||||
|
from dataclasses import dataclass, field
|
||||||
|
from json import loads
|
||||||
|
from pydantic import BaseModel
|
||||||
|
from requests import HTTPError, get
|
||||||
|
|
||||||
|
from .common import EnvVar, JSONDataMap, JSONData, JSONDataList, JSONDataScalar
|
||||||
|
|
||||||
|
|
||||||
|
class ClickupTask(BaseModel):
|
||||||
|
id: str
|
||||||
|
name: str
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class ClickupSession:
|
||||||
|
auth_key: str = field(
|
||||||
|
default_factory=EnvVar(
|
||||||
|
"CLICKUP_AUTH",
|
||||||
|
"clickup auth token is required to be set",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
workspace_id: str = field(
|
||||||
|
default_factory=EnvVar(
|
||||||
|
"CLICKUP_WORKSPACE_ID",
|
||||||
|
"clickup workspace id is required to be set",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
user_id: str = field(
|
||||||
|
default_factory=EnvVar(
|
||||||
|
"CLICKUP_USER_ID",
|
||||||
|
"clickup user id is required to be set",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
base_url: str = "https://api.clickup.com/api/v2"
|
||||||
|
|
||||||
|
def _get(self, endpoint: str, **query_params: str) -> JSONDataMap:
|
||||||
|
with get(
|
||||||
|
self.base_url + endpoint,
|
||||||
|
query_params,
|
||||||
|
headers={
|
||||||
|
"accept": "application/json",
|
||||||
|
"Authorization": self.auth_key,
|
||||||
|
},
|
||||||
|
) as resp:
|
||||||
|
return resp.json()
|
||||||
|
|
||||||
|
def get_tasks(self, **filters: str) -> list[ClickupTask]:
|
||||||
|
data = self._get(
|
||||||
|
f"/team/{self.workspace_id}/task",
|
||||||
|
**{
|
||||||
|
"subtasks": "true",
|
||||||
|
"include_markdown_description": "true",
|
||||||
|
"assignees[]": self.user_id,
|
||||||
|
}
|
||||||
|
| filters,
|
||||||
|
).get("tasks", [])
|
||||||
|
if isinstance(data, list):
|
||||||
|
return [ClickupTask.model_validate(t) for t in data if isinstance(t, dict)]
|
||||||
|
|
||||||
|
return []
|
||||||
|
|
||||||
|
def get_task(self, task_id: str) -> ClickupTask:
|
||||||
|
return ClickupTask.model_validate(
|
||||||
|
self._get(
|
||||||
|
f"/task/{task_id}",
|
||||||
|
include_markdown_description="true",
|
||||||
|
),
|
||||||
|
)
|
||||||
@@ -0,0 +1,32 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from os import environ
|
||||||
|
from typing import TypeAlias
|
||||||
|
|
||||||
|
|
||||||
|
JSONDataScalar: TypeAlias = str | None | float | bool
|
||||||
|
JSONDataList: TypeAlias = list["JSONDataScalar | JSONDataMap | JSONDataList"]
|
||||||
|
JSONDataMap: TypeAlias = dict[str, "JSONDataScalar | JSONDataList | JSONDataMap"]
|
||||||
|
JSONData: TypeAlias = "JSONDataMap | JSONDataList"
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class EnvVar:
|
||||||
|
"""
|
||||||
|
Environment Variable fetcher for use in dataclass ``field(default_factory=...)``
|
||||||
|
|
||||||
|
>>> @dataclass
|
||||||
|
>>> class SomeDataclass:
|
||||||
|
... field_name: str = field(default_factory=EnvVar("SOME_VAR_NAME", "err msg"))
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
var_name: str
|
||||||
|
err_msg: str = ""
|
||||||
|
|
||||||
|
def __call__(self) -> str:
|
||||||
|
try:
|
||||||
|
return environ[self.var_name]
|
||||||
|
except KeyError as e:
|
||||||
|
e.add_note(self.err_msg)
|
||||||
|
raise
|
||||||
+90
@@ -0,0 +1,90 @@
|
|||||||
|
from lsprotocol.types import (
|
||||||
|
TEXT_DOCUMENT_COMPLETION,
|
||||||
|
TEXT_DOCUMENT_DOCUMENT_SYMBOL,
|
||||||
|
CompletionItem,
|
||||||
|
CompletionItemKind,
|
||||||
|
CompletionItemLabelDetails,
|
||||||
|
CompletionOptions,
|
||||||
|
CompletionParams,
|
||||||
|
DocumentSymbol,
|
||||||
|
DocumentSymbolParams,
|
||||||
|
NotebookDocumentSyncOptions,
|
||||||
|
Position,
|
||||||
|
Range,
|
||||||
|
SymbolKind,
|
||||||
|
TextDocumentSyncKind,
|
||||||
|
WorkDoneProgressBegin,
|
||||||
|
WorkDoneProgressEnd,
|
||||||
|
WorkDoneProgressReport,
|
||||||
|
)
|
||||||
|
from pygls.lsp.server import LanguageServer
|
||||||
|
|
||||||
|
from .clickup import ClickupSession, ClickupTask
|
||||||
|
|
||||||
|
|
||||||
|
class CustomServer(LanguageServer):
|
||||||
|
cache: dict[str, ClickupTask]
|
||||||
|
cu_session: ClickupSession
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
name: str,
|
||||||
|
version: str,
|
||||||
|
text_document_sync_kind: TextDocumentSyncKind = TextDocumentSyncKind.Incremental,
|
||||||
|
notebook_document_sync: NotebookDocumentSyncOptions | None = None,
|
||||||
|
) -> None:
|
||||||
|
super().__init__(name, version, text_document_sync_kind, notebook_document_sync)
|
||||||
|
self.cache = {}
|
||||||
|
self.cu_session = ClickupSession()
|
||||||
|
self.update_task_cache()
|
||||||
|
|
||||||
|
def update_task_cache(self) -> None:
|
||||||
|
self.protocol.progress.begin(
|
||||||
|
"startup", WorkDoneProgressBegin("Fetching Cache ...", percentage=0, cancellable=True)
|
||||||
|
)
|
||||||
|
self.cache = {}
|
||||||
|
tasks = self.cu_session.get_ta()
|
||||||
|
for ti, t in enumerate(tasks):
|
||||||
|
self.cache[t.id] = t
|
||||||
|
self.protocol.progress.report(
|
||||||
|
"startup",
|
||||||
|
WorkDoneProgressReport(
|
||||||
|
message="Fetched Cache", percentage=int(100 * (1 + ti) / len(tasks))
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
self.protocol.progress.end("startup", WorkDoneProgressEnd(message="Done Caching"))
|
||||||
|
|
||||||
|
|
||||||
|
server = CustomServer("mrpy-server", "0.1.0")
|
||||||
|
|
||||||
|
|
||||||
|
@server.feature(TEXT_DOCUMENT_DOCUMENT_SYMBOL)
|
||||||
|
async def list_ids(params: DocumentSymbolParams) -> list[DocumentSymbol]:
|
||||||
|
return [
|
||||||
|
DocumentSymbol(
|
||||||
|
t.id,
|
||||||
|
SymbolKind.Enum,
|
||||||
|
Range(Position(i, 0), Position(i, 0)),
|
||||||
|
Range(Position(i, 0), Position(i, 0)),
|
||||||
|
detail=t.name,
|
||||||
|
)
|
||||||
|
for i, t in enumerate(server.cache.values())
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
@server.feature(TEXT_DOCUMENT_COMPLETION)
|
||||||
|
async def complete_cu_ids(params: CompletionParams) -> list[CompletionItem]:
|
||||||
|
return [
|
||||||
|
CompletionItem(
|
||||||
|
t.name,
|
||||||
|
CompletionItemLabelDetails(detail=f" #{t.id}"),
|
||||||
|
kind=CompletionItemKind.Constant,
|
||||||
|
insert_text=f"[{t.name} #{t.id}](https://app.clickup.com/t/{t.id})",
|
||||||
|
)
|
||||||
|
for t in server.cache.values()
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
server.start_io()
|
||||||
@@ -0,0 +1,103 @@
|
|||||||
|
local curl = require("plenary.curl")
|
||||||
|
local helpers = require("eta.helpers")
|
||||||
|
local M = {}
|
||||||
|
|
||||||
|
---@class eta.clickup.Session: eta.Session
|
||||||
|
---@field user string
|
||||||
|
---@field workspace string
|
||||||
|
|
||||||
|
---@class eta.clickup.Ref
|
||||||
|
---@field name string
|
||||||
|
---@field id string
|
||||||
|
|
||||||
|
---@class eta.clickup.Dep
|
||||||
|
---@field task_id string
|
||||||
|
---@field depends_on string
|
||||||
|
|
||||||
|
---@class eta.clickup.Task
|
||||||
|
---@field id string
|
||||||
|
---@field name string
|
||||||
|
---@field tags? table[]
|
||||||
|
---@field locations? table[]
|
||||||
|
---@field list? eta.clickup.Ref
|
||||||
|
---@field parent? string | nil
|
||||||
|
---@field dependencies? eta.clickup.Dep[]
|
||||||
|
---@field [string] string
|
||||||
|
|
||||||
|
---@param self eta.clickup.Session
|
||||||
|
---@return eta.clickup.Task[]
|
||||||
|
M.latest_tasks = function(self)
|
||||||
|
local ret = helpers.request("get", self, "/team/" .. self.workspace .. "/task",{subtasks="true", include_markdown_description="true", ['assignees[]']= self.user})
|
||||||
|
if ret then
|
||||||
|
return ret.tasks
|
||||||
|
end
|
||||||
|
return {}
|
||||||
|
end
|
||||||
|
|
||||||
|
---@param self eta.clickup.Session
|
||||||
|
---@param id string
|
||||||
|
---@return eta.clickup.Task
|
||||||
|
M.task = function(self, id)
|
||||||
|
local ret = helpers.request("get", self, "/task/" .. id,{include_markdown_description="true"}) or {}
|
||||||
|
return ret
|
||||||
|
end
|
||||||
|
|
||||||
|
---@param self eta.clickup.Session
|
||||||
|
---@param id string
|
||||||
|
M.task_relations = function(self, id)
|
||||||
|
local ret = helpers.request("get", self, "/task/" .. id .. "/dependency", {}) or {}
|
||||||
|
return ret
|
||||||
|
end
|
||||||
|
|
||||||
|
M.insert_ref = function()
|
||||||
|
local pos = vim.api.nvim_win_get_cursor(0)
|
||||||
|
local row = pos[1] - 1
|
||||||
|
local col = pos[2]
|
||||||
|
local tasks = M.latest_tasks(require("plugin.eta").clickup_session)
|
||||||
|
|
||||||
|
---@type SelectionItem[]
|
||||||
|
local items = {}
|
||||||
|
|
||||||
|
for tix, t in ipairs(tasks) do
|
||||||
|
if string.sub(t.name, -7, -1) ~= "Absence" then
|
||||||
|
local preview_frontmatter = ""
|
||||||
|
|
||||||
|
---@type SelectionItem
|
||||||
|
local prepared = {
|
||||||
|
idx = tix,
|
||||||
|
id = t.id,
|
||||||
|
text = t.name .. t.id .. t.markdown_description,
|
||||||
|
name = t.name,
|
||||||
|
tags = require("plugin.eta").retrieve_subkeys(t.tags, { "name" }),
|
||||||
|
status = t.status.status,
|
||||||
|
parent = t.parent,
|
||||||
|
list = t.list.name,
|
||||||
|
description = t.markdown_description,
|
||||||
|
preview = {
|
||||||
|
ft = "markdown",
|
||||||
|
},
|
||||||
|
action = M._on_select_task
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, k in ipairs({ "id", "name", "status", "tags", "parent" }) do
|
||||||
|
preview_frontmatter = preview_frontmatter .. "\n" .. k .. ": " .. vim.json.encode(prepared[k])
|
||||||
|
end
|
||||||
|
|
||||||
|
prepared.preview.text = "---" .. preview_frontmatter .. "\n---\n" .. t.markdown_description
|
||||||
|
table.insert(items, #items + 1, prepared)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
require("snacks.picker").pick({
|
||||||
|
title = "Select Task",
|
||||||
|
format = require("plugin.eta")._item_format,
|
||||||
|
preview = "preview",
|
||||||
|
confirm = function(picker, item)
|
||||||
|
picker:close()
|
||||||
|
vim.api.nvim_buf_set_text(0, row, col, row, col,
|
||||||
|
{ "[#" .. item.id .. "](https://app.clickup.com/t/" .. item.id .. ")" })
|
||||||
|
end,
|
||||||
|
items = items
|
||||||
|
})
|
||||||
|
end
|
||||||
|
return M
|
||||||
@@ -0,0 +1,149 @@
|
|||||||
|
local notify = require("snacks.notifier").notify
|
||||||
|
local helpers = require("eta.helpers")
|
||||||
|
local M = {}
|
||||||
|
|
||||||
|
---@class eta.gitlab.Session: eta.Session
|
||||||
|
---@field auth string personal access token `PRIVATE_TOKEN: <auth_token>`
|
||||||
|
|
||||||
|
---@class eta.gitlab.Namespace
|
||||||
|
---@field full_path string
|
||||||
|
|
||||||
|
---@class eta.gitlab.Project
|
||||||
|
---@field id number
|
||||||
|
---@field name string
|
||||||
|
---@field path_with_namespace string
|
||||||
|
---@field tag_list string[]
|
||||||
|
---@field text? string
|
||||||
|
---@field namespace eta.gitlab.Namespace
|
||||||
|
---@field preview? {ft: string, text: string}
|
||||||
|
|
||||||
|
---@class eta.gitlab.Milestone
|
||||||
|
---@field id number
|
||||||
|
---@field title string
|
||||||
|
|
||||||
|
---@class eta.gitlab.Assignee
|
||||||
|
---@field username string
|
||||||
|
---@field id number
|
||||||
|
|
||||||
|
---@class eta.gitlab.Issue
|
||||||
|
---@field id number
|
||||||
|
---@field milestone eta.gitlab.Milestone
|
||||||
|
---@field title string
|
||||||
|
---@field assignees eta.gitlab.Assignee[]
|
||||||
|
---@field description string
|
||||||
|
---@field labels string[]
|
||||||
|
|
||||||
|
|
||||||
|
---@param session eta.gitlab.Session
|
||||||
|
---@return eta.gitlab.Project[]
|
||||||
|
M.possible_projects = function(session)
|
||||||
|
return helpers.request("get", session, "/projects", {simple="true", per_page="100"}) or {}
|
||||||
|
end
|
||||||
|
|
||||||
|
---@param prj eta.gitlab.Project
|
||||||
|
M._project_format = function(prj)
|
||||||
|
local ret = {}
|
||||||
|
|
||||||
|
ret[#ret + 1] = { prj.namespace.full_path .. "/", "SnacksPickerComment" }
|
||||||
|
ret[#ret + 1] = { prj.name }
|
||||||
|
|
||||||
|
return ret
|
||||||
|
end
|
||||||
|
|
||||||
|
---@param self eta.gitlab.Session
|
||||||
|
---@return eta.gitlab.Project | nil
|
||||||
|
M.update_current_project = function(self)
|
||||||
|
local cmd = "git remote -v | grep fetch | cut -f2 | cut -d' ' -f1 | cut -d':' -f2"
|
||||||
|
local handle, _ = io.popen(cmd, 'r')
|
||||||
|
if not handle then return nil end
|
||||||
|
local repo = handle:read("*a")
|
||||||
|
handle:close()
|
||||||
|
|
||||||
|
local prjs = M.possible_projects(self)
|
||||||
|
for _, prj in ipairs(prjs) do
|
||||||
|
if prj.path_with_namespace == repo then
|
||||||
|
M.active_project = prj
|
||||||
|
notify("selected " .. prj.path_with_namespace, "info", { title = "ETA", style = 'fancy' })
|
||||||
|
return prj
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
---@type eta.gitlab.Project
|
||||||
|
M.active_project = nil
|
||||||
|
|
||||||
|
---@param picker snacks.Picker
|
||||||
|
---@param item eta.gitlab.Project
|
||||||
|
M._on_project_select = function(picker, item)
|
||||||
|
M.active_project = item
|
||||||
|
picker:close()
|
||||||
|
notify("selected " .. item.path_with_namespace, "info", { title = "ETA", style = 'fancy' })
|
||||||
|
end
|
||||||
|
|
||||||
|
---@param self eta.gitlab.Session
|
||||||
|
M.select_project = function(self)
|
||||||
|
local prjs = M.possible_projects(self)
|
||||||
|
require("snacks.picker").pick({
|
||||||
|
title = "Select Project",
|
||||||
|
format = M._project_format,
|
||||||
|
preview = "preview",
|
||||||
|
confirm = M._on_project_select,
|
||||||
|
items = prjs
|
||||||
|
})
|
||||||
|
end
|
||||||
|
|
||||||
|
---@param self eta.gitlab.Session
|
||||||
|
---@return eta.gitlab.Issue[]
|
||||||
|
M.active_issues = function(self)
|
||||||
|
if not M.active_project then
|
||||||
|
if not M.update_current_project(self) then
|
||||||
|
M.select_project(self)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
return request("get", self, "/projects/" .. tostring(M.active_project.id) .. "/issues", {})
|
||||||
|
end
|
||||||
|
|
||||||
|
---@param self eta.gitlab.Session
|
||||||
|
---@param name string
|
||||||
|
---@param clickup_task_id string
|
||||||
|
---@param tags string[]
|
||||||
|
M.new_issue = function(self, name, clickup_task_id, tags)
|
||||||
|
if not M.active_project then
|
||||||
|
if not M.update_current_project(self) then
|
||||||
|
M.select_project(self)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
local description = "%23" .. clickup_task_id
|
||||||
|
local title = name:gsub("%s", "%20")
|
||||||
|
|
||||||
|
return request("post", self, "/projects/" ..
|
||||||
|
tostring(M.active_project.id) ..
|
||||||
|
"/issues", {
|
||||||
|
title=title, description=description, labels=table.concat(tags, ",")
|
||||||
|
})
|
||||||
|
|
||||||
|
end
|
||||||
|
|
||||||
|
---@param self eta.gitlab.Session
|
||||||
|
---@param name string
|
||||||
|
---@param issue number
|
||||||
|
---@param clickup_task_id string
|
||||||
|
---@param tags string[]
|
||||||
|
M.new_mr = function(self, name, issue, clickup_task_id, tags)
|
||||||
|
if not M.active_project then
|
||||||
|
if not M.update_current_project(self) then
|
||||||
|
M.select_project(self)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
local description = "Closes %23" .. tostring(issue) .. " | %23" .. clickup_task_id
|
||||||
|
local title = name:gsub("%s", "%20")
|
||||||
|
|
||||||
|
return request("post", self, "/projects/" .. tostring(M.active_project.id) .. "/merge_requests", {
|
||||||
|
title=title, description=description, labels=table.concat(tags, ",")
|
||||||
|
})
|
||||||
|
end
|
||||||
|
|
||||||
|
return M
|
||||||
@@ -0,0 +1,63 @@
|
|||||||
|
local curl = require("plenary.curl")
|
||||||
|
local M = {}
|
||||||
|
|
||||||
|
---@class eta.Session
|
||||||
|
---@field auth string
|
||||||
|
---@field base_url string
|
||||||
|
|
||||||
|
---@param method `get` | `post`
|
||||||
|
---@param session eta.Session
|
||||||
|
---@param endpoint string
|
||||||
|
---@param params {[string]: string}
|
||||||
|
---@returns table | nil
|
||||||
|
M.request = function(method, session, endpoint, params)
|
||||||
|
local param_list = {}
|
||||||
|
for param_name, param_value in pairs(params) do
|
||||||
|
table.insert(param_list, param_name .. "=" .. param_value)
|
||||||
|
end
|
||||||
|
local url = session.base_url .. endpoint
|
||||||
|
if #param_list then
|
||||||
|
url = url .. "?" .. table.concat(param_list, "&")
|
||||||
|
end
|
||||||
|
local resp = nil
|
||||||
|
if method == "get" then
|
||||||
|
resp = curl.get(url, {headers = {
|
||||||
|
-- ['PRIVATE-TOKEN'] = session.auth,
|
||||||
|
['Authorization'] = session.auth,
|
||||||
|
["content-type"] = "application/json",
|
||||||
|
}})
|
||||||
|
elseif method == "post" then
|
||||||
|
resp = curl.post(url, {headers = {
|
||||||
|
-- ['PRIVATE-TOKEN'] = session.auth,
|
||||||
|
['Authorization'] = session.auth,
|
||||||
|
["content-type"] = "application/json",
|
||||||
|
}})
|
||||||
|
else
|
||||||
|
return nil
|
||||||
|
end
|
||||||
|
|
||||||
|
if resp.status == 200 then
|
||||||
|
local prjs = vim.json.decode(resp.body)
|
||||||
|
for _, prj in ipairs(prjs) do
|
||||||
|
local fcmd = "echo '" .. vim.json.encode(prj) .. "' | jq"
|
||||||
|
local fhandle, _ = io.popen(fcmd, 'r')
|
||||||
|
if not fhandle then goto continue end
|
||||||
|
local formatted = fhandle:read("*a")
|
||||||
|
fhandle:close()
|
||||||
|
|
||||||
|
prj.text = prj.path_with_namespace
|
||||||
|
prj.preview = {
|
||||||
|
ft = "json",
|
||||||
|
text = formatted
|
||||||
|
}
|
||||||
|
|
||||||
|
::continue::
|
||||||
|
end
|
||||||
|
return prjs
|
||||||
|
end
|
||||||
|
|
||||||
|
print("failed http request: " .. tostring(resp.status) .. " (" .. resp.body .. ", " .. url .. ")")
|
||||||
|
return nil
|
||||||
|
end
|
||||||
|
|
||||||
|
return M
|
||||||
@@ -0,0 +1,319 @@
|
|||||||
|
local curl = require("plenary.curl")
|
||||||
|
local notify = require("snacks.notifier").notify
|
||||||
|
local gitlab = require("eta.gitlab")
|
||||||
|
local clickup = require("eta.clickup")
|
||||||
|
local M = {}
|
||||||
|
|
||||||
|
---@param data table
|
||||||
|
---@return string | nil
|
||||||
|
M.table_to_yaml = function(data)
|
||||||
|
local json = vim.json.encode(data)
|
||||||
|
|
||||||
|
-- Nutzt 'yq' um JSON zu YAML zu konvertieren
|
||||||
|
local handle = io.popen("echo '" .. json .. "' | yq -y")
|
||||||
|
local result = handle:read("*a")
|
||||||
|
handle:close()
|
||||||
|
|
||||||
|
if not result then
|
||||||
|
print("Error: yq not installed or got invalid json data")
|
||||||
|
return nil
|
||||||
|
end
|
||||||
|
return result
|
||||||
|
end
|
||||||
|
|
||||||
|
---@param yaml_string string
|
||||||
|
---@return table | nil
|
||||||
|
M.yaml_to_table = function(yaml_string)
|
||||||
|
-- 1. YAML String in yq einspeisen und JSON ausgeben lassen
|
||||||
|
-- Das Flag -o=json sorgt für die Konvertierung
|
||||||
|
local temp_file = os.tmpname()
|
||||||
|
local temp_file_handle = io.open(temp_file, "w+b")
|
||||||
|
if not temp_file_handle then
|
||||||
|
print("Error: could not open temp file")
|
||||||
|
print(" temp_file: " .. temp_file)
|
||||||
|
return nil
|
||||||
|
end
|
||||||
|
temp_file_handle:write(yaml_string)
|
||||||
|
temp_file_handle:flush()
|
||||||
|
temp_file_handle:close()
|
||||||
|
|
||||||
|
local cmd = "cat " .. temp_file .. " | yq"
|
||||||
|
local handle, errres = io.popen(cmd, 'r')
|
||||||
|
local json_result = handle:read("*a")
|
||||||
|
handle:close()
|
||||||
|
|
||||||
|
|
||||||
|
-- 2. Fehlerprüfung
|
||||||
|
if json_result == "" then
|
||||||
|
print("Error: yq not installed or got invalid yaml data")
|
||||||
|
print(" cmd: " .. cmd)
|
||||||
|
print(" err: " .. errres)
|
||||||
|
return nil
|
||||||
|
end
|
||||||
|
|
||||||
|
-- 3. JSON in Lua-Table umwandeln
|
||||||
|
local ok, table_result = pcall(vim.json.decode, json_result)
|
||||||
|
if not ok then
|
||||||
|
print("Error: failed to decode json")
|
||||||
|
print(" cmd: " .. cmd)
|
||||||
|
print(" json: " .. json_result)
|
||||||
|
return nil
|
||||||
|
end
|
||||||
|
|
||||||
|
os.remove(temp_file)
|
||||||
|
return table_result
|
||||||
|
end
|
||||||
|
|
||||||
|
---@type eta.clickup.Session
|
||||||
|
M.clickup_session = nil
|
||||||
|
|
||||||
|
---@type eta.gitlab.Session
|
||||||
|
M.gitlab_session = nil
|
||||||
|
|
||||||
|
|
||||||
|
---@param task eta.clickup.Task
|
||||||
|
---@return eta.clickup.Task
|
||||||
|
M._update_task = function(task)
|
||||||
|
local update_table = {}
|
||||||
|
|
||||||
|
for _, k in ipairs({ "name", "markdown_content", "status" }) do
|
||||||
|
if task[k] then
|
||||||
|
update_table[k] = task[k]
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
local resp = curl.put(M.clickup_session.base_url .. "/task/" .. task.id, {
|
||||||
|
headers = {
|
||||||
|
['Authorization'] = M.clickup_session.auth,
|
||||||
|
["accept"] = "application/json",
|
||||||
|
["content-type"] = "application/json",
|
||||||
|
},
|
||||||
|
body = vim.json.encode(update_table)
|
||||||
|
})
|
||||||
|
|
||||||
|
if resp.status ~= 200 then
|
||||||
|
error("failed to update " .. task.id .. "\n(" .. resp.body .. ")")
|
||||||
|
end
|
||||||
|
|
||||||
|
return vim.json.decode(resp.body)
|
||||||
|
end
|
||||||
|
|
||||||
|
---@param args vim.api.keyset.create_autocmd.callback_args
|
||||||
|
M._on_tempbuf_write = function(args)
|
||||||
|
local lines = vim.api.nvim_buf_get_lines(args.buf, 0, -1, false)
|
||||||
|
---@type string[]
|
||||||
|
local parts = {}
|
||||||
|
---@type string[]
|
||||||
|
local current = {}
|
||||||
|
for _, line in ipairs(lines) do
|
||||||
|
if line == "---" then
|
||||||
|
if #current then
|
||||||
|
table.insert(parts, table.concat(current, "\n"))
|
||||||
|
current = {}
|
||||||
|
end
|
||||||
|
else
|
||||||
|
table.insert(current, line)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
table.insert(parts, table.concat(current, "\n"))
|
||||||
|
|
||||||
|
---@type eta.clickup.Task
|
||||||
|
local data = M.yaml_to_table(parts[2]) or {}
|
||||||
|
|
||||||
|
data['markdown_content'] = string.gsub(parts[3], "%-%-%-\n", "")
|
||||||
|
|
||||||
|
M._update_task(data)
|
||||||
|
|
||||||
|
vim.api.nvim_set_option_value("modified", false, { buf = args.buf })
|
||||||
|
local notif_id = notify("updated task #" .. data.id, "info", { title = "ETA", style = 'fancy' })
|
||||||
|
end
|
||||||
|
|
||||||
|
---@class SelectionItem
|
||||||
|
---@field status string
|
||||||
|
---@field parent string | nil
|
||||||
|
---@field list string
|
||||||
|
---@field name string
|
||||||
|
---@field description string
|
||||||
|
---@field tags string[]
|
||||||
|
---@field id string
|
||||||
|
|
||||||
|
---@param picker snacks.Picker | nil
|
||||||
|
---@param item SelectionItem
|
||||||
|
M._on_select_task = function(picker, item)
|
||||||
|
local notif_id = notify("opening task #" .. item.id, "info", { title = "ETA", style = 'fancy' })
|
||||||
|
local new_name = "[ClickUp] " .. item.name
|
||||||
|
local new_buf_no = vim.api.nvim_create_buf(true, false)
|
||||||
|
|
||||||
|
local status, _ = pcall(vim.api.nvim_buf_set_name, new_buf_no, new_name)
|
||||||
|
if status then
|
||||||
|
-- vim.api.nvim_set_option_value("buftype", "nofile", { buf = new_buf_no })
|
||||||
|
vim.api.nvim_set_option_value("filetype", "markdown", { buf = new_buf_no })
|
||||||
|
vim.api.nvim_create_autocmd({ "BufWriteCmd" }, { buffer = new_buf_no, callback = M._on_tempbuf_write })
|
||||||
|
local content = { "---"}
|
||||||
|
local to_dump = {}
|
||||||
|
for _, k in ipairs({ "id", "name", "status", "tags", "list", "parent", "dependencies" }) do
|
||||||
|
to_dump[k] = item[k]
|
||||||
|
end
|
||||||
|
local yaml_string = M.table_to_yaml(to_dump) or ""
|
||||||
|
for line in yaml_string:gmatch("([^\n]*)\n?") do
|
||||||
|
table.insert(content, line)
|
||||||
|
end
|
||||||
|
-- table.insert(content, "}")
|
||||||
|
table.insert(content, "---")
|
||||||
|
for line in string.gmatch(item.description, "[^\r\n]+") do
|
||||||
|
table.insert(content, line)
|
||||||
|
end
|
||||||
|
vim.api.nvim_buf_set_lines(new_buf_no, 0, 0, false, content)
|
||||||
|
vim.api.nvim_set_option_value("modified", false, { buf = new_buf_no })
|
||||||
|
vim.api.nvim_win_set_buf(0, new_buf_no)
|
||||||
|
else
|
||||||
|
vim.api.nvim_buf_delete(new_buf_no, {})
|
||||||
|
if picker then
|
||||||
|
picker:close()
|
||||||
|
end
|
||||||
|
local to_open = vim.fn.bufnr(new_name)
|
||||||
|
vim.api.nvim_win_set_buf(0, to_open)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
---@param item SelectionItem
|
||||||
|
M._item_format = function(item, _)
|
||||||
|
local ret = {}
|
||||||
|
|
||||||
|
local hl = "SnacksPickerComment"
|
||||||
|
if item.status == "in progress" then
|
||||||
|
hl = "@method"
|
||||||
|
elseif item.status == "selected for development" then
|
||||||
|
hl = "@constant.builtin"
|
||||||
|
elseif item.status == "in review" then
|
||||||
|
hl = "@keyword"
|
||||||
|
elseif item.status == "done" then
|
||||||
|
hl = "@variable.builtin"
|
||||||
|
end
|
||||||
|
|
||||||
|
if item.parent ~= vim.NIL then
|
||||||
|
ret[#ret + 1] = { " ", "SnacksPickerComment" }
|
||||||
|
end
|
||||||
|
|
||||||
|
ret[#ret + 1] = { item.name, hl }
|
||||||
|
|
||||||
|
for _, v in ipairs(item.tags) do
|
||||||
|
ret[#ret + 1] = { " #" .. v, "SnacksPickerComment" }
|
||||||
|
end
|
||||||
|
|
||||||
|
ret[#ret + 1] = {
|
||||||
|
" " .. item.id,
|
||||||
|
"SnacksPickerComment"
|
||||||
|
}
|
||||||
|
return ret
|
||||||
|
end
|
||||||
|
|
||||||
|
---@param list table[]
|
||||||
|
---@param keys string[]
|
||||||
|
M.retrieve_subkeys = function(list, keys)
|
||||||
|
local ret = {}
|
||||||
|
local interm = list
|
||||||
|
|
||||||
|
for _, key in ipairs(keys) do
|
||||||
|
ret = {}
|
||||||
|
for _, item in ipairs(interm) do
|
||||||
|
table.insert(ret, item[key])
|
||||||
|
end
|
||||||
|
interm = ret
|
||||||
|
end
|
||||||
|
|
||||||
|
return ret
|
||||||
|
end
|
||||||
|
|
||||||
|
---@param id string
|
||||||
|
M.open_task = function(id)
|
||||||
|
if id:match("[a-z0-9]+") then
|
||||||
|
local notif_id = notify("opening #".. id, "info", { timeout = 1000, title = "ETA", style = 'fancy' })
|
||||||
|
local t = clickup.task(M.clickup_session, id)
|
||||||
|
---@type SelectionItem
|
||||||
|
local item = {
|
||||||
|
idx = nil,
|
||||||
|
id = t.id,
|
||||||
|
text = t.name .. t.id .. t.markdown_description,
|
||||||
|
name = t.name,
|
||||||
|
tags = M.retrieve_subkeys(t.tags, { "name" }),
|
||||||
|
status = t.status.status,
|
||||||
|
parent = t.parent,
|
||||||
|
list = t.list.name,
|
||||||
|
description = t.markdown_description,
|
||||||
|
preview = {
|
||||||
|
ft = "markdown",
|
||||||
|
},
|
||||||
|
action = nil
|
||||||
|
}
|
||||||
|
pcall(M._on_select_task,nil, item)
|
||||||
|
else
|
||||||
|
error("invalid id format")
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
---@param data vim.api.keyset.create_user_command.command_args
|
||||||
|
M.select_task = function(data)
|
||||||
|
local notif_id = notify("fetching tasks", "warn", { timeout = 10000, title = "ETA", style = 'fancy' })
|
||||||
|
if not pcall(M.open_task,vim.fn.expand("<cword>")) then
|
||||||
|
local tasks = clickup.latest_tasks(M.clickup_session)
|
||||||
|
|
||||||
|
---@type SelectionItem[]
|
||||||
|
local items = {}
|
||||||
|
|
||||||
|
for tix, t in ipairs(tasks) do
|
||||||
|
if string.sub(t.name, -7, -1) ~= "Absence" then
|
||||||
|
local preview_frontmatter = ""
|
||||||
|
|
||||||
|
local deps = {}
|
||||||
|
for _, dependency in ipairs(t.dependencies) do
|
||||||
|
table.insert(deps, dependency.depends_on)
|
||||||
|
end
|
||||||
|
|
||||||
|
---@type SelectionItem
|
||||||
|
local prepared = {
|
||||||
|
idx = tix,
|
||||||
|
id = t.id,
|
||||||
|
text = t.name .. t.id .. t.markdown_description,
|
||||||
|
name = t.name,
|
||||||
|
tags = M.retrieve_subkeys(t.tags, { "name" }),
|
||||||
|
status = t.status.status,
|
||||||
|
parent = t.parent,
|
||||||
|
list = t.list.name,
|
||||||
|
description = t.markdown_description,
|
||||||
|
dependencies = deps,
|
||||||
|
preview = {
|
||||||
|
ft = "markdown",
|
||||||
|
},
|
||||||
|
action = M._on_select_task
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, k in ipairs({ "id", "name", "status", "tags", "parent"}) do
|
||||||
|
preview_frontmatter = preview_frontmatter .. "\n" .. k .. ": " .. vim.json.encode(prepared[k])
|
||||||
|
end
|
||||||
|
|
||||||
|
prepared.preview.text = "---" .. preview_frontmatter .. "\n---\n" .. t.markdown_description
|
||||||
|
table.insert(items, #items + 1, prepared)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
require("snacks.notifier").hide(notif_id)
|
||||||
|
require("snacks.picker").pick({
|
||||||
|
title = "Select Task",
|
||||||
|
format = M._item_format,
|
||||||
|
preview = "preview",
|
||||||
|
confirm = M._on_select_task,
|
||||||
|
items = items
|
||||||
|
})
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
---@param data vim.api.keyset.create_user_command.command_args
|
||||||
|
M.mr_from_task = function(data)
|
||||||
|
if data.args then
|
||||||
|
notify(data.args, "info", { title = "ETA" })
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
|
||||||
|
return M
|
||||||
@@ -0,0 +1,18 @@
|
|||||||
|
local eta = require("eta")
|
||||||
|
local eta_clickup = require("eta.clickup")
|
||||||
|
|
||||||
|
vim.api.nvim_create_user_command("ETA", eta.select_task, {})
|
||||||
|
|
||||||
|
---@type eta.clickup.Session
|
||||||
|
eta.clickup_session = {
|
||||||
|
auth = os.getenv("CLICKUP_AUTH") or "",
|
||||||
|
user = os.getenv("CLICKUP_USER_ID") or "",
|
||||||
|
workspace = os.getenv("CLICKUP_WORKSPACE_ID") or "",
|
||||||
|
base_url = "https://api.clickup.com/api/v2",
|
||||||
|
}
|
||||||
|
|
||||||
|
---@type eta.gitlab.Session
|
||||||
|
eta.gitlab_session = {
|
||||||
|
auth = os.getenv("GITLAB_AUTH") or "",
|
||||||
|
base_url = "https://git.extoll.de/api/v4",
|
||||||
|
}
|
||||||
+20
-6
@@ -1,11 +1,25 @@
|
|||||||
|
[build-system]
|
||||||
|
requires = ['setuptools>=57', "wheel"]
|
||||||
|
build-backend = "setuptools.build_meta"
|
||||||
|
|
||||||
[project]
|
[project]
|
||||||
name = "mrpy-nvim"
|
name = "eta-lsp"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
description = "Add your description here"
|
description = "EXTOLL Task Access"
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
requires-python = ">=3.12"
|
requires-python = ">=3.11"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"pydantic-settings~=2.12",
|
"pygls~=2.0",
|
||||||
"pynvim~=0.6.0",
|
"pydantic~=2.12",
|
||||||
"requests>=2.32.5",
|
"requests~=2.32",
|
||||||
]
|
]
|
||||||
|
authors = [
|
||||||
|
{ name = "Patrick Nisble", email = "acereca@outlook.de"},
|
||||||
|
]
|
||||||
|
|
||||||
|
[project.scripts]
|
||||||
|
eta-lsp = "eta.main:main"
|
||||||
|
|
||||||
|
[tool.setuptools.packages.find]
|
||||||
|
include = ['mrpy']
|
||||||
|
namespaces = false
|
||||||
|
|||||||
+160
-85
@@ -1,97 +1,105 @@
|
|||||||
from dataclasses import dataclass, field
|
from collections.abc import Sequence
|
||||||
|
from dataclasses import dataclass, field, fields
|
||||||
from json import loads
|
from json import loads
|
||||||
from os import environ
|
from os import environ
|
||||||
from typing import Any, Callable
|
from typing import Any, Callable, Self, TypedDict
|
||||||
|
from urllib.error import HTTPError
|
||||||
|
|
||||||
from pydantic import AliasPath, BaseModel, Field, field_validator
|
from .hints import JSONDataMap
|
||||||
|
from .requests import delete, get, put, post
|
||||||
from urllib.request import Request, urlopen
|
from .env import EnvVar
|
||||||
|
|
||||||
|
|
||||||
def get(
|
class TaskRespCheckItemDict(TypedDict):
|
||||||
url: str,
|
name: str
|
||||||
query_params: dict[str, str] | None = None,
|
resolved: int
|
||||||
headers: dict[str, str] | None = None,
|
|
||||||
) -> str:
|
|
||||||
with urlopen(
|
|
||||||
Request(
|
|
||||||
url + ("?" + "&".join(f"{k}={v}" for k, v in query_params.items()))
|
|
||||||
if query_params
|
|
||||||
else "",
|
|
||||||
headers=headers or {},
|
|
||||||
method="GET",
|
|
||||||
),
|
|
||||||
) as resp:
|
|
||||||
return resp.read().decode("utf-8")
|
|
||||||
|
|
||||||
|
|
||||||
def put(
|
class TaskRespCheckDict(TypedDict):
|
||||||
url: str,
|
name: str
|
||||||
data: dict[str, str] | None = None,
|
items: list[TaskRespCheckItemDict]
|
||||||
headers: dict[str, str] | None = None,
|
|
||||||
) -> str:
|
|
||||||
with urlopen(
|
|
||||||
Request(
|
|
||||||
url,
|
|
||||||
str(data).encode(),
|
|
||||||
headers=headers or {},
|
|
||||||
method="PUT",
|
|
||||||
),
|
|
||||||
) as resp:
|
|
||||||
return resp.read().decode("utf-8")
|
|
||||||
|
|
||||||
|
|
||||||
class ClickupTask(BaseModel):
|
class TaskRespDict(TypedDict):
|
||||||
"""fields marked with `exclude=True` will not be pushed for updates"""
|
|
||||||
|
|
||||||
name: str
|
name: str
|
||||||
markdown_description: str
|
markdown_description: str
|
||||||
status: str = Field(validation_alias=AliasPath("status", "status"))
|
status: dict[str, str]
|
||||||
|
|
||||||
id: str = Field(exclude=True)
|
id: str
|
||||||
assignees: list[str] = Field(exclude=True)
|
assignees: list[dict[str, str]]
|
||||||
tags: list[str] = Field(exclude=True)
|
tags: list[dict[str, str]]
|
||||||
parent: str | None = Field(None, exclude=True)
|
parent: str | None
|
||||||
parent_list: str = Field(validation_alias=AliasPath("list", "id"), exclude=True)
|
locations: list[dict[str, str]]
|
||||||
locations: list[str] = Field(exclude=True)
|
checklists: list[TaskRespCheckDict]
|
||||||
checklists: dict[str, bool] = Field(exclude=True)
|
list: dict[str, str]
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(kw_only=True)
|
||||||
|
class ClickupTask:
|
||||||
|
"""fields marked with ``repr=False`` will not be pushed for updates"""
|
||||||
|
|
||||||
|
name: str
|
||||||
|
status: str
|
||||||
|
|
||||||
|
id: str | None = field(default=None, repr=False)
|
||||||
|
assignees: list[str] | None = field(default=None, repr=False)
|
||||||
|
tags: list[str] | None = field(default=None, repr=False)
|
||||||
|
parent_list: str = field(repr=False)
|
||||||
|
locations: list[str] | None = field(default=None, repr=False)
|
||||||
|
checklists: dict[str, dict[str, bool]] | None = field(default=None, repr=False)
|
||||||
|
parent: str | None = field(default=None, repr=False)
|
||||||
|
|
||||||
|
markdown_content: str
|
||||||
|
|
||||||
@field_validator("checklists", mode="before")
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def convert_checklists(cls, content: list[Any]) -> dict[str, bool]:
|
def from_resp_data(cls, resp_data_raw: dict[str, Any]) -> Self:
|
||||||
|
resp_data = TaskRespDict(**resp_data_raw)
|
||||||
|
|
||||||
|
return cls(
|
||||||
|
name=resp_data["name"],
|
||||||
|
markdown_content=resp_data["markdown_description"],
|
||||||
|
status=resp_data["status"]["status"],
|
||||||
|
id=resp_data["id"],
|
||||||
|
assignees=cls.convert_assignees(resp_data["assignees"]),
|
||||||
|
tags=cls.convert_simple_list_with_names(resp_data["tags"]),
|
||||||
|
parent=resp_data.get("parent"),
|
||||||
|
parent_list=resp_data["list"]["id"],
|
||||||
|
locations=cls.convert_simple_list_with_names(resp_data["locations"]),
|
||||||
|
checklists={
|
||||||
|
tlist["name"]: {t["name"]: bool(t["resolved"]) for t in tlist["items"]}
|
||||||
|
for tlist in resp_data["checklists"]
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def convert_checklists(cls, content: Sequence[Any]) -> dict[str, bool]:
|
||||||
return {
|
return {
|
||||||
entry["name"]: entry["resolved"]
|
entry["name"]: entry["resolved"]
|
||||||
for checklist in content
|
for checklist in content
|
||||||
for entry in checklist["items"]
|
for entry in checklist["items"]
|
||||||
}
|
}
|
||||||
|
|
||||||
@field_validator("assignees", mode="before")
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def convert_assignees(cls, content: list[str | dict[str, Any]]) -> list[str]:
|
def convert_assignees(cls, content: Sequence[str | dict[str, str]]) -> list[str]:
|
||||||
return [(e if isinstance(e, str) else e["username"]) for e in content]
|
return [(e if isinstance(e, str) else e["username"]) for e in content]
|
||||||
|
|
||||||
@field_validator("tags", "locations", mode="before")
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def convert_simple_list_with_names(
|
def convert_simple_list_with_names(
|
||||||
cls,
|
cls,
|
||||||
content: list[str | dict[str, Any]],
|
content: Sequence[str | dict[str, str]],
|
||||||
) -> list[str]:
|
) -> list[str]:
|
||||||
return [(e if isinstance(e, str) else e["name"]) for e in content]
|
return [(e if isinstance(e, str) else e["name"]) for e in content]
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def updateables(self) -> dict[str, Any]:
|
def updateables(self) -> dict[str, Any]:
|
||||||
return {
|
return {f.name: getattr(self, f.name, None) for f in fields(type(self)) if f.repr}
|
||||||
k: getattr(self, k, None)
|
|
||||||
for k, v in type(self).model_fields.items()
|
|
||||||
if not v.exclude
|
|
||||||
}
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def showables(self) -> dict[str, Any]:
|
def showables(self) -> dict[str, Any]:
|
||||||
return {
|
return {
|
||||||
k: getattr(self, k, None)
|
dcf.name: getattr(self, dcf.name, ...)
|
||||||
for k in type(self).model_fields
|
for dcf in fields(self)
|
||||||
if k != "markdown_description"
|
if getattr(self, dcf.name, ...) is not None
|
||||||
}
|
}
|
||||||
|
|
||||||
@property
|
@property
|
||||||
@@ -101,34 +109,54 @@ class ClickupTask(BaseModel):
|
|||||||
if self.parent:
|
if self.parent:
|
||||||
ret += " \033[32m "
|
ret += " \033[32m "
|
||||||
|
|
||||||
ret += f"{self.name} (#{self.id})"
|
ret += self.name
|
||||||
|
|
||||||
|
if self.tags:
|
||||||
|
ret += "".join(f" #{t}" for t in self.tags)
|
||||||
|
|
||||||
|
ret += f" (#{self.id})"
|
||||||
|
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
|
|
||||||
def get_env_var(var_name: str) -> Callable[[], str]:
|
|
||||||
return lambda var=var_name: environ[var]
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class ClickupSession:
|
class ClickupSession:
|
||||||
auth_key: str = field(default_factory=get_env_var("CLICKUP_AUTH"))
|
auth_key: str = field(
|
||||||
workspace_id: str = field(default_factory=get_env_var("CLICKUP_WORKSPACE_ID"))
|
default_factory=EnvVar(
|
||||||
user_id: str = field(default_factory=get_env_var("CLICKUP_USER_ID"))
|
"CLICKUP_AUTH",
|
||||||
|
"clickup auth token is required to be set",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
workspace_id: str = field(
|
||||||
|
default_factory=EnvVar(
|
||||||
|
"CLICKUP_WORKSPACE_ID",
|
||||||
|
"clickup workspace id is required to be set",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
user_id: str = field(
|
||||||
|
default_factory=EnvVar(
|
||||||
|
"CLICKUP_USER_ID",
|
||||||
|
"clickup user id is required to be set",
|
||||||
|
)
|
||||||
|
)
|
||||||
base_url: str = "https://api.clickup.com/api/v2"
|
base_url: str = "https://api.clickup.com/api/v2"
|
||||||
|
|
||||||
def _get(self, endpoint: str, **query_params: Any) -> dict[str, Any]:
|
def _get(self, endpoint: str, **query_params: str) -> JSONDataMap:
|
||||||
raw_data = get(
|
try:
|
||||||
self.base_url + endpoint,
|
raw_data = get(
|
||||||
query_params,
|
self.base_url + endpoint,
|
||||||
headers={
|
query_params,
|
||||||
"accept": "application/json",
|
headers={
|
||||||
"Authorization": self.auth_key,
|
"accept": "application/json",
|
||||||
},
|
"Authorization": self.auth_key,
|
||||||
)
|
},
|
||||||
return loads(raw_data)
|
)
|
||||||
|
return loads(raw_data)
|
||||||
|
except HTTPError as e:
|
||||||
|
e.add_note(e.url)
|
||||||
|
raise
|
||||||
|
|
||||||
def _put(self, endpoint: str, **body_params: Any) -> dict[str, Any]:
|
def _put(self, endpoint: str, **body_params: str) -> JSONDataMap:
|
||||||
raw_data = put(
|
raw_data = put(
|
||||||
self.base_url + endpoint,
|
self.base_url + endpoint,
|
||||||
body_params,
|
body_params,
|
||||||
@@ -140,16 +168,63 @@ class ClickupSession:
|
|||||||
)
|
)
|
||||||
return loads(raw_data)
|
return loads(raw_data)
|
||||||
|
|
||||||
def get_tasks(self) -> list[ClickupTask]:
|
def _post(self, endpoint: str, **body_params: str) -> JSONDataMap:
|
||||||
data = self._get(
|
raw_data = post(
|
||||||
f"/team/{self.workspace_id}/task?subtasks=true&include_markdown_description=true&assignees[]={self.user_id}"
|
self.base_url + endpoint,
|
||||||
|
body_params,
|
||||||
|
headers={
|
||||||
|
"accept": "application/json",
|
||||||
|
"content-type": "application/json",
|
||||||
|
"Authorization": self.auth_key,
|
||||||
|
},
|
||||||
)
|
)
|
||||||
return [ClickupTask.model_validate(t) for t in data["tasks"]]
|
return loads(raw_data)
|
||||||
|
|
||||||
|
def _delete(self, endpoint: str) -> JSONDataMap:
|
||||||
|
raw_data = delete(
|
||||||
|
self.base_url + endpoint,
|
||||||
|
headers={
|
||||||
|
"accept": "application/json",
|
||||||
|
"content-type": "application/json",
|
||||||
|
"Authorization": self.auth_key,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
return loads(raw_data)
|
||||||
|
|
||||||
|
def get_tasks(self, **filters: str) -> list[ClickupTask]:
|
||||||
|
data = self._get(
|
||||||
|
f"/team/{self.workspace_id}/task",
|
||||||
|
**{
|
||||||
|
"subtask": "true",
|
||||||
|
"include_markdown_description": "true",
|
||||||
|
"assignees[]": self.user_id,
|
||||||
|
}
|
||||||
|
| filters,
|
||||||
|
).get("tasks", [])
|
||||||
|
if isinstance(data, list):
|
||||||
|
return [ClickupTask.from_resp_data(t) for t in data if isinstance(t, dict)]
|
||||||
|
|
||||||
|
return []
|
||||||
|
|
||||||
def get_task(self, task_id: str) -> ClickupTask:
|
def get_task(self, task_id: str) -> ClickupTask:
|
||||||
return ClickupTask.model_validate(
|
return ClickupTask.from_resp_data(
|
||||||
self._get(f"/task/{task_id}?include_markdown_description=true")
|
self._get(
|
||||||
|
f"/task/{task_id}",
|
||||||
|
include_markdown_description="true",
|
||||||
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
def update(self, data: ClickupTask) -> None:
|
def update(self, data: ClickupTask) -> None:
|
||||||
_ = self._put(f"/task/{data.id}", **data.updateables)
|
ret_task = self._put(f"/task/{data.id}", **data.updateables)
|
||||||
|
current_tags: set[str] = set(ret_task["tags"])
|
||||||
|
new_tags = set(data.tags)
|
||||||
|
|
||||||
|
for del_tag in current_tags - new_tags:
|
||||||
|
self._delete(f"/task/{data.id}/tag/{del_tag}")
|
||||||
|
|
||||||
|
for add_tag in new_tags - current_tags:
|
||||||
|
self._post(f"/task/{data.id}/tag/{add_tag}")
|
||||||
|
|
||||||
|
def create(self, data: ClickupTask) -> str:
|
||||||
|
ret_task = self._post(f"/list/{data.parent_list}/task", **data.updateables)
|
||||||
|
return str(ret_task["id"])
|
||||||
|
|||||||
@@ -0,0 +1,24 @@
|
|||||||
|
from dataclasses import dataclass
|
||||||
|
from os import environ
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class EnvVar:
|
||||||
|
"""
|
||||||
|
Environment Variable fetcher for use in dataclass ``field(default_factory=...)``
|
||||||
|
|
||||||
|
>>> @dataclass
|
||||||
|
>>> class SomeDataclass:
|
||||||
|
... field_name: str = field(default_factory=EnvVar("SOME_VAR_NAME", "err msg"))
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
var_name: str
|
||||||
|
err_msg: str = ""
|
||||||
|
|
||||||
|
def __call__(self) -> str:
|
||||||
|
try:
|
||||||
|
return environ[self.var_name]
|
||||||
|
except KeyError as e:
|
||||||
|
e.add_note(self.err_msg)
|
||||||
|
raise
|
||||||
@@ -1,4 +1,8 @@
|
|||||||
type JSONDataScalar = str | None | float | bool
|
from __future__ import annotations
|
||||||
type JSONDataList = list[JSONDataScalar | "JSONDataMap" | "JSONDataList"]
|
from typing import TypeAlias
|
||||||
type JSONDataMap = dict[str, JSONDataScalar | JSONDataList | "JSONDataMap"]
|
|
||||||
type JSONData = JSONDataMap | JSONDataList
|
|
||||||
|
JSONDataScalar: TypeAlias = str | None | float | bool
|
||||||
|
JSONDataList: TypeAlias = list["JSONDataScalar | JSONDataMap | JSONDataList"]
|
||||||
|
JSONDataMap: TypeAlias = dict[str, "JSONDataScalar | JSONDataList | JSONDataMap"]
|
||||||
|
JSONData: TypeAlias = "JSONDataMap | JSONDataList"
|
||||||
|
|||||||
@@ -1,9 +1,11 @@
|
|||||||
from collections.abc import Callable, Sequence
|
from collections.abc import Callable, Sequence
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
from pynvim import Nvim, command, plugin
|
from pynvim import Nvim, command, plugin
|
||||||
from pynvim.api import Buffer
|
from pynvim.api import Buffer
|
||||||
|
|
||||||
from .clickup import ClickupSession
|
from .clickup import ClickupSession, ClickupTask
|
||||||
from .hints import JSONData
|
from .hints import JSONData
|
||||||
from .yaml import load, dump
|
from .yaml import load, dump
|
||||||
|
|
||||||
@@ -28,18 +30,20 @@ class MrPyPlugin:
|
|||||||
"assignees": usernames_from_objs,
|
"assignees": usernames_from_objs,
|
||||||
}
|
}
|
||||||
|
|
||||||
def select_task_id(self) -> None:
|
def select_task_id(self, tags: set[str] | None = None) -> None:
|
||||||
tasks = self.clickup.get_tasks()
|
tasks = self.clickup.get_tasks(**({"tags[]": "&tags[]=".join(tags)} if tags else {}))
|
||||||
task_names_by_id = [
|
task_names_by_id = [
|
||||||
{
|
{
|
||||||
"idx": tix + 1,
|
"idx": tix + 1,
|
||||||
"id": t.id,
|
"id": t.id,
|
||||||
|
"text": t.short,
|
||||||
"name": t.name,
|
"name": t.name,
|
||||||
|
"tags": t.tags,
|
||||||
"status": t.status,
|
"status": t.status,
|
||||||
"is_child": bool(t.parent),
|
"is_child": bool(t.parent),
|
||||||
"preview": {
|
"preview": {
|
||||||
"text": f"---\n{dump(t.showables)}---\n{t.markdown_description}",
|
"text": dump(t.showables),
|
||||||
"ft": "markdown",
|
"ft": "yaml",
|
||||||
},
|
},
|
||||||
"action": f":Mrpy {t.id}",
|
"action": f":Mrpy {t.id}",
|
||||||
}
|
}
|
||||||
@@ -70,8 +74,12 @@ class MrPyPlugin:
|
|||||||
|
|
||||||
ret[#ret + 1] = { item.name, hl }
|
ret[#ret + 1] = { item.name, hl }
|
||||||
|
|
||||||
|
for _, v in ipairs(item.tags) do
|
||||||
|
ret[#ret + 1] = { " #" .. v, "SnacksPickerComment" }
|
||||||
|
end
|
||||||
|
|
||||||
ret[#ret + 1] = {
|
ret[#ret + 1] = {
|
||||||
" (#" .. item.id .. ")",
|
" " .. item.id,
|
||||||
"SnacksPickerComment"
|
"SnacksPickerComment"
|
||||||
}
|
}
|
||||||
return ret
|
return ret
|
||||||
@@ -91,34 +99,36 @@ class MrPyPlugin:
|
|||||||
task = self.clickup.get_task(task_id)
|
task = self.clickup.get_task(task_id)
|
||||||
temp_buf: Buffer = self.nvim.api.create_buf(True, False)
|
temp_buf: Buffer = self.nvim.api.create_buf(True, False)
|
||||||
self.nvim.api.buf_set_name(temp_buf, f"[ClickUp] {task.name}")
|
self.nvim.api.buf_set_name(temp_buf, f"[ClickUp] {task.name}")
|
||||||
self.nvim.buffers[temp_buf.number].options["filetype"] = "markdown"
|
self.nvim.buffers[temp_buf.number].options["filetype"] = "yaml"
|
||||||
self.nvim.api.create_autocmd(
|
self.nvim.api.create_autocmd(
|
||||||
["BufWriteCmd"],
|
["BufWriteCmd"],
|
||||||
{"buffer": temp_buf.number, "command": "MrpyPush " + str(temp_buf.number)},
|
{"buffer": temp_buf.number, "command": "MrpyPush " + str(temp_buf.number)},
|
||||||
)
|
)
|
||||||
|
|
||||||
content = ["---"]
|
content = [
|
||||||
|
f"# yaml-language-server: $schema={Path(__file__).parent.parent.parent.parent}/schema.json ",
|
||||||
|
"---",
|
||||||
|
]
|
||||||
content.extend(
|
content.extend(
|
||||||
dump(
|
dump(
|
||||||
task.showables,
|
task.showables,
|
||||||
).splitlines()
|
).splitlines()
|
||||||
)
|
)
|
||||||
content.append("---")
|
|
||||||
content.extend(task.markdown_description.splitlines())
|
|
||||||
|
|
||||||
self.nvim.api.buf_set_lines(temp_buf, 0, 0, False, content)
|
self.nvim.api.buf_set_lines(temp_buf, 0, 0, False, content)
|
||||||
self.nvim.buffers[temp_buf.number].options["modified"] = False
|
self.nvim.buffers[temp_buf.number].options["modified"] = False
|
||||||
self.nvim.api.win_set_buf(0, temp_buf)
|
self.nvim.api.win_set_buf(0, temp_buf)
|
||||||
|
|
||||||
@command("Mrpy", nargs="?")
|
@command("Mrpy", nargs="*")
|
||||||
def entry(self, args: Sequence[str] = ()) -> None:
|
def entry(self, args: Sequence[str] = ()) -> None:
|
||||||
match args:
|
match args:
|
||||||
case (str() as task_id,):
|
case (str() as task_id,):
|
||||||
self.open_task_buffer(task_id)
|
try:
|
||||||
case ():
|
self.open_task_buffer(task_id)
|
||||||
self.select_task_id()
|
except:
|
||||||
case _:
|
self.select_task_id({task_id})
|
||||||
pass
|
case tags:
|
||||||
|
self.select_task_id(set(tags))
|
||||||
|
|
||||||
@command("MrpyPush", nargs="?")
|
@command("MrpyPush", nargs="?")
|
||||||
def on_vbuf_write(self, args: Sequence[str] = ()) -> None:
|
def on_vbuf_write(self, args: Sequence[str] = ()) -> None:
|
||||||
@@ -132,14 +142,60 @@ class MrPyPlugin:
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
a = self.nvim.buffers[buf_no][0:-1]
|
a = self.nvim.buffers[buf_no][0:-1]
|
||||||
_, fm, *text = "\n".join(a).split("---\n")
|
fm = "\n".join(a)
|
||||||
data = load(fm)
|
data = load(fm)
|
||||||
assert isinstance(data, dict)
|
assert isinstance(data, dict)
|
||||||
|
|
||||||
data["markdown_description"] = "\n\n".join(text)
|
data["markdown_content"] = data["markdown_content"].strip() + "\n"
|
||||||
|
|
||||||
self.nvim.err_write(str(data) + "\n")
|
self.clickup.update(ClickupTask(**data))
|
||||||
# self.clickup.update(data)
|
|
||||||
self.nvim.buffers[buf_no].options["modified"] = False
|
self.nvim.buffers[buf_no].options["modified"] = False
|
||||||
except ValueError:
|
except ValueError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@command("MrpyNew", nargs=0)
|
||||||
|
def on_new_task(self) -> None:
|
||||||
|
task = ClickupTask(name="", status="backlog", markdown_content="", parent_list="")
|
||||||
|
temp_buf: Buffer = self.nvim.api.create_buf(True, False)
|
||||||
|
self.nvim.api.buf_set_name(temp_buf, f"[ClickUp] New Task")
|
||||||
|
self.nvim.buffers[temp_buf.number].options["filetype"] = "yaml"
|
||||||
|
self.nvim.api.create_autocmd(
|
||||||
|
["BufWriteCmd"],
|
||||||
|
{"buffer": temp_buf.number, "command": "MrpyPushNew " + str(temp_buf.number)},
|
||||||
|
)
|
||||||
|
|
||||||
|
content = [
|
||||||
|
f"# yaml-language-server: $schema={Path(__file__).parent.parent.parent.parent}/schema.json ",
|
||||||
|
"---",
|
||||||
|
]
|
||||||
|
content.extend(
|
||||||
|
dump(
|
||||||
|
task.showables,
|
||||||
|
).splitlines()
|
||||||
|
)
|
||||||
|
|
||||||
|
self.nvim.api.buf_set_lines(temp_buf, 0, 0, False, content)
|
||||||
|
self.nvim.buffers[temp_buf.number].options["modified"] = False
|
||||||
|
self.nvim.api.win_set_buf(0, temp_buf)
|
||||||
|
|
||||||
|
@command("MrpyPushNew", nargs=1)
|
||||||
|
def on_new_vbuf_write(self, buf_no: Sequence[Any] = ()) -> None:
|
||||||
|
buf_no = int(buf_no[0])
|
||||||
|
|
||||||
|
try:
|
||||||
|
a = self.nvim.buffers[buf_no][0:-1]
|
||||||
|
fm = "\n".join(a)
|
||||||
|
data = load(fm)
|
||||||
|
assert isinstance(data, dict)
|
||||||
|
|
||||||
|
data["markdown_content"] = data["markdown_content"].strip() + "\n"
|
||||||
|
|
||||||
|
new_task = ClickupTask(**data)
|
||||||
|
new_task_id = self.clickup.create(new_task)
|
||||||
|
|
||||||
|
self.nvim.buffers[buf_no].options["modified"] = False
|
||||||
|
self.nvim.command("bdelete " + str(buf_no))
|
||||||
|
|
||||||
|
self.entry([new_task_id])
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
|||||||
@@ -0,0 +1,64 @@
|
|||||||
|
from json import dumps
|
||||||
|
from ssl import SSLContext, create_default_context
|
||||||
|
from urllib.error import HTTPError
|
||||||
|
from urllib.request import Request, urlopen
|
||||||
|
from urllib.parse import urlencode
|
||||||
|
|
||||||
|
|
||||||
|
def request(
|
||||||
|
url: str,
|
||||||
|
method: str,
|
||||||
|
query_params: dict[str, str] | None = None,
|
||||||
|
data: dict[str, str] | None = None,
|
||||||
|
headers: dict[str, str] | None = None,
|
||||||
|
) -> str:
|
||||||
|
try:
|
||||||
|
with urlopen(
|
||||||
|
Request(
|
||||||
|
url
|
||||||
|
+ (
|
||||||
|
("?" + "&".join(f"{k}={v}" for k, v in query_params.items()))
|
||||||
|
if query_params
|
||||||
|
else ""
|
||||||
|
),
|
||||||
|
dumps(data).encode() if data else None,
|
||||||
|
headers=headers or {},
|
||||||
|
method=method,
|
||||||
|
),
|
||||||
|
context=create_default_context(),
|
||||||
|
) as resp:
|
||||||
|
return resp.read().decode("utf-8")
|
||||||
|
except HTTPError as e:
|
||||||
|
e.add_note(e.url)
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
def get(
|
||||||
|
url: str,
|
||||||
|
query_params: dict[str, str] | None = None,
|
||||||
|
headers: dict[str, str] | None = None,
|
||||||
|
) -> str:
|
||||||
|
return request(url, "GET", query_params=query_params, headers=headers)
|
||||||
|
|
||||||
|
|
||||||
|
def post(
|
||||||
|
url: str,
|
||||||
|
data: dict[str, str] | None = None,
|
||||||
|
headers: dict[str, str] | None = None,
|
||||||
|
) -> str:
|
||||||
|
return request(url, "POST", data=data, headers=headers)
|
||||||
|
|
||||||
|
|
||||||
|
def put(
|
||||||
|
url: str,
|
||||||
|
data: dict[str, str] | None = None,
|
||||||
|
headers: dict[str, str] | None = None,
|
||||||
|
) -> str:
|
||||||
|
return request(url, "PUT", data=data, headers=headers)
|
||||||
|
|
||||||
|
|
||||||
|
def delete(
|
||||||
|
url: str,
|
||||||
|
headers: dict[str, str] | None = None,
|
||||||
|
) -> str:
|
||||||
|
return request(url, "DELETE", headers=headers)
|
||||||
@@ -14,20 +14,44 @@ def dump_scalar(entry: hints.JSONDataScalar) -> str:
|
|||||||
case False:
|
case False:
|
||||||
return "false\n"
|
return "false\n"
|
||||||
case _:
|
case _:
|
||||||
return f"{entry:s}\n"
|
return f"{entry}\n"
|
||||||
|
|
||||||
|
|
||||||
def dump(obj: dict[str, hints.JSONDataScalar | list[hints.JSONDataScalar]]) -> str:
|
def dump(obj: hints.JSONDataMap) -> str:
|
||||||
ret = ""
|
ret = ""
|
||||||
for key, value in obj.items():
|
for key, value in obj.items():
|
||||||
ret += key
|
ret += key
|
||||||
ret += ":"
|
ret += ":"
|
||||||
|
|
||||||
match value:
|
match value:
|
||||||
|
case []:
|
||||||
|
ret += " []\n"
|
||||||
|
case {}:
|
||||||
|
ret += " {}\n"
|
||||||
|
case "":
|
||||||
|
ret += ' ""\n'
|
||||||
case list() as entries:
|
case list() as entries:
|
||||||
ret += "\n"
|
ret += "\n"
|
||||||
for entry in entries:
|
for entry in entries:
|
||||||
ret += f" - {dump_scalar(entry)}"
|
match entry:
|
||||||
|
case dict():
|
||||||
|
subdump = dump(entry)
|
||||||
|
ret += "\n".join(" " + l for l in subdump.splitlines())
|
||||||
|
case list():
|
||||||
|
pass
|
||||||
|
case _:
|
||||||
|
ret += f" - {dump_scalar(entry)}"
|
||||||
|
case dict() as substruct:
|
||||||
|
ret += "\n"
|
||||||
|
subdump = dump(substruct)
|
||||||
|
ret += "\n".join(" " + l for l in subdump.splitlines()) + "\n"
|
||||||
|
|
||||||
|
case str() as mlstring if key == "markdown_content":
|
||||||
|
if mlstring.strip() == "" or dump_scalar(mlstring).strip() == "null":
|
||||||
|
ret += ' ""\n'
|
||||||
|
else:
|
||||||
|
indented = "\n".join((" " + l) for l in dump_scalar(mlstring).splitlines())
|
||||||
|
ret += f" >\n{indented}\n"
|
||||||
case entry:
|
case entry:
|
||||||
ret += f" {dump_scalar(entry)}"
|
ret += f" {dump_scalar(entry)}"
|
||||||
|
|
||||||
@@ -53,6 +77,10 @@ def load(content: str) -> hints.JSONData:
|
|||||||
... key10:
|
... key10:
|
||||||
... - str
|
... - str
|
||||||
... - "str"
|
... - "str"
|
||||||
|
... key11: >
|
||||||
|
... * a
|
||||||
|
... * b:
|
||||||
|
... * c
|
||||||
... '''
|
... '''
|
||||||
>>> yaml_parsed = load(yaml)
|
>>> yaml_parsed = load(yaml)
|
||||||
>>> compare = {
|
>>> compare = {
|
||||||
@@ -66,8 +94,8 @@ def load(content: str) -> hints.JSONData:
|
|||||||
... 'key8': False,
|
... 'key8': False,
|
||||||
... 'key9': [1, 23.2],
|
... 'key9': [1, 23.2],
|
||||||
... 'key10': ['str', 'str'],
|
... 'key10': ['str', 'str'],
|
||||||
|
... 'key11': "* a\\n* b:\\n * c",
|
||||||
... }
|
... }
|
||||||
|
|
||||||
>>> {k: yaml_parsed.get(k, None) for k in compare if compare[k] != yaml_parsed[k]}
|
>>> {k: yaml_parsed.get(k, None) for k in compare if compare[k] != yaml_parsed[k]}
|
||||||
{}
|
{}
|
||||||
"""
|
"""
|
||||||
@@ -75,20 +103,34 @@ def load(content: str) -> hints.JSONData:
|
|||||||
ret = {}
|
ret = {}
|
||||||
key = None
|
key = None
|
||||||
value = None
|
value = None
|
||||||
for line in sub(r":[\s\n]*", ":\n ", dedent(content).strip()).splitlines():
|
mlstring = False
|
||||||
if line.endswith(":"):
|
for line in sub(r"(:( >)?)[\s\n]*", r"\1\n ", dedent(content).strip()).splitlines():
|
||||||
|
if line.startswith("#") or line.strip() == "---":
|
||||||
|
continue
|
||||||
|
elif line.endswith(":"):
|
||||||
if key:
|
if key:
|
||||||
ret[key] = value
|
ret[key] = value
|
||||||
value = None
|
value = None
|
||||||
|
mlstring = False
|
||||||
|
|
||||||
key = line.removesuffix(":")
|
key = line.removesuffix(":")
|
||||||
elif line.startswith(" -"):
|
elif line.endswith(": >"):
|
||||||
|
if key:
|
||||||
|
ret[key] = value
|
||||||
|
value = None
|
||||||
|
mlstring = True
|
||||||
|
|
||||||
|
key = line.removesuffix(": >")
|
||||||
|
elif line.startswith(" -") and not mlstring:
|
||||||
value = value or []
|
value = value or []
|
||||||
try:
|
try:
|
||||||
parsed = loads(line.removeprefix(" -").strip())
|
parsed = loads(line.removeprefix(" -").strip())
|
||||||
except:
|
except:
|
||||||
parsed = loads('"' + line.removeprefix(" -").strip() + '"')
|
parsed = loads('"' + line.removeprefix(" -").strip() + '"')
|
||||||
value.append(parsed)
|
value.append(parsed)
|
||||||
|
elif line.startswith(" ") and mlstring:
|
||||||
|
value = value or ""
|
||||||
|
value += line.removeprefix(" ") + "\n"
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
value = loads(line.strip())
|
value = loads(line.strip())
|
||||||
|
|||||||
+44
@@ -0,0 +1,44 @@
|
|||||||
|
{
|
||||||
|
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||||
|
"description": "ETA yaml frontmatter",
|
||||||
|
"properties": {
|
||||||
|
"name": {
|
||||||
|
"type": "string"
|
||||||
|
},
|
||||||
|
"status": {
|
||||||
|
"type": "string",
|
||||||
|
"enum": [
|
||||||
|
"backlog",
|
||||||
|
"selected for development",
|
||||||
|
"in progress",
|
||||||
|
"in review",
|
||||||
|
"on hold",
|
||||||
|
"done",
|
||||||
|
"closed"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"markdown_content": {
|
||||||
|
"type": "string"
|
||||||
|
},
|
||||||
|
"id": {
|
||||||
|
"type": "string",
|
||||||
|
"pattern": "^[a-z0-9]+$"
|
||||||
|
},
|
||||||
|
"parent_list": {
|
||||||
|
"anyOf": [
|
||||||
|
{
|
||||||
|
"const": 900400316794,
|
||||||
|
"description": "OPE Tooling"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"required": [
|
||||||
|
"name",
|
||||||
|
"status",
|
||||||
|
"markdown_content",
|
||||||
|
"parent_list"
|
||||||
|
],
|
||||||
|
"title": "clickup task",
|
||||||
|
"type": "object"
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user