Почему при выполнении приложения структурированной потоковой передачи происходит сбой с ошибкой «Не удалось найти источник данных: kafka»?

Я пытаюсь подключить структурированную потоковую передачу Spark к kafka, и она выдает следующую ошибку:

Исключение в потоке «основной» java.lang.ClassNotFoundException: не удалось найти источник данных: kafka. Пожалуйста, найдите пакеты в ...

введите здесь описание изображения

На основании документации я добавил требуемые зависимости

и мои серверы kafka и zookeeper работают. Не уверен, в чем проблема. Кроме того, я использую его таким образом

import spark.implicits._
val feedback =spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:2181").option("subscribe", "kafka_input_topic")
      .load().as[InputMessage].filter(_.lang.equals("en"))

Любая помощь приветствуется. Спасибо


person Rahul Kumar    schedule 11.02.2018    source источник
comment
Вы добавили "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % SPARK_VERSION в свой список зависимостей? (SPARK_VERSION — это заполнитель для вашей правильной версии искры).   -  person Yuval Itzchakov    schedule 11.02.2018
comment
да. @YuvalItzchakov <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql-kafka-0-10_2.11</artifactId> <version>2.2.0</version> <scope>provided</scope> </dependency>   -  person Rahul Kumar    schedule 11.02.2018


Ответы (4)


Проблема, как вы упомянули в своих комментариях, заключается в следующем:

<scope>provided</scope>

Удалите область provided для sql-kafka, так как она не предоставляется при установке Spark.

person Yuval Itzchakov    schedule 11.02.2018
comment
все та же ошибка - person Rahul Kumar; 11.02.2018
comment
Вы правильно обновили свои зависимости? Вы собираете uber jar? - person Yuval Itzchakov; 11.02.2018
comment
да. это uber jar .. но ошибка не появляется, когда я добавляю это в spark submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 - person Rahul Kumar; 11.02.2018
comment
@RahulKumar Это означает, что что-то в вашей упаковке не обновилось должным образом, и она все еще кэширует версию с provided. - person Yuval Itzchakov; 11.02.2018
comment
@RahulKumar проверьте эту тему stackoverflow.com/questions/48011941/ - person Kleyson Rios; 12.02.2018

Проблема в том, что необходимая банка не включена в CLASSPATH во время выполнения (не во время сборки).

На основании документации, на которую вы ссылаетесь. добавили необходимые зависимости в ваш файл определения сборки (pom.xml или build.sbt или build.gradle), но исключение возникает, когда вы пытаетесь запустить приложение после его сборки, не так ли?

Что вы пропустите, так это часть документации по развертыванию, т. е. Развертывание:

Как и в случае с любыми приложениями Spark, spark-submit используется для запуска вашего приложения. spark-sql-kafka-0-10_2.11 и его зависимости можно напрямую добавить в spark-submit с помощью --packages, например,

./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 ..

Вы должны добавить этот --packages, иначе вам придется создать uber-jar, который сделает зависимой частью вашего файла jar.

person Jacek Laskowski    schedule 27.07.2018

Если вы используете maven, то следующий способ создания jar с зависимостями может решить вашу проблему.

Добавьте искровые зависимости, как показано ниже:

<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.2.1</version>
        <scope>${spark.scope}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
        <version>2.2.1</version>
    </dependency>

Затем настройте свои профили maven, как показано ниже:

<profiles>
    <profile>
        <id>default</id>
        <properties>
            <profile.id>dev</profile.id>
            <spark.scope>compile</spark.scope>
        </properties>
        <activation>
            <activeByDefault>true</activeByDefault>
        </activation>
    </profile>
    <profile>
        <id>test</id>
        <properties>
            <profile.id>test</profile.id>
            <spark.scope>provided</spark.scope>
        </properties>
    </profile>
    <profile>
        <id>online</id>
        <properties>
            <profile.id>online</profile.id>
            <spark.scope>provided</spark.scope>
        </properties>
    </profile>
</profiles>

Добавьте следующий плагин:

<plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.1.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id> <!-- this is used for inheritance merges -->
                    <phase>package</phase> <!-- bind to the packaging phase -->
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

Затем создайте банку, используя mvn clean install -Ponline -DskipTests. Это должно решить вашу проблему

person Akhil Bojedla    schedule 12.03.2018

вы можете использовать источник данных kafka по полному имени (не псевдониму) следующим образом:

spark.readStream.format("org.apache.spark.sql.kafka010.KafkaSourceProvider").load
person Punk Monday    schedule 08.06.2018
comment
Это не работа для меня, я действительно надеялся на такой вариант - person Aaron Stainback; 30.06.2020