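I want to use Azure Event Hubs as a middleware message queue. Right now I am basically sending simulated data as a list and receiving it as a string.
As you can see, only a few data formats are available for conversion here. I want the received data to be a list of floats.
This is the code I am currently working on. I tried to modify the line below so that the data of each event is accumulated in a list as floats.

    LIST.append(event_data.message._body)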
This is the main body of my code.

    import time
    from azure.eventhub import EventHubClient, Offset

    # Connection settings (same placeholders as in sender.py)
    ADDRESS = ""
    USER = "RootManageSharedAccessKey"
    KEY = ""

    CONSUMER_GROUP = "$default"
    OFFSET = Offset("-1")
    PARTITION = "0"

    total = 0
    last_sn = -1
    last_offset = "-1"
    client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)

    i = 1
    LIST = []
    try:
        receiver = client.add_receiver(CONSUMER_GROUP, PARTITION, prefetch=5000, offset=OFFSET)
        client.run()
        start_time = time.time()
        batch = receiver.receive(timeout=None)
        while batch:
            # Only the last 100 events of each batch are processed
            for event_data in batch[-100:]:
                last_offset = event_data.offset
                last_sn = event_data.sequence_number
                print("Received: {}, {}".format(i, last_sn))
                # Accumulate the raw message body of each event
                LIST.append(event_data.message._body)
                i += 1
                total += 1
            batch = receiver.receive(timeout=5000)

        end_time = time.time()
        client.stop()
        run_time = end_time - start_time
        print("Received {} messages in {} seconds".format(total, run_time))

    except KeyboardInterrupt:
        pass
    finally:
        client.stop()
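You can find the EventData class here.
=================================== Update ====================================
The result shows 'Message [abc ....]'. I think the word 'Message' is being written as part of the output, so I want to remove the word 'Message' from the result.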
"sender.py" is as follows:

    from azure.eventhub import EventHubClient, Sender, EventData
    import time
    import logging
    import numpy as np

    logger = logging.getLogger("azure")

    ADDRESS = ""
    USER = "RootManageSharedAccessKey"
    KEY = ""

    try:
        if not ADDRESS:
            raise ValueError("No EventHubs URL supplied.")

        # Create Event Hubs client
        client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
        sender = client.add_sender(partition="0")
        client.run()

        forging2 = lambda x: (np.exp(-(0.1*x-6)**2+3) + np.exp(-(0.1*x-4)**2+4))*1.4
        x_value = np.arange(100)

        try:
            start_time = time.time()
            for i in range(100):
                y_value1 = forging2(x_value) + np.random.normal(0,1,len(x_value))*3
                y_value1 = np.asarray(y_value1)
                print("Sending message: {}, {}".format(i, y_value1))
                message = y_value1
                sender.send(EventData(message))
                time.sleep(0.35)
        except:
            raise
        finally:
            end_time = time.time()
            client.stop()
            run_time = end_time - start_time
            logger.info("Runtime: {} seconds".format(run_time))

    except KeyboardInterrupt:
        pass
The fixed code below works:

    from azure.eventhub import EventHubClient, Sender, EventData
    import time
    import logging
    import numpy as np

    logger = logging.getLogger("azure")

    ADDRESS = ""
    USER = "RootManageSharedAccessKey"
    KEY = ""

    try:
        if not ADDRESS:
            raise ValueError("No EventHubs URL supplied.")

        # Create Event Hubs client
        client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
        sender = client.add_sender(partition="0")
        client.run()

        forging2 = lambda x: (np.exp(-(0.1*x-6)**2+3) + np.exp(-(0.1*x-4)**2+4))*1.4
        x_value = np.arange(100)

        try:
            start_time = time.time()
            for i in range(100000):
                y_value1 = forging2(x_value) + np.random.normal(0,1,len(x_value))*3
                y_value1 = np.asarray(y_value1)
                print("Sending message: {}, {}".format(i, y_value1))
                # Key change: send the array as a formatted string
                message = "{}".format(y_value1)
                sender.send(EventData(message))
                time.sleep(0.35)
        except:
            raise
        finally:
            end_time = time.time()
            client.stop()
            run_time = end_time - start_time
            logger.info("Runtime: {} seconds".format(run_time))

    except KeyboardInterrupt:
        pass
This way, I receive the messages without the word 'Message'.
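For the original goal of ending up with a list of floats on the receiving side, the string still has to be parsed. Below is a minimal sketch, not a definitive implementation. It assumes the payload arrives as `bytes` or `str` holding the text form of a one-dimensional NumPy array, as produced by `"{}".format(y_value1)` in the fixed sender. The helper names `parse_array_text`, `encode_floats` and `decode_floats` are hypothetical and not part of the Event Hubs SDK. The second pair of helpers shows a possible alternative in which the sender serializes the values as JSON instead of relying on NumPy's print format.

```python
import json


def parse_array_text(payload):
    """Turn text such as '[ 1.5 -2.25\\n 3. ]' into a list of floats."""
    if isinstance(payload, (bytes, bytearray)):
        payload = payload.decode("utf-8")
    # Brackets and line breaks come from NumPy's string form; treat them as spaces.
    cleaned = payload.replace("[", " ").replace("]", " ")
    return [float(token) for token in cleaned.split()]


def encode_floats(values):
    """Sender-side alternative (assumption): serialize the numbers as JSON text."""
    return json.dumps([float(v) for v in values])


def decode_floats(payload):
    """Receiver-side counterpart of encode_floats."""
    if isinstance(payload, (bytes, bytearray)):
        payload = payload.decode("utf-8")
    return [float(v) for v in json.loads(payload)]


# Stand-ins for received message bodies; no Event Hubs connection is needed here.
received = []
received.append(parse_array_text(b"[ 1.5 -2.25\n  3.  ]"))
received.append(decode_floats(encode_floats([0.1, 2, 3.5])))
print(received)
```

With the JSON variant, the sender would pass `encode_floats(y_value1)` to `EventData` instead of the formatted string. One caveat with parsing the printed array: NumPy abbreviates long arrays with `...` when printing (by default above 1000 elements), and `parse_array_text` would raise a `ValueError` on such text. The 100-element arrays used above are not affected.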