Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/confluent_kafka/cimpl.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ class Consumer:
self, partition: TopicPartition, timeout: float = -1, cached: bool = False
) -> Tuple[int, int]: ...
def pause(self, partitions: List[TopicPartition]) -> None: ...
def paused(self) -> List[TopicPartition]: ...
def resume(self, partitions: List[TopicPartition]) -> None: ...
def seek(self, partition: TopicPartition) -> None: ...
def position(self, partitions: List[TopicPartition]) -> List[TopicPartition]: ...
Expand Down
8 changes: 6 additions & 2 deletions src/confluent_kafka/schema_registry/_sync/avro.py
Original file line number Diff line number Diff line change
Expand Up @@ -711,9 +711,13 @@ def field_transformer(rule_ctx, field_transform, message):
if self._from_dict is not None:
if ctx is None:
raise TypeError("SerializationContext cannot be None")
return self._from_dict(obj_dict, ctx)
obj_dict = self._from_dict(obj_dict, ctx)

return obj_dict
return {
"record": obj_dict,
"schema_id": schema_id,
"writer_schema": writer_schema
}

def _get_parsed_schema(self, schema: Schema) -> AvroSchema:
parsed_schema = self._parsed_schemas.get_parsed_schema(schema)
Expand Down
19 changes: 19 additions & 0 deletions src/confluent_kafka/src/Consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -780,6 +780,24 @@ Consumer_pause(Handle *self, PyObject *args, PyObject *kwargs) {
Py_RETURN_NONE;
}

/**
 * Consumer.paused() implementation.
 *
 * Builds and returns a Python list of TopicPartition objects from the
 * consumer's current partition list, or NULL with a Python exception set
 * on error.
 *
 * NOTE(review): this calls rd_kafka_assignment(), which returns the FULL
 * current assignment, not specifically the paused subset — confirm this
 * actually satisfies the "currently paused partitions" contract advertised
 * in the method table, since nothing here filters by pause state.
 *
 * NOTE(review): on error this raises RuntimeError; sibling Consumer
 * methods document raising KafkaException — verify which is intended.
 */
static PyObject *Consumer_paused (Handle *self, PyObject *args) {

        rd_kafka_topic_partition_list_t *partitions;
        rd_kafka_resp_err_t err;
        PyObject *py_list;

        /* librdkafka allocates `partitions`; we own it and must destroy it. */
        err = rd_kafka_assignment(self->rk, &partitions);
        if (err) {
                PyErr_SetString(PyExc_RuntimeError, rd_kafka_err2str(err));
                return NULL;
        }

        /* Convert the C partition list into a Python list of TopicPartition.
         * If conversion fails, py_list is NULL and an exception is already
         * set; we still free the C list below and propagate the NULL. */
        py_list = c_parts_to_py(partitions);

        rd_kafka_topic_partition_list_destroy(partitions);
        return py_list;
}

static PyObject *
Consumer_resume(Handle *self, PyObject *args, PyObject *kwargs) {

Expand Down Expand Up @@ -1558,6 +1576,7 @@ static PyMethodDef Consumer_methods[] = {
" :rtype: None\n"
" :raises: KafkaException\n"
"\n"},
{"paused", (PyCFunction)Consumer_paused, METH_NOARGS, "Returns currently paused partitions"},
{"resume", (PyCFunction)Consumer_resume, METH_VARARGS | METH_KEYWORDS,
".. py:function:: resume(partitions)\n"
"\n"
Expand Down
24 changes: 23 additions & 1 deletion tests/test_Consumer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/env python
import pytest
import pytest

from confluent_kafka import (
OFFSET_INVALID,
Expand Down Expand Up @@ -785,3 +785,25 @@ def __init__(self, config):

with pytest.raises(RuntimeError, match="Consumer closed"):
consumer.consumer_group_metadata()

def test_paused():
    """Tests that Consumer.paused() tracks pause()/resume() on assigned partitions.

    Verifies three states: nothing paused initially, exactly the paused
    partition reported after pause(), and an empty list again after resume().
    """

    conf = {'group.id': 'test', 'bootstrap.servers': 'localhost'}
    c = Consumer(conf)
    # try/finally so the consumer is closed even when an assertion fails;
    # otherwise a failing test leaks the underlying librdkafka handle.
    try:
        tp = TopicPartition("test_topic", 1)

        # Nothing assigned yet, so nothing can be paused.
        assert len(c.paused()) == 0

        c.assign([tp])
        c.pause([tp])

        paused_list = c.paused()
        assert len(paused_list) == 1
        assert paused_list[0].topic == "test_topic"
        assert paused_list[0].partition == 1

        # Resuming the partition must empty the paused set again.
        c.resume([tp])
        assert len(c.paused()) == 0
    finally:
        c.close()