0
0
mirror of https://github.com/PostHog/posthog.git synced 2024-11-24 18:07:17 +01:00
posthog/plugin-server/patches/node-rdkafka@2.17.0.patch

1128 lines
36 KiB
Diff

diff --git a/Oops.rej b/Oops.rej
new file mode 100644
index 0000000000000000000000000000000000000000..328fc546fcb400745783b3562f1cb1cb055e1804
--- /dev/null
+++ b/Oops.rej
@@ -0,0 +1,26 @@
+@@ -1,25 +0,0 @@
+-# This workflow will run tests using node and then publish a package to GitHub Packages when a release is created
+-# For more information see: https://help.github.com/actions/language-and-framework-guides/publishing-nodejs-packages
+-
+-name: Publish node-rdkafka
+-
+-on:
+- release:
+- types: [created]
+-
+-jobs:
+- publish-npm:
+- runs-on: ubuntu-latest
+- steps:
+- - uses: actions/checkout@v3
+- with:
+- submodules: recursive
+- - uses: actions/setup-node@v3
+- with:
+- node-version: 18
+- registry-url: https://registry.npmjs.org/
+- cache: "npm"
+- - run: npm ci
+- - run: npm publish
+- env:
+- NODE_AUTH_TOKEN: ${{secrets.NPM_TOKEN}}
diff --git a/docker-compose.yml b/docker-compose.yml
index abe29df25c7312382074b3e15289cb862a340247..8a12f135b4f96e5a0dd25e7c21adb2b3b0e644fa 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,23 +1,51 @@
---
-zookeeper:
- image: confluentinc/cp-zookeeper
- ports:
- - "2181:2181"
- environment:
- ZOOKEEPER_CLIENT_PORT: 2181
- ZOOKEEPER_TICK_TIME: 2000
-kafka:
- image: confluentinc/cp-kafka
- links:
- - zookeeper
- ports:
- - "9092:9092"
- environment:
- KAFKA_BROKER_ID: 1
- KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
- KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092'
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
- KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
- KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
- KAFKA_DEFAULT_REPLICATION_FACTOR: 1
- KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
+version: '2'
+services:
+ zookeeper:
+ image: confluentinc/cp-zookeeper
+ ports:
+ - "2181:2181"
+ networks:
+ - localnet
+ environment:
+ ZOOKEEPER_CLIENT_PORT: 2181
+ ZOOKEEPER_TICK_TIME: 2000
+ kafka:
+ image: confluentinc/cp-kafka
+ ports:
+ - 9092:9092
+ - 9997:9997
+ networks:
+ - localnet
+ depends_on:
+ - zookeeper
+ environment:
+ KAFKA_BROKER_ID: 1
+ KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
+ KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
+ # KAFKA_LISTENERS: PLAINTEXT://kafka0:29092,PLAINTEXT_HOST://localhost:9092
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+ KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+ KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
+ KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
+ KAFKA_DEFAULT_REPLICATION_FACTOR: 1
+ KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
+ kafka-ui:
+ container_name: kafka-ui
+ image: provectuslabs/kafka-ui:latest
+ ports:
+ - 8080:8080
+ networks:
+ - localnet
+ depends_on:
+ - zookeeper
+ - kafka
+ environment:
+ KAFKA_CLUSTERS_0_NAME: local
+ KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
+ KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
+networks:
+ localnet:
+ attachable: true
+
diff --git a/e2e/both.spec.js b/e2e/both.spec.js
index a8289ec319239fb05b1f321bff78a7c9e267f1cf..85ca5ef64264a903a30d5d4bac31f6b1a3792102 100644
--- a/e2e/both.spec.js
+++ b/e2e/both.spec.js
@@ -163,7 +163,7 @@ describe('Consumer/Producer', function() {
});
});
-
+
it('should return ready messages on partition EOF', function(done) {
crypto.randomBytes(4096, function(ex, buffer) {
producer.setPollInterval(10);
diff --git a/e2e/consumer.spec.js b/e2e/consumer.spec.js
index a167483f1e0ea15c4edcb368e36640b4349574e8..38fcfd7464afb7df682b7b5f1fdb228b8d280a25 100644
--- a/e2e/consumer.spec.js
+++ b/e2e/consumer.spec.js
@@ -11,10 +11,12 @@ var crypto = require('crypto');
var eventListener = require('./listener');
+var cooperativeRebalanceCallback = require('../lib/kafka-consumer').cooperativeRebalanceCallback;
var KafkaConsumer = require('../').KafkaConsumer;
+var AdminClient = require('../').AdminClient;
+var LibrdKafkaError = require('../lib/error');
var kafkaBrokerList = process.env.KAFKA_HOST || 'localhost:9092';
-var topic = 'test';
describe('Consumer', function() {
var gcfg;
@@ -31,6 +33,7 @@ describe('Consumer', function() {
});
describe('commit', function() {
+ var topic = 'test';
var consumer;
beforeEach(function(done) {
consumer = new KafkaConsumer(gcfg, {});
@@ -61,6 +64,7 @@ describe('Consumer', function() {
});
describe('committed and position', function() {
+ var topic = 'test';
var consumer;
beforeEach(function(done) {
consumer = new KafkaConsumer(gcfg, {});
@@ -95,6 +99,7 @@ describe('Consumer', function() {
});
it('after assign, should get committed array without offsets ', function(done) {
+ var topic = 'test';
consumer.assign([{topic:topic, partition:0}]);
// Defer this for a second
setTimeout(function() {
@@ -110,6 +115,7 @@ describe('Consumer', function() {
});
it('after assign and commit, should get committed offsets', function(done) {
+ var topic = 'test';
consumer.assign([{topic:topic, partition:0}]);
consumer.commitSync({topic:topic, partition:0, offset:1000});
consumer.committed(null, 1000, function(err, committed) {
@@ -123,6 +129,7 @@ describe('Consumer', function() {
});
it('after assign, before consume, position should return an array without offsets', function(done) {
+ var topic = 'test';
consumer.assign([{topic:topic, partition:0}]);
var position = consumer.position();
t.equal(Array.isArray(position), true, 'Position should be an array');
@@ -147,6 +154,7 @@ describe('Consumer', function() {
});
describe('seek and positioning', function() {
+ var topic = 'test';
var consumer;
beforeEach(function(done) {
consumer = new KafkaConsumer(gcfg, {});
@@ -195,6 +203,7 @@ describe('Consumer', function() {
describe('subscribe', function() {
+ var topic = 'test';
var consumer;
beforeEach(function(done) {
consumer = new KafkaConsumer(gcfg, {});
@@ -232,6 +241,7 @@ describe('Consumer', function() {
describe('assign', function() {
+ var topic = 'test';
var consumer;
beforeEach(function(done) {
consumer = new KafkaConsumer(gcfg, {});
@@ -266,7 +276,346 @@ describe('Consumer', function() {
});
});
+ describe('assignmentLost', function() {
+ function pollForTopic(client, topicName, maxTries, tryDelay, cb, customCondition) {
+ var tries = 0;
+
+ function getTopicIfExists(innerCb) {
+ client.getMetadata({
+ topic: topicName,
+ }, function(metadataErr, metadata) {
+ if (metadataErr) {
+ cb(metadataErr);
+ return;
+ }
+
+ var topicFound = metadata.topics.filter(function(topicObj) {
+ var foundTopic = topicObj.name === topicName;
+
+ // If we have a custom condition for "foundedness", do it here after
+ // we make sure we are operating on the correct topic
+ if (foundTopic && customCondition) {
+ return customCondition(topicObj);
+ }
+ return foundTopic;
+ });
+
+ if (topicFound.length >= 1) {
+ innerCb(null, topicFound[0]);
+ return;
+ }
+
+ innerCb(new Error('Could not find topic ' + topicName));
+ });
+ }
+
+ function maybeFinish(err, obj) {
+ if (err) {
+ queueNextTry();
+ return;
+ }
+
+ cb(null, obj);
+ }
+
+ function queueNextTry() {
+ tries += 1;
+ if (tries < maxTries) {
+ setTimeout(function() {
+ getTopicIfExists(maybeFinish);
+ }, tryDelay);
+ } else {
+ cb(new Error('Exceeded max tries of ' + maxTries));
+ }
+ }
+
+ queueNextTry();
+ }
+
+ // Admin client used by the tests below to create and delete topics.
+ var client = AdminClient.create({
+ 'client.id': 'kafka-test',
+ 'metadata.broker.list': kafkaBrokerList
+ });
+ var consumer1;
+ var consumer2;
+ // Number of revoke callbacks that reported a lost assignment. Declared at
+ // describe scope and never reset, so it accumulates over every test here.
+ var assignmentLostCount = 0;
+ // Random group id so each run of this file uses a fresh consumer group.
+ var grp = 'kafka-mocha-grp-' + crypto.randomBytes(20).toString('hex');
+ // Config shared by both consumers; beforeEach overwrites 'client.id' on this
+ // same object before constructing each consumer.
+ var assignment_lost_gcfg = {
+ 'bootstrap.servers': kafkaBrokerList,
+ 'group.id': grp,
+ 'debug': 'all',
+ 'enable.auto.commit': false,
+ 'session.timeout.ms': 10000,
+ 'heartbeat.interval.ms': 1000,
+ 'auto.offset.reset': 'earliest',
+ 'topic.metadata.refresh.interval.ms': 3000,
+ 'partition.assignment.strategy': 'cooperative-sticky',
+ // Count revocations caused by a lost assignment, then delegate the actual
+ // incremental assign/unassign to the library's cooperative callback.
+ // NOTE(review): assumes `err` is always defined here - confirm, since the
+ // consumer wrapper passes `undefined` for a falsy error code.
+ 'rebalance_cb': function(err, assignment) {
+ if (
+ err.code === LibrdKafkaError.codes.ERR__REVOKE_PARTITIONS &&
+ this.assignmentLost()
+ ) {
+ assignmentLostCount++;
+ }
+ cooperativeRebalanceCallback.call(this, err, assignment);
+ }
+ };
+
+ beforeEach(function(done) {
+ assignment_lost_gcfg['client.id'] = 1;
+ consumer1 = new KafkaConsumer(assignment_lost_gcfg, {});
+ eventListener(consumer1);
+ consumer1.connect({ timeout: 2000 }, function(err, info) {
+ t.ifError(err);
+ });
+ assignment_lost_gcfg['client.id'] = 2;
+ consumer2 = new KafkaConsumer(assignment_lost_gcfg, {});
+ eventListener(consumer2);
+ consumer2.connect({ timeout: 2000 }, function(err, info) {
+ t.ifError(err);
+ done();
+ });
+ });
+
+ afterEach(function(done) {
+ consumer1.disconnect(function() {
+ consumer2.disconnect(function() {
+ done();
+ });
+ });
+ });
+
+ it('should return false if not lost', function() {
+ t.equal(false, consumer1.assignmentLost());
+ });
+
+ it('should be lost if topic gets deleted', function(cb) {
+ this.timeout(100000);
+
+ var time = Date.now();
+ var topicName = 'consumer-assignment-lost-test-topic-' + time;
+ var topicName2 = 'consumer-assignment-lost-test-topic2-' + time;
+ var deleting = false;
+
+ client.createTopic({
+ topic: topicName,
+ num_partitions: 2,
+ replication_factor: 1
+ }, function(err) {
+ pollForTopic(consumer1, topicName, 10, 1000, function(err) {
+ t.ifError(err);
+ client.createTopic({
+ topic: topicName2,
+ num_partitions: 2,
+ replication_factor: 1
+ }, function(err) {
+ pollForTopic(consumer1, topicName2, 10, 1000, function(err) {
+ t.ifError(err);
+ consumer1.subscribe([topicName, topicName2]);
+ consumer2.subscribe([topicName, topicName2]);
+ consumer1.consume();
+ consumer2.consume();
+ var tryDelete = function() {
+ setTimeout(function() {
+ if(consumer1.assignments().length === 2 &&
+ consumer2.assignments().length === 2
+ ) {
+ client.deleteTopic(topicName, function(deleteErr) {
+ t.ifError(deleteErr);
+ });
+ } else {
+ tryDelete();
+ }
+ }, 2000);
+ };
+ tryDelete();
+ });
+ });
+ });
+ });
+
+ var checking = false;
+ setInterval(function() {
+ if (assignmentLostCount >= 2 && !checking) {
+ checking = true;
+ t.equal(assignmentLostCount, 2);
+ client.deleteTopic(topicName2, function(deleteErr) {
+ // Cleanup topics
+ t.ifError(deleteErr);
+ cb();
+ });
+ }
+ }, 2000);
+ });
+
+ });
+
+ // Exercises KafkaConsumer#incrementalAssign / #incrementalUnassign on top of a
+ // manual assign(): no subscription is involved, only explicit assignments.
+ // (The suite title says "incrementUnassign"; the method under test is
+ // incrementalUnassign.)
+ describe('incrementalAssign and incrementUnassign', function() {
+
+ var topic = 'test7';
+ var consumer;
+ // A fresh, connected consumer per test.
+ beforeEach(function(done) {
+ consumer = new KafkaConsumer(gcfg, {});
+
+ consumer.connect({ timeout: 2000 }, function(err, info) {
+ t.ifError(err);
+ done();
+ });
+
+ eventListener(consumer);
+ });
+
+ afterEach(function(done) {
+ consumer.disconnect(function() {
+ done();
+ });
+ });
+
+ // assign() one partition, then incrementalAssign() a second one: both must
+ // be listed by assignments(), and subscription() must stay empty.
+ it('should be able to assign an assignment', function() {
+ t.equal(0, consumer.assignments().length);
+ var assignments = [{ topic:topic, partition:0 }];
+ consumer.assign(assignments);
+ t.equal(1, consumer.assignments().length);
+ t.equal(0, consumer.assignments()[0].partition);
+ t.equal(0, consumer.subscription().length);
+
+ var additionalAssignment = [{ topic:topic, partition:1 }];
+ consumer.incrementalAssign(additionalAssignment);
+ t.equal(2, consumer.assignments().length);
+ t.equal(0, consumer.assignments()[0].partition);
+ t.equal(1, consumer.assignments()[1].partition);
+ t.equal(0, consumer.subscription().length);
+ });
+
+ // assign() three partitions, then incrementalUnassign() the last one: only
+ // partitions 0 and 1 must remain.
+ // NOTE(review): partition 2 is assigned although run_docker.sh creates
+ // test7 with 2 partitions - presumably assign() does not validate; confirm.
+ it('should be able to revoke an assignment', function() {
+ t.equal(0, consumer.assignments().length);
+ var assignments = [{ topic:topic, partition:0 }, { topic:topic, partition:1 }, { topic:topic, partition:2 }];
+ consumer.assign(assignments);
+ t.equal(3, consumer.assignments().length);
+ t.equal(0, consumer.assignments()[0].partition);
+ t.equal(1, consumer.assignments()[1].partition);
+ t.equal(2, consumer.assignments()[2].partition);
+ t.equal(0, consumer.subscription().length);
+
+ var revokedAssignments = [{ topic:topic, partition:2 }];
+ consumer.incrementalUnassign(revokedAssignments);
+ t.equal(2, consumer.assignments().length);
+ t.equal(0, consumer.assignments()[0].partition);
+ t.equal(1, consumer.assignments()[1].partition);
+ t.equal(0, consumer.subscription().length);
+ });
+
+ });
+
+ // Two consumers join the same group on topic 'test7' with the built-in
+ // rebalance handling ('rebalance_cb': true), once with an eager strategy and
+ // once with cooperative-sticky.
+ describe('rebalance', function() {
+
+ var topic = 'test7';
+ var grp = 'kafka-mocha-grp-' + crypto.randomBytes(20).toString('hex');
+ var consumer1;
+ var consumer2;
+ // Counts 'rebalance' events. Shared by both tests and never reset, so the
+ // value asserted in the second test includes the events of the first one.
+ var counter = 0;
+ // Base config; each test sets 'partition.assignment.strategy' and
+ // 'client.id' on this same object before constructing its consumers.
+ var reblance_gcfg = {
+ 'bootstrap.servers': kafkaBrokerList,
+ 'group.id': grp,
+ 'debug': 'all',
+ 'enable.auto.commit': false,
+ 'heartbeat.interval.ms': 200,
+ 'rebalance_cb': true
+ };
+
+ it('should be able reblance using the eager strategy', function(done) {
+ this.timeout(20000);
+
+ var isStarted = false;
+ reblance_gcfg['partition.assignment.strategy'] = 'range,roundrobin';
+
+ reblance_gcfg['client.id'] = '1';
+ consumer1 = new KafkaConsumer(reblance_gcfg, {});
+ reblance_gcfg['client.id'] = '2';
+ consumer2 = new KafkaConsumer(reblance_gcfg, {});
+
+ eventListener(consumer1);
+ eventListener(consumer2);
+
+ // consumer2 only joins after consumer1 has seen its first rebalance, which
+ // forces a second rebalance of the group.
+ consumer1.connect({ timeout: 2000 }, function(err, info) {
+ t.ifError(err);
+ consumer1.subscribe([topic]);
+ consumer1.on('rebalance', function(err, assignment) {
+ counter++;
+ if (!isStarted) {
+ isStarted = true;
+ consumer2.connect({ timeout: 2000 }, function(err, info) {
+ consumer2.subscribe([topic]);
+ consumer2.consume();
+ consumer2.on('rebalance', function(err, assignment) {
+ counter++;
+ });
+ });
+ }
+ });
+ consumer1.consume();
+ });
+
+ // Fixed 9s wait for the group to settle before asserting the final state.
+ setTimeout(function() {
+ t.deepStrictEqual(consumer1.assignments(), [ { topic: topic, partition: 0, offset: -1000 } ]);
+ t.deepStrictEqual(consumer2.assignments(), [ { topic: topic, partition: 1, offset: -1000 } ]);
+ t.equal(counter, 4);
+ consumer1.disconnect(function() {
+ consumer2.disconnect(function() {
+ done();
+ });
+ });
+ }, 9000);
+ });
+
+ it('should be able reblance using the cooperative incremental strategy', function(cb) {
+ this.timeout(20000);
+ var isStarted = false;
+ reblance_gcfg['partition.assignment.strategy'] = 'cooperative-sticky';
+ reblance_gcfg['client.id'] = '1';
+ consumer1 = new KafkaConsumer(reblance_gcfg, {});
+ reblance_gcfg['client.id'] = '2';
+ consumer2 = new KafkaConsumer(reblance_gcfg, {});
+
+ eventListener(consumer1);
+ eventListener(consumer2);
+
+ // Unlike the eager test, only consumer2's rebalance events are counted here.
+ consumer1.connect({ timeout: 2000 }, function(err, info) {
+ t.ifError(err);
+ consumer1.subscribe([topic]);
+ consumer1.on('rebalance', function(err, assignment) {
+ if (!isStarted) {
+ isStarted = true;
+ consumer2.connect({ timeout: 2000 }, function(err, info) {
+ consumer2.subscribe([topic]);
+ consumer2.consume();
+ consumer2.on('rebalance', function(err, assignment) {
+ counter++;
+ });
+ });
+ }
+ });
+ consumer1.consume();
+ });
+
+ setTimeout(function() {
+ t.equal(consumer1.assignments().length, 1);
+ t.equal(consumer2.assignments().length, 1);
+ // `counter` still holds the events counted by the eager test above, so
+ // this total is only reached when both tests run, in this order.
+ t.equal(counter, 8);
+
+ consumer1.disconnect(function() {
+ consumer2.disconnect(function() {
+ cb();
+ });
+ });
+ }, 9000);
+ });
+
+ });
+
describe('disconnect', function() {
+
+ var topic = 'test';
var tcfg = { 'auto.offset.reset': 'earliest' };
it('should happen gracefully', function(cb) {
diff --git a/index.d.ts b/index.d.ts
index d7ce7e61e985ce46ceae2c10329d6448cc487dca..2c7b9a3d40b0547209c2cffe1f4e62d9573ab617 100644
--- a/index.d.ts
+++ b/index.d.ts
@@ -223,6 +223,12 @@ export class KafkaConsumer extends Client<KafkaConsumerEvents> {
consume(cb: (err: LibrdKafkaError, messages: Message[]) => void): void;
consume(): void;
+    /**
+     * Incrementally add partitions to the current assignment.
+     * @param assignments - Partitions to add.
+     * @returns The consumer itself, for chaining.
+     */
+    incrementalAssign(assignments: Assignment[]): this;
+
+    /**
+     * Incrementally remove partitions from the current assignment.
+     * @param assignments - Partitions to remove.
+     * @returns The consumer itself, for chaining.
+     */
+    incrementalUnassign(assignments: Assignment[]): this;
+
+    /**
+     * Whether the current assignment has been lost.
+     * @returns `true` if the assignment is lost.
+     */
+    assignmentLost(): boolean;
+
getWatermarkOffsets(topic: string, partition: number): WatermarkOffsets;
offsetsStore(topicPartitions: TopicPartitionOffset[]): any;
diff --git a/lib/index.js b/lib/index.js
index e2e8a9c899700e56b3ddeff84e67ad97206ccabf..ba6d678275101170aedc694fedc489f479b5d05e 100644
--- a/lib/index.js
+++ b/lib/index.js
@@ -7,7 +7,7 @@
* of the MIT license. See the LICENSE.txt file for details.
*/
-var KafkaConsumer = require('./kafka-consumer');
+var KafkaConsumer = require('./kafka-consumer').KafkaConsumer;
var Producer = require('./producer');
var HighLevelProducer = require('./producer/high-level-producer');
var error = require('./error');
diff --git a/lib/kafka-consumer.js b/lib/kafka-consumer.js
index c479240f3bab17c68e38623b89ef67546ba59122..97e8458ab28757d013172de31e238ee2ee3f6ebc 100644
--- a/lib/kafka-consumer.js
+++ b/lib/kafka-consumer.js
@@ -8,8 +8,6 @@
*/
'use strict';
-module.exports = KafkaConsumer;
-
var Client = require('./client');
var util = require('util');
var Kafka = require('../librdkafka');
@@ -21,6 +19,48 @@ var DEFAULT_CONSUME_LOOP_TIMEOUT_DELAY = 500;
var DEFAULT_CONSUME_TIME_OUT = 1000;
util.inherits(KafkaConsumer, Client);
+var eagerRebalanceCallback = function(err, assignment) {
+ // Create the librdkafka error
+ err = LibrdKafkaError.create(err);
+ // Emit the event
+ this.emit('rebalance', err, assignment);
+
+ // That's it
+ try {
+ if (err.code === LibrdKafkaError.codes.ERR__ASSIGN_PARTITIONS) {
+ this.assign(assignment);
+ } else if (err.code === LibrdKafkaError.codes.ERR__REVOKE_PARTITIONS) {
+ this.unassign();
+ }
+ } catch (e) {
+ // Ignore exceptions if we are not connected
+ if (this.isConnected()) {
+ this.emit('rebalance.error', e);
+ }
+ }
+};
+
+var cooperativeRebalanceCallback = function(err, assignment) {
+ // Create the librdkafka error
+ err = LibrdKafkaError.create(err);
+ // Emit the event
+ this.emit('rebalance', err, assignment);
+
+ // That's it
+ try {
+ if (err.code === LibrdKafkaError.codes.ERR__ASSIGN_PARTITIONS) {
+ this.incrementalAssign(assignment);
+ } else if (err.code === LibrdKafkaError.codes.ERR__REVOKE_PARTITIONS) {
+ this.incrementalUnassign(assignment);
+ }
+ } catch (e) {
+ // Ignore exceptions if we are not connected
+ if (this.isConnected()) {
+ this.emit('rebalance.error', e);
+ }
+ }
+};
+
/**
* KafkaConsumer class for reading messages from Kafka
*
@@ -52,26 +92,10 @@ function KafkaConsumer(conf, topicConf) {
// If rebalance is undefined we don't want any part of this
if (onRebalance && typeof onRebalance === 'boolean') {
- conf.rebalance_cb = function(err, assignment) {
- // Create the librdkafka error
- err = LibrdKafkaError.create(err);
- // Emit the event
- self.emit('rebalance', err, assignment);
-
- // That's it
- try {
- if (err.code === -175 /*ERR__ASSIGN_PARTITIONS*/) {
- self.assign(assignment);
- } else if (err.code === -174 /*ERR__REVOKE_PARTITIONS*/) {
- self.unassign();
- }
- } catch (e) {
- // Ignore exceptions if we are not connected
- if (self.isConnected()) {
- self.emit('rebalance.error', e);
- }
- }
- };
+ conf.rebalance_cb =
+ conf['partition.assignment.strategy'] === 'cooperative-sticky' ?
+ cooperativeRebalanceCallback.bind(this) :
+ eagerRebalanceCallback.bind(this);
} else if (onRebalance && typeof onRebalance === 'function') {
/*
* Once this is opted in to, that's it. It's going to manually rebalance
@@ -79,13 +103,13 @@ function KafkaConsumer(conf, topicConf) {
* a way to override them.
*/
- conf.rebalance_cb = function(err, assignment) {
- // Create the librdkafka error
- err = err ? LibrdKafkaError.create(err) : undefined;
+ conf.rebalance_cb = function(err, assignment) {
+ // Create the librdkafka error
+ err = err ? LibrdKafkaError.create(err) : undefined;
- self.emit('rebalance', err, assignment);
- onRebalance.call(self, err, assignment);
- };
+ self.emit('rebalance', err, assignment);
+ onRebalance.call(self, err, assignment);
+ };
}
// Same treatment for offset_commit_cb
@@ -264,6 +288,19 @@ KafkaConsumer.prototype.assign = function(assignments) {
return this;
};
+/**
+ * Incremental assign the consumer specific partitions and topics
+ *
+ * @param {array} assignments - Assignments array. Should contain
+ * objects with topic and partition set.
+ * @return {Client} - Returns itself
+ */
+
+KafkaConsumer.prototype.incrementalAssign = function(assignments) {
+ this._client.incrementalAssign(TopicPartition.map(assignments));
+ return this;
+};
+
/**
* Unassign the consumer from its assigned partitions and topics.
*
@@ -275,6 +312,34 @@ KafkaConsumer.prototype.unassign = function() {
return this;
};
+/**
+ * Incremental unassign the consumer from specific partitions and topics
+ *
+ * @param {array} assignments - Assignments array. Should contain
+ * objects with topic and partition set.
+ * @return {Client} - Returns itself
+ */
+
+KafkaConsumer.prototype.incrementalUnassign = function(assignments) {
+ this._client.incrementalUnassign(TopicPartition.map(assignments));
+ return this;
+};
+
+/**
+ * Get the assignment lost state.
+ * Examples for an assignment to be lost:
+ * - Unsuccessful heartbeats
+ * - Unknown member id during heartbeats
+ * - Illegal generation during heartbeats
+ * - Static consumer fenced by other consumer with same group.instance.id
+ * - Max. poll interval exceeded
+ * - Subscribed topic(s) no longer exist during meta data updates
+ * @return {boolean} - Returns true if the assignment is lost
+ */
+
+KafkaConsumer.prototype.assignmentLost = function() {
+ return this._client.assignmentLost();
+};
/**
* Get the assignments for the consumer
@@ -654,3 +719,9 @@ KafkaConsumer.prototype.pause = function(topicPartitions) {
return this._errorWrap(this._client.pause(topicPartitions), true);
};
+
+// The module now exports an object rather than the KafkaConsumer constructor
+// itself, so callers must use require('./kafka-consumer').KafkaConsumer.
+// The two rebalance callbacks are exported so they can be reused from a custom
+// `rebalance_cb` (invoke them with `this` bound to the consumer).
+module.exports = {
+ KafkaConsumer: KafkaConsumer,
+ eagerRebalanceCallback: eagerRebalanceCallback,
+ cooperativeRebalanceCallback: cooperativeRebalanceCallback
+};
diff --git a/run_docker.sh b/run_docker.sh
index a6aadbd64609e5d5ae1a80205aac7ce3a49d9345..f817aa976c83b74670c7464099679eb32a390051 100755
--- a/run_docker.sh
+++ b/run_docker.sh
@@ -21,14 +21,16 @@ topics=(
"test4"
"test5"
"test6"
+ "test7"
)
# Run docker-compose exec to make them
for topic in "${topics[@]}"
do
echo "Making topic $topic"
+ [[ "$topic" != "test7" ]] && partitions=1 || partitions=2
until docker-compose exec kafka \
- kafka-topics --create --topic $topic --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:2181
+ kafka-topics --create --topic $topic --partitions $partitions --replication-factor 1 --if-not-exists --bootstrap-server localhost:9092
do
topic_result="$?"
if [ "$topic_result" == "1" ]; then
diff --git a/src/kafka-consumer.cc b/src/kafka-consumer.cc
index 019b0cb6478756120efe9a5f6f1bb4182b4af4ea..3895407788ae31ae38d7707eb63528ebac6e3b24 100644
--- a/src/kafka-consumer.cc
+++ b/src/kafka-consumer.cc
@@ -179,6 +179,32 @@ Baton KafkaConsumer::Assign(std::vector<RdKafka::TopicPartition*> partitions) {
return Baton(errcode);
}
+/**
+ * Incrementally add partitions to the current assignment.
+ *
+ * Takes ownership of the TopicPartition objects in `partitions`: on success
+ * they are appended to m_partitions, on any failure they are destroyed here
+ * (the caller never frees them after handing them over).
+ *
+ * @param partitions Partitions to add to the assignment.
+ * @return Baton carrying ERR_NO_ERROR on success, ERR__STATE when the
+ *         consumer is disconnected, or the error code reported by
+ *         incremental_assign().
+ */
+Baton KafkaConsumer::IncrementalAssign(
+  std::vector<RdKafka::TopicPartition *> partitions) {
+  if (!IsConnected()) {
+    // Nothing will track these objects, so release them before bailing out.
+    RdKafka::TopicPartition::destroy(partitions);
+    return Baton(RdKafka::ERR__STATE, "KafkaConsumer is disconnected");
+  }
+
+  RdKafka::KafkaConsumer* consumer =
+    dynamic_cast<RdKafka::KafkaConsumer*>(m_client);
+
+  RdKafka::Error *e = consumer->incremental_assign(partitions);
+
+  if (e) {
+    RdKafka::ErrorCode errcode = e->code();
+    delete e;
+    // The assignment was rejected: the partitions are not tracked anywhere.
+    RdKafka::TopicPartition::destroy(partitions);
+    return Baton(errcode);
+  }
+
+  // Success: m_partitions now owns the handed-over objects.
+  m_partition_cnt += partitions.size();
+  m_partitions.insert(
+    m_partitions.end(), partitions.begin(), partitions.end());
+
+  return Baton(RdKafka::ERR_NO_ERROR);
+}
+
Baton KafkaConsumer::Unassign() {
if (!IsClosing() && !IsConnected()) {
return Baton(RdKafka::ERR__STATE);
@@ -195,12 +221,46 @@ Baton KafkaConsumer::Unassign() {
// Destroy the old list of partitions since we are no longer using it
RdKafka::TopicPartition::destroy(m_partitions);
+ m_partitions.clear();
m_partition_cnt = 0;
return Baton(RdKafka::ERR_NO_ERROR);
}
+/**
+ * Incrementally remove partitions from the current assignment.
+ *
+ * Takes ownership of the TopicPartition objects in `partitions`; they are
+ * destroyed before returning on every path. Tracked entries in m_partitions
+ * are matched by topic name and partition id, because the objects passed in
+ * are not the instances stored in m_partitions (the binding builds new ones
+ * for every call), so comparing pointers can never match.
+ *
+ * @param partitions Partitions to remove from the assignment.
+ * @return Baton carrying ERR_NO_ERROR on success, ERR__STATE when the
+ *         consumer is neither connected nor closing, or the error code
+ *         reported by incremental_unassign().
+ */
+Baton KafkaConsumer::IncrementalUnassign(
+  std::vector<RdKafka::TopicPartition*> partitions) {
+  if (!IsClosing() && !IsConnected()) {
+    RdKafka::TopicPartition::destroy(partitions);
+    return Baton(RdKafka::ERR__STATE);
+  }
+
+  RdKafka::KafkaConsumer* consumer =
+    dynamic_cast<RdKafka::KafkaConsumer*>(m_client);
+
+  RdKafka::Error *e = consumer->incremental_unassign(partitions);
+  if (e) {
+    RdKafka::ErrorCode errcode = e->code();
+    delete e;
+    RdKafka::TopicPartition::destroy(partitions);
+    return Baton(errcode);
+  }
+
+  // Drop (and free) every tracked partition that was just revoked. This has
+  // to happen BEFORE `partitions` is destroyed: its entries are still read.
+  size_t removed = 0;
+  for (auto it = partitions.begin(); it != partitions.end(); ++it) {
+    const RdKafka::TopicPartition *revoked = *it;
+    auto tracked = std::find_if(
+      m_partitions.begin(),
+      m_partitions.end(),
+      [revoked](RdKafka::TopicPartition *x) -> bool {
+        return x->partition() == revoked->partition() &&
+          x->topic() == revoked->topic();
+      });
+    if (tracked != m_partitions.end()) {
+      delete *tracked;
+      m_partitions.erase(tracked);
+      ++removed;
+    }
+  }
+  m_partition_cnt -= removed;
+
+  // Destroy the request list since we are no longer using it
+  RdKafka::TopicPartition::destroy(partitions);
+
+  return Baton(RdKafka::ERR_NO_ERROR);
+}
+
Baton KafkaConsumer::Commit(std::vector<RdKafka::TopicPartition*> toppars) {
if (!IsConnected()) {
return Baton(RdKafka::ERR__STATE);
@@ -469,6 +529,12 @@ Baton KafkaConsumer::RefreshAssignments() {
}
}
+/**
+ * Report whether the underlying consumer considers its assignment lost.
+ *
+ * @return The value of assignment_lost() on the underlying consumer; false
+ *         when the consumer is neither connected nor closing, or when no
+ *         consumer handle is available.
+ */
+bool KafkaConsumer::AssignmentLost() {
+  // Same state guard as Unassign(): the call stays valid while closing.
+  if (!IsClosing() && !IsConnected()) {
+    return false;
+  }
+
+  RdKafka::KafkaConsumer* consumer =
+    dynamic_cast<RdKafka::KafkaConsumer*>(m_client);
+  // dynamic_cast yields NULL for a NULL or non-consumer handle; never
+  // dereference it in that case.
+  if (consumer == NULL) {
+    return false;
+  }
+
+  return consumer->assignment_lost();
+}
+
std::string KafkaConsumer::Name() {
if (!IsConnected()) {
return std::string("");
@@ -527,8 +593,11 @@ void KafkaConsumer::Init(v8::Local<v8::Object> exports) {
Nan::SetPrototypeMethod(tpl, "committed", NodeCommitted);
Nan::SetPrototypeMethod(tpl, "position", NodePosition);
Nan::SetPrototypeMethod(tpl, "assign", NodeAssign);
+ Nan::SetPrototypeMethod(tpl, "incrementalAssign", NodeIncrementalAssign);
Nan::SetPrototypeMethod(tpl, "unassign", NodeUnassign);
+ Nan::SetPrototypeMethod(tpl, "incrementalUnassign", NodeIncrementalUnassign);
Nan::SetPrototypeMethod(tpl, "assignments", NodeAssignments);
+ Nan::SetPrototypeMethod(tpl, "assignmentLost", NodeAssignmentLost);
Nan::SetPrototypeMethod(tpl, "commit", NodeCommit);
Nan::SetPrototypeMethod(tpl, "commitSync", NodeCommitSync);
@@ -759,6 +828,64 @@ NAN_METHOD(KafkaConsumer::NodeAssign) {
info.GetReturnValue().Set(Nan::True());
}
+/**
+ * JS binding: consumer.incrementalAssign(partitions).
+ *
+ * Expects one argument, an array of objects with a `topic` string and
+ * optional `partition` and `offset` numbers. Entries with an empty or missing
+ * topic are skipped. Throws a JS error when the argument is not an array,
+ * when an element is not an object, or when the consumer rejects the
+ * assignment; returns true otherwise.
+ */
+NAN_METHOD(KafkaConsumer::NodeIncrementalAssign) {
+  Nan::HandleScope scope;
+
+  if (info.Length() < 1 || !info[0]->IsArray()) {
+    // Just throw an exception
+    return Nan::ThrowError("Need to specify an array of partitions");
+  }
+
+  v8::Local<v8::Array> partitions = info[0].As<v8::Array>();
+  std::vector<RdKafka::TopicPartition*> topic_partitions;
+
+  for (unsigned int i = 0; i < partitions->Length(); ++i) {
+    v8::Local<v8::Value> partition_obj_value;
+    if (!(
+          Nan::Get(partitions, i).ToLocal(&partition_obj_value) &&
+          partition_obj_value->IsObject())) {
+      // Stop here: the value below must not be used as an object. Nothing
+      // has been handed to the consumer yet, so free what was built so far.
+      RdKafka::TopicPartition::destroy(topic_partitions);
+      return Nan::ThrowError("Must pass topic-partition objects");
+    }
+
+    v8::Local<v8::Object> partition_obj = partition_obj_value.As<v8::Object>();
+
+    // Got the object
+    int64_t partition = GetParameter<int64_t>(partition_obj, "partition", -1);
+    std::string topic = GetParameter<std::string>(partition_obj, "topic", "");
+
+    if (!topic.empty()) {
+      RdKafka::TopicPartition* part;
+
+      if (partition < 0) {
+        part = Connection::GetPartition(topic);
+      } else {
+        part = Connection::GetPartition(topic, partition);
+      }
+
+      // Set the default value to offset invalid. If provided, we will not set
+      // the offset.
+      int64_t offset = GetParameter<int64_t>(
+        partition_obj, "offset", RdKafka::Topic::OFFSET_INVALID);
+      if (offset != RdKafka::Topic::OFFSET_INVALID) {
+        part->set_offset(offset);
+      }
+
+      topic_partitions.push_back(part);
+    }
+  }
+
+  KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());
+
+  // Hand over the partitions to the consumer.
+  Baton b = consumer->IncrementalAssign(topic_partitions);
+
+  if (b.err() != RdKafka::ERR_NO_ERROR) {
+    // Do not set a return value once an exception is pending.
+    return Nan::ThrowError(RdKafka::err2str(b.err()).c_str());
+  }
+
+  info.GetReturnValue().Set(Nan::True());
+}
+
NAN_METHOD(KafkaConsumer::NodeUnassign) {
Nan::HandleScope scope;
@@ -779,6 +906,71 @@ NAN_METHOD(KafkaConsumer::NodeUnassign) {
info.GetReturnValue().Set(Nan::True());
}
+/**
+ * JS binding: consumer.incrementalUnassign(partitions).
+ *
+ * Expects one argument, an array of objects with a `topic` string and
+ * optional `partition` and `offset` numbers. Entries with an empty or missing
+ * topic are skipped. Throws a JS error when the argument is not an array,
+ * when an element is not an object, or when the consumer rejects the
+ * request; returns true otherwise.
+ */
+NAN_METHOD(KafkaConsumer::NodeIncrementalUnassign) {
+  Nan::HandleScope scope;
+
+  if (info.Length() < 1 || !info[0]->IsArray()) {
+    // Just throw an exception
+    return Nan::ThrowError("Need to specify an array of partitions");
+  }
+
+  v8::Local<v8::Array> partitions = info[0].As<v8::Array>();
+  std::vector<RdKafka::TopicPartition*> topic_partitions;
+
+  for (unsigned int i = 0; i < partitions->Length(); ++i) {
+    v8::Local<v8::Value> partition_obj_value;
+    if (!(
+          Nan::Get(partitions, i).ToLocal(&partition_obj_value) &&
+          partition_obj_value->IsObject())) {
+      // Stop here: the value below must not be used as an object. Nothing
+      // has been handed to the consumer yet, so free what was built so far.
+      RdKafka::TopicPartition::destroy(topic_partitions);
+      return Nan::ThrowError("Must pass topic-partition objects");
+    }
+
+    v8::Local<v8::Object> partition_obj = partition_obj_value.As<v8::Object>();
+
+    // Got the object
+    int64_t partition = GetParameter<int64_t>(partition_obj, "partition", -1);
+    std::string topic = GetParameter<std::string>(partition_obj, "topic", "");
+
+    if (!topic.empty()) {
+      RdKafka::TopicPartition* part;
+
+      if (partition < 0) {
+        part = Connection::GetPartition(topic);
+      } else {
+        part = Connection::GetPartition(topic, partition);
+      }
+
+      // Set the default value to offset invalid. If provided, we will not set
+      // the offset.
+      int64_t offset = GetParameter<int64_t>(
+        partition_obj, "offset", RdKafka::Topic::OFFSET_INVALID);
+      if (offset != RdKafka::Topic::OFFSET_INVALID) {
+        part->set_offset(offset);
+      }
+
+      topic_partitions.push_back(part);
+    }
+  }
+
+  KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());
+  // Hand over the partitions to the consumer.
+  Baton b = consumer->IncrementalUnassign(topic_partitions);
+
+  if (b.err() != RdKafka::ERR_NO_ERROR) {
+    // Do not set a return value once an exception is pending.
+    return Nan::ThrowError(RdKafka::err2str(b.err()).c_str());
+  }
+
+  info.GetReturnValue().Set(Nan::True());
+}
+
+/**
+ * JS binding: consumer.assignmentLost().
+ *
+ * Returns the wrapped consumer's AssignmentLost() result as a JS boolean.
+ */
+NAN_METHOD(KafkaConsumer::NodeAssignmentLost) {
+  Nan::HandleScope scope;
+
+  KafkaConsumer* self = ObjectWrap::Unwrap<KafkaConsumer>(info.This());
+  const bool lost = self->AssignmentLost();
+
+  v8::Local<v8::Boolean> result = Nan::New<v8::Boolean>(lost);
+  info.GetReturnValue().Set(result);
+}
+
NAN_METHOD(KafkaConsumer::NodeUnsubscribe) {
Nan::HandleScope scope;
diff --git a/src/kafka-consumer.h b/src/kafka-consumer.h
index c91590ecc5d47c1d7a2a93c3e46b4b4657525df0..43e016db4ec47121051cb282f718a2b3156aacd4 100644
--- a/src/kafka-consumer.h
+++ b/src/kafka-consumer.h
@@ -72,7 +72,10 @@ class KafkaConsumer : public Connection {
int AssignedPartitionCount();
Baton Assign(std::vector<RdKafka::TopicPartition*>);
+ Baton IncrementalAssign(std::vector<RdKafka::TopicPartition*>);
Baton Unassign();
+ Baton IncrementalUnassign(std::vector<RdKafka::TopicPartition*>);
+ bool AssignmentLost();
Baton Seek(const RdKafka::TopicPartition &partition, int timeout_ms);
@@ -105,7 +108,10 @@ class KafkaConsumer : public Connection {
static NAN_METHOD(NodeSubscribe);
static NAN_METHOD(NodeDisconnect);
static NAN_METHOD(NodeAssign);
+ static NAN_METHOD(NodeIncrementalAssign);
static NAN_METHOD(NodeUnassign);
+ static NAN_METHOD(NodeIncrementalUnassign);
+ static NAN_METHOD(NodeAssignmentLost);
static NAN_METHOD(NodeAssignments);
static NAN_METHOD(NodeUnsubscribe);
static NAN_METHOD(NodeCommit);
diff --git a/test/consumer.spec.js b/test/consumer.spec.js
index 40b52ee4e1c718890f43b91adfb543319d5cc342..5e1a5655be0d2598163478aaaae936213c3bf27c 100644
--- a/test/consumer.spec.js
+++ b/test/consumer.spec.js
@@ -77,7 +77,7 @@ module.exports = {
});
},
'has necessary bindings for librdkafka 1:1 binding': function() {
- var methods = ['assign', 'unassign', 'subscribe'];
+ var methods = ['assign', 'unassign', 'subscribe', 'incrementalAssign', 'incrementalUnassign', 'assignmentLost'];
methods.forEach(function(m) {
t.equal(typeof(client[m]), 'function', 'Client is missing ' + m + ' method');
});
diff --git a/test/kafka-consumer.spec.js b/test/kafka-consumer.spec.js
index 0f4de520ed6b8a06dfe355e0bb9091273def98a5..ada72a7e621ea5433f194ab3d22eef326082c155 100644
--- a/test/kafka-consumer.spec.js
+++ b/test/kafka-consumer.spec.js
@@ -7,7 +7,8 @@
* of the MIT license. See the LICENSE.txt file for details.
*/
-var KafkaConsumer = require('../lib/kafka-consumer');
+var KafkaConsumer = require('../lib/kafka-consumer').KafkaConsumer;
+
var t = require('assert');
var client;
diff --git a/deps/librdkafka/src/rdkafka_partition.h b/deps/librdkafka/src/rdkafka_partition.h
index f9dd686423..aef704b95f 100644
--- a/deps/librdkafka/src/rdkafka_partition.h
+++ b/deps/librdkafka/src/rdkafka_partition.h
@@ -68,24 +68,35 @@ struct rd_kafka_toppar_err {
* last msg sequence */
};
-
+/**
+ * @brief Fetchpos comparator that looks at the offset only; the leader
+ *        epoch is ignored.
+ *
+ * @returns -1 if a's offset is lower than b's, 1 if it is higher,
+ *          and 0 if both offsets are equal.
+ */
+static RD_UNUSED RD_INLINE int
+rd_kafka_fetch_pos_cmp_offset(const rd_kafka_fetch_pos_t *a,
+                              const rd_kafka_fetch_pos_t *b) {
+        /* Each comparison yields 0 or 1, so the difference is -1, 0 or 1. */
+        return (a->offset > b->offset) - (a->offset < b->offset);
+}
/**
* @brief Fetchpos comparator, leader epoch has precedence.
+ * iff both values are not null.
*/
static RD_UNUSED RD_INLINE int
rd_kafka_fetch_pos_cmp(const rd_kafka_fetch_pos_t *a,
const rd_kafka_fetch_pos_t *b) {
+ if (a->leader_epoch == -1 || b->leader_epoch == -1)
+ return rd_kafka_fetch_pos_cmp_offset(a, b);
if (a->leader_epoch < b->leader_epoch)
return -1;
else if (a->leader_epoch > b->leader_epoch)
return 1;
- else if (a->offset < b->offset)
- return -1;
- else if (a->offset > b->offset)
- return 1;
else
- return 0;
+ return rd_kafka_fetch_pos_cmp_offset(a, b);
}