[基本機能]キュー

アプリケーション開発において、非同期処理が必要になるケースがあります。非同期処理では、処理を依頼する側と処理を実行する側が存在し、その間にはキューが必要となります。

キュー機能を抽象化したインタフェースがQueueです。

Queue経由で渡された処理依頼を、Queueの後ろで待ち受け、非同期で処理を行う機能が必要になります。
そのような非同期処理コンテナ機能を抽象化したインタフェースがQueueHandlerContainerです。
また、非同期処理コンテナ上で「任意の処理を行う」機能を抽象化したインタフェースが、QueueHandlerです。

Queueの実装には、性能分散のために、分流を行うものが存在します。
投入されたオブジェクトを一定のルールで分流を行う必要があります。
このようなQueueの分流を抽象化したインタフェースがDistributedQueueSelectorです。
このインタフェースは、アプリケーション向けではなく、分流Queue実装向けです。

関連するパッケージは、以下です。

アプリケーション向けインタフェース Queue

アプリケーション向けインタフェースQueueを使った簡単なアプリケーションのサンプルを示します。

  1. import jp.ossc.nimbus.core.ServiceManagerFactory;
  2. import jp.ossc.nimbus.service.queue.Queue;
  3. // Queueを取得
  4. final Queue queue = (Queue)ServiceManagerFactory.getServiceObject("Queue");
  5. // Queueから引き抜くスレッドを作成する
  6. Thread getterThread = new Thread(new Runnable(){
  7. public void run(){
  8. for(int i = 0; i < 10; i++){
  9. Object obj = queue.get();
  10. System.out.println(obj);
  11. }
  12. }
  13. }, "Getter Thread");
  14. // Queueから引き抜くスレッドを開始する
  15. getterThread.start();
  16. // Queueに詰める
  17. for(int i = 1; i <= 10; i++){
  18. queue.push(new Integer(i));
  19. }
  20. // Queueから引き抜き終わるまで待機する
  21. getterThread.join();

実装サービスの一覧は以下のとおりです。

実装サービス実装概要
jp.ossc.nimbus.service.queue.DefaultQueueServiceQueueのデフォルト実装サービス
jp.ossc.nimbus.service.queue.DelayQueueService引き抜きを遅延させるQueue実装サービス
jp.ossc.nimbus.service.queue.DistributedQueueService内部で分流して性能分散を行うQueue実装サービス
jp.ossc.nimbus.service.queue.ThreadLocalQueueServiceスレッド単位でエントリを投入/引き抜きできるQueue実装サービス
jp.ossc.nimbus.service.queue.SharedQueueService複数のJVM間でエントリを共有して投入/引き抜きできるQueue実装サービス

アプリケーション向けインタフェース QueueHandlerContainer

アプリケーション向けインタフェースQueueHandlerContainerは、Queueインタフェースを継承しています。
但し、投入専用となっており、外部から引き抜く事はできません。引き抜きは、QueueHandlerContainer内部のスレッドによって行われ、QueueHandlerContainerに設定されたQueueHandlerによって処理されます。

以下に、QueueHandlerContainerを使った非同期処理を行うアプリケーションのサンプルを示します。

  1. import jp.ossc.nimbus.core.ServiceManagerFactory;
  2. import jp.ossc.nimbus.service.queue.QueueHandlerContainer;
  3. import jp.ossc.nimbus.service.queue.AsynchContext;
  4. // QueueHandlerContainerを取得
  5. final QueueHandlerContainer container = (QueueHandlerContainer)ServiceManagerFactory.getServiceObject("QueueHandlerContainer");
  6. // QueueHandlerContainerに非同期処理要求(応答なし)を投入する
  7. for(int i = 1; i <= 10; i++){
  8. container.push(new AsynchContext(new Integer(i)));
  9. }
  10. // 応答Queueを取得
  11. // 通常、処理毎に使い捨ての応答Queueが必要なので、サービス定義でinstance="factory"を宣言しておく
  12. final Queue responseQueue = (Queue)ServiceManagerFactory.getServiceObject("ResponseQueue");
  13. // QueueHandlerContainerに非同期処理要求(応答あり)を投入する
  14. for(int i = 1; i <= 10; i++){
  15. container.push(new AsynchContext(new Integer(i), responseQueue));
  16. }
  17. // 要求した回数分、応答待ちをする
  18. for(int i = 1; i <= 10; i++){
  19. // 各要求毎に1秒まで応答待ちをする
  20. // タイムアウトした場合は、nullが返る
  21. AsynchContext context = (AsynchContext)responseQueue.get(1000l);
  22. if(context != null){
  23. // 非同期処理で例外が発生していないかチェックする
  24. // 例外が発生している場合は、発生した例外がthrowされる
  25. context.checkError();
  26. // 応答を取得する
  27. Object output = context.getOutput();
  28. System.out.println(output);
  29. }
  30. }

実装サービスの一覧は以下のとおりです。

実装サービス実装概要
jp.ossc.nimbus.service.queue.QueueHandlerContainerServiceQueueHandlerContainerのデフォルト実装サービス
jp.ossc.nimbus.service.queue.DistributedQueueHandlerContainerService分流して性能分散を行うQueueHandlerContainerの実装サービス

QueueHandlerContainer向けインタフェース QueueHandler

QueueHandlerContainer向けインタフェースQueueHandlerは、QueueHandlerContainerに投入された非同期処理要求を処理します。

以下に、非同期処理要求を処理するQueueHandlerの実装例を示します。

  1. import jp.ossc.nimbus.core.ServiceBase;
  2. import jp.ossc.nimbus.service.queue.QueueHandler;
  3. public class SampleQueueHandlerService extends ServiceBase implements QueueHandler{
  4. public void handleDequeuedObject(Object obj) throws Throwable{
  5. // Queueからエントリを取り出すと、呼び出される
  6. if(obj == null){
  7. return;
  8. }
  9. AsynchContext context = (AsynchContext)obj;
  10. System.out.println(Thread.currentThread().getName() + " : " + context.getInput());
  11. // 応答を返す場合は、AsynchContextに応答を設定して、応答QueueにAsynchContextを投入する
  12. if(context.getResponseQueue() != null){
  13. context.setOutput(Thread.currentThread().getName());
  14. context.getResponseQueue().push(context);
  15. }
  16. }
  17. public boolean handleError(Object obj, Throwable th) throws Throwable{
  18. // handleDequeuedObject(Object)で例外がthrowされると呼び出される
  19. getLogger().write("WARN", "Error occurred in " + Thread.currentThread().getName() + " : " + obj, th);
  20. // trueを返すと、再度handleDequeuedObject(Object)が呼び出される
  21. // falseを返すと、終了する
  22. return true;
  23. }
  24. public void handleRetryOver(Object obj, Throwable th) throws Throwable{
  25. // handleDequeuedObject(Object)で例外をthrowされ、リトライ回数を越えていると呼び出される
  26. AsynchContext context = (AsynchContext)obj;
  27. getLogger().write("ERROR", "Fatal error occurred in " + Thread.currentThread().getName() + " : " + obj, th);
  28. // 応答を返す場合は、AsynchContextに例外を設定して、応答QueueにAsynchContextを投入する
  29. if(context.getResponseQueue() != null){
  30. context.setThrowable(th);
  31. context.getResponseQueue().push(context);
  32. }
  33. }
  34. }

実装サービスの一覧は以下のとおりです。

実装サービス実装概要
jp.ossc.nimbus.service.queue.BeanFlowInvokerCallQueueHandlerService非同期処理を業務フローで処理する

分流Queue向けインタフェース DistributedQueueSelector

DistributedQueueSelectorは、分流Queue実装向けのインタフェースです。

以下の分流Queue実装から使用されます。

実装サービスの一覧は以下のとおりです。

実装サービス実装概要
jp.ossc.nimbus.service.queue.AbstractDistributedQueueSelectorServiceDistributedQueueSelectorの抽象サービス
jp.ossc.nimbus.service.queue.SimpleDistributedQueueSelectorServiceDistributedQueueSelectorの簡易実装サービス
jp.ossc.nimbus.service.queue.InputPropertyDistributedQueueSelectorService投入されたオブジェクトのプロパティ値をキーにして分流するDistributedQueueSelector実装サービス

サンプルは、以下。