I’m trying to do some very basic stream processing using PySpark (3.2.4) Structured Streaming, with Kafka as my data source. Just to get up and running, I’m attempting the really basic task of parsing a few fields (timestamp, message, and the nested iss_position) out of my source messages and appending them to the console. However, when I run my script I get a
pyspark.errors.exceptions.captured.StreamingQueryException.
See below for script and traceback:
PySpark consumer:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

if __name__ == "__main__":
    bootstrapServers = "localhost:9092"
    subscribeType = "subscribe"
    topics = "test"

    spark = SparkSession.builder.appName("StructuredKafkaWordCount").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    lines = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", bootstrapServers) \
        .option(subscribeType, topics) \
        .load()

    # Define the expected schema
    expected_schema = StructType([
        StructField("timestamp", StringType(), True),
        StructField("message", StringType(), True),
        StructField("iss_position", StructType([
            StructField("longitude", StringType(), True),
            StructField("latitude", StringType(), True)
        ]), True)
    ])

    # Use from_json with the expected schema
    value_df = lines.select(from_json(col("value").cast("string"), expected_schema).alias("data"))

    query = value_df \
        .select("data.*") \
        .writeStream \
        .outputMode('append') \
        .format('console') \
        .start()

    query.awaitTermination()
Kafka producer:
import json
import requests
from kafka import KafkaProducer
from time import sleep

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: json.dumps(x).encode('utf-8')
)

for i in range(50):
    res = requests.get('http://api.open-notify.org/iss-now.json')
    data = json.loads(res.content.decode('utf-8'))
    print(data)
    producer.send("test", value=data)
    sleep(5)

producer.flush()
Output of the Kafka producer:

{'timestamp': 1703220954, 'message': 'success', 'iss_position': {'longitude': '-30.5346', 'latitude': '19.9839'}}
{'timestamp': 1703220960, 'message': 'success', 'iss_position': {'longitude': '-30.3066', 'latitude': '20.2516'}}
{'timestamp': 1703220965, 'message': 'success', 'iss_position': {'longitude': '-30.0774', 'latitude': '20.5195'}}
Error:

pyspark.errors.exceptions.captured.StreamingQueryException: [STREAM_FAILED] Query [id = 27d6712d-45fc-4c32-ba01-bda504ea5e46, runId = dbb59a5f-693e-4616-8126-2dc9139ac11e] terminated with exception: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'
I’ve checked all my environment variables and still encountered the error.
The error you are encountering ('boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)') comes from Hadoop's native I/O layer and is common when running Spark on Windows. Structured Streaming writes its checkpoint and metadata files through the Hadoop file APIs, and on Windows those APIs need the native winutils.exe and hadoop.dll binaries; if they are missing, not on the PATH, or built for a different Hadoop version, calls such as NativeIO$Windows.access0 fail with exactly this message.
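Before working through the list below, a quick sanity check (a sketch assuming a standard winutils-style setup, not part of your posted code) can confirm whether the native binaries are visible to the Python process at all:

import os
import shutil

# The access0 call typically fails when hadoop.dll / winutils.exe are
# missing from %HADOOP_HOME%\bin or that folder is not on the PATH.
hadoop_home = os.environ.get("HADOOP_HOME")
print("HADOOP_HOME =", hadoop_home)
print("winutils.exe on PATH:", shutil.which("winutils.exe"))
if hadoop_home:
    dll = os.path.join(hadoop_home, "bin", "hadoop.dll")
    print("hadoop.dll present:", os.path.exists(dll))

If HADOOP_HOME is unset or hadoop.dll is missing, fix that first; the Spark-side settings below are unlikely to help until the JVM can load the native library.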
Here are a few things you can try to resolve the issue:
Check Environment Variables: Ensure that HADOOP_HOME points to a directory whose bin folder contains winutils.exe and hadoop.dll built for your Hadoop version, and that %HADOOP_HOME%\bin is included in your PATH. A snippet showing how to do this from the script itself follows below.
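If you would rather keep this setup in the script, you can set the variables before the SparkSession is created; C:\hadoop below is a hypothetical install path, not something taken from your question:

import os

# Hypothetical location of winutils.exe and hadoop.dll; adjust to your machine.
os.environ["HADOOP_HOME"] = r"C:\hadoop"
os.environ["PATH"] = r"C:\hadoop\bin;" + os.environ["PATH"]

# Import and start Spark only after the variables are set: the JVM reads
# PATH and HADOOP_HOME at startup, so getOrCreate() must come afterwards.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("StructuredKafkaWordCount").getOrCreate()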
Use a Linux Environment: If possible, consider running your PySpark application in a Linux environment. Spark is better supported on Linux, and you are likely to encounter fewer native-library issues.
Use Prebuilt Spark Binaries: Use a Spark distribution prebuilt for a specific Hadoop version, available on the official Spark download page, so that Spark and the Hadoop native libraries you install actually match.
Update Spark and Hadoop Versions: Make sure the Spark and Hadoop versions you are using are compatible with each other. Upgrading one without the other can produce exactly this kind of native-library mismatch.
Check Spark Configuration: Check your spark-defaults.conf and spark-env.cmd files for any configurations related to Hadoop native libraries. Ensure they are set correctly.
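As one concrete example of such a configuration (a sketch; C:\hadoop\bin is an assumed path), you can point the JVM's java.library.path at the folder containing hadoop.dll when building the session:

from pyspark.sql import SparkSession

# Hypothetical path: the folder that holds hadoop.dll on your machine.
spark = (SparkSession.builder
         .appName("StructuredKafkaWordCount")
         .config("spark.driver.extraJavaOptions",
                 r"-Djava.library.path=C:\hadoop\bin")
         .getOrCreate())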
Use Spark in Local Mode: When running Spark on Windows, try running it in local mode for development purposes. This can be done by setting the master to local:
config("spark.master", "local[*]")
Note, however, that Structured Streaming still writes its checkpoints through the Hadoop file APIs even in local mode, so on Windows you will usually still need the winutils binaries from the first step.
Remember that Windows support for Spark and Hadoop might have some limitations, and for production use, running on a Linux environment is often recommended. If you continue to face issues on Windows, consider setting up a virtual machine or using Docker with a Linux image to run your Spark applications.