Flink 内存模型的分配策略 主公式推导

结论: 启动flink设定的 ytm数值 与实际监控展示的JVM_Heap数值关系是 (ytm大于1920的简化公式) JVM_Heap = ytm * 0.45 - 256 啓動參數: -ytm 设定的实际是 进程总内存,相当于yarn容器大小 Total_Process_Memory: ytm JVM_Metaspace: 默認 256m JVM_Overhead: 默認 jtm * 0.1 (必須在 192m ~ 1g (默認)) Total_Flink_Memory: Total_Process_Memory - JVM_Overhead - JVM_Metaspace Framework_Heap: 默認 128m Managed_Memory: 默認 Total_Flink_Memory * 0.4 Framework_Off-Heap: 默認 128m Task_Off-Heap: 默認 0 Network: 默認 Total_Flink_Memory * 0.1 (必須在 64m ~ 1g (默認)) 所以 Task_Heap = Total_Flink_Memory - Framework_Heap - Managed_Memory - Framework_Off-Heap - Task_Off-Heap - Network 如果都用默认配置,那么代入化简就是...

SuCicada

Flink 算子Function实例化的坑

问题回顾 关于一段代码: object MySingleObj{ // 陷阱: // 单例对象中一个是可变引用,一个是可变数组 var str:String = _ val list = new ListBuffer[String] } ... dataStream .map(new RichMapFunction(){ // 问题1:obj1 和 obj2 的实例方式有什么区别。 // 问题2:考虑参数0的作用以及是否会得到预期效果。 val obj1:MyClass = new MyClass(参数0) var obj2:MyClass = _ override def open(paramation:Configuration): Unit = { obj2 = new MyClass(参数0) } override def map(value, ....) = { // 问题3:如果在这里使用 obj1 和 obj2 会有什么区别。 // 问题4:单个slot中对单例对象中的变量修改,造成的影响是。 MySingleObj.str = value MySingleObj.list += value } }) ....

SuCicada

Flume 数据流的处理过程 从源码看 Source Channel Sink

Event 构成 Source Flume的三个部分:Source,Channel,Sink 数据是存储在Event对象中在这三部分之间传递 Event 构成 Event 接口 public interface Event { public Map<String, String> getHeaders(); public void setHeaders(Map<String, String> headers); public byte[] getBody(); public void setBody(byte[] body); } 以 最简单的实现类 SimpleEvent 为例子 public class SimpleEvent implements Event { private Map<String, String> headers; private byte[] body; ... } 可见 header 以 Map<String, String> 的形式存储键值对信息 body 二进制形式存储,从Source接收到的数据会存储在这里 Source 带着问题 header 里有什么 一些实现类 ExecSource ExecRunnable 主要处理 通过 EventBuilder.withBody 基础创建 Event , 一行一个 Event header 为空 flushEventBatch 中 ChannelProcessor :: processEventBatch...

SuCicada

Hbase2 没有org.apache.hadoop.hbase.mapreduce.TableInputFormat

导入 <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-mapreduce</artifactId> <version>${hbase.version}</version> </dependency> 参考链接:https://juejin.cn/post/6844903959585374221

SuCicada

Jetty 报错 Tomcat不报错 maven jetty插件不报错, 可能是jetty版本不统一造成

要在服务器上部署, jetty报错找不见hibernate某个函数, 然而此函数存在. 开发环境为 Intellij jetty插件版本: 9.2.6.v20141205 外置jetty版本: 9.4.21.v20190926 如果你也有类似问题, 不妨试试将jetty版本统一 我将外置jetty降版本后成功 来自这里的启发

SuCicada

Spark 2.4.0 cdh6.3.2连接 Hive 2.1.1 cdh6.3.2

以下maven配置能正确读取hive. 不该加的不要加. spark版本用cdh的. <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <spark.version>2.4.0-cdh6.3.2</spark.version> <hive.version>2.1.1-cdh6.3.2</hive.version> </properties> <dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.11</artifactId> <version>${spark.version}</version> <exclusions> <exclusion> <artifactId>hive-metastore</artifactId> <groupId>org.spark-project.hive</groupId> </exclusion> <exclusion> <artifactId>hive-exec</artifactId> <groupId>org.spark-project.hive</groupId> </exclusion> </exclusions> </dependency> <!-- <dependency>--> <!-- <groupId>org.apache.hive</groupId>--> <!-- <artifactId>hive-metastore</artifactId>--> <!-- <version>${hive.version}</version>--> <!-- </dependency>--> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>${hive.version}</version> <exclusions> <exclusion> <groupId>*</groupId> <artifactId>*</artifactId> </exclusion> </exclusions> </dependency> </dependencies>

SuCicada

Spark2 Sql 遇到 Caused by: java.lang.NoSuchFieldError: HIVE_STATS_JDBC_TIMEOUT

问题版本: Spark:2.14.0 Hive:2.1.0 原因参见spark hive java.lang.NoSuchFieldError: HIVE_STATS_JDBC_TIMEOUT 解决方案: 使用 cdh版本的包,比如 <spark.version>2.4.0-cdh6.3.2</spark.version> <hive.version>2.1.1-cdh6.3.2</hive.version> 如果遇到有关 apache的log4j包缺失,加入 <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.8.2</version> </dependency> 注意版本 相关问题参见I’m getting “NoClassDefFoundError: org/apache/logging/log4j/util/ReflectionUtil” 掰掰

SuCicada

关于 Flink1.11.1 找不到 Hadoop Native库解决方法

可以试试在 flink 的 conf/flink-conf.yaml 配置文件中加入配置如下 其中的native库的具体路径换成你自己的。 yarn.application-master.env.LD_LIBRARY_PATH: /opt/cloudera/parcels/CDH/lib/hadoop/lib/native:$LD_LIBRARY_PATH yarn.taskmanager.env.LD_LIBRARY_PATH: /opt/cloudera/parcels/CDH/lib/hadoop/lib/native:$LD_LIBRARY_PATH

SuCicada

关于Flink 本地测试,自定义WebUI 端口的方法

以1.11.1版本举例,相差不大的版本之间大同小异。 先给成品:以Scala代码举例,Java大同小异。 通过反射将配置加入env的配置对象中。之后使用修改过的env来创建flink的任务流即可。 val env = StreamExecutionEnvironment.getExecutionEnvironment val javaEnv: environment.StreamExecutionEnvironment = env.getJavaEnv val field = classOf[org.apache.flink.streaming.api.environment.StreamExecutionEnvironment].getDeclaredField("configuration") field.setAccessible(true) import org.apache.flink.configuration.Configuration val configuration: Configuration = field.get(javaEnv).asInstanceOf[Configuration] configuration.setString("rest.bind-port", "8081") 下面是探索过程,没兴趣的可以过了。 当我们加入了pom依赖后.发现能够看到本地IDE中的flink的webUI了. <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-runtime-web_2.11</artifactId> <version>${flink.version}</version> <scope>compile</scope> </dependency> 根据日志中显示可知我们的本地web端口为16434. 这不是一个我们想要看到的. 而且每一次运行都会产生一个随机的端口.这实在很痛苦. 17:15:28,577 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Rest endpoint listening at localhost:16434 17:15:28,578 INFO org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService - Proposing leadership to contender http://localhost:16434 17:15:28,581 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Web frontend listening at http://localhost:16434 17:15:28,581 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - http://localhost:16434 was granted leadership with leaderSessionID=eb84fead-f735-4350-aff4-a7f883013432 所以我们要想办法来固定端口....

SuCicada

关于Flink写入Redis没有incrByFloat等方法的解决措施

首当其冲:改源码。 使用的是org.apache.bahir:flink-connector-redis_2.11 目前2020年8月中maven官方库中最新的版本只有1.0。此版本未提供incrByFloat的方法。 首先猜测可能maven库不是最新的。去到此项目的github上一看。居然是1.1-SNAPSHOT的版本。但是此版本中仍然没有找到incrByFloat。 所以我们可以使用改源码重新编译的方式来解决这个问题。org.apache.flink:flink-connector-redis_2.11库与其类似。好在这个库比较简单,不需要做结构性的改动。 本博客意在记录需要改动源码的地方。 org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer redis的操作接口,在这里添加void incrByFloat(String key, Double value); 的接口定义。 操作接口的实现有2处。org.apache.flink.streaming.connectors.redis.common.container.RedisClusterContainer和org.apache.flink.streaming.connectors.redis.common.container.RedisContainer 加入incrByFloat实现:(写法完全照抄其他方法实现) @Override public void incrByFloat(String key, Double value) { Jedis jedis = null; try { jedis = getInstance(); jedis.incrByFloat(key, value); } catch (Exception e) { if (LOG.isErrorEnabled()) { LOG.error("Cannot send Redis message with command INCRBYFLOAT to key {} and value {} error message {}", key, value, e.getMessage()); } throw e; }finally { releaseInstance(jedis); } } org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand...

SuCicada

关于Scala 的尾递归,使用JITWatch从字节码观察其原理

先说概念: Scala的尾递归会被编译器自动优化成循环 主题直通车 先来简单看下一个简单验证方法 对比普通的递归: def fun2(x: Int): Int = { if (x == 1) throw new Exception("nooo") else fun2(x - 1) + 0 } 结果: Exception in thread "main" java.lang.Exception: nooo at Main$.fun2(Main.scala:17) at Main$.fun2(Main.scala:19) at Main$.fun2(Main.scala:19) at Main$.fun2(Main.scala:19) at Main$.fun2(Main.scala:19) 。。。。 at Main$.fun2(Main.scala:19) at Main$.main(Main.scala:25) at Main.main(Main.scala) 现象: 我们能看到在递归结束前,这个方法已经进入自己很多次了。 尾递归 def fun(x: Int): Int = { if (x == 0) throw new Exception("nooo") else fun(x - 1) } 结果:...

SuCicada

证明 scala 不能从外部调用内部函数

一段代码 object ASD { def main(args: Array[String]): Unit = { def f(a: Any): Unit = { println(a) } f("sfsfsfdsdfd") } } 如果我们想进行类似ASD.main.f(xx)或ASD.f(xx)的操作, 是否可行. 事实是残酷的, 它告诉我们不可行. 那么下面从反编译角度来探究为什么不可行: 首先我们打开编译后的 class 文件所在目录 有2个文件: ASD.class和ASD$.class ASD.class 节选 public final class ASD { public static void main(String[] paramArrayOfString) { ASD$.MODULE$.main(paramArrayOfString); } } 这个不重要 ASD$.class 节选 public final class ASD$ { public static final ASD$ MODULE$; private final void f$1(Object a) { Predef$.MODULE$.println(a); } public void main(String[] args) { f$1("sfsfsfdsdfd"); } private ASD$() { MODULE$ = this; } } 发现了吗, 内部函数 f 被编译为 私有的 final 方法f$1....

SuCicada

针对导入外部Gradle项目发生的版本冲突错误。

比如一个使用场景:Intellij 导入Kafka 2.2.1版本源码。 在使用gradle初始化项目时各种grafana配置文件报错。 原因主要是本机构建用的Gradle版本与项目编写配置文件产生冲突,简而言之就是Gradle版本不对。 比如Kafka 2.2.1 在发布的时候,Gradle 版本最高直到5.4.1。而本机使用了Gradle 6 。导致了冲突,下载 Gradle 5.4.1 并用其构建即可解决。

SuCicada