三、分类模型
3.1 继续任务5的步骤,假设 Type 1 为标签,对其进行标签编码(LabelEncoder,PySpark 中对应 StringIndexer)
from pyspark import SparkFiles
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Obtain the Spark session for this script (getOrCreate reuses an active one).
spark = SparkSession.builder.appName('pyspark').getOrCreate()

# Register the remote CSV with the SparkContext, then read it back from the
# local path that SparkFiles resolves for the registered file name.
spark.sparkContext.addFile("https://cdn.coggle.club/Pokemon.csv")
path = "file://" + SparkFiles.get("Pokemon.csv")
df = spark.read.csv(path=path, header=True, inferSchema=True)

# Give the columns whose names contain spaces or dots plain identifiers.
for old_name, new_name in (
    ("Sp. Atk", "SpAtk"),
    ("Sp. Def", "SpDef"),
    ("Type 1", "Type1"),
    ("Type 2", "Type2"),
):
    df = df.withColumnRenamed(old_name, new_name)

# Store Legendary as a string column (it is string-indexed as a categorical
# feature further down the script).
legendary_as_text = col("Legendary").cast("string")
df = df.withColumn("Legendary", legendary_as_text)

# Label encoding: map each distinct Type1 value to a numeric index in
# Type1_idx, which serves as the label column.
indexer = StringIndexer(inputCol="Type1", outputCol="Type1_idx")
type1_index_model = indexer.fit(df)
df = type1_index_model.transform(df)
3.2 导入合适的标签评价指标,说出选择的原因?
3.3 选择至少3种分类方法,完成训练。
# --- Feature engineering -------------------------------------------------
# Index the categorical feature columns into numeric *_idx columns.
# NOTE(review): handleInvalid="skip" filters out rows with invalid values
# during transform; if Type2 contains nulls (presumably single-type
# Pokemon -- confirm against the data) those rows are silently dropped.
# Consider handleInvalid="keep" if they should be retained.
in_cols = ["Type2", "Generation", "Legendary"]
out_cols = ["Type2_idx", "Generation_idx", "Legendary_idx"]
indexer = StringIndexer(inputCols=in_cols, outputCols=out_cols, handleInvalid="skip")
df = indexer.fit(df).transform(df)

# MinMaxScaler works on vector columns, so every numeric column is first
# wrapped into a one-element vector (<name>_vec) and then rescaled into
# <name>_scl. The comprehension variable is deliberately not called `col`,
# which would rebind the imported pyspark.sql.functions.col to a string.
columns_to_scale = ["Total", "HP", "Attack", "Defense", "SpAtk", "SpDef", "Speed"]
assemblers = [
    VectorAssembler(inputCols=[column_name], outputCol=column_name + "_vec")
    for column_name in columns_to_scale
]
scalers = [
    MinMaxScaler(inputCol=column_name + "_vec", outputCol=column_name + "_scl")
    for column_name in columns_to_scale
]
# All assemblers run before the scalers, so each *_vec column exists by the
# time its scaler stage is fitted.
pipeline = Pipeline(stages=assemblers + scalers)
df = pipeline.fit(df).transform(df)

# Combine the indexed categorical columns and the scaled numeric columns
# into the single vector column consumed by the classifiers.
cols = [
    "Type2_idx", "Generation_idx", "Legendary_idx",
    "Total_scl", "HP_scl", "Attack_scl", "Defense_scl",
    "SpAtk_scl", "SpDef_scl", "Speed_scl",
]
assembler = VectorAssembler(inputCols=cols, outputCol="feature")
df = assembler.transform(df)

# --- Training and evaluation ----------------------------------------------
# Fixed seed so the 80/20 split is reproducible across runs.
train, test = df.randomSplit(weights=[0.8, 0.2], seed=42)

# Accuracy: fraction of test rows whose prediction equals Type1_idx.
evaluator = MulticlassClassificationEvaluator(
    labelCol="Type1_idx",
    predictionCol="prediction",
    metricName="accuracy",
)

# Three classifiers sharing the same label / feature / prediction columns.
models = {
    "Decision Tree": DecisionTreeClassifier(
        labelCol="Type1_idx", featuresCol="feature", predictionCol="prediction"),
    "Random Forest": RandomForestClassifier(
        labelCol="Type1_idx", featuresCol="feature", predictionCol="prediction"),
    "Naive Bayes": NaiveBayes(
        labelCol="Type1_idx", featuresCol="feature", predictionCol="prediction"),
}

# Fit each estimator on the training split and report its test accuracy.
for name, estimator in models.items():
    predictions = estimator.fit(train).transform(test)
    accuracy = evaluator.evaluate(predictions)
    print(f"Accuracy of {name} is {accuracy:.4f}")
四、聚类模型
4.1 继续任务5的步骤,假设 Type 1 为标签,对其进行标签编码(LabelEncoder,PySpark 中对应 StringIndexer)
from pyspark import SparkFiles
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.clustering import KMeans
from pyspark.sql.types import DoubleType
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Obtain the Spark session for the clustering section (getOrCreate reuses an
# already active session if there is one).
spark = SparkSession.builder.appName('pyspark').getOrCreate()

# Register the remote CSV with the SparkContext, then read it back from the
# local path that SparkFiles resolves for the registered file name.
spark.sparkContext.addFile("https://cdn.coggle.club/Pokemon.csv")
path = "file://" + SparkFiles.get("Pokemon.csv")
df = spark.read.csv(path=path, header=True, inferSchema=True)

# Give the columns whose names contain spaces or dots plain identifiers.
for old_name, new_name in (
    ("Sp. Atk", "SpAtk"),
    ("Sp. Def", "SpDef"),
    ("Type 1", "Type1"),
    ("Type 2", "Type2"),
):
    df = df.withColumnRenamed(old_name, new_name)

# Store Legendary as a string column (it is string-indexed as a categorical
# feature further down the script).
legendary_as_text = col("Legendary").cast("string")
df = df.withColumn("Legendary", legendary_as_text)

# Label encoding: map each distinct Type1 value to a numeric index in
# Type1_idx.
indexer = StringIndexer(inputCol="Type1", outputCol="Type1_idx")
type1_index_model = indexer.fit(df)
df = type1_index_model.transform(df)
4.2 使用kmeans对宝可梦进行聚类,使用肘部法选择合适聚类个数。
# --- Feature engineering -------------------------------------------------
# Index the categorical feature columns into numeric *_idx columns.
# NOTE(review): handleInvalid="skip" filters out rows with invalid values
# during transform; if Type2 contains nulls (presumably single-type
# Pokemon -- confirm against the data) those rows are silently dropped.
# Consider handleInvalid="keep" if they should be retained.
in_cols = ["Type2", "Generation", "Legendary"]
out_cols = ["Type2_idx", "Generation_idx", "Legendary_idx"]
indexer = StringIndexer(inputCols=in_cols, outputCols=out_cols, handleInvalid="skip")
df = indexer.fit(df).transform(df)

# MinMaxScaler works on vector columns, so every numeric column is first
# wrapped into a one-element vector (<name>_vec) and then rescaled into
# <name>_scl. The comprehension variable is deliberately not called `col`,
# which would rebind the imported pyspark.sql.functions.col to a string.
columns_to_scale = ["Total", "HP", "Attack", "Defense", "SpAtk", "SpDef", "Speed"]
assemblers = [
    VectorAssembler(inputCols=[column_name], outputCol=column_name + "_vec")
    for column_name in columns_to_scale
]
scalers = [
    MinMaxScaler(inputCol=column_name + "_vec", outputCol=column_name + "_scl")
    for column_name in columns_to_scale
]
# All assemblers run before the scalers, so each *_vec column exists by the
# time its scaler stage is fitted.
pipeline = Pipeline(stages=assemblers + scalers)
df = pipeline.fit(df).transform(df)

# Combine the indexed categorical columns and the scaled numeric columns
# into the single vector column consumed by KMeans.
cols = [
    "Type2_idx", "Generation_idx", "Legendary_idx",
    "Total_scl", "HP_scl", "Attack_scl", "Defense_scl",
    "SpAtk_scl", "SpDef_scl", "Speed_scl",
]
assembler = VectorAssembler(inputCols=cols, outputCol="feature")
df = assembler.transform(df)

# --- Elbow search over k ----------------------------------------------------
# Fixed seed so the 80/20 split is reproducible across runs.
train, test = df.randomSplit(weights=[0.8, 0.2], seed=42)

# NOTE(review): this evaluator compares KMeans cluster ids with Type1_idx.
# Cluster ids are arbitrary and not aligned with the label indices, so the
# reported "accuracy" is not a meaningful clustering score; the elbow
# decision should rest on the training cost. Consider ClusteringEvaluator
# (silhouette) if a second metric is wanted.
evaluator = MulticlassClassificationEvaluator(
    labelCol="Type1_idx",
    predictionCol="prediction",
    metricName="accuracy",
)

# Try every k from 2 up to the number of distinct Type1 values.
num_of_type1 = df.select("Type1").distinct().count()
for k in range(2, num_of_type1 + 1):
    cluster = KMeans(featuresCol="feature", predictionCol="prediction", k=k, seed=42)
    model = cluster.fit(train)

    # KMeans emits integer cluster ids; the multiclass evaluator expects a
    # double-typed prediction column, hence the cast.
    prediction = model.transform(test)
    prediction = prediction.withColumn(
        "prediction", prediction.prediction.cast(DoubleType()))

    # Training cost: sum of squared distances from the training points to
    # their nearest centroid -- the quantity plotted for the elbow method.
    cost = model.summary.trainingCost
    accuracy = evaluator.evaluate(prediction)
    print(f"Accuracy of k={k:d} is {accuracy:.4f}, with cost is {cost:.4f}")