您的位置:首页 > 其它

实时数仓中flink异步io补维操作

2020-04-04 12:10 369 查看

    在实时数据仓库中,事实表可以通过flink实时清洗到操作数据层ods层。操作基础数据到dw明细数据层需要对一些维度进行补充,形成一个宽表。本文通过异步io的方式对mysql数据库的维度信息进行抽取,同时使用缓存对维度数据进行缓存。线上使用发现这种方式非常稳定。需要注意的是要注意对数据库的连接数需要设置,避免连接数被用尽的情况。

package com.mgtv.data.dimension;

import java.io.Serializable;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import com.google.common.base.Strings;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheStats;
import com.mgtv.data.external.JdbcConnector;
import com.mgtv.data.pojo.AbstractLog;
import com.mgtv.data.service.ParameterToolService;
import com.mgtv.data.util.Constant;
import com.zaxxer.hikari.HikariDataSource;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import io.vertx.core.json.JsonArray;
import io.vertx.ext.sql.SQLClient;
import io.vertx.ext.sql.SQLConnection;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

public class ChannelDimension<T extends AbstractLog, E extends AbstractLog> extends RichAsyncFunction<AbstractLog, AbstractLog> implements Serializable {

    private static final long serialVersionUID = 1L;

    private static SQLClient dbs;

    private static Cache<String, ChannelEntity> nonExistCache;

    private static Cache<String, ChannelEntity> existCache;

    private static Map<String, Set<String>> bidType = new HashMap<>();

    private static Map<String, String> sqlType = new HashMap<>();

    private static ParameterTool parameterTool;

    private static String sqlFull = "select v.bname, c.did as pid, c.name as pname, c.type, s.did as id, s.name, v.did, v.name " +
   ");
    }

    @Override
    public void open(final Configuration config) throws Exception {
        parameterTool = (ParameterTool) getRuntimeContext().getExecutionConfig()
                    .getGlobalJobParameters();
        ParameterToolService.setParameterTool(parameterTool);
        dbs = JdbcConnector.getAsyncPool(parameterTool, "mofang-metadata");

        /**
         * 数据库中也不存在的缓存一分钟,防止缓存穿透
         */
        nonExistCache = CacheBuilder.newBuilder().maximumSize(1000).expireAfterWrite(1, TimeUnit.MINUTES).build();

        /**
         * 数据库中存在的永久缓存
         */
        existCache = CacheBuilder.newBuilder().maximumSize(50000).build();

        HikariDataSource dbSync = JdbcConnector.getPool(parameterTool, "mofang-metadata");
        try (Connection connection = dbSync.getConnection();) {
            PreparedStatement pidStatement = connection.prepareStatement(sqlFull);
            ResultSet resultSet = pidStatement.executeQuery();
            while (resultSet.next()) {
                ChannelEntity entity = new ChannelEntity();
                entity.setBid(resultSet.getString(1));
                entity.setPid(resultSet.getInt(2));
                entity.setPname(resultSet.getString(3));
                entity.setType(resultSet.getInt(4));
                entity.setId(resultSet.getInt(5));
                entity.setName(resultSet.getString(6));
                entity.setVersion_id(resultSet.getInt(7));
                if (entity.getBid().equals("ott") || entity.getBid().equals("tvos")) {
                    entity.setVersion(resultSet.getString(8));
                } else {
                    entity.setSource(resultSet.getString(8));
                }
                existCache.put(entity.uniqueKey(), entity);
            }
        } catch (Exception e) {
            System.out.println("[error]初始化加载渠道缓存失败");
            // 失败了没关系,等待被动更新缓存
        }
    }

    private static void buildChannel(ChannelEntity entity, final AbstractLog input) {
        input.setSub_channel_id(entity.getId());
        input.setSub_channel_name(entity.getName());
        input.setChannel_id(entity.getPid());
        input.setChannel_name(entity.getPname());
        input.setChannel_type(entity.getType());
        input.setVersion_id(entity.getVersion_id());
    }

    @Override
    public void close() throws Exception {
        final CacheStats nonCacheStats = nonExistCache.stats();
        System.out.println(
            String.format("[info]不存在缓存统计信息:\n请求数:%s\nmiss数:%s\n命中数:%s\n", 
                nonCacheStats.requestCount(), 
                nonCacheStats.missCount(), 
                nonCacheStats.hitCount()
            )
        );
        final CacheStats existCacheStats = existCache.stats();
        System.out.println(
            String.format("[info]存在缓存统计信息:\n请求数:%s\nmiss数:%s\n命中数:%s\n", 
                existCacheStats.requestCount(), 
                existCacheStats.missCount(), 
                existCacheStats.hitCount()
            )
        );
        nonExistCache.cleanUp();
        existCache.cleanUp();
        dbs.close();
    }

    @Override
    public void asyncInvoke(final AbstractLog input, final ResultFuture<AbstractLog> resultFuture) throws Exception {
        final Optional<Map.Entry<String, Set<String>>> typeOptional = bidType.entrySet().stream().filter(each -> each.getValue().contains(input.getBid())).findFirst();
        if (!typeOptional.isPresent()) {
            resultFuture.complete(Collections.singleton(input));
            return;
        }
        final String type = typeOptional.get().getKey();
        final ChannelEntity.ChannelEntityBuilder entityBuilder = ChannelEntity.builder()
            .bid(input.getBid())
            .version(input.getVersion_name())
            .source(input.getSub_channel_source());
        if (type.equals("ott") || type.equals("tvos")) {
            entityBuilder.version(input.getVersion_name());
        } else if (type.equals("one")) {
            entityBuilder.source(input.getChannel_source());
        } else {
            entityBuilder.source(input.getSub_channel_source());
        }
        final ChannelEntity entity = entityBuilder.build();
        if ((type.equals("ott") || type.equals("tvos")) 
            && Strings.isNullOrEmpty(entity.getVersion()) 
            || (!type.equals("ott") && !type.equals("tvos"))
            && Strings.isNullOrEmpty(entity.getSource())) {
            resultFuture.complete(Collections.singleton(input));
            return;
        }
        final String key = entity.uniqueKey();
        ChannelEntity cache = existCache.getIfPresent(key);
        if (null != cache) {
            buildChannel(cache, input);
            resultFuture.complete(Collections.singleton(input));
            return;
        }
        cache = nonExistCache.getIfPresent(key);
        if (null != cache) {
            resultFuture.complete(Collections.singleton(input));
            return;
        }
        final String sql = sqlType.get(type);
        final String source = type.equals("ott") || type.equals("tvos") ? entity.getVersion() : entity.getSource();
        dbs.getConnection(connection -> {
            if (connection.failed()) {
                System.out.println("[error]渠道补维获取数据库连接失败");
                resultFuture.complete(Collections.singleton(input));
                return;
            }
            final SQLConnection conn = connection.result();
            final Integer bid = Constant.BNAME_TO_BID.get(entity.getBid());
            conn.queryWithParams(sql, new JsonArray().add(bid).add(source), rs -> {
                if (rs.failed()) {
                  System.out.println("[error]查询渠道数据出错");
                  resultFuture.complete(Collections.singleton(input));
                  conn.close();
                  return;
                }
                final List<JsonArray> lines = rs.result().getResults();
                if (lines.isEmpty()) {
                    nonExistCache.put(key, entity);
                    resultFuture.complete(Collections.singleton(input));
                    conn.close();
                    return;
                }
                entity.setPid(lines.get(0).getInteger(0));
                entity.setPname(lines.get(0).getString(1));
                entity.setType(lines.get(0).getInteger(2));
                switch (type) {
                    case "tvos":
                        entity.setVersion_id(lines.get(0).getInteger(3));
                        break;
                    case "ott":
                        entity.setId(lines.get(0).getInteger(3));
                        entity.setName(lines.get(0).getString(4));
                        entity.setVersion_id(lines.get(0).getInteger(5));
                        break;
                    case "sub":
                        entity.setId(lines.get(0).getInteger(3));
                        entity.setName(lines.get(0).getString(4));
                        break;
                }
                existCache.put(key, entity);
                buildChannel(entity, input);
                resultFuture.complete(Collections.singleton(input));
                conn.close();
              });
        });
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    @Builder
    public static class ChannelEntity {
        private String bid;

        private String source;

      
        private String version;

        private Integer version_id;

        private Integer id;

        private String name;

        private Integer pid;

        private String pname;

        private Integer type;

        public String uniqueKey() {
            if (bid.equals("tvos") || bid.equals("ott")) {
                return String.format("%s:%s", bid, version);
            }
            return String.format("%s:%s", bid, source);
        }
    }
}
 

  • 点赞
  • 收藏
  • 分享
  • 文章举报
haungtan07 发布了9 篇原创文章 · 获赞 0 · 访问量 308 私信 关注
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: