Alink漫谈(八) : 二分类评估 AUC、K-S、PRC、Precision、Recall、LiftChart 如何实现

Alink漫谈(八) : 二分类评估 AUC、K-S、PRC、Precision、Recall、LiftChart 如何实现

0x00 摘要

Alink 是阿里巴巴基于实时计算引擎 Flink 研发的新一代机器学习算法平台,是业界首个同时支持批式算法、流式算法的机器学习平台。二分类评估是对二分类算法的预测结果进行效果评估。本文将剖析Alink中对应代码实现。

0x01 相关概念

如果对本文某些概念有疑惑,可以参见之前文章 [白话解析] 通过实例来梳理概念 :准确率 (Accuracy)、精准率(Precision)、召回率(Recall) 和 F值(F-Measure)

0x02 示例代码

public class EvalBinaryClassExample {

    AlgoOperator getData(boolean isBatch) {
        Row[] rows = new Row[]{
                Row.of("prefix1","{\"prefix1\": 0.9,\"prefix0\": 0.1}"),Row.of("prefix1","{\"prefix1\": 0.8,\"prefix0\": 0.2}"),"{\"prefix1\": 0.7,\"prefix0\": 0.3}"),Row.of("prefix0","{\"prefix1\": 0.75,\"prefix0\": 0.25}"),"{\"prefix1\": 0.6,\"prefix0\": 0.4}")
        };

        String[] schema = new String[]{"label","detailInput"};

        if (isBatch) {
            return new MemSourceBatchOp(rows,schema);
        } else {
            return new MemSourceStreamOp(rows,schema);
        }
    }

    public static void main(String[] args) throws Exception {
        EvalBinaryClassExample test = new EvalBinaryClassExample();
        BatchOperator batchData = (BatchOperator) test.getData(true);

        BinaryClassMetrics metrics = new EvalBinaryClassBatchOp()
                .setLabelCol("label")
                .setPredictionDetailCol("detailInput")
                .linkFrom(batchData)
                .collectMetrics();

        System.out.println("RocCurve:" + metrics.getRocCurve());
        System.out.println("AUC:" + metrics.getAuc());
        System.out.println("KS:" + metrics.getKs());
        System.out.println("PRC:" + metrics.getPrc());
        System.out.println("Accuracy:" + metrics.getAccuracy());
        System.out.println("Macro Precision:" + metrics.getMacroPrecision());
        System.out.println("Micro Recall:" + metrics.getMicroRecall());
        System.out.println("Weighted Sensitivity:" + metrics.getWeightedSensitivity());
    }
}

程序输出

RocCurve:([0.0,0.0,0.5,1.0,1.0],[0.0,0.3333333333333333,0.6666666666666666,1.0])
AUC:0.8333333333333333
KS:0.6666666666666666
PRC:0.9027777777777777
Accuracy:0.6
Macro Precision:0.3
Micro Recall:0.6
Weighted Sensitivity:0.6

在 Alink 中,二分类评估有批处理,流处理两种实现,下面一一为大家介绍( Alink 复杂之一在于大量精细的数据结构,所以下文会大量打印程序中变量以便大家理解)。

2.1 主要思路

  • 把 [0,1] 分成假设 100000个桶(bin)。所以得到positiveBin / negativeBin 两个100000的数组。

  • 根据输入给positiveBin / negativeBin赋值。positiveBin就是 TP + FP,negativeBin就是 TN + FN。这些是后续计算的基础。

  • 遍历bins中每一个有意义的点,计算出totalTrue和totalFalse,并且在每一个点上计算该点的混淆矩阵,tpr,以及rocCurve,recallPrecisionCurve,liftChart在该点对应的数据;

  • 依据曲线内容计算并且存储 AUC/PRC/KS

具体后续还有详细调用关系综述。

0x03 批处理

3.1 EvalBinaryClassBatchOp

EvalBinaryClassBatchOp是二分类评估的实现,功能是计算二分类的评估指标(evaluation metrics)。

输入有两种:

  • label column and predResult column
  • label column and predDetail column。如果有predDetail,则predResult被忽略

我们例子中 "prefix1" 就是 label,"{\"prefix1\": 0.9,\"prefix0\": 0.1}" 就是 predDetail

Row.of("prefix1",\"prefix0\": 0.1}")

具体类摘录如下:

public class EvalBinaryClassBatchOp extends BaseEvalClassBatchOp<EvalBinaryClassBatchOp> implements BinaryEvaluationParams <EvalBinaryClassBatchOp>,EvaluationMetricsCollector<BinaryClassMetrics> {
  
	@Override
	public BinaryClassMetrics collectMetrics() {
		return new BinaryClassMetrics(this.collect().get(0));
	}  
}

可以看到,其主要工作都是在基类BaseEvalClassBatchOp中完成,所以我们会首先看BaseEvalClassBatchOp。

3.2 BaseEvalClassBatchOp

我们还是从 linkFrom 函数入手,其主要是做了几件事:

  • 获取配置信息
  • 从输入中提取某些列:"label","detailInput"
  • calLabelPredDetailLocal会按照partition分别计算evaluation metrics
  • 综合reduce上述计算结果
  • SaveDataAsParams函数会把最终数值输入到 output table

具体代码如下

@Override
public T linkFrom(BatchOperator<?>... inputs) {
    BatchOperator<?> in = checkAndGetFirst(inputs);
    String labelColName = this.get(MultiEvaluationParams.LABEL_COL);
    String positiveValue = this.get(BinaryEvaluationParams.POS_LABEL_VAL_STR);

    // Judge the evaluation type from params.
    ClassificationEvaluationUtil.Type type = ClassificationEvaluationUtil.judgeEvaluationType(this.getParams());

    DataSet<BaseMetricsSummary> res;
    switch (type) {
        case PRED_DETAIL: {
            String predDetailColName = this.get(MultiEvaluationParams.PREDICTION_DETAIL_COL);
            // 从输入中提取某些列:"label","detailInput" 
            DataSet<Row> data = in.select(new String[] {labelColName,predDetailColName}).getDataSet();
            // 按照partition分别计算evaluation metrics
            res = calLabelPredDetailLocal(data,positiveValue,binary);
            break;
        }
        ......
    }

    // 综合reduce上述计算结果
    DataSet<BaseMetricsSummary> metrics = res
        .reduce(new EvaluationUtil.ReduceBaseMetrics());

    // 把最终数值输入到 output table
    this.setOutput(metrics.flatMap(new EvaluationUtil.SaveDataAsParams()),new String[] {DATA_OUTPUT},new TypeInformation[] {Types.STRING});

    return (T)this;
}

// 执行中一些变量如下
labelColName = "label"
predDetailColName = "detailInput"  
type = {ClassificationEvaluationUtil$Type@2532} "PRED_DETAIL"
binary = true
positiveValue = null  

3.2.0 调用关系综述

因为后续代码调用关系复杂,所以先给出一个调用关系

  • 从输入中提取某些列:"label","detailInput",in.select(new String[] {labelColName,predDetailColName}).getDataSet()。因为可能输入还有其他列,而只有某些列是我们计算需要的,所以只提取这些列。
  • 按照partition分别计算evaluation metrics,即调用 calLabelPredDetailLocal(data,binary);
    • flatMap会从label列和prediction列中,取出所有labels(注意是取出labels的名字 ),发送给下游算子。
    • reduceGroup主要功能是通过 buildLabelIndexLabelArray 去重 "labels名字",然后给每一个label一个ID,得到一个 <labels,ID>的map,最后返回是二元组(map,labels),即({prefix1=0,prefix0=1},[prefix1,prefix0])。从后文看,<labels,ID>Map看来是多分类才用到。二分类只用到了labels。
    • mapPartition 分区调用 CalLabelDetailLocal 来计算混淆矩阵,主要是分区调用getDetailStatistics,前文中得到的二元组(map,labels)会作为参数传递进来 。
      • getDetailStatistics 遍历 rows 数据,提取每一个item(比如 "prefix1,{"prefix1": 0.8,"prefix0": 0.2}"),然后通过updateBinaryMetricsSummary累积计算混淆矩阵所需数据。
        • updateBinaryMetricsSummary 把 [0,1] 分成假设 100000个桶(bin)。所以得到positiveBin / negativeBin 两个100000的数组。positiveBin就是 TP + FP,negativeBin就是 TN + FN。
          • 如果某个 sample 为 正例 (positive value) 的概率是 p,则该 sample 对应的 bin index 就是 p * 100000。如果 p 被预测为正例 (positive value) ,则positiveBin[index]++,
          • 否则就是被预测为负例(negative value) ,则negativeBin[index]++。
  • 综合reduce上述计算结果,metrics = res.reduce(new EvaluationUtil.ReduceBaseMetrics());
    • 具体计算是在BinaryMetricsSummary.merge,其作用就是Merge the bins,and add the logLoss。
  • 把最终数值输入到 output table,setOutput(metrics.flatMap(new EvaluationUtil.SaveDataAsParams()..);
    • 归并所有BaseMetrics后,得到total BaseMetrics,计算indexes存入params。collector.collect(t.toMetrics().serialize());
      • 实际业务在BinaryMetricsSummary.toMetrics,即基于bin的信息计算,然后存储到params。
        • extractMatrixThreCurve函数取出非空的bins,据此计算出ConfusionMatrix array(混淆矩阵),threshold array,rocCurve/recallPrecisionCurve/LiftChart.
          • 遍历bins中每一个有意义的点,计算出totalTrue和totalFalse,并且在每一个点上计算:
          • curTrue += positiveBin[index]; curFalse += negativeBin[index];
          • 得到该点的混淆矩阵 new ConfusionMatrix(new long[][] {{curTrue,curFalse},{totalTrue - curTrue,totalFalse - curFalse}});
          • 得到 tpr = (totalTrue == 0 ? 1.0 : 1.0 * curTrue / totalTrue);
          • rocCurve,recallPrecisionCurve,liftChart在该点对应的数据;
        • 依据曲线内容计算并且存储 AUC/PRC/KS
        • 对生成的rocCurve/recallPrecisionCurve/LiftChart输出进行抽样
        • 依据抽样后的输出存储 RocCurve/RecallPrecisionCurve/LiftChar
        • 存储正例样本的度量指标
        • 存储Logloss
        • Pick the middle point where threshold is 0.5.

3.2.1 calLabelPredDetailLocal

本函数按照partition分别计算评估指标 evaluation metrics。是的,这代码很短,但是有个地方需要注意。有时候越简单的地方越容易疏漏。容易疏漏点是:

第一行代码的结果 labels 是第二行代码的参数,而并非第二行主体。第二行代码主体和第一行代码主体一样,都是data。

private static DataSet<BaseMetricsSummary> calLabelPredDetailLocal(DataSet<Row> data,final String positiveValue,oolean binary) {
  
    DataSet<Tuple2<Map<String,Integer>,String[]>> labels = data.flatMap(new FlatMapFunction<Row,String>() {
        @Override
        public void flatMap(Row row,Collector<String> collector) {
            TreeMap<String,Double> labelProbMap;
            if (EvaluationUtil.checkRowFieldNotNull(row)) {
                labelProbMap = EvaluationUtil.extractLabelProbMap(row);
                labelProbMap.keySet().forEach(collector::collect);
                collector.collect(row.getField(0).toString());
            }
        }
    }).reduceGroup(new EvaluationUtil.DistinctLabelIndexMap(binary,positiveValue));

    return data
        .rebalance()
        .mapPartition(new CalLabelDetailLocal(binary))
        .withBroadcastSet(labels,LABELS);
}

calLabelPredDetailLocal中具体分为三步骤:

  • 在flatMap会从label列和prediction列中,取出所有labels(注意是取出labels的名字 ),发送给下游算子。
  • reduceGroup的主要功能是去重 "labels名字",然后给每一个label一个ID,最后结果是一个<labels,ID>Map。
  • mapPartition 是分区调用 CalLabelDetailLocal 来计算混淆矩阵。

下面具体看看。

3.2.1.1 flatMap

在flatMap中,主要是从label列和prediction列中,取出所有labels(注意是取出labels的名字 ),发送给下游算子。

EvaluationUtil.extractLabelProbMap 作用就是解析输入的json,获得具体detailInput中的信息。

下游算子是reduceGroup,所以Flink runtime会对这些labels自动去重。如果对这部分有兴趣,可以参见我之前介绍reduce的文章。CSDN : [源码解析] Flink的groupBy和reduce究竟做了什么 博客园 : [源码解析] Flink的groupBy和reduce究竟做了什么

程序中变量如下

row = {Row@8922} "prefix1,{"prefix1": 0.9,"prefix0": 0.1}"
 fields = {Object[2]@8925} 
  0 = "prefix1"
  1 = "{"prefix1": 0.9,"prefix0": 0.1}"
    
labelProbMap = {TreeMap@9008}  size = 2
 "prefix0" -> {Double@9015} 0.1
 "prefix1" -> {Double@9017} 0.9
    
labelProbMap.keySet().forEach(collector::collect); //这里发送 "prefix0","prefix1" 
collector.collect(row.getField(0).toString());  // 这里发送 "prefix1"   
// 因为下一个操作是reduceGroup,所以这些label会被runtime去重
3.2.1.2 reduceGroup

主要功能是通过buildLabelIndexLabelArray去重labels,然后给每一个label一个ID,最后结果是一个<labels,ID>的Map。

reduceGroup(new EvaluationUtil.DistinctLabelIndexMap(binary,positiveValue));

DistinctLabelIndexMap的作用是从label列和prediction列中,取出所有不同的labels,返回一个<labels,ID>的map,根据后续代码看,这个map是多分类才用到。Get all the distinct labels from label column and prediction column,and return the map of labels and their IDs.

前面已经提到,这里的参数rows已经被自动去重。

public static class DistinctLabelIndexMap implements
    GroupReduceFunction<String,Tuple2<Map<String,String[]>> {
    ......
    @Override
    public void reduce(Iterable<String> rows,Collector<Tuple2<Map<String,String[]>> collector) throws Exception {
        HashSet<String> labels = new HashSet<>();
        rows.forEach(labels::add);
        collector.collect(buildLabelIndexLabelArray(labels,binary,positiveValue));
    }
}

// 变量为
labels = {HashSet@9008}  size = 2
 0 = "prefix1"
 1 = "prefix0"
binary = true

buildLabelIndexLabelArray的作用是给每一个label一个ID,得到一个 <labels,prefix0])。

// Give each label an ID,return a map of label and ID.
public static Tuple2<Map<String,String[]> buildLabelIndexLabelArray(HashSet<String> set,boolean binary,String positiveValue) {
    String[] labels = set.toArray(new String[0]);
    Arrays.sort(labels,Collections.reverseOrder());

    Map<String,Integer> map = new HashMap<>(labels.length);
    if (binary && null != positiveValue) {
        if (labels[1].equals(positiveValue)) {
            labels[1] = labels[0];
            labels[0] = positiveValue;
        } 
        map.put(labels[0],0);
        map.put(labels[1],1);
    } else {
        for (int i = 0; i < labels.length; i++) {
            map.put(labels[i],i);
        }
    }
    return Tuple2.of(map,labels);
}

// 程序变量如下
labels = {String[2]@9013} 
 0 = "prefix1"
 1 = "prefix0"
map = {HashMap@9014}  size = 2
 "prefix1" -> {Integer@9020} 0
 "prefix0" -> {Integer@9021} 1
3.2.1.3 mapPartition

这里主要功能是分区调用 CalLabelDetailLocal 来为后来计算混淆矩阵做准备。

return data
    .rebalance()
    .mapPartition(new CalLabelDetailLocal(binary)) //这里是业务所在
    .withBroadcastSet(labels,LABELS);

具体工作是 CalLabelDetailLocal 完成的,其作用是分区调用getDetailStatistics

// Calculate the confusion matrix based on the label and predResult.
static class CalLabelDetailLocal extends RichMapPartitionFunction<Row,BaseMetricsSummary> {
        private Tuple2<Map<String,String[]> map;
        private boolean binary;

        @Override
        public void open(Configuration parameters) throws Exception {
            List<Tuple2<Map<String,String[]>> list = getRuntimeContext().getBroadcastVariable(LABELS);
            this.map = list.get(0);// 前文生成的二元组(map,labels)
        }

        @Override
        public void mapPartition(Iterable<Row> rows,Collector<BaseMetricsSummary> collector) {
            // 调用到了 getDetailStatistics
            collector.collect(getDetailStatistics(rows,map));
        }
    }  

getDetailStatistics 的作用是:初始化分类评估的度量指标 base classification evaluation metrics,累积计算混淆矩阵需要的数据。主要就是遍历 rows 数据,提取每一个item(比如 "prefix1,"prefix0": 0.2}"),然后累积计算混淆矩阵所需数据。

// Initialize the base classification evaluation metrics. There are two cases: BinaryClassMetrics and MultiClassMetrics.
    private static BaseMetricsSummary getDetailStatistics(Iterable<Row> rows,String positiveValue,String[]> tuple) {
        BinaryMetricsSummary binaryMetricsSummary = null;
        MultiMetricsSummary multiMetricsSummary = null;
        Tuple2<Map<String,String[]> labelIndexLabelArray = tuple;  // 前文生成的二元组(map,labels)

        Iterator<Row> iterator = rows.iterator();
        Row row = null;
        while (iterator.hasNext() && !checkRowFieldNotNull(row)) {
            row = iterator.next();
        }

        Map<String,Integer> labelIndexMap = null;
        if (binary) {
           // 二分法在这里 
            binaryMetricsSummary = new BinaryMetricsSummary(
                new long[ClassificationEvaluationUtil.DETAIL_BIN_NUMBER],new long[ClassificationEvaluationUtil.DETAIL_BIN_NUMBER],labelIndexLabelArray.f1,0L);
        } else {
            // 
            labelIndexMap = labelIndexLabelArray.f0; // 前文生成的<labels,ID>Map看来是多分类才用到。
            multiMetricsSummary = new MultiMetricsSummary(
                new long[labelIndexMap.size()][labelIndexMap.size()],0L);
        }

        while (null != row) {
            if (checkRowFieldNotNull(row)) {
                TreeMap<String,Double> labelProbMap = extractLabelProbMap(row);
                String label = row.getField(0).toString();
                if (ArrayUtils.indexOf(labelIndexLabelArray.f1,label) >= 0) {
                    if (binary) {
                        // 二分法在这里 
                        updateBinaryMetricsSummary(labelProbMap,label,binaryMetricsSummary);
                    } else {
                        updateMultiMetricsSummary(labelProbMap,labelIndexMap,multiMetricsSummary);
                    }
                }
            }
            row = iterator.hasNext() ? iterator.next() : null;
        }

        return binary ? binaryMetricsSummary : multiMetricsSummary;
}

//变量如下
tuple = {Tuple2@9252} "({prefix1=0,prefix0])"
 f0 = {HashMap@9257}  size = 2
  "prefix1" -> {Integer@9264} 0
  "prefix0" -> {Integer@9266} 1
 f1 = {String[2]@9258} 
  0 = "prefix1"
  1 = "prefix0"
 
row = {Row@9271} "prefix1,"prefix0": 0.2}"
 fields = {Object[2]@9276} 
  0 = "prefix1"
  1 = "{"prefix1": 0.8,"prefix0": 0.2}"
    
labelIndexLabelArray = {Tuple2@9240} "({prefix1=0,prefix0])"
 f0 = {HashMap@9288}  size = 2
  "prefix1" -> {Integer@9294} 0
  "prefix0" -> {Integer@9296} 1
 f1 = {String[2]@9242} 
  0 = "prefix1"
  1 = "prefix0"
    
labelProbMap = {TreeMap@9342}  size = 2
 "prefix0" -> {Double@9378} 0.1
 "prefix1" -> {Double@9380} 0.9    

先回忆下混淆矩阵:

预测值 0 预测值 1
真实值 0 TN FP
真实值 1 FN TP

针对混淆矩阵,BinaryMetricsSummary 的作用是Save the evaluation data for binary classification。函数具体计算思路是:

  • 把 [0,1] 分成ClassificationEvaluationUtil.DETAIL_BIN_NUMBER(100000)这么多桶(bin)。所以binaryMetricsSummary的positiveBin/negativeBin分别是两个100000的数组。如果某一个 sample 为 正例(positive value) 的概率是 p,则该 sample 对应的 bin index 就是 p * 100000。如果 p 被预测为正例(positive value) ,则positiveBin[index]++,否则就是被预测为负例(negative value) ,则negativeBin[index]++。positiveBin就是 TP + FP,negativeBin就是 TN + FN。

  • 所以这里会遍历输入,如果某一个输入(以"prefix1",\"prefix0\": 0.1}"为例),0.9 是prefix1(正例) 的概率,0.1 是为prefix0(负例) 的概率。

    • 既然这个算法选择了 prefix1(正例) ,所以就说明此算法是判别成 positive 的,所以在 positiveBin 的 90000 处 + 1。
    • 假设这个算法选择了 prefix0(负例) ,则说明此算法是判别成 negative 的,所以应该在 negativeBin 的 90000 处 + 1。

具体对应我们示例代码的5个采样,分类如下:

Row.of("prefix1",positiveBin 90000处+1
Row.of("prefix1",positiveBin 80000处+1
Row.of("prefix1",positiveBin 70000处+1
Row.of("prefix0",negativeBin 75000处+1
Row.of("prefix0",\"prefix0\": 0.4}")  negativeBin 60000处+1

具体代码如下

public static void updateBinaryMetricsSummary(TreeMap<String,Double> labelProbMap,String label,BinaryMetricsSummary binaryMetricsSummary) {
    binaryMetricsSummary.total++;
    binaryMetricsSummary.logLoss += extractLogloss(labelProbMap,label);

    double d = labelProbMap.get(binaryMetricsSummary.labels[0]);
    int idx = d == 1.0 ? ClassificationEvaluationUtil.DETAIL_BIN_NUMBER - 1 :
        (int)Math.floor(d * ClassificationEvaluationUtil.DETAIL_BIN_NUMBER);
    if (idx >= 0 && idx < ClassificationEvaluationUtil.DETAIL_BIN_NUMBER) {
        if (label.equals(binaryMetricsSummary.labels[0])) {
            binaryMetricsSummary.positiveBin[idx] += 1;
        } else if (label.equals(binaryMetricsSummary.labels[1])) {
            binaryMetricsSummary.negativeBin[idx] += 1;
        } else {
					.....
        }
    }
}

private static double extractLogloss(TreeMap<String,String label) {
   Double prob = labelProbMap.get(label);
   prob = null == prob ? 0. : prob;
   return -Math.log(Math.max(Math.min(prob,1 - LOG_LOSS_EPS),LOG_LOSS_EPS));
}

// 变量如下
ClassificationEvaluationUtil.DETAIL_BIN_NUMBER=100000
  
// 当 "prefix1",\"prefix0\": 0.1}" 时候
labelProbMap = {TreeMap@9305}  size = 2
 "prefix0" -> {Double@9331} 0.1
 "prefix1" -> {Double@9333} 0.9
  
d = 0.9
idx = 90000
binaryMetricsSummary = {BinaryMetricsSummary@9262} 
 labels = {String[2]@9242} 
  0 = "prefix1"
  1 = "prefix0"
 total = 1
 positiveBin = {long[100000]@9263}  // 90000处+1
 negativeBin = {long[100000]@9264} 
 logLoss = 0.10536051565782628
   
// 当 "prefix0",\"prefix0\": 0.4}" 时候  
labelProbMap = {TreeMap@9514}  size = 2
 "prefix0" -> {Double@9546} 0.4
 "prefix1" -> {Double@9547} 0.6
   
d = 0.6
idx = 60000    
 binaryMetricsSummary = {BinaryMetricsSummary@9262} 
 labels = {String[2]@9242} 
  0 = "prefix1"
  1 = "prefix0"
 total = 2
 positiveBin = {long[100000]@9263}  
 negativeBin = {long[100000]@9264} // 60000处+1
 logLoss = 1.0216512475319812  

3.2.2 ReduceBaseMetrics

ReduceBaseMetrics作用是把局部计算的 BaseMetrics 聚合起来。

DataSet<BaseMetricsSummary> metrics = res
    .reduce(new EvaluationUtil.ReduceBaseMetrics());

ReduceBaseMetrics如下

public static class ReduceBaseMetrics implements ReduceFunction<BaseMetricsSummary> {
    @Override
    public BaseMetricsSummary reduce(BaseMetricsSummary t1,BaseMetricsSummary t2) throws Exception {
        return null == t1 ? t2 : t1.merge(t2);
    }
}

具体计算是在BinaryMetricsSummary.merge,其作用就是Merge the bins,and add the logLoss。

@Override
public BinaryMetricsSummary merge(BinaryMetricsSummary binaryClassMetrics) {
    for (int i = 0; i < this.positiveBin.length; i++) {
        this.positiveBin[i] += binaryClassMetrics.positiveBin[i];
    }
    for (int i = 0; i < this.negativeBin.length; i++) {
        this.negativeBin[i] += binaryClassMetrics.negativeBin[i];
    }
    this.logLoss += binaryClassMetrics.logLoss;
    this.total += binaryClassMetrics.total;
    return this;
}

// 程序变量是
this = {BinaryMetricsSummary@9316} 
 labels = {String[2]@9322} 
  0 = "prefix1"
  1 = "prefix0"
 total = 2
 positiveBin = {long[100000]@9320} 
 negativeBin = {long[100000]@9323} 
 logLoss = 1.742969305058623

3.2.3 SaveDataAsParams

this.setOutput(metrics.flatMap(new EvaluationUtil.SaveDataAsParams()),new TypeInformation[] {Types.STRING});

当归并所有BaseMetrics之后,得到了total BaseMetrics,计算indexes,存入到params。

public static class SaveDataAsParams implements FlatMapFunction<BaseMetricsSummary,Row> {
    @Override
    public void flatMap(BaseMetricsSummary t,Collector<Row> collector) throws Exception {
        collector.collect(t.toMetrics().serialize());
    }
}

实际业务在BinaryMetricsSummary.toMetrics中完成,即基于bin的信息计算,得到confusionMatrix array,rocCurve/recallPrecisionCurve/LiftChart等等,然后存储到params。

public BinaryClassMetrics toMetrics() {
    Params params = new Params();
    // 生成若干曲线,比如rocCurve/recallPrecisionCurve/LiftChart
    Tuple3<ConfusionMatrix[],double[],EvaluationCurve[]> matrixThreCurve =
        extractMatrixThreCurve(positiveBin,negativeBin,total);

    // 依据曲线内容计算并且存储 AUC/PRC/KS
    setCurveAreaParams(params,matrixThreCurve.f2);

    // 对生成的rocCurve/recallPrecisionCurve/LiftChart输出进行抽样
    Tuple3<ConfusionMatrix[],EvaluationCurve[]> sampledMatrixThreCurve = sample(
        PROBABILITY_INTERVAL,matrixThreCurve);

    // 依据抽样后的输出存储 RocCurve/RecallPrecisionCurve/LiftChar
    setCurvePointsParams(params,sampledMatrixThreCurve);
    ConfusionMatrix[] matrices = sampledMatrixThreCurve.f0;
  
    // 存储正例样本的度量指标
    setComputationsArrayParams(params,sampledMatrixThreCurve.f1,sampledMatrixThreCurve.f0);
  
    // 存储Logloss
    setLoglossParams(params,logLoss,total);
  
    // Pick the middle point where threshold is 0.5.
    int middleIndex = getMiddleThresholdIndex(sampledMatrixThreCurve.f1);  
    setMiddleThreParams(params,matrices[middleIndex],labels);
    return new BinaryClassMetrics(params);
}

extractMatrixThreCurve是全文重点。这里是 Extract the bins who are not empty,keep the middle threshold 0.5,然后初始化了 RocCurve,Recall-Precision Curve and Lift Curve,计算出ConfusionMatrix array(混淆矩阵),rocCurve/recallPrecisionCurve/LiftChart.。

/**
 * Extract the bins who are not empty,keep the middle threshold 0.5.
 * Initialize the RocCurve,Recall-Precision Curve and Lift Curve.
 * RocCurve: (FPR,TPR),starts with (0,0). Recall-Precision Curve: (recall,precision),p),p is the precision with the lowest. LiftChart: (TP+FP/total,TP),0). confusion matrix = [TP FP][FN * TN].
 *
 * @param positiveBin positiveBins.
 * @param negativeBin negativeBins.
 * @param total       sample number
 * @return ConfusionMatrix array,rocCurve/recallPrecisionCurve/LiftChart.
 */
static Tuple3<ConfusionMatrix[],EvaluationCurve[]> extractMatrixThreCurve(long[] positiveBin,long[] negativeBin,long total) {
    ArrayList<Integer> effectiveIndices = new ArrayList<>();
    long totalTrue = 0,totalFalse = 0;
  
    // 计算totalTrue,totalFalse,effectiveIndices
    for (int i = 0; i < ClassificationEvaluationUtil.DETAIL_BIN_NUMBER; i++) {
        if (0L != positiveBin[i] || 0L != negativeBin[i]
            || i == ClassificationEvaluationUtil.DETAIL_BIN_NUMBER / 2) {
            effectiveIndices.add(i);
            totalTrue += positiveBin[i];
            totalFalse += negativeBin[i];
        }
    }

// 以我们例子,得到  
effectiveIndices = {ArrayList@9273}  size = 6
 0 = {Integer@9277} 50000 //这里加入了中间点
 1 = {Integer@9278} 60000
 2 = {Integer@9279} 70000
 3 = {Integer@9280} 75000
 4 = {Integer@9281} 80000
 5 = {Integer@9282} 90000
totalTrue = 3
totalFalse = 2
  
    // 继续初始化,生成若干curve
    final int length = effectiveIndices.size();
    final int newLen = length + 1;
    final double m = 1.0 / ClassificationEvaluationUtil.DETAIL_BIN_NUMBER;
    EvaluationCurvePoint[] rocCurve = new EvaluationCurvePoint[newLen];
    EvaluationCurvePoint[] recallPrecisionCurve = new EvaluationCurvePoint[newLen];
    EvaluationCurvePoint[] liftChart = new EvaluationCurvePoint[newLen];
    ConfusionMatrix[] data = new ConfusionMatrix[newLen];
    double[] threshold = new double[newLen];
    long curTrue = 0;
    long curFalse = 0;
  
// 以我们例子,得到 
length = 6
newLen = 7
m = 1.0E-5
  
    // 计算,其中rocCurve,recallPrecisionCurve,liftChart 都可以从代码中看出
    for (int i = 1; i < newLen; i++) {
        int index = effectiveIndices.get(length - i);
        curTrue += positiveBin[index];
        curFalse += negativeBin[index];
        threshold[i] = index * m;
        // 计算出混淆矩阵
        data[i] = new ConfusionMatrix(
            new long[][] {{curTrue,totalFalse - curFalse}});
        double tpr = (totalTrue == 0 ? 1.0 : 1.0 * curTrue / totalTrue);
        // 比如当 90000 这点,得到 curTrue = 1 curFalse = 0 i = 1 index = 90000 tpr = 0.3333333333333333。totalTrue = 3 totalFalse = 2, 
        // 我们也知道,TPR = TP / (TP + FN) ,所以可以计算 tpr = 1 / 3   
        rocCurve[i] = new EvaluationCurvePoint(totalFalse == 0 ? 1.0 : 1.0 * curFalse / totalFalse,tpr,threshold[i]);
        recallPrecisionCurve[i] = new EvaluationCurvePoint(tpr,curTrue + curTrue == 0 ? 1.0 : 1.0 * curTrue / (curTrue + curFalse),threshold[i]);
        liftChart[i] = new EvaluationCurvePoint(1.0 * (curTrue + curFalse) / total,curTrue,threshold[i]);
    }
  
// 以我们例子,得到 
curTrue = 3
curFalse = 2
  
threshold = {double[7]@9349} 
 0 = 0.0
 1 = 0.9
 2 = 0.8
 3 = 0.7500000000000001
 4 = 0.7000000000000001
 5 = 0.6000000000000001
 6 = 0.5  
   
rocCurve = {EvaluationCurvePoint[7]@9315} 
 1 = {EvaluationCurvePoint@9440} 
  x = 0.0
  y = 0.3333333333333333
  p = 0.9
 2 = {EvaluationCurvePoint@9448} 
  x = 0.0
  y = 0.6666666666666666
  p = 0.8
 3 = {EvaluationCurvePoint@9449} 
  x = 0.5
  y = 0.6666666666666666
  p = 0.7500000000000001
 4 = {EvaluationCurvePoint@9450} 
  x = 0.5
  y = 1.0
  p = 0.7000000000000001
 5 = {EvaluationCurvePoint@9451} 
  x = 1.0
  y = 1.0
  p = 0.6000000000000001
 6 = {EvaluationCurvePoint@9452} 
  x = 1.0
  y = 1.0
  p = 0.5
    
recallPrecisionCurve = {EvaluationCurvePoint[7]@9320} 
 1 = {EvaluationCurvePoint@9444} 
  x = 0.3333333333333333
  y = 1.0
  p = 0.9
 2 = {EvaluationCurvePoint@9453} 
  x = 0.6666666666666666
  y = 1.0
  p = 0.8
 3 = {EvaluationCurvePoint@9454} 
  x = 0.6666666666666666
  y = 0.6666666666666666
  p = 0.7500000000000001
 4 = {EvaluationCurvePoint@9455} 
  x = 1.0
  y = 0.75
  p = 0.7000000000000001
 5 = {EvaluationCurvePoint@9456} 
  x = 1.0
  y = 0.6
  p = 0.6000000000000001
 6 = {EvaluationCurvePoint@9457} 
  x = 1.0
  y = 0.6
  p = 0.5
    
liftChart = {EvaluationCurvePoint[7]@9325} 
 1 = {EvaluationCurvePoint@9458} 
  x = 0.2
  y = 1.0
  p = 0.9
 2 = {EvaluationCurvePoint@9459} 
  x = 0.4
  y = 2.0
  p = 0.8
 3 = {EvaluationCurvePoint@9460} 
  x = 0.6
  y = 2.0
  p = 0.7500000000000001
 4 = {EvaluationCurvePoint@9461} 
  x = 0.8
  y = 3.0
  p = 0.7000000000000001
 5 = {EvaluationCurvePoint@9462} 
  x = 1.0
  y = 3.0
  p = 0.6000000000000001
 6 = {EvaluationCurvePoint@9463} 
  x = 1.0
  y = 3.0
  p = 0.5
    
data = {ConfusionMatrix[7]@9339} 
 0 = {ConfusionMatrix@9486} 
  longMatrix = {LongMatrix@9488} 
   matrix = {long[2][]@9491} 
    0 = {long[2]@9492} 
     0 = 0
     1 = 0
    1 = {long[2]@9493} 
     0 = 3
     1 = 2
   rowNum = 2
   colNum = 2
  labelCnt = 2
  total = 5
  actualLabelFrequency = {long[2]@9489} 
   0 = 3
   1 = 2
  predictLabelFrequency = {long[2]@9490} 
   0 = 0
   1 = 5
  tpCount = 2.0
  tnCount = 2.0
  fpCount = 3.0
  fnCount = 3.0
 1 = {ConfusionMatrix@9435} 
  longMatrix = {LongMatrix@9469} 
   matrix = {long[2][]@9472} 
    0 = {long[2]@9474} 
     0 = 1
     1 = 0
    1 = {long[2]@9475} 
     0 = 2
     1 = 2
   rowNum = 2
   colNum = 2
  labelCnt = 2
  total = 5
  actualLabelFrequency = {long[2]@9470} 
   0 = 3
   1 = 2
  predictLabelFrequency = {long[2]@9471} 
   0 = 1
   1 = 4
  tpCount = 3.0
  tnCount = 3.0
  fpCount = 2.0
  fnCount = 2.0
  ......  
    
    threshold[0] = 1.0;
    data[0] = new ConfusionMatrix(new long[][] {{0,0},{totalTrue,totalFalse}});
    rocCurve[0] = new EvaluationCurvePoint(0,threshold[0]);
    recallPrecisionCurve[0] = new EvaluationCurvePoint(0,recallPrecisionCurve[1].getY(),threshold[0]);
    liftChart[0] = new EvaluationCurvePoint(0,threshold[0]);

    return Tuple3.of(data,threshold,new EvaluationCurve[] {new EvaluationCurve(rocCurve),new EvaluationCurve(recallPrecisionCurve),new EvaluationCurve(liftChart)});
}

3.2.4 计算混淆矩阵

这里再给大家讲讲混淆矩阵如何计算,这里思路比较绕。

3.2.4.1 原始矩阵

调用之处是:

// 调用之处
data[i] = new ConfusionMatrix(
        new long[][] {{curTrue,totalFalse - curFalse}});
// 调用时候各种赋值
i = 1
index = 90000
totalTrue = 3
totalFalse = 2
curTrue = 1
curFalse = 0

得到原始矩阵,以下都有cur,说明只针对当前点来说

curTrue = 1 curFalse = 0
totalTrue - curTrue = 2 totalFalse - curFalse = 2
3.2.4.2 计算标签

后续ConfusionMatrix计算中,由此可以得到

actualLabelFrequency = longMatrix.getColSums();
predictLabelFrequency = longMatrix.getRowSums();

actualLabelFrequency = {long[2]@9322} 
 0 = 3
 1 = 2
predictLabelFrequency = {long[2]@9323} 
 0 = 1
 1 = 4  

可以看出来,Alink算法认为:每列的sum和实际标签有关;每行sum和预测标签有关。

得到新矩阵如下

predictLabelFrequency
curTrue = 1 curFalse = 0 1 = curTrue + curFalse
totalTrue - curTrue = 2 totalFalse - curFalse = 2 4 = total - curTrue - curFalse
actualLabelFrequency 3 = totalTrue 2 = totalFalse

后续计算将要基于这些来计算:

计算中就用到longMatrix 对角线上的数据,即longMatrix(0)(0)和 longMatrix(1)(1)。一定要注意,这里考虑的都是 当前状态 (画重点强调)

longMatrix(0)(0) :curTrue

longMatrix(1)(1) :totalFalse - curFalse

totalFalse :( TN + FN )

totalTrue :( TP + FP )

double numTrueNegative(Integer labelIndex) {
  // labelIndex为 0 时候,return 1 + 5 - 1 - 3 = 2;
  // labelIndex为 1 时候,return 2 + 5 - 4 - 2 = 1;
	return null == labelIndex ? tnCount : longMatrix.getValue(labelIndex,labelIndex) + total - predictLabelFrequency[labelIndex] - actualLabelFrequency[labelIndex];
}

double numTruePositive(Integer labelIndex) {
  // labelIndex为 0 时候,return 1; 这个是 curTrue,就是真实标签是True,判别也是True。是TP
  // labelIndex为 1 时候,return 2; 这个是 totalFalse - curFalse,总判别错 - 当前判别错。这就意味着“本来判别错了但是当前没有发现”,所以认为在当前状态下,这也算是TP
	return null == labelIndex ? tpCount : longMatrix.getValue(labelIndex,labelIndex);
}

double numFalseNegative(Integer labelIndex) {
  // labelIndex为 0 时候,return 3 - 1; 
  // actualLabelFrequency[0] = totalTrue。所以return totalTrue - curTrue,即当前“全部正确”中没有“判别为正确”,这个就可以认为是“判别错了且判别为负”
  // labelIndex为 1 时候,return 2 - 2;   
  // actualLabelFrequency[1] = totalFalse。所以return totalFalse - ( totalFalse - curFalse )  = curFalse
	return null == labelIndex ? fnCount : actualLabelFrequency[labelIndex] - longMatrix.getValue(labelIndex,labelIndex);
}

double numFalsePositive(Integer labelIndex) {
  // labelIndex为 0 时候,return 1 - 1;
  // predictLabelFrequency[0] = curTrue + curFalse。
  // 所以 return = curTrue + curFalse - curTrue = curFalse = current( TN + FN ) 这可以认为是判断错了实际是正确标签
  // labelIndex为 1 时候,return 4 - 2; 
  // predictLabelFrequency[1] = total - curTrue - curFalse。
  // 所以 return = total - curTrue - curFalse - (totalFalse - curFalse) = totalTrue - curTrue = ( TP + FP ) - currentTP = currentFP 
	return null == labelIndex ? fpCount : predictLabelFrequency[labelIndex] - longMatrix.getValue(labelIndex,labelIndex);
}

// 最后得到
tpCount = 3.0
tnCount = 3.0
fpCount = 2.0
fnCount = 2.0
3.2.4.3 具体代码
// 具体计算 
public ConfusionMatrix(LongMatrix longMatrix) {
  
longMatrix = {LongMatrix@9297} 
  0 = {long[2]@9324} 
   0 = 1
   1 = 0
  1 = {long[2]@9325} 
   0 = 2
   1 = 2
     
    this.longMatrix = longMatrix;
    labelCnt = this.longMatrix.getRowNum();
    // 这里就是计算
    actualLabelFrequency = longMatrix.getColSums();
    predictLabelFrequency = longMatrix.getRowSums();
  
actualLabelFrequency = {long[2]@9322} 
 0 = 3
 1 = 2
predictLabelFrequency = {long[2]@9323} 
 0 = 1
 1 = 4  
labelCnt = 2
total = 5  

    total = longMatrix.getTotal();
    for (int i = 0; i < labelCnt; i++) {
        tnCount += numTrueNegative(i);
        tpCount += numTruePositive(i);
        fnCount += numFalseNegative(i);
        fpCount += numFalsePositive(i);
    }
}

0x04 流处理

4.1 示例

Alink原有python示例代码中,Stream部分是没有输出的,因为MemSourceStreamOp没有和时间相关联,而Alink中没有提供基于时间的StreamOperator,所以只能自己仿照MemSourceBatchOp写了一个。虽然代码有些丑,但是至少可以提供输出,这样就能够调试。

4.1.1 主类

public class EvalBinaryClassExampleStream {

    AlgoOperator getData(boolean isBatch) {
        Row[] rows = new Row[]{
                Row.of("prefix1",\"prefix0\": 0.1}")
        };
        String[] schema = new String[]{"label","detailInput"};
        if (isBatch) {
            return new MemSourceBatchOp(rows,schema);
        } else {
            return new TimeMemSourceStreamOp(rows,schema,new EvalBinaryStreamSource());
        }
    }

    public static void main(String[] args) throws Exception {
        EvalBinaryClassExampleStream test = new EvalBinaryClassExampleStream();
        StreamOperator streamData = (StreamOperator) test.getData(false);
        StreamOperator sOp = new EvalBinaryClassStreamOp()
                .setLabelCol("label")
                .setPredictionDetailCol("detailInput")
                .setTimeInterval(1)
                .linkFrom(streamData);
        sOp.print();
        StreamOperator.execute();
    }
}

4.1.2 TimeMemSourceStreamOp

这个是我自己炮制的。借鉴了MemSourceStreamOp。

public final class TimeMemSourceStreamOp extends StreamOperator<TimeMemSourceStreamOp> {

    public TimeMemSourceStreamOp(Row[] rows,String[] colNames,EvalBinaryStrSource source) {
        super(null);
        init(source,Arrays.asList(rows),colNames);
    }

    private void init(EvalBinaryStreamSource source,List <Row> rows,String[] colNames) {
        Row first = rows.iterator().next();
        int arity = first.getArity();
        TypeInformation <?>[] types = new TypeInformation[arity];

        for (int i = 0; i < arity; ++i) {
            types[i] = TypeExtractor.getForObject(first.getField(i));
        }

        init(source,colNames,types);
    }

    private void init(EvalBinaryStreamSource source,TypeInformation <?>[] colTypes) {
        DataStream <Row> dastr = MLEnvironmentFactory.get(getMLEnvironmentId())
                .getStreamExecutionEnvironment().addSource(source);
        StringBuilder sbd = new StringBuilder();
        sbd.append(colNames[0]);
      
        for (int i = 1; i < colNames.length; i++) {
            sbd.append(",").append(colNames[i]);
        }
        this.setOutput(dastr,colTypes);
    }

    @Override
    public TimeMemSourceStreamOp linkFrom(StreamOperator<?>... inputs) {
        return null;
    }
}

4.1.3 Source

定时提供Row,加入了随机数,让概率有变化。

class EvalBinaryStreamSource extends RichSourceFunction[Row] {

  override def run(ctx: SourceFunction.SourceContext[Row]) = {
    while (true) {
      val rdm = Math.random() // 这里加入了随机数,让概率有变化
      val rows: Array[Row] = Array[Row](
        Row.of("prefix1","{\"prefix1\": " + rdm + ",\"prefix0\": " + (1-rdm) + "}"),\"prefix0\": 0.4}"))
      for(row <- rows) {
        println(s"当前值:$row")
        ctx.collect(row)
      }
      Thread.sleep(1000)
    }
  }

  override def cancel() = ???
}

4.2 BaseEvalClassStreamOp

Alink流处理类是 EvalBinaryClassStreamOp,主要工作在其基类 BaseEvalClassStreamOp,所以我们重点看后者。

public class BaseEvalClassStreamOp<T extends BaseEvalClassStreamOp<T>> extends StreamOperator<T> {
    @Override
    public T linkFrom(StreamOperator<?>... inputs) {
        StreamOperator<?> in = checkAndGetFirst(inputs);
        String labelColName = this.get(MultiEvaluationStreamParams.LABEL_COL);
        String positiveValue = this.get(BinaryEvaluationStreamParams.POS_LABEL_VAL_STR);
        Integer timeInterval = this.get(MultiEvaluationStreamParams.TIME_INTERVAL);

        ClassificationEvaluationUtil.Type type = ClassificationEvaluationUtil.judgeEvaluationType(this.getParams());

        DataStream<BaseMetricsSummary> statistics;

        switch (type) {
            case PRED_RESULT: {
              ......
            }
            case PRED_DETAIL: {               
                String predDetailColName = this.get(MultiEvaluationStreamParams.PREDICTION_DETAIL_COL);
                // 
                PredDetailLabel eval = new PredDetailLabel(positiveValue,binary);
                // 获取输入数据,重点是timeWindowAll
                statistics = in.select(new String[] {labelColName,predDetailColName})
                    .getDataStream()
                    .timeWindowAll(Time.of(timeInterval,TimeUnit.SECONDS))
                    .apply(eval);
                break;
            }
        }
        // 把各个窗口的数据累积到 totalStatistics,注意,这里是新变量了。
        DataStream<BaseMetricsSummary> totalStatistics = statistics
            .map(new EvaluationUtil.AllDataMerge())
            .setParallelism(1); // 并行度设置为1

        // 基于两种 bins 计算&序列化,得到当前的 statistics
        DataStream<Row> windowOutput = statistics.map(
            new EvaluationUtil.SaveDataStream(ClassificationEvaluationUtil.WINDOW.f0));
        // 基于bins计算&序列化,得到累积的 totalStatistics
        DataStream<Row> allOutput = totalStatistics.map(
            new EvaluationUtil.SaveDataStream(ClassificationEvaluationUtil.ALL.f0));

      	// "当前" 和 "累积" 做联合,最终返回
        DataStream<Row> union = windowOutput.union(allOutput);

        this.setOutput(union,new String[] {ClassificationEvaluationUtil.STATISTICS_OUTPUT,DATA_OUTPUT},new TypeInformation[] {Types.STRING,Types.STRING});

        return (T)this;
    }
}

具体业务是:

  • PredDetailLabel 会进行去重标签名字 和 累积计算混淆矩阵所需数据
    • buildLabelIndexLabelArray 去重 "labels名字",然后给每一个label一个ID,最后结果是一个<labels,ID>Map。
    • getDetailStatistics 遍历 rows 数据,提取每一个item(比如 "prefix1,"prefix0": 0.2}"),然后通过updateBinaryMetricsSummary累积计算混淆矩阵所需数据。
  • 根据标签从Window中获取数据 statistics = in.select().getDataStream().timeWindowAll() .apply(eval);
  • EvaluationUtil.AllDataMerge 把各个窗口的数据累积到 totalStatistics 。
  • 得到windowOutput -------- EvaluationUtil.SaveDataStream,对"当前数据statistics"做处理。实际业务在BinaryMetricsSummary.toMetrics,即基于bin的信息计算,然后存储到params,并序列化返回Row。
    • extractMatrixThreCurve函数取出非空的bins,据此计算出ConfusionMatrix array(混淆矩阵),rocCurve/recallPrecisionCurve/LiftChart.
    • 依据曲线内容计算并且存储 AUC/PRC/KS
    • 对生成的rocCurve/recallPrecisionCurve/LiftChart输出进行抽样
    • 依据抽样后的输出存储 RocCurve/RecallPrecisionCurve/LiftChar
    • 存储正例样本的度量指标
    • 存储Logloss
    • Pick the middle point where threshold is 0.5.
  • 得到allOutput -------- EvaluationUtil.SaveDataStream,对"累积数据totalStatistics"做处理。
    • 详细处理流程同windowOutput。
  • windowOutput 和 allOutput 做联合。最终返回 DataStream union = windowOutput.union(allOutput);

4.2.1 PredDetailLabel

static class PredDetailLabel implements AllWindowFunction<Row,BaseMetricsSummary,TimeWindow> {
    @Override
    public void apply(TimeWindow timeWindow,Iterable<Row> rows,Collector<BaseMetricsSummary> collector) throws Exception {
        HashSet<String> labels = new HashSet<>();
        // 首先还是获取 labels 名字
        for (Row row : rows) {
            if (EvaluationUtil.checkRowFieldNotNull(row)) {
                labels.addAll(EvaluationUtil.extractLabelProbMap(row).keySet());
                labels.add(row.getField(0).toString());
            }
        }
labels = {HashSet@9757}  size = 2
 0 = "prefix1"
 1 = "prefix0"   
        // 之前介绍过,buildLabelIndexLabelArray 去重 "labels名字",然后给每一个label一个ID,最后结果是一个<labels,ID>Map。
        // getDetailStatistics 遍历 rows 数据,累积计算混淆矩阵所需数据( "TP + FN"  /  "TN + FP")。
        if (labels.size() > 0) {
            collector.collect(
                getDetailStatistics(rows,buildLabelIndexLabelArray(labels,positiveValue)));
        }
    }
}

4.2.2 AllDataMerge

EvaluationUtil.AllDataMerge 把各个窗口的数据累积

/**
 * Merge data from different windows.
 */
public static class AllDataMerge implements MapFunction<BaseMetricsSummary,BaseMetricsSummary> {
    private BaseMetricsSummary statistics;
    @Override
    public BaseMetricsSummary map(BaseMetricsSummary value) {
        this.statistics = (null == this.statistics ? value : this.statistics.merge(value));
        return this.statistics;
    }
}

4.2.3 SaveDataStream

SaveDataStream具体调用的函数之前批处理介绍过,实际业务在BinaryMetricsSummary.toMetrics,即基于bin的信息计算,存储到params。

这里与批处理不同的是直接就把"构建出的度量信息“返回给用户。

public static class SaveDataStream implements MapFunction<BaseMetricsSummary,Row> {
    @Override
    public Row map(BaseMetricsSummary baseMetricsSummary) throws Exception {
        BaseMetricsSummary metrics = baseMetricsSummary;
        BaseMetrics baseMetrics = metrics.toMetrics();
        Row row = baseMetrics.serialize();
        return Row.of(funtionName,row.getField(0));
    }
}

// 最后得到的 row 其实就是最终返回给用户的度量信息
row = {Row@10008} "{"PRC":"0.9164636268708667","SensitivityArray":"[0.38461538461538464,0.6923076923076923,1.0]","ConfusionMatrix":"[[13,8],[0,0]]","MacroRecall":"0.5","MacroSpecificity":"0.5","FalsePositiveRateArray":"[0.0,1.0]" ...... 还有很多其他的

4.2.4 Union

DataStream<Row> windowOutput = statistics.map(
    new EvaluationUtil.SaveDataStream(ClassificationEvaluationUtil.WINDOW.f0));
DataStream<Row> allOutput = totalStatistics.map(
    new EvaluationUtil.SaveDataStream(ClassificationEvaluationUtil.ALL.f0));

DataStream<Row> union = windowOutput.union(allOutput);

最后返回两种统计数据

4.2.4.1 allOutput
all|{"PRC":"0.7341146115890359","SensitivityArray":"[0.3333333333333333,0.7333333333333333,0.8,0.8666666666666667,0.9333333333333333,10],[2,"MacroRecall":"0.43333333333333335","MacroSpecificity":"0.43333333333333335","TruePositiveRateArray":"[0.3333333333333333,"AUC":"0.5666666666666667","MacroAccuracy":"0.52",......

4.2.4.2 windowOutput

window|{"PRC":"0.7638888888888888","ConfusionMatrix":"[[3,2],"AUC":"0.6666666666666666","MacroAccuracy":"0.6","RecallArray":"[0.3333333333333333,"KappaArray":"[0.28571428571428564,-0.15384615384615377,0.1666666666666666,0.5454545454545455,0.0]","MicroFalseNegativeRate":"0.4","WeightedRecall":"0.6","WeightedPrecision":"0.36","Recall":"1.0","MacroPrecision":"0.3",......

0xFF 参考

[[白话解析] 通过实例来梳理概念 :准确率 (Accuracy)、精准率(Precision)、召回率(Recall) 和 F值(F-Measure)](

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


文章浏览阅读5.3k次,点赞10次,收藏39次。本章详细写了mysql的安装,环境的搭建以及安装时常见的问题和解决办法。_mysql安装及配置超详细教程
文章浏览阅读1.8k次,点赞50次,收藏31次。本篇文章讲解Spark编程基础这门课程的期末大作业,主要围绕Hadoop基本操作、RDD编程、SparkSQL和SparkStreaming编程展开。_直接将第4题的计算结果保存到/user/root/lisi目录中lisipi文件里。
文章浏览阅读7.8k次,点赞9次,收藏34次。ES查询常用语法目录1. ElasticSearch之查询返回结果各字段含义2. match 查询3. term查询4. terms 查询5. range 范围6. 布尔查询6.1 filter加快查询效率的原因7. boosting query(提高查询)8. dis_max(最佳匹配查询)9. 分页10. 聚合查询【内含实际的demo】_es查询语法
文章浏览阅读928次,点赞27次,收藏18次。
文章浏览阅读1.1k次,点赞24次,收藏24次。作用描述分布式协调和一致性协调多个节点的活动,确保一致性和顺序。实现一致性、领导选举、集群管理等功能,确保系统的稳定和可靠性。高可用性和容错性Zookeeper是高可用的分布式系统,通过多个节点提供服务,容忍节点故障并自动进行主从切换。作为其他分布式系统的高可用组件,提供稳定的分布式协调和管理服务,保证系统的连续可用性。配置管理和动态更新作为配置中心,集中管理和分发配置信息。通过订阅机制,实现对配置的动态更新,以适应系统的变化和需求的变化。分布式锁和并发控制。
文章浏览阅读1.5k次,点赞26次,收藏29次。为贯彻执行集团数字化转型的需要,该知识库将公示集团组织内各产研团队不同角色成员的职务“职级”岗位的评定标准;
文章浏览阅读1.2k次,点赞26次,收藏28次。在安装Hadoop之前,需要进行以下准备工作:确认操作系统:Hadoop可以运行在多种操作系统上,包括Linux、Windows和Mac OS等。选择适合你的操作系统,并确保操作系统版本符合Hadoop的要求。安装Java环境:Hadoop是基于Java开发的,因此需要先安装和配置Java环境。确保已经安装了符合Hadoop版本要求的Java Development Kit (JDK),并设置好JAVA_HOME环境变量。确认硬件要求:Hadoop是一个分布式系统,因此需要多台计算机组成集群。
文章浏览阅读974次,点赞19次,收藏24次。# 基于大数据的K-means广告效果分析毕业设计 基于大数据的K-means广告效果分析。
文章浏览阅读1.7k次,点赞6次,收藏10次。Hadoop入门理论
文章浏览阅读1.3w次,点赞28次,收藏232次。通过博客和文献调研整理的一些农业病虫害数据集与算法。_病虫害数据集
文章浏览阅读699次,点赞22次,收藏7次。ZooKeeper使用的是Zab(ZooKeeper Atomic Broadcast)协议,其选举过程基于一种名为Fast Leader Election(FLE)的算法进行。:每个参与选举的ZooKeeper服务器称为一个“Follower”或“Candidate”,它们都有一个唯一的标识ID(通常是一个整数),并且都知道集群中其他服务器的ID。总之,ZooKeeper的选举机制确保了在任何时刻集群中只有一个Leader存在,并通过过半原则保证了即使部分服务器宕机也能维持高可用性和一致性。
文章浏览阅读10w+次,点赞62次,收藏73次。informatica 9.x是一款好用且功能强大的数据集成平台,主要进行各类数据库的管理操作,是使用相当广泛的一款ETL工具(注: ETL就是用来描述将数据从源端经过抽取(extract)、转换(transform)、加载(load)到目的端的过程)。本文主要为大家图文详细介绍Windows10下informatica powercenter 9.6.1安装与配置步骤。文章到这里就结束了,本人是在虚拟机中装了一套win10然后在此基础上测试安装的这些软件,因为工作学习要分开嘛哈哈哈。!!!!!_informatica客户端安装教程
文章浏览阅读7.8w次,点赞245次,收藏2.9k次。111个Python数据分析实战项目,代码已跑通,数据可下载_python数据分析项目案例
文章浏览阅读1.9k次,点赞61次,收藏64次。TDH企业级一站式大数据基础平台致力于帮助企业更全面、更便捷、更智能、更安全的加速数字化转型。通过数年时间的打磨创新,已帮助数千家行业客户利用大数据平台构建核心商业系统,加速商业创新。为了让大数据技术得到更广泛的使用与应用从而创造更高的价值,依托于TDH强大的技术底座,星环科技推出TDH社区版(Transwarp Data Hub Community Edition)版本,致力于为企业用户、高校师生、科研机构以及其他专业开发人员提供更轻量、更简单、更易用的数据分析开发环境,轻松应对各类人员数据分析需求。_星环tdh没有hive
文章浏览阅读836次,点赞21次,收藏19次。
文章浏览阅读1k次,点赞21次,收藏15次。主要介绍ETL相关工作的一些概念和需求点
文章浏览阅读1.4k次。本文以Android、java为开发技术,实现了一个基于Android的博物馆线上导览系统 app。基于Android的博物馆线上导览系统 app的主要使用者分为管理员和用户,app端:首页、菜谱信息、甜品信息、交流论坛、我的,管理员:首页、个人中心、用户管理、菜谱信息管理、菜谱分类管理、甜品信息管理、甜品分类管理、宣传广告管理、交流论坛、系统管理等功能。通过这些功能模块的设计,基本上实现了整个博物馆线上导览的过程。
文章浏览阅读897次,点赞19次,收藏26次。1.背景介绍在当今的数字时代,数据已经成为企业和组织中最宝贵的资源之一。随着互联网、移动互联网和物联网等技术的发展,数据的产生和收集速度也急剧增加。这些数据包括结构化数据(如数据库、 spreadsheet 等)和非结构化数据(如文本、图像、音频、视频等)。这些数据为企业和组织提供了更多的信息和见解,从而帮助他们做出更明智的决策。业务智能(Business Intelligence,BI)...
文章浏览阅读932次,点赞22次,收藏16次。也就是说,一个类应该对自己需要耦合或调用的类知道的最少,类与类之间的关系越密切,耦合度越大,那么类的变化对其耦合的类的影响也会越大,这也是我们面向对象设计的核心原则:低耦合,高内聚。优秀的架构和产品都是一步一步迭代出来的,用户量的不断增大,业务的扩展进行不断地迭代升级,最终演化成优秀的架构。其根本思想是强调了类的松耦合,类之间的耦合越弱,越有利于复用,一个处在弱耦合的类被修改,不会波及有关系的类。缓存,从操作系统到浏览器,从数据库到消息队列,从应用软件到操作系统,从操作系统到CPU,无处不在。
文章浏览阅读937次,点赞22次,收藏23次。大数据可视化是关于数据视觉表现形式的科学技术研究[9],将数据转换为图形或图像在屏幕上显示出来,并进行各种交互处理的理论、方法和技术。将数据直观地展现出来,以帮助人们理解数据,同时找出包含在海量数据中的规律或者信息,更多的为态势监控和综合决策服务。数据可视化是大数据生态链的最后一公里,也是用户最直接感知数据的环节。数据可视化系统并不是为了展示用户的已知的数据之间的规律,而是为了帮助用户通过认知数据,有新的发现,发现这些数据所反映的实质。大数据可视化的实施是一系列数据的转换过程。