
详解Python实现多进程异步事件驱动引擎
本篇文章主要介绍了详解Python实现多进程异步事件驱动引擎,小编觉得挺不错的,现在分享给大家,也给大家做个参考。
多进程异步事件驱动逻辑
逻辑
code
# -*- coding: utf-8 -*-
'''
author: Jimmy
contact: 234390130@qq.com
file: eventEngine.py
time: 2017/8/25 上午10:06
description: 多进程异步事件驱动引擎
'''
__author__ = 'Jimmy'
from multiprocessing import Process, Queue
class EventEngine(object):
# 初始化事件事件驱动引擎
def __init__(self):
#保存事件列表
self.__eventQueue = Queue()
#引擎开关
self.__active = False
#事件处理字典{'event1': [handler1,handler2] , 'event2':[handler3, ...,handler4]}
self.__handlers = {}
#保存事件处理进程池
self.__processPool = []
#事件引擎主进程
self.__mainProcess = Process(target=self.__run)
#执行事件循环
def __run(self):
while self.__active:
#事件队列非空
if not self.__eventQueue.empty():
#获取队列中的事件 超时1秒
event = self.__eventQueue.get(block=True ,timeout=1)
#执行事件
self.__process(event)
else:
# print('无任何事件')
pass
#执行事件
def __process(self, event):
if event.type in self.__handlers:
for handler in self.__handlers[event.type]:
#开一个进程去异步处理
p = Process(target=handler, args=(event, ))
#保存到进程池
self.__processPool.append(p)
p.start()
#开启事件引擎
def start(self):
self.__active = True
self.__mainProcess.start()
#暂停事件引擎
def stop(self):
"""停止"""
# 将事件管理器设为停止
self.__active = False
# 等待事件处理进程退出
for p in self.__processPool:
p.join()
self.__mainProcess.join()
#终止事件引擎
def terminate(self):
self.__active = False
#终止所有事件处理进程
for p in self.__processPool:
p.terminate()
self.__mainProcess.join()
#注册事件
def register(self, type, handler):
"""注册事件处理函数监听"""
# 尝试获取该事件类型对应的处理函数列表,若无则创建
try:
handlerList = self.__handlers[type]
except KeyError:
handlerList = []
self.__handlers[type] = handlerList
# 若要注册的处理器不在该事件的处理器列表中,则注册该事件
if handler not in handlerList:
handlerList.append(handler)
def unregister(self, type, handler):
"""注销事件处理函数监听"""
# 尝试获取该事件类型对应的处理函数列表,若无则忽略该次注销请求
try:
handlerList = self.__handlers[type]
# 如果该函数存在于列表中,则移除
if handler in handlerList:
handlerList.remove(handler)
# 如果函数列表为空,则从引擎中移除该事件类型
if not handlerList:
del self.__handlers[type]
except KeyError:
pass
def sendEvent(self, event):
#发送事件 像队列里存入事件
self.__eventQueue.put(event)
class Event(object):
#事件对象
def __init__(self, type =None):
self.type = type
self.dict = {}
#测试
if __name__ == '__main__':
import time
EVENT_ARTICAL = "Event_Artical"
# 事件源 公众号
class PublicAccounts:
def __init__(self, eventManager):
self.__eventManager = eventManager
def writeNewArtical(self):
# 事件对象,写了新文章
event = Event(EVENT_ARTICAL)
event.dict["artical"] = u'如何写出更优雅的代码\n'
# 发送事件
self.__eventManager.sendEvent(event)
print(u'公众号发送新文章\n')
# 监听器 订阅者
class ListenerTypeOne:
def __init__(self, username):
self.__username = username
# 监听器的处理函数 读文章
def ReadArtical(self, event):
print(u'%s 收到新文章' % self.__username)
print(u'%s 正在阅读新文章内容:%s' % (self.__username, event.dict["artical"]))
class ListenerTypeTwo:
def __init__(self, username):
self.__username = username
# 监听器的处理函数 读文章
def ReadArtical(self, event):
print(u'%s 收到新文章 睡3秒再看' % self.__username)
time.sleep(3)
print(u'%s 正在阅读新文章内容:%s' % (self.__username, event.dict["artical"]))
def test():
listner1 = ListenerTypeOne("thinkroom") # 订阅者1
listner2 = ListenerTypeTwo("steve") # 订阅者2
ee = EventEngine()
# 绑定事件和监听器响应函数(新文章)
ee.register(EVENT_ARTICAL, listner1.ReadArtical)
ee.register(EVENT_ARTICAL, listner2.ReadArtical)
for i in range(0, 20):
listner3 = ListenerTypeOne("Jimmy") # 订阅者X
ee.register(EVENT_ARTICAL, listner3.ReadArtical)
ee.start()
#发送事件
publicAcc = PublicAccounts(ee)
publicAcc.writeNewArtical()
test()
以上就是本文的全部内容,希望对大家的学习有所帮助.
数据分析咨询请扫描二维码
若不方便扫码,搜微信号:CDAshujufenxi
Pandas 选取特定值所在行:6 类核心方法与实战指南 在使用 pandas 处理结构化数据时,“选取特定值所在的行” 是最高频的操作之 ...
2025-09-30球面卷积神经网络(SCNN) 为解决这一痛点,球面卷积神经网络(Spherical Convolutional Neural Network, SCNN) 应运而生。它通 ...
2025-09-30在企业日常运营中,“未来会怎样” 是决策者最关心的问题 —— 电商平台想知道 “下月销量能否达标”,金融机构想预判 “下周股 ...
2025-09-30Excel 能做聚类分析吗?基础方法、进阶技巧与场景边界 在数据分析领域,聚类分析是 “无监督学习” 的核心技术 —— 无需预设分 ...
2025-09-29XGBoost 决策树:原理、优化与工业级实战指南 在机器学习领域,决策树因 “可解释性强、处理非线性关系能力突出” 成为基础模型 ...
2025-09-29在标签体系的落地链路中,“设计标签逻辑” 只是第一步,真正让标签从 “纸上定义” 变为 “业务可用资产” 的关键,在于标签加 ...
2025-09-29在使用 Excel 数据透视表进行多维度数据汇总时,折叠功能是梳理数据层级的核心工具 —— 通过点击 “+/-” 符号可展开明细数据或 ...
2025-09-28在使用 Pandas 处理 CSV、TSV 等文本文件时,“引号” 是最容易引发格式混乱的 “隐形杀手”—— 比如字段中包含逗号(如 “北京 ...
2025-09-28在 CDA(Certified Data Analyst)数据分析师的技能工具箱中,数据查询语言(尤其是 SQL)是最基础、也最核心的 “武器”。无论 ...
2025-09-28Cox 模型时间依赖性检验:原理、方法与实战应用 在生存分析领域,Cox 比例风险模型(Cox Proportional Hazards Model)是分析 “ ...
2025-09-26检测因子类型的影响程度大小:评估标准、实战案例与管控策略 在检测分析领域(如环境监测、食品质量检测、工业产品合规性测试) ...
2025-09-26CDA 数据分析师:以数据库为基石,筑牢数据驱动的 “源头防线” 在数据驱动业务的链条中,“数据从哪里来” 是 CDA(Certified D ...
2025-09-26线性相关点分布的四种基本类型:特征、识别与实战应用 在数据分析与统计学中,“线性相关” 是描述两个数值变量间关联趋势的核心 ...
2025-09-25深度神经网络神经元个数确定指南:从原理到实战的科学路径 在深度神经网络(DNN)的设计中,“神经元个数” 是决定模型性能的关 ...
2025-09-25在企业数字化进程中,不少团队陷入 “指标困境”:仪表盘上堆砌着上百个指标,DAU、转化率、营收等数据实时跳动,却无法回答 “ ...
2025-09-25MySQL 服务器内存碎片:成因、检测与内存持续增长的解决策略 在 MySQL 运维中,“内存持续增长” 是常见且隐蔽的性能隐患 —— ...
2025-09-24人工智能重塑工程质量检测:核心应用、技术路径与实践案例 工程质量检测是保障建筑、市政、交通、水利等基础设施安全的 “最后一 ...
2025-09-24CDA 数据分析师:驾驭通用与场景指标,解锁数据驱动的精准路径 在数据驱动业务的实践中,指标是连接数据与决策的核心载体。但并 ...
2025-09-24在数据驱动的业务迭代中,AB 实验系统(负责验证业务优化效果)与业务系统(负责承载用户交互与核心流程)并非独立存在 —— 前 ...
2025-09-23CDA 业务数据分析:6 步闭环,让数据驱动业务落地 在企业数字化转型中,CDA(Certified Data Analyst)数据分析师的核心价值,并 ...
2025-09-23