python实现kafka收到消息然后在通过websockt发送给其他服务器的方法(异步调用并且收到其中一个的消息在转发)

发布时间 2023-05-22 14:44:55作者: 哦哟这个怎么搞
import asyncio
import threading
from kafka import KafkaConsumer
import websockets

connected = set()

async def handler(websocket, path):
    connected.add(websocket)
    while True:
        await asyncio.sleep(1)

def start_kafka():
    consumer = KafkaConsumer(
                '123456789abcd_device',
                bootstrap_servers="192.168.49.27" + ':9092',
            )
    for message in consumer:
        data = f"Received message from Kafka: {message}"
        print(data)
        for websocket in connected:
            asyncio.run_coroutine_threadsafe(websocket.send(data), loop)

def start_websocket():
    global loop
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    start_server = websockets.serve(handler, "localhost", 8000)
    loop.run_until_complete(start_server)
    loop.run_forever()

def start_kafka_and_websocket():
    t1 = threading.Thread(target=start_kafka)
    t2 = threading.Thread(target=start_websocket)

    t1.start()
    t2.start()

    t1.join()
    t2.join()

start_kafka_and_websocket()