diff --git a/Oops.rej b/Oops.rej new file mode 100644 index 0000000000000000000000000000000000000000..328fc546fcb400745783b3562f1cb1cb055e1804 --- /dev/null +++ b/Oops.rej @@ -0,0 +1,26 @@ +@@ -1,25 +0,0 @@ +-# This workflow will run tests using node and then publish a package to GitHub Packages when a release is created +-# For more information see: https://help.github.com/actions/language-and-framework-guides/publishing-nodejs-packages +- +-name: Publish node-rdkafka +- +-on: +- release: +- types: [created] +- +-jobs: +- publish-npm: +- runs-on: ubuntu-latest +- steps: +- - uses: actions/checkout@v3 +- with: +- submodules: recursive +- - uses: actions/setup-node@v3 +- with: +- node-version: 18 +- registry-url: https://registry.npmjs.org/ +- cache: "npm" +- - run: npm ci +- - run: npm publish +- env: +- NODE_AUTH_TOKEN: ${{secrets.NPM_TOKEN}} diff --git a/docker-compose.yml b/docker-compose.yml index abe29df25c7312382074b3e15289cb862a340247..8a12f135b4f96e5a0dd25e7c21adb2b3b0e644fa 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,23 +1,51 @@ --- -zookeeper: - image: confluentinc/cp-zookeeper - ports: - - "2181:2181" - environment: - ZOOKEEPER_CLIENT_PORT: 2181 - ZOOKEEPER_TICK_TIME: 2000 -kafka: - image: confluentinc/cp-kafka - links: - - zookeeper - ports: - - "9092:9092" - environment: - KAFKA_BROKER_ID: 1 - KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' - KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092' - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 - KAFKA_DEFAULT_REPLICATION_FACTOR: 1 - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 +version: '2' +services: + zookeeper: + image: confluentinc/cp-zookeeper + ports: + - "2181:2181" + networks: + - localnet + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + kafka: + image: confluentinc/cp-kafka + ports: + - 9092:9092 + - 9997:9997 + networks: + - localnet + depends_on: + - zookeeper + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 + # KAFKA_LISTENERS: PLAINTEXT://kafka0:29092,PLAINTEXT_HOST://localhost:9092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_DEFAULT_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + kafka-ui: + container_name: kafka-ui + image: provectuslabs/kafka-ui:latest + ports: + - 8080:8080 + networks: + - localnet + depends_on: + - zookeeper + - kafka + environment: + KAFKA_CLUSTERS_0_NAME: local + KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092 + KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181 +networks: + localnet: + attachable: true + diff --git a/e2e/both.spec.js b/e2e/both.spec.js index a8289ec319239fb05b1f321bff78a7c9e267f1cf..85ca5ef64264a903a30d5d4bac31f6b1a3792102 100644 --- a/e2e/both.spec.js +++ b/e2e/both.spec.js @@ -163,7 +163,7 @@ describe('Consumer/Producer', function() { }); }); - + it('should return ready messages on partition EOF', function(done) { crypto.randomBytes(4096, function(ex, buffer) { producer.setPollInterval(10); diff --git a/e2e/consumer.spec.js b/e2e/consumer.spec.js index a167483f1e0ea15c4edcb368e36640b4349574e8..38fcfd7464afb7df682b7b5f1fdb228b8d280a25 100644 --- a/e2e/consumer.spec.js +++ b/e2e/consumer.spec.js @@ -11,10 +11,12 @@ var crypto = require('crypto'); var eventListener = require('./listener'); +var cooperativeRebalanceCallback = require('../lib/kafka-consumer').cooperativeRebalanceCallback; var KafkaConsumer = require('../').KafkaConsumer; +var AdminClient = require('../').AdminClient; +var LibrdKafkaError = require('../lib/error'); var kafkaBrokerList = process.env.KAFKA_HOST || 'localhost:9092'; -var topic = 'test'; describe('Consumer', function() { var gcfg; @@ -31,6 +33,7 @@ describe('Consumer', function() { }); describe('commit', function() { + var topic = 'test'; var consumer; beforeEach(function(done) { consumer = new KafkaConsumer(gcfg, {}); @@ -61,6 +64,7 @@ describe('Consumer', function() { }); describe('committed and position', function() { + var topic = 'test'; var consumer; beforeEach(function(done) { consumer = new KafkaConsumer(gcfg, {}); @@ -95,6 +99,7 @@ describe('Consumer', function() { }); it('after assign, should get committed array without offsets ', function(done) { + var topic = 'test'; consumer.assign([{topic:topic, partition:0}]); // Defer this for a second setTimeout(function() { @@ -110,6 +115,7 @@ describe('Consumer', function() { }); it('after assign and commit, should get committed offsets', function(done) { + var topic = 'test'; consumer.assign([{topic:topic, partition:0}]); consumer.commitSync({topic:topic, partition:0, offset:1000}); consumer.committed(null, 1000, function(err, committed) { @@ -123,6 +129,7 @@ describe('Consumer', function() { }); it('after assign, before consume, position should return an array without offsets', function(done) { + var topic = 'test'; consumer.assign([{topic:topic, partition:0}]); var position = consumer.position(); t.equal(Array.isArray(position), true, 'Position should be an array'); @@ -147,6 +154,7 @@ describe('Consumer', function() { }); describe('seek and positioning', function() { + var topic = 'test'; var consumer; beforeEach(function(done) { consumer = new KafkaConsumer(gcfg, {}); @@ -195,6 +203,7 @@ describe('Consumer', function() { describe('subscribe', function() { + var topic = 'test'; var consumer; beforeEach(function(done) { consumer = new KafkaConsumer(gcfg, {}); @@ -232,6 +241,7 @@ describe('Consumer', function() { describe('assign', function() { + var topic = 'test'; var consumer; beforeEach(function(done) { consumer = new KafkaConsumer(gcfg, {}); @@ -266,7 +276,346 @@ describe('Consumer', function() { }); }); + describe('assignmentLost', function() { + function pollForTopic(client, topicName, maxTries, tryDelay, cb, customCondition) { + var tries = 0; + + function getTopicIfExists(innerCb) { + client.getMetadata({ + topic: topicName, + }, function(metadataErr, metadata) { + if (metadataErr) { + cb(metadataErr); + return; + } + + var topicFound = metadata.topics.filter(function(topicObj) { + var foundTopic = topicObj.name === topicName; + + // If we have a custom condition for "foundedness", do it here after + // we make sure we are operating on the correct topic + if (foundTopic && customCondition) { + return customCondition(topicObj); + } + return foundTopic; + }); + + if (topicFound.length >= 1) { + innerCb(null, topicFound[0]); + return; + } + + innerCb(new Error('Could not find topic ' + topicName)); + }); + } + + function maybeFinish(err, obj) { + if (err) { + queueNextTry(); + return; + } + + cb(null, obj); + } + + function queueNextTry() { + tries += 1; + if (tries < maxTries) { + setTimeout(function() { + getTopicIfExists(maybeFinish); + }, tryDelay); + } else { + cb(new Error('Exceeded max tries of ' + maxTries)); + } + } + + queueNextTry(); + } + + var client = AdminClient.create({ + 'client.id': 'kafka-test', + 'metadata.broker.list': kafkaBrokerList + }); + var consumer1; + var consumer2; + var assignmentLostCount = 0; + var grp = 'kafka-mocha-grp-' + crypto.randomBytes(20).toString('hex'); + var assignment_lost_gcfg = { + 'bootstrap.servers': kafkaBrokerList, + 'group.id': grp, + 'debug': 'all', + 'enable.auto.commit': false, + 'session.timeout.ms': 10000, + 'heartbeat.interval.ms': 1000, + 'auto.offset.reset': 'earliest', + 'topic.metadata.refresh.interval.ms': 3000, + 'partition.assignment.strategy': 'cooperative-sticky', + 'rebalance_cb': function(err, assignment) { + if ( + err.code === LibrdKafkaError.codes.ERR__REVOKE_PARTITIONS && + this.assignmentLost() + ) { + assignmentLostCount++; + } + cooperativeRebalanceCallback.call(this, err, assignment); + } + }; + + beforeEach(function(done) { + assignment_lost_gcfg['client.id'] = 1; + consumer1 = new KafkaConsumer(assignment_lost_gcfg, {}); + eventListener(consumer1); + consumer1.connect({ timeout: 2000 }, function(err, info) { + t.ifError(err); + }); + assignment_lost_gcfg['client.id'] = 2; + consumer2 = new KafkaConsumer(assignment_lost_gcfg, {}); + eventListener(consumer2); + consumer2.connect({ timeout: 2000 }, function(err, info) { + t.ifError(err); + done(); + }); + }); + + afterEach(function(done) { + consumer1.disconnect(function() { + consumer2.disconnect(function() { + done(); + }); + }); + }); + + it('should return false if not lost', function() { + t.equal(false, consumer1.assignmentLost()); + }); + + it('should be lost if topic gets deleted', function(cb) { + this.timeout(100000); + + var time = Date.now(); + var topicName = 'consumer-assignment-lost-test-topic-' + time; + var topicName2 = 'consumer-assignment-lost-test-topic2-' + time; + var deleting = false; + + client.createTopic({ + topic: topicName, + num_partitions: 2, + replication_factor: 1 + }, function(err) { + pollForTopic(consumer1, topicName, 10, 1000, function(err) { + t.ifError(err); + client.createTopic({ + topic: topicName2, + num_partitions: 2, + replication_factor: 1 + }, function(err) { + pollForTopic(consumer1, topicName2, 10, 1000, function(err) { + t.ifError(err); + consumer1.subscribe([topicName, topicName2]); + consumer2.subscribe([topicName, topicName2]); + consumer1.consume(); + consumer2.consume(); + var tryDelete = function() { + setTimeout(function() { + if(consumer1.assignments().length === 2 && + consumer2.assignments().length === 2 + ) { + client.deleteTopic(topicName, function(deleteErr) { + t.ifError(deleteErr); + }); + } else { + tryDelete(); + } + }, 2000); + }; + tryDelete(); + }); + }); + }); + }); + + var checking = false; + setInterval(function() { + if (assignmentLostCount >= 2 && !checking) { + checking = true; + t.equal(assignmentLostCount, 2); + client.deleteTopic(topicName2, function(deleteErr) { + // Cleanup topics + t.ifError(deleteErr); + cb(); + }); + } + }, 2000); + }); + + }); + + describe('incrementalAssign and incrementUnassign', function() { + + var topic = 'test7'; + var consumer; + beforeEach(function(done) { + consumer = new KafkaConsumer(gcfg, {}); + + consumer.connect({ timeout: 2000 }, function(err, info) { + t.ifError(err); + done(); + }); + + eventListener(consumer); + }); + + afterEach(function(done) { + consumer.disconnect(function() { + done(); + }); + }); + + it('should be able to assign an assignment', function() { + t.equal(0, consumer.assignments().length); + var assignments = [{ topic:topic, partition:0 }]; + consumer.assign(assignments); + t.equal(1, consumer.assignments().length); + t.equal(0, consumer.assignments()[0].partition); + t.equal(0, consumer.subscription().length); + + var additionalAssignment = [{ topic:topic, partition:1 }]; + consumer.incrementalAssign(additionalAssignment); + t.equal(2, consumer.assignments().length); + t.equal(0, consumer.assignments()[0].partition); + t.equal(1, consumer.assignments()[1].partition); + t.equal(0, consumer.subscription().length); + }); + + it('should be able to revoke an assignment', function() { + t.equal(0, consumer.assignments().length); + var assignments = [{ topic:topic, partition:0 }, { topic:topic, partition:1 }, { topic:topic, partition:2 }]; + consumer.assign(assignments); + t.equal(3, consumer.assignments().length); + t.equal(0, consumer.assignments()[0].partition); + t.equal(1, consumer.assignments()[1].partition); + t.equal(2, consumer.assignments()[2].partition); + t.equal(0, consumer.subscription().length); + + var revokedAssignments = [{ topic:topic, partition:2 }]; + consumer.incrementalUnassign(revokedAssignments); + t.equal(2, consumer.assignments().length); + t.equal(0, consumer.assignments()[0].partition); + t.equal(1, consumer.assignments()[1].partition); + t.equal(0, consumer.subscription().length); + }); + + }); + + describe('rebalance', function() { + + var topic = 'test7'; + var grp = 'kafka-mocha-grp-' + crypto.randomBytes(20).toString('hex'); + var consumer1; + var consumer2; + var counter = 0; + var reblance_gcfg = { + 'bootstrap.servers': kafkaBrokerList, + 'group.id': grp, + 'debug': 'all', + 'enable.auto.commit': false, + 'heartbeat.interval.ms': 200, + 'rebalance_cb': true + }; + + it('should be able reblance using the eager strategy', function(done) { + this.timeout(20000); + + var isStarted = false; + reblance_gcfg['partition.assignment.strategy'] = 'range,roundrobin'; + + reblance_gcfg['client.id'] = '1'; + consumer1 = new KafkaConsumer(reblance_gcfg, {}); + reblance_gcfg['client.id'] = '2'; + consumer2 = new KafkaConsumer(reblance_gcfg, {}); + + eventListener(consumer1); + eventListener(consumer2); + + consumer1.connect({ timeout: 2000 }, function(err, info) { + t.ifError(err); + consumer1.subscribe([topic]); + consumer1.on('rebalance', function(err, assignment) { + counter++; + if (!isStarted) { + isStarted = true; + consumer2.connect({ timeout: 2000 }, function(err, info) { + consumer2.subscribe([topic]); + consumer2.consume(); + consumer2.on('rebalance', function(err, assignment) { + counter++; + }); + }); + } + }); + consumer1.consume(); + }); + + setTimeout(function() { + t.deepStrictEqual(consumer1.assignments(), [ { topic: topic, partition: 0, offset: -1000 } ]); + t.deepStrictEqual(consumer2.assignments(), [ { topic: topic, partition: 1, offset: -1000 } ]); + t.equal(counter, 4); + consumer1.disconnect(function() { + consumer2.disconnect(function() { + done(); + }); + }); + }, 9000); + }); + + it('should be able reblance using the cooperative incremental strategy', function(cb) { + this.timeout(20000); + var isStarted = false; + reblance_gcfg['partition.assignment.strategy'] = 'cooperative-sticky'; + reblance_gcfg['client.id'] = '1'; + consumer1 = new KafkaConsumer(reblance_gcfg, {}); + reblance_gcfg['client.id'] = '2'; + consumer2 = new KafkaConsumer(reblance_gcfg, {}); + + eventListener(consumer1); + eventListener(consumer2); + + consumer1.connect({ timeout: 2000 }, function(err, info) { + t.ifError(err); + consumer1.subscribe([topic]); + consumer1.on('rebalance', function(err, assignment) { + if (!isStarted) { + isStarted = true; + consumer2.connect({ timeout: 2000 }, function(err, info) { + consumer2.subscribe([topic]); + consumer2.consume(); + consumer2.on('rebalance', function(err, assignment) { + counter++; + }); + }); + } + }); + consumer1.consume(); + }); + + setTimeout(function() { + t.equal(consumer1.assignments().length, 1); + t.equal(consumer2.assignments().length, 1); + t.equal(counter, 8); + + consumer1.disconnect(function() { + consumer2.disconnect(function() { + cb(); + }); + }); + }, 9000); + }); + + }); + describe('disconnect', function() { + + var topic = 'test'; var tcfg = { 'auto.offset.reset': 'earliest' }; it('should happen gracefully', function(cb) { diff --git a/index.d.ts b/index.d.ts index d7ce7e61e985ce46ceae2c10329d6448cc487dca..2c7b9a3d40b0547209c2cffe1f4e62d9573ab617 100644 --- a/index.d.ts +++ b/index.d.ts @@ -223,6 +223,12 @@ export class KafkaConsumer extends Client { consume(cb: (err: LibrdKafkaError, messages: Message[]) => void): void; consume(): void; + incrementalAssign(assigments: Assignment[]): this; + + incrementalUnassign(assignments: Assignment[]): this; + + assignmentLost(): boolean; + getWatermarkOffsets(topic: string, partition: number): WatermarkOffsets; offsetsStore(topicPartitions: TopicPartitionOffset[]): any; diff --git a/lib/index.js b/lib/index.js index e2e8a9c899700e56b3ddeff84e67ad97206ccabf..ba6d678275101170aedc694fedc489f479b5d05e 100644 --- a/lib/index.js +++ b/lib/index.js @@ -7,7 +7,7 @@ * of the MIT license. See the LICENSE.txt file for details. */ -var KafkaConsumer = require('./kafka-consumer'); +var KafkaConsumer = require('./kafka-consumer').KafkaConsumer; var Producer = require('./producer'); var HighLevelProducer = require('./producer/high-level-producer'); var error = require('./error'); diff --git a/lib/kafka-consumer.js b/lib/kafka-consumer.js index c479240f3bab17c68e38623b89ef67546ba59122..97e8458ab28757d013172de31e238ee2ee3f6ebc 100644 --- a/lib/kafka-consumer.js +++ b/lib/kafka-consumer.js @@ -8,8 +8,6 @@ */ 'use strict'; -module.exports = KafkaConsumer; - var Client = require('./client'); var util = require('util'); var Kafka = require('../librdkafka'); @@ -21,6 +19,48 @@ var DEFAULT_CONSUME_LOOP_TIMEOUT_DELAY = 500; var DEFAULT_CONSUME_TIME_OUT = 1000; util.inherits(KafkaConsumer, Client); +var eagerRebalanceCallback = function(err, assignment) { + // Create the librdkafka error + err = LibrdKafkaError.create(err); + // Emit the event + this.emit('rebalance', err, assignment); + + // That's it + try { + if (err.code === LibrdKafkaError.codes.ERR__ASSIGN_PARTITIONS) { + this.assign(assignment); + } else if (err.code === LibrdKafkaError.codes.ERR__REVOKE_PARTITIONS) { + this.unassign(); + } + } catch (e) { + // Ignore exceptions if we are not connected + if (this.isConnected()) { + this.emit('rebalance.error', e); + } + } +}; + +var cooperativeRebalanceCallback = function(err, assignment) { + // Create the librdkafka error + err = LibrdKafkaError.create(err); + // Emit the event + this.emit('rebalance', err, assignment); + + // That's it + try { + if (err.code === LibrdKafkaError.codes.ERR__ASSIGN_PARTITIONS) { + this.incrementalAssign(assignment); + } else if (err.code === LibrdKafkaError.codes.ERR__REVOKE_PARTITIONS) { + this.incrementalUnassign(assignment); + } + } catch (e) { + // Ignore exceptions if we are not connected + if (this.isConnected()) { + this.emit('rebalance.error', e); + } + } +}; + /** * KafkaConsumer class for reading messages from Kafka * @@ -52,26 +92,10 @@ function KafkaConsumer(conf, topicConf) { // If rebalance is undefined we don't want any part of this if (onRebalance && typeof onRebalance === 'boolean') { - conf.rebalance_cb = function(err, assignment) { - // Create the librdkafka error - err = LibrdKafkaError.create(err); - // Emit the event - self.emit('rebalance', err, assignment); - - // That's it - try { - if (err.code === -175 /*ERR__ASSIGN_PARTITIONS*/) { - self.assign(assignment); - } else if (err.code === -174 /*ERR__REVOKE_PARTITIONS*/) { - self.unassign(); - } - } catch (e) { - // Ignore exceptions if we are not connected - if (self.isConnected()) { - self.emit('rebalance.error', e); - } - } - }; + conf.rebalance_cb = + conf['partition.assignment.strategy'] === 'cooperative-sticky' ? + cooperativeRebalanceCallback.bind(this) : + eagerRebalanceCallback.bind(this); } else if (onRebalance && typeof onRebalance === 'function') { /* * Once this is opted in to, that's it. It's going to manually rebalance @@ -79,13 +103,13 @@ function KafkaConsumer(conf, topicConf) { * a way to override them. */ - conf.rebalance_cb = function(err, assignment) { - // Create the librdkafka error - err = err ? LibrdKafkaError.create(err) : undefined; + conf.rebalance_cb = function(err, assignment) { + // Create the librdkafka error + err = err ? LibrdKafkaError.create(err) : undefined; - self.emit('rebalance', err, assignment); - onRebalance.call(self, err, assignment); - }; + self.emit('rebalance', err, assignment); + onRebalance.call(self, err, assignment); + }; } // Same treatment for offset_commit_cb @@ -264,6 +288,19 @@ KafkaConsumer.prototype.assign = function(assignments) { return this; }; +/** + * Incremental assign the consumer specific partitions and topics + * + * @param {array} assignments - Assignments array. Should contain + * objects with topic and partition set. + * @return {Client} - Returns itself + */ + +KafkaConsumer.prototype.incrementalAssign = function(assignments) { + this._client.incrementalAssign(TopicPartition.map(assignments)); + return this; +}; + /** * Unassign the consumer from its assigned partitions and topics. * @@ -275,6 +312,34 @@ KafkaConsumer.prototype.unassign = function() { return this; }; +/** + * Incremental unassign the consumer from specific partitions and topics + * + * @param {array} assignments - Assignments array. Should contain + * objects with topic and partition set. + * @return {Client} - Returns itself + */ + +KafkaConsumer.prototype.incrementalUnassign = function(assignments) { + this._client.incrementalUnassign(TopicPartition.map(assignments)); + return this; +}; + +/** + * Get the assignment lost state. + * Examples for an assignment to be lost: + * - Unsuccessful heartbeats + * - Unknown member id during heartbeats + * - Illegal generation during heartbeats + * - Static consumer fenced by other consumer with same group.instance.id + * - Max. poll interval exceeded + * - Subscribed topic(s) no longer exist during meta data updates + * @return {boolean} - Returns true if the assignment is lost + */ + +KafkaConsumer.prototype.assignmentLost = function() { + return this._client.assignmentLost(); +}; /** * Get the assignments for the consumer @@ -654,3 +719,9 @@ KafkaConsumer.prototype.pause = function(topicPartitions) { return this._errorWrap(this._client.pause(topicPartitions), true); }; + +module.exports = { + KafkaConsumer: KafkaConsumer, + eagerRebalanceCallback: eagerRebalanceCallback, + cooperativeRebalanceCallback: cooperativeRebalanceCallback +}; diff --git a/run_docker.sh b/run_docker.sh index a6aadbd64609e5d5ae1a80205aac7ce3a49d9345..f817aa976c83b74670c7464099679eb32a390051 100755 --- a/run_docker.sh +++ b/run_docker.sh @@ -21,14 +21,16 @@ topics=( "test4" "test5" "test6" + "test7" ) # Run docker-compose exec to make them for topic in "${topics[@]}" do echo "Making topic $topic" + [[ "$topic" != "test7" ]] && partitions=1 || partitions=2 until docker-compose exec kafka \ - kafka-topics --create --topic $topic --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:2181 + kafka-topics --create --topic $topic --partitions $partitions --replication-factor 1 --if-not-exists --bootstrap-server localhost:9092 do topic_result="$?" if [ "$topic_result" == "1" ]; then diff --git a/src/kafka-consumer.cc b/src/kafka-consumer.cc index 019b0cb6478756120efe9a5f6f1bb4182b4af4ea..3895407788ae31ae38d7707eb63528ebac6e3b24 100644 --- a/src/kafka-consumer.cc +++ b/src/kafka-consumer.cc @@ -179,6 +179,32 @@ Baton KafkaConsumer::Assign(std::vector partitions) { return Baton(errcode); } +Baton KafkaConsumer::IncrementalAssign( + std::vector partitions) { + if (!IsConnected()) { + return Baton(RdKafka::ERR__STATE, "KafkaConsumer is disconnected"); + } + + RdKafka::KafkaConsumer* consumer = + dynamic_cast(m_client); + + RdKafka::Error *e = consumer->incremental_assign(partitions); + + if (e) { + RdKafka::ErrorCode errcode = e->code(); + delete e; + return Baton(errcode); + } + + m_partition_cnt += partitions.size(); + for (auto i = partitions.begin(); i != partitions.end(); ++i) { + m_partitions.push_back(*i); + } + partitions.clear(); + + return Baton(RdKafka::ERR_NO_ERROR); +} + Baton KafkaConsumer::Unassign() { if (!IsClosing() && !IsConnected()) { return Baton(RdKafka::ERR__STATE); @@ -195,12 +221,46 @@ Baton KafkaConsumer::Unassign() { // Destroy the old list of partitions since we are no longer using it RdKafka::TopicPartition::destroy(m_partitions); + m_partitions.clear(); m_partition_cnt = 0; return Baton(RdKafka::ERR_NO_ERROR); } +Baton KafkaConsumer::IncrementalUnassign( + std::vector partitions) { + if (!IsClosing() && !IsConnected()) { + return Baton(RdKafka::ERR__STATE); + } + + RdKafka::KafkaConsumer* consumer = + dynamic_cast(m_client); + + RdKafka::Error *e = consumer->incremental_unassign(partitions); + if (e) { + RdKafka::ErrorCode errcode = e->code(); + delete e; + return Baton(errcode); + } + + // Destroy the old list of partitions since we are no longer using it + RdKafka::TopicPartition::destroy(partitions); + + m_partitions.erase( + std::remove_if( + m_partitions.begin(), + m_partitions.end(), + [&partitions](RdKafka::TopicPartition *x) -> bool { + return std::find( + partitions.begin(), + partitions.end(), x) != partitions.end(); + }), + m_partitions.end()); + m_partition_cnt -= partitions.size(); + return Baton(RdKafka::ERR_NO_ERROR); +} + Baton KafkaConsumer::Commit(std::vector toppars) { if (!IsConnected()) { return Baton(RdKafka::ERR__STATE); @@ -469,6 +529,12 @@ Baton KafkaConsumer::RefreshAssignments() { } } +bool KafkaConsumer::AssignmentLost() { + RdKafka::KafkaConsumer* consumer = + dynamic_cast(m_client); + return consumer->assignment_lost(); +} + std::string KafkaConsumer::Name() { if (!IsConnected()) { return std::string(""); @@ -527,8 +593,11 @@ void KafkaConsumer::Init(v8::Local exports) { Nan::SetPrototypeMethod(tpl, "committed", NodeCommitted); Nan::SetPrototypeMethod(tpl, "position", NodePosition); Nan::SetPrototypeMethod(tpl, "assign", NodeAssign); + Nan::SetPrototypeMethod(tpl, "incrementalAssign", NodeIncrementalAssign); Nan::SetPrototypeMethod(tpl, "unassign", NodeUnassign); + Nan::SetPrototypeMethod(tpl, "incrementalUnassign", NodeIncrementalUnassign); Nan::SetPrototypeMethod(tpl, "assignments", NodeAssignments); + Nan::SetPrototypeMethod(tpl, "assignmentLost", NodeAssignmentLost); Nan::SetPrototypeMethod(tpl, "commit", NodeCommit); Nan::SetPrototypeMethod(tpl, "commitSync", NodeCommitSync); @@ -759,6 +828,64 @@ NAN_METHOD(KafkaConsumer::NodeAssign) { info.GetReturnValue().Set(Nan::True()); } +NAN_METHOD(KafkaConsumer::NodeIncrementalAssign) { + Nan::HandleScope scope; + + if (info.Length() < 1 || !info[0]->IsArray()) { + // Just throw an exception + return Nan::ThrowError("Need to specify an array of partitions"); + } + + v8::Local partitions = info[0].As(); + std::vector topic_partitions; + + for (unsigned int i = 0; i < partitions->Length(); ++i) { + v8::Local partition_obj_value; + if (!( + Nan::Get(partitions, i).ToLocal(&partition_obj_value) && + partition_obj_value->IsObject())) { + Nan::ThrowError("Must pass topic-partition objects"); + } + + v8::Local partition_obj = partition_obj_value.As(); + + // Got the object + int64_t partition = GetParameter(partition_obj, "partition", -1); + std::string topic = GetParameter(partition_obj, "topic", ""); + + if (!topic.empty()) { + RdKafka::TopicPartition* part; + + if (partition < 0) { + part = Connection::GetPartition(topic); + } else { + part = Connection::GetPartition(topic, partition); + } + + // Set the default value to offset invalid. If provided, we will not set + // the offset. + int64_t offset = GetParameter( + partition_obj, "offset", RdKafka::Topic::OFFSET_INVALID); + if (offset != RdKafka::Topic::OFFSET_INVALID) { + part->set_offset(offset); + } + + topic_partitions.push_back(part); + } + } + + KafkaConsumer* consumer = ObjectWrap::Unwrap(info.This()); + + // Hand over the partitions to the consumer. + Baton b = consumer->IncrementalAssign(topic_partitions); + + if (b.err() != RdKafka::ERR_NO_ERROR) { + Nan::ThrowError(RdKafka::err2str(b.err()).c_str()); + } + + info.GetReturnValue().Set(Nan::True()); +} + NAN_METHOD(KafkaConsumer::NodeUnassign) { Nan::HandleScope scope; @@ -779,6 +906,71 @@ NAN_METHOD(KafkaConsumer::NodeUnassign) { info.GetReturnValue().Set(Nan::True()); } +NAN_METHOD(KafkaConsumer::NodeIncrementalUnassign) { + Nan::HandleScope scope; + + if (info.Length() < 1 || !info[0]->IsArray()) { + // Just throw an exception + return Nan::ThrowError("Need to specify an array of partitions"); + } + + v8::Local partitions = info[0].As(); + std::vector topic_partitions; + + for (unsigned int i = 0; i < partitions->Length(); ++i) { + v8::Local partition_obj_value; + if (!( + Nan::Get(partitions, i).ToLocal(&partition_obj_value) && + partition_obj_value->IsObject())) { + Nan::ThrowError("Must pass topic-partition objects"); + } + + v8::Local partition_obj = partition_obj_value.As(); + + // Got the object + int64_t partition = GetParameter(partition_obj, "partition", -1); + std::string topic = GetParameter(partition_obj, "topic", ""); + + if (!topic.empty()) { + RdKafka::TopicPartition* part; + + if (partition < 0) { + part = Connection::GetPartition(topic); + } else { + part = Connection::GetPartition(topic, partition); + } + + // Set the default value to offset invalid. If provided, we will not set + // the offset. + int64_t offset = GetParameter( + partition_obj, "offset", RdKafka::Topic::OFFSET_INVALID); + if (offset != RdKafka::Topic::OFFSET_INVALID) { + part->set_offset(offset); + } + + topic_partitions.push_back(part); + } + } + + KafkaConsumer* consumer = ObjectWrap::Unwrap(info.This()); + // Hand over the partitions to the consumer. + Baton b = consumer->IncrementalUnassign(topic_partitions); + + if (b.err() != RdKafka::ERR_NO_ERROR) { + Nan::ThrowError(RdKafka::err2str(b.err()).c_str()); + } + + info.GetReturnValue().Set(Nan::True()); +} + +NAN_METHOD(KafkaConsumer::NodeAssignmentLost) { + Nan::HandleScope scope; + + KafkaConsumer* consumer = ObjectWrap::Unwrap(info.This()); + bool b = consumer->AssignmentLost(); + info.GetReturnValue().Set(Nan::New(b)); +} + NAN_METHOD(KafkaConsumer::NodeUnsubscribe) { Nan::HandleScope scope; diff --git a/src/kafka-consumer.h b/src/kafka-consumer.h index c91590ecc5d47c1d7a2a93c3e46b4b4657525df0..43e016db4ec47121051cb282f718a2b3156aacd4 100644 --- a/src/kafka-consumer.h +++ b/src/kafka-consumer.h @@ -72,7 +72,10 @@ class KafkaConsumer : public Connection { int AssignedPartitionCount(); Baton Assign(std::vector); + Baton IncrementalAssign(std::vector); Baton Unassign(); + Baton IncrementalUnassign(std::vector); + bool AssignmentLost(); Baton Seek(const RdKafka::TopicPartition &partition, int timeout_ms); @@ -105,7 +108,10 @@ class KafkaConsumer : public Connection { static NAN_METHOD(NodeSubscribe); static NAN_METHOD(NodeDisconnect); static NAN_METHOD(NodeAssign); + static NAN_METHOD(NodeIncrementalAssign); static NAN_METHOD(NodeUnassign); + static NAN_METHOD(NodeIncrementalUnassign); + static NAN_METHOD(NodeAssignmentLost); static NAN_METHOD(NodeAssignments); static NAN_METHOD(NodeUnsubscribe); static NAN_METHOD(NodeCommit); diff --git a/test/consumer.spec.js b/test/consumer.spec.js index 40b52ee4e1c718890f43b91adfb543319d5cc342..5e1a5655be0d2598163478aaaae936213c3bf27c 100644 --- a/test/consumer.spec.js +++ b/test/consumer.spec.js @@ -77,7 +77,7 @@ module.exports = { }); }, 'has necessary bindings for librdkafka 1:1 binding': function() { - var methods = ['assign', 'unassign', 'subscribe']; + var methods = ['assign', 'unassign', 'subscribe', 'incrementalAssign', 'incrementalUnassign', 'assignmentLost']; methods.forEach(function(m) { t.equal(typeof(client[m]), 'function', 'Client is missing ' + m + ' method'); }); diff --git a/test/kafka-consumer.spec.js b/test/kafka-consumer.spec.js index 0f4de520ed6b8a06dfe355e0bb9091273def98a5..ada72a7e621ea5433f194ab3d22eef326082c155 100644 --- a/test/kafka-consumer.spec.js +++ b/test/kafka-consumer.spec.js @@ -7,7 +7,8 @@ * of the MIT license. See the LICENSE.txt file for details. */ -var KafkaConsumer = require('../lib/kafka-consumer'); +var KafkaConsumer = require('../lib/kafka-consumer').KafkaConsumer; + var t = require('assert'); var client; diff --git a/deps/librdkafka/src/rdkafka_partition.h b/deps/librdkafka/src/rdkafka_partition.h index f9dd686423..aef704b95f 100644 --- a/deps/librdkafka/src/rdkafka_partition.h +++ b/deps/librdkafka/src/rdkafka_partition.h @@ -68,24 +68,35 @@ struct rd_kafka_toppar_err { * last msg sequence */ }; - +/** + * @brief Fetchpos comparator, only offset is compared. + */ +static RD_UNUSED RD_INLINE int +rd_kafka_fetch_pos_cmp_offset(const rd_kafka_fetch_pos_t *a, + const rd_kafka_fetch_pos_t *b) { + if (a->offset < b->offset) + return -1; + else if (a->offset > b->offset) + return 1; + else + return 0; +} /** * @brief Fetchpos comparator, leader epoch has precedence. + * iff both values are not null. */ static RD_UNUSED RD_INLINE int rd_kafka_fetch_pos_cmp(const rd_kafka_fetch_pos_t *a, const rd_kafka_fetch_pos_t *b) { + if (a->leader_epoch == -1 || b->leader_epoch == -1) + return rd_kafka_fetch_pos_cmp_offset(a, b); if (a->leader_epoch < b->leader_epoch) return -1; else if (a->leader_epoch > b->leader_epoch) return 1; - else if (a->offset < b->offset) - return -1; - else if (a->offset > b->offset) - return 1; else - return 0; + return rd_kafka_fetch_pos_cmp_offset(a, b); }