update gitignore, add test data, finish paperless connector, start HPScanConnector
This commit is contained in:
parent
6a21f36f18
commit
2a38d141dd
|
@ -1 +1,2 @@
|
|||
.venv/*
|
||||
.env
|
||||
|
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
@ -0,0 +1,56 @@
|
|||
from requests import Response, get, post
|
||||
from abc import ABC, abstractmethod
|
||||
from types import TracebackType
|
||||
from typing import IO, Any, Self, final
|
||||
|
||||
|
||||
class Connector(ABC):
|
||||
@property
|
||||
@abstractmethod
|
||||
def connected(self) -> bool:
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def connect(self) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def disconnect(self) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
@final
|
||||
def __enter__(self) -> Self:
|
||||
self.connect()
|
||||
return self
|
||||
|
||||
@final
|
||||
def __exit__(
|
||||
self,
|
||||
_exc_type: type[BaseException] | None,
|
||||
_exc_value: BaseException | None,
|
||||
_exc_tb: TracebackType | None,
|
||||
) -> bool:
|
||||
self.disconnect()
|
||||
return False
|
||||
|
||||
|
||||
class RESTConnector(Connector, ABC):
|
||||
url: str
|
||||
port: int
|
||||
|
||||
def post(
|
||||
self,
|
||||
endpoint: str,
|
||||
data: dict[str, Any],
|
||||
files: dict[str, IO] | None = None,
|
||||
header: dict[str, Any] | None = None,
|
||||
) -> Response:
|
||||
return post(
|
||||
f"https://{self.url}:{self.port}/{endpoint}",
|
||||
data=data,
|
||||
files=files,
|
||||
headers=header,
|
||||
)
|
||||
|
||||
def get(self, endpoint: str, data: dict[str, Any]) -> Response:
|
||||
return get(f"https://{self.url}:{self.port}/{endpoint}", params=data)
|
|
@ -0,0 +1,62 @@
|
|||
from datetime import datetime
|
||||
from typing import IO, NotRequired, TypedDict, Unpack
|
||||
|
||||
from requests import Response
|
||||
from .common import RESTConnector
|
||||
|
||||
|
||||
class PaperlessPostDocumentOptions(TypedDict):
|
||||
title: NotRequired[str]
|
||||
created: NotRequired[datetime]
|
||||
correspondent: NotRequired[str]
|
||||
document_type: NotRequired[str]
|
||||
tags: NotRequired[list[str]]
|
||||
archive_serial_number: NotRequired[int]
|
||||
|
||||
|
||||
class PaperlessConnection(RESTConnector):
|
||||
url: str
|
||||
port: int
|
||||
|
||||
_token: str | None
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
url: str,
|
||||
port: int,
|
||||
user: str,
|
||||
password: str,
|
||||
) -> None:
|
||||
self.url = url
|
||||
self.port = port
|
||||
token_response = self.post(
|
||||
"/api/token/",
|
||||
{
|
||||
"username": user,
|
||||
"password": password,
|
||||
},
|
||||
)
|
||||
|
||||
assert token_response.ok, token_response
|
||||
|
||||
self._token = token_response.json()["token"]
|
||||
|
||||
def post_document(
|
||||
self, document: IO, **kwargs: Unpack[PaperlessPostDocumentOptions]
|
||||
) -> Response:
|
||||
return self.post(
|
||||
"/api/documents/post_document/",
|
||||
files={"document": document},
|
||||
data=dict(**kwargs),
|
||||
header={"Authorization": f"Token {self._token}"},
|
||||
)
|
||||
|
||||
@property
|
||||
def connected(self) -> bool:
|
||||
return bool(self._token)
|
||||
|
||||
def connect(self) -> None:
|
||||
pass
|
||||
|
||||
def disconnect(self) -> None:
|
||||
pass
|
|
@ -0,0 +1,5 @@
|
|||
from bscan.connector.common import Connector
|
||||
|
||||
|
||||
class HPScannerConnection(Connector):
|
||||
pass
|
|
@ -1,5 +1,25 @@
|
|||
import sane as sane
|
||||
|
||||
|
||||
def main() -> None:
|
||||
pass
|
||||
# env_file = Path(__file__).parent.parent / ".env"
|
||||
#
|
||||
# if env_file.exists():
|
||||
# conf = loads(env_file.read_text())
|
||||
# user = conf["username"]
|
||||
# password = conf["password"]
|
||||
# else:
|
||||
# user = input("paperless user: ")
|
||||
# password = getpass("paperless password: ")
|
||||
#
|
||||
# pc = PaperlessConnection("papers.acereca.net", 443, user, password)
|
||||
#
|
||||
# doc = Path(__file__).parent.parent / "tests" / "data" / "20230504_FRITZ!Box7590.pdf"
|
||||
#
|
||||
# ret = pc.post_document(document=doc.open(mode="rb"))
|
||||
|
||||
sane.init()
|
||||
print(sane.get_devices())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
|
@ -1,7 +1,10 @@
|
|||
[project]
|
||||
name = "bscan"
|
||||
version = "0.1.0"
|
||||
dependencies = []
|
||||
dependencies = [
|
||||
"requests~=2.31",
|
||||
"python-sane~=2.9"
|
||||
]
|
||||
|
||||
[project.optional-dependencies]
|
||||
dev = [
|
||||
|
@ -18,4 +21,4 @@ requires = [
|
|||
]
|
||||
|
||||
[project.scripts]
|
||||
bscan = "bscan:main/main"
|
||||
bscan = "bscan.main:main"
|
||||
|
|
Binary file not shown.
Loading…
Reference in New Issue