from enum import Enum, IntEnum import re from lsprotocol.types import ( INITIALIZED, TEXT_DOCUMENT_CODE_ACTION, TEXT_DOCUMENT_COMPLETION, TEXT_DOCUMENT_DOCUMENT_SYMBOL, TEXT_DOCUMENT_HOVER, TEXT_DOCUMENT_INLAY_HINT, TEXT_DOCUMENT_SEMANTIC_TOKENS_FULL, WORKSPACE_INLAY_HINT_REFRESH, CodeAction, CodeActionKind, CodeActionOptions, CodeActionParams, Command, CompletionItem, CompletionItemKind, CompletionItemLabelDetails, CompletionOptions, CompletionParams, DocumentSymbol, DocumentSymbolParams, Hover, HoverParams, InitializedParams, InlayHint, InlayHintParams, MarkupContent, MarkupKind, NotebookDocumentSyncOptions, Position, Range, SemanticTokens, SemanticTokensLegend, SemanticTokensParams, SymbolKind, TextDocumentSyncKind, WorkDoneProgressBegin, WorkDoneProgressEnd, WorkDoneProgressReport, ) from pygls.lsp.server import LanguageServer from eta.gitlab import GitlabProject, GitlabSession from .clickup import ClickupSession, ClickupTask GL_PRJ_PATTERN = r"#project\/(?P((\w+)/)+)(?P\w+)" GL_ID_PATTERN = r"#(?Pmr|issue)\/(?P\d+)" CU_PATTERN = r"#task/(?P\w{8,})" class CustomServer(LanguageServer): cu_cache: dict[str, ClickupTask] cu_session: ClickupSession gl_cache: dict[str, GitlabProject] gl_session: GitlabSession def __init__( self, name: str, version: str, text_document_sync_kind: TextDocumentSyncKind = TextDocumentSyncKind.Incremental, notebook_document_sync: NotebookDocumentSyncOptions | None = None, ) -> None: super().__init__(name, version, text_document_sync_kind, notebook_document_sync) self.cu_cache = {} self.cu_session = ClickupSession() self.gl_cache = {} self.gl_session = GitlabSession() async def update_task_cache(self) -> None: self.cu_cache = {} tasks = self.cu_session.get_tasks() for ti, t in enumerate(tasks): self.cu_cache[t.id] = t self.protocol.progress.report( "startup", WorkDoneProgressReport( message="ClickUp Tasks", percentage=int(100 * (1 + ti) / len(tasks)) ), ) async def update_proj_cache(self) -> None: self.gl_cache = {} projs = self.gl_session.get_projects() for pi, p in enumerate(projs): self.gl_cache[p.path_with_namespace] = p self.protocol.progress.report( "startup", WorkDoneProgressReport( message="Gitlab Prjects", percentage=int(100 * (1 + pi) / len(projs)) ), ) await self.update_issue_cache(projs) await self.update_mr_cache(projs) async def update_issue_cache(self, prjs: list[GitlabProject]) -> None: p_by_id = {p.id: p for p in prjs} issues = self.gl_session.get_issues(prjs) for issue in issues: self.gl_cache[p_by_id[issue.project_id].path_with_namespace].issues.append(issue) async def update_mr_cache(self, prjs: list[GitlabProject]) -> None: p_by_id = {p.id: p for p in prjs} mrs = self.gl_session.get_merge_requests(prjs) for mr in mrs: self.gl_cache[p_by_id[mr.project_id].path_with_namespace].merge_requests.append(mr) server = CustomServer("eta-server", "0.1.0") @server.feature(INITIALIZED) async def on_init(params: InitializedParams) -> None: server.protocol.progress.begin( "startup", WorkDoneProgressBegin("Caching ", percentage=0, cancellable=True) ) await server.update_task_cache() await server.update_proj_cache() server.protocol.progress.end("startup", WorkDoneProgressEnd(message="Done Caching")) @server.feature(TEXT_DOCUMENT_DOCUMENT_SYMBOL) async def list_ids(params: DocumentSymbolParams) -> list[DocumentSymbol]: return [ DocumentSymbol( t.id, SymbolKind.Enum, Range(Position(i, 0), Position(i, 0)), Range(Position(i, 0), Position(i, 0)), detail=t.name, ) for i, t in enumerate(server.cu_cache.values()) ] @server.feature(TEXT_DOCUMENT_COMPLETION, CompletionOptions(trigger_characters=["/", "!", "#"])) async def complete_cu_ids(params: CompletionParams) -> list[CompletionItem]: doc = server.workspace.get_text_document(params.text_document.uri) line = doc.lines[params.position.line] if not line[: params.position.character].strip().endswith(("/", "!", "#")): return [] prev = ( line[: params.position.character] .strip() .removesuffix("/") .removesuffix("!") .removesuffix("#") .split("#")[-1] ) if prev.endswith(("task", "cu", "clickup")): return [ CompletionItem( f"{t.name}", CompletionItemLabelDetails(detail=f" #{t.id}"), kind=CompletionItemKind.Constant, insert_text=f"{t.id}", ) for t in server.cu_cache.values() ] if prev.endswith(("project", "gitlab", "gl")): return [ CompletionItem( p.path_with_namespace, detail=f" {p.path_with_namespace}", kind=CompletionItemKind.Enum, ) for p in server.gl_cache.values() ] prj = next(re.finditer(GL_PRJ_PATTERN, line), None) if prj is None: return [ CompletionItem( f"{line}: {prj}", ) ] if prev.endswith("issue"): return [ CompletionItem( i.title, insert_text=f"{i.iid}", detail=f"#{i.iid}: {i.title} {' '.join('[' + t + ']' for t in i.labels)}", documentation=i.description, kind=CompletionItemKind.EnumMember, ) for p in server.gl_cache.values() for i in p.issues if i.state == "opened" and p.path_with_namespace == f"{prj.group('ns')}{prj.group('prj')}" ] if prev.endswith("mr"): return [ CompletionItem( f"!{m.title} ({p.path_with_namespace}!{m.iid})", insert_text=f"{m.iid}", detail=f"!{m.iid}: {m.title} {' '.join('[' + t + ']' for t in m.labels)}", documentation=m.description, kind=CompletionItemKind.EnumMember, ) for p in server.gl_cache.values() for m in p.merge_requests if m.state == "opened" and p.path_with_namespace.casefold() == f"{prj.group('ns')}{prj.group('prj')}".casefold() ] return [] @server.feature(TEXT_DOCUMENT_INLAY_HINT) async def inlay_info(params: InlayHintParams) -> list[InlayHint]: ret: list[InlayHint] = [] for lid, line in enumerate(server.workspace.text_documents[params.text_document.uri].lines): # for m in re.finditer(GL_PATTERN, line): # ns = m.group("ns") # prj = m.group("prj") # idt = m.group("styp") # id = m.group("sid") # # if p := server.gl_cache.get(f"{ns}{prj}"): # if not idt: # pass # elif idt == "!" and (mr := next(m for m in p.merge_requests if str(m.iid) == id)): # ret.append( # InlayHint( # Position(line=lid, character=m.end()), # label=f"({mr.title} | {mr.state})", # padding_right=True, # padding_left=True, # ) # ) # elif idt == "#" and (issue := next(i for i in p.issues if str(i.iid) == id)): # ret.append( # InlayHint( # Position(line=lid, character=m.end()), # label=f"({issue.title} | {issue.state})", # padding_right=True, # padding_left=True, # ) # ) for m in re.finditer(CU_PATTERN, line): id = m.group(1) if t := server.cu_cache.get(id): ret.append( InlayHint( Position(line=lid, character=m.start()), label=f"{t.status.status_symbol}", padding_left=False, padding_right=False, ) ) ret.append( InlayHint( Position(line=lid, character=m.end()), label=f"{t.name}", padding_left=True, padding_right=False, ) ) return ret @server.command("recache_gl") async def recache_gl(*_) -> None: await server.update_proj_cache() await server.protocol.send_request_async(WORKSPACE_INLAY_HINT_REFRESH, None) @server.command("recache_cu") async def recache_cu(*_) -> None: await server.update_task_cache() await server.protocol.send_request_async(WORKSPACE_INLAY_HINT_REFRESH, None) @server.feature( TEXT_DOCUMENT_CODE_ACTION, CodeActionOptions(code_action_kinds=[CodeActionKind.QuickFix]), ) def code_actions(params: CodeActionParams) -> list[CodeAction]: return [ CodeAction( "Re-Cache GitLab Project Info", kind=CodeActionKind.QuickFix, command=Command("recache gl", "recache_gl"), ), CodeAction( "Re-Cache ClickUp Task Info", kind=CodeActionKind.QuickFix, command=Command("recache cu", "recache_cu"), ), ] @server.feature(TEXT_DOCUMENT_HOVER) def on_hover(params: HoverParams) -> Hover | None: doc = server.workspace.get_text_document(params.text_document.uri) line = doc.lines[params.position.line] # prj_match = list(re.finditer(GL_PRJ_PATTERN, line))[0] # ns = prj_match.group("ns") # prj = prj_match.group("prj") # id_match = list(re.finditer(GL_ID_PATTERN, line)) # # if p := server.gl_cache.get(f"{ns}{prj}"): # for m in id_match: # if params.position.character >= m.start() and params.position.character < m.end(): # idt = m.group("idt") # id = m.group("sid") # # if not idt: # pass # elif idt == "mr" and (mr := next(m for m in p.merge_requests if str(m.iid) == id)): # return Hover( # MarkupContent( # kind=MarkupKind.Markdown, # value=f"# {mr.title}\n\n{mr.description or ''}", # ), # range=Range( # Position(line=params.position.line, character=m.start()), # Position(line=params.position.line, character=m.end()), # ), # ) # elif idt == "issue" and (issue := next(i for i in p.issues if str(i.iid) == id)): # return Hover( # MarkupContent( # kind=MarkupKind.Markdown, # value=f"# {issue.title}\n\n{issue.description or ''}", # ), # range=Range( # Position(line=params.position.line, character=m.start()), # Position(line=params.position.line, character=m.end()), # ), # ) for m in re.finditer(CU_PATTERN, line): if params.position.character >= m.start() and params.position.character < m.end(): id = m.group("id") if t := server.cu_cache.get(id): return Hover( MarkupContent( kind=MarkupKind.Markdown, value=f"# {t.name} - 󱇯 {t.status.status}\n\n{t.markdown_description}", ), range=Range( Position(line=params.position.line, character=m.start()), Position(line=params.position.line, character=m.end()), ), ) else: return Hover( MarkupContent( kind=MarkupKind.Markdown, value=f"# {id}: {m.group(0)}\n\n{list(server.cu_cache)}", ), range=Range( Position(line=params.position.line, character=m.start()), Position(line=params.position.line, character=m.end()), ), ) return None def token_offset(rest: list[int], current: tuple[int, int]) -> tuple[int, int]: lines = rest[::5] # offsets = rest[:-4:-5] last_line = sum(lines) return (current[0] - last_line, current[1]) class TaskModifiers(Enum): Backlog = "backlog" Selected = "selected for development" Progress = "in progress" Review = "in review" Done = "done" Closed = "closed" @server.feature( TEXT_DOCUMENT_SEMANTIC_TOKENS_FULL, SemanticTokensLegend(["cuTask"], list(t.name for t in TaskModifiers)), ) def sem_tokens(params: SemanticTokensParams) -> SemanticTokens: ret: list[int] = [] doc = server.workspace.get_text_document(params.text_document.uri) for lix, line in enumerate(doc.lines): # ms = re.finditer(GL_PATTERN, line) # # for m in ms: # idt = m.group("styp") # id = m.group("sid") # rel_line, rel_char = token_offset(ret, (lix, m.start())) # # if not idt: # ret.extend([rel_line, rel_char, len(m.group(0)), 0, 0]) # elif idt == "!": # ret.extend([rel_line, rel_char, len(m.group(0)), 2, 0]) # elif idt == "#": # ret.extend([rel_line, rel_char, len(m.group(0)), 2, 0]) for m in re.finditer(CU_PATTERN, line): id = m.group("id") rel_line, rel_char = token_offset(ret, (lix, m.start())) if t := server.cu_cache.get(id): ret.extend( [ rel_line, rel_char, len(m.group(0)), 0, list(t.value for t in TaskModifiers).index(t.status.status), ] ) return SemanticTokens(data=ret) def main() -> None: server.start_io()