
pydantic#
来自类#
from pydantic import BaseModel
class User(BaseModel):
name: str
age: int
class Config:
from_attributes = True
# 假设我们有一个普通的 Python 对象,它具有与模型字段相同的属性名
class OrdinaryObject:
def __init__(self, name, age):
self.name = name
self.age = age
# 创建一个 OrdinaryObject 实例
ordinary_object = OrdinaryObject(name='Alice', age=30)
# 现在我们可以直接将 ordinary_object 传递给 User 模型的构造函数
user_model = User(ordinary_object)
# 输出模型的字段值
print(user_model.dict()) # 输出: {'name': 'Alice', 'age': 30}
自定义字段#
class User(BaseModel):
id: int
is_active: bool
wx_infos: List[str] = []
class Config:
arbitrary_types_allowed = True # 允许任意类型
Tuple定义问题#
from typing import List, Tuple
class Task(BaseModel):
launch_count_today: int
launch_groups: List[Tuple[str, ...]]
@property
def launch_groups_count(self) -> int:
return len(self.launch_groups)
wrong tuple length, expected 1 (type=value_error.tuple.length; actual_length=5; expected_length=1)#
from typing import List, Tuple
class Task(BaseModel):
launch_count_today: int
launch_groups: List[Tuple[str]] # 这样定义就会tuple限制长度
@property
def launch_groups_count(self) -> int:
return len(self.launch_groups)
random#
有权重的随机#
import random
# 假设有一个字典
my_dict = {'a': 1, 'b': 2, 'c': 3}
# 将字典的键和值分别保存在两个列表中
keys = list(my_dict.keys())
weights = list(my_dict.values())
# 使用 random.choices() 函数进行带有权重的随机选择 取12个值
random_choice = random.choices(keys, weights, k=12)
print("随机选择的键:", random_choice)
loguru#
from multiprocessing import Process, Queue
from loguru import logger
import sys
import time
# 配置日志处理函数
def configure_logger(queue):
logger.remove() # 移除默认日志处理
logger.add(sys.stdout, format="{message}")
logger.add("logfile.log", format="{message}", rotation="1 MB")
while True:
try:
record = queue.get()
if record is None:
break # None 表示日志记录结束
# 使用 log 方法记录日志
logger.log(record["level"], record["message"])
except Exception:
logger.exception("Failed to process log record")
# 配置工作进程
def worker_process(name, queue):
logger.remove() # 移除默认日志处理
def enqueue_log_message(message):
# print('message.record["level"].name:', message.record["level"].name)
# print('message.record["function"]:', message.record["function"])
# print('message.record["line"]:', message.record["line"])
# print('message.record.keys:', message.record.keys())
queue.put({
"line": message.record["line"], "function": message.record["function"],
"level": message.record["level"].name, "message": message.strip()
})
logger.add(enqueue_log_message)
for i in range(5):
time.sleep(0.01)
logger.info(f"Worker {name} is logging iteration {i}")
if __name__ == "__main__":
log_queue = Queue()
# 启动日志处理进程
log_process = Process(target=configure_logger, args=(log_queue,))
log_process.start()
# 启动工作进程
processes = []
for i in range(5):
process = Process(target=worker_process, args=(f"Process-{i}", log_queue))
processes.append(process)
process.start()
# 等待所有工作进程完成
for process in processes:
process.join()
# 终止日志处理进程
log_queue.put(None)
log_process.join()