Consumer

References

Quote

Consumers read messages. In other publish/subscribe systems, these clients may be called subscribers or readers. The consumer subscribes to one or more topics and reads the messages in the order in which they were produced to each partition. The consumer keeps track of which messages it has already consumed by keeping track of the offset of messages (Page 7) #✂️

Quote

Consumers can only read messages that are committed. (Page 162) #✂️

Quote

Consumers work as part of a consumer group, which is one or more consumers that work together to consume a topic. The group ensures that each partition is only consumed by one member (Page 7) #✂️

Quote

Consumers maintain membership in a consumer group and ownership of the partitions assigned to them by sending heartbeats to a Kafka broker designated as the group coordinator (this broker can be different for different consumer groups) (Page 82) #✂️

Quote

The mapping of a consumer to a partition is often called ownership of the partition by the consumer (Page 7) #✂️

Quote

All producers must connect to the leader in order to publish messages, but consumers may fetch from either the leader or one of the followers (Page 8) #✂️

Quote

How does a consumer commit an offset? It sends a message to Kafka, which updates a special __consumer_offsets topic with the committed offset for each partition. As long as all your consumers are up, running, and churning away, this will have no impact. However, if a consumer crashes or a new consumer joins the consumer group, this will trigger a rebalance. After a rebalance, each consumer may be assigned a new set of partitions than the one it processed before. In order to know where to pick up the work, the consumer will read the latest committed offset of each partition and continue from there (Page 94) #✂️
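The commit-and-resume behaviour in the quote above can be modelled without a broker. This is a minimal sketch, assuming a plain dictionary stands in for the __consumer_offsets topic; `OffsetStore`, `commit`, and `resume_position` are hypothetical names for illustration, not part of any Kafka client.

```python
class OffsetStore:
    """Toy stand-in for the __consumer_offsets topic (not a Kafka API)."""

    def __init__(self):
        # (group, topic, partition) -> last committed offset
        self._committed = {}

    def commit(self, group, topic, partition, offset):
        # A later commit overwrites the earlier one for the same partition.
        self._committed[(group, topic, partition)] = offset

    def resume_position(self, group, topic, partition):
        # Whichever consumer owns the partition after a rebalance starts here.
        # None means the group has never committed for this partition.
        return self._committed.get((group, topic, partition))


store = OffsetStore()
store.commit("billing", "orders", 0, 2000)
store.commit("billing", "orders", 0, 3000)
```

Because the store is keyed by group and partition rather than by consumer, any member that is handed partition 0 after a rebalance picks up from the same committed position.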

Flashcards

Consumers read::: These read data from a Topic

Consumers know::: These know which Broker to read from

Consumers automatically recover on broker failure

Consumers read in order from::: Low to high offset within each Partition

Consumers use a Pull Model as it relates to reading from a topic

Ordered consumption occurs when reading from a single Partition

Ordered consumption is not ensured when reading from multiple Partitions

We would like to be in an at-most-once consuming scenario. Which offset commit strategy would you recommend?:: Commit the offsets in Kafka before processing the data

A consumer starts and has auto.offset.reset=latest, and the topic partition currently has data for offsets going from 45 to 2311. The consumer group has committed the offset 643 for the topic before. Where will the consumer read from?:: offset 643

You are doing complex calculations using a machine learning framework on records fetched from a Kafka topic. It takes about 6 minutes to process a record batch, and the consumer enters rebalances even though it's still running. How can you improve this scenario?:: Increase max.poll.interval.ms (default 300000) to double its value, so that Kafka considers the consumer dead only if it hasn't called the .poll() method in 10 minutes instead of 5.
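The arithmetic behind that card can be checked with a small sketch. It assumes the only rule in play is "the gap between two poll() calls must not exceed max.poll.interval.ms"; `exceeds_poll_interval` is a hypothetical helper, not a Kafka API.

```python
DEFAULT_MAX_POLL_INTERVAL_MS = 300_000  # 5 minutes, the default cited in the card


def exceeds_poll_interval(batch_processing_ms,
                          max_poll_interval_ms=DEFAULT_MAX_POLL_INTERVAL_MS):
    """True if processing one batch keeps the consumer away from poll() too long."""
    return batch_processing_ms > max_poll_interval_ms


six_minutes_ms = 6 * 60 * 1000
with_default = exceeds_poll_interval(six_minutes_ms)           # 360000 > 300000
with_doubled = exceeds_poll_interval(six_minutes_ms, 600_000)  # 360000 > 600000
```

With the default, a 6-minute batch crosses the limit; with the doubled value it does not.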

How does a consumer commit offsets in Kafka?:: Consumers do not directly write to the __consumer_offsets topic, they instead interact with a broker that has been elected to manage that topic, which is the Group Coordinator broker

A consumer starts and has auto.offset.reset=none, and the topic partition currently has data for offsets going from 45 to 2311. The consumer group has committed the offset 10 for the topic before. Where will the consumer read from?:: auto.offset.reset=none means that the consumer will crash if the offsets it's recovering from have been deleted from Kafka, which is the case here, as 10 < 45

In the Kafka consumer metrics it is observed that fetch-rate is very high and each fetch is small. What steps will you take to increase throughput?:: Increase fetch.min.bytes. This allows consumers to wait and receive more bytes in each fetch request.

A producer just sent a message to the leader broker for a topic partition. The producer used acks=1 and therefore the data has not yet been replicated to followers. Under which conditions will the consumer see the message?:: When the high watermark has advanced. The high watermark is an advanced Kafka concept; it advances once all in-sync replicas (ISR) have replicated the latest offsets. A consumer can only read up to the value of the High Watermark (which can be less than the highest offset, in the case of acks=1)
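A rough model of the high watermark, assuming each replica reports a "log end offset" (the offset its next record would receive) and that the high watermark is the smallest of those values across the ISR. The function names and the replica labels are illustrative only.

```python
def high_watermark(isr_log_end_offsets):
    """Smallest log end offset among the in-sync replicas."""
    return min(isr_log_end_offsets.values())


def visible_offsets(leader_offsets, hw):
    """Offsets a consumer may fetch: everything below the high watermark."""
    return [o for o in leader_offsets if o < hw]


# The leader has just appended offset 100 (acks=1); followers have not fetched it yet.
isr = {"leader": 101, "follower-1": 100, "follower-2": 100}
hw = high_watermark(isr)
readable = visible_offsets(range(101), hw)
```

In this sketch offset 100 exists on the leader but stays invisible to consumers until both followers catch up and the minimum moves to 101.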

To read data from a topic, the following configuration is needed for the consumers:: any broker to connect to, and the topic name. All brokers can respond to Metadata requests, so a client can connect to any broker in the cluster.

To allow consumers in a group to resume at the previously committed offset, I need to make sure the consumers in a group use the same...:: group.id. Setting a group.id that's consistent across restarts allows the consumers in that group to resume reading from where offsets were last committed for the group

There are 3 producers writing to a topic with 5 partitions. There are 10 consumers consuming from the topic as part of the same group. How many consumers will remain idle?:: 5. Each partition is assigned to exactly one consumer in the group, so only 5 of the 10 consumers receive a partition and the other 5 stay idle.
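The idle-consumer count can be reproduced with a toy assignment. This sketch uses a simple round-robin loop as an assumption; it is not Kafka's actual assignor, and `assign_round_robin` is a hypothetical name.

```python
def assign_round_robin(partitions, consumers):
    """Give each partition to exactly one consumer, cycling through the group."""
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment


partitions = list(range(5))
consumers = [f"consumer-{n}" for n in range(10)]
assignment = assign_round_robin(partitions, consumers)

# Consumers that ended up with no partitions at all.
idle = [c for c, owned in assignment.items() if not owned]
```

The number of producers plays no part in the calculation; only the partition count and the group size matter.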

A consumer sends a request to commit offset 2000. There is a temporary communication problem, so the broker never gets the request and therefore never responds. Meanwhile, the consumer processed another batch and successfully committed offset 3000. What should you do?:: In this case, because the offset 3000 has been committed and all the messages between 0 and 3000 have all been processed, it is okay not to have committed offset 2000. The right answer is to do "nothing", this behaviour is acceptable

A topic has three replicas and you set min.insync.replicas to 2. If two out of three replicas are not available, what happens when a consume request is sent to the broker?:: Data will be returned from the remaining in-sync replica

Consumer failed to process record # 10 and succeeded in processing record # 11. Select the course of action that you should choose to guarantee at least once processing:: Do not commit until successfully processing the record #10. Here, you shouldn't commit offsets 11 or 10 as it would indicate that the message #10 has been processed successfully.
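The difference between committing before and after processing shows up when a consumer crashes mid-record. The following is a simplified simulation under stated assumptions: offsets are list indexes, the committed value is the offset a restarted consumer would resume from, and `run` is a hypothetical helper rather than client code.

```python
def run(records, commit_first, fail_at):
    """Process records once, simulating a crash while handling offset `fail_at`.

    Returns (processed, committed), where `committed` is the offset the
    next consumer would resume from.
    """
    processed, committed = [], 0
    for offset, value in enumerate(records):
        if commit_first:
            committed = offset + 1   # at-most-once: commit, then process
        if offset == fail_at:
            break                    # crash before processing completes
        processed.append(value)
        if not commit_first:
            committed = offset + 1   # at-least-once: process, then commit
    return processed, committed


records = ["a", "b", "c"]
at_most_once = run(records, commit_first=True, fail_at=1)
at_least_once = run(records, commit_first=False, fail_at=1)
```

With commit-first, the restarted consumer resumes at offset 2 and record "b" is never processed. With commit-after, it resumes at offset 1 and retries "b", which is why the failed record must not be committed past.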

A consumer has auto.offset.reset=latest, and the topic partition currently has data for offsets going from 45 to 2311. The consumer group never committed offsets for the topic before. Where will the consumer read from?:: offset 2311. Latest means that data retrieval starts from where the offsets currently end

A consumer is configured with enable.auto.commit=false. What happens when close() is called on the consumer object?:: A rebalance in the consumer group will happen immediately. Calling close() on the consumer triggers a partition rebalance right away, as the consumer will no longer be available.

What is a generic unique id that I can use for messages I receive from a consumer?:: topic + partition + offset
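One use for such an id is spotting redelivered records. A small sketch, assuming an in-memory set is an acceptable place to remember ids; the `-` separator and both function names are arbitrary choices for illustration.

```python
def message_id(topic, partition, offset):
    """Build an identifier from the record's coordinates."""
    return f"{topic}-{partition}-{offset}"


seen = set()


def first_time_seen(topic, partition, offset):
    """Return True the first time a record is seen, False on redelivery."""
    key = message_id(topic, partition, offset)
    if key in seen:
        return False
    seen.add(key)
    return True
```

The id is only unique within one cluster, since another cluster can have a topic with the same name.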

If you configure enable.auto.commit=true what is the default frequency at which the consumer will commit its offsets?:: 5 seconds, can be modified using auto.commit.interval.ms

fetch.min.bytes::: Allows a consumer to specify the minimum amount of data that it wants to receive from the broker when fetching records. Default is 1 byte. If a broker receives a request for records from a consumer but the new records amount to fewer bytes than specified, the broker will wait until more messages are available

fetch.max.wait.ms::: Kafka will wait until it has enough data to send before responding to the consumer. This lets you control how long to wait, by default it's 500ms
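fetch.min.bytes and fetch.max.wait.ms work as a pair: the broker answers as soon as either threshold is met. A minimal sketch of that rule, using the defaults quoted in the cards; `broker_should_respond` is a hypothetical name and the real broker logic has more conditions than this.

```python
def broker_should_respond(available_bytes, waited_ms,
                          fetch_min_bytes=1, fetch_max_wait_ms=500):
    """Respond once enough data has accumulated or the wait limit is reached."""
    enough_data = available_bytes >= fetch_min_bytes
    waited_long_enough = waited_ms >= fetch_max_wait_ms
    return enough_data or waited_long_enough
```

For example, with fetch_min_bytes raised to 1_000_000, a fetch that finds 10 KB available is held back until more data arrives or 500 ms have passed.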

fetch.max.bytes::: Specifies the maximum number of bytes that Kafka will return whenever the consumer polls a broker, 50 MB by default. This is partition and message count agnostic.

max.poll.records::: Controls the maximum number of records that a single call to poll() will return. This limits the number of records returned (not the overall disk size)

max.partition.fetch.bytes::: This property controls the maximum number of bytes the server will return per partition. 1 MB by default

session.timeout.ms and heartbeat.interval.ms::: session.timeout.ms is the amount of time a consumer can be out of contact with the brokers while still being considered alive. Defaults to 10 seconds (45 seconds since Kafka 3.0). If more than this value passes without the consumer sending a heartbeat to the group coordinator, the consumer is considered dead and a rebalance is triggered. heartbeat.interval.ms controls how frequently heartbeats are sent to the group coordinator. The 2 settings are intertwined: heartbeat.interval.ms is often set to 1/3 of session.timeout.ms.

max.poll.interval.ms::: Sets the length of time during which the consumer can go without polling before it is considered dead. This is a backstop to session.timeout.ms and heartbeat.interval.ms in case the main thread is deadlocked. Default is 5 minutes

default.api.timeout.ms::: The timeout that applies to all API calls made by the consumer unless explicitly set otherwise (poll() always requires an explicit value)

request.timeout.ms::: Maximum amount of time the consumer will wait for a response from the broker. Default is 30 seconds.

auto.offset.reset::: Controls the behavior of the consumer when it starts reading a partition for which it doesn't have a committed offset, or when the committed offset is invalid. Default is latest, which means that lacking a valid offset, the consumer will start reading from the newest records. Can also be set to earliest, where the consumer will read all the data in the partition from the beginning, or to none, which causes an exception to be thrown when attempting to consume from an invalid offset
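The decision described for auto.offset.reset can be written as a small function. This is a sketch under assumptions: `log_end` is taken to mean the position at the end of the log, a committed offset counts as valid when it lies within the retained range, and `LookupError` stands in for whatever exception a real client raises. `starting_offset` is a hypothetical helper.

```python
def starting_offset(committed, log_start, log_end, reset_policy="latest"):
    """Return the offset a consumer would begin reading from.

    committed    -- last committed offset for the group, or None
    log_start    -- earliest offset still retained in the partition
    log_end      -- position at the end of the partition's log
    reset_policy -- value of auto.offset.reset: "latest", "earliest" or "none"
    """
    if committed is not None and log_start <= committed <= log_end:
        return committed  # a valid committed offset always wins
    if reset_policy == "earliest":
        return log_start
    if reset_policy == "latest":
        return log_end
    # "none": there is no valid offset to fall back on
    raise LookupError("no valid committed offset and auto.offset.reset=none")
```

With a committed offset of 643 inside a retained range that starts at 45, the function returns 643 whatever the policy is. With a committed offset of 10, which is below the retained range, the `none` policy raises.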

partition.assignment.strategy
??

client.id::: This can be any string, and will be used by the brokers to identify requests sent from the client, such as fetch requests.

client.rack::: By default, consumers will fetch messages from the leader replica of each partition. Setting client.rack enables the consumer to fetch messages from a replica that is located in the same zone as the consumer. Also requires changing the broker's default replica.selector.class to RackAwareReplicaSelector

receive.buffer.bytes and send.buffer.bytes::: These are the sizes of the TCP send and receive buffers used by the sockets when writing and reading data. -1 is the OS default

offsets.retention.minutes::: This is a broker configuration but impacts consumer groups because if the consumer group remains disconnected for longer than the default value (7 days) it will be treated as a brand new consumer group since the offset topic entry will have been removed

group.instance.id::: When set, this makes the consumer a static member of the associated consumer group. No 2 consumers in the group can have the same value; the second one will get an exception.

A static consumer group member has what benefits?:: It maintains its ownership of the original partitions assigned to it. When it shuts down, a consumer rebalance does not occur until its session has expired (governed by session.timeout.ms)

The first consumer to join a consumer group:: becomes the group leader

Consumers send heartbeats to maintain their ownership on their partitions to which broker?:: The group coordinator

During this type of rebalance, all consumers stop consuming, give up their ownership of all partitions, rejoin the consumer group, and get a brand-new partition assignment::: Eager rebalance

During this type of rebalance, a subset of consumers may stop consuming, and only a subset of partitions are reassigned. This is done in an incremental process of multiple rebalances.::: Cooperative rebalance
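The two rebalance styles differ in which partitions a consumer has to give up. A simplified comparison, assuming the old and new assignments are already known; `revoked_partitions` is a hypothetical helper, and the real cooperative protocol reaches the new assignment over more than one round.

```python
def revoked_partitions(old, new, cooperative):
    """Partitions each existing consumer must give up to reach the new assignment."""
    revoked = {}
    for consumer, owned in old.items():
        if cooperative:
            # Cooperative: release only the partitions that move elsewhere.
            keep = set(new.get(consumer, []))
            revoked[consumer] = sorted(set(owned) - keep)
        else:
            # Eager: release everything, then wait for a fresh assignment.
            revoked[consumer] = sorted(owned)
    return revoked


old = {"c1": [0, 1, 2], "c2": [3, 4, 5]}
new = {"c1": [0, 1], "c2": [3, 4], "c3": [2, 5]}  # c3 has just joined

eager = revoked_partitions(old, new, cooperative=False)
cooperative = revoked_partitions(old, new, cooperative=True)
```

In the eager case both consumers stop reading all six partitions; in the cooperative case only partitions 2 and 5 pause while they move to c3.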