Scalaでの非同期プログラミング中にコンテキストを維持する問題

プロジェクトのある時点で、操作の進行状況を追跡し、それに関する情報を取得または保存するという疑問が生じます。 このために、操作コンテキスト、たとえばクライアントセッションのコンテキストが可能な限り機能します。 これを比較的簡単に行う方法に興味がある場合は、猫をお願いします。



Javaの世界では、多くの場合(常にではありませんが)、各操作は独自のスレッドで実行されます。 そして、ここではすべてが非常に単純であることがわかります。ThreadLocalオブジェクトを使用して、操作中にいつでも取得できます。



class Context { public static final ThreadLocal<Context> global = new ThreadLocal<Context>; } //-     Context context = new Context(...); Context.global.set(context); try { someService.someMethod(); } finally { Context.global.set(null); }
      
      





scalaでは、それほど単純ではないことが多く、操作中、非常に非同期なアプリケーションなどでフローが繰り返し変化する場合があります。 また、 ThreadLocalを使用したメソッド適切でなくなりました(Javaのスレッド切り替えの場合はもちろん)。



頭に浮かぶかもしれない最初のものは、暗黙の関数引数を通してコンテキストを渡すことです。



 def foo(bar: Bar)(implicit context: Context)
      
      





しかし、これはサービスプロトコルを混乱させます。 少し頭を痛めた、かなり単純なアイデアが浮上しました。サービスオブジェクトにコンテキストをアタッチし、関数が呼び出されたときに内部サービスにそれを配布するというものです。



コンテキストが次のようになっているとしましょう。



 //data -       class Context(val operationId: String, val data: TrieMap[String, String] = TrieMap.empty)
      
      





コンテキスト依存オブジェクトをマークするトレイトを作成しましょう:



 trait ContextualObject { protected def context: Option[Context] } //,     trait ChangeableContextualObject[T <: ContextualObject] extends ContextualObject { def withContext(ctx: Option[Context]): T } //    trait EmptyContext { _: ContextualObject => override protected val context: Option[Context] = None }
      
      





次に、サービスと実装を発表します。



 //,       trait ServiceA extends ChangeableContextualObject[ServiceA] { def someSimpleOperation: Int def someLongOperation(implicit executionContext: ExecutionContext): Future[Int] } trait ServiceAImpl extends ServiceA { override def someSimpleOperation: Int = 1 override def someLongOperation(implicit executionContext: ExecutionContext): Future[Int] = { Future(someSimpleOperation) .map { res => // -    ,    context.foreach(_.data.put("ServiceA.step1", res.toString)) res * Random.nextInt(10) } .map { res => context.foreach(_.data.put("ServiceA.step2", res.toString)) res - Random.nextInt(5) } .andThen { case Success(res) => context.foreach(_.data.put("ServiceA.step3", res.toString)) } } //      override def withContext(ctx: Option[Context]): ServiceA = new ServiceAImpl { ctx.foreach(_.data.put("ServiceA.withContext", "true")) override protected def context: Option[Context] = ctx } } object ServiceAImpl { def apply(): ServiceAImpl = new ServiceAImpl with EmptyContext }
      
      





そして、最初のサービスが使用する2番目のサービス:



 trait ServiceB extends ChangeableContextualObject[ServiceB] { def someOperationWithoutServiceA: Int def someOperationWithServiceA(implicit executionContext: ExecutionContext): Future[Boolean] } /** *         : *            ? *     EmptyContext   , *       withContext. * ,  ,      cake pattern    */ trait ServiceBImpl extends ServiceB { self => protected def serviceA: ServiceA override def someOperationWithoutServiceA: Int = 1 override def someOperationWithServiceA(implicit executionContext: ExecutionContext): Future[Boolean] = { serviceA.someLongOperation.map { case res if res % 2 == 0 => context.foreach(_.data.put("ServiceB.res", "even")) true case res => context.foreach(_.data.put("ServiceB.res", "odd")) false } } override def withContext(ctx: Option[Context]): ServiceB = new ServiceBImpl { ctx.foreach(_.data.put("ServiceB.withContext", "true")) override protected val context: Option[Context] = ctx // ,  ,        //      lazy val, //        ,     . //       override protected lazy val serviceA: ServiceA = self.serviceA.withContext(ctx) } } object ServiceBImpl { //    -        , //    ,        . //      : // class Builder(val serviceA: ServiceA) extends ServiceBImpl with EmptyContext //    : // new ServiceBImpl.Builder(serviceA) // , ,   ,     . def apply(a: ServiceA): ServiceBImpl = new ServiceBImpl with EmptyContext { //         val override protected val serviceA: ServiceA = a } }
      
      





その結果、呼び出しの場所で次のコードを取得します。



 val context = new Context("opId") val serviceBWithContext = serviceB.withContext(Some(context)) serviceBWithContext.someOperationWithoutServiceA context.data.get("ServiceB.withContext") // Some("true") context.data.get("ServiceA.withContext") // None serviceBWithContext.someOperationWithServiceA.andThen { case _ => context.data.get("ServiceA.withContext") // Some("true") context.data.get("ServiceA.step1") // Some("1") }
      
      





すべてが非常に単純です-したがって、操作のコースは同じコンテキストを持ちます。 ただし、このための実際のアプリケーションを見つける必要があります。 たとえば、運用中に重要な情報を記録しましたが、この情報を誓約したいと考えています。 最も簡単なオプションは、コンテキストごとにロガーを作成し、メッセージにログを書き込む際に、この情報をロガーに割り当てることでした。 ただし、コードの外部(サードパーティライブラリなど)で発生するログの問題があります。



コンテキストをコード外で使用するには、コンテキストでThreadLocalを作成します。



 object Context { val global: ThreadLocal[Option[Context]] = ThreadLocal.withInitial[Option[Context]](() => None) //    def runWith[T](context: Context)(operation: => T): T = { runWith(Some(context))(operation) } //    def runWith[T](context: Option[Context])(operation: => T): T = { val old = global.get() global.set(context) //         try operation finally global.set(old) } }
      
      





たとえば、ロギングにlogback-classicライブラリを使用する場合、これらのパラメーターを記録するレイアウトを作成できます。



可能な実装
 class OperationContextLayout extends LayoutBase[ILoggingEvent] { private val separator: String = System.getProperty("line.separator") override def doLayout(event: ILoggingEvent): String = { val sb = new StringBuilder(256) sb.append(event.getFormattedMessage) .append(separator) appendContextParams(sb) appendStack(event, sb) sb.toString() } private def appendContextParams(sb: StringBuilder): Unit = { Context.global.get().foreach { ctx => sb.append("operationId=") .append(ctx.operationId) ctx.data.readOnlySnapshot().foreach { case (key, value) => sb.append(" ").append(key).append("=").append(value) } sb.append(separator) } } private def appendStack(event: ILoggingEvent, sb: StringBuilder): Unit = { if (event.getThrowableProxy != null) { val converter = new ThrowableProxyConverter converter.setOptionList(List("full").asJava) converter.start() sb.append() } } }
      
      







可能な構成
 <configuration> <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder"> <layout class="operation.context.logging.OperationContextLayout" /> </encoder> </appender> <root level="debug"> <appender-ref ref="STDOUT" /> </root> </configuration>
      
      







そして、何かを誓うようにしてください:



  def runWithoutA(): Unit = { val context = Some(createContext()) val res = serviceB.withContext(context).someOperationWithoutServiceA Context.runWith(context) { // Result of someOperationWithoutServiceA: '1' // operationId=GPapC6JKmY ServiceB.withContext=true logger.info(s"Result of someOperationWithoutServiceA: '$res'") } }
      
      





  def runWithA(): Future[_] = { val context = Some(createContext()) serviceB.withContext(context).someOperationWithServiceA.andThen { case _ => Context.runWith(context) { // someOperationWithServiceA completed // operationId=XU1SGXPq1N ServiceB.res=even ServiceA.withContext=true ServiceB.withContext=true ServiceA.step1=1 ServiceA.step2=7 ServiceA.step3=4 logger.info("someOperationWithServiceA completed") } } }
      
      





そして疑問が残りました: ExecutionContextで実行される外部コードはどうですか? しかし、誰も彼のためにラッパーを書くことを気にしません:



ラッパーの可能な実装
 class ContextualExecutionContext(context: Option[Context], executor: ExecutionContext) extends ExecutionContext { override def execute(runnable: Runnable): Unit = executor.execute(() => { Context.runWith(context)(runnable.run()) }) override def reportFailure(cause: Throwable): Unit = { Context.runWith(context)(executor.reportFailure(cause)) } } object ContextualExecutionContext { implicit class ContextualExecutionContextOps(val executor: ExecutionContext) extends AnyVal { def withContext(context: Option[Context]): ContextualExecutionContext = new ContextualExecutionContext(context, executor) } }
      
      







外部システムの可能な実装
 class SomeExternalObject { val logger: Logger = LoggerFactory.getLogger(classOf[SomeExternalObject]) def externalCall(implicit executionContext: ExecutionContext): Future[Int] = { Future(1).andThen { case Success(res) => logger.debug(s"external res $res") } } }
      
      





ExecutionContextで呼び出しを試行してみましょう。





  def runExternal(): Future[_] = { val context = Some(createContext()) implicit val executor = global.withContext(context) // external res 1 // operationId=8Hf277SV7B someExternalObject.externalCall }
      
      





それが全体のアイデアです。 実際、コンテキストの使用はロギングに限定されません。 このコンテキストには何でも保存できます。 たとえば、操作中にすべてのサービスが同じデータを処理する場合、いくつかの状態のキャスト。 などなど。



アクターを通信するときにコンテキストの監視を実装する必要がある場合は、コメントを書いて、記事を補足します。 他の実装についてのアイデアがあり、コメントも書いてください。読むのは面白いでしょう。



PS記事github.com/eld0727/scala-operation-contextで使用されているプロジェクトのソースコード。

PPSこのアプローチは、匿名クラスを作成できる他の言語にも適用できると確信しており、これはscalaでの可能な実装にすぎません。



All Articles