S2JMSを使ってみる〜インバウンド通信編〜

今日も引き続きSun Generic Resource Adapter for JMSと IBM WebSphere MQの組合わせで。
受信データをある項目でスライシングして並行処理をし、かつスライスされた各ブロックでは順番を保障するというアプリケーションを実現してみます。

リソースアダプタの設定

アウトバウンド同様WmqResourceAdapterDeployerを使用。上記のようなアプリケーションを実現するにはセッションプールは1にしてリソースアダプタを並行処理する数だけ用意するしかなさそう。

メッセージエンドポイントの設定

S2JMS - コンフィグレーション - インバウンド情報に出ている例の通りで大体良さそうですが、Topicを使う場合は「キュー名」の部分の設定をBaseQueueNameからBaseTopicNameに変更する必要があります。またセレクタの設定は以下のような感じでOKでした。

<initMethod name="setProperty">
    <arg>"messageSelector"</arg>
    <arg>"key=value"</arg>
</initMethod>

実行結果

リソースアダプタとメッセージエンドポイントの設定ファイルを4つ作ってそれをjms.diconでインクルードして実行してみました。コードは以下のような感じ

送信側
private JmsService jmsService;

public void testSend(){
	SingletonS2ContainerFactory.init();
	final S2Container container = SingletonS2ContainerFactory
			.getContainer();
	jmsService = (JmsService) container.getComponent(JmsService.class);

	for (int i = 0; i < 100; i++) {
		jmsService.hoge(i + 1);
	}
}
private MessageSender messageSender;

public void hoge(int hoge) {
	final Map<String, Object> payload = new HashMap<String, Object>();
	final Map<String, Object> property = new HashMap<String, Object>();
	payload.put("hoge", "" + hoge);
	payload.put("fuga", "ふが");
	property.put("intKey", Integer.valueOf(hoge % 4));
	messageSender.send("" + hoge, property);
}
受信側
@JMSProperty
private int intKey;

@JMSPayload
private String hoge;

@JMSPayload
private String fuga;

public void onMessage() {
	final Logger logger = Logger.getLogger(this.getClass());
	logger.info("メッセージ:" + hoge + "|プロパティ:" + intKey);
}

これを実行すると

2007/08/03 20:15:53 com.sun.genericra.inbound.EndpointConsumer _start
情報: Generic resource adapter started consumption
2007/08/03 20:15:53 com.sun.genericra.inbound.EndpointConsumer _start
情報: Generic resource adapter started consumption
2007/08/03 20:15:54 com.sun.genericra.inbound.EndpointConsumer _start
情報: Generic resource adapter started consumption
2007/08/03 20:15:55 com.sun.genericra.inbound.EndpointConsumer _start
情報: Generic resource adapter started consumption
INFO 2007-08-03 20:15:56,285 [main] Running on [ENV]product, [DEPLOY MODE]Cool Deploy
INFO 2007-08-03 20:15:56,879 [pool-1-thread-1] メッセージ:1|プロパティ:1
INFO 2007-08-03 20:15:56,942 [pool-1-thread-1] メッセージ:5|プロパティ:1
INFO 2007-08-03 20:15:57,020 [pool-1-thread-1] メッセージ:9|プロパティ:1
・・・(省略)・・・
INFO 2007-08-03 20:15:58,973 [pool-1-thread-1] メッセージ:89|プロパティ:1
INFO 2007-08-03 20:15:59,051 [pool-1-thread-1] メッセージ:93|プロパティ:1
INFO 2007-08-03 20:15:59,301 [pool-1-thread-1] メッセージ:97|プロパティ:1

と1ブロックだけ処理して終了してしまいました。どうやらmainスレッドが終わるとリスナーも終了するようなのでデバッガでブレークポイントを置いて全部処理するまで待ってみます。

・・・(省略)・・・
INFO 2007-08-03 20:20:42,082 [pool-1-thread-1] メッセージ:97|プロパティ:1
INFO 2007-08-03 20:20:47,879 [pool-2-thread-1] メッセージ:2|プロパティ:2
INFO 2007-08-03 20:20:48,067 [pool-3-thread-1] メッセージ:3|プロパティ:3
INFO 2007-08-03 20:20:48,176 [pool-4-thread-1] メッセージ:4|プロパティ:0
・・・(省略)・・・

一応全部処理できましたが、なぜかpool-1-thread-1スレッドの処理が終わってから4秒後くらいに他のスレッドの処理が始まっています。今度は送信処理開始前にもブレークポイントを置いて5秒ほど待ってから開始してみると

・・・(省略)・・・
INFO 2007-08-03 20:25:38,817 [pool-1-thread-1] メッセージ:1|プロパティ:1
INFO 2007-08-03 20:25:38,973 [pool-2-thread-1] メッセージ:2|プロパティ:2
INFO 2007-08-03 20:25:39,473 [pool-3-thread-1] メッセージ:3|プロパティ:3
INFO 2007-08-03 20:25:39,879 [pool-4-thread-1] メッセージ:4|プロパティ:0
・・・(省略)・・・

ちゃんと同時に処理が始まりました。Consumerは起動しているのに何で処理開始が遅いのでしょう?しかも2〜4個目は同時に処理が始まるので非常になぞです。