Wednesday, December 20, 2017

Using Kafka Streams for network analysis part 1

Introduction


In this series of articles, I want to explore the use of streaming engines for network analysis. To that end, I decided to use Kafka Streams for its simplicity. It doesn't require setting up a separate processing cluster or complex configuration. You just include the Kafka Streams jar as a dependency in your application and you are ready to go. When the load increases, just spawn another instance of your application.

Where this small toy project will lead me is unknown.

Note that there are already some heavyweight projects, such as Metron, that use streaming engines to enable similar analyses. Other projects of the same kind that do not use streaming platforms are Bro, YAF and Moloch.

Traditionally, protocol state handling and packet reassembly are performed by monolithic blocks of code, usually written in C/C++ and sometimes in Go. We find this, for example, in operating systems and in applications such as Wireshark, tcpdump or Packetbeat.

Once that initial packet capture and parsing is done, a packet summary is forwarded to some framework for indexing, analysis, and so on.

One of the drawbacks of sending only packet summaries is that low-level analysis is no longer possible further down the line.

Below is a diagram of the setup I am going to experiment with.


Setup

The Network probe

The network probe is very simple. It is written in C/C++ and uses the standard pcap library for packet capture and librdkafka for sending records to Kafka for further processing.

The probe itself is single threaded. librdkafka, however, uses multiple threads internally.

We use the standard network 5-tuple (source and destination IP address, protocol, source and destination port) as the Kafka key for the configured topic. By using the network 5-tuple as key, we guarantee that packets belonging to the same stream are handled by the same Kafka consumer down the line.
We also need to arrange the 5-tuple in such a way that both directions of the same stream have the same key.
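
One way to give both directions the same key is to order the two endpoints before serializing them. The following is only a minimal sketch, not the code of the probe (which is written in C/C++): the class FlowKey, its method names and the 13-byte layout are assumptions made for illustration, and it handles IPv4 only.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical helper: builds a direction-independent key for an IPv4 flow. */
final class FlowKey {

    /** Compares two (IPv4 address, port) endpoints as unsigned values. */
    static int compareEndpoints(byte[] ipA, int portA, byte[] ipB, int portB) {
        for (int i = 0; i < 4; i++) {
            int c = Integer.compare(ipA[i] & 0xFF, ipB[i] & 0xFF);
            if (c != 0) {
                return c;
            }
        }
        return Integer.compare(portA, portB);
    }

    /**
     * Returns 13 bytes: first IP, second IP, protocol, first port, second port,
     * where "first" is the smaller of the two endpoints. This layout is an
     * assumption of the sketch, not the probe's actual wire format.
     */
    static byte[] canonical(byte[] srcIp, int srcPort, byte[] dstIp, int dstPort, int protocol) {
        boolean srcFirst = compareEndpoints(srcIp, srcPort, dstIp, dstPort) <= 0;
        byte[] firstIp = srcFirst ? srcIp : dstIp;
        byte[] secondIp = srcFirst ? dstIp : srcIp;
        int firstPort = srcFirst ? srcPort : dstPort;
        int secondPort = srcFirst ? dstPort : srcPort;
        return ByteBuffer.allocate(13)          // big-endian (network order) by default
                .put(firstIp)
                .put(secondIp)
                .put((byte) protocol)
                .putShort((short) firstPort)
                .putShort((short) secondPort)
                .array();
    }

    public static void main(String[] args) {
        byte[] client = {10, 0, 0, 1};
        byte[] server = {10, 0, 0, 2};
        byte[] forward = canonical(client, 51000, server, 80, 6);
        byte[] reverse = canonical(server, 80, client, 51000, 6);
        System.out.println(Arrays.equals(forward, reverse)); // prints "true"
    }
}
```

The price of this ordering is that the key no longer tells which side sent the packet. If the direction matters, it has to be recovered from the packet bytes carried in the record value.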

In this toy network probe, only basic Ethernet II packet parsing is implemented; 802.1Q VLANs and other encapsulations are not supported.

Modern Ethernet cards offload the CPU by automatically reassembling TCP segments. The network driver and all layers above it then receive artificially large packets.

In order to capture the TCP/UDP packets as they are received by the network card, you need to disable the network card's generic receive offload (GRO) if it is present (and sometimes also LRO, TRO, ...).
On Linux you can do that with:

sudo ethtool -K ethX gro off


For high-performance software-based probes, frameworks such as DPDK, PF_RING or netmap are recommended.

Streaming part

In this article, I decided to use Kafka Streams but any other Streaming platform would have been equally valid.

The first step is to ingest the Kafka records produced by the network probes described above.

The binary-encoded Kafka key/value records produced by the network probes are deserialized into Net5Tuple POJOs for the key and byte arrays for the value.


 final KStream<Net5Tuple, byte []> packetStream = builder.stream(net5TupleSerde, byteArraySerde, "packets");

We configure the default Serializers/Deserializers as follows:

 streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, net5TupleSerde.getClass().getName());
 streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());

The Net5Tuple class uses NIO ByteBuffers to map the binary fields to their Java counterparts. By default, a ByteBuffer uses big-endian (network) byte order, which is the order used to encode IP addresses and ports on the wire.
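
To illustrate the ByteBuffer mapping, here is a minimal sketch of decoding a binary 5-tuple. The field order (source IP, destination IP, protocol, source port, destination port) and the class name TupleDecoder are assumptions; the real Net5Tuple layout may differ.

```java
import java.nio.ByteBuffer;

/** Hypothetical decoder for a 13-byte IPv4 5-tuple; the field layout is assumed. */
final class TupleDecoder {

    static String decode(byte[] raw) {
        ByteBuffer buf = ByteBuffer.wrap(raw);   // big-endian unless order() is changed
        int srcIp = buf.getInt();
        int dstIp = buf.getInt();
        int protocol = buf.get() & 0xFF;         // mask: Java bytes are signed
        int srcPort = buf.getShort() & 0xFFFF;   // mask: Java shorts are signed
        int dstPort = buf.getShort() & 0xFFFF;
        return ipToString(srcIp) + ":" + srcPort + " -> "
                + ipToString(dstIp) + ":" + dstPort + " proto=" + protocol;
    }

    /** Formats a big-endian IPv4 address as dotted decimal. */
    static String ipToString(int ip) {
        return ((ip >>> 24) & 0xFF) + "." + ((ip >>> 16) & 0xFF) + "."
                + ((ip >>> 8) & 0xFF) + "." + (ip & 0xFF);
    }

    public static void main(String[] args) {
        byte[] raw = {10, 0, 0, 1, 10, 0, 0, 2, 6, (byte) 0xC7, 0x38, 0x00, 0x50};
        System.out.println(decode(raw)); // prints "10.0.0.1:51000 -> 10.0.0.2:80 proto=6"
    }
}
```

Because the buffer already reads in network order, no byte swapping is needed. The only care required is masking, since Java has no unsigned byte or short types.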

From that point on, we can start using Streaming primitives to perform packet reassembly and decoding, collect statistics and perform analyses.

Examples

I will show two small examples of what can be done here. More complex examples, such as TCP packet reassembly and HTTP decoding, will be handled in future articles.

Packet statistics

Here we print packet statistics per stream on the console. Note: this is meaningful only for TCP and UDP.


 packetStream.groupByKey(net5TupleSerde, byteArraySerde)
  .count()
  .toStream()
  .map((k,v) -> KeyValue.pair(k.toString(), v))
  .print();

We could also keep the result of count() as a KTable here and expose the statistics externally via a REST API.

SubStreams according to protocol


  KStream<Net5Tuple, byte []> [] perProtoStream = packetStream.branch(
                (k,v) -> k.getProtocol() == TCP,
                (k,v) -> k.getProtocol() == UDP,
                (k,v) -> true
        );



References:

  • Kafka: a distributed streaming platform
  • Kafka Streams: streaming engine on top of the Kafka
  • LibPcap: packet capture library
  • Code for the network probe is available upon request. 
  • DPDK: user mode data plane libraries



Wednesday, December 6, 2017

Evaluating Spring Reactive & WebFlux framework as base for streaming Microservices

Introduction


There are already many excellent introductory articles on reactive programming, about the reactor project and its integration within the Spring Framework.

This post compares the "standard" Spring framework with the reactive framework as a base for streaming microservices.

There is an excellent example project on GitHub from Mark Paluch illustrating the difference between both approaches.

For the moment there is a lack of performance benchmarks. It is difficult to know whether the reactive frameworks, with all their overhead, outperform the standard way of doing things.

If you really think it through, the choice is between letting the application handle the scheduling of CPU resources (asynchronous/reactive) and letting the OS handle it.


Setup

I wanted to perform a quick and basic performance/load test with a base streaming app building block that does some data enrichment.


Reactive Kafka Consumer⇒Reactive REST towards external service for data enrichment⇒Reactive Kafka Producer

vs

Kafka Consumer⇒Blocking REST⇒Kafka Producer

The reactive way

The reactive setup keeps pulling Kafka records and buffering REST requests while waiting for their answers. Nothing blocks. Once a response is received from the data enrichment service and the original data is enriched, the data is passed to the Kafka producer.
Since nothing in this pipeline is supposed to block, the only thing limiting the processing is the amount we are willing to buffer: in this particular case, outstanding REST requests with their associated network sockets.

The standard way

The Kafka consumer polls the Kafka broker for records. The records are then processed in different worker threads: each record is submitted to an ExecutorService with a pool of worker threads doing the actual work. Each worker thread, once scheduled, issues a REST request and stalls until it receives a response. It then sends the final result to Kafka, accepts the next record, and so on.

Since the REST request is blocking, a thread is blocked and cannot do any further processing until a response is received. That means that every outstanding REST request consumes a thread.
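
The difference between the two styles described above can be reproduced with plain JDK classes. This is only a sketch: the class BlockingVsAsync is hypothetical, the REST call is simulated by a 100ms timer instead of a real HTTP client, and neither Kafka nor Reactor is involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical comparison of thread-per-request vs. callback-style waiting. */
final class BlockingVsAsync {
    static final long DELAY_MS = 100; // simulated latency of the enrichment service

    /** Blocking style: every in-flight request parks one pool thread for DELAY_MS. */
    static long runBlocking(int requests, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        long start = System.nanoTime();
        try {
            List<Future<String>> results = new ArrayList<>();
            for (int i = 0; i < requests; i++) {
                final int id = i;
                results.add(pool.submit(() -> {
                    Thread.sleep(DELAY_MS);      // stands in for the blocking REST call
                    return "enriched-" + id;
                }));
            }
            for (Future<String> f : results) {
                f.get();
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    /** Non-blocking style: one timer thread completes all futures; no thread waits per request. */
    static long runAsync(int requests) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        long start = System.nanoTime();
        try {
            List<CompletableFuture<String>> results = new ArrayList<>();
            for (int i = 0; i < requests; i++) {
                final int id = i;
                CompletableFuture<String> f = new CompletableFuture<>();
                timer.schedule(() -> { f.complete("enriched-" + id); }, DELAY_MS, TimeUnit.MILLISECONDS);
                results.add(f);
            }
            CompletableFuture.allOf(results.toArray(new CompletableFuture[0])).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            timer.shutdown();
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        System.out.println("blocking, 10 requests on 2 threads: " + runBlocking(10, 2) + " ms");
        System.out.println("async, 10 requests on 1 timer thread: " + runAsync(10) + " ms");
    }
}
```

With 2 worker threads, the blocking variant needs roughly five rounds of 100ms for 10 requests, while the asynchronous variant has all 10 requests outstanding at once. To match that with blocking calls, the pool has to grow to one thread per outstanding request.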

The code


        // Subscribe the reactive consumer to the input topic.
        ReceiverOptions<String, String> receiverOptions =
                ReceiverOptions.<String, String>create(consumerProps)
                        .subscription(Collections.singleton("input-topic"));

        // receive() returns a Flux that emits records as they are polled from Kafka.
        Flux<ReceiverRecord<String, String>> inboundFlux =
                KafkaReceiver.create(receiverOptions)
                        .receive();
Creating the Reactive Kafka consumer


        WebClient webClient = WebClient.builder()
                .baseUrl("http://enrichmentservice/")
                .build();
Creating the Reactive web client



        SenderOptions<String, String> senderOptions =
                SenderOptions.<String, String>create(producerProps);

        KafkaSender<String, String> kafkaSender = KafkaSender.create(senderOptions);
Creating the Reactive Kafka producer



The heart of the reactive chain


    public static Mono<String> enrichData(WebClient client, ReceiverRecord<String, String > r) {
        WebClient.ResponseSpec responseSpec = client.get().retrieve();
        Mono<String> restResp = responseSpec.bodyToMono(String.class);
        return restResp.map(m -> m + " " + r.key());
    }

The data enrichment method


        kafkaSender.createOutbound()
              .send(inboundFlux.flatMap(
                        r -> {
                            return enrichData(webClient, r);
                        })
                        .map(r -> new ProducerRecord<String, String>("output-topic", r)))
               .then()
               .subscribe();
The reactive chain



One of the issues encountered in that part was how to map the result of the web client, which is a single REST response, back to a stream of records. In Reactor terminology: from a Mono<String> back to a Flux<String>. This is done here by using the flatMap operator, which accepts a Function that returns a Publisher. Publisher is the base interface of both Mono and Flux.

The results

Note that the actual results depend on the machines and Kafka brokers you are running on, but the conclusion should be largely independent of your setup.
  •  At low loads, the standard solution is roughly on par with the reactive solution. Sometimes the standard solution slightly outperforms the reactive one from a system resource point of view (%CPU, %memory).
  •  At higher loads, the standard solution starts using a huge number of threads to cope with the stalling REST requests, which consumes system resources and makes the CPUs spend time context switching.
On the other hand, the reactive solution doesn't need to create additional threads to cope with the load.
Results on my low-performance setup, with a load of 4000 Kafka records/s and a 100ms delay in the enrichment REST service:
  • Standard: 520 threads, 520 sockets, 32%CPU usage, 8.3%Memory
  • Reactive: 37 threads, 502 sockets, 16%CPU usage, 4.3%Memory
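
As a rough sanity check on the socket counts above, Little's law (average number in flight = arrival rate × time spent in the system) gives the order of magnitude to expect. The sketch below uses only the two figures quoted above; the class and method names are hypothetical.

```java
/** Hypothetical helper applying Little's law to the load test figures. */
final class InFlight {

    /** Average number of outstanding requests for a given rate and per-request latency. */
    static long expectedInFlight(long recordsPerSecond, long latencyMillis) {
        return recordsPerSecond * latencyMillis / 1000;
    }

    public static void main(String[] args) {
        // 4000 records/s, each waiting 100ms on the enrichment service
        System.out.println(expectedInFlight(4000, 100)); // prints "400"
    }
}
```

About 400 requests are outstanding on average from the artificial delay alone. The measured figure of roughly 500 sockets is of the same order; the difference plausibly comes from the real round-trip time being somewhat above 100ms, but that was not measured here.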

With my setup, I could not really get higher than 4000 records/s, but I think we can reasonably extrapolate the results to higher loads.

Conclusion and final notes

We see that in this particular setup the Spring reactive framework does a good job, especially when the standard synchronous framework starts to use a huge amount of resources to cope with latencies and delays.

I don't claim that reactive programming is the answer to all issues. The Spring Reactive framework needs much more benchmarking and performance characterization.

There are other reactive frameworks and other asynchronous/event-driven frameworks that were not considered here.
On a final note, we can see that both solutions create a large number of HTTP connections to cope with the delays. This is due to the nature of HTTP/1.1: without pipelining, you cannot issue a new request on the same connection before receiving the response. This raises the question of whether HTTP/1.1 is a good candidate for high-performance REST-based services.

Most probably HTTP/2, which multiplexes requests over a single connection, would perform better here.

NB:
I had to increase "reactor.bufferSize.small" to increase the number of pending reactive REST requests.

Monday, December 4, 2017

The world is not ours anymore - Minecon Earth 2017

Last month, I went with my 9-year-old son to a local Minecraft event where the Minecon conference was screened, followed by a panel of young Minecraft YouTubers.

At first I was not so eager to go to another of those gaming events for kids, full of chaos and screaming, but this time it was different.

It was an epiphany: I suddenly realized the fundamental gap between the digital natives and the representatives of my Generation X, and to some extent the Millennials (Generation Y).

Their heroes are young YouTubers between 14 and 16 years old. They are the stars of this generation. To some extent, stardom is much more democratic and easier to reach nowadays than when we were young. You make some cool movies, then publish and promote them on YouTube. And then you hope for "Likes" and "Subscriptions", the measures of your success.

At the end of the screening, there were presentations by the members of the panel. One of them was about setting up a Minecraft server, hiring developers, and promoting and marketing the server, with strategies on how to retain online players for as long as possible. It was full of good common sense, without the professional bullshit. And this came from a 16-year-old kid. It was amazing.


Thursday, March 30, 2017

Stream processing of time series and the uncertainty principle of Heisenberg

Recently at work, we worked on processing time-based event series.

The collected events could enter the system out of order, and we needed to sort and process them based on their time of arrival in the system.
This was done using time windows. Time windows are buckets of events. Events that arrive in the system are placed in the corresponding time buckets. If an event arrives later than a predefined maximum out-of-order delay, it is discarded.
Once the system decides that a time window is ready for processing, it releases the events and they are processed.
For example, consider the following time windows:
  • Time window 1: 0sec-10sec
  • Time window 2: 10sec-20sec
  • Time window 3: 20sec-30sec

And consider a maximum out-of-order delay of 2sec.
  • Event 1 arrives at t=1sec and is placed in time window 1
  • Event 2 arrives at t=5sec and is placed in time window 1
  • Event 3 arrives at t=11sec and is placed in time window 2
  • Event 4 arrives at t=9sec and is placed in time window 1 (the out-of-order delay is still smaller than or equal to 2 seconds)
  • Event 5 arrives at t=15sec and is placed in time window 2. Since t=15sec and t minus the end of time window 1 (10sec) is greater than the out-of-order delay, window 1, with events 1, 2 and 4, is released for processing
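
The walkthrough above can be sketched as a small buffer class. This is an illustrative sketch and not the system we built at work: the class WindowBuffer and its method names are hypothetical, the t values of the example are treated as event timestamps in seconds, and timestamps are assumed to be non-negative.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical buffer of fixed-size time windows with a maximum out-of-order delay. */
final class WindowBuffer {
    private final long windowSize;
    private final long maxOutOfOrder;
    // window start -> timestamps of the events buffered in that window
    private final TreeMap<Long, List<Long>> windows = new TreeMap<>();
    private long highestSeen = 0; // newest timestamp observed so far

    WindowBuffer(long windowSize, long maxOutOfOrder) {
        this.windowSize = windowSize;
        this.maxOutOfOrder = maxOutOfOrder;
    }

    /**
     * Buffers one event and returns the windows released by it, oldest first,
     * each sorted by timestamp. An event that is too late is dropped.
     */
    List<List<Long>> add(long timestamp) {
        List<List<Long>> released = new ArrayList<>();
        if (highestSeen - timestamp > maxOutOfOrder) {
            return released; // later than the allowed out-of-order delay: discard
        }
        long windowStart = (timestamp / windowSize) * windowSize;
        windows.computeIfAbsent(windowStart, k -> new ArrayList<>()).add(timestamp);
        highestSeen = Math.max(highestSeen, timestamp);

        // Release every window whose end lies more than maxOutOfOrder behind the newest event.
        while (!windows.isEmpty()) {
            long windowEnd = windows.firstKey() + windowSize;
            if (highestSeen - windowEnd <= maxOutOfOrder) {
                break;
            }
            List<Long> events = windows.pollFirstEntry().getValue();
            Collections.sort(events);
            released.add(events);
        }
        return released;
    }

    public static void main(String[] args) {
        WindowBuffer buffer = new WindowBuffer(10, 2);
        long[] arrivals = {1, 5, 11, 9, 15}; // events 1..5 of the example
        for (long t : arrivals) {
            System.out.println("t=" + t + " released " + buffer.add(t));
        }
        // the last line printed is "t=15 released [[1, 5, 9]]"
    }
}
```

An event that passes the lateness check can never belong to a window that was already released, because its window end is always less than the maximum delay behind the newest timestamp. The two rules therefore stay consistent with each other.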


The question arises, what is the optimal size of the window?
With small time windows, you fill them quickly and release them quickly for further processing, but the ability to sort the events is limited.
Large time windows, on the other hand, make it possible to arrange a large number of events in order of arrival in the system, but the delay before the ordered events are processed increases proportionally.
If you want to order an infinite stream of time-based events perfectly, you will have to wait an eternity before processing them.

There is a duality here, an inherent impossibility of reconciling both aspects: minimizing latency (releasing events for processing as fast as possible: small time windows) and obtaining a well-ordered set of events (large time windows). The same duality exists in computer science, to a certain extent, with latency vs performance.

This is exactly what the uncertainty principle in physics is about.

The uncertainty principle of Heisenberg is an important concept in the field of quantum mechanics.
It was formulated by Werner Heisenberg in the 1920s, when the basics of quantum physics were conceived.

It basically states that there is a limit to the precision with which we can measure the velocity (strictly speaking, the momentum) and the position of an object at the same time. Either we measure speed with extreme precision and compromise on the exactness of the object's position, or the other way around.
For objects at human scale the lack of precision is negligible. However, for atomic particles the limit becomes really apparent. This is a fundamental limitation. It does not result from a lack of technological ingenuity. It is a limit to the way we represent physical reality.

This fundamental uncertainty is what Einstein objected to in his famous quote: God doesn't play dice.
There is another equivalent formulation of the uncertainty principle which uses energy and time, or frequency and time, instead of velocity and position:
It is impossible to measure the exact frequency of a phenomenon quickly. If you want zero uncertainty on the frequency you have to measure it during an infinite time.
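
For reference, the two formulations are usually written as follows, with each Δ read as a standard deviation. The frequency-time form follows from the energy-time form with E = hf; it is quoted here in its common textbook form rather than derived.

```latex
\Delta x \,\Delta p \;\ge\; \frac{\hbar}{2}
\qquad\text{and}\qquad
\Delta t \,\Delta f \;\ge\; \frac{1}{4\pi}
```

Read the second relation with the time windows in mind: the shorter the observation time Δt, the larger the unavoidable spread Δf on the measured frequency.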

People working on signal processing always have to compromise between the precision of the frequency measurements and the length of the time windows.

Let's say you want to investigate the migration pattern of birds. You want to know the frequency of their migrations. 
You go outside once in October and you see birds flying southwards. Can you deduce anything regarding the frequency of their migration? No, you cannot.
Let's say you go outside a couple of times during the year, and observe that once during this time frame they were flying southwards and once northwards. Can you extrapolate with certainty that this pattern applies every year and at all times? No, you cannot.

To be sure, you would have to observe for eternity.

What I wanted to illustrate here is that sometimes seemingly unrelated ideas are somehow connected.
And that the way we perceive and interpret reality sometimes prevents us from seeing the underlying concepts.






Tuesday, March 7, 2017

Not politically correct

Fed up with the supreme value of "not being politically correct".
"Not being politically correct" is nowadays considered an act of rebellion.
Politically incorrect people are considered freedom fighters.
Those who are not rude or obnoxious, and who don't dismiss everyone who thinks differently from them, are considered morons, liars and merely part of a flock of sheep.
Those who are not racists, egocentrics or bigots are supposedly just lying to everyone, including themselves.
"Deep inside everyone is a rotten bastard" is what they tell us.
Well I don't think so.
Not everyone is a rotten bastard that doesn't speak out.
Not being an all loving altruist and being "politically correct" are mutually compatible

Saturday, January 28, 2017

Nihil sub sole novum

And here we go again for another round
I hope I am wrong
Fear and obscurantism are back on
Like waves, ups and downs. You can try to break them, to stop them, but they'll keep on coming.
History has its own dynamics; like the Greek tragedy or the Roman fatum, it is not stoppable.
Unfortunately it is stronger than us



Remaining relevant: Abstraction layers and API's for cloud native applications

Separation of concerns, abstraction layers and API's for Cloud native environments Those 3 terms are closely related to each other. T...