我們首先下載pika,以及rabbitMQ,和ir語言,rabbitMQ是由ir語言編寫的
消息隊列的使用過程大概如下:
(1)客戶端連接到消息隊列服務器,打開一個channel。
channel:消息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務。
(2)客戶端聲明一個exchange,并設置相關屬性。
Exchange:消息交換機,它指定消息按什么規則,路由到哪個隊列。
(3)客戶端聲明一個queue,并設置相關屬性。
Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。
(4)客戶端使用routing key,在exchange和queue之間建立好綁定關系。
Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
(5)客戶端投遞消息到exchange。
Exchange:消息交換機,它指定消息按什么規則,路由到哪個隊列。
接下來寫一個生產者:
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost'))
channel = connection.channel()#先通過socket建立一個實例,創建一個新的頻道
# 聲明queue
channel.queue_declare(queue='hello')# 注意此處需要聲明一個管道或者稱之為隊列,在此處出發消息 同時客戶端與服務端都需要
# n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='',
routing_key='hello',#queue名字#路由鍵,寫明將消息發往哪個隊列,本例是將消息發往隊列pikamq
body='Hello World!')# 消息內容
print(" [x] Sent 'Hello World!'")# 當生產者發送完消息后,可選擇關閉連接
connection.close()
消費者:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost' ))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print('--->',ch,method,properties)
print(" [x] Received %r"%body)
channel.basic_consume(callback,
queue='hello',
#no_ack=True #此處有的代碼加了,但是python系統會自帶,同時加了之后,一旦等待時間過長,生產者發送的消息,無法轉移到另一個消費者中
)
channel.start_consuming()
另外有需要云服務器可以了解下創新互聯cdcxhl.cn,海內外云服務器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務器、裸金屬服務器、高防服務器、香港服務器、美國服務器、虛擬主機、免備案服務器”等云主機租用服務以及企業上云的綜合解決方案,具有“安全穩定、簡單易用、服務可用性高、性價比高”等特點與優勢,專為企業上云打造定制,能夠滿足用戶豐富、多元化的應用場景需求。