Problems of batch processing of requests and their solutions (part 2)



This is a continuation of the article "Problems of batch processing of requests and their solutions". It is recommended that you read the first part beforehand, as it describes the essence of the problem in detail along with some approaches to solving it. Here we look at other methods.



Content
A brief recap of the task

Increasing batch sizes

Reverse engineering

Collecting all the requests

Parallelism

Problems

Conclusions

Business heuristics

Solving the contract problem

Searching for useful constraints

Heuristic errors

Conclusions

DDD-style aggregates

Query batching logic inside the aggregate

Query batching logic outside the aggregate

Conclusions

Proxying and a double call

Solving the contract problem

Conclusions

Conclusion


A brief recap of the task



There is a chat for coordinating a document among a predefined set of participants. Messages contain text and files, and, as in regular chats, messages can be replies and forwards.



Chat message model:

data class ChatMessage(
    /** Nullable because the id is assigned when the entity is persisted */
    val id: Long? = null,
    /** Author of the message */
    val author: UserReference,
    /** Message text */
    val message: String,
    /** Attached files; because of the JPA mapping, an empty collection is stored as null */
    val files: List<FileReference>? = null,
    /** The message this one replies to, if any */
    val replyTo: ChatMessage? = null,
    /** The message this one is forwarded from, if any */
    val forwardFrom: ChatMessage? = null
)






Through dependency injection we have the following external services available:

interface ChatMessageRepository {
    fun findLast(n: Int): List<ChatMessage>
}

data class FileHeadRemote(
    val id: FileReference,
    val name: String
)

interface FileRemoteApi {
    fun getHeadById(id: FileReference): FileHeadRemote
    fun getHeadsByIds(id: Set<FileReference>): Set<FileHeadRemote>
    fun getHeadsByIds(id: List<FileReference>): List<FileHeadRemote>
}

data class UserRemote(
    val id: UserReference,
    val name: String
)

interface UserRemoteApi {
    fun getUserById(id: UserReference): UserRemote
    fun getUsersByIds(id: Set<UserReference>): Set<UserRemote>
    fun getUsersByIds(id: List<UserReference>): List<UserRemote>
}






We need to implement a REST controller:



interface ChatRestApi {
    fun getLast(n: Int): List<ChatMessageUI>
}








Where:

/** A front-end reference to an entity */
data class ReferenceUI(
    /** URL of the entity */
    val ref: String,
    /** Display name */
    val name: String
)

data class ChatMessageUI(
    val id: Long,
    /** Author of the message */
    val author: ReferenceUI,
    /** Message text */
    val message: String,
    /** Attached files */
    val files: List<ReferenceUI>,
    /** The message this one replies to, if any */
    val replyTo: ChatMessageUI? = null,
    /** The message this one is forwarded from, if any */
    val forwardFrom: ChatMessageUI? = null
)
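The toFrontReference mapping helpers used throughout this part come from the first article. A minimal sketch of what they might look like (the URL formats here are assumptions):

// Hypothetical sketches of the part-one mapping helpers; the URL scheme is assumed.
fun UserRemote.toFrontReference() = ReferenceUI(ref = "/users/$id", name = name)
fun FileHeadRemote.toFrontReference() = ReferenceUI(ref = "/files/$id", name = name)
fun List<FileHeadRemote>.toFrontReference() = map { it.toFrontReference() }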







In the previous part, we looked at a naive implementation of a service using batch processing and at several ways to speed it up. Those methods are very simple, but they do not deliver sufficiently good performance.



Increasing batch sizes



The main problem with the naive solutions was the small size of the batches.



To group calls into larger batches, we need to accumulate the requests somehow. The following line leaves no room for accumulating requests:



author = userRepository.getUserById(author).toFrontReference(),







Right now there is no dedicated place in our runtime to store the list of users: it is built up gradually. This will have to change.



First, we need to separate the data-acquisition logic from the mapping in the ChatMessage.toFrontModel method:



private fun ChatMessage.toFrontModel(
    getUser: (UserReference) -> UserRemote,
    getFile: (FileReference) -> FileHeadRemote,
    serializeMessage: (ChatMessage) -> ChatMessageUI
): ChatMessageUI =
    ChatMessageUI(
        id = id ?: throw IllegalStateException("$this must be persisted"),
        author = getUser(author).toFrontReference(),
        message = message,
        files = files?.let {
            it.map(getFile).map { file -> file.toFrontReference() }
        } ?: listOf(),
        forwardFrom = forwardFrom?.let(serializeMessage),
        replyTo = replyTo?.let(serializeMessage)
    )








Notice that this function now depends only on three external functions (and not on entire classes, as it did at the beginning).



After this rework the body of the function is no less clear, while its contract has become stricter (which has both pros and cons).



In fact, you don't have to narrow the contract this much; you could keep the dependencies on the interfaces. The important thing is that they contain nothing superfluous, since we will need to provide alternative implementations.



Since serializeMessage is essentially a recursive function, the first refactoring step can express it as explicit recursion:



class ChatRestController(
    private val messageRepository: ChatMessageRepository,
    private val userRepository: UserRemoteApi,
    private val fileRepository: FileRemoteApi
) : ChatRestApi {
    override fun getLast(n: Int) =
        messageRepository.findLast(n)
            .map { it.toFrontModel() }

    // the toFrontModel() stub is shown below
}







I made a stub for the toFrontModel method which, for now, works exactly like our first naive implementation (the implementations of all three external functions stay the same).



private fun ChatMessage.toFrontModel(): ChatMessageUI =
    toFrontModel(
        getUser = userRepository::getUserById,
        getFile = fileRepository::getHeadById,
        serializeMessage = { it.toFrontModel() }
    )








But we need to make sure that getUser, getFile, and serializeMessage work efficiently, that is, that they send requests to the corresponding services in batches of the right size (theoretically this size may differ per service), or in a single request per service if unlimited requests are allowed.



This grouping is easiest to achieve when we have all the necessary requests at hand before processing starts. To do that, before calling toFrontModel we collect all the required references, run the batched requests, and then use the results.



You could also try a scheme in which requests are accumulated and executed gradually, but such schemes require asynchronous execution, so for now we will focus on synchronous ones.



So, to start using batch processing, one way or another we have to find out in advance as many of the upcoming requests as possible (preferably all of them). In the case of a REST controller, it would also be good to combine the requests to each service across the whole session.



Grouping all the calls

In some situations, all the data needed within a session can be obtained at once, without causing resource problems for either the initiator of the request or the executor. In that case we can put no limit on the batch size for calling the service and simply receive all the data in one go.



Another assumption that makes life much easier is that the initiator has enough resources to process all the data. Requests to external services can still be sent in limited batches if those services require it.



What these assumptions simplify is the logic of matching the places where data is needed with the results of the calls. If we instead assume that the initiator's resources are very limited, and at the same time try to minimize the number of external calls, we get a rather difficult optimal graph-cutting problem. Most likely we would have to sacrifice performance to reduce resource consumption.



We will assume that in our demo project specifically the initiator is not particularly resource-constrained, can receive all the necessary data, and can keep it until the end of the session. If resource problems arise, we can simply add a little pagination.



Since this approach is the one most in demand in my practice, the following examples deal with it.


We can distinguish the following methods of obtaining large sets of requests:

  1. Reverse engineering.
  2. Business heuristics.
  3. DDD-style aggregates.
  4. Proxying and a double call.

Let's go through all of these options using our project as an example.



Reverse engineering



Collecting all the requests



Since we have the code of all the functions involved in collecting the information and converting it for the front end, we can reverse-engineer it and derive from the code what the requests will be:



class ChatRestController(
    private val messageRepository: ChatMessageRepository,
    private val userRepository: UserRemoteApi,
    private val fileRepository: FileRemoteApi
) : ChatRestApi {
    override fun getLast(n: Int) =
        messageRepository.findLast(n)
            .let { messages ->
                // also include the forwarded and replied-to messages
                val allMessages = messages.asSequence().flatMap {
                    sequenceOf(it, it.forwardFrom, it.replyTo).filterNotNull()
                }.toSet()
                val allUserReq = allMessages.map { it.author }
                val allFileReq = allMessages.flatMap { it.files ?: listOf() }.toSet()
                // ... (continued below)








All the requests are collected; now it is time for the actual batch processing.



For allUserReq and allFileReq we make the external requests and index the results by id. If there are no batch size restrictions, it will look something like this:



userRepository.getUsersByIds(allMessages.map { it.author }.toSet())
    .associateBy { it.id }::get

fileRepository.getHeadsByIds(allMessages.flatMap { it.files ?: listOf() }.toSet())
    .associateBy { it.id }::get








If there is a restriction, the code takes the following form:



val userApiChunkLimit = 100

allMessages.map { it.author }.asSequence().distinct()
    .chunked(userApiChunkLimit, userRepository::getUsersByIds)
    .flatten()
    .associateBy { it.id }::get








Unfortunately, unlike Stream, a Sequence cannot easily be switched to executing the batched requests in parallel.



If you consider parallel requests valid and necessary, you can do, for example, this:



allMessages.map { it.author }.parallelStream().distinct()
    .chunked(userApiChunkLimit, userRepository::getUsersByIds)
    .flatten()
    .associateBy { it.id }::get








As you can see, little has changed; a bit of Kotlin magic helped us here:



import java.util.stream.Collectors
import java.util.stream.Stream

fun <T, R> Stream<out T>.chunked(size: Int, transform: (List<T>) -> R): Stream<out R> =
    batches(this, size).map(transform)

fun <T> Stream<out Collection<T>>.flatten(): Stream<T> =
    flatMap { it.stream() }

fun <T, K> Stream<T>.associateBy(keySelector: (T) -> K): Map<K, T> =
    collect(Collectors.toMap(keySelector, { it }))
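The batches function used inside chunked is assumed to split a Stream into consecutive lists of at most size elements. A possible sketch, which materializes the stream (acceptable here, since we consume all the results anyway):

// A sketch of the assumed batches helper: materialize the stream and split it
// into chunks, returning a parallel stream so the chunk requests can run in parallel.
fun <T> batches(stream: Stream<out T>, size: Int): Stream<List<T>> =
    stream.collect(Collectors.toList<T>())
        .chunked(size)
        .parallelStream()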








Now it remains to put everything together:

override fun getLast(n: Int) =
    messageRepository.findLast(n)
        .let { messages ->
            // also include the forwarded and replied-to messages
            val allMessages = messages.asSequence().flatMap { message ->
                sequenceOf(message, message.forwardFrom, message.replyTo)
                    .filterNotNull()
            }.toSet()

            messages.map(ValueHolder<(ChatMessage) -> ChatMessageUI>().apply {
                value = memoize { message: ChatMessage ->
                    message.toFrontModel(
                        // users: batched, parallel requests
                        getUser = allMessages.map { it.author }.parallelStream().distinct()
                            .chunked(userApiChunkLimit, userRepository::getUsersByIds)
                            .flatten()
                            .associateBy { it.id }::get
                            .orThrow { IllegalArgumentException("User $it not found") },
                        // files: a single batched request
                        getFile = fileRepository.getHeadsByIds(allMessages.flatMap { it.files ?: listOf() }.toSet())
                            .associateBy { it.id }::get
                            .orThrow { IllegalArgumentException("File $it not found") },
                        // recursion through the memoized function
                        serializeMessage = value
                    )
                }
            }.value)
        }






Explanations and simplifications

The first thing that catches the eye is the memoize function. The point is that serializeMessage will almost certainly be called several times for the same messages (because of replies and forwards), and there is no reason to run toFrontModel separately for each such repeat (in some cases that may be necessary, but not in ours). So we can memoize serializeMessage. It can be implemented, for example, like this:



fun <A, R> memoize(func: (A) -> R) = func as? Memoize2<A, R> ?: Memoize2(func)

class Memoize2<A, R>(val func: (A) -> R) : (A) -> R, java.util.function.Function<A, R> {
    private val cache = hashMapOf<A, R>()
    override fun invoke(p1: A) = cache.getOrPut(p1) { func(p1) }
    override fun apply(t: A): R = invoke(t)
}








Next, we need to construct the memoized serializeMessage function while also using it inside itself. It is important that exactly the same instance of the function is used inside; otherwise all the memoization goes down the drain. To resolve this chicken-and-egg problem we use a ValueHolder class, which simply stores a mutable reference to a value (you could use something standard instead, such as AtomicReference). To shorten the recursive setup, we can write this:



inline fun <A, R> recursiveMemoize(crossinline func: (A, (A) -> R) -> R): (A) -> R =
    ValueHolder<(A) -> R>().apply {
        value = memoize { a -> func(a, value) }
    }.value
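For completeness, a ValueHolder sufficient for this trick fits in a couple of lines (a sketch, assumed to match the one relied on above; any mutable reference such as AtomicReference works just as well):

// A minimal mutable holder, assumed to match the ValueHolder used above.
class ValueHolder<T : Any> {
    lateinit var value: T
}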








If you understood this tangle of arrows on the first reading, congratulations: you are a functional programmer :-)



Now the code will look like this:



override fun getLast(n: Int) =
    messageRepository.findLast(n)
        .let { messages ->
            // also include the forwarded and replied-to messages
            val allMessages = messages.asSequence().flatMap { message ->
                sequenceOf(message, message.forwardFrom, message.replyTo)
                    .filterNotNull()
            }.toSet()

            // users: batched, parallel requests
            val getUser = allMessages.map { it.author }.parallelStream().distinct()
                .chunked(userApiChunkLimit, userRepository::getUsersByIds)
                .flatten()
                .associateBy { it.id }::get
                .orThrow { IllegalArgumentException("User $it not found") }

            // files: a single batched request
            val getFile = fileRepository.getHeadsByIds(allMessages.flatMap { it.files ?: listOf() }.toSet())
                .associateBy { it.id }::get
                .orThrow { IllegalArgumentException("File $it not found") }

            messages.map(recursiveMemoize { message, memoized: (ChatMessage) -> ChatMessageUI ->
                message.toFrontModel(
                    getUser = getUser,
                    getFile = getFile,
                    // recursion through the memoized function
                    serializeMessage = memoized
                )
            })
        }








You may also have noticed orThrow, which is defined as follows:



/** Wraps a nullable-returning function; throws [exception] built from the argument on null */
fun <P, R> ((P) -> R?).orThrow(exception: (P) -> Exception): (P) -> R =
    { p -> invoke(p) ?: throw exception(p) }








If an external service legitimately may have no data for one of our ids, this case needs to be handled differently.



After this fix, the getLast runtime is expected to be around 300 ms. Moreover, this time will not grow much even when the requests stop fitting into the batch size limit, since the batches are requested in parallel. Let me remind you that our minimum goal is 500 ms, and 250 ms counts as normal operation.


Parallelism



But we need to move on. The calls to userRepository and fileRepository are completely independent of each other and can easily be parallelized, in theory bringing us close to 200 ms.
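The join function used below comes from the first part; conceptually it just runs two suppliers in parallel and returns both results. A minimal sketch under that assumption:

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking

// A hypothetical sketch of the part-one join helper: run both suppliers
// concurrently on the IO dispatcher and return the pair of results.
fun <A, B> join(a: () -> A, b: () -> B): Pair<A, B> =
    runBlocking(Dispatchers.IO) {
        val deferredA = async { a() }
        val deferredB = async { b() }
        deferredA.await() to deferredB.await()
    }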



For example, through our join function:

override fun getLast(n: Int) =
    messageRepository.findLast(n)
        .let { messages ->
            // also include the forwarded and replied-to messages
            val allMessages = messages.asSequence().flatMap { message ->
                sequenceOf(message, message.forwardFrom, message.replyTo)
                    .filterNotNull()
            }.toSet()

            join({
                // users: batched, parallel requests
                allMessages.map { it.author }.parallelStream().distinct()
                    .chunked(userApiChunkLimit, userRepository::getUsersByIds)
                    .flatten()
                    .associateBy { it.id }
            }, {
                // files: a single batched request
                fileRepository.getHeadsByIds(allMessages.flatMap { it.files ?: listOf() }.toSet())
                    .associateBy { it.id }
            }).let { (users, files) ->
                messages.map(recursiveMemoize { message, memoized: (ChatMessage) -> ChatMessageUI ->
                    message.toFrontModel(
                        getUser = users::get.orThrow { IllegalArgumentException("User $it not found") },
                        getFile = files::get.orThrow { IllegalArgumentException("File $it not found") },
                        // recursion through the memoized function
                        serializeMessage = memoized
                    )
                })
            }
        }






As practice shows, execution takes about 200 ms, and, importantly, the time does not grow much as the number of messages increases.



Problems



Admittedly, the code has become less readable than our naive first version, but it is good that the serialization itself (the toFrontModel implementation) has barely changed and remains perfectly readable. All the clever logic for dealing with external services lives in one place.



The downside of this approach is that our abstraction leaks.



If we need to change toFrontModel, we will almost certainly have to change the getLast function as well, even though nothing in its contract requires it: the prefetch code depends on the implementation details of the serialization.



For example, suppose we agree to expand attached files only in the main messages, and not in replies and forwards, or only in first-level replies and forwards. In that case, after changing the toFrontModel code, we would have to make corresponding corrections to the code that collects the file requests. Moreover, the correction would be nontrivial:



fileRepository.getHeadsByIds(
    allMessages.flatMap { it.files ?: listOf() }.toSet()
)








And here we smoothly approach another problem, closely related to the previous one: the correctness of the whole code depends on how accurately the reverse engineering was done. In some complex cases the code may work incorrectly precisely because the requests were collected incorrectly, and there is no guarantee that you can quickly come up with unit tests covering all such tricky situations.



Conclusions



Pros:



  1. An obvious way to pre-collect the requests, easily separated from the main code.
  2. Almost no memory or time overhead, since only data that would have been fetched anyway is used.
  3. Good scaling, and the possibility of building a service that, in theory, responds in predictable time regardless of the size of the external request.


Cons:



  1. Fairly complex code for the batch processing itself.
  2. A large and responsibility-laden job of analyzing the requests made by the existing implementation.
  3. A leaky abstraction and, as a consequence, fragility of the whole scheme with respect to implementation changes.
  4. Difficult support: errors in the request-prediction block are hard to distinguish from errors in the main code. Ideally you need twice as many unit tests, and investigating production errors becomes twice as hard.
  5. Having to follow SOLID principles when writing the code: it must be prepared for the batch-processing logic to be extracted from it. Adopting these principles brings benefits of its own, so this is the least significant drawback.


It is important to note that you can use this method without doing reverse engineering as such. What we need is a contract for getLast on which the contract for the preliminary computation of the requests (hereafter, the prefetch) can depend. Here we obtained it by looking at the implementation of getLast, i.e. by reverse engineering. This approach has a built-in difficulty, however: the two pieces of code must always be edited in sync, and there is no way to enforce that (recall hashCode and equals, which suffer from exactly the same problem). The next approach is designed to solve this problem, or at least to mitigate it.



Business heuristics



Solving the contract problem



What if we operate not with an exact contract, and therefore an exact set of requests, but with an approximate one? We will build the approximate set so that it strictly includes the exact one, and base it on the characteristics of the subject area.



Thus, instead of making the prefetch contract depend on getLast, we make both of them depend on a common contract dictated by the business. The main difficulty is embodying this common contract in code.



Searching for useful constraints



Let's try to do this with our example.

In our case, the domain gives us several business constraints; let's go through them one by one.

From the first constraint (the set of chat participants is predefined, as stated in the task) it follows that we do not need to walk over the messages, see which users appear in them, pick out the unique ones, and request exactly those. We can simply query the predefined list. If you agreed with this statement, then I've caught you.



In fact, it is not that simple. The list may be predefined yet contain thousands of users; such things have to be clarified. In our case a chat usually has two or three participants, rarely more, so it is perfectly acceptable to fetch data for all of them.



Further, if the list of chat users is predefined but this information is not in the user service (which is quite likely), the information is of no use: we would make an extra request for the chat's user list and then still have to make the request(s) to the user service.



So suppose that the association between users and the chat is stored in the user service. In our case this is so, since membership is determined by the user's permissions. Then the prefetch code for users comes out like this:
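Judging by the getUsersByChat call used in the final controller below, this amounts to a chat-scoped method on UserRemoteApi plus a lookup built on top of it. A sketch (the error message is an assumption):

// Assumed chat-scoped addition to UserRemoteApi, used by the heuristic prefetch:
//     fun getUsersByChat(): List<UserRemote>
val getUser = userRepository.getUsersByChat()
    .associateBy { it.id }::get
    .orThrow { IllegalArgumentException("User $it not found") }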



It may seem surprising that we do not pass any chat id here. I did this intentionally, to keep the sample code uncluttered.



At first glance, nothing follows from the second constraint. In any case, I could not extract anything useful from it.



We already used the third constraint earlier. It can significantly affect how we store and retrieve conversations, but we will not develop that topic here, since it has nothing to do with the REST controller or batch processing.



What about the files? I would like to get the list of all of the chat's files in one simple request. Under the terms of the API we only need file headers, not bodies, so this does not look like a resource-intensive or dangerous request for the caller.



On the other hand, we must remember that we fetch only the last N messages, not all of them, and it may easily turn out that these contain no files at all.



There is no universal answer here: everything depends on the business specifics and usage scenarios. When building a production solution you can get into trouble by baking a heuristic in for one use case, only for users to exercise the feature differently. For demos and presales that is a fine option, but here we are trying to write an honest production service.



So, alas, business heuristics for files can only be built here from operational results and collected statistics (or from an expert assessment).



Since we still want to apply our method somehow, suppose the statistics showed the following:



  1. A typical conversation starts with a message containing one or more files, followed by reply messages without files.
  2. Almost all messages belong to such typical conversations.
  3. The expected number of unique files within a single chat is about 20.


It follows that displaying almost any page of messages will require the headers of some files (because of how ChatMessageUI is structured), and that the total number of files is small. It therefore seems reasonable to fetch all of the chat's files in one request. To do that, we have to add the following to our file API:



fun getHeadsByChat(): List<FileHeadRemote>







The getHeadsByChat method does not look far-fetched or invented purely out of a desire to optimize performance (although that would also be a decent reason). Quite often, users of a chat with files want to see all the files used, in the order they were added (hence the List).



Implementing such an explicit association requires storing additional information either in the file service or in our application; it depends on whose area of responsibility we think this redundant chat-to-file link belongs in. It is redundant because a message is already linked to its files, and the message in turn is linked to the chat. Alternatively, you could skip the denormalization and extract this information on the fly from the messages, that is, fetch all the files of the whole chat at once inside SQL (this lives in our application) and then request them all in one go from the file service. That option performs worse when the chat has many messages, but then no denormalization is needed. I would hide both options behind getHeadsByChat.



The code looks like this:

override fun getLast(n: Int) =
    messageRepository.findLast(n)
        .let { messages ->
            join(
                { userRepository.getUsersByChat().associateBy { it.id } },
                { fileRepository.getHeadsByChat().associateBy { it.id } }
            ).let { (users, files) ->
                // the mapping stays the same as before
            }
        }








Compared with the previous version, very little has changed: only the prefetch part is affected, which is great.



The prefetch code has become much shorter and clearer.



The execution time has not changed, which is logical, since the number of requests is the same. There are theoretically possible cases where scaling will be better than with honest reverse engineering (purely because the request-computation step is removed), but the opposite situations are just as likely: the heuristics fetch far too much extra data. As practice shows, if you manage to come up with adequate heuristics, the execution time should not change much.



However, this is not all. We have not taken into account that fetching detailed user and file data is no longer tied to fetching the messages, so all the requests can be launched in parallel:
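A sketch of what this can look like, assuming a three-argument variant of the join helper (the mapping step is elided, as above):

// All three requests start at once; none depends on another's results.
override fun getLast(n: Int) =
    join(
        { messageRepository.findLast(n) },
        { userRepository.getUsersByChat().associateBy { it.id } },
        { fileRepository.getHeadsByChat().associateBy { it.id } }
    ).let { (messages, users, files) ->
        // the mapping stays the same as before
    }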



This option gives a stable 100 ms per request.



Heuristic errors



What if, with heuristics in play, the set of requests turns out not larger but slightly smaller than needed? The heuristic will still work for most cases, but there will be exceptions that require a separate request each. In my practice such solutions proved unsuccessful: each exception hit performance hard, and sooner or later some user made a request consisting entirely of exceptions. In such situations I would rather use reverse engineering, even if the request-collection algorithm comes out creepy and unreadable; but of course it all depends on how critical the service is.



Conclusions



Pros:



  1. The logic of business heuristics is easy to read and usually trivial. That makes it easy to understand its limits of applicability, and to verify and modify the prefetch contract.
  2. Scalability is as good as with reverse engineering.
  3. Data coupling in the code is reduced, which can lead to better parallelization.
  4. The prefetch logic, like the main logic of the REST controller, is driven by the requirements. This is a weak plus if the requirements change frequently.


Cons:



  1. It is not so easy to derive request-prediction heuristics from the requirements. Clarifying the requirements may be necessary, to a degree that sits poorly with agile.
  2. You may fetch extra data.
  3. Making the prefetch contract work efficiently will probably require denormalized storage. This is a weak drawback, since such optimizations follow from the business logic and will most likely be wanted by other processes too.


From our example one might conclude that this approach is very painful and the game is not worth the candle. In real business projects, however, the number of constraints is huge, and from that heap it is often possible to extract something useful for partitioning the data or predicting statistics. The main advantage of the approach is that the constraints used are interpretable by the business, so they are easily understood and validated.



Usually the biggest problem in applying this approach is the separation of activities: the developer must be well immersed in the business logic and ask the analyst clarifying questions, which requires a certain amount of initiative.



DDD-style aggregates



In large projects you can often see DDD practices in use, since they allow the code to be structured effectively. It is not necessary to apply every DDD template in a project; sometimes even adopting a single one pays off well. Consider the DDD concept of an aggregate: a union of logically related entities that is worked with only through the aggregate root (usually the entity at the top of the entity connectivity graph).



From the point of view of data acquisition, the key property of an aggregate is that all the logic for working with lists of entities sits in one place: the aggregate. There are two approaches to what to hand the aggregate at construction time:



  1. Pass the aggregate functions for obtaining external data. The logic that determines which data is needed lives inside the aggregate.
  2. Pass in all the necessary data. The logic that determines which data is needed lives outside the aggregate.


The choice depends largely on how easily the prefetch can be moved outside the aggregate. If the prefetch logic is based on business heuristics, separating it from the aggregate is usually easy. Moving logic that is based on analyzing the aggregate's usage (reverse engineering) outside of it can be dangerous, since it spreads logically related code across different classes.



Query batching logic inside the aggregate



Let's sketch an aggregate corresponding to the concept of a "chat". Our ChatMessage, UserReference, and FileReference classes belong to the storage model, so they could be renamed with some suitable prefix; but this is a small project, so let's leave them as they are. We will call the aggregate Chat, and its components ChatPage and ChatPageMessage:



interface Chat {
    fun getLastPage(n: Int): ChatPage
}

interface ChatPage {
    val messages: List<ChatPageMessage>
}

data class ChatPageMessage(
    val id: Long,
    val author: UserRemote,
    val message: String,
    val files: List<FileHeadRemote>,
    val replyTo: ChatPageMessage?,
    val forwardFrom: ChatPageMessage?
)



So far this looks like a lot of pointless duplication. That is because our domain model resembles the storage model, and both resemble the front-end model. I use the FileHeadRemote and UserRemote classes directly to avoid writing extra code, although you should normally avoid using such classes directly in the domain.










With such an aggregate, our REST controller can be rewritten as follows:



class ChatRestController(
    private val chat: Chat
) : ChatRestApi {
    override fun getLast(n: Int) =
        chat.getLastPage(n).toFrontModel()

    private fun ChatPage.toFrontModel() =
        messages.map { it.toFrontModel() }

    private fun ChatPageMessage.toFrontModel(): ChatMessageUI =
        ChatMessageUI(
            id = id,
            author = author.toFrontReference(),
            message = message,
            files = files.toFrontReference(),
            forwardFrom = forwardFrom?.toFrontModel(),
            replyTo = replyTo?.toFrontModel()
        )
}






This version strongly resembles our first naive implementation, but with an important advantage: the controller no longer fetches data directly and no longer depends on the storage-related classes; it depends only on the aggregate, exposed through interfaces. Thus the prefetch logic is no longer in the controller: the controller only converts the aggregate into the front-end model, which gives us compliance with the single responsibility principle. The downside is that we will have to write an implementation for every method declared in the aggregate.







Let's try to simply preserve the controller logic we implemented with business heuristics.

// IO here is kotlinx.coroutines.Dispatchers.IO
class ChatImpl(
    private val messageRepository: ChatMessageRepository,
    private val userRepository: UserRemoteApi,
    private val fileRepository: FileRemoteApi
) : Chat {
    override fun getLastPage(n: Int) = object : ChatPage {
        override val messages: List<ChatPageMessage>
            get() =
                runBlocking(IO) {
                    // start the user and file prefetch in parallel with the messages
                    val prefetch = async(
                        { userRepository.getUsersByChat().associateBy { it.id } },
                        { fileRepository.getHeadsByChat().associateBy { it.id } }
                    )

                    withContext(IO) { messageRepository.findLast(n) }
                        .map(
                            prefetch.await().let { (users, files) ->
                                recursiveMemoize { message, memoized: (ChatMessage) -> ChatPageMessage ->
                                    message.toDomainModel(
                                        getUser = users::get.orThrow { IllegalArgumentException("User $it not found") },
                                        getFile = files::get.orThrow { IllegalArgumentException("File $it not found") },
                                        // recursion through the memoized function
                                        serializeMessage = memoized
                                    )
                                }
                            }
                        )
                }
    }
}

private fun ChatMessage.toDomainModel(
    getUser: (UserReference) -> UserRemote,
    getFile: (FileReference) -> FileHeadRemote,
    serializeMessage: (ChatMessage) -> ChatPageMessage
) = ChatPageMessage(
    id = id ?: throw IllegalStateException("$this must be persisted"),
    author = getUser(author),
    message = message,
    files = files?.map(getFile) ?: listOf(),
    forwardFrom = forwardFrom?.let(serializeMessage),
    replyTo = replyTo?.let(serializeMessage)
)







It turns out that the getLastPage function itself contains the data-acquisition strategy, including the prefetch, while toDomainModel is purely technical and responsible for converting storage models into the domain model.



I rewrote the parallel calls to userRepository, fileRepository, and messageRepository in a form more familiar to Kotlin; I hope the code's comprehensibility did not suffer for it.
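Note that async(...) with several lambdas is not the kotlinx.coroutines function; presumably it is a small helper from the first part that starts the suppliers concurrently and returns a deferred tuple. A possible sketch:

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async

// Hypothetical sketches of the multi-argument async helpers used in this article.
fun <A, B> CoroutineScope.async(a: () -> A, b: () -> B): Deferred<Pair<A, B>> =
    async {
        val da = async { a() }
        val db = async { b() }
        da.await() to db.await()
    }

fun <A, B, C> CoroutineScope.async(a: () -> A, b: () -> B, c: () -> C): Deferred<Triple<A, B, C>> =
    async {
        val da = async { a() }
        val db = async { b() }
        val dc = async { c() }
        Triple(da.await(), db.await(), dc.await())
    }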



In general, this method is already fully functional; its performance is the same as with plain reverse engineering or business heuristics.



Query batching logic outside the aggregate



While building this aggregate we immediately run into a problem: to construct a ChatPage, the page size would have to be fixed as a constant when the Chat is created, rather than passed to getLast() as usual. We would have to change the aggregate interface itself:



interface Chat {
    fun getPage(): ChatPage
}




And since we want to obtain all the data outside the aggregate anyway, we will have to abandon the Chat-level aggregate entirely and make ChatPage the root:





class ChatPageImpl(
    private val messageData: List<ChatMessage>,
    private val userData: List<UserRemote>,
    private val fileData: List<FileHeadRemote>
) : ChatPage {
    override val messages: List<ChatPageMessage>
        get() =
            messageData.map(
                (userData.associateBy { it.id } to fileData.associateBy { it.id })
                    .let { (users, files) ->
                        recursiveMemoize { message, self: (ChatMessage) -> ChatPageMessage ->
                            message.toDomainModel(
                                getUser = users::get.orThrow(),
                                getFile = files::get.orThrow(),
                                // recursion through the memoized function
                                serializeMessage = self
                            )
                        }
                    }
            )
}




Next, we create the prefetch code separately from the aggregate:





fun chatPagePrefetch(
    pageSize: Int,
    messageRepository: ChatMessageRepository,
    userRepository: UserRemoteApi,
    fileRepository: FileRemoteApi
) =
    runBlocking(IO) {
        async(
            { userRepository.getUsersByChat() },
            { fileRepository.getHeadsByChat() },
            { messageRepository.findLast(pageSize) }
        ).await()
    }




Now, to create the aggregate, we need to wire it together with the prefetch. In DDD, this kind of orchestration is the job of application services:





class ChatService(
    private val messageRepository: ChatMessageRepository,
    private val userRepository: UserRemoteApi,
    private val fileRepository: FileRemoteApi
) {
    private fun chatPagePrefetch(pageSize: Int) =
        runBlocking(IO) {
            async(
                { messageRepository.findLast(pageSize) },
                { userRepository.getUsersByChat() },
                { fileRepository.getHeadsByChat() }
            ).await()
        }

    fun getLastPage(n: Int): ChatPage =
        chatPagePrefetch(n)
            .let { (messageData, userData, fileData) ->
                ChatPageImpl(messageData, userData, fileData)
            }
}








The controller, in turn, barely changes; we just use ChatService::getLastPage instead of Chat::getLastPage. That is, the code changes like this:



class ChatRestController(
    private val chat: ChatService
) : ChatRestApi









Conclusions



  1. The prefetch logic can be placed either inside the aggregate or in a separate place.
  2. If the prefetch logic is strongly tied to the aggregate's internal logic, it is better not to extract it, as that can break encapsulation. Personally, I see little point in moving the prefetch out of the aggregate: it limits the possibilities and increases the implicit coupling of the code.
  3. Organizing the code into aggregates is itself good for batch-processing performance, since heavy requests come under tighter control and the prefetch logic gets a well-defined home.


In the next chapter we will consider a prefetch that cannot be implemented in isolation from the main function.



Proxying and a double call



Solving the contract problem



As we saw in the previous sections, the main problem of the prefetch contract is that it is strongly coupled to the contract of the function whose data it must prepare; more precisely, it depends on what data the main function may need. What if, instead of trying to predict this, we do the reverse engineering with the help of the code itself? In simple situations the proxying approach commonly used in testing can help: libraries such as Mockito generate classes implementing interfaces, and those classes can also accumulate information about the calls made to them. A similar approach is used in our library.



If we call the main function with proxied repositories and collect information about the data it needs, we can then fetch that data in batches and call the main function a second time to obtain the final result.



The main condition is this: the requested data must not influence subsequent requests. The proxy returns stubs rather than real data, so any branching on the data, and any fetching of data derived from it, disappears.



In our case this means it is pointless to proxy messageRepository, since subsequent requests are built from the results of the message request. That is not a problem: we make only one request to messageRepository, so no batching is needed there.



Since we are proxying the simple functions UserReference -> UserRemote and FileReference -> FileHeadRemote, it is enough to accumulate two sets of arguments.



As a result, we get the following:

class ChatRestController(
    private val messageRepository: ChatMessageRepository,
    private val userRepository: UserRemoteApi,
    private val fileRepository: FileRemoteApi
) : ChatRestApi {
    override fun getLast(n: Int): List<ChatMessageUI> {
        val messages = messageRepository.findLast(n)

        // the transformation that we will run twice
        fun transform(
            getUser: (UserReference) -> UserRemote,
            getFile: (FileReference) -> FileHeadRemote
        ): List<ChatMessageUI> =
            messages.map(
                recursiveMemoize { message, self ->
                    message.toFrontModel(getUser, getFile, self)
                }
            )

        // first pass: feed in stubs and accumulate the request arguments
        val userIds = mutableSetOf<UserReference>()
        val fileIds = mutableSetOf<FileReference>()
        transform(
            { userIds += it; UserRemote(0L, "") },
            { fileIds += it; FileHeadRemote(0L, "") }
        )

        return runBlocking(IO) {
            // second pass: fetch the data in batches and build the real result
            async(
                { userRepository.getUsersByIds(userIds).associateBy { it.id }::get.orThrow() },
                { fileRepository.getHeadsByIds(fileIds).associateBy { it.id }::get.orThrow() }
            ).await().let { (getUser, getFile) ->
                transform(getUser, getFile)
            }
        }
    }
}






If you measure performance, this approach turns out no worse than the reverse-engineering methods, even though the function is called twice. That is because, compared with the execution time of the external requests, the execution time of the transformation function is negligible (in our case).



Compared with business heuristics, in our case this style of request accumulation is less effective. But keep in mind that good heuristics cannot always be found: for example, if the chat has many users and many files but files are rarely attached to messages, the business-heuristics algorithm immediately starts losing to honestly collecting the list of requests.



Conclusions



Pros:



  1. The prefetch is derived from the code of the main function itself; there is no separate request-collection algorithm to write and keep in sync by hand.
  2. The prefetch does not have to account for the implementation nuances of the main function.
  3. Coupling between the prefetch and the main logic is minimal.


Cons:



  1. The main function is executed twice.
  2. The approach works only when subsequent requests do not depend on the results of previous ones.


Despite its apparent exoticism, accumulating requests through proxying and a repeat call is quite applicable where the logic of the main function is not driven by the data it receives. The main difficulty is the same as with reverse engineering: we rely on the current implementation of the function, though to a much smaller extent (only on the fact that subsequent requests do not depend on the results of previous ones).



Performance drops slightly, but the prefetch code does not have to account for every nuance of the main function's implementation.



You can use this approach when good business heuristics for request prediction cannot be built and you want to reduce the coupling between the prefetch and the main function.



Conclusion



Using batch processing is not as simple as it seems at first glance; I think all design patterns share this property (remember caching).



For efficient batch processing it is important that the caller gather as many requests as possible, and the structure of the application often gets in the way of this. There are two ways out: either design the application with efficient data handling in mind (which may well lead to a reactive application design), or, as more often happens, try to bolt batch processing onto an existing application without significant restructuring.



The most obvious way to gather requests together is to reverse-engineer the existing code in search of heavy requests. The main drawback of this approach is the growth of implicit coupling in the code. The alternative is to use knowledge of the business in order to divide the data into chunks that are typically requested whole and together. Sometimes such an effective partitioning requires denormalizing the storage, but when it succeeds, the batch-processing logic ends up determined by the subject area, which is good.



A less obvious way to obtain all the requests is a two-pass implementation: on the first pass we collect all the necessary requests, and on the second we work with the data already received. The applicability of this approach is limited by the requirement that the requests be independent of one another.


