mirror of
https://github.com/PostHog/posthog.git
synced 2024-11-22 08:40:03 +01:00
1128 lines
36 KiB
Diff
1128 lines
36 KiB
Diff
diff --git a/Oops.rej b/Oops.rej
|
|
new file mode 100644
|
|
index 0000000000000000000000000000000000000000..328fc546fcb400745783b3562f1cb1cb055e1804
|
|
--- /dev/null
|
|
+++ b/Oops.rej
|
|
@@ -0,0 +1,26 @@
|
|
+@@ -1,25 +0,0 @@
|
|
+-# This workflow will run tests using node and then publish a package to GitHub Packages when a release is created
|
|
+-# For more information see: https://help.github.com/actions/language-and-framework-guides/publishing-nodejs-packages
|
|
+-
|
|
+-name: Publish node-rdkafka
|
|
+-
|
|
+-on:
|
|
+- release:
|
|
+- types: [created]
|
|
+-
|
|
+-jobs:
|
|
+- publish-npm:
|
|
+- runs-on: ubuntu-latest
|
|
+- steps:
|
|
+- - uses: actions/checkout@v3
|
|
+- with:
|
|
+- submodules: recursive
|
|
+- - uses: actions/setup-node@v3
|
|
+- with:
|
|
+- node-version: 18
|
|
+- registry-url: https://registry.npmjs.org/
|
|
+- cache: "npm"
|
|
+- - run: npm ci
|
|
+- - run: npm publish
|
|
+- env:
|
|
+- NODE_AUTH_TOKEN: ${{secrets.NPM_TOKEN}}
|
|
diff --git a/docker-compose.yml b/docker-compose.yml
|
|
index abe29df25c7312382074b3e15289cb862a340247..8a12f135b4f96e5a0dd25e7c21adb2b3b0e644fa 100644
|
|
--- a/docker-compose.yml
|
|
+++ b/docker-compose.yml
|
|
@@ -1,23 +1,51 @@
|
|
---
|
|
-zookeeper:
|
|
- image: confluentinc/cp-zookeeper
|
|
- ports:
|
|
- - "2181:2181"
|
|
- environment:
|
|
- ZOOKEEPER_CLIENT_PORT: 2181
|
|
- ZOOKEEPER_TICK_TIME: 2000
|
|
-kafka:
|
|
- image: confluentinc/cp-kafka
|
|
- links:
|
|
- - zookeeper
|
|
- ports:
|
|
- - "9092:9092"
|
|
- environment:
|
|
- KAFKA_BROKER_ID: 1
|
|
- KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
|
|
- KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092'
|
|
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
|
|
- KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
|
|
- KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
|
|
- KAFKA_DEFAULT_REPLICATION_FACTOR: 1
|
|
- KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
|
|
+version: '2'
|
|
+services:
|
|
+ zookeeper:
|
|
+ image: confluentinc/cp-zookeeper
|
|
+ ports:
|
|
+ - "2181:2181"
|
|
+ networks:
|
|
+ - localnet
|
|
+ environment:
|
|
+ ZOOKEEPER_CLIENT_PORT: 2181
|
|
+ ZOOKEEPER_TICK_TIME: 2000
|
|
+ kafka:
|
|
+ image: confluentinc/cp-kafka
|
|
+ ports:
|
|
+ - 9092:9092
|
|
+ - 9997:9997
|
|
+ networks:
|
|
+ - localnet
|
|
+ depends_on:
|
|
+ - zookeeper
|
|
+ environment:
|
|
+ KAFKA_BROKER_ID: 1
|
|
+ KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
|
|
+ KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
|
|
+ # KAFKA_LISTENERS: PLAINTEXT://kafka0:29092,PLAINTEXT_HOST://localhost:9092
|
|
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
|
|
+ KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
|
|
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
|
|
+ KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
|
|
+ KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
|
|
+ KAFKA_DEFAULT_REPLICATION_FACTOR: 1
|
|
+ KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
|
|
+ kafka-ui:
|
|
+ container_name: kafka-ui
|
|
+ image: provectuslabs/kafka-ui:latest
|
|
+ ports:
|
|
+ - 8080:8080
|
|
+ networks:
|
|
+ - localnet
|
|
+ depends_on:
|
|
+ - zookeeper
|
|
+ - kafka
|
|
+ environment:
|
|
+ KAFKA_CLUSTERS_0_NAME: local
|
|
+ KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
|
|
+ KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
|
|
+networks:
|
|
+ localnet:
|
|
+ attachable: true
|
|
+
|
|
diff --git a/e2e/both.spec.js b/e2e/both.spec.js
|
|
index a8289ec319239fb05b1f321bff78a7c9e267f1cf..85ca5ef64264a903a30d5d4bac31f6b1a3792102 100644
|
|
--- a/e2e/both.spec.js
|
|
+++ b/e2e/both.spec.js
|
|
@@ -163,7 +163,7 @@ describe('Consumer/Producer', function() {
|
|
|
|
});
|
|
});
|
|
-
|
|
+
|
|
it('should return ready messages on partition EOF', function(done) {
|
|
crypto.randomBytes(4096, function(ex, buffer) {
|
|
producer.setPollInterval(10);
|
|
diff --git a/e2e/consumer.spec.js b/e2e/consumer.spec.js
|
|
index a167483f1e0ea15c4edcb368e36640b4349574e8..38fcfd7464afb7df682b7b5f1fdb228b8d280a25 100644
|
|
--- a/e2e/consumer.spec.js
|
|
+++ b/e2e/consumer.spec.js
|
|
@@ -11,10 +11,12 @@ var crypto = require('crypto');
|
|
|
|
var eventListener = require('./listener');
|
|
|
|
+var cooperativeRebalanceCallback = require('../lib/kafka-consumer').cooperativeRebalanceCallback;
|
|
var KafkaConsumer = require('../').KafkaConsumer;
|
|
+var AdminClient = require('../').AdminClient;
|
|
+var LibrdKafkaError = require('../lib/error');
|
|
|
|
var kafkaBrokerList = process.env.KAFKA_HOST || 'localhost:9092';
|
|
-var topic = 'test';
|
|
|
|
describe('Consumer', function() {
|
|
var gcfg;
|
|
@@ -31,6 +33,7 @@ describe('Consumer', function() {
|
|
});
|
|
|
|
describe('commit', function() {
|
|
+ var topic = 'test';
|
|
var consumer;
|
|
beforeEach(function(done) {
|
|
consumer = new KafkaConsumer(gcfg, {});
|
|
@@ -61,6 +64,7 @@ describe('Consumer', function() {
|
|
});
|
|
|
|
describe('committed and position', function() {
|
|
+ var topic = 'test';
|
|
var consumer;
|
|
beforeEach(function(done) {
|
|
consumer = new KafkaConsumer(gcfg, {});
|
|
@@ -95,6 +99,7 @@ describe('Consumer', function() {
|
|
});
|
|
|
|
it('after assign, should get committed array without offsets ', function(done) {
|
|
+ var topic = 'test';
|
|
consumer.assign([{topic:topic, partition:0}]);
|
|
// Defer this for a second
|
|
setTimeout(function() {
|
|
@@ -110,6 +115,7 @@ describe('Consumer', function() {
|
|
});
|
|
|
|
it('after assign and commit, should get committed offsets', function(done) {
|
|
+ var topic = 'test';
|
|
consumer.assign([{topic:topic, partition:0}]);
|
|
consumer.commitSync({topic:topic, partition:0, offset:1000});
|
|
consumer.committed(null, 1000, function(err, committed) {
|
|
@@ -123,6 +129,7 @@ describe('Consumer', function() {
|
|
});
|
|
|
|
it('after assign, before consume, position should return an array without offsets', function(done) {
|
|
+ var topic = 'test';
|
|
consumer.assign([{topic:topic, partition:0}]);
|
|
var position = consumer.position();
|
|
t.equal(Array.isArray(position), true, 'Position should be an array');
|
|
@@ -147,6 +154,7 @@ describe('Consumer', function() {
|
|
});
|
|
|
|
describe('seek and positioning', function() {
|
|
+ var topic = 'test';
|
|
var consumer;
|
|
beforeEach(function(done) {
|
|
consumer = new KafkaConsumer(gcfg, {});
|
|
@@ -195,6 +203,7 @@ describe('Consumer', function() {
|
|
|
|
describe('subscribe', function() {
|
|
|
|
+ var topic = 'test';
|
|
var consumer;
|
|
beforeEach(function(done) {
|
|
consumer = new KafkaConsumer(gcfg, {});
|
|
@@ -232,6 +241,7 @@ describe('Consumer', function() {
|
|
|
|
describe('assign', function() {
|
|
|
|
+ var topic = 'test';
|
|
var consumer;
|
|
beforeEach(function(done) {
|
|
consumer = new KafkaConsumer(gcfg, {});
|
|
@@ -266,7 +276,346 @@ describe('Consumer', function() {
|
|
});
|
|
});
|
|
|
|
+ describe('assignmentLost', function() {
|
|
+ function pollForTopic(client, topicName, maxTries, tryDelay, cb, customCondition) {
|
|
+ var tries = 0;
|
|
+
|
|
+ function getTopicIfExists(innerCb) {
|
|
+ client.getMetadata({
|
|
+ topic: topicName,
|
|
+ }, function(metadataErr, metadata) {
|
|
+ if (metadataErr) {
|
|
+ cb(metadataErr);
|
|
+ return;
|
|
+ }
|
|
+
|
|
+ var topicFound = metadata.topics.filter(function(topicObj) {
|
|
+ var foundTopic = topicObj.name === topicName;
|
|
+
|
|
+ // If we have a custom condition for "foundedness", do it here after
|
|
+ // we make sure we are operating on the correct topic
|
|
+ if (foundTopic && customCondition) {
|
|
+ return customCondition(topicObj);
|
|
+ }
|
|
+ return foundTopic;
|
|
+ });
|
|
+
|
|
+ if (topicFound.length >= 1) {
|
|
+ innerCb(null, topicFound[0]);
|
|
+ return;
|
|
+ }
|
|
+
|
|
+ innerCb(new Error('Could not find topic ' + topicName));
|
|
+ });
|
|
+ }
|
|
+
|
|
+ function maybeFinish(err, obj) {
|
|
+ if (err) {
|
|
+ queueNextTry();
|
|
+ return;
|
|
+ }
|
|
+
|
|
+ cb(null, obj);
|
|
+ }
|
|
+
|
|
+ function queueNextTry() {
|
|
+ tries += 1;
|
|
+ if (tries < maxTries) {
|
|
+ setTimeout(function() {
|
|
+ getTopicIfExists(maybeFinish);
|
|
+ }, tryDelay);
|
|
+ } else {
|
|
+ cb(new Error('Exceeded max tries of ' + maxTries));
|
|
+ }
|
|
+ }
|
|
+
|
|
+ queueNextTry();
|
|
+ }
|
|
+
|
|
+ var client = AdminClient.create({
|
|
+ 'client.id': 'kafka-test',
|
|
+ 'metadata.broker.list': kafkaBrokerList
|
|
+ });
|
|
+ var consumer1;
|
|
+ var consumer2;
|
|
+ var assignmentLostCount = 0;
|
|
+ var grp = 'kafka-mocha-grp-' + crypto.randomBytes(20).toString('hex');
|
|
+ var assignment_lost_gcfg = {
|
|
+ 'bootstrap.servers': kafkaBrokerList,
|
|
+ 'group.id': grp,
|
|
+ 'debug': 'all',
|
|
+ 'enable.auto.commit': false,
|
|
+ 'session.timeout.ms': 10000,
|
|
+ 'heartbeat.interval.ms': 1000,
|
|
+ 'auto.offset.reset': 'earliest',
|
|
+ 'topic.metadata.refresh.interval.ms': 3000,
|
|
+ 'partition.assignment.strategy': 'cooperative-sticky',
|
|
+ 'rebalance_cb': function(err, assignment) {
|
|
+ if (
|
|
+ err.code === LibrdKafkaError.codes.ERR__REVOKE_PARTITIONS &&
|
|
+ this.assignmentLost()
|
|
+ ) {
|
|
+ assignmentLostCount++;
|
|
+ }
|
|
+ cooperativeRebalanceCallback.call(this, err, assignment);
|
|
+ }
|
|
+ };
|
|
+
|
|
+ beforeEach(function(done) {
|
|
+ assignment_lost_gcfg['client.id'] = 1;
|
|
+ consumer1 = new KafkaConsumer(assignment_lost_gcfg, {});
|
|
+ eventListener(consumer1);
|
|
+ consumer1.connect({ timeout: 2000 }, function(err, info) {
|
|
+ t.ifError(err);
|
|
+ });
|
|
+ assignment_lost_gcfg['client.id'] = 2;
|
|
+ consumer2 = new KafkaConsumer(assignment_lost_gcfg, {});
|
|
+ eventListener(consumer2);
|
|
+ consumer2.connect({ timeout: 2000 }, function(err, info) {
|
|
+ t.ifError(err);
|
|
+ done();
|
|
+ });
|
|
+ });
|
|
+
|
|
+ afterEach(function(done) {
|
|
+ consumer1.disconnect(function() {
|
|
+ consumer2.disconnect(function() {
|
|
+ done();
|
|
+ });
|
|
+ });
|
|
+ });
|
|
+
|
|
+ it('should return false if not lost', function() {
|
|
+ t.equal(false, consumer1.assignmentLost());
|
|
+ });
|
|
+
|
|
+ it('should be lost if topic gets deleted', function(cb) {
|
|
+ this.timeout(100000);
|
|
+
|
|
+ var time = Date.now();
|
|
+ var topicName = 'consumer-assignment-lost-test-topic-' + time;
|
|
+ var topicName2 = 'consumer-assignment-lost-test-topic2-' + time;
|
|
+ var deleting = false;
|
|
+
|
|
+ client.createTopic({
|
|
+ topic: topicName,
|
|
+ num_partitions: 2,
|
|
+ replication_factor: 1
|
|
+ }, function(err) {
|
|
+ pollForTopic(consumer1, topicName, 10, 1000, function(err) {
|
|
+ t.ifError(err);
|
|
+ client.createTopic({
|
|
+ topic: topicName2,
|
|
+ num_partitions: 2,
|
|
+ replication_factor: 1
|
|
+ }, function(err) {
|
|
+ pollForTopic(consumer1, topicName2, 10, 1000, function(err) {
|
|
+ t.ifError(err);
|
|
+ consumer1.subscribe([topicName, topicName2]);
|
|
+ consumer2.subscribe([topicName, topicName2]);
|
|
+ consumer1.consume();
|
|
+ consumer2.consume();
|
|
+ var tryDelete = function() {
|
|
+ setTimeout(function() {
|
|
+ if(consumer1.assignments().length === 2 &&
|
|
+ consumer2.assignments().length === 2
|
|
+ ) {
|
|
+ client.deleteTopic(topicName, function(deleteErr) {
|
|
+ t.ifError(deleteErr);
|
|
+ });
|
|
+ } else {
|
|
+ tryDelete();
|
|
+ }
|
|
+ }, 2000);
|
|
+ };
|
|
+ tryDelete();
|
|
+ });
|
|
+ });
|
|
+ });
|
|
+ });
|
|
+
|
|
+ var checking = false;
|
|
+ setInterval(function() {
|
|
+ if (assignmentLostCount >= 2 && !checking) {
|
|
+ checking = true;
|
|
+ t.equal(assignmentLostCount, 2);
|
|
+ client.deleteTopic(topicName2, function(deleteErr) {
|
|
+ // Cleanup topics
|
|
+ t.ifError(deleteErr);
|
|
+ cb();
|
|
+ });
|
|
+ }
|
|
+ }, 2000);
|
|
+ });
|
|
+
|
|
+ });
|
|
+
|
|
+ describe('incrementalAssign and incrementUnassign', function() {
|
|
+
|
|
+ var topic = 'test7';
|
|
+ var consumer;
|
|
+ beforeEach(function(done) {
|
|
+ consumer = new KafkaConsumer(gcfg, {});
|
|
+
|
|
+ consumer.connect({ timeout: 2000 }, function(err, info) {
|
|
+ t.ifError(err);
|
|
+ done();
|
|
+ });
|
|
+
|
|
+ eventListener(consumer);
|
|
+ });
|
|
+
|
|
+ afterEach(function(done) {
|
|
+ consumer.disconnect(function() {
|
|
+ done();
|
|
+ });
|
|
+ });
|
|
+
|
|
+ it('should be able to assign an assignment', function() {
|
|
+ t.equal(0, consumer.assignments().length);
|
|
+ var assignments = [{ topic:topic, partition:0 }];
|
|
+ consumer.assign(assignments);
|
|
+ t.equal(1, consumer.assignments().length);
|
|
+ t.equal(0, consumer.assignments()[0].partition);
|
|
+ t.equal(0, consumer.subscription().length);
|
|
+
|
|
+ var additionalAssignment = [{ topic:topic, partition:1 }];
|
|
+ consumer.incrementalAssign(additionalAssignment);
|
|
+ t.equal(2, consumer.assignments().length);
|
|
+ t.equal(0, consumer.assignments()[0].partition);
|
|
+ t.equal(1, consumer.assignments()[1].partition);
|
|
+ t.equal(0, consumer.subscription().length);
|
|
+ });
|
|
+
|
|
+ it('should be able to revoke an assignment', function() {
|
|
+ t.equal(0, consumer.assignments().length);
|
|
+ var assignments = [{ topic:topic, partition:0 }, { topic:topic, partition:1 }, { topic:topic, partition:2 }];
|
|
+ consumer.assign(assignments);
|
|
+ t.equal(3, consumer.assignments().length);
|
|
+ t.equal(0, consumer.assignments()[0].partition);
|
|
+ t.equal(1, consumer.assignments()[1].partition);
|
|
+ t.equal(2, consumer.assignments()[2].partition);
|
|
+ t.equal(0, consumer.subscription().length);
|
|
+
|
|
+ var revokedAssignments = [{ topic:topic, partition:2 }];
|
|
+ consumer.incrementalUnassign(revokedAssignments);
|
|
+ t.equal(2, consumer.assignments().length);
|
|
+ t.equal(0, consumer.assignments()[0].partition);
|
|
+ t.equal(1, consumer.assignments()[1].partition);
|
|
+ t.equal(0, consumer.subscription().length);
|
|
+ });
|
|
+
|
|
+ });
|
|
+
|
|
+ describe('rebalance', function() {
|
|
+
|
|
+ var topic = 'test7';
|
|
+ var grp = 'kafka-mocha-grp-' + crypto.randomBytes(20).toString('hex');
|
|
+ var consumer1;
|
|
+ var consumer2;
|
|
+ var counter = 0;
|
|
+ var reblance_gcfg = {
|
|
+ 'bootstrap.servers': kafkaBrokerList,
|
|
+ 'group.id': grp,
|
|
+ 'debug': 'all',
|
|
+ 'enable.auto.commit': false,
|
|
+ 'heartbeat.interval.ms': 200,
|
|
+ 'rebalance_cb': true
|
|
+ };
|
|
+
|
|
+ it('should be able reblance using the eager strategy', function(done) {
|
|
+ this.timeout(20000);
|
|
+
|
|
+ var isStarted = false;
|
|
+ reblance_gcfg['partition.assignment.strategy'] = 'range,roundrobin';
|
|
+
|
|
+ reblance_gcfg['client.id'] = '1';
|
|
+ consumer1 = new KafkaConsumer(reblance_gcfg, {});
|
|
+ reblance_gcfg['client.id'] = '2';
|
|
+ consumer2 = new KafkaConsumer(reblance_gcfg, {});
|
|
+
|
|
+ eventListener(consumer1);
|
|
+ eventListener(consumer2);
|
|
+
|
|
+ consumer1.connect({ timeout: 2000 }, function(err, info) {
|
|
+ t.ifError(err);
|
|
+ consumer1.subscribe([topic]);
|
|
+ consumer1.on('rebalance', function(err, assignment) {
|
|
+ counter++;
|
|
+ if (!isStarted) {
|
|
+ isStarted = true;
|
|
+ consumer2.connect({ timeout: 2000 }, function(err, info) {
|
|
+ consumer2.subscribe([topic]);
|
|
+ consumer2.consume();
|
|
+ consumer2.on('rebalance', function(err, assignment) {
|
|
+ counter++;
|
|
+ });
|
|
+ });
|
|
+ }
|
|
+ });
|
|
+ consumer1.consume();
|
|
+ });
|
|
+
|
|
+ setTimeout(function() {
|
|
+ t.deepStrictEqual(consumer1.assignments(), [ { topic: topic, partition: 0, offset: -1000 } ]);
|
|
+ t.deepStrictEqual(consumer2.assignments(), [ { topic: topic, partition: 1, offset: -1000 } ]);
|
|
+ t.equal(counter, 4);
|
|
+ consumer1.disconnect(function() {
|
|
+ consumer2.disconnect(function() {
|
|
+ done();
|
|
+ });
|
|
+ });
|
|
+ }, 9000);
|
|
+ });
|
|
+
|
|
+ it('should be able reblance using the cooperative incremental strategy', function(cb) {
|
|
+ this.timeout(20000);
|
|
+ var isStarted = false;
|
|
+ reblance_gcfg['partition.assignment.strategy'] = 'cooperative-sticky';
|
|
+ reblance_gcfg['client.id'] = '1';
|
|
+ consumer1 = new KafkaConsumer(reblance_gcfg, {});
|
|
+ reblance_gcfg['client.id'] = '2';
|
|
+ consumer2 = new KafkaConsumer(reblance_gcfg, {});
|
|
+
|
|
+ eventListener(consumer1);
|
|
+ eventListener(consumer2);
|
|
+
|
|
+ consumer1.connect({ timeout: 2000 }, function(err, info) {
|
|
+ t.ifError(err);
|
|
+ consumer1.subscribe([topic]);
|
|
+ consumer1.on('rebalance', function(err, assignment) {
|
|
+ if (!isStarted) {
|
|
+ isStarted = true;
|
|
+ consumer2.connect({ timeout: 2000 }, function(err, info) {
|
|
+ consumer2.subscribe([topic]);
|
|
+ consumer2.consume();
|
|
+ consumer2.on('rebalance', function(err, assignment) {
|
|
+ counter++;
|
|
+ });
|
|
+ });
|
|
+ }
|
|
+ });
|
|
+ consumer1.consume();
|
|
+ });
|
|
+
|
|
+ setTimeout(function() {
|
|
+ t.equal(consumer1.assignments().length, 1);
|
|
+ t.equal(consumer2.assignments().length, 1);
|
|
+ t.equal(counter, 8);
|
|
+
|
|
+ consumer1.disconnect(function() {
|
|
+ consumer2.disconnect(function() {
|
|
+ cb();
|
|
+ });
|
|
+ });
|
|
+ }, 9000);
|
|
+ });
|
|
+
|
|
+ });
|
|
+
|
|
describe('disconnect', function() {
|
|
+
|
|
+ var topic = 'test';
|
|
var tcfg = { 'auto.offset.reset': 'earliest' };
|
|
|
|
it('should happen gracefully', function(cb) {
|
|
diff --git a/index.d.ts b/index.d.ts
|
|
index d7ce7e61e985ce46ceae2c10329d6448cc487dca..2c7b9a3d40b0547209c2cffe1f4e62d9573ab617 100644
|
|
--- a/index.d.ts
|
|
+++ b/index.d.ts
|
|
@@ -223,6 +223,12 @@ export class KafkaConsumer extends Client<KafkaConsumerEvents> {
|
|
consume(cb: (err: LibrdKafkaError, messages: Message[]) => void): void;
|
|
consume(): void;
|
|
|
|
+ incrementalAssign(assigments: Assignment[]): this;
|
|
+
|
|
+ incrementalUnassign(assignments: Assignment[]): this;
|
|
+
|
|
+ assignmentLost(): boolean;
|
|
+
|
|
getWatermarkOffsets(topic: string, partition: number): WatermarkOffsets;
|
|
|
|
offsetsStore(topicPartitions: TopicPartitionOffset[]): any;
|
|
diff --git a/lib/index.js b/lib/index.js
|
|
index e2e8a9c899700e56b3ddeff84e67ad97206ccabf..ba6d678275101170aedc694fedc489f479b5d05e 100644
|
|
--- a/lib/index.js
|
|
+++ b/lib/index.js
|
|
@@ -7,7 +7,7 @@
|
|
* of the MIT license. See the LICENSE.txt file for details.
|
|
*/
|
|
|
|
-var KafkaConsumer = require('./kafka-consumer');
|
|
+var KafkaConsumer = require('./kafka-consumer').KafkaConsumer;
|
|
var Producer = require('./producer');
|
|
var HighLevelProducer = require('./producer/high-level-producer');
|
|
var error = require('./error');
|
|
diff --git a/lib/kafka-consumer.js b/lib/kafka-consumer.js
|
|
index c479240f3bab17c68e38623b89ef67546ba59122..97e8458ab28757d013172de31e238ee2ee3f6ebc 100644
|
|
--- a/lib/kafka-consumer.js
|
|
+++ b/lib/kafka-consumer.js
|
|
@@ -8,8 +8,6 @@
|
|
*/
|
|
'use strict';
|
|
|
|
-module.exports = KafkaConsumer;
|
|
-
|
|
var Client = require('./client');
|
|
var util = require('util');
|
|
var Kafka = require('../librdkafka');
|
|
@@ -21,6 +19,48 @@ var DEFAULT_CONSUME_LOOP_TIMEOUT_DELAY = 500;
|
|
var DEFAULT_CONSUME_TIME_OUT = 1000;
|
|
util.inherits(KafkaConsumer, Client);
|
|
|
|
+var eagerRebalanceCallback = function(err, assignment) {
|
|
+ // Create the librdkafka error
|
|
+ err = LibrdKafkaError.create(err);
|
|
+ // Emit the event
|
|
+ this.emit('rebalance', err, assignment);
|
|
+
|
|
+ // That's it
|
|
+ try {
|
|
+ if (err.code === LibrdKafkaError.codes.ERR__ASSIGN_PARTITIONS) {
|
|
+ this.assign(assignment);
|
|
+ } else if (err.code === LibrdKafkaError.codes.ERR__REVOKE_PARTITIONS) {
|
|
+ this.unassign();
|
|
+ }
|
|
+ } catch (e) {
|
|
+ // Ignore exceptions if we are not connected
|
|
+ if (this.isConnected()) {
|
|
+ this.emit('rebalance.error', e);
|
|
+ }
|
|
+ }
|
|
+};
|
|
+
|
|
+var cooperativeRebalanceCallback = function(err, assignment) {
|
|
+ // Create the librdkafka error
|
|
+ err = LibrdKafkaError.create(err);
|
|
+ // Emit the event
|
|
+ this.emit('rebalance', err, assignment);
|
|
+
|
|
+ // That's it
|
|
+ try {
|
|
+ if (err.code === LibrdKafkaError.codes.ERR__ASSIGN_PARTITIONS) {
|
|
+ this.incrementalAssign(assignment);
|
|
+ } else if (err.code === LibrdKafkaError.codes.ERR__REVOKE_PARTITIONS) {
|
|
+ this.incrementalUnassign(assignment);
|
|
+ }
|
|
+ } catch (e) {
|
|
+ // Ignore exceptions if we are not connected
|
|
+ if (this.isConnected()) {
|
|
+ this.emit('rebalance.error', e);
|
|
+ }
|
|
+ }
|
|
+};
|
|
+
|
|
/**
|
|
* KafkaConsumer class for reading messages from Kafka
|
|
*
|
|
@@ -52,26 +92,10 @@ function KafkaConsumer(conf, topicConf) {
|
|
|
|
// If rebalance is undefined we don't want any part of this
|
|
if (onRebalance && typeof onRebalance === 'boolean') {
|
|
- conf.rebalance_cb = function(err, assignment) {
|
|
- // Create the librdkafka error
|
|
- err = LibrdKafkaError.create(err);
|
|
- // Emit the event
|
|
- self.emit('rebalance', err, assignment);
|
|
-
|
|
- // That's it
|
|
- try {
|
|
- if (err.code === -175 /*ERR__ASSIGN_PARTITIONS*/) {
|
|
- self.assign(assignment);
|
|
- } else if (err.code === -174 /*ERR__REVOKE_PARTITIONS*/) {
|
|
- self.unassign();
|
|
- }
|
|
- } catch (e) {
|
|
- // Ignore exceptions if we are not connected
|
|
- if (self.isConnected()) {
|
|
- self.emit('rebalance.error', e);
|
|
- }
|
|
- }
|
|
- };
|
|
+ conf.rebalance_cb =
|
|
+ conf['partition.assignment.strategy'] === 'cooperative-sticky' ?
|
|
+ cooperativeRebalanceCallback.bind(this) :
|
|
+ eagerRebalanceCallback.bind(this);
|
|
} else if (onRebalance && typeof onRebalance === 'function') {
|
|
/*
|
|
* Once this is opted in to, that's it. It's going to manually rebalance
|
|
@@ -79,13 +103,13 @@ function KafkaConsumer(conf, topicConf) {
|
|
* a way to override them.
|
|
*/
|
|
|
|
- conf.rebalance_cb = function(err, assignment) {
|
|
- // Create the librdkafka error
|
|
- err = err ? LibrdKafkaError.create(err) : undefined;
|
|
+ conf.rebalance_cb = function(err, assignment) {
|
|
+ // Create the librdkafka error
|
|
+ err = err ? LibrdKafkaError.create(err) : undefined;
|
|
|
|
- self.emit('rebalance', err, assignment);
|
|
- onRebalance.call(self, err, assignment);
|
|
- };
|
|
+ self.emit('rebalance', err, assignment);
|
|
+ onRebalance.call(self, err, assignment);
|
|
+ };
|
|
}
|
|
|
|
// Same treatment for offset_commit_cb
|
|
@@ -264,6 +288,19 @@ KafkaConsumer.prototype.assign = function(assignments) {
|
|
return this;
|
|
};
|
|
|
|
+/**
|
|
+ * Incremental assign the consumer specific partitions and topics
|
|
+ *
|
|
+ * @param {array} assignments - Assignments array. Should contain
|
|
+ * objects with topic and partition set.
|
|
+ * @return {Client} - Returns itself
|
|
+ */
|
|
+
|
|
+KafkaConsumer.prototype.incrementalAssign = function(assignments) {
|
|
+ this._client.incrementalAssign(TopicPartition.map(assignments));
|
|
+ return this;
|
|
+};
|
|
+
|
|
/**
|
|
* Unassign the consumer from its assigned partitions and topics.
|
|
*
|
|
@@ -275,6 +312,34 @@ KafkaConsumer.prototype.unassign = function() {
|
|
return this;
|
|
};
|
|
|
|
+/**
|
|
+ * Incremental unassign the consumer from specific partitions and topics
|
|
+ *
|
|
+ * @param {array} assignments - Assignments array. Should contain
|
|
+ * objects with topic and partition set.
|
|
+ * @return {Client} - Returns itself
|
|
+ */
|
|
+
|
|
+KafkaConsumer.prototype.incrementalUnassign = function(assignments) {
|
|
+ this._client.incrementalUnassign(TopicPartition.map(assignments));
|
|
+ return this;
|
|
+};
|
|
+
|
|
+/**
|
|
+ * Get the assignment lost state.
|
|
+ * Examples for an assignment to be lost:
|
|
+ * - Unsuccessful heartbeats
|
|
+ * - Unknown member id during heartbeats
|
|
+ * - Illegal generation during heartbeats
|
|
+ * - Static consumer fenced by other consumer with same group.instance.id
|
|
+ * - Max. poll interval exceeded
|
|
+ * - Subscribed topic(s) no longer exist during meta data updates
|
|
+ * @return {boolean} - Returns true if the assignment is lost
|
|
+ */
|
|
+
|
|
+KafkaConsumer.prototype.assignmentLost = function() {
|
|
+ return this._client.assignmentLost();
|
|
+};
|
|
|
|
/**
|
|
* Get the assignments for the consumer
|
|
@@ -654,3 +719,9 @@ KafkaConsumer.prototype.pause = function(topicPartitions) {
|
|
|
|
return this._errorWrap(this._client.pause(topicPartitions), true);
|
|
};
|
|
+
|
|
+module.exports = {
|
|
+ KafkaConsumer: KafkaConsumer,
|
|
+ eagerRebalanceCallback: eagerRebalanceCallback,
|
|
+ cooperativeRebalanceCallback: cooperativeRebalanceCallback
|
|
+};
|
|
diff --git a/run_docker.sh b/run_docker.sh
|
|
index a6aadbd64609e5d5ae1a80205aac7ce3a49d9345..f817aa976c83b74670c7464099679eb32a390051 100755
|
|
--- a/run_docker.sh
|
|
+++ b/run_docker.sh
|
|
@@ -21,14 +21,16 @@ topics=(
|
|
"test4"
|
|
"test5"
|
|
"test6"
|
|
+ "test7"
|
|
)
|
|
|
|
# Run docker-compose exec to make them
|
|
for topic in "${topics[@]}"
|
|
do
|
|
echo "Making topic $topic"
|
|
+ [[ "$topic" != "test7" ]] && partitions=1 || partitions=2
|
|
until docker-compose exec kafka \
|
|
- kafka-topics --create --topic $topic --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:2181
|
|
+ kafka-topics --create --topic $topic --partitions $partitions --replication-factor 1 --if-not-exists --bootstrap-server localhost:9092
|
|
do
|
|
topic_result="$?"
|
|
if [ "$topic_result" == "1" ]; then
|
|
diff --git a/src/kafka-consumer.cc b/src/kafka-consumer.cc
|
|
index 019b0cb6478756120efe9a5f6f1bb4182b4af4ea..3895407788ae31ae38d7707eb63528ebac6e3b24 100644
|
|
--- a/src/kafka-consumer.cc
|
|
+++ b/src/kafka-consumer.cc
|
|
@@ -179,6 +179,32 @@ Baton KafkaConsumer::Assign(std::vector<RdKafka::TopicPartition*> partitions) {
|
|
return Baton(errcode);
|
|
}
|
|
|
|
+Baton KafkaConsumer::IncrementalAssign(
|
|
+ std::vector<RdKafka::TopicPartition *> partitions) {
|
|
+ if (!IsConnected()) {
|
|
+ return Baton(RdKafka::ERR__STATE, "KafkaConsumer is disconnected");
|
|
+ }
|
|
+
|
|
+ RdKafka::KafkaConsumer* consumer =
|
|
+ dynamic_cast<RdKafka::KafkaConsumer*>(m_client);
|
|
+
|
|
+ RdKafka::Error *e = consumer->incremental_assign(partitions);
|
|
+
|
|
+ if (e) {
|
|
+ RdKafka::ErrorCode errcode = e->code();
|
|
+ delete e;
|
|
+ return Baton(errcode);
|
|
+ }
|
|
+
|
|
+ m_partition_cnt += partitions.size();
|
|
+ for (auto i = partitions.begin(); i != partitions.end(); ++i) {
|
|
+ m_partitions.push_back(*i);
|
|
+ }
|
|
+ partitions.clear();
|
|
+
|
|
+ return Baton(RdKafka::ERR_NO_ERROR);
|
|
+}
|
|
+
|
|
Baton KafkaConsumer::Unassign() {
|
|
if (!IsClosing() && !IsConnected()) {
|
|
return Baton(RdKafka::ERR__STATE);
|
|
@@ -195,12 +221,46 @@ Baton KafkaConsumer::Unassign() {
|
|
|
|
// Destroy the old list of partitions since we are no longer using it
|
|
RdKafka::TopicPartition::destroy(m_partitions);
|
|
+ m_partitions.clear();
|
|
|
|
m_partition_cnt = 0;
|
|
|
|
return Baton(RdKafka::ERR_NO_ERROR);
|
|
}
|
|
|
|
+Baton KafkaConsumer::IncrementalUnassign(
|
|
+ std::vector<RdKafka::TopicPartition*> partitions) {
|
|
+ if (!IsClosing() && !IsConnected()) {
|
|
+ return Baton(RdKafka::ERR__STATE);
|
|
+ }
|
|
+
|
|
+ RdKafka::KafkaConsumer* consumer =
|
|
+ dynamic_cast<RdKafka::KafkaConsumer*>(m_client);
|
|
+
|
|
+ RdKafka::Error *e = consumer->incremental_unassign(partitions);
|
|
+ if (e) {
|
|
+ RdKafka::ErrorCode errcode = e->code();
|
|
+ delete e;
|
|
+ return Baton(errcode);
|
|
+ }
|
|
+
|
|
+ // Destroy the old list of partitions since we are no longer using it
|
|
+ RdKafka::TopicPartition::destroy(partitions);
|
|
+
|
|
+ m_partitions.erase(
|
|
+ std::remove_if(
|
|
+ m_partitions.begin(),
|
|
+ m_partitions.end(),
|
|
+ [&partitions](RdKafka::TopicPartition *x) -> bool {
|
|
+ return std::find(
|
|
+ partitions.begin(),
|
|
+ partitions.end(), x) != partitions.end();
|
|
+ }),
|
|
+ m_partitions.end());
|
|
+ m_partition_cnt -= partitions.size();
|
|
+ return Baton(RdKafka::ERR_NO_ERROR);
|
|
+}
|
|
+
|
|
Baton KafkaConsumer::Commit(std::vector<RdKafka::TopicPartition*> toppars) {
|
|
if (!IsConnected()) {
|
|
return Baton(RdKafka::ERR__STATE);
|
|
@@ -469,6 +529,12 @@ Baton KafkaConsumer::RefreshAssignments() {
|
|
}
|
|
}
|
|
|
|
+bool KafkaConsumer::AssignmentLost() {
|
|
+ RdKafka::KafkaConsumer* consumer =
|
|
+ dynamic_cast<RdKafka::KafkaConsumer*>(m_client);
|
|
+ return consumer->assignment_lost();
|
|
+}
|
|
+
|
|
std::string KafkaConsumer::Name() {
|
|
if (!IsConnected()) {
|
|
return std::string("");
|
|
@@ -527,8 +593,11 @@ void KafkaConsumer::Init(v8::Local<v8::Object> exports) {
|
|
Nan::SetPrototypeMethod(tpl, "committed", NodeCommitted);
|
|
Nan::SetPrototypeMethod(tpl, "position", NodePosition);
|
|
Nan::SetPrototypeMethod(tpl, "assign", NodeAssign);
|
|
+ Nan::SetPrototypeMethod(tpl, "incrementalAssign", NodeIncrementalAssign);
|
|
Nan::SetPrototypeMethod(tpl, "unassign", NodeUnassign);
|
|
+ Nan::SetPrototypeMethod(tpl, "incrementalUnassign", NodeIncrementalUnassign);
|
|
Nan::SetPrototypeMethod(tpl, "assignments", NodeAssignments);
|
|
+ Nan::SetPrototypeMethod(tpl, "assignmentLost", NodeAssignmentLost);
|
|
|
|
Nan::SetPrototypeMethod(tpl, "commit", NodeCommit);
|
|
Nan::SetPrototypeMethod(tpl, "commitSync", NodeCommitSync);
|
|
@@ -759,6 +828,64 @@ NAN_METHOD(KafkaConsumer::NodeAssign) {
|
|
info.GetReturnValue().Set(Nan::True());
|
|
}
|
|
|
|
+NAN_METHOD(KafkaConsumer::NodeIncrementalAssign) {
|
|
+ Nan::HandleScope scope;
|
|
+
|
|
+ if (info.Length() < 1 || !info[0]->IsArray()) {
|
|
+ // Just throw an exception
|
|
+ return Nan::ThrowError("Need to specify an array of partitions");
|
|
+ }
|
|
+
|
|
+ v8::Local<v8::Array> partitions = info[0].As<v8::Array>();
|
|
+ std::vector<RdKafka::TopicPartition*> topic_partitions;
|
|
+
|
|
+ for (unsigned int i = 0; i < partitions->Length(); ++i) {
|
|
+ v8::Local<v8::Value> partition_obj_value;
|
|
+ if (!(
|
|
+ Nan::Get(partitions, i).ToLocal(&partition_obj_value) &&
|
|
+ partition_obj_value->IsObject())) {
|
|
+ Nan::ThrowError("Must pass topic-partition objects");
|
|
+ }
|
|
+
|
|
+ v8::Local<v8::Object> partition_obj = partition_obj_value.As<v8::Object>();
|
|
+
|
|
+ // Got the object
|
|
+ int64_t partition = GetParameter<int64_t>(partition_obj, "partition", -1);
|
|
+ std::string topic = GetParameter<std::string>(partition_obj, "topic", "");
|
|
+
|
|
+ if (!topic.empty()) {
|
|
+ RdKafka::TopicPartition* part;
|
|
+
|
|
+ if (partition < 0) {
|
|
+ part = Connection::GetPartition(topic);
|
|
+ } else {
|
|
+ part = Connection::GetPartition(topic, partition);
|
|
+ }
|
|
+
|
|
+ // Set the default value to offset invalid. If provided, we will not set
|
|
+ // the offset.
|
|
+ int64_t offset = GetParameter<int64_t>(
|
|
+ partition_obj, "offset", RdKafka::Topic::OFFSET_INVALID);
|
|
+ if (offset != RdKafka::Topic::OFFSET_INVALID) {
|
|
+ part->set_offset(offset);
|
|
+ }
|
|
+
|
|
+ topic_partitions.push_back(part);
|
|
+ }
|
|
+ }
|
|
+
|
|
+ KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());
|
|
+
|
|
+ // Hand over the partitions to the consumer.
|
|
+ Baton b = consumer->IncrementalAssign(topic_partitions);
|
|
+
|
|
+ if (b.err() != RdKafka::ERR_NO_ERROR) {
|
|
+ Nan::ThrowError(RdKafka::err2str(b.err()).c_str());
|
|
+ }
|
|
+
|
|
+ info.GetReturnValue().Set(Nan::True());
|
|
+}
|
|
+
|
|
NAN_METHOD(KafkaConsumer::NodeUnassign) {
|
|
Nan::HandleScope scope;
|
|
|
|
@@ -779,6 +906,71 @@ NAN_METHOD(KafkaConsumer::NodeUnassign) {
|
|
info.GetReturnValue().Set(Nan::True());
|
|
}
|
|
|
|
+NAN_METHOD(KafkaConsumer::NodeIncrementalUnassign) {
|
|
+ Nan::HandleScope scope;
|
|
+
|
|
+ if (info.Length() < 1 || !info[0]->IsArray()) {
|
|
+ // Just throw an exception
|
|
+ return Nan::ThrowError("Need to specify an array of partitions");
|
|
+ }
|
|
+
|
|
+ v8::Local<v8::Array> partitions = info[0].As<v8::Array>();
|
|
+ std::vector<RdKafka::TopicPartition*> topic_partitions;
|
|
+
|
|
+ for (unsigned int i = 0; i < partitions->Length(); ++i) {
|
|
+ v8::Local<v8::Value> partition_obj_value;
|
|
+ if (!(
|
|
+ Nan::Get(partitions, i).ToLocal(&partition_obj_value) &&
|
|
+ partition_obj_value->IsObject())) {
|
|
+ Nan::ThrowError("Must pass topic-partition objects");
|
|
+ }
|
|
+
|
|
+ v8::Local<v8::Object> partition_obj = partition_obj_value.As<v8::Object>();
|
|
+
|
|
+ // Got the object
|
|
+ int64_t partition = GetParameter<int64_t>(partition_obj, "partition", -1);
|
|
+ std::string topic = GetParameter<std::string>(partition_obj, "topic", "");
|
|
+
|
|
+ if (!topic.empty()) {
|
|
+ RdKafka::TopicPartition* part;
|
|
+
|
|
+ if (partition < 0) {
|
|
+ part = Connection::GetPartition(topic);
|
|
+ } else {
|
|
+ part = Connection::GetPartition(topic, partition);
|
|
+ }
|
|
+
|
|
+ // Set the default value to offset invalid. If provided, we will not set
|
|
+ // the offset.
|
|
+ int64_t offset = GetParameter<int64_t>(
|
|
+ partition_obj, "offset", RdKafka::Topic::OFFSET_INVALID);
|
|
+ if (offset != RdKafka::Topic::OFFSET_INVALID) {
|
|
+ part->set_offset(offset);
|
|
+ }
|
|
+
|
|
+ topic_partitions.push_back(part);
|
|
+ }
|
|
+ }
|
|
+
|
|
+ KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());
|
|
+ // Hand over the partitions to the consumer.
|
|
+ Baton b = consumer->IncrementalUnassign(topic_partitions);
|
|
+
|
|
+ if (b.err() != RdKafka::ERR_NO_ERROR) {
|
|
+ Nan::ThrowError(RdKafka::err2str(b.err()).c_str());
|
|
+ }
|
|
+
|
|
+ info.GetReturnValue().Set(Nan::True());
|
|
+}
|
|
+
|
|
+NAN_METHOD(KafkaConsumer::NodeAssignmentLost) {
|
|
+ Nan::HandleScope scope;
|
|
+
|
|
+ KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());
|
|
+ bool b = consumer->AssignmentLost();
|
|
+ info.GetReturnValue().Set(Nan::New<v8::Boolean>(b));
|
|
+}
|
|
+
|
|
NAN_METHOD(KafkaConsumer::NodeUnsubscribe) {
|
|
Nan::HandleScope scope;
|
|
|
|
diff --git a/src/kafka-consumer.h b/src/kafka-consumer.h
|
|
index c91590ecc5d47c1d7a2a93c3e46b4b4657525df0..43e016db4ec47121051cb282f718a2b3156aacd4 100644
|
|
--- a/src/kafka-consumer.h
|
|
+++ b/src/kafka-consumer.h
|
|
@@ -72,7 +72,10 @@ class KafkaConsumer : public Connection {
|
|
int AssignedPartitionCount();
|
|
|
|
Baton Assign(std::vector<RdKafka::TopicPartition*>);
|
|
+ Baton IncrementalAssign(std::vector<RdKafka::TopicPartition*>);
|
|
Baton Unassign();
|
|
+ Baton IncrementalUnassign(std::vector<RdKafka::TopicPartition*>);
|
|
+ bool AssignmentLost();
|
|
|
|
Baton Seek(const RdKafka::TopicPartition &partition, int timeout_ms);
|
|
|
|
@@ -105,7 +108,10 @@ class KafkaConsumer : public Connection {
|
|
static NAN_METHOD(NodeSubscribe);
|
|
static NAN_METHOD(NodeDisconnect);
|
|
static NAN_METHOD(NodeAssign);
|
|
+ static NAN_METHOD(NodeIncrementalAssign);
|
|
static NAN_METHOD(NodeUnassign);
|
|
+ static NAN_METHOD(NodeIncrementalUnassign);
|
|
+ static NAN_METHOD(NodeAssignmentLost);
|
|
static NAN_METHOD(NodeAssignments);
|
|
static NAN_METHOD(NodeUnsubscribe);
|
|
static NAN_METHOD(NodeCommit);
|
|
diff --git a/test/consumer.spec.js b/test/consumer.spec.js
|
|
index 40b52ee4e1c718890f43b91adfb543319d5cc342..5e1a5655be0d2598163478aaaae936213c3bf27c 100644
|
|
--- a/test/consumer.spec.js
|
|
+++ b/test/consumer.spec.js
|
|
@@ -77,7 +77,7 @@ module.exports = {
|
|
});
|
|
},
|
|
'has necessary bindings for librdkafka 1:1 binding': function() {
|
|
- var methods = ['assign', 'unassign', 'subscribe'];
|
|
+ var methods = ['assign', 'unassign', 'subscribe', 'incrementalAssign', 'incrementalUnassign', 'assignmentLost'];
|
|
methods.forEach(function(m) {
|
|
t.equal(typeof(client[m]), 'function', 'Client is missing ' + m + ' method');
|
|
});
|
|
diff --git a/test/kafka-consumer.spec.js b/test/kafka-consumer.spec.js
|
|
index 0f4de520ed6b8a06dfe355e0bb9091273def98a5..ada72a7e621ea5433f194ab3d22eef326082c155 100644
|
|
--- a/test/kafka-consumer.spec.js
|
|
+++ b/test/kafka-consumer.spec.js
|
|
@@ -7,7 +7,8 @@
|
|
* of the MIT license. See the LICENSE.txt file for details.
|
|
*/
|
|
|
|
-var KafkaConsumer = require('../lib/kafka-consumer');
|
|
+var KafkaConsumer = require('../lib/kafka-consumer').KafkaConsumer;
|
|
+
|
|
var t = require('assert');
|
|
|
|
var client;
|
|
diff --git a/deps/librdkafka/src/rdkafka_partition.h b/deps/librdkafka/src/rdkafka_partition.h
|
|
index f9dd686423..aef704b95f 100644
|
|
--- a/deps/librdkafka/src/rdkafka_partition.h
|
|
+++ b/deps/librdkafka/src/rdkafka_partition.h
|
|
@@ -68,24 +68,35 @@ struct rd_kafka_toppar_err {
|
|
* last msg sequence */
|
|
};
|
|
|
|
-
|
|
+/**
|
|
+ * @brief Fetchpos comparator, only offset is compared.
|
|
+ */
|
|
+static RD_UNUSED RD_INLINE int
|
|
+rd_kafka_fetch_pos_cmp_offset(const rd_kafka_fetch_pos_t *a,
|
|
+ const rd_kafka_fetch_pos_t *b) {
|
|
+ if (a->offset < b->offset)
|
|
+ return -1;
|
|
+ else if (a->offset > b->offset)
|
|
+ return 1;
|
|
+ else
|
|
+ return 0;
|
|
+}
|
|
|
|
/**
|
|
* @brief Fetchpos comparator, leader epoch has precedence.
|
|
+ * iff both values are not null.
|
|
*/
|
|
static RD_UNUSED RD_INLINE int
|
|
rd_kafka_fetch_pos_cmp(const rd_kafka_fetch_pos_t *a,
|
|
const rd_kafka_fetch_pos_t *b) {
|
|
+ if (a->leader_epoch == -1 || b->leader_epoch == -1)
|
|
+ return rd_kafka_fetch_pos_cmp_offset(a, b);
|
|
if (a->leader_epoch < b->leader_epoch)
|
|
return -1;
|
|
else if (a->leader_epoch > b->leader_epoch)
|
|
return 1;
|
|
- else if (a->offset < b->offset)
|
|
- return -1;
|
|
- else if (a->offset > b->offset)
|
|
- return 1;
|
|
else
|
|
- return 0;
|
|
+ return rd_kafka_fetch_pos_cmp_offset(a, b);
|
|
}
|
|
|
|
|