Python 常用库

python

pydantic#

来自类#

from pydantic import BaseModel

class User(BaseModel):
    name: str
    age: int

    class Config:
        from_attributes = True

# 假设我们有一个普通的 Python 对象,它具有与模型字段相同的属性名
class OrdinaryObject:
    def __init__(self, name, age):
        self.name = name
        self.age = age

# 创建一个 OrdinaryObject 实例
ordinary_object = OrdinaryObject(name='Alice', age=30)

# 现在我们可以直接将 ordinary_object 传递给 User 模型的构造函数
user_model = User(ordinary_object)

# 输出模型的字段值
print(user_model.dict())  # 输出: {'name': 'Alice', 'age': 30}

自定义字段#

class User(BaseModel):
	id: int
	is_active: bool
	wx_infos: List[str] = []

	class Config:
		arbitrary_types_allowed = True # 允许任意类型

Tuple定义问题#

from typing import List, Tuple
class Task(BaseModel):
	launch_count_today: int
	launch_groups: List[Tuple[str, ...]]
	@property
	def launch_groups_count(self) -> int:
		return len(self.launch_groups)

wrong tuple length, expected 1 (type=value_error.tuple.length; actual_length=5; expected_length=1)#

from typing import List, Tuple
class Task(BaseModel):
	launch_count_today: int
	launch_groups: List[Tuple[str]] # 这样定义就会tuple限制长度
	@property
	def launch_groups_count(self) -> int:
		return len(self.launch_groups)

random#

有权重的随机#

import random
# 假设有一个字典
my_dict = {'a': 1, 'b': 2, 'c': 3}

# 将字典的键和值分别保存在两个列表中
keys = list(my_dict.keys())
weights = list(my_dict.values())

# 使用 random.choices() 函数进行带有权重的随机选择 取12个值
random_choice = random.choices(keys, weights, k=12)

print("随机选择的键:", random_choice)

loguru#

from multiprocessing import Process, Queue
from loguru import logger
import sys
import time

# 配置日志处理函数
def configure_logger(queue):
    logger.remove()  # 移除默认日志处理
    logger.add(sys.stdout, format="{message}")
    logger.add("logfile.log", format="{message}", rotation="1 MB")
    
    while True:
        try:
            record = queue.get()
            if record is None:
                break  # None 表示日志记录结束
            # 使用 log 方法记录日志
            logger.log(record["level"], record["message"])
        except Exception:
            logger.exception("Failed to process log record")

# 配置工作进程
def worker_process(name, queue):
    logger.remove()  # 移除默认日志处理
    def enqueue_log_message(message):
        # print('message.record["level"].name:', message.record["level"].name)
        # print('message.record["function"]:', message.record["function"])
        # print('message.record["line"]:', message.record["line"])
        # print('message.record.keys:', message.record.keys())
        queue.put({
            "line": message.record["line"], "function": message.record["function"],
            "level": message.record["level"].name, "message": message.strip()
        })
    logger.add(enqueue_log_message)
    
    for i in range(5):
        time.sleep(0.01)
        logger.info(f"Worker {name} is logging iteration {i}")

if __name__ == "__main__":
    log_queue = Queue()
    
    # 启动日志处理进程
    log_process = Process(target=configure_logger, args=(log_queue,))
    log_process.start()
    
    # 启动工作进程
    processes = []
    for i in range(5):
        process = Process(target=worker_process, args=(f"Process-{i}", log_queue))
        processes.append(process)
        process.start()

    # 等待所有工作进程完成
    for process in processes:
        process.join()
    
    # 终止日志处理进程
    log_queue.put(None)
    log_process.join()