New Java Authorizer interface. Support for non-key (foreign-key) joins in KTable. Administrative API for replica reassignment. Kafka Connect now supports incremental cooperative rebalancing. Kafka Streams now supports an in-memory session store and window store. The AdminClient now allows users to determine which operations they are authorized to perform on topics. There is a new broker start time metric.
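The non-key KTable join lets one table be joined to another on a field extracted from its value rather than on its primary key. A minimal sketch of the DSL call; the topic names, the value layout, and the output format are hypothetical:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;

public class ForeignKeyJoinSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Orders keyed by orderId; the value is "customerId|amount" for simplicity.
        KTable<String, String> orders =
                builder.table("orders", Consumed.with(Serdes.String(), Serdes.String()));
        // Customers keyed by customerId; the value is the customer name.
        KTable<String, String> customers =
                builder.table("customers", Consumed.with(Serdes.String(), Serdes.String()));

        // Foreign-key join (Kafka 2.4+): the second argument extracts the other
        // table's key from this table's value, so the tables need not share a key.
        KTable<String, String> enriched = orders.join(
                customers,
                orderValue -> orderValue.split("\\|")[0],             // foreign-key extractor
                (orderValue, customerName) -> orderValue + ",customer=" + customerName);

        enriched.toStream().to("orders-enriched");
        // builder.build() would then be passed to a KafkaStreams instance configured
        // with an application id, bootstrap servers and default String serdes.
    }
}
```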
We now track partitions which are under their min ISR count. Consumers can now opt out of automatic topic creation, even when it is enabled on the broker. Kafka components can now use external configuration stores (KIP). We have implemented improved replica fetcher behavior when errors are encountered. Here is a summary of some notable changes: Java 11 support; support for Zstandard, which achieves compression comparable to gzip with higher compression and especially decompression speeds (KIP); avoid expiring committed offsets for active consumer groups (KIP); intuitive user timeouts in the producer (KIP). Kafka's replication protocol now supports improved fencing of zombies.
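Several of these changes surface as plain client configuration switches. A minimal sketch, assuming a local broker; the bootstrap address and timeout value are placeholders:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;

public class ClientConfigSketch {
    public static void main(String[] args) {
        // Consumer: opt out of automatic topic creation even if the broker allows it (Kafka 2.3+).
        Properties consumer = new Properties();
        consumer.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumer.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");

        // Producer: Zstandard compression (Kafka 2.1+) and a single end-to-end
        // delivery timeout instead of tuning retries and request timeouts separately.
        Properties producer = new Properties();
        producer.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producer.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");
        producer.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000");
    }
}
```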
The improved fencing addresses a failure mode where, previously, under certain rare conditions, if a broker became partitioned from ZooKeeper but not from the rest of the cluster, the logs of replicated partitions could diverge and cause data loss in the worst case (KIP). Here is a summary of some notable changes: KIP adds support for prefixed ACLs, simplifying access control management in large secure deployments.
Bulk access to topics, consumer groups or transactional ids with a prefix can now be granted using a single rule. Access control for topic creation has also been improved to enable access to be granted to create specific topics or topics with a prefix.
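A minimal sketch of granting a prefixed ACL through the AdminClient; the bootstrap address, principal, and topic prefix are placeholders:

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

public class PrefixedAclSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            // One rule granting write access to every topic whose name starts with "payments-".
            AclBinding binding = new AclBinding(
                    new ResourcePattern(ResourceType.TOPIC, "payments-", PatternType.PREFIXED),
                    new AccessControlEntry("User:payments-service", "*",
                            AclOperation.WRITE, AclPermissionType.ALLOW));
            admin.createAcls(Collections.singletonList(binding)).all().get();
        }
    }
}
```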
Host name verification is now enabled by default for SSL connections to ensure that the default SSL configuration is not susceptible to man-in-the-middle attacks. You can disable this verification if required. You can now dynamically update SSL truststores without a broker restart, and you can store sensitive password configs in encrypted form in ZooKeeper rather than in cleartext in the broker properties file.
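If you do need to turn host name verification off (for example against brokers addressed by IP in a test environment), it is controlled by a single client property. A minimal sketch; the broker address, truststore path, and password are placeholders:

```java
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SslConfigs;

public class SslHostnameVerificationSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker.example.com:9093");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        // Host name verification is on by default (ssl.endpoint.identification.algorithm=https).
        // Setting the algorithm to an empty string disables it, if you really need to.
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/etc/kafka/client.truststore.jks");
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "changeit");
    }
}
```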
The replication protocol has been improved to avoid log divergence between leader and follower during fast leader failover. We have also improved resilience of brokers by reducing the memory footprint of message down-conversions.
By using message chunking, both memory usage and memory reference time have been reduced to avoid OutOfMemory errors in brokers. Kafka clients are now notified of throttling before any throttling is applied when quotas are enabled. This enables clients to distinguish between network errors and large throttle times when quotas are exceeded.
We have added a configuration option for the Kafka consumer to avoid indefinite blocking. We have dropped support for Java 7 and removed the previously deprecated Scala producer and consumer. Kafka Connect includes a number of improvements and features. KIP enables you to control how errors in connectors, transformations and converters are handled by enabling automatic retries and controlling the number of errors that are tolerated before the connector is stopped.
More contextual information can be included in the logs to help diagnose problems, and problematic messages consumed by sink connectors can be sent to a dead letter queue rather than forcing the connector to stop. KIP adds a new extension point to move secrets out of connector configurations and integrate with any external key management system.
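A hedged sketch of what this looks like in a sink connector configuration; the connector class, topic names, file path, and the database.password property are hypothetical, while the errors.* settings, the dead letter queue topic, and the config provider mechanism are the standard Connect options:

```java
import java.util.HashMap;
import java.util.Map;

public class ConnectErrorHandlingSketch {
    public static void main(String[] args) {
        // Hypothetical sink connector configuration.
        Map<String, String> connector = new HashMap<>();
        connector.put("name", "example-sink");
        connector.put("connector.class", "com.example.ExampleSinkConnector"); // placeholder
        connector.put("topics", "orders");

        // Tolerate bad records instead of failing the task, retry transient errors,
        // log context for debugging, and route problematic records to a dead letter queue.
        connector.put("errors.tolerance", "all");
        connector.put("errors.retry.timeout", "60000");
        connector.put("errors.log.enable", "true");
        connector.put("errors.log.include.messages", "true");
        connector.put("errors.deadletterqueue.topic.name", "orders-dlq");

        // Secrets stay out of the configuration itself: this placeholder is resolved
        // by a config provider registered in the worker config, e.g.
        //   config.providers=file
        //   config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
        connector.put("database.password", "${file:/etc/kafka/secrets.properties:db.password}");
    }
}
```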
The placeholders in connector configurations are resolved only when the configuration is sent to the connector, ensuring that secrets are stored and managed securely in your preferred key management system and not exposed over the REST APIs or in log files. Scala users now need less boilerplate in their code, notably around Serdes, thanks to new implicit Serdes. Message headers are now supported in the Kafka Streams Processor API, allowing users to add and manipulate headers read from the source topics and propagate them to the sink topics.
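A minimal sketch of header handling, written against the newer form of the Processor API; the header key and value are hypothetical:

```java
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

// Reads the headers that arrived with each record, adds one of its own,
// and forwards the record downstream so the header reaches the sink topic.
public class HeaderTaggingProcessor implements Processor<String, String, String, String> {
    private ProcessorContext<String, String> context;

    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context;
    }

    @Override
    public void process(Record<String, String> record) {
        // Headers from the source topic are available on the record.
        boolean alreadyTagged = record.headers().lastHeader("pipeline-stage") != null;
        if (!alreadyTagged) {
            record.headers().add("pipeline-stage", "enrichment".getBytes());
        }
        context.forward(record);
    }
}
```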
Windowed aggregation performance in Kafka Streams has been improved significantly, sometimes by an order of magnitude, thanks to the new single-key-fetch API. We have further improved unit testability of Kafka Streams with the kafka-streams-test-utils artifact.
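A minimal sketch of a topology test; it uses the TestInputTopic/TestOutputTopic form of the test-utils API from later releases (the artifact's first release exposed a ConsumerRecordFactory instead), and the topic names are placeholders:

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;

public class TopologyTestSketch {
    public static void main(String[] args) {
        // A trivial topology: upper-case every value.
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("words", Consumed.with(Serdes.String(), Serdes.String()))
               .mapValues(v -> v.toUpperCase())
               .to("words-upper");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted

        // The test driver pushes records through the topology without any broker.
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> in =
                    driver.createInputTopic("words", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> out =
                    driver.createOutputTopic("words-upper", new StringDeserializer(), new StringDeserializer());
            in.pipeInput("k", "hello");
            System.out.println(out.readValue()); // HELLO
        }
    }
}
```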
Here is a summary of some notable changes: Kafka 1.1 includes significant improvements to the Kafka Controller that speed up controlled shutdown, and ZooKeeper session expiration edge cases have also been fixed as part of this effort. Controller improvements also enable more partitions to be supported on a single cluster. Schema Registry stores all its schemas in Kafka, so Schema Registry nodes do not require storage and can be deployed in containers; follower nodes forward write requests to the current leader.
Confluent Replicator

Confluent Replicator is a new component added to Confluent Enterprise to help manage multi-cluster deployments of Confluent Platform and Apache Kafka. It provides a centralized configuration of cross-cluster replication. Confluent Replicator is integrated with the Kafka Connect framework and should be installed on the Connect nodes in the destination cluster.
If there are multiple Connect worker nodes, Replicator should be installed on all of them. When Replicator is installed on a larger number of nodes, it will scale to replicate at higher throughput and will be highly available through a built-in failover mechanism.
Auto Data Balancing evaluates information on the number of brokers, partitions, leaders and sizes of partitions to decide on a balanced placement of partitions on brokers and modify the replicas assigned to each broker to achieve a balanced placement. For example, when a new broker is added to the cluster, Auto Data Balancing will move partitions to the new broker to balance the load between all brokers available in the cluster. To avoid impact on production workloads, the rebalancing traffic can be throttled to a fraction of the available network capacity.
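Confluent's tooling exposes the throttle as an option; the underlying open-source mechanism for capping replication traffic is Kafka's dynamic broker-level replication throttle. A minimal sketch, assuming broker id 0 and a roughly 50 MB/s cap; the bootstrap address is a placeholder, and note that the throttle only applies to replicas listed in the corresponding *.replication.throttled.replicas topic configs:

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class RebalanceThrottleSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            // Cap leader-side replication traffic on broker 0 to ~50 MB/s while data
            // is being moved; follower.replication.throttled.rate works the same way.
            ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");
            AlterConfigOp throttle = new AlterConfigOp(
                    new ConfigEntry("leader.replication.throttled.rate", "52428800"),
                    AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(
                    Collections.singletonMap(broker, Collections.singletonList(throttle))).all().get();
        }
    }
}
```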
Auto Data Balancing can be installed on any machine in the Confluent Platform cluster — it just needs to be able to communicate with the Kafka brokers and ZooKeeper to collect load metrics and send instructions to move partitions.
For convenience, we recommend installing it alongside Kafka brokers or on the Confluent Control Center node if available. Use Control Center to verify that every message sent is received (and received only once), and to measure system performance end to end. Drill down to better understand cluster usage, and identify any problems. Configure alerts to notify you when end-to-end performance does not match SLAs or when messages sent were not received.
You can easily add new sources to load data from external data systems and new sinks to write data into external data systems. Additionally, you can manage, monitor, and configure connectors with Confluent Control Center. Confluent Control Center currently runs on a single machine, and due to the resources required we recommend dedicating a separate machine for Control Center.

Large Cluster Reference Architecture

Each component is given its own servers, and if any layer becomes overly loaded it can be scaled independently simply by adding nodes to that specific layer.
Small Cluster Reference Architecture

Usually companies start out by adopting Confluent Platform for one use case with limited load, and when this proves successful they grow the cluster to accommodate additional applications and teams.
This architecture is recommended for the early stages where investing in full-scale deployment is usually not required for the success of the project. In those cases, starting with fewer servers and installing multiple components per server is the way to go. As the use case expands, you will notice bottlenecks develop in the system.
In this case the correct approach is to start by separating the bottleneck components to their own servers, and when further growth is required, scale by adding servers to each component.
With time, this architecture will evolve to resemble the recommended large scale architecture.

Capacity Planning

When planning your architecture for Confluent Platform, you need to provide sufficient resources for the planned workload.
Storage, memory, CPU and network resources can all be potential bottlenecks and must be considered. Since every component is scalable, usage of storage, memory and CPU can be monitored on each node and additional nodes can be added when required. Because most components do not store state, you can add nodes at any time and immediately take advantage of the added capacity. The main exception is Kafka brokers, which serve as the main storage component for the cluster: adding brokers requires rebalancing data onto the new nodes, so it is best to expand the cluster before the existing brokers run short of capacity. The reason is that the rebalancing operation itself takes resources, and the more resources you can spare for rebalancing, the less time it will take to rebalance and the sooner the cluster will benefit from the additional capacity.
Confluent Control Center enables you to configure alerts when SLAs are not met, which will allow you to take action proactively.

Storage

Storage is mostly a concern on ZooKeeper and Kafka brokers. For ZooKeeper the main concern is low-latency writes to the transaction log, so we recommend dedicated disks for storing the ZooKeeper transaction log, even in small-scale deployments where ZooKeeper is installed alongside Kafka brokers. Kafka brokers are the main storage for the Confluent Platform cluster and therefore usually require ample storage capacity.
Most deployments use several disks per broker, usually 1TB each. The exact amount of storage you will need depends on the number of topics and partitions, the rate at which applications will be writing to each topic, and the retention policies you configure. You also want to consider the type of storage: SSDs and spinning magnetic drives offer different performance characteristics, and depending on your use case, SSD performance benefits may be worth their higher cost.
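As a rough, hypothetical sizing example: a cluster ingesting 50 MB/s of producer traffic with 7-day retention and replication factor 3 needs about 50 MB/s × 86,400 s/day × 7 days × 3 ≈ 90 TB of total disk across the cluster, before leaving headroom for other topics, indexes, and rebalancing. Plug in your own write rate, retention, and replication factor the same way.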
While Kafka brokers write sequentially to each partition, most deployments store more than one partition per disk and if your use case requires Kafka brokers to access disk frequently, minimizing seek times will increase throughput. When selecting a file system, we recommend either EXT4 or XFS — both have been tested and used extensively in production Kafka clusters. Note that the use of shared-storage devices, while supported, is not recommended.
All local data is kept in the directory specified by the application's state directory configuration. The exact use of storage depends on the specific streams application.
Aggregations, windowed aggregations and windowed joins all use RocksDB stores to hold their state. The size used will depend on the number of partitions, the number of unique keys in the stream (its cardinality), the size of keys and values, and the retention for windowed operations specified in the DSL using the until operator.
Note that Kafka Streams uses quite a few file descriptors for its RocksDB stores, so make sure to increase the file descriptor limit to 64K or above. Since calculating exact usage is complex, we typically allocate generous disk space to streams applications to allow for ample local state.

Memory

Sufficient memory is essential for efficient use of almost all Confluent Platform components.
Too small a heap will result in high CPU usage due to constant garbage collection, while too large a heap may result in long garbage collection pauses and loss of connectivity within the ZooKeeper cluster. On the Kafka brokers, the JVM heap is used for replication of partitions between brokers and for log compaction.
Replication requires 1MB (the default replica.fetch.max.bytes) for each partition the broker replicates. Since Apache Kafka 0.10.1, replica.fetch.response.max.bytes (10MB by default) caps the total memory used for a replication fetch response, which avoids memory and garbage collection issues when a broker hosts many partitions. For log compaction, calculating the required memory is more complicated, and we recommend referring to the Kafka documentation if you are using this feature.
For small to medium-sized deployments, a 4GB heap size is usually sufficient. In addition, it is highly recommended that consumers always read from memory, i.e. from data that was recently written and is still held in the OS page cache. The amount of memory this requires depends on the rate at which data is written and how far behind you expect consumers to get. If you write 20GB per hour per broker and you allow brokers to fall 3 hours behind in a normal scenario, you will want to reserve 60GB for the OS page cache. In cases where consumers are forced to read from disk, performance will drop significantly.
Kafka Connect itself does not use much memory, but some connectors buffer data internally for efficiency. If you run multiple connectors that use buffering, you will want to increase the JVM heap size to 1GB or higher. For stream processing applications (Kafka Streams and KSQL), the JVM heap size can be fairly small (defaults to 3GB), but the application needs additional memory for RocksDB in-memory indexes and caches, as well as OS page cache for faster access to persistent data.
Our clients attempt to batch data as it is sent to brokers in order to use the network more efficiently, and in addition they keep messages in memory until they are acknowledged successfully by the brokers. Having sufficient memory for the producer buffers will allow the producer to keep retrying to send messages to the broker in the event of network issues or leader elections, rather than blocking or throwing exceptions. Kafka Streams and Confluent KSQL have several memory areas, and the total memory usage will depend on your specific streams application and on its configuration.
Starting with Apache Kafka 0.10.1, Kafka Streams has a record cache used to buffer writes; it defaults to 10MB and is controlled through cache.max.bytes.buffering. Setting it higher will generally result in better performance for your streams application. In addition, Streams uses RocksDB memory stores for each partition involved in each aggregation, windowed aggregation and windowed join, and it uses a Kafka consumer for each thread you configure for your application.
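A minimal sketch of the two knobs just mentioned; the application id, bootstrap address, cache size and thread count are hypothetical values, not recommendations:

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsMemoryConfigSketch {
    public static void main(String[] args) {
        // Raise the record cache from its 10MB default and run two processing
        // threads, each of which brings its own Kafka consumer.
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "orders-aggregator");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 256L * 1024 * 1024); // 256 MB
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
    }
}
```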
Each of these consumers allocates the lower of 1MB per partition or 50MB per broker. Since calculating all these variables is complex, and since more memory generally increases the performance of streams applications, we typically allocate large amounts of memory — 32GB and above. Consumers use at least 2MB per consumer and up to 64MB in cases of large responses from brokers (typical for bursty traffic).
Producers will have a buffer of 64MB each. Note that in all cases, we recommend using the G1 garbage collector for the JVM to minimize garbage collection overhead.

CPU

If you notice high CPU usage, it is usually the result of misconfiguration, insufficient memory, or a bug. We recommend enabling compression since it improves network and disk utilization, but it does use more CPU on the clients.
Kafka brokers older than 0.10.0 also need to decompress and recompress compressed message batches in order to assign offsets, which adds CPU load on the brokers. Encryption (SSL/TLS) is another significant consumer of CPU; large-scale deployments often go to some lengths to make sure consumers are deployed within the same LAN as the brokers, where encryption is often not a requirement. In those cases, configuring clients to batch requests will improve performance. Note that many components are multi-threaded and will benefit more from a large number of cores than from faster cores.
Network

Large-scale Kafka deployments that use 1GbE will typically become network-bound. When provisioning for network capacity, you will want to take into account the replication traffic and leave some overhead for rebalancing operations and bursty clients.
Network is one of the most difficult resources to provision, since adding nodes will eventually run up against switch limitations; therefore, consider enabling compression to get better throughput from existing network resources. Note that the Kafka producer compresses messages in batches, so configuring the producer to send larger batches will result in a better compression ratio and improved network utilization.
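A minimal sketch of the producer settings involved; the numbers are illustrative starting points, not recommendations:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class ProducerBatchingSketch {
    public static void main(String[] args) {
        // Larger batches compress better: give the producer a bigger batch size and a
        // small linger so it has a chance to fill batches before sending.
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");   // or gzip/snappy/zstd
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 131072);        // 128 KB (default is 16 KB)
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);             // wait up to 10 ms to fill a batch
    }
}
```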