
Local Flink application cannot write to Doris on an Alibaba Cloud host

    summerlv · 277 days ago · 645 views

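    Here is the problem. I am developing a Flink application locally and want it to connect to Doris on a cloud host and write data into Doris. The first error I hit was that the remote Doris returned one of its runtime intranet addresses, which my local machine cannot reach.

    So I modified this class in the connector source: BackendV2. I added a convertToHostname() method to it that maps the intranet address to a public address.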

    // Licensed to the Apache Software Foundation (ASF) under one
    // or more contributor license agreements.  See the NOTICE file
    // distributed with this work for additional information
    // regarding copyright ownership.  The ASF licenses this file
    // to you under the Apache License, Version 2.0 (the
    // "License"); you may not use this file except in compliance
    // with the License.  You may obtain a copy of the License at
    //
    //   http://www.apache.org/licenses/LICENSE-2.0
    //
    // Unless required by applicable law or agreed to in writing,
    // software distributed under the License is distributed on an
    // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    // KIND, either express or implied.  See the License for the
    // specific language governing permissions and limitations
    // under the License.
    
    package org.apache.doris.flink.rest.models;
    
    import com.car.common.Constant;
    import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
    import com.fasterxml.jackson.annotation.JsonProperty;
    
    import java.util.List;
    import java.util.Objects;
    
    /**
     * Be response model
     **/
    @JsonIgnoreProperties(ignoreUnknown = true)
    public class BackendV2 {
    
        @JsonProperty(value = "backends")
        private List<BackendRowV2> backends;
    
        public void setBackends(List<BackendRowV2> backends) { this.backends = backends; }
    
        public static class BackendRowV2 {
            @JsonProperty("ip")
            public String ip;
            @JsonProperty(value="http_port")
            public int httpPort;
            @JsonProperty("is_alive")
            public boolean isAlive;
            
            public String getIp() {
                return ip;
            }
    
            public void setIp(String ip) {
                this.ip = ip;
            }
    
            public int getHttpPort() {
                return httpPort;
            }
    
            public void setHttpPort(int httpPort) {
                this.httpPort = httpPort;
            }
    
            public boolean isAlive() {
                return isAlive;
            }
    
            public void setAlive(boolean alive) {
                isAlive = alive;
            }
            
            // Map a known intranet IP to its publicly reachable hostname.
            // Unknown addresses are returned unchanged, so the backend
            // string never becomes "null:<port>".
            public String convertToHostname(String ip) {
                if (Objects.equals(ip, "192.168.0.107")) {
                    return Constant.HADOOP102;
                } else if (Objects.equals(ip, "192.168.0.108")) {
                    return Constant.HADOOP103;
                } else if (Objects.equals(ip, "192.168.0.109")) {
                    return Constant.HADOOP104;
                } else {
                    return ip;
                }
            }

            public String toBackendString() {
                // Originally: return ip + ":" + httpPort;
                return convertToHostname(ip) + ":" + httpPort;
            }
    
        }
    }
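
The mapping idea above can also be tried outside the connector. The following is a minimal sketch, assuming a fixed lookup table; `BackendAddressMapper` is a hypothetical helper, and the hostnames `hadoop102` to `hadoop104` are placeholders for whatever `Constant.HADOOP102` and its siblings actually hold (the post does not show those values).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of the Doris Flink connector.
final class BackendAddressMapper {
    // Intranet IP -> hostname reachable from the local machine.
    // The hostnames are placeholders (assumption), not values from the post.
    private static final Map<String, String> PUBLIC_HOSTS = new HashMap<>();

    static {
        PUBLIC_HOSTS.put("192.168.0.107", "hadoop102");
        PUBLIC_HOSTS.put("192.168.0.108", "hadoop103");
        PUBLIC_HOSTS.put("192.168.0.109", "hadoop104");
    }

    // Returns the mapped hostname, or the input unchanged when no mapping exists.
    static String toPublicHost(String ip) {
        return PUBLIC_HOSTS.getOrDefault(ip, ip);
    }

    // Builds the "host:port" string used to address a backend.
    static String toBackendString(String ip, int httpPort) {
        return toPublicHost(ip) + ":" + httpPort;
    }

    public static void main(String[] args) {
        System.out.println(toBackendString("192.168.0.109", 7040)); // hadoop104:7040
        System.out.println(toBackendString("10.0.0.5", 7040));      // 10.0.0.5:7040
    }
}
```

Falling back to the original address for unmapped IPs keeps the result usable when a new backend appears, instead of producing a `null` host.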
    
    
    

    The original source looks like this:

    // Licensed to the Apache Software Foundation (ASF) under one
    // or more contributor license agreements.  See the NOTICE file
    // distributed with this work for additional information
    // regarding copyright ownership.  The ASF licenses this file
    // to you under the Apache License, Version 2.0 (the
    // "License"); you may not use this file except in compliance
    // with the License.  You may obtain a copy of the License at
    //
    //   http://www.apache.org/licenses/LICENSE-2.0
    //
    // Unless required by applicable law or agreed to in writing,
    // software distributed under the License is distributed on an
    // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    // KIND, either express or implied.  See the License for the
    // specific language governing permissions and limitations
    // under the License.
    
    package org.apache.doris.flink.rest.models;
    
    import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
    import com.fasterxml.jackson.annotation.JsonProperty;
    
    import java.util.List;
    
    /**
     * Be response model
     **/
    @JsonIgnoreProperties(ignoreUnknown = true)
    public class BackendV2 {
    
        @JsonProperty(value = "backends")
        private List<BackendRowV2> backends;
    
        public void setBackends(List<BackendRowV2> backends) { this.backends = backends; }
    
        public static class BackendRowV2 {
            @JsonProperty("ip")
            public String ip;
            @JsonProperty(value="http_port")
            public int httpPort;
            @JsonProperty("is_alive")
            public boolean isAlive;
            
            public String getIp() {
                return ip;
            }
    
            public void setIp(String ip) {
                this.ip = ip;
            }
    
            public int getHttpPort() {
                return httpPort;
            }
    
            public void setHttpPort(int httpPort) {
                this.httpPort = httpPort;
            }
    
            public boolean isAlive() {
                return isAlive;
            }
    
            public void setAlive(boolean alive) {
                isAlive = alive;
            }
            
           
            public String toBackendString(){
                return ip + ":" + httpPort;
            }
    
        }
    }
    
    

    But after replacing the class, I get the following error:

    Caused by: org.apache.doris.shaded.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "http_port" (class org.apache.doris.flink.rest.models.BackendV2$BackendRowV2), not marked as ignorable (4 known properties: "httpPort", "isAlive", "alive", "ip"])
     at [Source: (String)"{"backends":[{"ip":"192.168.0.109","http_port":7040,"is_alive":true}]}"; line: 1, column: 52] (through reference chain: org.apache.doris.flink.rest.models.BackendV2["backends"]->java.util.ArrayList[0]->org.apache.doris.flink.rest.models.BackendV2$BackendRowV2["http_port"])
    	at org.apache.doris.shaded.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:61)
    	at org.apache.doris.shaded.com.fasterxml.jackson.databind.DeserializationContext.handleUnknownProperty(DeserializationContext.java:1127)
    	at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:2023)
    	at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1700)
    	at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownVanilla(BeanDeserializerBase.java:1678)
    	at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:319)
    	at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:176)
    	at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.std.CollectionDeserializer._deserializeFromArray(CollectionDeserializer.java:355)
    	at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:244)
    	at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:28)
    	at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.impl.MethodProperty.deserializeAndSet(MethodProperty.java:129)
    	at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:313)
    	at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:176)
    	at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:323)
    	at org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4674)
    	at org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3629)
    	at org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3597)
    	at org.apache.doris.flink.rest.RestService.parseBackendV2(RestService.java:380)
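
One possible explanation for this error, offered as an assumption rather than a confirmed diagnosis: the stack trace shows the connector using a relocated (shaded) Jackson under `org.apache.doris.shaded.com.fasterxml.jackson`, while the replacement class imports its annotations from the unshaded `com.fasterxml.jackson.annotation` package. A shaded mapper would not recognize those annotations and would fall back to field and getter names, which is consistent with the known properties listed in the message ("httpPort", "isAlive", "alive", "ip"). The sketch below illustrates the mechanism with plain reflection; `UnshadedJsonProperty` and `ShadedJsonProperty` are hypothetical stand-ins, not real Jackson types.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;

// Illustration only: two annotation types with the same purpose but different identity.
final class ShadedAnnotationDemo {
    // Stand-in for the unshaded annotation the replacement class was compiled against.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface UnshadedJsonProperty { String value(); }

    // Stand-in for the relocated annotation a shaded mapper would look for.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface ShadedJsonProperty { String value(); }

    static final class Row {
        @UnshadedJsonProperty("http_port")
        int httpPort;
    }

    static Field httpPortField() {
        try {
            return Row.class.getDeclaredField("httpPort");
        } catch (NoSuchFieldException e) {
            throw new IllegalStateException(e);
        }
    }

    // A mapper that only knows the shaded annotation falls back to the field name.
    static String nameSeenByShadedMapper(Field field) {
        ShadedJsonProperty ann = field.getAnnotation(ShadedJsonProperty.class);
        return ann != null ? ann.value() : field.getName();
    }

    // A mapper that knows the unshaded annotation reads the declared JSON name.
    static String nameSeenByUnshadedMapper(Field field) {
        UnshadedJsonProperty ann = field.getAnnotation(UnshadedJsonProperty.class);
        return ann != null ? ann.value() : field.getName();
    }

    public static void main(String[] args) {
        Field field = httpPortField();
        System.out.println(nameSeenByShadedMapper(field));   // httpPort
        System.out.println(nameSeenByUnshadedMapper(field)); // http_port
    }
}
```

If this is indeed the cause, compiling the replacement class against the connector's relocated annotation package (presumably `org.apache.doris.shaded.com.fasterxml.jackson.annotation`, which is an assumption based on the trace) would be one thing to try.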
    

    Before the replacement, the error was:

    2023-07-25 20:54:43 WARN (org.apache.doris.flink.sink.writer.DorisWriter:tryHttpConnection) - Failed to connect to backend:http://192.168.0.109:7040
    java.net.ConnectException: Connection timed out: connect
    	at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
    	at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85)
    	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    	at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
    	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    	at java.net.Socket.connect(Socket.java:607)
    	at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
    	at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
    	at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
    	at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
    	at sun.net.www.http.HttpClient.New(HttpClient.java:339)
    	at sun.net.www.http.HttpClient.New(HttpClient.java:357)
    	at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1228)
    	at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1162)
    	at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1056)
    	at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:990)
    	at org.apache.doris.flink.sink.writer.DorisWriter.tryHttpConnection(DorisWriter.java:259)
    	at org.apache.doris.flink.sink.writer.DorisWriter.getAvailableBackend(DorisWriter.java:245)
    	at org.apache.doris.flink.sink.writer.DorisWriter.initializeLoad(DorisWriter.java:108)
    	at org.apache.doris.flink.sink.DorisSink.createWriter(DorisSink.java:64)
    	at org.apache.flink.streaming.api.transformations.SinkV1Adapter.createWriter(SinkV1Adapter.java:77)
    	at org.apache.flink.streaming.api.transformations.SinkV1Adapter$PlainSinkAdapter.createWriter(SinkV1Adapter.java:306)
    	at org.apache.flink.streaming.api.transformations.SinkV1Adapter$StatefulSinkAdapter.createWriter(SinkV1Adapter.java:315)
    	at org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler.createWriter(StatefulSinkWriterStateHandler.java:117)
    	at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:146)
    	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
    	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274)
    	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734)
    	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
    	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
    	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    	at java.lang.Thread.run(Thread.java:750)
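
The timeout above comes from a plain TCP connect to the backend address. A quick way to check from the local machine whether a given host and port are reachable at all, independent of Flink and Doris, is a socket probe such as the sketch below. `BackendReachability` and the 3000 ms timeout are illustrative assumptions; the address in `main` is the one from the log.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical diagnostic helper; not part of the Doris Flink connector.
final class BackendReachability {
    // Returns true when a TCP connection to host:port succeeds within the timeout.
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts, and unresolvable hosts.
            return false;
        }
    }

    public static void main(String[] args) {
        // Address taken from the warning in the log above.
        System.out.println(canConnect("192.168.0.109", 7040, 3000));
    }
}
```

Running the same probe against the public address, or against a locally forwarded port, shows whether an address mapping or a port forward would actually give the sink a reachable endpoint.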
    

    Has anyone run into this kind of problem when connecting a local Flink application to a remote Doris and writing data? Thanks, everyone!

    liprais · 277 days ago via iPhone
    Why not just forward the port to your local machine?