
I’ve got the following setup:

  • MySQL → Striim → Kafka + Confluent Schema Registry (CSR), with Avro encoding

 

The table that I’m replicating is pretty simple:

 

```sql
CREATE TABLE `items` (
  `id` bigint unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(100) DEFAULT NULL,
  `category` varchar(100) DEFAULT NULL,
  `price` decimal(7,2) DEFAULT NULL,
  `inventory` int DEFAULT NULL,
  `inventory_updated_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  `created_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  `updated_at` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `id` (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2001 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci
```

 

However, the `bigint` column seems to cause the following error when Striim starts ingesting messages:


```
Message: KafkaWriter is unable to produce events to shop. Component Name: Kafka_2_1_0_mysql_kafka_csr_Target. Component Type: TARGET. Cause: KafkaWriter is unable to produce events to shop. Cause: Suitable Avro type not found for field - ID with java.math.BigInteger
```

 

As a test, if I use a schema that does not use `bigint`, it works as expected.
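For context on why this fails: MySQL's `bigint unsigned` can hold values up to 2^64 − 1, which exceeds the range of Java's signed `long`, so the JDBC driver surfaces such columns as `java.math.BigInteger`. Avro's `long` type is a signed 64-bit integer and has no primitive type for `BigInteger`, hence the formatter error. If your ids will never exceed 9223372036854775807 (2^63 − 1), one possible workaround is to switch the column to a signed `bigint` on the source, which the driver returns as `java.lang.Long` and Avro can encode as `long`. A hedged sketch, assuming the signed range is sufficient for your data:

```sql
-- Possible workaround, NOT verified against your environment: convert the
-- unsigned id column to signed bigint so it maps to Avro `long`.
-- Only safe if ids never exceed 9223372036854775807 (2^63 - 1).
ALTER TABLE `items`
  MODIFY COLUMN `id` bigint NOT NULL AUTO_INCREMENT;
```

Since the column is `AUTO_INCREMENT` starting from 1, values stay well inside the signed range unless you insert explicit ids near the unsigned maximum.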

Thanks for using Striim! Can you share your TQL file so we can see how your MysqlReader and KafkaWriter are configured? Please be sure to block out or remove any sensitive fields.

 


Hey!

Yes, here it is:

 

```
CREATE OR REPLACE APPLICATION mysql_kafka_csr USE EXCEPTIONSTORE TTL : '7d' ;
CREATE FLOW mysql_kafka_csr_SourceFlow;
CREATE OR REPLACE SOURCE mysql_kafka_csr_MysqlCDC USING Global.MysqlReader (
  Password: 'MY_PASSWORD',
  Compression: false,
  Password_encrypted: 'true',
  CDDLAction: 'Process',
  Username: 'MY_USER',
  CDDLCapture: true,
  ConnectionURL: 'jdbc:mysql://MY_HOST:3306',
  Tables: '"shop"."users"',
  StartTimestamp: 'NOW',
  adapterName: 'MysqlReader',
  connectionRetryPolicy: 'retryInterval=30, maxRetries=3',
  FilterTransactionBoundaries: true,
  SendBeforeImage: true )
OUTPUT TO mysql_kafka_csr_OutputStream;
END FLOW mysql_kafka_csr_SourceFlow;
CREATE OR REPLACE TARGET Kafka_2_1_0_mysql_kafka_csr_Target USING Global.KafkaWriter VERSION '0.11.0'(
  KafkaConfigValueSeparator: '=',
  MessageKey: '',
  adapterName: 'KafkaWriter',
  MessageHeader: '',
  KafkaConfigPropertySeparator: ';',
  Mode: 'Sync',
  Topic: 'shop.users',
  brokerAddress: 'MY_HOST:9092',
  KafkaConfig: 'request.timeout.ms=60001;session.timeout.ms=60000' )
FORMAT USING Global.AvroFormatter (
  formatAs: 'Table',
  schemaregistryurl: 'http://MY_HOST:8081',
  handler: 'com.webaction.proc.AvroFormatter',
  formatterName: 'AvroFormatter',
  schemaregistryConfiguration: '' )
INPUT FROM mysql_kafka_csr_OutputStream;
END APPLICATION mysql_kafka_csr;
```

 

As a test, if I use a table that does not use `bigint`, it works as expected.
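If converting the id column to signed `bigint` is an option, it may help to first confirm that no existing row already exceeds the signed range. A minimal sketch, assuming direct query access to the source database (table name taken from the schema shared above):

```sql
-- Sanity check before an unsigned-to-signed conversion: count ids above the
-- signed BIGINT maximum (2^63 - 1). A result of 0 means a signed bigint
-- column can represent every current id without loss.
SELECT COUNT(*) AS oversized_ids
FROM `items`
WHERE `id` > 9223372036854775807;
```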

