Python 多线程多进程
  • 一个协程示例:
import asyncio

async def my_coroutine():
    # 这里是协程的代码
    await asyncio.sleep(1)
    print("Coroutine is done")

async def main():
    # 创建一个任务对象
    task = asyncio.create_task(my_coroutine())
    # 等待任务完成
    await task

# 运行主协程
asyncio.run(main())
  • 一个线程示例:
import threading

# 定义一个函数,用于在线程中执行
def thread_function(name):
    print(f"Thread {name}: starting")
    # 在这里执行一些操作
    print(f"Thread {name}: finishing")

# 创建线程的实例
thread = threading.Thread(target=thread_function, args=(1,))
# 启动线程
thread.start()
# 等待线程结束
thread.join()
print("Main thread is done")
  • 一个loop tip:
self.loop = asyncio.get_event_loop()
self.queue.set_loop(loop=self.loop)
await self.af.set_loop(loop=self.loop)
await self.adjust.set_loop(loop=self.loop)

为什么这里使用同一个loop ?
---
在 Python 的 `asyncio` 框架中事件循环event loop是处理异步任务的核心每个异步应用程序通常只需要一个事件循环所有的异步任务都应该在这个事件循环中运行使用同一个事件循环的好处包括
1. **统一管理**所有异步任务都在同一个事件循环中运行这样可以更方便地管理和监控它们
2. **避免冲突**如果为不同的任务创建多个事件循环可能会导致资源竞争或冲突尤其是在共享资源或执行I/O操作时
3. **保持上下文一致性**使用同一个事件循环可以确保所有任务都在相同的上下文中运行这有助于维护状态的一致性

在你提供的代码片段中这样做的目的是确保所有这些组件都在同一个事件循环中运行以便它们可以正确地协同工作

如果这些组件是异步的并且需要与事件循环交互那么它们需要知道使用哪个事件循环通过这种方式设置事件循环可以确保组件在执行异步操作时能够正确地在事件循环中注册和处理

总之使用同一个事件循环有助于保持代码的整洁和协调避免潜在的问题并确保整个应用程序的异步操作能够顺利地进行

多进程 #

多进程通信打日志 #


from multiprocessing import Process, Queue
from loguru import logger
import sys
import time

# 配置日志处理函数
def configure_logger(queue):
    logger.remove()  # 移除默认日志处理
    logger.add(sys.stdout, format="{message}")
    logger.add("logfile.log", format="{message}", rotation="1 MB")
    
    while True:
        try:
            record = queue.get()
            if record is None:
                break  # None 表示日志记录结束
            # 使用 log 方法记录日志
            logger.log(record["level"], record["message"])
        except Exception:
            logger.exception("Failed to process log record")

# 配置工作进程
def worker_process(name, queue):
    logger.remove()  # 移除默认日志处理
    def enqueue_log_message(message):
        # print('message.record["level"].name:', message.record["level"].name)
        # print('message.record["function"]:', message.record["function"])
        # print('message.record["line"]:', message.record["line"])
        # print('message.record.keys:', message.record.keys())
        queue.put({
            "line": message.record["line"], "function": message.record["function"],
            "level": message.record["level"].name, "message": message.strip()
        })
    logger.add(enqueue_log_message)
    
    for i in range(5):
        time.sleep(0.01)
        logger.info(f"Worker {name} is logging iteration {i}")

if __name__ == "__main__":
    log_queue = Queue()
    
    # 启动日志处理进程
    log_process = Process(target=configure_logger, args=(log_queue,))
    log_process.start()
    
    # 启动工作进程
    processes = []
    for i in range(5):
        process = Process(target=worker_process, args=(f"Process-{i}", log_queue))
        processes.append(process)
        process.start()

    # 等待所有工作进程完成
    for process in processes:
        process.join()
    
    # 终止日志处理进程
    log_queue.put(None)
    log_process.join()