大数据Presto四:Presto自定义函数和JDBC连接

​Presto自定义函数和JDBC连接

一、Presto 自定义函数

我们可以登录Presto客户端,使用命令:show functions 来查询对应的内置函数。我们也可以自己定义函数,自定义的函数包含UDF和UDAF函数。

1、​​​​​​​​​​​​​​UDF函数

自定义UDF函数及使用可以按照下面步骤来实现。

1.1、创建Maven项目,加入如下依赖

<dependency>
    <groupId>com.facebook.presto</groupId>
    <artifactId>presto-spi</artifactId>
    <version>0.259</version>
</dependency>
<dependency>
    <groupId>com.facebook.presto</groupId>
    <artifactId>presto-array</artifactId>
    <version>0.259</version>
</dependency>
<dependency>
    <groupId>io.airlift</groupId>
    <artifactId>stats</artifactId>
    <version>0.163</version>
</dependency>

<build>
  <plugins>

    <plugin>
      <artifactId>maven-assembly-plugin</artifactId>
      <version>2.4</version>
      <configuration>
        <!-- 设置false后是去掉 MySpark-1.0-SNAPSHOT-jar-with-dependencies.jar 后的 “-jar-with-dependencies” -->
        <!--<appendAssemblyId>false</appendAssemblyId>-->
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <archive>
          <manifest>
            <mainClass>com.lw.java.myflink.Streaming.example.FlinkReadSocketData</mainClass>
          </manifest>
        </archive>
      </configuration>
      <executions>
        <execution>
          <id>make-assembly</id>
          <phase>package</phase>
          <goals>
            <goal>assembly</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>

1.2、创建Presto注册插件类

package com.lansonjy.prestocode;
import com.facebook.presto.spi.Plugin;
import com.google.common.collect.ImmutableSet;

import java.util.Set;
//Presto 注册自定义函数的类,此类需要继承Plugin接口
public class MyFunctionsPlugin implements Plugin {
    @Override
    public Set<Class<?>> getFunctions()
    {
        return ImmutableSet.<Class<?>>builder()
                //注册UDF,这里填写对应的UDF类
                .add(MyUDF.class)
                .build();
    }
}

1.3、创建“MyUDF”类,实现自定义UDF逻辑

这里自定义的UDF函数实现大写字母转换成小写字母。代码如下:

package com.lansonjy.prestocode;

import com.facebook.presto.spi.function.Description;
import com.facebook.presto.spi.function.ScalarFunction;
import com.facebook.presto.spi.function.SqlType;
import com.facebook.presto.spi.type.StandardTypes;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;

//自定义UDF函数
public class MyUDF {
    //自定义UDF函数使用时的名称
    @ScalarFunction("myudf")
    //函数的描述
    @Description("转换字母大写为小写")
    //指定函数的返回类型,字符串类型必须返回Slice, 使用 Slices.utf8Slice 方法可以方便的将 String 类型转换成 Slice 类型
    @SqlType(StandardTypes.VARCHAR)
    public static Slice lowercase(@SqlType(StandardTypes.VARCHAR) Slice in)
    {
        String argument = in.toStringUtf8();
        return Slices.utf8Slice(argument.toLowerCase());
    }
}

1.4、创建“resources”资源目录

在resouces资源目录中创建“META-INF/services”多级目录,在目录中创建“com.facebook.presto.spi.Plugin”配置文件,Presto将会根据此配置文件找到对应的注册自定义函数类。在此文件中需要指定注册自定义函数的类:

com.lansonjy.prestocode.MyFunctionsPlugin

1.5、将项目打包,上传到Presto集群

将项目打包,上传到每台Presto节点的“$PRESTO_HOME/plugin/udf”目录下,默认udf目录没有,需要手动预先创建。所有Presto节点上传完成后,重启Presto集群。

1.6、使用自定义UDF函数

#登录Presto客户端
./presto --server node3:8080 --catalog mysql --schema presto_db

#查询所有函数
presto:presto_db> show functions;

#使用这个函数查询转换数据
presto:presto_db> select myudf('ABCDEF');
 _col0  
--------
 abcdef 
(1 row)

2、​​​​​​​UDAF函数

UDAF是自定义聚合函数,下面自定义一个UDAF实现计算平均数聚合函数功能,步骤如下:

2.1、在项目中创建“MyUDAF”类

package com.lansonjy.prestocode;

import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.function.*;
import com.facebook.presto.spi.type.DoubleType;
import com.facebook.presto.spi.type.StandardTypes;

//presto 自定义聚合函数实现-实现平均数计算
//自定义聚合函数使用时的名称
@AggregationFunction("myudaf")
//自定义聚合函数注释
@Description("我的自定义聚合函数,实现计算平均数")
public class MyUDAF {
    //输入数据注释
    @InputFunction
    public static void input(LongAndDoubleState state, @SqlType(StandardTypes.DOUBLE) double value) {
        //针对每条数据,执行 input 函数。这个过程是并行执行的,因此在每个有数据的节点都会执行,最终得到多个累积的状态数据。
        state.setLong(state.getLong() + 1);
        state.setDouble(state.getDouble() + value);
    }

    //聚合数据注释
    @CombineFunction
    public static void combine(LongAndDoubleState state, LongAndDoubleState otherState) {
        //将所有节点的状态数据聚合起来,多次执行,直至所有状态数据被聚合成一个最终状态,也就是 Aggregation 函数的输出结果。
        state.setLong(state.getLong() + otherState.getLong());
        state.setDouble(state.getDouble() + otherState.getDouble());
    }

    //输出数据注释
    @OutputFunction(StandardTypes.DOUBLE)
    public static void output(LongAndDoubleState state, BlockBuilder out) {
        //最终输出结果到一个 BlockBuilder。
        long count = state.getLong();
        if (count == 0) {
            out.appendNull();
        } else {
            double value = state.getDouble();
            DoubleType.DOUBLE.writeDouble(out, value / count);
        }
    }
}

以上类中涉及到了自定义类型LongAndDoubelState接口实现,此接口继承了AccumulatorState接口,对于简单的计算逻辑,只是获取设置值,那么可以定义简单接口来实现,里面只需要实现对应的get,set方法实现即可。对于复杂的计算逻辑需要自定义类实现接口,实现复杂的计算逻辑,代码如下:

package com.lansonjy.prestocode;

import com.facebook.presto.spi.function.AccumulatorState;

public interface LongAndDoubleState extends AccumulatorState {
    long getLong();

    void setLong(long value);

    double getDouble();

    void setDouble(double value);
}

2.2、在“MyFunctionPlugin”中注册UDAF

//Presto 注册自定义函数的类,此类需要继承Plugin接口
public class MyFunctionsPlugin implements Plugin {
    @Override
    public Set<Class<?>> getFunctions()
    {
        return ImmutableSet.<Class<?>>builder()
                //注册UDF,这里填写对应的UDF类
                .add(MyUDF.class)
                //注册UDAF,这里填写对应的UDAF 类
                .add(MyUDAF.class)
                .build();
    }
}

2.3、打包,上传到各个Presto

将项目打包,上传到每台Presto节点的“$PRESTO_HOME/plugin/udf”目录下,默认udf目录没有,需要手动预先创建。所有Presto节点上传完成后,重启Presto集群。

2.4、在presto中执行如下命令

#登录Presto客户端
[root@node3 presto-0.259]# ./presto --server node3:8080 --catalog mysql --schema presto_db

#查看函数
presto:presto_db> show functions;

#执行聚合查询
presto:presto_db> select pkg_name,myudaf(amount) as abc from machine_consume_detail group by pkg_name;

二、Presto JDBC连接

使用JDBC连接Presto需要在项目中导入以下依赖:

<dependency>
    <groupId>io.prestosql</groupId>
    <artifactId>presto-jdbc</artifactId>
    <version>312</version>
</dependency>

JDBC连接代码如下:

public class ReadDataFromPresto {
    public static void main(String[] args) throws ClassNotFoundException, SQLException {
        Connection conn = DriverManager.getConnection("jdbc:presto://node3:8080/mysql/presto_db","root",null);
        Statement stmt = conn.createStatement();
        ResultSet rs = stmt.executeQuery("select pkg_name,sum(amount) as total_amount from machine_consume_detail group by pkg_name");
        while (rs.next()) {
            String pkgName = rs.getString("pkg_name");
            double totalAmount = rs.getDouble("total_amount");
            System.out.println("pkgName = "+pkgName+",totalAmount="+totalAmount);
        }
        rs.close();
        conn.close();
    }
}

原文地址:https://cloud.tencent.com/developer/article/2176247

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


学习编程是顺着互联网的发展潮流,是一件好事。新手如何学习编程?其实不难,不过在学习编程之前你得先了解你的目的是什么?这个很重要,因为目的决定你的发展方向、决定你的发展速度。
IT行业是什么工作做什么?IT行业的工作有:产品策划类、页面设计类、前端与移动、开发与测试、营销推广类、数据运营类、运营维护类、游戏相关类等,根据不同的分类下面有细分了不同的岗位。
女生学Java好就业吗?女生适合学Java编程吗?目前有不少女生学习Java开发,但要结合自身的情况,先了解自己适不适合去学习Java,不要盲目的选择不适合自己的Java培训班进行学习。只要肯下功夫钻研,多看、多想、多练
Can’t connect to local MySQL server through socket \'/var/lib/mysql/mysql.sock问题 1.进入mysql路径
oracle基本命令 一、登录操作 1.管理员登录 # 管理员登录 sqlplus / as sysdba 2.普通用户登录
一、背景 因为项目中需要通北京网络,所以需要连vpn,但是服务器有时候会断掉,所以写个shell脚本每五分钟去判断是否连接,于是就有下面的shell脚本。
BETWEEN 操作符选取介于两个值之间的数据范围内的值。这些值可以是数值、文本或者日期。
假如你已经使用过苹果开发者中心上架app,你肯定知道在苹果开发者中心的web界面,无法直接提交ipa文件,而是需要使用第三方工具,将ipa文件上传到构建版本,开...
下面的 SQL 语句指定了两个别名,一个是 name 列的别名,一个是 country 列的别名。**提示:**如果列名称包含空格,要求使用双引号或方括号:
在使用H5混合开发的app打包后,需要将ipa文件上传到appstore进行发布,就需要去苹果开发者中心进行发布。​
+----+--------------+---------------------------+-------+---------+
数组的声明并不是声明一个个单独的变量,比如 number0、number1、...、number99,而是声明一个数组变量,比如 numbers,然后使用 nu...
第一步:到appuploader官网下载辅助工具和iCloud驱动,使用前面创建的AppID登录。
如需删除表中的列,请使用下面的语法(请注意,某些数据库系统不允许这种在数据库表中删除列的方式):
前不久在制作win11pe,制作了一版,1.26GB,太大了,不满意,想再裁剪下,发现这次dism mount正常,commit或discard巨慢,以前都很快...
赛门铁克各个版本概览:https://knowledge.broadcom.com/external/article?legacyId=tech163829
实测Python 3.6.6用pip 21.3.1,再高就报错了,Python 3.10.7用pip 22.3.1是可以的
Broadcom Corporation (博通公司,股票代号AVGO)是全球领先的有线和无线通信半导体公司。其产品实现向家庭、 办公室和移动环境以及在这些环境...
发现个问题,server2016上安装了c4d这些版本,低版本的正常显示窗格,但红色圈出的高版本c4d打开后不显示窗格,
TAT:https://cloud.tencent.com/document/product/1340