entrega tercer laboratorio

parent ce43bda0
# syntax=docker/dockerfile:1
#FROM eclipse-temurin:17-jdk-jammy
FROM maven:3.8.6-amazoncorretto-17
#RUN apt-get update
#RUN apt-get install -y --no-install-recommends apt-utils
#RUN apt-get install -y dialog
#RUN apt-get install -y wget unzip curl maven
WORKDIR demo
COPY pom.xml ./
RUN mvn dependency:resolve
COPY src ./src
RUN mvn clean package
CMD mvn spring-boot:run
\ No newline at end of file
---
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.2.2
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-server:7.2.2
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "9092:9092"
- "9101:9101"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://broker:9092
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
CONFLUENT_METRICS_ENABLE: 'true'
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
schema-registry:
image: confluentinc/cp-schema-registry:7.2.2
hostname: schema-registry
container_name: schema-registry
depends_on:
- broker
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
connect:
image: cnfldemos/cp-server-connect-datagen:0.5.3-7.1.0
hostname: connect
container_name: connect
depends_on:
- broker
- schema-registry
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
CONNECT_REST_ADVERTISED_HOST_NAME: connect
CONNECT_GROUP_ID: compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
# CLASSPATH required due to CC-2422
CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.2.2.jar
CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
control-center:
image: confluentinc/cp-enterprise-control-center:7.2.2
hostname: control-center
container_name: control-center
depends_on:
- broker
- schema-registry
- connect
- ksqldb-server
ports:
- "9021:9021"
environment:
CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
CONTROL_CENTER_REPLICATION_FACTOR: 1
CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
CONFLUENT_METRICS_TOPIC_REPLICATION: 1
PORT: 9021
ksqldb-server:
image: confluentinc/cp-ksqldb-server:7.2.2
hostname: ksqldb-server
container_name: ksqldb-server
depends_on:
- broker
- connect
ports:
- "8088:8088"
environment:
KSQL_CONFIG_DIR: "/etc/ksql"
KSQL_BOOTSTRAP_SERVERS: "broker:29092"
KSQL_HOST_NAME: ksqldb-server
KSQL_LISTENERS: "http://0.0.0.0:8088"
KSQL_CACHE_MAX_BYTES_BUFFERING: 0
KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
KSQL_KSQL_CONNECT_URL: "http://connect:8083"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'
ksqldb-cli:
image: confluentinc/cp-ksqldb-cli:7.2.2
container_name: ksqldb-cli
depends_on:
- broker
- connect
- ksqldb-server
entrypoint: /bin/sh
tty: true
ksql-datagen:
image: confluentinc/ksqldb-examples:7.2.2
hostname: ksql-datagen
container_name: ksql-datagen
depends_on:
- ksqldb-server
- broker
- schema-registry
- connect
command: "bash -c 'echo Waiting for Kafka to be ready... && \
cub kafka-ready -b broker:29092 1 40 && \
echo Waiting for Confluent Schema Registry to be ready... && \
cub sr-ready schema-registry 8081 40 && \
echo Waiting a few seconds for topic creation to finish... && \
sleep 11 && \
tail -f /dev/null'"
environment:
KSQL_CONFIG_DIR: "/etc/ksql"
STREAMS_BOOTSTRAP_SERVERS: broker:29092
STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
STREAMS_SCHEMA_REGISTRY_PORT: 8081
rest-proxy:
image: confluentinc/cp-kafka-rest:7.2.2
depends_on:
- broker
- schema-registry
ports:
- 8082:8082
hostname: rest-proxy
container_name: rest-proxy
environment:
KAFKA_REST_HOST_NAME: rest-proxy
KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
dba:
image: postgres:13.8-alpine
expose:
- 5432
environment:
- POSTGRES_USER=laboratorioadm
- POSTGRES_PASSWORD=laboratorioadm
- POSTGRES_DB=laboratorio
flyway-a:
image: flyway/flyway
command: -url=jdbc:postgresql://dba:5432/laboratorio -user=laboratorioadm -password=laboratorioadm -connectRetries=60 migrate
volumes:
- ./src/flyway:/flyway/sql
depends_on:
- dba
bordero-backend-a:
build: .
ports:
- 8090:8080
environment:
- SPRING_DATASOURCE_URL=jdbc:postgresql://dba:5432/laboratorio
- SPRING_DATASOURCE_USER=laboratorioadm
- SPRING_DATASOURCE_PASSWORD=laboratorioadm
- BORDERO_SERVER_ID=Bordero-A
depends_on:
- dba
- flyway-a
- broker
dbb:
image: postgres:13.8-alpine
expose:
- 5432
environment:
- POSTGRES_USER=laboratorioadm
- POSTGRES_PASSWORD=laboratorioadm
- POSTGRES_DB=laboratorio
flyway-b:
image: flyway/flyway
command: -url=jdbc:postgresql://dbb:5432/laboratorio -user=laboratorioadm -password=laboratorioadm -connectRetries=60 migrate
volumes:
- ./src/flyway:/flyway/sql
depends_on:
- dbb
bordero-backend-b:
build: .
ports:
- 8091:8080
environment:
- SPRING_DATASOURCE_URL=jdbc:postgresql://dbb:5432/laboratorio
- SPRING_DATASOURCE_USER=laboratorioadm
- SPRING_DATASOURCE_PASSWORD=laboratorioadm
- BORDERO_SERVER_ID=Bordero-B
depends_on:
- dbb
- flyway-b
- broker
\ No newline at end of file
......@@ -14,7 +14,8 @@
<name>demo</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>18</java.version>
<java.version>17</java.version>
<spring-cloud.version>2021.0.4</spring-cloud.version>
</properties>
<dependencies>
<dependency>
......@@ -51,14 +52,145 @@
<artifactId>spring-web</artifactId>
<version>5.3.2</version>
</dependency> -->
<!-- inicio dependencias agregadas -->
<dependency>
<groupId>com.github.dozermapper</groupId>
<artifactId>dozer-core</artifactId>
<version>6.5.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<scope>test</scope>
<classifier>test-binder</classifier>
<type>test-jar</type>
</dependency>
<!-- fin dependencias agregadas -->
</dependencies>
<!-- agregado -->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<!-- fin agregado -->
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<!-- agregado -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0-M5</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<version>3.0.0-M5</version>
<executions>
<execution>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- fin agregado -->
<!-- inicio plugin flyway -->
<plugin>
<groupId>org.flywaydb</groupId>
<artifactId>flyway-maven-plugin</artifactId>
<version>8.5.13</version>
<configuration>
<driver>org.postgresql.Driver</driver>
<url>jdbc:postgresql://localhost:5432/laboratorio</url>
<user>laboratorioadm</user>
<password>laboratorioadm</password>
<locations>
<location>filesystem:src/flyway/sql-1</location>
</locations>
</configuration>
<dependencies>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>RELEASE</version>
</dependency>
</dependencies>
<executions>
<execution>
<id>clean-flyway</id>
<phase>pre-integration-test</phase>
<goals>
<goal>clean</goal>
</goals>
</execution>
<execution>
<id>migrate-flyway</id>
<phase>pre-integration-test</phase>
<goals>
<goal>migrate</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- fin plugin flyway-->
<!-- inicio plugin docker -->
<plugin>
......@@ -69,8 +201,8 @@
<logDate>default</logDate>
<autoPull>true</autoPull>
<verbose>true</verbose>
<!-- <dockerHost>unix:///var/run/docker.sock</dockerHost> -->
<dockerHost>npipe:////./pipe/docker_engine</dockerHost>
<dockerHost>unix:///var/run/docker.sock</dockerHost>
<!-- <dockerHost>npipe:////./pipe/docker_engine</dockerHost> -->
<images>
<image>
<alias>${db}</alias>
......@@ -91,11 +223,38 @@
</log>
</run>
</image>
<!-- inicio imagen flyway -->
<image>
<alias>flyway</alias>
<name>flyway/flyway</name>
<run>
<volumes>
<bind>
<volume>${basedir}/src/flyway:/flyway/sql</volume>
</bind>
</volumes>
<log>
<prefix>flyway:</prefix>
<color>green</color>
</log>
<dependsOn>
<container>${db}</container>
</dependsOn>
<links>
<link>${db}</link>
</links>
<cmd>-url=jdbc:postgresql://${db}:5432/laboratorio -user=laboratorioadm -password=laboratorioadm
-connectRetries=60 migrate
</cmd>
</run>
</image>
<!-- fin imagen flyway -->
<image>
<name>${project.name}:${project.version}</name>
<build>
<from>openjdk:18</from>
<from>openjdk:17</from>
<assembly>
<name>build</name>
<descriptorRef>artifact</descriptorRef>
......@@ -113,6 +272,7 @@
<SPRING_DATASOURCE_URL>jdbc:postgresql://${db}:5432/laboratorio</SPRING_DATASOURCE_URL>
<SPRING_DATASOURCE_USER>laboratorioadm</SPRING_DATASOURCE_USER>
<SPRING_DATASOURCE_PASSWORD>laboratorioadm</SPRING_DATASOURCE_PASSWORD>
<BORDERO_SERVER_ID>${bordero.server.id}</BORDERO_SERVER_ID>
</env>
<log>
<prefix>spring: </prefix>
......
package com.trabajopractico.demo;
import com.trabajopractico.demo.backend.kafka.Event;
import com.trabajopractico.demo.backend.kafka.Producer;
import com.trabajopractico.demo.backend.kafka.serdes.EventDeserializer;
import com.trabajopractico.demo.backend.kafka.serdes.EventSerializer;
import com.trabajopractico.demo.backend.model.Operacion;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.*;
import java.util.Map;
@Configuration
public class KafkaConfiguration {
@Value("${spring.kafka.properties.bootstrap.servers}")
private String bootstrapServer;
@Bean
public ProducerFactory<Integer, Event<Operacion>> producerFactoryPlayDTO() {
return new DefaultKafkaProducerFactory<>(
Map.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer,
ProducerConfig.RETRIES_CONFIG, 0,
ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, EventSerializer.class
));
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerProperties());
}
@Bean
public Map<String, Object> consumerProperties() {
return Map.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer,
ConsumerConfig.GROUP_ID_CONFIG, "bordero-kafka",
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false,
ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, EventDeserializer.class);
}
@Bean
public KafkaTemplate<Integer, Event<Operacion>> kafkaTemplatePlayDTO() {
return new KafkaTemplate<>(producerFactoryPlayDTO());
}
@Bean
public Producer<Operacion> producerPlayDTO() {
return new Producer<Operacion>(kafkaTemplatePlayDTO());
}
}
......@@ -4,7 +4,11 @@ package com.trabajopractico.demo.backend.controller;
import com.trabajopractico.demo.backend.model.Operacion;
import com.trabajopractico.demo.backend.model.OperacionType;
import com.trabajopractico.demo.backend.service.OperacionService;
import com.trabajopractico.demo.backend.kafka.Producer;
import com.trabajopractico.demo.backend.service.Response;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Example;
import org.springframework.data.domain.ExampleMatcher;
import org.springframework.http.HttpStatus;
......@@ -22,6 +26,10 @@ import java.util.Optional;
public class OperacionController {
@Autowired
Producer<Operacion> producer;
private OperacionService service;
List<Operacion> transacciones;
......@@ -61,6 +69,8 @@ public class OperacionController {
Operacion consulta = new Operacion(OperacionType.SALDO, this.saldo);
producer.logCreate("plays", consulta);
return consulta;
}
......@@ -82,6 +92,8 @@ public class OperacionController {
public Operacion saveDeposito(Operacion transaccion) {
Operacion opeNueva = service.saveOperacion(transaccion);
producer.logCreate("plays", opeNueva);
return opeNueva;
}
......@@ -90,6 +102,9 @@ public class OperacionController {
public Operacion saveExtraccion(Operacion transaccion) {
Operacion opeNueva = service.saveOperacion(transaccion);
producer.logCreate("plays", opeNueva);
return opeNueva;
}
......@@ -98,6 +113,8 @@ public class OperacionController {
public Operacion saveInteres(Operacion transaccion) {
Operacion opeNueva = service.saveOperacion(transaccion);
producer.logCreate("plays", opeNueva);
return opeNueva;
}
......
package com.trabajopractico.demo.backend.kafka;
import com.trabajopractico.demo.backend.model.Operacion;
import com.trabajopractico.demo.backend.service.OperacionService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class Consumer {
@Autowired
OperacionService operacionService;
@Value("${bordero.server.id}")
private String serverId;
@KafkaListener(topics="plays", groupId = "${bordero.server.id}")
public void consume(Event event) {
log.info("Event received at: " + serverId);
log.info("Consuming event: " + event.toString());
if (serverId.compareTo(event.serverId)!=0) {
if (event.type == EventType.CREATE) {
// Operacion play = mapper.toModel((PlayDTO) event.dto);
Operacion play = (Operacion) event.dto;
operacionService.saveOperacion(play);
log.info("Play inserted by consumer");
}
}
}
}
package com.trabajopractico.demo.backend.kafka;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
@SuperBuilder(toBuilder = true)
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class Event<Operacion> {
@Builder.Default
public final EventType type = null;
@Builder.Default
public final String serverId = null;
@Builder.Default
public final Operacion dto = null;
}
package com.trabajopractico.demo.backend.kafka;
public enum EventType {
CREATE, UPDATE, DELETE
}
package com.trabajopractico.demo.backend.kafka;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Slf4j
@RequiredArgsConstructor
@Component
public class Producer<Operacion> {
@Value("${bordero.server.id}")
private String serverId;
private final KafkaTemplate<Integer, Event<Operacion>> template;
public void logCreate(String topic, Operacion dto) {
Event event = Event.builder()
.serverId(serverId)
.type(EventType.CREATE)
.dto(dto)
.build();
log.info("Sending event: " + event.toString());
template.send(topic, 1, event);
}
}
package com.trabajopractico.demo.backend.kafka.serdes;
import com.trabajopractico.demo.backend.kafka.Event;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.trabajopractico.demo.backend.model.Operacion;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
@Slf4j
public class EventDeserializer implements Deserializer<Event> {
private final ObjectMapper mapper = new ObjectMapper();
@Override
public Event deserialize(String topic, byte[] data) {
try {
if (data == null) {
log.error("Se pidió deserializar un valor nulo");
return null;
}
log.debug("Deserializando ...");
return mapper.readValue(
new String(data, "UTF-8"),
new TypeReference<Event<Operacion>>() {
});
} catch (Exception e) {
throw new SerializationException("Error en la deserialización de byte[] a Event");
}
}
}
package com.trabajopractico.demo.backend.kafka.serdes;
import com.trabajopractico.demo.backend.kafka.Event;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
@Slf4j
public class EventSerializer implements Serializer<Event> {
private final ObjectMapper mapper = new ObjectMapper();
@Override
public byte[] serialize(String s, Event event) {
try {