FastAPI

FastAPI 使用笔记 #

上传文件 #

pip install python-multipart
from fastapi import FastAPI, UploadFile, File
import pandas as pd
from io import StringIO
from pydantic import BaseModel

app = FastAPI()

# 定义响应模型
class RespAction(BaseModel):
    status: dict

# 解析上传的文件
# 解析上传的文件

async def __parse_file(file: UploadFile):
	contents = await file.read()
	# 将字节内容转换为字符串
	data = StringIO(contents.decode('utf-8'))
	# 根据文件类型选择解析方式
	if file.filename.endswith('.csv'):
		df = pd.read_csv(data)
	elif file.filename.endswith('.txt'):
		df = pd.read_csv(data, delimiter='\t') # 假设 TXT 文件以制表符分隔
	else:
		raise ValueError("Unsupported file type. Please upload a CSV or TXT file.")
	return df

# 上传账号的路由
@app.post("/upload/accounts/", description="上传账号csv/txt文件进行解析入库存储")
async def upload_accounts(file: UploadFile = File(...)) -> RespAction:
    rst = RespAction(status={"code": 0, "msg": ""})
    try:
        # 解析文件
        df = await parse_file(file)
        pass
	except:
		pass
	return rst