Having set up our Postgres database, let's delve into the details of the Spark job. The goal is to stream the data from the Kafka topic rappel_conso to the Postgres table rappel_conso_table.
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
)
from pyspark.sql.functions import from_json, col
from src.constants import POSTGRES_URL, POSTGRES_PROPERTIES, DB_FIELDS
import logging

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s:%(funcName)s:%(levelname)s:%(message)s"
)


def create_spark_session() -> SparkSession:
    spark = (
        SparkSession.builder.appName("PostgreSQL Reference to PySpark")
        .config(
            "spark.jars.packages",
            "org.postgresql:postgresql:42.5.4,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0",
        )
        .getOrCreate()
    )
    logging.info("Spark session created successfully")
    return spark


def create_initial_dataframe(spark_session):
    """
    Reads the streaming data and creates the initial dataframe accordingly.
    """
    try:
        # Gets the streaming data from the topic rappel_conso
        df = (
            spark_session.readStream.format("kafka")
            .option("kafka.bootstrap.servers", "kafka:9092")
            .option("subscribe", "rappel_conso")
            .option("startingOffsets", "earliest")
            .load()
        )
        logging.info("Initial dataframe created successfully")
    except Exception as e:
        logging.warning(f"Initial dataframe could not be created due to exception: {e}")
        raise

    return df


def create_final_dataframe(df):
    """
    Modifies the initial dataframe, and creates the final dataframe.
    """
    # Every field in DB_FIELDS is read as a nullable string
    schema = StructType(
        [StructField(field_name, StringType(), True) for field_name in DB_FIELDS]
    )
    df_out = (
        df.selectExpr("CAST(value AS STRING)")
        .select(from_json(col("value"), schema).alias("data"))
        .select("data.*")
    )
    return df_out


def start_streaming(df_parsed, spark):
    """
    Starts the streaming to table spark_streaming.rappel_conso in postgres
    """
    # Read existing data from PostgreSQL
    existing_data_df = spark.read.jdbc(
        POSTGRES_URL, "rappel_conso", properties=POSTGRES_PROPERTIES
    )

    unique_column = "reference_fiche"

    logging.info("Start streaming ...")
    # Keep only the rows whose key is not yet in the table, then append them
    query = (
        df_parsed.writeStream.foreachBatch(
            lambda batch_df, _: (
                batch_df.join(
                    existing_data_df,
                    batch_df[unique_column] == existing_data_df[unique_column],
                    "leftanti",
                ).write.jdbc(
                    POSTGRES_URL, "rappel_conso", "append", properties=POSTGRES_PROPERTIES
                )
            )
        )
        .trigger(once=True)
        .start()
    )

    return query.awaitTermination()


def write_to_postgres():
    spark = create_spark_session()
    df = create_initial_dataframe(spark)
    df_final = create_final_dataframe(df)
    start_streaming(df_final, spark=spark)


if __name__ == "__main__":
    write_to_postgres()
Let's break down the key highlights and functionalities of the Spark job:
1. First, we create the Spark session.
def create_spark_session() -> SparkSession:
    spark = (
        SparkSession.builder.appName("PostgreSQL Reference to PySpark")
        .config(
            "spark.jars.packages",
            "org.postgresql:postgresql:42.5.4,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0",
        )
        .getOrCreate()
    )
    logging.info("Spark session created successfully")
    return spark
2. The create_initial_dataframe function ingests streaming data from the Kafka topic using Spark's Structured Streaming.
def create_initial_dataframe(spark_session):
    """
    Reads the streaming data and creates the initial dataframe accordingly.
    """
    try:
        # Gets the streaming data from the topic rappel_conso
        df = (
            spark_session.readStream.format("kafka")
            .option("kafka.bootstrap.servers", "kafka:9092")
            .option("subscribe", "rappel_conso")
            .option("startingOffsets", "earliest")
            .load()
        )
        logging.info("Initial dataframe created successfully")
    except Exception as e:
        logging.warning(f"Initial dataframe could not be created due to exception: {e}")
        raise

    return df
3. Once the data is ingested, create_final_dataframe transforms it. It applies a schema (defined by the columns in DB_FIELDS) to the incoming JSON data, ensuring that the data is structured and ready for further processing.
def create_final_dataframe(df):
    """
    Modifies the initial dataframe, and creates the final dataframe.
    """
    # Every field in DB_FIELDS is read as a nullable string
    schema = StructType(
        [StructField(field_name, StringType(), True) for field_name in DB_FIELDS]
    )
    df_out = (
        df.selectExpr("CAST(value AS STRING)")
        .select(from_json(col("value"), schema).alias("data"))
        .select("data.*")
    )
    return df_out
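To make the parsing step more concrete, here is a plain-Python analogy of what the cast plus from_json combination roughly does to a single Kafka message. It is only a sketch and not how Spark implements it: parse_kafka_value is a hypothetical helper, and every field name other than reference_fiche is invented for illustration.

```python
import json
from typing import Optional

# Illustrative subset of columns; the real list lives in DB_FIELDS.
# Only reference_fiche appears in the job itself, the others are assumptions.
EXAMPLE_FIELDS = ["reference_fiche", "categorie", "marque"]


def parse_kafka_value(raw: bytes, fields: list[str]) -> dict[str, Optional[str]]:
    """Roughly mimic CAST(value AS STRING) followed by from_json with an all-string schema."""
    try:
        payload = json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        payload = None
    if not isinstance(payload, dict):
        # Unparseable input: every column ends up empty (None).
        return {name: None for name in fields}

    row: dict[str, Optional[str]] = {}
    for name in fields:
        value = payload.get(name)
        if value is None:
            row[name] = None  # absent or null keys become None
        elif isinstance(value, str):
            row[name] = value
        else:
            row[name] = json.dumps(value)  # non-string values are kept as text
    return row  # keys not listed in fields are dropped


message = b'{"reference_fiche": "2023-01-0001", "marque": "ACME", "extra": 1}'
row = parse_kafka_value(message, EXAMPLE_FIELDS)
print(row)
```

The point of the sketch is that the schema acts as a filter as much as a parser: fields that are not declared disappear, and declared fields that are missing show up as empty values instead of raising an error.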
4. The start_streaming function reads existing data from the database, compares it with the incoming stream, and appends new records.
def start_streaming(df_parsed, spark):
    """
    Starts the streaming to table spark_streaming.rappel_conso in postgres
    """
    # Read existing data from PostgreSQL
    existing_data_df = spark.read.jdbc(
        POSTGRES_URL, "rappel_conso", properties=POSTGRES_PROPERTIES
    )

    unique_column = "reference_fiche"

    logging.info("Start streaming ...")
    # Keep only the rows whose key is not yet in the table, then append them
    query = (
        df_parsed.writeStream.foreachBatch(
            lambda batch_df, _: (
                batch_df.join(
                    existing_data_df,
                    batch_df[unique_column] == existing_data_df[unique_column],
                    "leftanti",
                ).write.jdbc(
                    POSTGRES_URL, "rappel_conso", "append", properties=POSTGRES_PROPERTIES
                )
            )
        )
        .trigger(once=True)
        .start()
    )

    return query.awaitTermination()
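The "leftanti" join is what prevents records that are already stored from being appended again. The following plain-Python sketch shows the same idea on lists of dictionaries. The left_anti_join helper and the sample key values are hypothetical and only meant to illustrate the semantics, not the Spark implementation.

```python
def left_anti_join(batch_rows, existing_rows, key):
    """Return the batch rows whose key value does not appear in existing_rows."""
    existing_keys = {row[key] for row in existing_rows}
    return [row for row in batch_rows if row[key] not in existing_keys]


# Made-up sample data, keyed on the same column the job uses.
existing = [{"reference_fiche": "A-1"}, {"reference_fiche": "A-2"}]
batch = [
    {"reference_fiche": "A-2"},  # already stored, filtered out
    {"reference_fiche": "A-3"},  # new, kept
    {"reference_fiche": "A-3"},  # duplicate inside the batch, also kept
]

new_rows = left_anti_join(batch, existing, "reference_fiche")
print(new_rows)
```

Note that an anti join only compares the batch against the existing table. Duplicates inside the same batch pass through, so if that matters for your data, one option is to call dropDuplicates([unique_column]) on batch_df before the join.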
The complete code for the Spark job is in the file src/spark_pgsql/spark_streaming.py. We will use the Airflow DockerOperator to run this job, as explained in the upcoming section.
Let's go through the process of creating the Docker image we need to run our Spark job. Here's the Dockerfile for reference:
FROM bitnami/spark:latest

WORKDIR /opt/bitnami/spark

RUN pip install py4j

COPY ./src/spark_pgsql/spark_streaming.py ./spark_streaming.py
COPY ./src/constants.py ./src/constants.py

ENV POSTGRES_DOCKER_USER=host.docker.internal
ARG POSTGRES_PASSWORD
ENV POSTGRES_PASSWORD=$POSTGRES_PASSWORD
In this Dockerfile, we start with the bitnami/spark image as our base. It is a ready-to-use Spark image. We then install py4j, a tool needed for Spark to work with Python.
The environment variables POSTGRES_DOCKER_USER and POSTGRES_PASSWORD are set up for connecting to a PostgreSQL database. Since our database is on the host machine, we set POSTGRES_DOCKER_USER to host.docker.internal. Despite the variable's name, this value is a hostname: it allows our Docker container to access services on the host, in this case the PostgreSQL database. The password for PostgreSQL is passed as a build argument, so it is not hard-coded in the Dockerfile.
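The file src/constants.py is not shown in this section, so the following is only a guess at how these two environment variables could be turned into the POSTGRES_URL and POSTGRES_PROPERTIES values the Spark job imports. The build_postgres_settings helper, the port 5432, the database name postgres and the user postgres are all assumptions made for illustration.

```python
from typing import Mapping


def build_postgres_settings(env: Mapping[str, str]) -> tuple[str, dict[str, str]]:
    """Build a JDBC URL and connection properties from environment-style values."""
    host = env.get("POSTGRES_DOCKER_USER", "localhost")
    # Assumptions: default port 5432, database "postgres", user "postgres".
    url = f"jdbc:postgresql://{host}:5432/postgres"
    properties = {
        "user": "postgres",
        "password": env.get("POSTGRES_PASSWORD", ""),
        "driver": "org.postgresql.Driver",
    }
    return url, properties


# Example values standing in for the container's environment.
example_env = {
    "POSTGRES_DOCKER_USER": "host.docker.internal",
    "POSTGRES_PASSWORD": "example-password",
}
POSTGRES_URL, POSTGRES_PROPERTIES = build_postgres_settings(example_env)
print(POSTGRES_URL)
```

Inside the container you would pass os.environ instead of the example dictionary, so the values set by the Dockerfile's ENV lines are picked up at runtime.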
It's important to note that this approach, especially passing the database password at build time, might not be secure for production environments: the value can end up stored in the image and its build history, where anyone with access to the image can read it. In such cases, more secure methods such as Docker BuildKit secrets should be considered.
Now, let's build the Docker image for Spark:
docker build -f spark/Dockerfile -t rappel-conso/spark:latest --build-arg POSTGRES_PASSWORD=$POSTGRES_PASSWORD .
This command builds the image rappel-conso/spark:latest. This image includes everything needed to run our Spark job and will be used by Airflow's DockerOperator to execute the job. Remember to replace $POSTGRES_PASSWORD with your actual PostgreSQL password when running this command.