123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146 |
- from typing import Any, List, Optional
- from fastapi import APIRouter, Depends, HTTPException, File, UploadFile, Form
- from pydantic.errors import NotNoneError
- from sqlalchemy.orm import Session
- from app import crud, models, schemas
- from app.core.config import settings
- from app.api import deps
- from uuid import uuid4
- router = APIRouter()
- @router.get("/", response_model=List[schemas.NftPrint])
- # @router.get("/")
- def read_items(
- db: Session = Depends(deps.get_db),
- skip: int = 0,
- limit: int = 100,
- current_user: models.users = Depends(deps.get_current_active_superuser),
- ) -> Any:
- """
- Retrieve items.
- """
- if crud.user.is_superuser(current_user):
- nfts = crud.nft.get_group_by_title(db, skip=skip, limit=limit)
- else:
- nfts = crud.nft.get_multi_by_owner(
- db=db, owner_id=current_user.userid, skip=skip, limit=limit
- )
- return nfts
- @router.post("/", response_model=schemas.NftBase)
- async def create_item(
- *,
- db: Session = Depends(deps.get_db),
- # item_in: schemas.NftCreate,
- hash: Optional[str] = Form(None),
- userid: Optional[str] = Form(None),
- title: Optional[str] = Form(None),
- context: Optional[str] = Form(None),
- is_active: Optional[bool] = Form(True),
- catagory: Optional[str] = Form(None),
- image: UploadFile = File(...),
- current_user: models.users = Depends(deps.get_current_active_user),
- ) -> Any:
- """
- Create new item.
- """
- if image.content_type.split('/')[0] != 'image':
- raise HTTPException(status_code=415, detail='content type error! Please upload valid image type')
- filename = str(uuid4()) + '.' + image.content_type.split('/')[1]
- with open(settings.IMG_PATH + filename, 'wb+') as f:
- f.write(image.file.read())
- f.close()
- item_in = schemas.NftCreate
- item_in.hash = hash
- item_in.userid = userid
- item_in.title = title
- item_in.context = context
- item_in.is_active = is_active
- item_in.category = catagory
- item_in.imgurl = settings.IMG_HOST + filename
- nft = crud.nft.create_with_owner(
- db=db, obj_in=item_in, owner_id=item_in.userid)
- return nft
- @router.put("/{id}", response_model=schemas.NftCreate)
- def update_item(
- *,
- db: Session = Depends(deps.get_db),
- id: int,
- item_in: schemas.NftUpdate,
- current_user: models.users = Depends(deps.get_current_active_user),
- ) -> Any:
- """
- Update an item.
- """
- nft = crud.nft.get(db=db, id=id)
- if not nft:
- raise HTTPException(status_code=404, detail="Item not found")
- if not crud.user.is_superuser(current_user) and (nft.userid != current_user.userid):
- raise HTTPException(status_code=400, detail="Not enough permissions")
- nft = crud.nft.update(db=db, db_obj=nft, obj_in=item_in)
- return nft
- @router.get("/{id}", response_model=schemas.NftCreate)
- def read_item(
- *,
- db: Session = Depends(deps.get_db),
- id: int,
- current_user: models.users = Depends(deps.get_current_active_user),
- ) -> Any:
- """
- Get item by ID.
- """
- nft = crud.nft.get(db=db, id=id)
- if not nft:
- raise HTTPException(status_code=404, detail="Item not found")
- if not crud.user.is_superuser(current_user) and (nft.userid != current_user.userid):
- raise HTTPException(status_code=400, detail="Not enough permissions")
- return nft
- # @router.post("/title", response_model=schemas.NftCreate)
- @router.post("/title")
- def read_user_by_title(
- *,
- db: Session = Depends(deps.get_db),
- title: str,
- skip: int = 0,
- limit: int = 100,
- current_user: models.users = Depends(deps.get_current_active_user),
- ) -> Any:
- """
- Get item by title.
- """
- nft = crud.nft.get_user_by_title(db=db, title=title, skip=skip, limit=limit)
- if not nft:
- raise HTTPException(status_code=404, detail="Item not found")
- if not crud.user.is_superuser(current_user) and (nft.userid != current_user.userid):
- raise HTTPException(status_code=400, detail="Not enough permissions")
- return nft
- @router.delete("/{id}", response_model=schemas.NftCreate)
- def delete_item(
- *,
- db: Session = Depends(deps.get_db),
- id: int,
- current_user: models.users = Depends(deps.get_current_active_user),
- ) -> Any:
- """
- Delete an item.
- """
- nft = crud.nft.get(db=db, id=id)
- if not nft:
- raise HTTPException(status_code=404, detail="Item not found")
- if not crud.user.is_superuser(current_user) and (nft.userid != current_user.userid):
- raise HTTPException(status_code=400, detail="Not enough permissions")
- nft = crud.nft.remove(db=db, id=id)
- return nft
|