Flink SQL Redis Connector

2024-06-20 21:12

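Anyone who develops with Flink regularly knows that connecting Flink to Redis is cumbersome, and that parsing the Redis data is even more so. It would be far more convenient if Redis could be queried through Flink SQL like an ordinary database, with the data presented as a table.

After many days of work, I have finally written a Flink SQL Redis connector and tested that it can parse data with SQL. The code and its results are shown below; the complete code is on my GitHub: https://github.com/niuhu3/flink_sql_redis_connector.git

The connector has been submitted to Flink; see [FLINK-35588] flink sql redis connector - ASF JIRA (apache.org).

Forks and stars are appreciated. I will keep updating this connector, and everyone is welcome to try it. If you run into problems, send me feedback or report them in the community; ideas are welcome too.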

1. Usage examples and explanation

1. Reading data

CREATE TABLE orders (
    `order_id` STRING,
    `price` STRING,
    `order_time` STRING,
    PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
    'connector' = 'redis',
    'mode' = 'single',
    'single.host' = '192.168.10.101',
    'single.port' = '6379',
    'password' = 'xxxxxx',
    'command' = 'hgetall',
    'key' = 'orders'
);

select * from orders

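Note: a Redis table must define a primary key, which can be either a single-column or a composite key.

The SQL query parses the Redis data directly into the table form we need.

2. Writing data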

-- 1. Generate source data
CREATE TABLE order_source (
    `order_number` BIGINT,
    `price` DECIMAL(32,2),
    `order_time` TIMESTAMP(3),
    PRIMARY KEY(order_number) NOT ENFORCED
) WITH (
    'connector' = 'datagen',
    'number-of-rows' = '5',
    'fields.order_number.min' = '1',
    'fields.order_number.max' = '20',
    'fields.price.min' = '1001',
    'fields.price.max' = '1100'
);

-- 2. Define the Redis sink table
CREATE TABLE orders (
    `order_number` STRING,
    `price` STRING,
    `order_time` STRING,
    PRIMARY KEY(order_number) NOT ENFORCED
) WITH (
    'connector' = 'redis',
    'mode' = 'single',
    'single.host' = '192.168.10.101',
    'single.port' = '6379',
    'password' = 'xxxxxx',
    'command' = 'hmset',
    'key' = 'orders'
);

-- 3. Insert data into the Redis sink table (cast every column to STRING)
insert into orders
select
    cast(order_number as STRING) order_number,
    cast(price as STRING) price,
    cast(order_time as STRING) order_time
from order_source

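A Redis table does not store data types, so every value must be cast to STRING before it is written to Redis. The Redis key of each row is built by concatenating the table key, the primary-key column name, and the primary-key value (key + primary key + value), which keeps every row unique. This is also why a Redis table must define a primary key.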
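The key layout described above can be sketched in plain Java. This is only an illustration, not the connector's own code: the class and method names (`RedisKeySketch`, `buildKey`) are hypothetical, and the `:` separator is an assumption based on the connector using `RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT` to join the parts.

```java
import java.util.List;

// Hypothetical helper, not part of the connector: illustrates the
// table_name:pk_column:pk_value key layout used for each row.
final class RedisKeySketch {

    // Assumed separator; the connector takes it from RedisSplitSymbol.
    static final String SEP = ":";

    static String buildKey(String table, List<String> pkNames, List<String> pkValues) {
        if (pkNames.size() != pkValues.size()) {
            throw new IllegalArgumentException("one value is required per primary-key column");
        }
        StringBuilder sb = new StringBuilder(table);
        for (int i = 0; i < pkNames.size(); i++) {
            // Each primary-key column contributes ":name:value".
            sb.append(SEP).append(pkNames.get(i)).append(SEP).append(pkValues.get(i));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Single primary key
        System.out.println(buildKey("orders", List.of("order_number"), List.of("7")));
        // Composite primary key
        System.out.println(buildKey("orders", List.of("order_number", "shop_id"), List.of("7", "3")));
    }
}
```

With a single primary key the row above is stored under `orders:order_number:7`; a composite key simply appends one more name/value pair.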

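3. Currently supported features

1. The connector currently supports several read and write commands:

        Read:    get    hget     hgetall     hscan   lrange    smembers    zrange

        Write:   set   hset      hmset      lpush    rpush     sadd

2. For the hash type, which is the most commonly used, fuzzy matching is supported: specifying only the table name as the key queries the data of the whole table.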
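How the `key` option selects between an exact lookup and a fuzzy whole-table read can be sketched as follows. This is a simplified illustration of the `hgetall` branch in the source function shown later in this post; the class name and enum are hypothetical, and the `,` and `:` separators are assumptions about the values of the `RedisSplitSymbol` constants.

```java
// Hypothetical sketch of how the `key` option is interpreted for hgetall.
final class HashReadModeSketch {

    enum Mode { MULTI_KEY, EXACT_KEY, FUZZY }

    static Mode classify(String key, int primaryKeyCount) {
        // Several keys separated by "," are read one by one.
        if (key.split(",").length > 1) {
            return Mode.MULTI_KEY;
        }
        // A full row key has the form table:pk_col:pk_val[:pk_col:pk_val...],
        // i.e. 2 * primaryKeyCount + 1 segments.
        if (key.split(":").length == primaryKeyCount * 2 + 1) {
            return Mode.EXACT_KEY;
        }
        // Anything shorter (for example just the table name) is a prefix.
        return Mode.FUZZY;
    }

    // Pattern handed to the Redis KEYS command in the fuzzy case.
    static String fuzzyPattern(String key) {
        return key + "*";
    }

    public static void main(String[] args) {
        System.out.println(classify("orders", 1));
        System.out.println(classify("orders:order_id:10", 1));
        System.out.println(fuzzyPattern("orders"));
    }
}
```

Note that the fuzzy case relies on the Redis `KEYS` command, which walks the whole keyspace; on large datasets an incremental `SCAN` is usually the gentler choice.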

4. Connection options

| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| connector | required | none | String | connector name |
| mode | required | none | String | Redis deployment mode (single or cluster) |
| single.host | optional | none | String | host of the Redis server in single mode |
| single.port | optional | none | Integer | port of the Redis server in single mode |
| password | optional | none | String | Redis database password |
| command | required | none | String | Redis command used to read or write data |
| key | required | none | String | Redis key |
| expire | optional | none | Integer | TTL to set on the key |
| field | optional | none | String | field whose value is fetched when using the hget command |
| cursor | optional | none | Integer | cursor for the hscan command (e.g. 1, 2) |
| start | optional | 0 | Integer | start index when reading with the lrange command |
| end | optional | 10 | Integer | end index when reading with the lrange command |
| connection.max.wait-mills | optional | none | Integer | Redis connection parameter |
| connection.timeout-ms | optional | none | Integer | Redis connection parameter |
| connection.max-total | optional | none | Integer | Redis connection parameter |
| connection.max-idle | optional | none | Integer | Redis connection parameter |
| connection.test-on-borrow | optional | none | Boolean | Redis connection parameter |
| connection.test-on-return | optional | none | Boolean | Redis connection parameter |
| connection.test-while-idle | optional | none | Boolean | Redis connection parameter |
| so.timeout-ms | optional | none | Integer | Redis connection parameter |
| max.attempts | optional | none | Integer | Redis connection parameter |
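To make the `start` and `end` defaults concrete: Redis `LRANGE` treats both indices as inclusive, so the defaults 0 and 10 return up to eleven elements, and negative indices count from the end of the list. The sketch below emulates that index handling on an in-memory list; it is a plain-Java illustration with hypothetical names and does not call Redis or the connector.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory emulation of LRANGE index handling.
final class LrangeDefaultsSketch {

    static final int DEFAULT_START = 0;  // default of the `start` option
    static final int DEFAULT_END = 10;   // default of the `end` option

    static List<String> lrange(List<String> list, int start, int end) {
        int n = list.size();
        if (start < 0) start = Math.max(n + start, 0); // negative: offset from the tail
        if (end < 0) end = n + end;
        if (end >= n) end = n - 1;                     // clamp to the last element
        if (start > end || start >= n) return new ArrayList<>();
        return new ArrayList<>(list.subList(start, end + 1)); // end is inclusive
    }

    public static void main(String[] args) {
        List<String> data = new ArrayList<>();
        for (int i = 0; i < 15; i++) data.add("v" + i);
        // Defaults: elements 0..10, i.e. 11 of the 15 items
        System.out.println(lrange(data, DEFAULT_START, DEFAULT_END).size());
        // 0..-1 reads the whole list
        System.out.println(lrange(data, 0, -1).size());
    }
}
```

In practice this means a list longer than eleven elements is truncated unless `end` is raised or set to `-1`.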

2. Factory class for dynamic reads and writes

import org.apache.flink.common.RedisOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.sink.RedisDynamicTableSink;
import org.apache.flink.source.RedisDynamicTableSource;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RedisSourceSinkFactory implements DynamicTableSinkFactory, DynamicTableSourceFactory {

    private ReadableConfig options;

    public RedisSourceSinkFactory() {}

    public RedisSourceSinkFactory(ReadableConfig options) {
        this.options = options;
    }

    // Implementation of DynamicTableSourceFactory; required in order to read data with Flink SQL
    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();
        options = helper.getOptions();
        ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
        List<Column> columns = schema.getColumns();
        ArrayList<String> columnNames = new ArrayList<>();
        columns.forEach(column -> columnNames.add(column.getName()));
        // get() throws if the table has no primary key, which is why one must be declared
        List<String> primaryKey = schema.getPrimaryKey().get().getColumns();
        return new RedisDynamicTableSource(options, columnNames, primaryKey);
    }

    // Implementation of DynamicTableSinkFactory; required in order to write data to Redis with Flink SQL
    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();
        ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
        List<Column> columns = schema.getColumns();
        ArrayList<String> columnNames = new ArrayList<>();
        columns.forEach(column -> columnNames.add(column.getName()));
        List<String> primaryKey = schema.getPrimaryKey().get().getColumns();
        ReadableConfig options = helper.getOptions();
        return new RedisDynamicTableSink(options, columnNames, primaryKey);
    }

    @Override
    public String factoryIdentifier() {
        return "redis";
    }

    // Required options of the SQL connector
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        HashSet<ConfigOption<?>> options = new HashSet<>();
        options.add(RedisOptions.PASSWORD);
        options.add(RedisOptions.KEY);
        options.add(RedisOptions.MODE);
        return options;
    }

    // Optional options of the SQL connector
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        HashSet<ConfigOption<?>> options = new HashSet<>();
        options.add(RedisOptions.SINGLE_HOST);
        options.add(RedisOptions.SINGLE_PORT);
        options.add(RedisOptions.CLUSTER_NODES);
        options.add(RedisOptions.FIELD);
        options.add(RedisOptions.CURSOR);
        options.add(RedisOptions.EXPIRE);
        options.add(RedisOptions.COMMAND);
        options.add(RedisOptions.START);
        options.add(RedisOptions.END);
        options.add(RedisOptions.CONNECTION_MAX_TOTAL);
        options.add(RedisOptions.CONNECTION_MAX_IDLE);
        options.add(RedisOptions.CONNECTION_TEST_WHILE_IDLE);
        options.add(RedisOptions.CONNECTION_TEST_ON_BORROW);
        options.add(RedisOptions.CONNECTION_TEST_ON_RETURN);
        options.add(RedisOptions.CONNECTION_TIMEOUT_MS);
        options.add(RedisOptions.TTL_SEC);
        options.add(RedisOptions.LOOKUP_ADDITIONAL_KEY);
        options.add(RedisOptions.LOOKUP_CACHE_MAX_ROWS);
        options.add(RedisOptions.LOOKUP_CACHE_TTL_SEC);
        return options;
    }
}

3. Redis source (read) classes

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.util.Preconditions;

import java.util.List;

public class RedisDynamicTableSource implements ScanTableSource {

    private ReadableConfig options;
    private List<String> primaryKey;
    private List<String> columns;

    public RedisDynamicTableSource(ReadableConfig options, List<String> columns, List<String> primaryKey) {
        this.options = Preconditions.checkNotNull(options);
        this.columns = Preconditions.checkNotNull(columns);
        this.primaryKey = Preconditions.checkNotNull(primaryKey);
    }

    @Override
    public DynamicTableSource copy() {
        return new RedisDynamicTableSource(this.options, this.columns, this.primaryKey);
    }

    @Override
    public String asSummaryString() {
        return "redis table source";
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.all();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        RedisSourceFunction redisSourceFunction = new RedisSourceFunction(this.options, this.columns, this.primaryKey);
        // false: the source is unbounded from the planner's point of view
        return SourceFunctionProvider.of(redisSourceFunction, false);
    }
}

The source function reads Redis string, set, zset, and hash data and parses it into RowData that is passed on to Flink.

import org.apache.flink.common.RedisClusterMode;
import org.apache.flink.common.RedisCommandOptions;
import org.apache.flink.common.RedisOptions;
import org.apache.flink.common.RedisSplitSymbol;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.RedisUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.ScanResult;

import java.util.*;

public class RedisSourceFunction extends RichSourceFunction<RowData> {

    private static final Logger LOG = LoggerFactory.getLogger(RedisSourceFunction.class);

    private ReadableConfig options;
    private List<String> primaryKey;
    private List<String> columns;
    private Jedis jedis;
    private JedisCluster jedisCluster;
    private String value;
    private String field;
    private String[] fields;
    private String cursor;
    private Integer start;
    private Integer end;
    private String[] keySplit;
    // Next row position to fill. An instance field (the original was static) that is reset before each read.
    private int position = 1;
    private GenericRowData rowData;

    public RedisSourceFunction(ReadableConfig options, List<String> columns, List<String> primaryKey) {
        this.options = Preconditions.checkNotNull(options);
        this.columns = Preconditions.checkNotNull(columns);
        this.primaryKey = Preconditions.checkNotNull(primaryKey);
    }

    @Override
    public void run(SourceContext<RowData> ctx) throws Exception {
        String password = options.get(RedisOptions.PASSWORD);
        Preconditions.checkNotNull(password, "password is null,please set value for password");
        Integer expire = options.get(RedisOptions.EXPIRE);
        String key = options.get(RedisOptions.KEY);
        Preconditions.checkNotNull(key, "key is null,please set value for key");
        String[] keyArr = key.split(RedisSplitSymbol.CLUSTER_NODES_SPLIT);
        String command = options.get(RedisOptions.COMMAND);
        // Check for null before the command is dereferenced below
        Preconditions.checkNotNull(command, "command is null,please set value for command");

        // A write command has nothing to read, so stop here
        List<String> sourceCommand = Arrays.asList(RedisCommandOptions.SET, RedisCommandOptions.HSET,
                RedisCommandOptions.HMSET, RedisCommandOptions.LPUSH,
                RedisCommandOptions.RPUSH, RedisCommandOptions.SADD);
        if (sourceCommand.contains(command.toUpperCase())) {
            return;
        }

        String mode = options.get(RedisOptions.MODE);
        Preconditions.checkNotNull(mode, "mode is null,please set value for mode");
        Integer maxIdle = options.get(RedisOptions.CONNECTION_MAX_IDLE);
        Integer maxTotal = options.get(RedisOptions.CONNECTION_MAX_TOTAL);
        Integer maxWaitMills = options.get(RedisOptions.CONNECTION_MAX_WAIT_MILLS);
        Boolean testOnBorrow = options.get(RedisOptions.CONNECTION_TEST_ON_BORROW);
        Boolean testOnReturn = options.get(RedisOptions.CONNECTION_TEST_ON_RETURN);
        Boolean testWhileIdle = options.get(RedisOptions.CONNECTION_TEST_WHILE_IDLE);

        if (mode.toUpperCase().equals(RedisClusterMode.SINGLE.name())) {
            String host = options.get(RedisOptions.SINGLE_HOST);
            Integer port = options.get(RedisOptions.SINGLE_PORT);
            JedisPool jedisPool = RedisUtil.getSingleJedisPool(mode, host, port, maxTotal,
                    maxIdle, maxWaitMills, testOnBorrow, testOnReturn, testWhileIdle);
            jedis = jedisPool.getResource();
            jedis.auth(password);

            switch (command.toUpperCase()) {
                case RedisCommandOptions.GET:
                    value = jedis.get(key);
                    rowData = new GenericRowData(2);
                    rowData.setField(0, BinaryStringData.fromString(key));
                    rowData.setField(1, BinaryStringData.fromString(value));
                    break;

                case RedisCommandOptions.HGET:
                    field = options.get(RedisOptions.FIELD);
                    value = jedis.hget(key, field);
                    rowData = new GenericRowData(3);
                    keySplit = key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    // Key layout is table:pk_col:pk_value[:pk_col:pk_value...],
                    // so the i-th primary-key value sits at index 2 * (i + 1)
                    for (int i = 0; i < primaryKey.size(); i++) {
                        rowData.setField(i, BinaryStringData.fromString(keySplit[2 * (i + 1)]));
                    }
                    rowData.setField(primaryKey.size(), BinaryStringData.fromString(value));
                    break;

                case RedisCommandOptions.HGETALL:
                    if (keyArr.length > 1) {
                        // Several keys were given: read each of them
                        for (String str : keyArr) {
                            rowData = new GenericRowData(columns.size());
                            keySplit = str.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            for (int i = 0; i < primaryKey.size(); i++) {
                                rowData.setField(i, BinaryStringData.fromString(keySplit[2 * (i + 1)]));
                            }
                            for (int i = primaryKey.size(); i < columns.size(); i++) {
                                String value = jedis.hget(str, columns.get(i));
                                rowData.setField(i, BinaryStringData.fromString(value));
                            }
                            ctx.collect(rowData);
                        }
                    } else if (key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT).length == (primaryKey.size() * 2 + 1)) {
                        // A single complete row key was given
                        rowData = new GenericRowData(columns.size());
                        keySplit = key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                        for (int i = 0; i < primaryKey.size(); i++) {
                            rowData.setField(i, BinaryStringData.fromString(keySplit[2 * (i + 1)]));
                        }
                        for (int i = primaryKey.size(); i < columns.size(); i++) {
                            String value = jedis.hget(key, columns.get(i));
                            rowData.setField(i, BinaryStringData.fromString(value));
                        }
                        ctx.collect(rowData);
                    } else {
                        // Fuzzy matching: gets the data of the entire table
                        String fuzzyKey = new StringBuffer(key).append("*").toString();
                        Set<String> keys = jedis.keys(fuzzyKey);
                        for (String keyStr : keys) {
                            keySplit = keyStr.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            rowData = new GenericRowData(columns.size());
                            for (int i = 0; i < primaryKey.size(); i++) {
                                rowData.setField(i, BinaryStringData.fromString(keySplit[2 * (i + 1)]));
                            }
                            for (int i = primaryKey.size(); i < columns.size(); i++) {
                                String value = jedis.hget(keyStr, columns.get(i));
                                rowData.setField(i, BinaryStringData.fromString(value));
                            }
                            ctx.collect(rowData);
                        }
                    }
                    break;

                case RedisCommandOptions.HSCAN:
                    cursor = options.get(RedisOptions.CURSOR);
                    ScanResult<Map.Entry<String, String>> entries = jedis.hscan(key, cursor);
                    List<Map.Entry<String, String>> result = entries.getResult();
                    keySplit = key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    rowData = new GenericRowData(columns.size());
                    for (int i = 0; i < primaryKey.size(); i++) {
                        rowData.setField(i, BinaryStringData.fromString(keySplit[2 * (i + 1)]));
                    }
                    position = primaryKey.size();
                    for (int i = 0; i < result.size(); i++) {
                        value = result.get(i).getValue();
                        rowData.setField(position, BinaryStringData.fromString(value));
                        position++;
                    }
                    break;

                case RedisCommandOptions.LRANGE:
                    start = options.get(RedisOptions.START);
                    end = options.get(RedisOptions.END);
                    List<String> list = jedis.lrange(key, start, end);
                    rowData = new GenericRowData(list.size() + 1);
                    rowData.setField(0, BinaryStringData.fromString(key));
                    position = 1;
                    list.forEach(s -> {
                        rowData.setField(position, BinaryStringData.fromString(s));
                        position++;
                    });
                    break;

                case RedisCommandOptions.SMEMBERS:
                    Set<String> smembers = jedis.smembers(key);
                    rowData = new GenericRowData(smembers.size() + 1);
                    rowData.setField(0, BinaryStringData.fromString(key));
                    position = 1;
                    smembers.forEach(s -> {
                        rowData.setField(position, BinaryStringData.fromString(s));
                        position++;
                    });
                    break;

                case RedisCommandOptions.ZRANGE:
                    start = options.get(RedisOptions.START);
                    end = options.get(RedisOptions.END);
                    Set<String> sets = jedis.zrange(key, start, end);
                    rowData = new GenericRowData(sets.size() + 1);
                    rowData.setField(0, BinaryStringData.fromString(key));
                    position = 1;
                    sets.forEach(s -> {
                        rowData.setField(position, BinaryStringData.fromString(s));
                        position++;
                    });
                    break;

                default:
                    LOG.error("Cannot process such data type: {}", command);
                    break;
            }

            // HGETALL has already emitted its rows inside the switch
            if (!command.toUpperCase().equals(RedisCommandOptions.HGETALL)) {
                ctx.collect(rowData);
            }

        } else if (mode.toUpperCase().equals(RedisClusterMode.CLUSTER.name())) {
            String nodes = options.get(RedisOptions.CLUSTER_NODES);
            String[] hostAndPorts = nodes.split(RedisSplitSymbol.CLUSTER_NODES_SPLIT);
            String[] host = new String[hostAndPorts.length];
            int[] port = new int[hostAndPorts.length];
            for (int i = 0; i < hostAndPorts.length; i++) {
                String[] splits = hostAndPorts[i].split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                host[i] = splits[0];
                port[i] = Integer.parseInt(splits[1]);
            }
            Integer connTimeOut = options.get(RedisOptions.CONNECTION_TIMEOUT_MS);
            Integer soTimeOut = options.get(RedisOptions.SO_TIMEOUT_MS);
            Integer maxAttempts = options.get(RedisOptions.MAX_ATTEMPTS);
            jedisCluster = RedisUtil.getJedisCluster(mode, host, password, port, maxTotal,
                    maxIdle, maxWaitMills, connTimeOut, soTimeOut, maxAttempts,
                    testOnBorrow, testOnReturn, testWhileIdle);

            switch (command.toUpperCase()) {
                case RedisCommandOptions.GET:
                    value = jedisCluster.get(key);
                    rowData = new GenericRowData(2);
                    rowData.setField(0, BinaryStringData.fromString(key));
                    rowData.setField(1, BinaryStringData.fromString(value));
                    break;

                case RedisCommandOptions.HGET:
                    field = options.get(RedisOptions.FIELD);
                    value = jedisCluster.hget(key, field);
                    rowData = new GenericRowData(3);
                    keySplit = key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    for (int i = 0; i < primaryKey.size(); i++) {
                        rowData.setField(i, BinaryStringData.fromString(keySplit[2 * (i + 1)]));
                    }
                    rowData.setField(primaryKey.size(), BinaryStringData.fromString(value));
                    break;

                case RedisCommandOptions.HGETALL:
                    if (keyArr.length > 1) {
                        for (String str : keyArr) {
                            rowData = new GenericRowData(columns.size());
                            keySplit = str.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            for (int i = 0; i < primaryKey.size(); i++) {
                                rowData.setField(i, BinaryStringData.fromString(keySplit[2 * (i + 1)]));
                            }
                            for (int i = primaryKey.size(); i < columns.size(); i++) {
                                String value = jedisCluster.hget(str, columns.get(i));
                                rowData.setField(i, BinaryStringData.fromString(value));
                            }
                            ctx.collect(rowData);
                        }
                    } else if (key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT).length == (primaryKey.size() * 2 + 1)) {
                        rowData = new GenericRowData(columns.size());
                        keySplit = key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                        for (int i = 0; i < primaryKey.size(); i++) {
                            rowData.setField(i, BinaryStringData.fromString(keySplit[2 * (i + 1)]));
                        }
                        for (int i = primaryKey.size(); i < columns.size(); i++) {
                            String value = jedisCluster.hget(key, columns.get(i));
                            rowData.setField(i, BinaryStringData.fromString(value));
                        }
                        ctx.collect(rowData);
                    } else {
                        // Fuzzy matching: gets the data of the entire table
                        String fuzzyKey = new StringBuffer(key).append("*").toString();
                        Set<String> keys = jedisCluster.keys(fuzzyKey);
                        for (String keyStr : keys) {
                            keySplit = keyStr.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            rowData = new GenericRowData(columns.size());
                            for (int i = 0; i < primaryKey.size(); i++) {
                                rowData.setField(i, BinaryStringData.fromString(keySplit[2 * (i + 1)]));
                            }
                            for (int i = primaryKey.size(); i < columns.size(); i++) {
                                String value = jedisCluster.hget(keyStr, columns.get(i));
                                rowData.setField(i, BinaryStringData.fromString(value));
                            }
                            ctx.collect(rowData);
                        }
                    }
                    break;

                case RedisCommandOptions.HSCAN:
                    cursor = options.get(RedisOptions.CURSOR);
                    ScanResult<Map.Entry<String, String>> entries = jedisCluster.hscan(key, cursor);
                    List<Map.Entry<String, String>> result = entries.getResult();
                    keySplit = key.split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    rowData = new GenericRowData(columns.size());
                    for (int i = 0; i < primaryKey.size(); i++) {
                        rowData.setField(i, BinaryStringData.fromString(keySplit[2 * (i + 1)]));
                    }
                    position = primaryKey.size();
                    for (int i = 0; i < result.size(); i++) {
                        value = result.get(i).getValue();
                        rowData.setField(position, BinaryStringData.fromString(value));
                        position++;
                    }
                    break;

                case RedisCommandOptions.LRANGE:
                    start = options.get(RedisOptions.START);
                    end = options.get(RedisOptions.END);
                    List<String> list = jedisCluster.lrange(key, start, end);
                    rowData = new GenericRowData(list.size() + 1);
                    rowData.setField(0, BinaryStringData.fromString(key));
                    position = 1;
                    list.forEach(s -> {
                        rowData.setField(position, BinaryStringData.fromString(s));
                        position++;
                    });
                    break;

                case RedisCommandOptions.SMEMBERS:
                    Set<String> smembers = jedisCluster.smembers(key);
                    rowData = new GenericRowData(smembers.size() + 1);
                    rowData.setField(0, BinaryStringData.fromString(key));
                    position = 1;
                    smembers.forEach(s -> {
                        rowData.setField(position, BinaryStringData.fromString(s));
                        position++;
                    });
                    break;

                case RedisCommandOptions.ZRANGE:
                    start = options.get(RedisOptions.START);
                    end = options.get(RedisOptions.END);
                    Set<String> sets = jedisCluster.zrange(key, start, end);
                    rowData = new GenericRowData(sets.size() + 1);
                    rowData.setField(0, BinaryStringData.fromString(key));
                    position = 1;
                    sets.forEach(s -> {
                        rowData.setField(position, BinaryStringData.fromString(s));
                        position++;
                    });
                    break;

                default:
                    LOG.error("Cannot process such data type: {}", command);
                    break;
            }

            if (!command.toUpperCase().equals(RedisCommandOptions.HGETALL)) {
                ctx.collect(rowData);
            }

        } else {
            LOG.error("Unsupport such {} mode", mode);
        }
    }

    @Override
    public void cancel() {
        if (jedis != null) {
            jedis.close();
        }
        if (jedisCluster != null) {
            jedisCluster.close();
        }
    }
}
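The cluster branch of the source function turns the configured node list into separate host and port arrays. A standalone sketch of that parsing step is shown below, with input validation added. The class name is hypothetical, and the `host:port,host:port` format with `,` and `:` separators is an assumption about the values behind `RedisSplitSymbol.CLUSTER_NODES_SPLIT` and `RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT`.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: parse "host:port,host:port" into (host, port) pairs.
final class ClusterNodesSketch {

    static List<Map.Entry<String, Integer>> parse(String nodes) {
        List<Map.Entry<String, Integer>> result = new ArrayList<>();
        for (String node : nodes.split(",")) {
            String[] parts = node.trim().split(":");
            if (parts.length != 2) {
                // Fail early with a readable message instead of an index error later.
                throw new IllegalArgumentException("expected host:port but got '" + node + "'");
            }
            result.add(new AbstractMap.SimpleImmutableEntry<>(parts[0], Integer.parseInt(parts[1])));
        }
        return result;
    }

    public static void main(String[] args) {
        for (Map.Entry<String, Integer> node : parse("192.168.10.101:6379, 192.168.10.102:6380")) {
            System.out.println(node.getKey() + " -> " + node.getValue());
        }
    }
}
```

Validating each entry up front gives a clearer error than the `ArrayIndexOutOfBoundsException` that a missing port would otherwise cause.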

4. Redis sink (write) classes

package org.apache.flink.sink;

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

public class RedisDynamicTableSink implements DynamicTableSink {

    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(RedisDynamicTableSink.class);

    private ReadableConfig options;
    private List<String> primaryKey;
    private List<String> columns;

    public RedisDynamicTableSink(ReadableConfig options, List<String> columns, List<String> primaryKey) {
        this.options = Preconditions.checkNotNull(options);
        this.columns = Preconditions.checkNotNull(columns);
        this.primaryKey = Preconditions.checkNotNull(primaryKey);
    }

    // Accept every kind of change row from upstream
    @Override
    public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
        return ChangelogMode.newBuilder()
                .addContainedKind(RowKind.INSERT)
                .addContainedKind(RowKind.DELETE)
                .addContainedKind(RowKind.UPDATE_BEFORE)
                .addContainedKind(RowKind.UPDATE_AFTER)
                .build();
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        RedisSinkFunction myRedisSinkFunction = new RedisSinkFunction(this.options, this.columns, this.primaryKey);
        return SinkFunctionProvider.of(myRedisSinkFunction);
    }

    @Override
    public DynamicTableSink copy() {
        return new RedisDynamicTableSink(this.options, this.columns, this.primaryKey);
    }

    @Override
    public String asSummaryString() {
        return "redis table sink";
    }
}

package org.apache.flink.sink;

import org.apache.flink.common.RedisClusterMode;
import org.apache.flink.common.RedisCommandOptions;
import org.apache.flink.common.RedisOptions;
import org.apache.flink.common.RedisSplitSymbol;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.RedisUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;

import java.util.List;

public class RedisSinkFunction extends RichSinkFunction<RowData> {

    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(RedisSinkFunction.class);

    private ReadableConfig options;
    private List<String> primaryKey;
    private List<String> columns;
    private String fields;
    private Jedis jedis;
    private JedisCluster jedisCluster;
    private String[] fieldsArr;
    private StringBuffer redisTableKey;
    private String value;

    public RedisSinkFunction(ReadableConfig options, List<String> columns, List<String> primaryKey) {
        this.options = Preconditions.checkNotNull(options);
        this.columns = Preconditions.checkNotNull(columns);
        this.primaryKey = Preconditions.checkNotNull(primaryKey);
    }

    @Override
    public void invoke(RowData rowData, Context context) throws Exception {
        String password = options.get(RedisOptions.PASSWORD);
        Preconditions.checkNotNull(password, "password is null,please set value for password");
        Integer expire = options.get(RedisOptions.EXPIRE);
        String key = options.get(RedisOptions.KEY);
        Preconditions.checkNotNull(key, "key is null,please set value for key");
        String command = options.get(RedisOptions.COMMAND);
        Preconditions.checkNotNull(command, "command is null,please set value for command");
        String mode = options.get(RedisOptions.MODE);
        Preconditions.checkNotNull(mode, "mode is null,please set value for mode");
        Integer maxIdle = options.get(RedisOptions.CONNECTION_MAX_IDLE);
        Integer maxTotal = options.get(RedisOptions.CONNECTION_MAX_TOTAL);
        Integer maxWaitMills = options.get(RedisOptions.CONNECTION_MAX_WAIT_MILLS);
        Boolean testOnBorrow = options.get(RedisOptions.CONNECTION_TEST_ON_BORROW);
        Boolean testOnReturn = options.get(RedisOptions.CONNECTION_TEST_ON_RETURN);
        Boolean testWhileIdle = options.get(RedisOptions.CONNECTION_TEST_WHILE_IDLE);

        if (mode.toUpperCase().equals(RedisClusterMode.SINGLE.name())) {
            String host = options.get(RedisOptions.SINGLE_HOST);
            Integer port = options.get(RedisOptions.SINGLE_PORT);
            JedisPool jedisPool = RedisUtil.getSingleJedisPool(mode, host, port, maxTotal,
                    maxIdle, maxWaitMills, testOnBorrow, testOnReturn, testWhileIdle);
            jedis = jedisPool.getResource();
            jedis.auth(password);

            switch (command.toUpperCase()) {
                case RedisCommandOptions.SET:
                    value = rowData.getString(0).toString();
                    jedis.set(String.valueOf(key), String.valueOf(value));
                    break;

                case RedisCommandOptions.HSET:
                    String field = columns.get(1);
                    // Construct the Redis key: table_name:primary key col name:primary key value
                    redisTableKey = new StringBuffer(key).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    for (int i = 0; i < primaryKey.size(); i++) {
                        if (primaryKey.size() <= 1) {
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                            break;
                        } else {
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                        }
                        redisTableKey.append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    }
                    value = rowData.getString(1).toString();
                    jedis.hset(String.valueOf(redisTableKey), String.valueOf(field), String.valueOf(value));
                    // The original had no break here and fell through into HMSET
                    break;

                case RedisCommandOptions.HMSET:
                    // Construct the Redis key: table_name:primary key col name:primary key value
                    redisTableKey = new StringBuffer(key).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    for (int i = 0; i < primaryKey.size(); i++) {
                        if (primaryKey.size() <= 1) {
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                            break;
                        } else {
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                        }
                        if (i != primaryKey.size() - 1) {
                            redisTableKey.append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                        }
                    }
                    // Write every non-primary-key column as a hash field
                    for (int i = 1; i < columns.size(); i++) {
                        if (!primaryKey.contains(columns.get(i))) {
                            value = rowData.getString(i).toString();
                            jedis.hset(String.valueOf(redisTableKey), String.valueOf(columns.get(i)), String.valueOf(value));
                        }
                    }
                    break;

                case RedisCommandOptions.LPUSH:
                    value = rowData.getString(0).toString();
                    jedis.lpush(key, value);
                    break;

                case RedisCommandOptions.RPUSH:
                    value = rowData.getString(0).toString();
                    jedis.rpush(key, value);
                    break;

                case RedisCommandOptions.SADD:
                    value = rowData.getString(0).toString();
                    jedis.sadd(key, value);
                    break;

                default:
                    LOG.error("Cannot process such data type: {}", command);
                    break;
            }

        } else if (mode.toUpperCase().equals(RedisClusterMode.CLUSTER.name())) {
            String nodes = options.get(RedisOptions.CLUSTER_NODES);
            String[] hostAndPorts = nodes.split(RedisSplitSymbol.CLUSTER_NODES_SPLIT);
            String[] host = new String[hostAndPorts.length];
            int[] port = new int[hostAndPorts.length];
            for (int i = 0; i < hostAndPorts.length; i++) {
                String[] splits = hostAndPorts[i].split(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                host[i] = splits[0];
                port[i] = Integer.parseInt(splits[1]);
            }
            Integer connTimeOut = options.get(RedisOptions.CONNECTION_TIMEOUT_MS);
            Integer soTimeOut = options.get(RedisOptions.SO_TIMEOUT_MS);
            Integer maxAttempts = options.get(RedisOptions.MAX_ATTEMPTS);
            jedisCluster = RedisUtil.getJedisCluster(mode, host, password, port, maxTotal,
                    maxIdle, maxWaitMills, connTimeOut, soTimeOut, maxAttempts,
                    testOnBorrow, testOnReturn, testWhileIdle);

            switch (command.toUpperCase()) {
                case RedisCommandOptions.SET:
                    value = rowData.getString(0).toString();
                    jedisCluster.set(String.valueOf(key), String.valueOf(value));
                    break;

                case RedisCommandOptions.HSET:
                    String field = columns.get(1);
                    // Construct the Redis key: table_name:primary key col name:primary key value
                    redisTableKey = new StringBuffer(key).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    for (int i = 0; i < primaryKey.size(); i++) {
                        if (primaryKey.size() <= 1) {
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                            break;
                        } else {
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                        }
                        redisTableKey.append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    }
                    value = rowData.getString(1).toString();
                    jedisCluster.hset(String.valueOf(redisTableKey), String.valueOf(field), String.valueOf(value));
                    // The original had no break here and fell through into HMSET
                    break;

                case RedisCommandOptions.HMSET:
                    // Construct the Redis key: table_name:primary key col name:primary key value
                    redisTableKey = new StringBuffer(key).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    for (int i = 0; i < primaryKey.size(); i++) {
                        if (primaryKey.size() <= 1) {
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                            break;
                        } else {
                            redisTableKey.append(primaryKey.get(i)).append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                            redisTableKey.append(rowData.getString(i).toString());
                        }
                        redisTableKey.append(RedisSplitSymbol.CLUSTER_HOST_PORT_SPLIT);
                    }
                    for (int i = 1; i < columns.size(); i++) {
                        value = rowData.getString(i).toString();
                        jedisCluster.hset(String.valueOf(redisTableKey), String.valueOf(columns.get(i)), String.valueOf(value));
                    }
                    break;

                case RedisCommandOptions.LPUSH:
                    value = rowData.getString(0).toString();
                    jedisCluster.lpush(key, value);
                    break;

                case RedisCommandOptions.RPUSH:
                    value = rowData.getString(0).toString();
                    jedisCluster.rpush(key, value);
                    break;

                case RedisCommandOptions.SADD:
                    value = rowData.getString(0).toString();
                    jedisCluster.sadd(key, value);
                    break;

                default:
                    LOG.error("Cannot process such data type: {}", command);
                    break;
            }

        } else {
            LOG.error("Unsupport such {} mode", mode);
        }
    }

    @Override
    public void close() throws Exception {
        if (jedis != null) {
            jedis.close();
        }
        if (jedisCluster != null) {
            jedisCluster.close();
        }
    }
}
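The `hmset` path of the sink boils down to writing every non-primary-key column of a row as one hash field under the row's key. The sketch below shows that mapping on plain Java collections, without Flink or Jedis. The class and method names are hypothetical, and the row values are passed as strings because the sink table declares every column as STRING.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: select the hash fields that an hmset-style write stores for one row.
final class HashFieldsSketch {

    static Map<String, String> toHashFields(List<String> columns, List<String> primaryKey, List<String> values) {
        if (columns.size() != values.size()) {
            throw new IllegalArgumentException("one value is required per column");
        }
        // LinkedHashMap keeps the column order of the table definition.
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < columns.size(); i++) {
            // Primary-key columns are already encoded in the Redis key, so they are skipped.
            if (!primaryKey.contains(columns.get(i))) {
                fields.put(columns.get(i), values.get(i));
            }
        }
        return fields;
    }

    public static void main(String[] args) {
        Map<String, String> fields = toHashFields(
                List.of("order_number", "price", "order_time"),
                List.of("order_number"),
                List.of("7", "1001.50", "2024-06-20 21:12:00"));
        System.out.println(fields);
    }
}
```

Each entry of the resulting map corresponds to one `hset(redisTableKey, column, value)` call in the sink function.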

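If it is unclear why the code above is written this way, see my previous post:

Flink SQL: User-Defined Sources & Sinks (source and sink tables), CSDN blog

Finally, I again hope you will show your support on GitHub or in the community, so that this connector can be officially open-sourced.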
