示例:
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);
String filePath = "D:\\yly\\BaiduNetdiskDownload\\最全最新flink教程\\000.代码+环境\\00.flink-train-master\\flink-train\\data\\04\\sales.csv";
DataSet
.ignoreFirstLine().includeFields("1111").fieldDelimiter(",")
.pojoType(Sales.class,"transactionId","customerId","itemId","amountPaid");
//csv.print();
Table sales = tableEnv.fromDataSet(csv);
tableEnv.registerTable("sales", sales);
Table resultTable = tableEnv.sqlQuery("select customerId, sum(amountPaid) money from sales group by customerId order by money");
DataSet
result.print();
}
public static class Sales{
public String transactionId;
public String customerId;
public String itemId;
public Double amountPaid;
@Override
public String toString() {
return "Sales{" +
"transactionId='" + transactionId + '\'' +
", customerId='" + customerId + '\'' +
", itemId='" + itemId + '\'' +
", amountPaid=" + amountPaid +
'}';
}
public String getTransactionId() {
return transactionId;
}
public void setTransactionId(String transactionId) {
this.transactionId = transactionId;
}
public String getCustomerId() {
return customerId;
}
public void setCustomerId(String customerId) {
this.customerId = customerId;
}
public String getItemId() {
return itemId;
}
public void setItemId(String itemId) {
this.itemId = itemId;
}
public Double getAmountPaid() {
return amountPaid;
}
public void setAmountPaid(Double amountPaid) {
this.amountPaid = amountPaid;
}
}