That's a great question. Many of our customers use Xcalar to stream massive amounts of data from Kafka. In fact, we are going to publish a KB article on this very soon, but I won't keep you waiting. Here are some quick tips, fresh from the oven.
Assume you have a 4-node Xcalar cluster with 8 cores per node. To keep all 8 cores busy, you need to run 8 Kafka consumers concurrently on each node, which gives you a total of 32 consumers reading concurrently from a single Kafka topic. In this example we use the consumer group demo.
Make sure you have installed kafka-python 1.3.3 on all 4 nodes of your Xcalar cluster.
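To confirm the version quickly on each node, a minimal check like this works (kafka-python exposes its version string as kafka.__version__):

import kafka
print(kafka.__version__)   # should print 1.3.3 on every node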
We recommend that your Kafka topic, say kb, has 32 partitions, one per consumer.
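You can verify the partition count with kafka-python itself. This is a small sketch, assuming the same khost:9092 broker that the UDF below points at:

from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers=["khost:9092"])   # VERIFY HOST
parts = consumer.partitions_for_topic("kb")
print(len(parts) if parts else 0)   # expect 32, one per consumer
consumer.close()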
Here is an example Kafka consumer import UDF. Along similar lines you can write an export UDF; let me know if you need help implementing one.
from kafka import KafkaConsumer, TopicPartition
import optparse
import os
import time
# Standard Xcalar logging
try:
    from udflib import logger
except ImportError:
    import sys
    XLRDIR = os.getenv('XLRDIR', '/opt/xcalar')
    sys.path.append(XLRDIR + '/scripts/')
    from udflib import logger
try:
    import xcalar
except ImportError:
    pass
partition = 0

# Create a consumer, and consume pendingEvents from a Kafka topic partition
# each time this UDF is invoked. Close the consumer when done.
def consume(partFile, ins):
    global partition
    eventNo = 0
    groupConsumer, pendingEvents = getConsumer(partFile)
    if pendingEvents == 0:
        yield {"status": "Found no events in kafka {}".format(partFile)}
    else:
        for message in groupConsumer:
            kafkaMessage = message.value
            eventNo += 1
            if eventNo > pendingEvents:
                break
            yield {"query": kafkaMessage}
    # Close the consumer whether or not any events were consumed
    groupConsumer.close()
# Create a consumer, determine the topic and partition based on the
# partFile. Identify the most recent consumer offset for this groupId.
def getConsumer(partFile):
    hosts = ["khost:9092"]  # VERIFY HOST
    groupId = "demo"
    topic = "kb"
    timeout = 2000
    recvbuf = 3 * 32768
    global partition
    basen = os.path.basename(partFile)
    topic = os.path.basename(os.path.dirname(partFile))
    try:
        partition = int(''.join(ele for ele in basen if ele.isdigit()))
    except ValueError:
        partition = 0
        logger.warn("Unexpected partFile name {}. Reading from only one partition, and defaulting to topic {}".format(partFile, topic))
    logger.info("Reading partition {} based on partFile {}".format(partition, partFile))
    # Determine the highest offset in Kafka
    consumer = KafkaConsumer(consumer_timeout_ms=timeout, receive_buffer_bytes=recvbuf, bootstrap_servers=hosts, auto_commit_interval_ms=100)
    tp = TopicPartition(topic, partition)
    consumer.assign([tp])
    maxOffset = consumer.position(tp)
    consumer.close()
    if maxOffset is None:
        maxOffset = 0
    # Determine the offset read so far for this consumer group
    groupConsumer = KafkaConsumer(group_id=groupId, consumer_timeout_ms=timeout, receive_buffer_bytes=recvbuf, bootstrap_servers=hosts, auto_commit_interval_ms=100)
    groupConsumer.assign([tp])
    minOffset = groupConsumer.committed(tp)
    if minOffset is None:
        minOffset = 0
    # Compute pending events using the difference in offsets
    pendingEvents = maxOffset - minOffset
    logger.info("Assigned partition {} for partFile {}, maxOffset: {}, minOffset: {}".format([tp], partFile, maxOffset, minOffset))
    # Explicitly seek back to the consumer group offset
    groupConsumer.seek(tp, minOffset)
    return groupConsumer, pendingEvents
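For a quick sanity check outside of Xcalar, you can drive consume() by hand with a path shaped the way the UDF expects: the directory name supplies the topic, and the digits in the file name supply the partition. The path below is hypothetical, and the second argument (the input stream) is unused by this UDF, so None is fine:

if __name__ == "__main__":
    # Hypothetical partFile: topic "kb", partition 7
    for record in consume("/some/path/kb/part-7", None):
        print(record)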