• Flink Table API & SQL


    示例:使用 DataSet API 读取 CSV 销售数据,将其注册为表 sales,再通过 SQL 按 customerId 汇总 amountPaid 并按汇总金额排序输出:

    public static void main(String[] args) throws Exception {

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);

        String filePath = "D:\\yly\\BaiduNetdiskDownload\\最全最新flink教程\\000.代码+环境\\00.flink-train-master\\flink-train\\data\\04\\sales.csv";

        DataSet csv = env.readCsvFile(filePath)

                .ignoreFirstLine().includeFields("1111").fieldDelimiter(",")

                .pojoType(Sales.class,"transactionId","customerId","itemId","amountPaid");

        //csv.print();

        Table sales = tableEnv.fromDataSet(csv);

        tableEnv.registerTable("sales", sales);

        Table resultTable = tableEnv.sqlQuery("select customerId, sum(amountPaid) money from sales group by customerId order by money");

        DataSet result = tableEnv.toDataSet(resultTable, Row.class);

        result.print();

    }

    public static class Sales{

        public String transactionId;

        public String customerId;

        public String itemId;

        public Double  amountPaid;

        @Override

        public String toString() {

            return "Sales{" +

                    "transactionId='" + transactionId + '\'' +

                    ", customerId='" + customerId + '\'' +

                    ", itemId='" + itemId + '\'' +

                    ", amountPaid=" + amountPaid +

                    '}';

        }

        public String getTransactionId() {

            return transactionId;

        }

        public void setTransactionId(String transactionId) {

            this.transactionId = transactionId;

        }

        public String getCustomerId() {

            return customerId;

        }

        public void setCustomerId(String customerId) {

            this.customerId = customerId;

        }

        public String getItemId() {

            return itemId;

        }

        public void setItemId(String itemId) {

            this.itemId = itemId;

        }

        public Double getAmountPaid() {

            return amountPaid;

        }

        public void setAmountPaid(Double amountPaid) {

            this.amountPaid = amountPaid;

        }

    }

  • 相关阅读:
    Java基础—普通阻塞队列
    Postman传参后台接收问题
    蛇形填空 I
    基于显扬科技自主研发3D机器视觉HY-X5在RJ45接插件缺陷检测的应用
    pg和oracle的区别
    EffiecientNetV2架构复现--CVPR2021
    C#/.NET/.NET Core拾遗补漏合集(24年6月更新)
    Linux 学习之路 -- 进程篇 -- 进程控制
    java毕业设计日租房管理系统源码+lw文档+mybatis+系统+mysql数据库+调试
    基于Spring Boot应用Apache CXF发布Web Services服务
  • 原文地址:https://blog.csdn.net/daliyuan350649623/article/details/124981521