java - Error executing PySpark script on Google dataproc -


i trying execute pyspark script trying consume data kafka spark streaming through directstream api.

following code

import os   import py4j import pyspark os.environ['pyspark_submit_args'] = '--packages org.apache.spark:spark-streaming-kafka-0-8-assembly_2.11-2.2.0 pyspark-shell'   pyspark import sparkcontext   pyspark.streaming import streamingcontext   pyspark.streaming.kafka import kafkautils   import json     sc = sparkcontext(appname="pythonsparkstreamingkafka") ssc = streamingcontext(sc, 2) kafkastream = kafkautils.createdirectstream(ssc, ['test1'], {"metadata.broker.list": '<ip-address>:2181','auto.offset.reset': 'smallest'}) #parsed = kafka_stream.map(lambda (k, v): json.loads(v)) offsetranges = [{'topic':'test1','partition':0,'fromoffset':1,'untiloffset':6},] def storeoffsetranges(rdd):     global offsetranges     offsetranges = rdd.offsetranges()     return rdd  def printoffsetranges(rdd):     o in offsetranges:         print "%s %s %s %s" % (o.topic, o.partition, o.fromoffset, o.untiloffset)  kafkastream.transform(storeoffsetranges).foreachrdd(printoffsetranges) ssc.start() ssc.awaittermination() 

17/09/11 11:45:44 info org.spark_project.jetty.util.log: logging initialized @2247ms 17/09/11 11:45:45 info org.spark_project.jetty.server.server: jetty-9.3.z-snapshot 17/09/11 11:45:45 info org.spark_project.jetty.server.server: started @2324ms 17/09/11 11:45:45 info org.spark_project.jetty.server.abstractconnector: started serverconnector@2d4587ca{http/1.1,[http/1.1]}{0.0.0.0:4040} 17/09/11 11:45:45 info com.google.cloud.hadoop.fs.gcs.googlehadoopfilesystembase: ghfs version: 1.6.1-hadoop2 17/09/11 11:45:46 info org.apache.hadoop.yarn.client.rmproxy: connecting resourcemanager @ dataproc-m/10.148.0.2:8032 17/09/11 11:45:49 info org.apache.hadoop.yarn.client.api.impl.yarnclientimpl: submitted application application_1505106215465_0026 17/09/11 11:45:53 info kafka.utils.verifiableproperties: verifying properties 17/09/11 11:45:53 info kafka.utils.verifiableproperties: property auto.offset.reset overridden smallest 17/09/11 11:45:53 info kafka.utils.verifiableproperties: property group.id overridden 17/09/11 11:45:53 info kafka.utils.verifiableproperties: property zookeeper.connect overridden 17/09/11 11:45:54 info kafka.consumer.simpleconsumer: reconnect due socket error: java.io.eofexception: received -1 when reading channel, socket has been closed. traceback (most recent call last): file "/tmp/7ac4b0c2-4b7a-4be9-97d1-c12efc9b7548/test_p.py", line 19, in kafkastream = kafkautils.createdirectstream(ssc, ['test1'], {"metadata.broker.list": ':2181','auto.offset.reset': 'smallest'}) file "/usr/lib/spark/python/lib/pyspark.zip/pyspark/streaming/kafka.py", line 130, in createdirectstream file "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in call file "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value py4j.protocol.py4jjavaerror: error occurred while calling o47.createdirectstreamwithoutmessagehandler. : org.apache.spark.sparkexception: java.io.eofexception: received -1 when reading channel, socket has been closed. @ org.apache.spark.streaming.kafka.kafkacluster$$anonfun$checkerrors$1.apply(kafkacluster.scala:385) @ org.apache.spark.streaming.kafka.kafkacluster$$anonfun$checkerrors$1.apply(kafkacluster.scala:385) @ scala.util.either.fold(either.scala:98) @ org.apache.spark.streaming.kafka.kafkacluster$.checkerrors(kafkacluster.scala:384) @ org.apache.spark.streaming.kafka.kafkautils$.getfromoffsets(kafkautils.scala:222) @ org.apache.spark.streaming.kafka.kafkautilspythonhelper.createdirectstream(kafkautils.scala:720) @ org.apache.spark.streaming.kafka.kafkautilspythonhelper.createdirectstreamwithoutmessagehandler(kafkautils.scala:688) @ sun.reflect.nativemethodaccessorimpl.invoke0(native method) @ sun.reflect.nativemethodaccessorimpl.invoke(nativemethodaccessorimpl.java:62) @ sun.reflect.delegatingmethodaccessorimpl.invoke(delegatingmethodaccessorimpl.java:43) @ java.lang.reflect.method.invoke(method.java:498) @ py4j.reflection.methodinvoker.invoke(methodinvoker.java:244) @ py4j.reflection.reflectionengine.invoke(reflectionengine.java:357) @ py4j.gateway.invoke(gateway.java:280) @ py4j.commands.abstractcommand.invokemethod(abstractcommand.java:132) @ py4j.commands.callcommand.execute(callcommand.java:79) @ py4j.gatewayconnection.run(gatewayconnection.java:214) @ java.lang.thread.run(thread.java:748)

17/09/11 11:45:54 info org.spark_project.jetty.server.abstractconnector: stopped spark@2d4587ca{http/1.1,[http/1.1]}{0.0.0.0:4040}

let me know if able identify issue of whether jar issue or kafka connection issue? note:- removed ip address of broker security reasons


Comments

Popular posts from this blog

Sort a complex associative array in PHP -

vb.net - How to ignore if a cell is empty nothing -

recursion - Can every recursive algorithm be improved with dynamic programming? -