• Flink Table API & SQL


    示例:使用批处理 Table API & SQL 读取 sales.csv,按 customerId 分组汇总 amountPaid,并按汇总金额排序后输出:

    public static void main(String[] args) throws Exception {

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);

        String filePath = "D:\\yly\\BaiduNetdiskDownload\\最全最新flink教程\\000.代码+环境\\00.flink-train-master\\flink-train\\data\\04\\sales.csv";

        DataSet csv = env.readCsvFile(filePath)

                .ignoreFirstLine().includeFields("1111").fieldDelimiter(",")

                .pojoType(Sales.class,"transactionId","customerId","itemId","amountPaid");

        //csv.print();

        Table sales = tableEnv.fromDataSet(csv);

        tableEnv.registerTable("sales", sales);

        Table resultTable = tableEnv.sqlQuery("select customerId, sum(amountPaid) money from sales group by customerId order by money");

        DataSet result = tableEnv.toDataSet(resultTable, Row.class);

        result.print();

    }

    public static class Sales{

        public String transactionId;

        public String customerId;

        public String itemId;

        public Double  amountPaid;

        @Override

        public String toString() {

            return "Sales{" +

                    "transactionId='" + transactionId + '\'' +

                    ", customerId='" + customerId + '\'' +

                    ", itemId='" + itemId + '\'' +

                    ", amountPaid=" + amountPaid +

                    '}';

        }

        public String getTransactionId() {

            return transactionId;

        }

        public void setTransactionId(String transactionId) {

            this.transactionId = transactionId;

        }

        public String getCustomerId() {

            return customerId;

        }

        public void setCustomerId(String customerId) {

            this.customerId = customerId;

        }

        public String getItemId() {

            return itemId;

        }

        public void setItemId(String itemId) {

            this.itemId = itemId;

        }

        public Double getAmountPaid() {

            return amountPaid;

        }

        public void setAmountPaid(Double amountPaid) {

            this.amountPaid = amountPaid;

        }

    }

  • 相关阅读:
    编程练习【重复至少 K 次且长度为 M 的模式】
    线程安全和线程安全的解决方案
    混合牛奶 | 贪心算法 (USACO练习题)
    RUST 和 GO 如何管理它们的内存
    (附源码)php柘城县农产品销售网站 毕业设计 020832
    oracle:让is null使用索引
    SSH 免密登录:普通用户免密配置登录仍需输入密码
    需求管理-架构真题(三十四)
    轻松与任何 SQL 数据库集成:Directus 助你无代码开发 | 开源日报 No.69
    Monkey压力测试
  • 原文地址:https://blog.csdn.net/daliyuan350649623/article/details/124981521