Special shoutout to Will Benton and Erik Erlandson; their advice and knowledge of Spark’s internals helped me to fully grok these concepts. Check their stuff out!
Recently I have been doing some refactoring of the Spark/Kafka applications in the bones-brigade project. I started this project as a place to collect several application framework skeletons that I seem to be continually recreating while working on OpenShift and other container-based platforms. I wanted to ensure that the codebases were staying fresh and providing a solid foundation for useful construction, and to that end, I’ve been re-examining the state of the streaming applications because they were never truly in alignment across the Java/Python language divide.
This all started with a misstep on my part. When I originally created the Java version of the Spark/Kafka application, I only had it reading from a Kafka topic and then printing out the messages it received. The Python version, however, was doing something more useful: reading from a topic and then writing back to a second topic.
The reason I find the second version of this application more useful, namely reading then writing, is that it more closely approximates a style of software design known colloquially as a kappa architecture (as opposed to a lambda architecture). I’m not going to dive into why I prefer this design other than to say that I find it makes building pipelines of processing applications simpler than having to deal with a database or other disk-like persistence store. (yes, I realize there is much to nitpick in that last statement, but bear with me here)
So, in general, what I want my application skeletons to do is something like this: the Spark component reads from Topic1, does some processing, then writes to Topic2. In the skeletons there is no actual processing happening, just a convenient placeholder to drop code into.
A tale of distributed computing
Something that had been bugging me for a while, and that had become more prominent as I refactored the Java version, was that the Python version of this code used Spark’s collect method to gather all the messages in an RDD before rebroadcasting them to the second Kafka topic.
The API documentation for the collect method says:
Return a list that contains all of the elements in this RDD.
Ok, that sounds good, but there is a caveat:
Note: This method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver’s memory.
What this means is that the contents of every RDD are shipped back to the driver application. In the case of my original code this made it easy to retransmit the messages, but it also meant extra network trips for any processing, since the driver would need to gather all the results; it would be far more efficient if the executors could send the messages to Kafka directly.
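To make that concrete, here is a tiny, self-contained sketch (with made-up data, not the skeleton’s actual messages) of what collect does:

from pyspark import SparkContext

sc = SparkContext(appName='collect-example')

# The elements of this RDD live out on the executors...
rdd = sc.parallelize(range(100))

# ...until collect() ships every one of them back to the driver as a
# plain Python list, which is where the memory and network costs show up.
elements = rdd.collect()
print(len(elements))  # 100, all held in the driver's memory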
The first version of the code looked like this:
def send_response(rdd):
    """A function to publish an RDD to a Kafka topic"""
    # `kafka` here is the kafka-python client package; this function is
    # defined inside the application class, hence the `self` references.
    producer = kafka.KafkaProducer(bootstrap_servers=self.servers)
    for r in rdd.collect():
        try:
            record = r.encode('ascii', 'backslashreplace')
            producer.send(self.output_topic, record)
        except Exception as e:
            print('Error sending collected RDD')
            print('Original exception: {}'.format(e))
    producer.flush()

messages = self.kafka_stream.map(lambda m: m[1])
messages.foreachRDD(send_response)
I wanted to remove that pesky collect, and I ended up with a piece of trimmed code that looked, more or less, like this:
def send_response(r):
    """A function to publish a single record to a Kafka topic"""
    # Each record is expected to carry its own broker list, topic,
    # and message payload.
    producer = kafka.KafkaProducer(bootstrap_servers=r.get('servers'))
    try:
        record = r.get('message').encode('ascii', 'backslashreplace')
        producer.send(r.get('topic'), record)
    except Exception as e:
        print('Error sending record')
        print('Original exception: {}'.format(e))
    producer.flush()

messages.foreachRDD(lambda rdd: rdd.foreach(send_response))
This certainly looks cleaner, and now the driver no longer needs to bring all the information back before sending it out. But this tradeoff introduces a new issue: all the executors will need access to the kafka package in order to create a KafkaProducer. What this means in my little cloud-orchestrated playground is that I will need to somehow get the Python Kafka package onto all my executor images, which would require using some new features in the oshinko-s2i project or rebuilding my executor image to contain the dependency.
A wild solution appears!
As I was considering how to proceed, I kept thinking about the structured streaming interface in Spark and specifically how it made these types of interactions really easy. As I started to sketch out how the new code might look with structured streaming, another light bulb went off for me: I could leverage the standard --packages argument to spark-submit to gain access to the Kafka connector. This means that I can use a standard Spark interface to distribute the dependencies for me, no need to inject extra Python packages \o/
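For reference, the launch looks roughly like this; the package coordinates are just a sketch and depend on your Spark and Scala versions (this example assumes Spark 2.3 built for Scala 2.11, and app.py stands in for the skeleton’s entry point):

spark-submit \
    --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 \
    app.py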
The code also became much simpler, allowing me to remove some of the scaffolding which was necessary for creating the distributed stream interface. The new code now looks like this:
# Assumes the usual structured streaming imports, e.g.
# from pyspark.sql import functions, types
records = (
    spark
    .readStream
    .format('kafka')
    .option('kafka.bootstrap.servers', args.brokers)
    .option('subscribe', args.intopic)
    .load()
    # Kafka values arrive as binary, cast them to strings for processing
    .select(
        functions.column('value').cast(types.StringType()).alias('value'))
)

writer = (
    records
    .writeStream
    .format('kafka')
    .option('kafka.bootstrap.servers', args.brokers)
    .option('topic', args.outtopic)
    .option('checkpointLocation', '/tmp')
    .start()
)
This looks quite different from the previous version, but I think it makes the code easier to read. Additionally, the skeleton now gives easy access to the powerful structured streaming operations available in Spark.
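One thing to note if you run this as a standalone script (an assumption on my part about how the snippet gets used): start() returns immediately, so you need to block on the query handle or the driver will exit before any micro-batches are processed:

# Block until the streaming query stops or fails; without this the
# driver process would simply exit after starting the stream.
writer.awaitTermination()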
Bottom line: structured streaming is way cool
Confession time. At heart I love a good low-level implementation of anything XD
I started my programming obsession as a youth doing assembly language and continued that into a career of C, embedded software, and systems design. To this day I am fascinated by low-level approaches and set my learning bar accordingly. Although I originally started these applications with the intent of using the low-level RDD interface for distributed streams, it became clear over time that all the advice my colleagues kept giving me about structured streaming, plus the side-effect bonus on dependencies, was just too much to ignore any longer.
This shift to structured streaming has been merged into both the Python and Java versions of this skeleton, and I am working on getting a Scala version up also. I hope you found my experiences on this journey interesting, and if you need some simple cloud-native style applications please check out the whole bones-brigade collection. I plan to add more applications here as I come across them in my daily hackings, and of course I am always willing to consider contributions from anyone who is interested in these topics =)
as always, have fun and happy hacking!