Introduction
There are already many excellent introductory articles on reactive programming, on the Reactor project and on its integration within the Spring Framework.
This post compares the "standard" Spring framework with the reactive framework as a basis for streaming microservices.
There is an excellent example project on GitHub by Mark Paluch illustrating the difference between both approaches.
For the moment, though, there is a lack of performance benchmarks. It is hard to know whether the reactive frameworks, with all their overhead, outperform the standard way of doing things.
If you really think it through, the choice comes down to letting the application schedule the use of CPU resources (asynchronous/reactive) versus letting the OS do it (one blocked thread per pending call).
Setup
I wanted to perform a quick and basic performance/load test with a basic streaming building block that does some data enrichment:
Reactive Kafka Consumer ⇒ Reactive REST call to an external data enrichment service ⇒ Reactive Kafka Producer
vs
Kafka Consumer ⇒ Blocking REST call ⇒ Kafka Producer
The reactive way
The reactive setup keeps pulling Kafka records and buffering up REST requests that are waiting for their answers. Nothing blocks. Once a response is received from the data enrichment service and the original data has been enriched, the data is passed on to the Kafka producer.
Since nothing in this pipeline is supposed to block, the only thing stalling the processing is how much we are willing to buffer: in this particular case, outstanding REST requests with their associated network sockets.
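To make the non-blocking model concrete, here is a stdlib-only Java sketch. It is my own simulation, not the post's code: a single `ScheduledExecutorService` thread stands in for the HTTP client's event loop, and a completion scheduled 100 ms later stands in for the enrichment service's response. The names `NonBlockingSketch`, `enrichAsync` and `processAll` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class NonBlockingSketch {
    // One scheduler thread stands in for the HTTP client's event loop (assumption: no real network I/O).
    static final ScheduledExecutorService EVENT_LOOP = Executors.newSingleThreadScheduledExecutor();
    static final AtomicInteger inFlight = new AtomicInteger();
    static final AtomicInteger maxInFlight = new AtomicInteger();

    // Hypothetical non-blocking "REST call": returns at once and completes after latencyMs.
    static CompletableFuture<String> enrichAsync(String key, long latencyMs) {
        maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
        CompletableFuture<String> response = new CompletableFuture<>();
        EVENT_LOOP.schedule(() -> {
            inFlight.decrementAndGet();
            response.complete(key + " enriched");
        }, latencyMs, TimeUnit.MILLISECONDS);
        return response;
    }

    static List<String> processAll(int records, long latencyMs) {
        List<CompletableFuture<String>> pending = new ArrayList<>();
        for (int i = 0; i < records; i++) {
            // No thread waits here: the loop issues every request without waiting for any response.
            pending.add(enrichAsync("record-" + i, latencyMs));
        }
        // Only this demo joins to collect results; a reactive pipeline would instead
        // hand each enriched value to the producer as soon as it completes.
        List<String> results = new ArrayList<>();
        for (CompletableFuture<String> f : pending) {
            results.add(f.join());
        }
        return results;
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        List<String> results = processAll(400, 100);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println(results.size() + " records in " + elapsedMs
                + " ms, peak in flight: " + maxInFlight.get());
        EVENT_LOOP.shutdown();
    }
}
```

Hundreds of requests can be outstanding at once while only two threads exist (the caller and the scheduler); what grows with the load is the number of pending futures, which is the buffer the paragraph above refers to.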
The standard way
The Kafka consumer polls the Kafka broker for records, which are then processed in different worker threads: each record is submitted to an ExecutorService with a pool of worker threads doing the actual work. Each worker thread, once scheduled, issues a REST request and stalls until it receives a response. It then sends the final result to Kafka, accepts the next record, and so on.
Since the REST request is blocking, the thread cannot do any further processing until a response is received. This means that every outstanding REST request consumes a thread.
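The thread-per-request model can be sketched the same way with only the JDK. This is a simulation under stated assumptions, not the post's actual consumer code: `Thread.sleep` stands in for the blocking REST call, a fixed thread pool stands in for the ExecutorService described above, and `BlockingSketch`, `enrichBlocking` and `processAll` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingSketch {
    static final AtomicInteger blocked = new AtomicInteger();
    static final AtomicInteger maxBlocked = new AtomicInteger();

    // Hypothetical blocking "REST call": the calling worker thread is parked for latencyMs.
    static String enrichBlocking(String key, long latencyMs) throws InterruptedException {
        maxBlocked.accumulateAndGet(blocked.incrementAndGet(), Math::max);
        try {
            Thread.sleep(latencyMs);
        } finally {
            blocked.decrementAndGet();
        }
        return key + " enriched";
    }

    static List<String> processAll(int records, int poolSize, long latencyMs) {
        ExecutorService workers = Executors.newFixedThreadPool(poolSize);
        try {
            List<Future<String>> pending = new ArrayList<>();
            for (int i = 0; i < records; i++) {
                String key = "record-" + i;
                // Each task holds a worker thread for the whole duration of its request.
                pending.add(workers.submit(() -> enrichBlocking(key, latencyMs)));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> f : pending) {
                results.add(f.get());
            }
            return results;
        } catch (ExecutionException e) {
            throw new IllegalStateException("enrichment failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for results", e);
        } finally {
            workers.shutdown();
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        // 50 threads and 100 ms per call: at most 50 calls are outstanding at any time,
        // so 400 records need at least 400 / 50 * 100 ms = 800 ms.
        List<String> results = processAll(400, 50, 100);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println(results.size() + " records in " + elapsedMs
                + " ms, peak blocked threads: " + maxBlocked.get());
    }
}
```

The number of concurrent requests can never exceed the pool size, so keeping up with a higher load or a slower service means adding threads, which is the effect measured in the results below.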
The code
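```java
import java.util.Collections;
import reactor.core.publisher.Flux;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;

// consumerProps: Map<String, Object> with bootstrap servers, group id and deserializers
ReceiverOptions<String, String> receiverOptions =
        ReceiverOptions.<String, String>create(consumerProps)
                .subscription(Collections.singleton("input-topic"));

Flux<ReceiverRecord<String, String>> inboundFlux =
        KafkaReceiver.create(receiverOptions)
                .receive();
```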
Creating the Reactive Kafka consumer
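```java
import org.springframework.web.reactive.function.client.WebClient;

// Non-blocking HTTP client pointing at the data enrichment service
WebClient webClient = WebClient.builder()
        .baseUrl("http://enrichmentservice/")
        .build();
```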
Creating the Reactive web client
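```java
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;

// producerProps: Map<String, Object> with bootstrap servers and serializers
SenderOptions<String, String> senderOptions = SenderOptions.<String, String>create(producerProps);
KafkaSender<String, String> kafkaSender = KafkaSender.create(senderOptions);
```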
Creating the Reactive Kafka producer
The heart of the reactive chain
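```java
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.ReceiverRecord;

public static Mono<String> enrichData(WebClient client, ReceiverRecord<String, String> r) {
    // Non-blocking GET on the base URL: the Mono emits once the response body has arrived
    WebClient.ResponseSpec responseSpec = client.get().retrieve();
    Mono<String> restResp = responseSpec.bodyToMono(String.class);
    return restResp.map(m -> m + " " + r.key()); // enrich: append the Kafka record key
}
```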
The data enrichment method
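```java
import org.apache.kafka.clients.producer.ProducerRecord;

kafkaSender.createOutbound()
        // For each consumed record, issue a non-blocking REST call and flatten
        // the resulting Mono<String> back into a Flux of enriched values
        .send(inboundFlux.flatMap(r -> enrichData(webClient, r))
                .map(v -> new ProducerRecord<String, String>("output-topic", v)))
        .then()        // Mono<Void> that completes when the outbound stream completes
        .subscribe();  // nothing flows until something subscribes
```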
The reactive chain
One of the issues I encountered in that part was how to map the result of the web client, which is a single REST response, back to a stream of records. In Reactor terminology: from a Mono<String> back to a Flux<String>. This is done here with the flatMap operator, which accepts a Function that returns a Publisher for each element and merges those Publishers back into a single Flux. Publisher is the base interface of both Mono and Flux.
The results
Note: the actual results depend on the machines and Kafka brokers you are running on, but the conclusion should be pretty independent of your setup.
- At low loads, the standard solution performs roughly on par with the reactive solution. Sometimes it even slightly outperforms the reactive one in terms of system resources (%CPU, %MEM).
- At higher loads, the standard solution starts using a huge number of threads to cope with the stalled REST requests, which consumes system resources, with the CPUs spending time on context switching.
In my low-performance setup, with a load of 4000 Kafka records/s and a 100 ms delay in the enrichment REST service, I measured:
- Standard: 520 threads, 520 sockets, 32% CPU usage, 8.3% memory
- Reactive: 37 threads, 502 sockets, 16% CPU usage, 4.3% memory
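A back-of-the-envelope check makes these numbers plausible. It uses Little's law, which is my addition and not part of the original measurements: the average number of requests in flight $L$ equals the arrival rate $\lambda$ times the average time $W$ each request stays in the system, taken here as just the 100 ms delay.

```latex
L = \lambda \, W = 4000\ \tfrac{\text{records}}{\text{s}} \times 0.1\ \text{s} = 400\ \text{outstanding REST requests}
```

In the blocking setup each of these requests holds a worker thread and, with HTTP/1.1, an open connection, so a few hundred threads and sockets is what one should expect. The measured 520 is somewhat above 400, presumably because $W$ also includes processing time beyond the 100 ms delay and because the JVM and the Kafka client need threads of their own. The reactive setup needs a similar number of sockets, but not the threads.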
With my setup, I could not really get higher than 4000 records/s, but I think we can safely extrapolate the results to higher loads.
Conclusion and final notes
We see that in this particular setup the Spring reactive framework does a good job, especially when the standard synchronous framework starts using a huge amount of resources to cope with latencies and delays.
I don't claim that reactive programming is the answer to every problem. The Spring reactive framework needs much more benchmarking and performance characterization.
There are also other reactive and asynchronous/event-driven frameworks that were not considered here.
On a final note, both solutions create a large number of HTTP connections to cope with the delays. This is due to the nature of HTTP/1.1: apart from the rarely used pipelining, you cannot issue a new request on a connection before receiving the response to the previous one. This raises the question of whether HTTP/1.1 is a good candidate for high-performance REST-based services.
HTTP/2, which multiplexes many concurrent requests over a single connection, should most probably perform better here.
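As a stdlib-only illustration of what opting into HTTP/2 looks like (this swaps in the JDK's `java.net.http.HttpClient` instead of the post's WebClient, and was not part of the benchmark), the client can be asked to prefer HTTP/2 and fire many requests without waiting for each answer. The class and method names are hypothetical, and the endpoint is the post's enrichment base URL used as a placeholder.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class Http2Enrichment {
    // Prefers HTTP/2; the JDK client falls back to HTTP/1.1 if the server does not support it.
    static HttpClient newHttp2Client() {
        return HttpClient.newBuilder()
                .version(HttpClient.Version.HTTP_2)
                .connectTimeout(Duration.ofSeconds(5))
                .build();
    }

    // Fires `count` GET requests without waiting for any response.
    static List<CompletableFuture<String>> fireRequests(HttpClient client, URI uri, int count) {
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        return IntStream.range(0, count)
                .mapToObj(i -> client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
                        .thenApply(HttpResponse::body))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        HttpClient client = newHttp2Client();
        // Placeholder endpoint: replace with a real enrichment service that speaks HTTP/2.
        List<CompletableFuture<String>> pending =
                fireRequests(client, URI.create("http://enrichmentservice/"), 400);
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
    }
}
```

Whether the 400 requests really share one connection depends on the server supporting HTTP/2 (and, for plain `http://` URLs, on the cleartext upgrade succeeding); this would need its own measurement.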
NB: I had to increase "reactor.bufferSize.small" to raise the number of pending reactive REST requests.
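As far as I know, this property matters because `Flux.flatMap(mapper)` without an explicit concurrency argument subscribes to at most `Queues.SMALL_BUFFER_SIZE` inner publishers at a time, a value read from the `reactor.bufferSize.small` system property (256 by default). With a 100 ms enrichment delay, 256 requests in flight cap the throughput at roughly 256 / 0.1 s = 2560 records/s, below the 4000 records/s load. Passing the limit explicitly through the `flatMap(mapper, concurrency)` overload should be an alternative to the system property. The stdlib-only simulation below is my own and uses a `Semaphore` where Reactor uses flatMap's internal demand tracking (Reactor does not block a thread; it simply stops requesting more records); `BoundedInFlightSketch` and `run` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedInFlightSketch {
    // Issues `records` simulated async requests of `latencyMs` each, with at most
    // `maxInFlight` outstanding at once, and returns the observed peak in flight.
    static int run(int records, int maxInFlight, long latencyMs) {
        ScheduledExecutorService io = Executors.newSingleThreadScheduledExecutor();
        Semaphore slots = new Semaphore(maxInFlight);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<CompletableFuture<Void>> done = new ArrayList<>();
        try {
            for (int i = 0; i < records; i++) {
                // Wait for a free slot before taking the next record (the concurrency cap).
                slots.acquireUninterruptibly();
                peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                CompletableFuture<Void> response = new CompletableFuture<>();
                io.schedule(() -> {
                    inFlight.decrementAndGet();
                    slots.release(); // a response frees a slot for the next request
                    response.complete(null);
                }, latencyMs, TimeUnit.MILLISECONDS);
                done.add(response);
            }
            CompletableFuture.allOf(done.toArray(new CompletableFuture[0])).join();
            return peak.get();
        } finally {
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        int records = 4000;
        for (int cap : new int[] {256, 512}) {
            long start = System.nanoTime();
            int peak = run(records, cap, 100);
            double seconds = (System.nanoTime() - start) / 1e9;
            System.out.printf("cap=%d peak=%d throughput=%.0f records/s%n",
                    cap, peak, records / seconds);
        }
    }
}
```

The printed throughput should land near cap / 0.1 s, so sustaining 4000 records/s at 100 ms latency needs a cap above about 400 outstanding requests.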