-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
85 lines (71 loc) · 2.52 KB
/
main.py
File metadata and controls
85 lines (71 loc) · 2.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.orm import Session
from typing import List, Optional
from database import Base, engine, SessionLocal
from models import Task
from schemas import TaskCreate, TaskUpdate, Task as TaskSchema
# ✅ Initialize FastAPI app
app = FastAPI()
# ✅ CORS Middleware for frontend access
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Replace with specific domain in production
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ✅ Create tables in SQLite
Base.metadata.create_all(bind=engine)
# ✅ Dependency to get DB session
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# ✅ Optional welcome route
@app.get("/")
def read_root():
return {"message": "Welcome to the Smart Task Tracker API"}
# ✅ Create Task
@app.post("/tasks", response_model=TaskSchema)
def create_task(task: TaskCreate, db: Session = Depends(get_db)):
db_task = Task(**task.dict())
db.add(db_task)
db.commit()
db.refresh(db_task)
return db_task
# ✅ Get All Tasks (with optional filtering)
@app.get("/tasks", response_model=List[TaskSchema])
def get_tasks(is_completed: Optional[bool] = None, db: Session = Depends(get_db)):
if is_completed is None:
return db.query(Task).all()
return db.query(Task).filter(Task.is_completed == is_completed).all()
# ✅ Get Task by ID
@app.get("/tasks/{task_id}", response_model=TaskSchema)
def get_task(task_id: int, db: Session = Depends(get_db)):
task = db.query(Task).filter(Task.id == task_id).first()
if not task:
raise HTTPException(status_code=404, detail="Task not found")
return task
# ✅ Update Task
@app.put("/tasks/{task_id}", response_model=TaskSchema)
def update_task(task_id: int, updated_task: TaskUpdate, db: Session = Depends(get_db)):
task = db.query(Task).filter(Task.id == task_id).first()
if not task:
raise HTTPException(status_code=404, detail="Task not found")
for key, value in updated_task.dict().items():
setattr(task, key, value)
db.commit()
db.refresh(task)
return task
# ✅ Delete Task
@app.delete("/tasks/{task_id}")
def delete_task(task_id: int, db: Session = Depends(get_db)):
task = db.query(Task).filter(Task.id == task_id).first()
if not task:
raise HTTPException(status_code=404, detail="Task not found")
db.delete(task)
db.commit()
return {"detail": "Task deleted"}