Compare commits

..

No commits in common. "82afdef873221569f0935f43c293663951742cc7" and "2dca8c833cf66085145ba8a52144fcc0a24958fb" have entirely different histories.

7 changed files with 70 additions and 574 deletions

View File

@ -18,25 +18,3 @@ return {
build = function(_) vim.cmd "UpdateRemotePlugins" end, build = function(_) vim.cmd "UpdateRemotePlugins" end,
} }
``` ```
## Setup
### Shell Configuration
#### Environment Variables
CLICKUP_AUTH
: user api token for clickup
CLICKUP_USER_ID
: user id for clickup
CLICKUP_WORKSPACE_ID
: workspace id for clickup
## Commands
### `:MrPy`
- start task selector of user's tasks (not done/closed)

View File

@ -1,295 +0,0 @@
local curl = require("plenary.curl")
local M = {}
---@param data table
---@return string | nil
M.table_to_yaml = function(data)
local json = vim.json.encode(data)
-- Nutzt 'yq' um JSON zu YAML zu konvertieren
local handle = io.popen("echo '" .. json .. "' | yq -P e '. ' -")
local result = handle:read("*a")
handle:close()
if not result then
print("Error: yq not installed or got invalid json data")
return nil
end
return result
end
---@param yaml_string string
---@return table | nil
M.yaml_to_table = function(yaml_string)
-- 1. YAML String in yq einspeisen und JSON ausgeben lassen
-- Das Flag -o=json sorgt für die Konvertierung
local temp_file = os.tmpname()
local temp_file_handle = io.open(temp_file, "w+b")
if not temp_file_handle then
print("Error: could not open temp file")
print(" temp_file: " .. temp_file)
return nil
end
temp_file_handle:write(yaml_string)
temp_file_handle:flush()
temp_file_handle:close()
local cmd = "yq '.' " .. temp_file
local handle, errres = io.popen(cmd, 'r')
local json_result = handle:read("*a")
handle:close()
os.remove(temp_file)
-- 2. Fehlerprüfung
if json_result == "" then
print("Error: yq not installed or got invalid yaml data")
print(" cmd: " .. cmd)
print(" err: " .. errres)
return nil
end
-- 3. JSON in Lua-Table umwandeln
local ok, table_result = pcall(vim.json.decode, json_result)
if not ok then
print("Error: failed to decode json")
print(" cmd: " .. cmd)
print(" json: " .. json_result)
return nil
end
return table_result
end
---@class ClickupSession
---@field auth string
---@field user string
---@field workspace string
---@field base_url string
---@type ClickupSession
M.session = nil
---@class ClickupRef
---@field name string
---@field id string
---@class ClickupTask
---@field id string
---@field name string
---@field tags? table[]
---@field locations? table[]
---@field list? ClickupRef
---@field parent? string | nil
---@field [string] string
---@return ClickupTask[]
M.latest_tasks = function()
local resp = curl.get(M.session.base_url ..
"/team/" .. M.session.workspace .. "/task?subtasks=true&include_markdown_description=true&assignees[]=" ..
M.session.user, {
headers = {
['Authorization'] = M.session.auth,
["accept"] = "application/json",
["content-type"] = "application/json",
}
})
if resp.status == 200 then
return vim.json.decode(resp.body).tasks
end
print("failed http request: " .. tostring(resp.status) .. " (" .. resp.body .. ")")
return {}
end
---@param task ClickupTask
---@return ClickupTask
M._update_task = function(task)
local update_table = {}
for _, k in ipairs({ "name", "markdown_content", "status" }) do
if task[k] then
update_table[k] = task[k]
end
end
local resp = curl.put(M.session.base_url .. "/task/" .. task.id, {
headers = {
['Authorization'] = M.session.auth,
["accept"] = "application/json",
["content-type"] = "application/json",
},
body = vim.json.encode(update_table)
})
if resp.status ~= 200 then
error("failed to update " .. task.id .. "\n(" .. resp.body .. ")")
end
return vim.json.decode(resp.body)
end
---@param args vim.api.keyset.create_autocmd.callback_args
M._on_tempbuf_write = function(args)
local lines = vim.api.nvim_buf_get_lines(args.buf, 0, -1, false)
---@type string[]
local parts = {}
---@type string[]
local current = {}
for _, line in ipairs(lines) do
if line == "---" then
if #current then
table.insert(parts, table.concat(current, "\n"))
current = {}
end
else
table.insert(current, line)
end
end
table.insert(parts, table.concat(current, "\n"))
---@type ClickupTask
local data = vim.json.decode(parts[2])
data['markdown_content'] = string.gsub(parts[3], "%-%-%-\n", "")
M._update_task(data)
vim.api.nvim_set_option_value("modified", false, { buf = args.buf })
end
---@class SelectionItem
---@field status string
---@field parent string | nil
---@field list string
---@field name string
---@field description string
---@field tags string[]
---@field id string
---@param item SelectionItem
M._on_select_task = function(picker, item)
local new_name = "[ClickUp] " .. item.name
local new_buf_no = vim.api.nvim_create_buf(true, false)
local status, _ = pcall(vim.api.nvim_buf_set_name, new_buf_no, new_name)
if status then
-- vim.api.nvim_set_option_value("buftype", "nofile", { buf = new_buf_no })
vim.api.nvim_set_option_value("filetype", "markdown", { buf = new_buf_no })
vim.api.nvim_create_autocmd({ "BufWriteCmd" }, { buffer = new_buf_no, callback = M._on_tempbuf_write })
local content = { "---", "{" }
for _, k in ipairs({ "id", "name", "status", "tags", "list", "parent" }) do
if content[#content] ~= "{" then
content[#content] = content[#content] .. ","
end
table.insert(content, ' "' .. k .. '": ' .. vim.json.encode(item[k]))
end
table.insert(content, "}")
table.insert(content, "---")
for line in string.gmatch(item.description, "[^\r\n]+") do
table.insert(content, line)
end
vim.api.nvim_buf_set_lines(new_buf_no, 0, 0, false, content)
vim.api.nvim_set_option_value("modified", false, { buf = new_buf_no })
vim.api.nvim_win_set_buf(0, new_buf_no)
else
vim.api.nvim_buf_delete(new_buf_no, {})
picker:close()
end
end
---@param item SelectionItem
M._item_format = function(item, _)
local ret = {}
local hl = "SnacksPickerComment"
if item.status == "in progress" then
hl = "@method"
elseif item.status == "selected for development" then
hl = "@constant.builtin"
elseif item.status == "in review" then
hl = "@keyword"
elseif item.status == "done" then
hl = "@variable.builtin"
end
if item.parent ~= vim.NIL then
ret[#ret + 1] = { "󰘍 ", "SnacksPickerComment" }
end
ret[#ret + 1] = { item.name, hl }
for _, v in ipairs(item.tags) do
ret[#ret + 1] = { " #" .. v, "SnacksPickerComment" }
end
ret[#ret + 1] = {
" 󰻾" .. item.id,
"SnacksPickerComment"
}
return ret
end
---@param list table[]
---@param keys string[]
M.retrieve_subkeys = function(list, keys)
local ret = {}
local interm = list
for _, key in ipairs(keys) do
ret = {}
for _, item in ipairs(interm) do
table.insert(ret, item[key])
end
interm = ret
end
return ret
end
---@param data vim.api.keyset.create_user_command.command_args
M.select_task = function(data)
local tasks = M.latest_tasks()
---@type SelectionItem[]
local items = {}
for tix, t in ipairs(tasks) do
if string.sub(t.name, -7, -1) ~= "Absence" then
local preview_frontmatter = ""
---@type SelectionItem
local prepared = {
idx = tix,
id = t.id,
text = t.name .. t.id .. t.markdown_description,
name = t.name,
tags = M.retrieve_subkeys(t.tags, { "name" }),
status = t.status.status,
parent = t.parent,
list = t.list.name,
description = t.markdown_description,
preview = {
ft = "markdown",
},
action = M._on_select_task
}
for _, k in ipairs({ "id", "name", "status", "tags", "parent" }) do
preview_frontmatter = preview_frontmatter .. "\n" .. k .. ": " .. vim.json.encode(prepared[k])
end
prepared.preview.text = "---" .. preview_frontmatter .. "\n---\n" .. t.markdown_description
table.insert(items, #items + 1, prepared)
end
end
require("snacks.picker").pick({
title = "Select Task",
format = M._item_format,
preview = "preview",
confirm = M._on_select_task,
items = items
})
end
return M

View File

@ -1,10 +0,0 @@
local mrpy = require("mrpy")
vim.api.nvim_create_user_command("MrPy", mrpy.select_task, {})
mrpy.session = {
auth = os.getenv("CLICKUP_AUTH") or "",
user = os.getenv("CLICKUP_USER_ID") or "",
workspace = os.getenv("CLICKUP_WORKSPACE_ID") or "",
base_url = "https://api.clickup.com/api/v2",
}

View File

@ -6,20 +6,10 @@ from typing import Any, Callable, Self, TypedDict
from urllib.error import HTTPError from urllib.error import HTTPError
from .hints import JSONDataMap from .hints import JSONDataMap
from .requests import delete, get, put, post from .requests import get, put
from .env import EnvVar from .env import EnvVar
class TaskRespCheckItemDict(TypedDict):
name: str
resolved: int
class TaskRespCheckDict(TypedDict):
name: str
items: list[TaskRespCheckItemDict]
class TaskRespDict(TypedDict): class TaskRespDict(TypedDict):
name: str name: str
markdown_description: str markdown_description: str
@ -30,7 +20,7 @@ class TaskRespDict(TypedDict):
tags: list[dict[str, str]] tags: list[dict[str, str]]
parent: str | None parent: str | None
locations: list[dict[str, str]] locations: list[dict[str, str]]
checklists: list[TaskRespCheckDict] checklists: dict[str, bool]
list: dict[str, str] list: dict[str, str]
@ -39,25 +29,24 @@ class ClickupTask:
"""fields marked with ``repr=False`` will not be pushed for updates""" """fields marked with ``repr=False`` will not be pushed for updates"""
name: str name: str
markdown_description: str
status: str status: str
id: str | None = field(default=None, repr=False) id: str = field(repr=False)
assignees: list[str] | None = field(default=None, repr=False) assignees: list[str] = field(repr=False)
tags: list[str] | None = field(default=None, repr=False) tags: list[str] = field(repr=False)
parent_list: str = field(repr=False) parent_list: str = field(repr=False)
locations: list[str] | None = field(default=None, repr=False) locations: list[str] = field(repr=False)
checklists: dict[str, dict[str, bool]] | None = field(default=None, repr=False) checklists: dict[str, bool] = field(repr=False)
parent: str | None = field(default=None, repr=False) parent: str | None = field(default=None, repr=False)
markdown_content: str
@classmethod @classmethod
def from_resp_data(cls, resp_data_raw: dict[str, Any]) -> Self: def from_resp_data(cls, resp_data_raw: dict[str, Any]) -> Self:
resp_data = TaskRespDict(**resp_data_raw) resp_data = TaskRespDict(**resp_data_raw)
return cls( return cls(
name=resp_data["name"], name=resp_data["name"],
markdown_content=resp_data["markdown_description"], markdown_description=resp_data["markdown_description"],
status=resp_data["status"]["status"], status=resp_data["status"]["status"],
id=resp_data["id"], id=resp_data["id"],
assignees=cls.convert_assignees(resp_data["assignees"]), assignees=cls.convert_assignees(resp_data["assignees"]),
@ -65,10 +54,7 @@ class ClickupTask:
parent=resp_data.get("parent"), parent=resp_data.get("parent"),
parent_list=resp_data["list"]["id"], parent_list=resp_data["list"]["id"],
locations=cls.convert_simple_list_with_names(resp_data["locations"]), locations=cls.convert_simple_list_with_names(resp_data["locations"]),
checklists={ checklists=resp_data["checklists"],
tlist["name"]: {t["name"]: bool(t["resolved"]) for t in tlist["items"]}
for tlist in resp_data["checklists"]
},
) )
@classmethod @classmethod
@ -92,14 +78,16 @@ class ClickupTask:
@property @property
def updateables(self) -> dict[str, Any]: def updateables(self) -> dict[str, Any]:
return {f.name: getattr(self, f.name, None) for f in fields(type(self)) if f.repr} return {
f.name: getattr(self, f.name, None) for f in fields(type(self)) if f.repr
}
@property @property
def showables(self) -> dict[str, Any]: def showables(self) -> dict[str, Any]:
return { return {
dcf.name: getattr(self, dcf.name, ...) f.name: getattr(self, f.name, None)
for dcf in fields(self) for f in fields(type(self))
if getattr(self, dcf.name, ...) is not None if f.name != "markdown_description"
} }
@property @property
@ -109,12 +97,7 @@ class ClickupTask:
if self.parent: if self.parent:
ret += " \033[32m 󰙅 " ret += " \033[32m 󰙅 "
ret += self.name ret += f"{self.name} (#{self.id})"
if self.tags:
ret += "".join(f" #{t}" for t in self.tags)
ret += f" (#{self.id})"
return ret return ret
@ -168,38 +151,9 @@ class ClickupSession:
) )
return loads(raw_data) return loads(raw_data)
def _post(self, endpoint: str, **body_params: str) -> JSONDataMap: def get_tasks(self) -> list[ClickupTask]:
raw_data = post(
self.base_url + endpoint,
body_params,
headers={
"accept": "application/json",
"content-type": "application/json",
"Authorization": self.auth_key,
},
)
return loads(raw_data)
def _delete(self, endpoint: str) -> JSONDataMap:
raw_data = delete(
self.base_url + endpoint,
headers={
"accept": "application/json",
"content-type": "application/json",
"Authorization": self.auth_key,
},
)
return loads(raw_data)
def get_tasks(self, **filters: str) -> list[ClickupTask]:
data = self._get( data = self._get(
f"/team/{self.workspace_id}/task", f"/team/{self.workspace_id}/task?subtasks=true&include_markdown_description=true&assignees[]={self.user_id}"
**{
"subtask": "true",
"include_markdown_description": "true",
"assignees[]": self.user_id,
}
| filters,
).get("tasks", []) ).get("tasks", [])
if isinstance(data, list): if isinstance(data, list):
return [ClickupTask.from_resp_data(t) for t in data if isinstance(t, dict)] return [ClickupTask.from_resp_data(t) for t in data if isinstance(t, dict)]
@ -215,16 +169,4 @@ class ClickupSession:
) )
def update(self, data: ClickupTask) -> None: def update(self, data: ClickupTask) -> None:
ret_task = self._put(f"/task/{data.id}", **data.updateables) _ = self._put(f"/task/{data.id}", **data.updateables)
current_tags: set[str] = set(ret_task["tags"])
new_tags = set(data.tags)
for del_tag in current_tags - new_tags:
self._delete(f"/task/{data.id}/tag/{del_tag}")
for add_tag in new_tags - current_tags:
self._post(f"/task/{data.id}/tag/{add_tag}")
def create(self, data: ClickupTask) -> str:
ret_task = self._post(f"/list/{data.parent_list}/task", **data.updateables)
return str(ret_task["id"])

View File

@ -1,11 +1,9 @@
from collections.abc import Callable, Sequence from collections.abc import Callable, Sequence
from pathlib import Path
from typing import Any
from pynvim import Nvim, command, plugin from pynvim import Nvim, command, plugin
from pynvim.api import Buffer from pynvim.api import Buffer
from .clickup import ClickupSession, ClickupTask from .clickup import ClickupSession
from .hints import JSONData from .hints import JSONData
from .yaml import load, dump from .yaml import load, dump
@ -30,20 +28,18 @@ class MrPyPlugin:
"assignees": usernames_from_objs, "assignees": usernames_from_objs,
} }
def select_task_id(self, tags: set[str] | None = None) -> None: def select_task_id(self) -> None:
tasks = self.clickup.get_tasks(**({"tags[]": "&tags[]=".join(tags)} if tags else {})) tasks = self.clickup.get_tasks()
task_names_by_id = [ task_names_by_id = [
{ {
"idx": tix + 1, "idx": tix + 1,
"id": t.id, "id": t.id,
"text": t.short,
"name": t.name, "name": t.name,
"tags": t.tags,
"status": t.status, "status": t.status,
"is_child": bool(t.parent), "is_child": bool(t.parent),
"preview": { "preview": {
"text": dump(t.showables), "text": f"---\n{dump(t.showables)}---\n{t.markdown_description}",
"ft": "yaml", "ft": "markdown",
}, },
"action": f":Mrpy {t.id}", "action": f":Mrpy {t.id}",
} }
@ -74,12 +70,8 @@ class MrPyPlugin:
ret[#ret + 1] = { item.name, hl } ret[#ret + 1] = { item.name, hl }
for _, v in ipairs(item.tags) do
ret[#ret + 1] = { " #" .. v, "SnacksPickerComment" }
end
ret[#ret + 1] = { ret[#ret + 1] = {
" 󰻾" .. item.id, " (#" .. item.id .. ")",
"SnacksPickerComment" "SnacksPickerComment"
} }
return ret return ret
@ -99,36 +91,34 @@ class MrPyPlugin:
task = self.clickup.get_task(task_id) task = self.clickup.get_task(task_id)
temp_buf: Buffer = self.nvim.api.create_buf(True, False) temp_buf: Buffer = self.nvim.api.create_buf(True, False)
self.nvim.api.buf_set_name(temp_buf, f"[ClickUp] {task.name}") self.nvim.api.buf_set_name(temp_buf, f"[ClickUp] {task.name}")
self.nvim.buffers[temp_buf.number].options["filetype"] = "yaml" self.nvim.buffers[temp_buf.number].options["filetype"] = "markdown"
self.nvim.api.create_autocmd( self.nvim.api.create_autocmd(
["BufWriteCmd"], ["BufWriteCmd"],
{"buffer": temp_buf.number, "command": "MrpyPush " + str(temp_buf.number)}, {"buffer": temp_buf.number, "command": "MrpyPush " + str(temp_buf.number)},
) )
content = [ content = ["---"]
f"# yaml-language-server: $schema={Path(__file__).parent.parent.parent.parent}/schema.json ",
"---",
]
content.extend( content.extend(
dump( dump(
task.showables, task.showables,
).splitlines() ).splitlines()
) )
content.append("---")
content.extend(task.markdown_description.splitlines())
self.nvim.api.buf_set_lines(temp_buf, 0, 0, False, content) self.nvim.api.buf_set_lines(temp_buf, 0, 0, False, content)
self.nvim.buffers[temp_buf.number].options["modified"] = False self.nvim.buffers[temp_buf.number].options["modified"] = False
self.nvim.api.win_set_buf(0, temp_buf) self.nvim.api.win_set_buf(0, temp_buf)
@command("Mrpy", nargs="*") @command("Mrpy", nargs="?")
def entry(self, args: Sequence[str] = ()) -> None: def entry(self, args: Sequence[str] = ()) -> None:
match args: match args:
case (str() as task_id,): case (str() as task_id,):
try: self.open_task_buffer(task_id)
self.open_task_buffer(task_id) case ():
except: self.select_task_id()
self.select_task_id({task_id}) case _:
case tags: pass
self.select_task_id(set(tags))
@command("MrpyPush", nargs="?") @command("MrpyPush", nargs="?")
def on_vbuf_write(self, args: Sequence[str] = ()) -> None: def on_vbuf_write(self, args: Sequence[str] = ()) -> None:
@ -142,60 +132,14 @@ class MrPyPlugin:
try: try:
a = self.nvim.buffers[buf_no][0:-1] a = self.nvim.buffers[buf_no][0:-1]
fm = "\n".join(a) _, fm, *text = "\n".join(a).split("---\n")
data = load(fm) data = load(fm)
assert isinstance(data, dict) assert isinstance(data, dict)
data["markdown_content"] = data["markdown_content"].strip() + "\n" data["markdown_description"] = "\n\n".join(text)
self.clickup.update(ClickupTask(**data)) self.nvim.err_write(str(data) + "\n")
# self.clickup.update(data)
self.nvim.buffers[buf_no].options["modified"] = False self.nvim.buffers[buf_no].options["modified"] = False
except ValueError: except ValueError:
pass pass
@command("MrpyNew", nargs=0)
def on_new_task(self) -> None:
task = ClickupTask(name="", status="backlog", markdown_content="", parent_list="")
temp_buf: Buffer = self.nvim.api.create_buf(True, False)
self.nvim.api.buf_set_name(temp_buf, f"[ClickUp] New Task")
self.nvim.buffers[temp_buf.number].options["filetype"] = "yaml"
self.nvim.api.create_autocmd(
["BufWriteCmd"],
{"buffer": temp_buf.number, "command": "MrpyPushNew " + str(temp_buf.number)},
)
content = [
f"# yaml-language-server: $schema={Path(__file__).parent.parent.parent.parent}/schema.json ",
"---",
]
content.extend(
dump(
task.showables,
).splitlines()
)
self.nvim.api.buf_set_lines(temp_buf, 0, 0, False, content)
self.nvim.buffers[temp_buf.number].options["modified"] = False
self.nvim.api.win_set_buf(0, temp_buf)
@command("MrpyPushNew", nargs=1)
def on_new_vbuf_write(self, buf_no: Sequence[Any] = ()) -> None:
buf_no = int(buf_no[0])
try:
a = self.nvim.buffers[buf_no][0:-1]
fm = "\n".join(a)
data = load(fm)
assert isinstance(data, dict)
data["markdown_content"] = data["markdown_content"].strip() + "\n"
new_task = ClickupTask(**data)
new_task_id = self.clickup.create(new_task)
self.nvim.buffers[buf_no].options["modified"] = False
self.nvim.command("bdelete " + str(buf_no))
self.entry([new_task_id])
except ValueError:
pass

View File

@ -1,36 +1,5 @@
from json import dumps
from ssl import SSLContext, create_default_context from ssl import SSLContext, create_default_context
from urllib.error import HTTPError
from urllib.request import Request, urlopen from urllib.request import Request, urlopen
from urllib.parse import urlencode
def request(
url: str,
method: str,
query_params: dict[str, str] | None = None,
data: dict[str, str] | None = None,
headers: dict[str, str] | None = None,
) -> str:
try:
with urlopen(
Request(
url
+ (
("?" + "&".join(f"{k}={v}" for k, v in query_params.items()))
if query_params
else ""
),
dumps(data).encode() if data else None,
headers=headers or {},
method=method,
),
context=create_default_context(),
) as resp:
return resp.read().decode("utf-8")
except HTTPError as e:
e.add_note(e.url)
raise
def get( def get(
@ -38,15 +7,20 @@ def get(
query_params: dict[str, str] | None = None, query_params: dict[str, str] | None = None,
headers: dict[str, str] | None = None, headers: dict[str, str] | None = None,
) -> str: ) -> str:
return request(url, "GET", query_params=query_params, headers=headers) with urlopen(
Request(
url
def post( + (
url: str, ("?" + "&".join(f"{k}={v}" for k, v in query_params.items()))
data: dict[str, str] | None = None, if query_params
headers: dict[str, str] | None = None, else ""
) -> str: ),
return request(url, "POST", data=data, headers=headers) headers=headers or {},
method="GET",
),
context=create_default_context(),
) as resp:
return resp.read().decode("utf-8")
def put( def put(
@ -54,11 +28,13 @@ def put(
data: dict[str, str] | None = None, data: dict[str, str] | None = None,
headers: dict[str, str] | None = None, headers: dict[str, str] | None = None,
) -> str: ) -> str:
return request(url, "PUT", data=data, headers=headers) with urlopen(
Request(
url,
def delete( str(data).encode(),
url: str, headers=headers or {},
headers: dict[str, str] | None = None, method="PUT",
) -> str: ),
return request(url, "DELETE", headers=headers) context=create_default_context(),
) as resp:
return resp.read().decode("utf-8")

View File

@ -17,7 +17,7 @@ def dump_scalar(entry: hints.JSONDataScalar) -> str:
return f"{entry}\n" return f"{entry}\n"
def dump(obj: hints.JSONDataMap) -> str: def dump(obj: dict[str, hints.JSONDataScalar | list[hints.JSONDataScalar]]) -> str:
ret = "" ret = ""
for key, value in obj.items(): for key, value in obj.items():
ret += key ret += key
@ -26,32 +26,11 @@ def dump(obj: hints.JSONDataMap) -> str:
match value: match value:
case []: case []:
ret += " []\n" ret += " []\n"
case {}:
ret += " {}\n"
case "":
ret += ' ""\n'
case list() as entries: case list() as entries:
ret += "\n" ret += "\n"
for entry in entries: for entry in entries:
match entry: ret += f" - {dump_scalar(entry)}"
case dict():
subdump = dump(entry)
ret += "\n".join(" " + l for l in subdump.splitlines())
case list():
pass
case _:
ret += f" - {dump_scalar(entry)}"
case dict() as substruct:
ret += "\n"
subdump = dump(substruct)
ret += "\n".join(" " + l for l in subdump.splitlines()) + "\n"
case str() as mlstring if key == "markdown_content":
if mlstring.strip() == "" or dump_scalar(mlstring).strip() == "null":
ret += ' ""\n'
else:
indented = "\n".join((" " + l) for l in dump_scalar(mlstring).splitlines())
ret += f" >\n{indented}\n"
case entry: case entry:
ret += f" {dump_scalar(entry)}" ret += f" {dump_scalar(entry)}"
@ -77,10 +56,6 @@ def load(content: str) -> hints.JSONData:
... key10: ... key10:
... - str ... - str
... - "str" ... - "str"
... key11: >
... * a
... * b:
... * c
... ''' ... '''
>>> yaml_parsed = load(yaml) >>> yaml_parsed = load(yaml)
>>> compare = { >>> compare = {
@ -94,8 +69,8 @@ def load(content: str) -> hints.JSONData:
... 'key8': False, ... 'key8': False,
... 'key9': [1, 23.2], ... 'key9': [1, 23.2],
... 'key10': ['str', 'str'], ... 'key10': ['str', 'str'],
... 'key11': "* a\\n* b:\\n * c",
... } ... }
>>> {k: yaml_parsed.get(k, None) for k in compare if compare[k] != yaml_parsed[k]} >>> {k: yaml_parsed.get(k, None) for k in compare if compare[k] != yaml_parsed[k]}
{} {}
""" """
@ -103,34 +78,20 @@ def load(content: str) -> hints.JSONData:
ret = {} ret = {}
key = None key = None
value = None value = None
mlstring = False for line in sub(r":[\s\n]*", ":\n ", dedent(content).strip()).splitlines():
for line in sub(r"(:( >)?)[\s\n]*", r"\1\n ", dedent(content).strip()).splitlines(): if line.endswith(":"):
if line.startswith("#") or line.strip() == "---":
continue
elif line.endswith(":"):
if key: if key:
ret[key] = value ret[key] = value
value = None value = None
mlstring = False
key = line.removesuffix(":") key = line.removesuffix(":")
elif line.endswith(": >"): elif line.startswith(" -"):
if key:
ret[key] = value
value = None
mlstring = True
key = line.removesuffix(": >")
elif line.startswith(" -") and not mlstring:
value = value or [] value = value or []
try: try:
parsed = loads(line.removeprefix(" -").strip()) parsed = loads(line.removeprefix(" -").strip())
except: except:
parsed = loads('"' + line.removeprefix(" -").strip() + '"') parsed = loads('"' + line.removeprefix(" -").strip() + '"')
value.append(parsed) value.append(parsed)
elif line.startswith(" ") and mlstring:
value = value or ""
value += line.removeprefix(" ") + "\n"
else: else:
try: try:
value = loads(line.strip()) value = loads(line.strip())