' Requires NuGet:
'   - Confluent.Kafka
'   - Newtonsoft.Json
' Target framework: .NET Framework 4.8 or .NET 6/8 (works with either)

Imports System
Imports System.Collections.Generic
Imports System.Threading
Imports System.Threading.Tasks
Imports Confluent.Kafka
Imports Newtonsoft.Json

Namespace Verag.Udm

    ''' <summary>
    ''' UDM record including a demo instance builder and a Kafka producer helper.
    ''' The data layout follows the JSON structure that was provided for this interface.
    ''' </summary>
    ''' <remarks>
    ''' All payload values are modelled as strings. Property names are serialized exactly as
    ''' declared here (no naming attributes are applied in this file), so renaming a property
    ''' changes the emitted JSON.
    ''' </remarks>
    Public Class cATEZ_Greenpulse_KafkaDecs

        '========================
        '== Kafka: configuration (class level)
        '========================

        ' Broker address in host:port form.
        Public Shared BootstrapServers As String = "192.168.85.250:8888" 'http://192.168.85.250:8888
        ' Topic every record is produced to.
        Public Shared TopicName As String = "greenpulse.declarationdata.v1"

        ' Only needed when the broker requires SASL/TLS. The four settings below are applied
        ' to the producer configuration only while UseSasl is True.
        Public Shared UseSasl As Boolean = False
        Public Shared SaslUsername As String = ""
        Public Shared SaslPassword As String = ""
        ' NOTE(review): with the default Plaintext protocol the SASL credentials are presumably
        ' not used by the client; SaslPlaintext or SaslSsl is likely required - confirm against
        ' the broker setup before enabling UseSasl.
        Public Shared SecurityProtocolSetting As SecurityProtocol = SecurityProtocol.Plaintext
        Public Shared SaslMechanismSetting As SaslMechanism = SaslMechanism.Plain

        '========================
        '== Data objects according to the UDM schema
        '========================
        Public Property Declaration As DeclarationNode
        Public Property Parties As PartiesNode
        Public Property Commercial As CommercialNode
        Public Property ExporterDetails As ExporterDetailsNode
        Public Property ImporterDetails As ImporterDetailsNode

        '--- declaration ---
        ''' <summary>Declaration header data plus the list of goods items.</summary>
        Public Class DeclarationNode
            Public Property DeclarationSourceId As String
            Public Property DeclarationNo As String
            Public Property DeclarationDate As String
            Public Property RequestedProcedure As String
            Public Property PreviousProcedure As String
            Public Property Goods As List(Of GoodItem)
        End Class

        ''' <summary>A single goods item of a declaration.</summary>
        Public Class GoodItem
            Public Property CommodityCode As String
            Public Property OriginCountryCode As String
            Public Property NetMass As String
            Public Property TypeOfMeasurementUnit As String
            Public Property SpecialProcedures As SpecialProceduresNode
        End Class

        ''' <summary>Special-procedure details attached to a goods item.</summary>
        Public Class SpecialProceduresNode
            ' The spelling below is part of the serialized JSON key; do not "fix" it without
            ' checking the target schema.
            Public Property MemberStateAutharization As String
            Public Property DischargeBillWaiver As String
            Public Property Authorisation As String
            Public Property StartTime As String
            Public Property EndTime As String
            Public Property Deadline As String
        End Class

        '--- parties ---
        ''' <summary>Identification numbers of the parties involved in the declaration.</summary>
        Public Class PartiesNode
            Public Property ImporterIdentificationNumber As String
            Public Property ExporterIdentificationNumber As String
            Public Property ReportingDeclarantEORINumber As String
            Public Property TypeOfRepresentation As String
        End Class

        '--- commercial ---
        ''' <summary>Commercial (invoice) references.</summary>
        Public Class CommercialNode
            Public Property InvoiceNumbers As String
            Public Property InvoiceDate As String
        End Class

        '--- exporterDetails ---
        ''' <summary>Contact details of the exporter.</summary>
        Public Class ExporterDetailsNode
            Public Property ExporterTitle As String
            Public Property ExporterEmail As String
            Public Property ExporterPhone As String
        End Class

        '--- importerDetails ---
        ''' <summary>Contact and address details of the importer.</summary>
        Public Class ImporterDetailsNode
            Public Property ImporterTitle As String
            Public Property ImporterEmail As String
            Public Property ImporterPhone As String
            Public Property ImporterCountryCodeOrMemberState As String
            Public Property ImporterSubdivision As String
            Public Property ImporterCity As String
            Public Property ImporterStreet As String
            Public Property ImporterStreetAdditional As String
            Public Property ImporterAddressNumber As String
            Public Property ImporterPostCode As String
            Public Property ImporterPoBox As String
            Public Property ImporterCoordinateLongitudeX As String
            Public Property ImporterCoordinateLatitudeY As String
        End Class

        '========================
        '== Serialization
        '========================

        ''' <summary>
        ''' Serializes this record to JSON via <c>JsonConvert.SerializeObject</c>.
        ''' </summary>
        ''' <param name="pretty">
        ''' True (default) for indented output, False for compact single-line output.
        ''' </param>
        ''' <returns>The JSON text of this instance.</returns>
        Public Function ToJson(Optional pretty As Boolean = True) As String
            Dim format As Formatting = If(pretty, Formatting.Indented, Formatting.None)
            Return JsonConvert.SerializeObject(Me, format)
        End Function

        '========================
        '== Demo data
        '========================

        ''' <summary>
        ''' Builds a fully populated sample record with fixed placeholder values.
        ''' </summary>
        ''' <remarks>
        ''' NOTE(review): the importer coordinates are kept exactly as in the sample; the
        ''' longitude/latitude values look swapped - verify before using them as a template.
        ''' </remarks>
        ''' <returns>A new record containing one goods item and all sub-nodes filled.</returns>
        Public Shared Function BuildDemo() As cATEZ_Greenpulse_KafkaDecs
            Return New cATEZ_Greenpulse_KafkaDecs() With {
                .Declaration = New DeclarationNode() With {
                    .DeclarationSourceId = "xx123",
                    .DeclarationNo = "24AT000000INL0JD01",
                    .DeclarationDate = "2024-11-22",
                    .RequestedProcedure = "40",
                    .PreviousProcedure = "00",
                    .Goods = New List(Of GoodItem) From {
                        New GoodItem() With {
                            .CommodityCode = "72072710",
                            .OriginCountryCode = "TR",
                            .NetMass = "150",
                            .TypeOfMeasurementUnit = "Tonnes",
                            .SpecialProcedures = New SpecialProceduresNode() With {
                                .MemberStateAutharization = "AT",
                                .DischargeBillWaiver = "01",
                                .Authorisation = "Name of authorisation",
                                .StartTime = "2024-10-22",
                                .EndTime = "2024-11-22",
                                .Deadline = "2024-12-22"
                            }
                        }
                    }
                },
                .Parties = New PartiesNode() With {
                    .ImporterIdentificationNumber = "ATEOS1000000001",
                    .ExporterIdentificationNumber = "FR123456789000",
                    .ReportingDeclarantEORINumber = "ATEOS1000000002",
                    .TypeOfRepresentation = "01"
                },
                .Commercial = New CommercialNode() With {
                    .InvoiceNumbers = "123456789",
                    .InvoiceDate = "2024-11-22"
                },
                .ExporterDetails = New ExporterDetailsNode() With {
                    .ExporterTitle = "",
                    .ExporterEmail = "",
                    .ExporterPhone = ""
                },
                .ImporterDetails = New ImporterDetailsNode() With {
                    .ImporterTitle = "Importer name",
                    .ImporterEmail = "info@test.com",
                    .ImporterPhone = "123456789",
                    .ImporterCountryCodeOrMemberState = "DE",
                    .ImporterSubdivision = "Sub-division",
                    .ImporterCity = "City name",
                    .ImporterStreet = "Street Name",
                    .ImporterStreetAdditional = "Street additonal name",
                    .ImporterAddressNumber = "10",
                    .ImporterPostCode = "DCL-123",
                    .ImporterPoBox = "PO DCL-123",
                    .ImporterCoordinateLongitudeX = "41.0091982",
                    .ImporterCoordinateLatitudeY = "28.9662187"
                }
            }
        End Function

        '========================
        '== Unique key (intentionally left empty - to be defined later)
        '========================

        ''' <summary>
        ''' Returns the Kafka message key for a record. Currently a placeholder that always
        ''' returns an empty string.
        ''' </summary>
        ''' <param name="record">The record to derive the key from (not used yet).</param>
        ''' <returns>An empty string until the key logic is implemented.</returns>
        Public Shared Function GetUniqueKey(ByVal record As cATEZ_Greenpulse_KafkaDecs) As String
            ' TODO: implement the key derivation here (e.g. DeclarationSourceId + DeclarationNo).
            ' Until then every message is produced with the same (empty) key.
            Return ""
        End Function

        '========================
        '== Kafka: insert/update (by message key)
        '========================

        ''' <summary>
        ''' Serializes the record to compact JSON and produces it to <see cref="TopicName"/>,
        ''' using <see cref="GetUniqueKey"/> as the message key.
        ''' </summary>
        ''' <remarks>
        ''' A new producer is built and disposed on every call. The await does not capture the
        ''' caller's synchronization context, so the blocking wrapper
        ''' <see cref="InsertOrUpdateToKafka"/> cannot deadlock on a UI/ASP.NET context.
        ''' </remarks>
        ''' <param name="record">The record to send; must not be Nothing.</param>
        ''' <param name="ct">Optional cancellation token passed on to <c>ProduceAsync</c>.</param>
        ''' <returns>The delivery result returned by the producer.</returns>
        ''' <exception cref="ArgumentNullException"><paramref name="record"/> is Nothing.</exception>
        Public Shared Async Function InsertOrUpdateToKafkaAsync(ByVal record As cATEZ_Greenpulse_KafkaDecs, Optional ct As CancellationToken = Nothing) As Task(Of DeliveryResult(Of String, String))
            ' Fail with a meaningful exception instead of a NullReferenceException in ToJson.
            If record Is Nothing Then
                Throw New ArgumentNullException(NameOf(record))
            End If

            Dim cfg As New ProducerConfig() With {
                .BootstrapServers = BootstrapServers,
                .Acks = Acks.All,
                .EnableIdempotence = True,
                .MessageTimeoutMs = 30000
            }

            If UseSasl Then
                cfg.SecurityProtocol = SecurityProtocolSetting
                cfg.SaslMechanism = SaslMechanismSetting
                cfg.SaslUsername = SaslUsername
                cfg.SaslPassword = SaslPassword
                ' Optional: cfg.SslCaLocation = "path\to\ca.pem"
            End If

            Dim key As String = GetUniqueKey(record) ' stays empty until the key logic is defined
            Dim payload As String = record.ToJson(False)

            Using producer As IProducer(Of String, String) = New ProducerBuilder(Of String, String)(cfg).Build()
                Dim msg As New Message(Of String, String) With {
                    .Key = key,
                    .Value = payload
                }

                ' ConfigureAwait(False): the continuation must not be posted back to a captured
                ' synchronization context, otherwise the synchronous wrapper can deadlock.
                Dim result = Await producer.ProduceAsync(TopicName, msg, ct).ConfigureAwait(False)

                ' Flush is not strictly required after an awaited ProduceAsync; kept as a safety net.
                producer.Flush(TimeSpan.FromSeconds(5))
                Return result
            End Using
        End Function

        '========================
        '== Sync wrapper (if preferred)
        '========================

        ''' <summary>
        ''' Blocking variant of <see cref="InsertOrUpdateToKafkaAsync"/>; waits for the delivery
        ''' result on the calling thread.
        ''' </summary>
        ''' <param name="record">The record to send; must not be Nothing.</param>
        ''' <returns>The delivery result returned by the producer.</returns>
        Public Shared Function InsertOrUpdateToKafka(ByVal record As cATEZ_Greenpulse_KafkaDecs) As DeliveryResult(Of String, String)
            Return InsertOrUpdateToKafkaAsync(record).GetAwaiter().GetResult()
        End Function

    End Class

End Namespace