Wednesday, December 20, 2017

Using Kafka Streams for network analysis part 1

Introduction


In the following series of articles, I want to explore the use of streaming engines for network analysis. To that end, I decided to use Kafka Streams for its simplicity: it doesn't require setting up clusters or complex configurations. Including the Kafka Streams jar as a dependency in your application is enough to get going, and when load increases you just spawn another instance of your application.

Where this small toy project will lead me is unknown.

Note that there are already some heavyweight projects, such as Metron, which use streaming engines to enable similar analyses. Other projects of the same kind, not built on streaming platforms, are Bro, YAF and Moloch.

Traditionally, protocol state handling and packet reassembly are performed by monolithic blocks of code, usually written in C/C++ and sometimes in Go. We find this for example in operating systems, or in applications such as Wireshark, tcpdump, or Packetbeat.

After the initial packet capture and parsing is done, a packet summary is forwarded to some framework for indexing, analysis, and so on.

One of the drawbacks of sending only packet summaries is that low-level analysis is no longer possible further down the line.

Below is a diagram of the setup I am going to experiment with.


Setup

The Network probe

The network probe implemented here is very simple. It is written in C/C++ and uses the standard pcap library for packet capture and librdkafka for transmitting records to Kafka for further processing.

The probe itself is single-threaded; librdkafka, however, uses multiple threads internally.

We use the standard network 5-tuple (source and destination IP address, protocol, source and destination port) as the Kafka key for the configured topic. By using the network 5-tuple as key, we guarantee that packets belonging to the same stream are handled by the same Kafka consumers down the line.
We also need to arrange the 5-tuple in such a way that both directions of the same stream map to the same key.
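One way to achieve this is to order the two endpoints consistently before building the key. A minimal sketch in Java (the Net5Tuple constructor and this helper are assumptions for illustration; the actual probe does this in C):

// Hypothetical sketch: canonicalize a 5-tuple so that both directions
// of the same stream produce an identical Kafka key.
public static Net5Tuple canonicalize(int srcIp, int srcPort,
                                     int dstIp, int dstPort, int protocol) {
    // Compare endpoints as unsigned (ip, port) pairs and always put the
    // "smaller" endpoint first, regardless of packet direction.
    long src = (Integer.toUnsignedLong(srcIp) << 16) | srcPort;
    long dst = (Integer.toUnsignedLong(dstIp) << 16) | dstPort;
    if (src <= dst) {
        return new Net5Tuple(srcIp, srcPort, dstIp, dstPort, protocol);
    }
    return new Net5Tuple(dstIp, dstPort, srcIp, srcPort, protocol);
}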

In this toy network probe, only basic Ethernet II packet parsing was implemented; 802.1Q VLAN tags and other encapsulations were not handled.

Modern Ethernet cards offload the CPU by automatically reassembling TCP segments. As a result, artificially large packets are received by the network driver and all the layers above it.

In order to capture the TCP/UDP packets as they are received by the network card, you need to disable the network card's generic receive offload, if present at all (sometimes also called LRO, TRO, ...).
On Linux you can do that with:

sudo ethtool -K ethX gro off


For high-performance software-based probes, frameworks such as DPDK, PF_RING or netmap are recommended.

Streaming part

In this article, I decided to use Kafka Streams, but any other streaming platform would have been equally valid.

The first step is to ingest the Kafka records produced by the network probes discussed above.

The binary-encoded Kafka key/value records produced by the network probes are deserialized to Net5Tuple POJOs as the key and a byte array as the value.


 final KStream<Net5Tuple, byte[]> packetStream = builder.stream(net5TupleSerde, byteArraySerde, "packets");

We configure the default Serializers/Deserializers as follows:

 streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, net5TupleSerde.getClass().getName());
 streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());

The Net5Tuple class uses NIO's ByteBuffer to map the binary fields into their Java counterparts. NIO uses big-endian (network order) byte ordering by default, which is the ordering used to encode IP addresses and ports.
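As an illustration, the deserialization side could look roughly as follows. This is a minimal sketch: the wire layout and accessor names are assumptions, not the actual Net5Tuple code.

// Minimal sketch of mapping the probe's binary key into Java fields
// with java.nio.ByteBuffer (big-endian by default, matching network order).
// Assumed wire layout: srcIp(4) dstIp(4) srcPort(2) dstPort(2) proto(1).
public static Net5Tuple fromBytes(byte[] data) {
    ByteBuffer buf = ByteBuffer.wrap(data);
    int srcIp = buf.getInt();
    int dstIp = buf.getInt();
    int srcPort = Short.toUnsignedInt(buf.getShort());
    int dstPort = Short.toUnsignedInt(buf.getShort());
    int protocol = Byte.toUnsignedInt(buf.get());
    return new Net5Tuple(srcIp, srcPort, dstIp, dstPort, protocol);
}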

From that point on, we can start using streaming primitives to perform packet reassembly and decoding, collect statistics, and perform analyses.

Examples

I will show here two small examples of what can be done. More complex examples, such as TCP packet reassembly and HTTP decoding, will be handled in future articles.

Packet statistics

Here we print packet statistics per stream on the console. Note: this is only meaningful in the case of TCP and UDP.


 packetStream.groupByKey(net5TupleSerde, byteArraySerde)
  .count()
  .toStream()
  .map((k,v) -> KeyValue.pair(k.toString(), v))
  .print();

We could also use a KTable here and then expose the statistics externally via a REST API.
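For example, with Kafka Streams' interactive queries the counts can be materialized in a named state store and read from a REST handler. A hedged sketch, where the store name and the surrounding wiring (the running KafkaStreams instance "streams", the tuple being looked up) are assumptions:

// Materialize the per-stream packet counts in a queryable state store.
KTable<Net5Tuple, Long> counts = packetStream
        .groupByKey(net5TupleSerde, byteArraySerde)
        .count("packet-counts");

// Later, e.g. inside a REST handler, query the store directly:
ReadOnlyKeyValueStore<Net5Tuple, Long> store =
        streams.store("packet-counts", QueryableStoreTypes.keyValueStore());
Long packets = store.get(someNet5Tuple);   // null if no packets seen yet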

Sub-streams according to protocol


  KStream<Net5Tuple, byte[]>[] perProtoStream = packetStream.branch(
                (k,v) -> k.getProtocol() == TCP,
                (k,v) -> k.getProtocol() == UDP,
                (k,v) -> true
        );
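The TCP and UDP constants used in the predicates above are assumed to be the IANA-assigned IP protocol numbers, e.g.:

// Assumed constants: IANA IP protocol numbers matched against getProtocol()
private static final int TCP = 6;
private static final int UDP = 17;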



References:

  • Kafka: a distributed streaming platform
  • Kafka Streams: a streaming engine on top of Kafka
  • libpcap: packet capture library
  • Code for the network probe is available upon request.
  • DPDK: user-mode data plane libraries



Wednesday, December 6, 2017

Evaluating Spring Reactive & WebFlux framework as base for streaming Microservices

Introduction


There are already many excellent introductory articles on reactive programming, about the reactor project and its integration within the Spring Framework.

This post is about comparing the "standard" Spring framework to the reactive framework as a base for streaming microservices.

There is an excellent example project on GitHub from Mark Paluch illustrating the difference between the two approaches.

For the moment there is a lack of performance benchmarks, so it is difficult to know whether the reactive frameworks, with all their overhead, outperform the standard way of doing things.

If you really think it through, the trade-off is about letting the application handle the scheduling of CPU resources (asynchronous/reactive) versus letting the OS handle it (threads).


Setup

I wanted to perform a quick and basic performance/load test with a base streaming app building block that does some data enrichment.


Reactive Kafka Consumer⇒Reactive REST towards external service for data enrichment⇒Reactive Kafka Producer

vs

Kafka Consumer⇒Blocking REST⇒Kafka Producer

The reactive way

The reactive setup will keep on pulling Kafka records, buffering up REST requests while waiting for their answers. Nothing blocks. Once a response is received from the data enrichment service and the original data is enriched, the data is passed to the Kafka producer.
Since nothing in this pipeline is supposed to block, the only thing limiting the processing is the amount we are willing to buffer; in this particular case, outstanding REST requests with their associated network sockets.

The standard way

The Kafka consumer polls the Kafka broker for records. Each record is submitted to an ExecutorService with a pool of worker threads doing the actual work. Each worker thread, once scheduled, issues a REST request and stalls until it receives a response. It then proceeds to send the final result to Kafka, accepts the next record, and so on.

Since the REST request is blocking, a thread is blocked and cannot do any further processing until a response is received. That means that every outstanding REST request consumes a thread.
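For reference, the standard variant boils down to something like this. This is a hedged sketch, not the benchmarked code: the topic names, pool size and the blocking enrich() helper are assumptions.

// Sketch of the blocking pipeline: one thread is tied up per in-flight REST call.
ExecutorService pool = Executors.newFixedThreadPool(512);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
consumer.subscribe(Collections.singleton("input-topic"));

while (true) {
    for (ConsumerRecord<String, String> record : consumer.poll(100)) {
        pool.submit(() -> {
            String enriched = enrich(record.value());   // blocking REST call
            producer.send(new ProducerRecord<>("output-topic", enriched));
        });
    }
}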

The code


        ReceiverOptions<String, String> receiverOptions =
                ReceiverOptions.<String, String>create(consumerProps)
                        .subscription(Collections.singleton("input-topic"));

        Flux<ReceiverRecord<String, String>> inboundFlux =
                KafkaReceiver.create(receiverOptions)
                        .receive();
Creating the Reactive Kafka consumer


        WebClient webClient = WebClient.builder()
                .baseUrl("http://enrichmentservice/")
                .build();
Creating the Reactive web client



        SenderOptions<String, String> senderOptions =
                SenderOptions.<String, String>create(producerProps);

        KafkaSender<String, String> kafkaSender = KafkaSender.create(senderOptions);
Creating the Reactive Kafka producer



The heart of the reactive chain


    public static Mono<String> enrichData(WebClient client, ReceiverRecord<String, String> r) {
        WebClient.ResponseSpec responseSpec = client.get().retrieve();
        Mono<String> restResp = responseSpec.bodyToMono(String.class);
        return restResp.map(m -> m + " " + r.key());
    }

The data enrichment method


        kafkaSender.createOutbound()
              .send(inboundFlux.flatMap(
                        r -> {
                            return enrichData(webClient, r);
                        })
                        .map(r -> new ProducerRecord<String, String>("output-topic", r)))
               .then()
               .subscribe();
The reactive chain



One of the issues encountered in this part was how to map the result of the web client, which is a single REST response, back to a stream of records; in Reactor terminology, from a Mono<String> back to a Flux<String>. This is done here using the flatMap operator, which accepts a Function returning a Publisher. Publisher is the base interface of both Mono and Flux.
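As a minimal, self-contained illustration of the operator:

// flatMap flattens one inner Publisher (here a Mono) per element
// back into the enclosing Flux.
Flux<String> enriched = Flux.just("a", "b")
        .flatMap(s -> Mono.just(s.toUpperCase()));
enriched.subscribe(System.out::println);   // prints A, B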

The results

Note that the actual results depend on the machines and Kafka brokers you are running on, but the conclusion should be fairly independent of your setup.
  •  At low loads, the standard solution compares more or less to the reactive solution. Sometimes the standard solution even slightly outperforms the reactive one from a system resource point of view (%CPU, %memory).
  •  At higher loads, the standard solution starts using a huge number of threads to cope with the stalling REST requests, which consumes system resources, with CPUs spending time context switching.
On the other hand, the reactive solution doesn't need to create additional threads to cope with the load.
In my low-performance setup, with a load of 4000 Kafka records/s and a 100 ms delay in the enrichment REST service:
  • Standard: 520 threads, 520 sockets, 32% CPU usage, 8.3% memory
  • Reactive: 37 threads, 502 sockets, 16% CPU usage, 4.3% memory

With my setup, I could not really get higher than 4000 records/s, but I think we can safely extrapolate the results to higher loads.

Conclusion and final notes

We see that in this particular setup the Spring reactive framework does a good job, especially when the standard synchronous framework starts using a huge amount of resources to cope with latencies and delays.

I don't pretend reactive programming is the answer to all issues. The Spring reactive framework needs much more benchmarking and performance characterization.

There are other reactive frameworks and other asynchronous/event-driven frameworks that were not considered here.
On a final note, we can see that both solutions create a large number of HTTP connections to cope with the delays. This is due to the nature of the HTTP protocol: you cannot issue a new request on the same connection before receiving the response. This raises the question of whether HTTP is a good candidate for high-performance REST-based services.

Most probably HTTP/2 would perform better here, since it allows multiplexing multiple requests over a single connection.

NB:
I had to increase "reactor.bufferSize.small" (a system property read by Reactor, e.g. -Dreactor.bufferSize.small=1024) to increase the number of pending reactive REST requests.

Monday, December 4, 2017

The world is not ours anymore - Minecon Earth 2017

Last month, I went with my 9-year-old son to a local Minecraft event where the Minecon conference was screened, followed by a panel of young Minecraft YouTubers.

At first I was not so eager to go to yet another of those gaming events for kids, full of chaos and screaming, but this time it was different.

It was an epiphany: I suddenly realized the fundamental gap there is between the digital natives and the representatives of my Generation X, and to some extent the Millennials (Generation Y).

Their heroes are young YouTubers between 14 and 16 years old. They are the stars of this generation. To some extent, it is much more democratic and easier to reach stardom nowadays than when we were young: you make some cool videos, publish and promote them on YouTube, and then hope for the "Likes" and "Subscriptions" that measure your success.

At the end of the screening, there were presentations by the members of the panel. One of them was about setting up a Minecraft server: hiring developers, promoting and marketing the server, and strategies on how to retain online players as long as possible. Full of good common sense, without the professional bullshit, and all this from a 16-year-old kid. It was amazing.
