Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
1.3k views
in Technique[技术] by (71.8m points)

apache spark - How to save dataframe to Elasticsearch in PySpark?

I have a spark dataframe that I am trying to push to AWS Elasticsearch, but before that I was testing this sample code snippet to push to ES,

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ES_indexer').getOrCreate()
df = spark.createDataFrame([{'num': i} for i in xrange(10)])
df = df.drop('_id')
df.write.format(
    'org.elasticsearch.spark.sql'
).option(
    'es.nodes', 'http://spark-data-push-adertadaltdpioy124.us-west-2.es.amazonaws.com'
).option(
    'es.port', 9200
).option(
    'es.resource', '%s/%s' % ('index_name', 'doc_type_name'),
).save()

I get an error saying,

java.lang.ClassNotFoundException: Failed to find data source: org.elasticsearch.spark.sql. Please find packages at http://spark.apache.org/third-party-projects.html

Any suggestions would be greatly appreciated.

Error Trace:

Traceback (most recent call last):
  File "es_3.py", line 12, in <module>
    'es.resource', '%s/%s' % ('index_name', 'doc_type_name'),
  File "/usr/local/lib/python2.7/site-packages/pyspark/sql/readwriter.py", line 732, in save
    self._jwrite.save()
  File "/usr/local/lib/python2.7/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/lib/python2.7/site-packages/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/local/lib/python2.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o46.save.
: java.lang.ClassNotFoundException: Failed to find data source: org.elasticsearch.spark.sql. Please find packages at http://spark.apache.org/third-party-projects.html
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:657)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:245)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.elasticsearch.spark.sql.DefaultSource
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
        at scala.util.Try.orElse(Try.scala:84)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:634)
        ... 12 more
See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

tl;dr Use pyspark --packages org.elasticsearch:elasticsearch-hadoop:7.2.0 and use format("es") to reference the connector.


Quoting Installation from the official documentation of the Elasticsearch for Apache Hadoop product:

Just like other libraries, elasticsearch-hadoop needs to be available in Spark’s classpath.

And later in Supported Spark SQL versions:

elasticsearch-hadoop supports both version Spark SQL 1.3-1.6 and Spark SQL 2.0 through two different jars: elasticsearch-spark-1.x-<version>.jar and elasticsearch-hadoop-<version>.jar

elasticsearch-spark-2.0-<version>.jar supports Spark SQL 2.0

That looks like an issue with the document (as they use two different versions of the jar file), but does mean that you have to use the proper jar file on the CLASSPATH of your Spark application.

And later in the same document:

Spark SQL support is available under org.elasticsearch.spark.sql package.

That simply says that the format (in df.write.format('org.elasticsearch.spark.sql')) is correct.

Further down the document you can find that you could even use an alias df.write.format("es") (!)

I found Apache Spark section in the project's repository on GitHub more readable and current.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...