bigdata-ibecs/bigdata/spark.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, count, to_date, from_unixtime, unix_timestamp


def initialize_spark():
    """Initialize a SparkSession with Hive support and the MySQL JDBC driver."""
    # spark.jars must point at a real MySQL Connector/J jar on the driver;
    # /path/to/... is a placeholder.
    spark = SparkSession.builder \
        .appName("FinancialAnalysis") \
        .config("spark.jars", "/path/to/mysql-connector-java-8.0.23.jar") \
        .enableHiveSupport() \
        .config("hive.metastore.uris", "thrift://192.168.229.122:9083") \
        .getOrCreate()
    return spark
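
# A quick sanity check for the Hive connection (a sketch; run interactively):
#
#   spark = initialize_spark()
#   spark.sql("SHOW TABLES").show()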


def analyze_bank_grade(spark, mysql_url, mysql_properties):
    """Analyze the distribution of enterprise credit grades."""
    dim_bank_grade = spark.sql("""
        SELECT
            bank_grade AS name,
            COUNT(*) AS value
        FROM
            tb_banks
        GROUP BY
            bank_grade
    """)
    dim_bank_grade.write \
        .jdbc(mysql_url, "dim_bank_grade", mode="overwrite", properties=mysql_properties)
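
# For reference, a sketch of the same aggregation via the DataFrame API
# (assumes the Hive table tb_banks is visible to this session, as above):
#
#   spark.table("tb_banks") \
#       .groupBy(col("bank_grade").alias("name")) \
#       .agg(count("*").alias("value"))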


def analyze_loan_status(spark, mysql_url, mysql_properties):
    """Analyze the distribution of repayment statuses."""
    dim_loan_status = spark.sql("""
        SELECT
            loan_status AS name,
            COUNT(*) AS value
        FROM
            tb_loans
        GROUP BY
            loan_status
    """)
    dim_loan_status.write \
        .jdbc(mysql_url, "dim_loan_status", mode="overwrite", properties=mysql_properties)


def analyze_loan_amount(spark, mysql_url, mysql_properties):
    """Analyze the trend of loan amounts over time."""
    # Amounts are reported in units of 10,000 (CNY wan).
    dim_loan_amount = spark.sql("""
        SELECT
            loan_date AS date,
            ROUND(SUM(loan_amount) / 10000, 2) AS amount
        FROM
            tb_loans
        GROUP BY
            loan_date
    """)
    dim_loan_amount.write \
        .jdbc(mysql_url, "dim_loan_amount", mode="overwrite", properties=mysql_properties)
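
# Note: loan_date is grouped and emitted as a raw string here, while
# analyze_jie_count below parses it with the "yyyy/MM/dd" pattern. Under that
# same assumption, a sketch of sorting the result chronologically before the
# write inside the function:
#
#   dim_loan_amount.orderBy(to_date(col("date"), "yyyy/MM/dd"))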


def analyze_repayment_status(spark, mysql_url, mysql_properties):
    """Analyze enterprise repayment status by date."""
    # The status literals are data values stored in tb_repayments:
    # '按时还款' = repaid on time, '逾期还款' = repaid overdue.
    dim_repayment_status = spark.sql("""
        SELECT
            repayment_date,
            ROUND(SUM(CASE WHEN repayment_status = '按时还款' THEN repayment_amount ELSE 0 END), 2) AS on_time_amount,
            ROUND(SUM(CASE WHEN repayment_status = '逾期还款' THEN repayment_amount ELSE 0 END), 2) AS overdue_amount,
            ROUND(SUM(repayment_amount), 2) AS total_amount
        FROM
            tb_repayments
        WHERE
            repayment_status IN ('按时还款', '逾期还款')
        GROUP BY
            repayment_date
        ORDER BY
            repayment_date
    """)
    dim_repayment_status.write \
        .jdbc(mysql_url, "dim_repayment_status", mode="overwrite", properties=mysql_properties)
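
# The CASE WHEN pivot above can also be expressed with the DataFrame API
# (a sketch; when/sum/round are not among this script's imports):
#
#   from pyspark.sql.functions import when, sum as sum_, round as round_
#   spark.table("tb_repayments") \
#       .groupBy("repayment_date") \
#       .agg(round_(sum_(when(col("repayment_status") == '按时还款',
#                            col("repayment_amount")).otherwise(0)), 2)
#            .alias("on_time_amount"))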


def analyze_jie_count(spark, mysql_url, mysql_properties):
    """Analyze the number of loans per year."""
    tb_loans_df = spark.table("tb_loans")
    # Parse the "yyyy/MM/dd" loan_date strings, then count loans per year.
    dim_jie_count = tb_loans_df.withColumn(
        "loan_date_parsed",
        to_date(from_unixtime(unix_timestamp(col("loan_date"), "yyyy/MM/dd")))
    ).groupBy(
        year("loan_date_parsed").alias("name")
    ).agg(
        count("*").alias("value")
    ).orderBy("name")
    dim_jie_count.write \
        .jdbc(mysql_url, "dim_jie_count", mode="overwrite", properties=mysql_properties)
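
# unix_timestamp() returns null for loan_date values that do not match
# "yyyy/MM/dd", so malformed rows fall into a null "name" group; a
# .filter(col("loan_date_parsed").isNotNull()) step before groupBy would drop
# them. On Spark 2.2+, the unix_timestamp/from_unixtime round-trip can also
# be shortened to to_date(col("loan_date"), "yyyy/MM/dd").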


def main():
    spark = None
    try:
        # Initialize Spark.
        spark = initialize_spark()
        # MySQL connection settings. Connector/J 8.x uses the
        # com.mysql.cj.jdbc.Driver class; the old com.mysql.jdbc.Driver
        # name is deprecated.
        mysql_properties = {
            "user": "root",
            "password": "123456",
            "driver": "com.mysql.cj.jdbc.Driver"
        }
        mysql_url = "jdbc:mysql://192.168.229.122:3306/bigdata_ibecs?useSSL=false"
        # Run each analysis job.
        analyze_bank_grade(spark, mysql_url, mysql_properties)
        analyze_loan_status(spark, mysql_url, mysql_properties)
        analyze_loan_amount(spark, mysql_url, mysql_properties)
        analyze_repayment_status(spark, mysql_url, mysql_properties)
        analyze_jie_count(spark, mysql_url, mysql_properties)
    finally:
        # Stop the SparkSession.
        if spark is not None:
            spark.stop()


if __name__ == "__main__":
    main()
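
# Example invocation (a sketch; the jar path is a placeholder and must match
# spark.jars in initialize_spark):
#
#   spark-submit --jars /path/to/mysql-connector-java-8.0.23.jar spark.py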