小能豆

ERROR MicroBatchExecution: pyspark streaming consumer

python

I’m trying to do some very basic stream processing using PySpark (3.2.4) Structured Streaming, using Kafka as my data source. Just to get up and running, I’m attempting the really basic task of parsing a field changeType from my source messages and appending it out to the console. However, when I run my script I get an

pyspark.errors.exceptions.captured.StreamingQueryException.

See below for script and traceback:

pyspark consumer

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

if __name__ == "__main__":
    bootstrapServers = "localhost:9092"
    subscribeType = "subscribe"
    topics = "test"

    spark = SparkSession.builder.appName("StructuredKafkaWordCount").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    lines = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", bootstrapServers) \
        .option(subscribeType, topics) \
        .load()

    # Define the expected schema
    expected_schema = StructType([
        StructField("timestamp", StringType(), True),
        StructField("message", StringType(), True),
        StructField("iss_position", StructType([
            StructField("longitude", StringType(), True),
            StructField("latitude", StringType(), True)
        ]), True)
    ])

    # Use from_json with the expected schema
    value_df = lines.select(from_json(col("value").cast("string"), expected_schema).alias("data"))

    query = value_df \
        .select("data.*") \
        .writeStream \
        .outputMode('append') \
        .format('console') \
        .start()

    query.awaitTermination()

this the kafka producer

import json 
import requests
from kafka import KafkaProducer
from time import sleep

producer=KafkaProducer(bootstrap_servers=['localhost:9092'],
                       value_serializer=lambda x: json.dumps(x).encode('utf-8')
                       )

for i in range(50):
    res=requests.get('http://api.open-notify.org/iss-now.json')
    data=json.loads(res.content.decode('utf-8'))
    print(data)
    producer.send("test", value=data)
    sleep(5)
    producer.flush()

output of kafka producer {‘timestamp’: 1703220954, ‘message’: ‘success’, ‘iss_position’: {‘longitude’: ‘-30.5346’, ‘latitude’: ‘19.9839’}} {‘timestamp’: 1703220960, ‘message’: ‘success’, ‘iss_position’: {‘longitude’: ‘-30.3066’, ‘latitude’: ‘20.2516’}} {‘timestamp’: 1703220965, ‘message’: ‘success’, ‘iss_position’: {‘longitude’: ‘-30.0774’, ‘latitude’: ‘20.5195’}}

error pyspark.errors.exceptions.captured.StreamingQueryException: [STREAM_FAILED] Query [id = 27d6712d-45fc-4c32-ba01-bda504ea5e46, runId = dbb59a5f-693e-4616-8126-2dc9139ac11e] terminated with exception: ‘boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)’

‘boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)’

I’ve checked all my environment variables and still encountered the error.


阅读 88

收藏
2023-12-22

共1个答案

小能豆

The error you are encountering ('boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)') seems to be related to Hadoop native libraries, and it’s common when running Spark on Windows. PySpark on Windows might have some limitations, and certain features, especially those related to native libraries, may not work as expected.

Here are a few things you can try to resolve the issue:

  1. Check Environment Variables: Ensure that your HADOOP_HOME and HADOOP_HOME/bin are correctly set in your system environment variables.

  2. Use a Linux Environment: If possible, consider running your PySpark application on a Linux environment. Spark is typically better supported on Linux, and you might encounter fewer issues.

  3. Use a Prebuilt Spark Binaries for Windows: There are some prebuilt Spark binaries for Windows available that might help in avoiding certain issues. You can find them on the official Spark download page.

  4. Update Spark and Hadoop Versions: Make sure you are using the latest version of Spark and Hadoop that is compatible with each other. Sometimes, upgrading Spark and Hadoop can resolve certain issues.

  5. Check Spark Configuration: Check your spark-defaults.conf and spark-env.cmd files for any configurations related to Hadoop native libraries. Ensure they are set correctly.

  6. Use Spark in Local Mode: When running Spark on Windows, try running it in local mode for development purposes. This can be done by setting the master to local:

config("spark.master", "local[*]")

This way, you won’t be dealing with Hadoop native libraries on Windows.

  1. Run Spark in Docker: Another option is to run Spark inside a Docker container. This provides a more consistent environment across platforms.

Remember that Windows support for Spark and Hadoop might have some limitations, and for production use, running on a Linux environment is often recommended. If you continue to face issues on Windows, consider setting up a virtual machine or using Docker with a Linux image to run your Spark applications.

2023-12-22