
[Feature] Flink cdc paimon ingestion recovery issue #4969

Open
kwokhh opened this issue Jan 21, 2025 · 3 comments
Labels
enhancement New feature or request

Comments


kwokhh commented Jan 21, 2025

Search before asking

  • I searched in the issues and found nothing similar.

Motivation

I am facing an issue when loading a large database from MySQL to Paimon using a single Flink Paimon CDC ingestion job. If the ingestion job hits an exception while loading one of the tables in the database, the whole Flink job stops, which also interrupts the ingestion of all the other tables. When the exception forces me to restart the job without an initial savepoint, is there any configuration that avoids re-running the initial load for every table configured in the job and instead only re-loads the table that triggered the exception?
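
The only recovery I can think of today is to launch a second, temporary sync job scoped to just the failed table, which re-runs the initial snapshot only for that table. Below is a hypothetical sketch reusing the same mysql_sync_database arguments as the deployment further down in this thread; <failed_table> is a placeholder for the table that triggered the exception, and the temporary job would also need its own MySQL server-id:

    args: [
        "mysql_sync_database",
        "--warehouse", "<link>",
        "--database", "<db>",
        "--mysql_conf", "hostname=<hostname>",
        "--mysql_conf", "port=<port>",
        "--mysql_conf", "username=<name>",
        "--mysql_conf", "password=<password>",
        "--mysql_conf", "database-name=<name>",
        "--mysql_conf", "server-id=<id>",
        "--mysql_conf", "server-time-zone=<zone>",
        "--including_tables", "<failed_table>",
        "--catalog_conf", "metastore=filesystem",
        "--catalog_conf", "case-sensitive=false",
        "--table_conf", "bucket=4",
        "--table_conf", "changelog-producer=input"
    ]

What I am asking for is a way to get this behavior inside the original job, so that after a restart without a savepoint the already-loaded tables keep streaming from the binlog and only the failing table is re-snapshotted.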

Solution

No Solution

Anything else?

Using Paimon 0.9 and Flink 1.20.

Are you willing to submit a PR?

  • I'm willing to submit a PR!
kwokhh added the enhancement (New feature or request) label on Jan 21, 2025
yangjf2019 (Contributor) commented

Hi @kwokhh, thanks for your issue! Could you provide more information about this problem, for example the Flink code, logs, environment, and so on?


kwokhh commented Feb 11, 2025

Hi @yangjf2019, the Flink deployment is described below:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: <name>
spec:
  image: <name>
  
  mode: native
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.memory.jvm-metaspace.size: 512 mb
    jobmanager.memory.jvm-metaspace.size: 512 mb
    metrics.job.status.enable: "STATE"
    metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
    taskmanager.numberOfTaskSlots: "4"
    taskmanager.memory.managed.fraction: "0.1"
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory


    state.savepoints.dir: <dir>
    state.checkpoints.dir: <dir>
    high-availability.storageDir: <dir>


    kubernetes.hadoop.conf.config-map.name: <name>


    kubernetes.operator.periodic.savepoint.interval: 30 min
    kubernetes.operator.savepoint.history.max.age: 14 d
    kubernetes.operator.savepoint.history.max.count: "5"
    kubernetes.operator.job.upgrade.last-state.max.allowed.checkpoint.age: 30 min


    execution.checkpointing.unaligned.enabled: "false"
    execution.checkpointing.tolerable-failed-checkpoints: "100"
    execution.checkpointing.interval: 15 min
    execution.checkpointing.min-pause: 2 min
    execution.checkpointing.timeout: 10 min
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: "2147483647"
    restart-strategy.fixed-delay.delay: 120 s


    state.checkpoints.num-retained: "10"


    pipeline.object-reuse: "true"


    akka.ask.timeout: 10m


    client.timeout: 15 min
    akka.framesize: 41943040b


    table.local-time-zone: <region>


  podTemplate:
    apiVersion: v1
    kind: Pod
    metadata:
      name: pod-template
    spec:
      imagePullSecrets:
        - name: <name>
      containers:
        - name: flink-main-container
          ports:
            - name: metrics
              containerPort: 9249
              protocol: TCP
  serviceAccount: flink
  jobManager:
    replicas: 1
    resource:
      memory: "32G"
      cpu: 4
  taskManager:
    replicas: 2
    resource:
      memory: "32G"
      cpu: 4
  restartNonce: 0
  job:
    jarURI: local:////opt/flink/usrlib/paimon-flink-action-0.8.2.jar
    entryClass: "org.apache.paimon.flink.action.FlinkActions"


#
    args: [
        "mysql_sync_database",
        "--warehouse", "<link>",
        "--database" , "<db>",
        "--mysql_conf", "hostname=<hostname>",
        "--mysql_conf", "port=<port>",
        "--mysql_conf", "username=<name>",
        "--mysql_conf", "password=<password>",
        "--mysql_conf", "database-name=<name>",
        "--mysql_conf", "server-id=<id>",
        "--mysql_conf", "server-time-zone=<zone>",
        "--including_tables", "<tables>",
        "--catalog_conf", "metastore=filesystem",
        "--catalog_conf", "case-sensitive=false",
        "--table_conf", "bucket=4",
        "--table_conf", "changelog-producer=input"
    ]
#      parallelism: 2
    upgradeMode: last-state
    allowNonRestoredState: true
    state: running 
    savepointTriggerNonce: 1


  ingress:
    template: "<link>"
    className: "nginx"
    annotations:
      nginx.ingress.kubernetes.io/rewrite-target: "/$2"
  logConfiguration:
    log4j-console.properties: |+
      # This affects logging for both user code and Flink
      rootLogger.level = INFO
      rootLogger.appenderRef.console.ref = ConsoleAppender
      rootLogger.appenderRef.rolling.ref = RollingFileAppender






      logger.akka.name = <name>
      logger.akka.level = INFO
      logger.kafka.name= org.apache.kafka
      logger.kafka.level = INFO
      logger.hadoop.name = org.apache.hadoop
      logger.hadoop.level = INFO
      logger.zookeeper.name = org.apache.zookeeper
      logger.zookeeper.level = INFO


      appender.console.name = ConsoleAppender
      appender.console.type = CONSOLE
      appender.console.layout.type = PatternLayout
      appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %replace{%m}{(password['|\"]*\\s*[=|:]\\s*['|\"]*)[^'\",\r\n\\s]+('|\"|,|\r\n|\r|\n|\\s|$)}{$1****$2}%n


      # Log all infos in the given rolling file
      appender.rolling.name = RollingFileAppender
      appender.rolling.type = RollingFile
      appender.rolling.append = false
      appender.rolling.fileName = ${sys:log.file}
      appender.rolling.filePattern = ${sys:log.file}.%i
      appender.rolling.layout.type = PatternLayout
      appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %replace{%m}{(password['|\"]*\\s*[=|:]\\s*['|\"]*)[^'\",\r\n\\s]+('|\"|,|\r\n|\r|\n|\\s|$)}{$1****$2}%n
      appender.rolling.policies.type = Policies
      appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
      appender.rolling.policies.size.size=100MB
      appender.rolling.strategy.type = DefaultRolloverStrategy
      appender.rolling.strategy.max = 10




      logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
      logger.netty.level = OFF

yangjf2019 (Contributor) commented

Thanks for the information! From the YAML you provided, I see that the Flink and Paimon versions are not the ones you reported as running when the error occurs:

 flinkVersion: v1_17

 jarURI: local:////opt/flink/usrlib/paimon-flink-action-0.8.2.jar
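
If the job is really meant to run Paimon 0.9 on Flink 1.20 as stated above, the deployment would need to be updated along these lines (a sketch only; the exact jar file name depends on the Paimon 0.9 release you download, and v1_20 assumes a Flink Kubernetes Operator version that supports Flink 1.20):

 flinkVersion: v1_20

 jarURI: local:///opt/flink/usrlib/paimon-flink-action-0.9.0.jar

Could you confirm which Flink/Paimon combination is actually running when the error occurs?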
