はじめに
前回はRabbitMQでメッセージキューイングシステムを構築し非同期処理を実装しました。今回は前回のプログラムを少し改修して並列処理を実装します。
並列処理用にプログラム改修
差し当たっては結果を見やすくするために送信プログラムのメッセージを「Hello World!」からコマンドライン引数に変更します。
$ diff sender.py.bk sender.py
2a3,5
> import sys
>
> args = sys.argv
9,10c12,13
< channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
< print(" [x] Sent 'Hello World!'")
---
> channel.basic_publish(exchange='', routing_key='hello', body=args[1])
> print(" [x] Sent 'arg'")
一秒間掛かる処理を一秒待つことで再現したいと思います。そのため受信プログラムにスリープ関数を付け足します。
$ diff reciver.py.bk reciver.py
2a3
> import time
10a12
> time.sleep(1)
改修はこれで以上です。送信用に1つと受信用に2つで3つシェルを立ち上げて試験を行ってみます。
$ for l in {1..10}; do python3.6 sender.py $l; done
[x] Sent 'arg'
[x] Sent 'arg'
[x] Sent 'arg'
[x] Sent 'arg'
[x] Sent 'arg'
[x] Sent 'arg'
[x] Sent 'arg'
[x] Sent 'arg'
[x] Sent 'arg'
[x] Sent 'arg'
$ python3.6 reciver.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received b'1'
[x] Received b'3'
[x] Received b'5'
[x] Received b'7'
[x] Received b'9'
$ python3.6 reciver.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received b'2'
[x] Received b'4'
[x] Received b'6'
[x] Received b'8'
[x] Received b'10'
この通り並列処理されていることが分かると思います。この次はリモートホストからキューを取れるようにします。
マルチホストでの並列処理
マルチホスト用のインスタンスを立て、RabbitMQ環境を構築した後リモートホスト用の受信プログラムを作成する。
#!/usr/bin/env python
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='【Producer側ホスト名】'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
time.sleep(1)
channel.basic_consume('hello', callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
Producer側のホストに開放するポートを追加します。
- 5672/tcp
- クライアント接続をリッスンする
- 5671/tcp
- クライアント接続をリッスンする(TLS暗号化)
準備は以上です。先程と同じ様に送信用に1つと受信用に2つで3つシェルを立ち上げてプログラムをそれぞれ動かしてください。
$ for l in {1..10}; do python3.6 sender.py $l; done
[x] Sent 'arg'
[x] Sent 'arg'
[x] Sent 'arg'
[x] Sent 'arg'
[x] Sent 'arg'
[x] Sent 'arg'
[x] Sent 'arg'
[x] Sent 'arg'
[x] Sent 'arg'
[x] Sent 'arg'
$ python3.6 reciver.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received b'1'
[x] Received b'3'
[x] Received b'5'
[x] Received b'7'
[x] Received b'9'
$ python3.6 reciver2.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received b'2'
[x] Received b'4'
[x] Received b'6'
[x] Received b'8'
[x] Received b'10'
並列で処理されていれば成功です。これで計算機クラスタの様な物が完成しました。