
Introduction

Message brokers are the link between the senders and recipients of messages in distributed application systems. They provide a way to pass messages between applications and components, which is one of the key mechanisms for asynchronous data exchange in modern information systems. In this article, we will look at Apache Kafka, a distributed messaging platform that transfers data streams between applications and services.

Distributed system

A distributed system is a collection of programs that use the computing resources of several separate nodes to achieve one common goal; the term also covers distributed computing and distributed databases. A distributed system is built from individual nodes that communicate and synchronize over a shared network. Typically, nodes are separate physical machines, but they can also be separate software processes or other nested, encapsulated systems. Distributed systems aim to eliminate bottlenecks and single points of failure.

Distributed computing systems have the following characteristics:

  • Resource sharing: a distributed system can share hardware, software, or data.
  • Parallel processing: the same function can be executed simultaneously on several machines.
  • Scalability: computing power and performance scale as additional machines are added.
  • Fault detection: failures of individual nodes are easier to detect and isolate.
  • Transparency: any node can access and exchange data with the other nodes in the system.

In a centralized computing system, all calculations are performed on a single computer in a single place. The main difference between centralized and distributed systems is the model of interaction between nodes: the state of a centralized system is stored in a central node that clients access individually. Since all nodes access the central node, the network can become overloaded and slow. A centralized system also has a single point of failure, whereas a distributed system does not.

Inter-service communication

There are two main types of interaction between services: synchronous and asynchronous.

image of interaction between services

Synchronous interaction means that one service must wait for a response from another service before continuing its work. For example, if service A needs data from service B, it must send a request and wait for B's response before proceeding.

Asynchronous interaction means that services communicate without waiting for each other's responses. Instead, each service sends a request to perform a task and then continues its work without blocking.
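
To make the difference concrete, here is a minimal Java sketch of both styles using Kafka's producer client, which is introduced later in this article; the broker address, topic name, key, and payload are illustrative assumptions.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class SyncVsAsync {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringSerializer");

            try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<>("orders", "order-1", "created");

                // Synchronous style: block until the broker acknowledges the write.
                RecordMetadata meta = producer.send(record).get();
                System.out.println("acknowledged at offset " + meta.offset());

                // Asynchronous style: register a callback and continue immediately.
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) exception.printStackTrace();
                });
            }
        }
    }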

You can learn more about methods of inter-service communication at LINK.

Classic queuing services

Queuing systems typically consist of three basic components:

  1. a server (broker)
  2. producers, which send messages to a named queue pre-configured by an administrator on the server
  3. consumers, which read those messages as they appear
image of three basic components

In web applications, queues are often used for deferred processing of events or as a temporary buffer between services, protecting them from load spikes.

Consumers receive data from the server (broker) using one of two request models: pull or push.

image of two different query models
Pull model
— consumers themselves send a request to the server every n seconds for a new batch of messages. With this approach, clients can effectively control their own workload, and the pull model also allows messages to be grouped into batches, achieving better throughput. Its disadvantages are a potential load imbalance between consumers and higher data processing latency.
Push model (RabbitMQ)
— the server itself makes a request to the client, sending it a new portion of data. This reduces message processing latency and allows the distribution of messages among consumers to be balanced efficiently.

A typical message lifecycle in queuing systems is:

  1. The producer sends a message to the server.
  2. The consumer fetches the message along with its unique server-assigned identifier.
  3. The server marks the message as in-flight. Messages in this state are still stored on the server but are temporarily not delivered to other consumers; the timeout of this state is controlled by a dedicated setting.
  4. The consumer processes the message according to its business logic, then sends an ack or nack request back to the server using the identifier obtained earlier, thereby either confirming successful processing or signaling an error.
  5. On success, the message is deleted from the server permanently. On error, or when the in-flight timeout expires, the message is redelivered to a consumer for reprocessing.
message lifecycle image
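
A sketch of this lifecycle using the Java client of RabbitMQ (mentioned above as an example of the push model); the queue name and connection details are illustrative, and the processing step is a stub.

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import java.nio.charset.StandardCharsets;

    public class LifecycleDemo {
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection conn = factory.newConnection();
            Channel channel = conn.createChannel();
            channel.queueDeclare("tasks", true, false, false, null);

            // autoAck=false: after delivery the broker keeps the message in the
            // in-flight (unacknowledged) state until we confirm or reject it.
            channel.basicConsume("tasks", false, (consumerTag, delivery) -> {
                long tag = delivery.getEnvelope().getDeliveryTag();
                String body = new String(delivery.getBody(), StandardCharsets.UTF_8);
                try {
                    System.out.println("processing: " + body); // business logic goes here
                } catch (RuntimeException e) {
                    // nack with requeue=true: redeliver the message for reprocessing
                    channel.basicNack(tag, false, true);
                    return;
                }
                // ack: the broker deletes the message permanently
                channel.basicAck(tag, false);
            }, consumerTag -> {});
        }
    }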

Apache Kafka

This platform is used to build real-time streaming data pipelines and applications that adapt to data streams. It combines messaging, storage, and stream processing, which makes it possible to store and analyze both historical data and data arriving in real time.

Kafka gives users three main features:

  • Publish and subscribe to streams of records
  • Efficiently store streams of records in the order they were created
  • Process streams of records in real time

The main entities for Apache Kafka are:

  • Broker: the main component, responsible for storing and managing data streams. It receives, stores, and distributes messages between producers and consumers.
  • ZooKeeper: a separate auxiliary service that stores cluster state, configuration, and metadata.
  • Kafka cluster: a group of Kafka brokers that work together to store and process messages. A cluster provides scalability, fault tolerance, and load distribution.
  • Producer: a component that creates and sends messages to a Kafka broker. A producer may be associated with many topics and can send messages to several brokers.
  • Topic: a category or channel to which producers publish messages and from which consumers read them. A topic is a logical unit that groups messages; within each partition, messages are stored in the order they arrived (FIFO).
  • Consumer: a component that reads and processes messages from Kafka brokers. A consumer can be associated with many topics and receive messages from several brokers. Consumers read messages from partitions within a topic and keep an offset per partition to track reading progress (see the sketch after this list).
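
A minimal Java consumer sketch tying these entities together; the broker address, group id, and topic name are illustrative assumptions.

    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class SimpleConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // broker
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(List.of("TestTopic")); // topic
                while (true) {
                    // Pull model: the consumer asks the broker for new records.
                    ConsumerRecords<String, String> records =
                            consumer.poll(Duration.ofMillis(500));
                    for (ConsumerRecord<String, String> r : records)
                        System.out.printf("partition=%d offset=%d value=%s%n",
                                          r.partition(), r.offset(), r.value());
                }
            }
        }
    }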

Like classic queuing services, Kafka conventionally consists of three components:

  1. server (broker)
  2. producers, which send messages to the broker
  3. consumers, which read these messages using the pull model
image of three kafka components

Perhaps the fundamental difference between Kafka and queues is how messages are stored on the broker and consumed by consumers.

  • Messages in Kafka are not deleted by brokers after consumers process them; data in Kafka can be stored for days, weeks, or years.
  • Thanks to this, the same message can be processed as many times as needed, by different consumers and in different contexts.

This is Kafka's key strength and its main difference from traditional messaging systems.

Kafka uses the pull model: consumers request batches of messages starting from specific offsets. Batch processing increases throughput and makes message delivery more efficient.
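
A sketch of this in the Java client, assuming a local broker and a TestTopic partition 0 in which offset 42 exists: the consumer attaches to a specific partition, seeks to an offset, and pulls a batch.

    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class SeekDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringDeserializer");
            // Fetch larger batches per pull request to improve throughput.
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                TopicPartition tp = new TopicPartition("TestTopic", 0);
                consumer.assign(List.of(tp)); // manual assignment, no consumer group needed
                consumer.seek(tp, 42L);       // start pulling from a specific offset
                ConsumerRecords<String, String> batch = consumer.poll(Duration.ofSeconds(1));
                System.out.println("pulled " + batch.count() + " records");
            }
        }
    }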

What is Apache Kafka used for?

Apache Kafka is best for streaming data from A to B without complex routing, but with maximum throughput. The tool excels at stream processing and at modeling changes in a system as a sequence of events. Kafka can also be used for data processing in multi-stage pipelines. It is a great solution when you need a framework for storing, reading, re-reading, and analyzing streaming data; its strong point is real-time data processing and analysis. The tool is also well suited to systems that store messages permanently or that are regularly audited.

Where can Apache Kafka be used?

  • Logs and metrics: Kafka can be used to collect, store, and analyze logs and metrics, extracting valuable information about a system.
  • Microservice architecture: Apache Kafka can be used for communication and synchronization between microservices in a distributed system, transferring messages and events between services with reliable delivery and processing guarantees.
  • Data stream processing: Apache Kafka can process data streams in real time. It can receive streams from various sources, such as sensors and IoT devices, and process them almost instantly, which makes it well suited for machine learning, data analysis, and monitoring tasks.
  • Web user analytics: Apache Kafka can be used to analyze user behavior by processing data in real time from various sources. For example, it could power a recommendation system that processes data on user actions and provides personalized recommendations.
  • Integration and streaming data processing: Apache Kafka can integrate with and process data in real time from other systems such as databases, data management systems, and other data sources, providing powerful tools for streaming transformations and aggregations.
  • Persistence logs: Kafka can be used as a persistence log for saving and recovering data, for example to record and restore system state after failures.

Apache Kafka data structure

Messages in Kafka are organized and stored in named topics; each topic consists of one or more partitions distributed between the brokers of a cluster. This distribution is important for horizontal scaling, as it allows clients to write and read messages from several brokers simultaneously.

When a new message is added to a topic, it is actually written to one of the partitions of that topic. Messages with the same key are always written to the same partition, which guarantees the order of writing and reading within that partition.
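
A short Java sketch of keyed writes (broker address, topic, and key are illustrative): the metadata returned for each send shows that records with the same key land in the same partition.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class KeyedWrites {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringSerializer");

            try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                for (int i = 0; i < 3; i++) {
                    // Same key => same partition => ordered relative to each other.
                    RecordMetadata m = producer
                        .send(new ProducerRecord<>("TestTopic", "user-42", "event-" + i))
                        .get();
                    System.out.println("wrote to partition " + m.partition()
                                       + " at offset " + m.offset());
                }
            }
        }
    }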

To guarantee data safety, each partition in Kafka can be replicated n times, where n is the replication factor. This ensures that several copies of each message are stored on different brokers.

image of adding a new message to a topic

Each partition has a “leader”, the broker that works with clients: it is the leader that accepts messages from producers and, in general, serves them to consumers. The other brokers, which store replicas of all the partition's data, are called followers (Follower) and fetch new data from the leader. Messages are always written to the leader and, in general, read from the leader.

To find out which broker leads a partition, clients request metadata from a broker before writing or reading; they can connect to any broker in the cluster for this.
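
A sketch of such a metadata request with Kafka's Java admin client (broker address and topic name are illustrative); it prints the leader, replicas, and in-sync replicas for each partition.

    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.TopicDescription;

    public class ShowLeaders {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Any broker in the cluster can serve the metadata request.
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            try (Admin admin = Admin.create(props)) {
                TopicDescription d = admin.describeTopics(List.of("TestTopic"))
                                          .allTopicNames().get()
                                          .get("TestTopic");
                d.partitions().forEach(p ->
                    System.out.printf("partition %d: leader=%s replicas=%s isr=%s%n",
                        p.partition(), p.leader(), p.replicas(), p.isr()));
            }
        }
    }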

image of the main Apache Kafka data structure

The main data structure in Kafka is a distributed, replicated log. Each partition is such a replicated log stored on disk. Every new message a producer sends to a partition is appended to the “head” of this log and receives a unique, monotonically increasing offset (a 64-bit number assigned by the broker itself).

Kafka topic data typically consists of three types of files: .log, .index, and .timeindex files. Each contains different data and information about the offsets of messages in the topic:

  • .log: contains the actual messages sent to the topic. Each message is written as a separate record; .log files hold the raw data used to store Kafka messages on disk.
  • .index: contains indexes for quickly finding messages in the .log files. Each index entry maps a message offset to its position in the .log file, making lookup by offset more efficient and faster.
  • .timeindex: contains time-based indexes for quickly finding messages in the .log files. Each entry associates a timestamp with the offset of a message in the .log file, which makes it possible to quickly find all messages sent within a specific time range.

Together, these files allow Kafka to store and process messages in topics efficiently. By tracking offsets, it is easy to stream consecutive messages and to find a specific message by time or offset.
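
As a simplified model of the lookup idea, not Kafka's actual binary on-disk format, a sparse offset index can be sketched like this:

    import java.util.Map;
    import java.util.TreeMap;

    // Simplified model of a Kafka .index file: a sparse, sorted mapping from
    // message offset to byte position in the .log file.
    public class SparseIndex {
        private final TreeMap<Long, Long> offsetToPosition = new TreeMap<>();

        public void add(long offset, long bytePosition) {
            offsetToPosition.put(offset, bytePosition);
        }

        // To find a target offset, jump to the nearest indexed entry at or
        // below it, then scan the .log file forward from that byte position.
        public long lookup(long targetOffset) {
            Map.Entry<Long, Long> e = offsetToPosition.floorEntry(targetOffset);
            return e == null ? 0L : e.getValue();
        }

        public static void main(String[] args) {
            SparseIndex index = new SparseIndex();
            index.add(0L, 0L);       // offset 0 starts at byte 0
            index.add(100L, 8_192L); // offset 100 starts at byte 8192
            System.out.println(index.lookup(142L)); // prints 8192
        }
    }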

So, we have already established that messages are not deleted from the log after being delivered to consumers and can be read as many times as desired. How long data is guaranteed to be kept on the broker is controlled by dedicated settings, and the retention period does not affect overall system performance. It is therefore completely normal to store messages in Kafka for days, weeks, months, or even years.
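
One of those settings is the per-topic retention.ms; a sketch of changing it with the Java admin client (the topic name and the seven-day value are illustrative):

    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.AlterConfigOp;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;

    public class SetRetention {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            try (Admin admin = Admin.create(props)) {
                ConfigResource topic =
                        new ConfigResource(ConfigResource.Type.TOPIC, "TestTopic");
                // Keep messages for 7 days whether or not they were consumed.
                AlterConfigOp op = new AlterConfigOp(
                        new ConfigEntry("retention.ms", "604800000"),
                        AlterConfigOp.OpType.SET);
                admin.incrementalAlterConfigs(Map.of(topic, List.of(op))).all().get();
            }
        }
    }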

Deploying a Kafka cluster

  1. Installing the package and creating the directory structure
    1. Create a folder for the cluster.
    2. Create working directories for the three nodes: kafka_server_1, kafka_server_2, and kafka_server_3.
      image of creating a working directory for three nodes
    3. Download https://downloads.apache.org/kafka/3.5.0/kafka_2.13-3.5.0.tgz, unpack it, and copy the contents into each broker's directory:
      image of broker directories
  2. Setting up the nodes
    1. Kafka broker configuration (in config/server.properties for kafka_server_1, kafka_server_2, and kafka_server_3 respectively), setting:
      • broker id (0, 1, 2)
      • client port (9092, 9093, 9094)
      • ZooKeeper port (2181, 2182, 2183)
      • log directories (/tmp/kafka-logs-1, /tmp/kafka-logs-2, /tmp/kafka-logs-3)

        broker.id=0
        listeners=PLAINTEXT://:9092
        zookeeper.connect=localhost:2181
        log.dirs=/tmp/kafka-logs-1
    2. ZooKeeper node configuration (config/zookeeper.properties), setting:
      • data directory (/tmp/zookeeper_1, /tmp/zookeeper_2, /tmp/zookeeper_3)
      • client port (2181, 2182, 2183)
      • the maximum number of client connections and connection limits
      • ports for data exchange between the nodes (so that each node knows about the others)

        dataDir=/tmp/zookeeper_1
        clientPort=2181
        maxClientCnxns=60
        initLimit=10
        syncLimit=5
        tickTime=2000
        server.1=localhost:2888:3888
        server.2=localhost:2889:3889
        server.3=localhost:2890:3890
    3. Create directories for the ZooKeeper nodes and record each node's id in its service file:

        mkdir -p /tmp/zookeeper_{1..3}
        echo "1" > /tmp/zookeeper_1/myid
        echo "2" > /tmp/zookeeper_2/myid
        echo "3" > /tmp/zookeeper_3/myid
  3. Launching the ZooKeeper nodes and brokers

     To start the cluster, run the following scripts from each root directory (kafka_server_1, kafka_server_2, and kafka_server_3):

        sudo bin/zookeeper-server-start.sh config/zookeeper.properties
        sudo bin/kafka-server-start.sh config/server.properties
  4. Creating a topic, producer, and consumer:

        sudo bin/kafka-topics.sh --create --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --replication-factor 3 --partitions 3 --topic TestTopic

     We created a topic with 3 partitions across the 3 servers and duplicated every partition on each server (replication factor 3).
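
     The same topic could also be created programmatically; a minimal sketch with the Java admin client, matching the CLI command above:

        import java.util.List;
        import java.util.Properties;
        import org.apache.kafka.clients.admin.Admin;
        import org.apache.kafka.clients.admin.AdminClientConfig;
        import org.apache.kafka.clients.admin.NewTopic;

        public class CreateTopic {
            public static void main(String[] args) throws Exception {
                Properties props = new Properties();
                props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                          "localhost:9092,localhost:9093,localhost:9094");

                try (Admin admin = Admin.create(props)) {
                    // 3 partitions, replication factor 3 — as in the CLI command.
                    NewTopic topic = new NewTopic("TestTopic", 3, (short) 3);
                    admin.createTopics(List.of(topic)).all().get();
                }
            }
        }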

    You can find out information about a topic with the following command:

    
        bin/kafka-topics.sh --describe --topic TestTopic --bootstrap-server localhost:9094

    We get:

    image of topic creation, producer and consumer
     • Leader: the server holding the primary instance of the partition
     • Replicas: the servers on which the partition's data is duplicated
     • Isr (in-sync replicas): the servers that can take over as leader if the current leader fails

Testing the fault tolerance of the Apache Kafka cluster:

  1. First, let's create a console producer and consumer:

        sudo bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --topic TestTopic
        sudo bin/kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --from-beginning --topic TestTopic
    image of the creation of a console producer and consumer

     On the right is the producer sending messages; on the left, the consumer reading them.

  2. Let's stop one of the kafka nodes (kafka_server_1):
    image of one of the kafka nodes stopping

     We see that the Leader and Isr parameters have changed: for Partition: 1, the leader changed from the first broker (id=0) to the second (id=1), as expected from the Isr list.

    Let's try to write a message after the first broker stops:

    image of the message after stopping the first broker

    We see that the message has arrived.

  3. Now let's stop another of the Kafka nodes (kafka_server_2):
    image of one of the kafka nodes stopping

     Since only one broker is left running, it becomes the leader for all partitions.

    Let's try to write a new message:

    new message image

     We see that this time, too, the message got through. Thus we have observed the Kafka cluster's fault tolerance.

Analysis of Apache Kafka data structure in practice

  1. Let's create a new topic with 3 partitions and 3 replicas on the 3 brokers. To make writes from a single producer uniform across all partitions, add the RoundRobin partitioner class to config/producer.properties:

        partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner
    image of creating a new topic
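
     The same partitioner can also be set programmatically when constructing a producer; a minimal Java sketch (broker addresses and payloads are illustrative):

        import java.util.Properties;
        import org.apache.kafka.clients.producer.KafkaProducer;
        import org.apache.kafka.clients.producer.Producer;
        import org.apache.kafka.clients.producer.ProducerConfig;
        import org.apache.kafka.clients.producer.ProducerRecord;

        public class RoundRobinProducer {
            public static void main(String[] args) {
                Properties props = new Properties();
                props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                          "localhost:9092,localhost:9093,localhost:9094");
                props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                          "org.apache.kafka.common.serialization.StringSerializer");
                props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                          "org.apache.kafka.common.serialization.StringSerializer");
                // Same setting as in config/producer.properties above.
                props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
                          "org.apache.kafka.clients.producer.RoundRobinPartitioner");

                try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                    for (int i = 0; i < 6; i++)
                        // Unkeyed records are spread evenly across all partitions.
                        producer.send(new ProducerRecord<>("TestTopic", "message-" + i));
                }
            }
        }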

     Three directories are created in each Kafka broker's log directory:

    image of kafka broker logs

     We created three partitions in the topic, and each has its own directory in the file system on every broker. The leader-epoch-checkpoint file stores the current leader epoch number and the leader's identifier, as well as information about the last applied and sent log entries for each leader; this is used to control retried operations and to reorder records in the event of a failure or a change of leader. The partition.metadata files contain information about each topic partition in the Kafka cluster, including:

     • Partition ID: the unique identifier of the partition within the topic.
     • Topic ID: the identifier of the topic to which the partition belongs.
     • Partition leader: the broker that leads the partition and is responsible for reading and writing its data.
     • Partition replicas: all brokers that hold a copy of the partition.
     • Version: the current version of the partition's metadata.
     • State: the state of the partition, such as "Online" or "Offline".

     The log-start-offset-checkpoint file is also visible. Kafka uses it to record, for each partition, the first offset still available in the log (the log start offset, which advances as old data is deleted). It also plays a role in recovery after a failure: if a broker crashes, it restores its state from the last saved checkpoint files.

  2. Let's add a few messages to the topic. We will connect a producer to the first broker, which is the leader of one of the partitions, and send it several messages:
    image of adding several messages to a topic

     Messages are written to the .log file, so we can see where they were recorded: in the TestTopic-1 partition:

    image of recorded messages

     Each new message in a partition is assigned an id that is one greater than the previous one. This id is also called the offset: the first message has offset 0, the second has offset 1, and so on, each next one always one more than the previous. Let's check our example with a Kafka tool:

    
        bin/kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --print-data-log --files /tmp/kafka-logs-1/TestTopic-1/00000000000000000000.log
    offset image

     Here we see the offset, CreateTime, key size (keySize), value size (valueSize), and the message itself (payload).

  3. Let's make sure that when connecting to different brokers, writes go only to the one partition for which that broker is the leader. Let's connect to the second broker and write several messages:
    image of recording multiple messages

    Let's check which partition the data was written to:

    image of checking which partition the data was written to

     The size of the TestTopic-0 log file has changed; let's check whether it contains the messages we sent to the second broker:

    image checking whether there are messages

    We see that the sent messages were recorded in the TestTopic-0 partition.

    Let's repeat all the steps for the third broker:

    image of all actions for the third broker
    image of all actions for the third broker
    image of all actions for the third broker

     We conclude that the first broker is the leader of the TestTopic-1 partition, the second broker of TestTopic-0, and the third of TestTopic-2.