implement server mvp: fastapi app, org formatter, sqlite store, tests, dockerfile
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
17
server/Dockerfile
Normal file
17
server/Dockerfile
Normal file
@@ -0,0 +1,17 @@
|
|||||||
|
FROM python:3.11-slim
|
||||||
|
|
||||||
|
WORKDIR /app
|
||||||
|
|
||||||
|
COPY pyproject.toml .
|
||||||
|
RUN pip install --no-cache-dir fastapi "uvicorn[standard]" pydantic
|
||||||
|
|
||||||
|
COPY app/ app/
|
||||||
|
|
||||||
|
ENV PHONE_CAPTURE_HOST=0.0.0.0
|
||||||
|
ENV PHONE_CAPTURE_PORT=8765
|
||||||
|
ENV PHONE_CAPTURE_ORG_PATH=/data/synq.org
|
||||||
|
ENV PHONE_CAPTURE_DB_PATH=/data/capture.sqlite3
|
||||||
|
|
||||||
|
EXPOSE 8765
|
||||||
|
|
||||||
|
CMD ["python", "-m", "app.main"]
|
||||||
0
server/app/__init__.py
Normal file
0
server/app/__init__.py
Normal file
85
server/app/main.py
Normal file
85
server/app/main.py
Normal file
@@ -0,0 +1,85 @@
|
|||||||
|
import logging
|
||||||
|
import os
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from fastapi import Depends, FastAPI, HTTPException, Request
|
||||||
|
from fastapi.exceptions import RequestValidationError
|
||||||
|
from fastapi.responses import JSONResponse
|
||||||
|
|
||||||
|
from .models import CaptureRequest, CaptureResponse, HealthResponse
|
||||||
|
from .org_writer import format_capture
|
||||||
|
from .store import IdempotencyStore, get_file_lock
|
||||||
|
|
||||||
|
logger = logging.getLogger("synq")
|
||||||
|
|
||||||
|
app = FastAPI(title="synq", version="0.1.0")
|
||||||
|
|
||||||
|
_store: IdempotencyStore | None = None
|
||||||
|
|
||||||
|
|
||||||
|
def get_store() -> IdempotencyStore:
|
||||||
|
global _store
|
||||||
|
if _store is None:
|
||||||
|
db_path = os.environ.get("PHONE_CAPTURE_DB_PATH", "/data/capture.sqlite3")
|
||||||
|
_store = IdempotencyStore(db_path)
|
||||||
|
return _store
|
||||||
|
|
||||||
|
|
||||||
|
def _org_path() -> str:
|
||||||
|
return os.environ.get("PHONE_CAPTURE_ORG_PATH", "/data/synq.org")
|
||||||
|
|
||||||
|
|
||||||
|
def _token() -> str:
|
||||||
|
return os.environ.get("PHONE_CAPTURE_TOKEN", "")
|
||||||
|
|
||||||
|
|
||||||
|
async def check_token(request: Request) -> None:
|
||||||
|
expected = _token()
|
||||||
|
if not expected:
|
||||||
|
raise HTTPException(status_code=500, detail="server token not configured")
|
||||||
|
auth = request.headers.get("authorization", "")
|
||||||
|
scheme, _, provided = auth.partition(" ")
|
||||||
|
if scheme.lower() != "bearer" or provided != expected:
|
||||||
|
raise HTTPException(status_code=401, detail="unauthorized")
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/health", response_model=HealthResponse)
|
||||||
|
async def health() -> HealthResponse:
|
||||||
|
return HealthResponse(ok=True, service="synq", version="0.1.0")
|
||||||
|
|
||||||
|
|
||||||
|
@app.post("/capture", response_model=CaptureResponse, dependencies=[Depends(check_token)])
|
||||||
|
async def capture(payload: CaptureRequest, store: IdempotencyStore = Depends(get_store)) -> CaptureResponse:
|
||||||
|
if store.already_seen(payload.id):
|
||||||
|
logger.info("duplicate capture id=%s", payload.id)
|
||||||
|
return CaptureResponse(ok=True, status="already_seen", id=payload.id)
|
||||||
|
|
||||||
|
entry = format_capture(payload)
|
||||||
|
org_path = _org_path()
|
||||||
|
|
||||||
|
lock = get_file_lock()
|
||||||
|
with lock:
|
||||||
|
Path(org_path).parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
with open(org_path, "a", encoding="utf-8") as f:
|
||||||
|
f.write(entry)
|
||||||
|
store.mark_seen(payload.id, payload.created_at)
|
||||||
|
|
||||||
|
logger.info("accepted capture id=%s", payload.id)
|
||||||
|
return CaptureResponse(ok=True, status="accepted", id=payload.id)
|
||||||
|
|
||||||
|
|
||||||
|
@app.exception_handler(RequestValidationError)
|
||||||
|
async def validation_exception_handler(request: Request, exc: RequestValidationError) -> JSONResponse:
|
||||||
|
errors = exc.errors()
|
||||||
|
for err in errors:
|
||||||
|
if "body" in err.get("loc", ()):
|
||||||
|
return JSONResponse(status_code=400, content={"detail": "body must not be empty"})
|
||||||
|
msg = str(errors[0]["msg"]) if errors else "invalid payload"
|
||||||
|
return JSONResponse(status_code=400, content={"detail": msg})
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
import uvicorn
|
||||||
|
host = os.environ.get("PHONE_CAPTURE_HOST", "0.0.0.0")
|
||||||
|
port = int(os.environ.get("PHONE_CAPTURE_PORT", "8765"))
|
||||||
|
uvicorn.run("app.main:app", host=host, port=port, reload=False)
|
||||||
45
server/app/models.py
Normal file
45
server/app/models.py
Normal file
@@ -0,0 +1,45 @@
|
|||||||
|
from pydantic import BaseModel, field_validator
|
||||||
|
from typing import Literal
|
||||||
|
|
||||||
|
|
||||||
|
class CaptureRequest(BaseModel):
|
||||||
|
id: str
|
||||||
|
created_at: str
|
||||||
|
kind: Literal["note", "todo"]
|
||||||
|
body: str
|
||||||
|
tags: list[str]
|
||||||
|
device: str
|
||||||
|
|
||||||
|
@field_validator("body")
|
||||||
|
@classmethod
|
||||||
|
def body_must_not_be_empty(cls, v: str) -> str:
|
||||||
|
stripped = v.strip()
|
||||||
|
if not stripped:
|
||||||
|
raise ValueError("body must not be empty")
|
||||||
|
return stripped
|
||||||
|
|
||||||
|
@field_validator("id")
|
||||||
|
@classmethod
|
||||||
|
def id_must_not_be_empty(cls, v: str) -> str:
|
||||||
|
if not v.strip():
|
||||||
|
raise ValueError("id must not be empty")
|
||||||
|
return v
|
||||||
|
|
||||||
|
@field_validator("device")
|
||||||
|
@classmethod
|
||||||
|
def device_must_not_be_empty(cls, v: str) -> str:
|
||||||
|
if not v.strip():
|
||||||
|
raise ValueError("device must not be empty")
|
||||||
|
return v
|
||||||
|
|
||||||
|
|
||||||
|
class CaptureResponse(BaseModel):
|
||||||
|
ok: bool
|
||||||
|
status: str
|
||||||
|
id: str
|
||||||
|
|
||||||
|
|
||||||
|
class HealthResponse(BaseModel):
|
||||||
|
ok: bool
|
||||||
|
service: str
|
||||||
|
version: str
|
||||||
68
server/app/org_writer.py
Normal file
68
server/app/org_writer.py
Normal file
@@ -0,0 +1,68 @@
|
|||||||
|
import re
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from typing import Sequence
|
||||||
|
|
||||||
|
from .models import CaptureRequest
|
||||||
|
|
||||||
|
|
||||||
|
def normalize_tags(tags: Sequence[str]) -> list[str]:
|
||||||
|
seen: set[str] = set()
|
||||||
|
result: list[str] = []
|
||||||
|
for raw in tags:
|
||||||
|
tag = raw.strip().lower()
|
||||||
|
tag = tag.lstrip("#")
|
||||||
|
tag = tag.replace(" ", "_")
|
||||||
|
tag = re.sub(r"[^a-z0-9_]", "", tag)
|
||||||
|
if tag and tag not in seen:
|
||||||
|
seen.add(tag)
|
||||||
|
result.append(tag)
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
def _org_tags(tags: list[str]) -> str:
|
||||||
|
if not tags:
|
||||||
|
return ""
|
||||||
|
return " :" + ":".join(tags) + ":"
|
||||||
|
|
||||||
|
|
||||||
|
def _created_stamp(created_at: str) -> str:
|
||||||
|
"""Parse ISO-8601 datetime and format as org inactive timestamp."""
|
||||||
|
try:
|
||||||
|
dt = datetime.fromisoformat(created_at)
|
||||||
|
except ValueError:
|
||||||
|
dt = datetime.now(timezone.utc)
|
||||||
|
day_abbr = dt.strftime("%a").lower()
|
||||||
|
return f"[{dt.strftime('%Y-%m-%d')} {day_abbr} {dt.strftime('%H:%M')}]"
|
||||||
|
|
||||||
|
|
||||||
|
def _property_drawer(created_stamp: str, source: str, capture_id: str) -> str:
|
||||||
|
return (
|
||||||
|
":PROPERTIES:\n"
|
||||||
|
f":CREATED: {created_stamp}\n"
|
||||||
|
f":SOURCE: {source}\n"
|
||||||
|
f":ID: {capture_id}\n"
|
||||||
|
":END:"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def format_capture(capture: CaptureRequest) -> str:
|
||||||
|
tags = normalize_tags(capture.tags)
|
||||||
|
tag_str = _org_tags(tags)
|
||||||
|
created_stamp = _created_stamp(capture.created_at)
|
||||||
|
drawer = _property_drawer(created_stamp, capture.device, capture.id)
|
||||||
|
|
||||||
|
if capture.kind == "todo":
|
||||||
|
heading = f"* TODO {capture.body}{tag_str}"
|
||||||
|
return f"{heading}\n{drawer}\n"
|
||||||
|
|
||||||
|
# note
|
||||||
|
lines = capture.body.split("\n")
|
||||||
|
first_line = lines[0].rstrip()
|
||||||
|
is_multiline = len(lines) > 1 and any(l.strip() for l in lines[1:])
|
||||||
|
|
||||||
|
if is_multiline:
|
||||||
|
heading = f"* note: {first_line}{tag_str}"
|
||||||
|
return f"{heading}\n{drawer}\n\n{capture.body}\n"
|
||||||
|
else:
|
||||||
|
heading = f"* note{tag_str}"
|
||||||
|
return f"{heading}\n{drawer}\n\n{capture.body}\n"
|
||||||
38
server/app/store.py
Normal file
38
server/app/store.py
Normal file
@@ -0,0 +1,38 @@
|
|||||||
|
import sqlite3
|
||||||
|
import threading
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
|
||||||
|
_lock = threading.Lock()
|
||||||
|
|
||||||
|
|
||||||
|
def _connect(db_path: str) -> sqlite3.Connection:
|
||||||
|
conn = sqlite3.connect(db_path, check_same_thread=False)
|
||||||
|
conn.execute(
|
||||||
|
"CREATE TABLE IF NOT EXISTS seen_ids (id TEXT PRIMARY KEY, created_at TEXT NOT NULL)"
|
||||||
|
)
|
||||||
|
conn.commit()
|
||||||
|
return conn
|
||||||
|
|
||||||
|
|
||||||
|
class IdempotencyStore:
|
||||||
|
def __init__(self, db_path: str) -> None:
|
||||||
|
Path(db_path).parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
self._conn = _connect(db_path)
|
||||||
|
|
||||||
|
def already_seen(self, capture_id: str) -> bool:
|
||||||
|
row = self._conn.execute(
|
||||||
|
"SELECT 1 FROM seen_ids WHERE id = ?", (capture_id,)
|
||||||
|
).fetchone()
|
||||||
|
return row is not None
|
||||||
|
|
||||||
|
def mark_seen(self, capture_id: str, created_at: str) -> None:
|
||||||
|
self._conn.execute(
|
||||||
|
"INSERT OR IGNORE INTO seen_ids (id, created_at) VALUES (?, ?)",
|
||||||
|
(capture_id, created_at),
|
||||||
|
)
|
||||||
|
self._conn.commit()
|
||||||
|
|
||||||
|
|
||||||
|
def get_file_lock() -> threading.Lock:
|
||||||
|
return _lock
|
||||||
22
server/pyproject.toml
Normal file
22
server/pyproject.toml
Normal file
@@ -0,0 +1,22 @@
|
|||||||
|
[project]
|
||||||
|
name = "synq-server"
|
||||||
|
version = "0.1.0"
|
||||||
|
requires-python = ">=3.11"
|
||||||
|
dependencies = [
|
||||||
|
"fastapi>=0.111",
|
||||||
|
"uvicorn[standard]>=0.29",
|
||||||
|
"pydantic>=2.7",
|
||||||
|
]
|
||||||
|
|
||||||
|
[project.optional-dependencies]
|
||||||
|
dev = [
|
||||||
|
"pytest>=8",
|
||||||
|
"httpx>=0.27",
|
||||||
|
]
|
||||||
|
|
||||||
|
[build-system]
|
||||||
|
requires = ["hatchling"]
|
||||||
|
build-backend = "hatchling.build"
|
||||||
|
|
||||||
|
[tool.pytest.ini_options]
|
||||||
|
testpaths = ["tests"]
|
||||||
0
server/tests/__init__.py
Normal file
0
server/tests/__init__.py
Normal file
118
server/tests/test_api.py
Normal file
118
server/tests/test_api.py
Normal file
@@ -0,0 +1,118 @@
|
|||||||
|
import os
|
||||||
|
import tempfile
|
||||||
|
import pytest
|
||||||
|
from fastapi.testclient import TestClient
|
||||||
|
|
||||||
|
# Set env vars before importing app so store/paths are overridden in tests
|
||||||
|
TOKEN = "test-token-abc"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def env_setup(tmp_path, monkeypatch):
|
||||||
|
db = str(tmp_path / "capture.sqlite3")
|
||||||
|
org = str(tmp_path / "synq.org")
|
||||||
|
monkeypatch.setenv("PHONE_CAPTURE_TOKEN", TOKEN)
|
||||||
|
monkeypatch.setenv("PHONE_CAPTURE_DB_PATH", db)
|
||||||
|
monkeypatch.setenv("PHONE_CAPTURE_ORG_PATH", org)
|
||||||
|
# reset singleton store so each test gets a fresh db
|
||||||
|
import app.main as m
|
||||||
|
m._store = None
|
||||||
|
yield
|
||||||
|
m._store = None
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture()
|
||||||
|
def client():
|
||||||
|
from app.main import app
|
||||||
|
return TestClient(app)
|
||||||
|
|
||||||
|
|
||||||
|
def auth_headers():
|
||||||
|
return {"Authorization": f"Bearer {TOKEN}"}
|
||||||
|
|
||||||
|
|
||||||
|
VALID_PAYLOAD = {
|
||||||
|
"id": "phone-20260517-143122-a8f2",
|
||||||
|
"created_at": "2026-05-17T14:31:22-04:00",
|
||||||
|
"kind": "todo",
|
||||||
|
"body": "buy printer paper",
|
||||||
|
"tags": ["home", "errands"],
|
||||||
|
"device": "android",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class TestHealth:
|
||||||
|
def test_health_returns_200(self, client):
|
||||||
|
r = client.get("/health")
|
||||||
|
assert r.status_code == 200
|
||||||
|
data = r.json()
|
||||||
|
assert data["ok"] is True
|
||||||
|
assert data["service"] == "synq"
|
||||||
|
assert data["version"] == "0.1.0"
|
||||||
|
|
||||||
|
def test_health_no_auth_required(self, client):
|
||||||
|
r = client.get("/health")
|
||||||
|
assert r.status_code == 200
|
||||||
|
|
||||||
|
|
||||||
|
class TestCapture:
|
||||||
|
def test_valid_capture_accepted(self, client, tmp_path):
|
||||||
|
r = client.post("/capture", json=VALID_PAYLOAD, headers=auth_headers())
|
||||||
|
assert r.status_code == 200
|
||||||
|
data = r.json()
|
||||||
|
assert data["ok"] is True
|
||||||
|
assert data["status"] == "accepted"
|
||||||
|
assert data["id"] == VALID_PAYLOAD["id"]
|
||||||
|
|
||||||
|
def test_valid_capture_appended_to_org(self, client, monkeypatch, tmp_path):
|
||||||
|
org = str(tmp_path / "synq.org")
|
||||||
|
monkeypatch.setenv("PHONE_CAPTURE_ORG_PATH", org)
|
||||||
|
client.post("/capture", json=VALID_PAYLOAD, headers=auth_headers())
|
||||||
|
content = open(org, encoding="utf-8").read()
|
||||||
|
assert "buy printer paper" in content
|
||||||
|
assert ":ID: phone-20260517-143122-a8f2" in content
|
||||||
|
|
||||||
|
def test_duplicate_capture_returns_already_seen(self, client):
|
||||||
|
client.post("/capture", json=VALID_PAYLOAD, headers=auth_headers())
|
||||||
|
r = client.post("/capture", json=VALID_PAYLOAD, headers=auth_headers())
|
||||||
|
assert r.status_code == 200
|
||||||
|
assert r.json()["status"] == "already_seen"
|
||||||
|
|
||||||
|
def test_duplicate_not_appended_twice(self, client, monkeypatch, tmp_path):
|
||||||
|
org = str(tmp_path / "synq.org")
|
||||||
|
monkeypatch.setenv("PHONE_CAPTURE_ORG_PATH", org)
|
||||||
|
client.post("/capture", json=VALID_PAYLOAD, headers=auth_headers())
|
||||||
|
client.post("/capture", json=VALID_PAYLOAD, headers=auth_headers())
|
||||||
|
content = open(org, encoding="utf-8").read()
|
||||||
|
assert content.count(VALID_PAYLOAD["id"]) == 1
|
||||||
|
|
||||||
|
def test_missing_token_rejected(self, client):
|
||||||
|
r = client.post("/capture", json=VALID_PAYLOAD)
|
||||||
|
assert r.status_code == 401
|
||||||
|
assert r.json()["detail"] == "unauthorized"
|
||||||
|
|
||||||
|
def test_wrong_token_rejected(self, client):
|
||||||
|
r = client.post("/capture", json=VALID_PAYLOAD, headers={"Authorization": "Bearer wrong"})
|
||||||
|
assert r.status_code == 401
|
||||||
|
|
||||||
|
def test_empty_body_rejected(self, client):
|
||||||
|
payload = {**VALID_PAYLOAD, "body": " "}
|
||||||
|
r = client.post("/capture", json=payload, headers=auth_headers())
|
||||||
|
assert r.status_code == 400
|
||||||
|
assert "body" in r.json()["detail"].lower()
|
||||||
|
|
||||||
|
def test_missing_body_field_rejected(self, client):
|
||||||
|
payload = {k: v for k, v in VALID_PAYLOAD.items() if k != "body"}
|
||||||
|
r = client.post("/capture", json=payload, headers=auth_headers())
|
||||||
|
assert r.status_code in (400, 422)
|
||||||
|
|
||||||
|
def test_invalid_kind_rejected(self, client):
|
||||||
|
payload = {**VALID_PAYLOAD, "kind": "journal"}
|
||||||
|
r = client.post("/capture", json=payload, headers=auth_headers())
|
||||||
|
assert r.status_code in (400, 422)
|
||||||
|
|
||||||
|
def test_note_kind_accepted(self, client):
|
||||||
|
payload = {**VALID_PAYLOAD, "id": "phone-20260517-143200-note1", "kind": "note"}
|
||||||
|
r = client.post("/capture", json=payload, headers=auth_headers())
|
||||||
|
assert r.status_code == 200
|
||||||
|
assert r.json()["status"] == "accepted"
|
||||||
106
server/tests/test_org_writer.py
Normal file
106
server/tests/test_org_writer.py
Normal file
@@ -0,0 +1,106 @@
|
|||||||
|
import pytest
|
||||||
|
from app.models import CaptureRequest
|
||||||
|
from app.org_writer import format_capture, normalize_tags
|
||||||
|
|
||||||
|
|
||||||
|
def make_capture(**kwargs) -> CaptureRequest:
|
||||||
|
defaults = dict(
|
||||||
|
id="phone-20260517-143122-a8f2",
|
||||||
|
created_at="2026-05-17T14:31:22-04:00",
|
||||||
|
kind="note",
|
||||||
|
body="hello world",
|
||||||
|
tags=[],
|
||||||
|
device="android",
|
||||||
|
)
|
||||||
|
defaults.update(kwargs)
|
||||||
|
return CaptureRequest(**defaults)
|
||||||
|
|
||||||
|
|
||||||
|
class TestNormalizeTags:
|
||||||
|
def test_lowercase(self):
|
||||||
|
assert normalize_tags(["Home", "ERRANDS"]) == ["home", "errands"]
|
||||||
|
|
||||||
|
def test_strips_hash(self):
|
||||||
|
assert normalize_tags(["#home"]) == ["home"]
|
||||||
|
|
||||||
|
def test_replaces_spaces(self):
|
||||||
|
assert normalize_tags(["my tag"]) == ["my_tag"]
|
||||||
|
|
||||||
|
def test_removes_invalid_chars(self):
|
||||||
|
assert normalize_tags(["tag!"]) == ["tag"]
|
||||||
|
|
||||||
|
def test_deduplicates(self):
|
||||||
|
assert normalize_tags(["home", "home"]) == ["home"]
|
||||||
|
|
||||||
|
def test_omits_empty(self):
|
||||||
|
assert normalize_tags(["", " "]) == []
|
||||||
|
|
||||||
|
def test_empty_list(self):
|
||||||
|
assert normalize_tags([]) == []
|
||||||
|
|
||||||
|
|
||||||
|
class TestFormatCapture:
|
||||||
|
def test_todo_with_tags(self):
|
||||||
|
c = make_capture(kind="todo", body="buy printer paper", tags=["home", "errands"])
|
||||||
|
result = format_capture(c)
|
||||||
|
assert result.startswith("* TODO buy printer paper :home:errands:\n")
|
||||||
|
assert ":CREATED:" in result
|
||||||
|
assert ":SOURCE: android" in result
|
||||||
|
assert ":ID: phone-20260517-143122-a8f2" in result
|
||||||
|
|
||||||
|
def test_todo_no_tags(self):
|
||||||
|
c = make_capture(kind="todo", body="do the thing", tags=[])
|
||||||
|
result = format_capture(c)
|
||||||
|
assert result.startswith("* TODO do the thing\n")
|
||||||
|
|
||||||
|
def test_note_single_line(self):
|
||||||
|
c = make_capture(kind="note", body="mobile capture should stay dumb and append-only.", tags=["retcon"])
|
||||||
|
result = format_capture(c)
|
||||||
|
assert result.startswith("* note :retcon:\n")
|
||||||
|
assert "mobile capture should stay dumb and append-only." in result
|
||||||
|
assert ":PROPERTIES:" in result
|
||||||
|
|
||||||
|
def test_note_no_tags(self):
|
||||||
|
c = make_capture(kind="note", body="simple note", tags=[])
|
||||||
|
result = format_capture(c)
|
||||||
|
assert result.startswith("* note\n")
|
||||||
|
assert "simple note" in result
|
||||||
|
|
||||||
|
def test_note_multiline(self):
|
||||||
|
body = "retcon capture idea\n\nphone should produce records, not edit org files."
|
||||||
|
c = make_capture(kind="note", body=body, tags=["retcon"])
|
||||||
|
result = format_capture(c)
|
||||||
|
assert result.startswith("* note: retcon capture idea :retcon:\n")
|
||||||
|
assert "retcon capture idea" in result
|
||||||
|
assert "phone should produce records, not edit org files." in result
|
||||||
|
assert ":PROPERTIES:" in result
|
||||||
|
|
||||||
|
def test_created_timestamp_format(self):
|
||||||
|
c = make_capture(created_at="2026-05-17T14:31:22-04:00")
|
||||||
|
result = format_capture(c)
|
||||||
|
assert ":CREATED: [2026-05-17 sun 14:31]" in result
|
||||||
|
|
||||||
|
def test_property_drawer_complete(self):
|
||||||
|
c = make_capture()
|
||||||
|
result = format_capture(c)
|
||||||
|
assert ":PROPERTIES:" in result
|
||||||
|
assert ":END:" in result
|
||||||
|
|
||||||
|
def test_todo_body_below_drawer_absent(self):
|
||||||
|
c = make_capture(kind="todo", body="simple todo")
|
||||||
|
result = format_capture(c)
|
||||||
|
lines = result.strip().split("\n")
|
||||||
|
# heading, drawer block — no extra body section
|
||||||
|
assert lines[0].startswith("* TODO")
|
||||||
|
assert ":END:" in result
|
||||||
|
# body should not appear after :END:
|
||||||
|
end_idx = result.index(":END:")
|
||||||
|
after_end = result[end_idx + len(":END:"):].strip()
|
||||||
|
assert after_end == ""
|
||||||
|
|
||||||
|
def test_note_body_appears_after_drawer(self):
|
||||||
|
c = make_capture(kind="note", body="my note body")
|
||||||
|
result = format_capture(c)
|
||||||
|
end_idx = result.index(":END:")
|
||||||
|
after_end = result[end_idx + len(":END:"):].strip()
|
||||||
|
assert "my note body" in after_end
|
||||||
Reference in New Issue
Block a user