' Requires NuGet:
' - Confluent.Kafka
' - Newtonsoft.Json
' Target framework: .NET Framework 4.8 or .NET 6/8 (works with both)
Imports System.Data.SqlClient
Imports System.Threading
Imports System.Threading.Tasks
Imports System.Windows.Forms
Imports Confluent.Kafka
Imports Newtonsoft.Json
''' <summary>
''' UDM record including sample data and a Kafka producer.
''' The data schema follows the JSON structure that was provided for this interface.
''' </summary>
Public Class cATEZ_Greenpulse_KafkaDecs
'========================
'== Kafka: configuration (class level)
'========================
' Broker address that InsertOrUpdateToKafkaSync passes to ProducerConfig.BootstrapServers.
' NOTE(review): hard-coded private IP address - consider moving it to application configuration.
Public Shared BootstrapServers As String = "192.168.85.250:9092" 'http://192.168.85.250:8888
' Public Shared TopicName As String = "greenpulse.declarationdata.v1"
' Topic that InsertOrUpdateToKafkaSync produces to; the active value carries a "dev." prefix.
Public Shared TopicName As String = "dev.greenpulse.declarationdata.v1"
' In case SASL/TLS is required:
Public Shared UseSasl As Boolean = False
Public Shared SaslUsername As String = ""
Public Shared SaslPassword As String = ""
Public Shared SecurityProtocolSetting As SecurityProtocol = SecurityProtocol.Plaintext
Public Shared SaslMechanismSetting As SaslMechanism = SaslMechanism.Plain
' Version prefix and separator used by GetUniqueKey_Pipe when composing a message key.
Private Const KEY_VERSION As String = "v1"
Private Const SEP_PIPE As Char = "|"c
'========================
'== Unique-Key-Ermittlung
'========================
''' <summary>
''' Builds the pipe-separated message key in the form KEY_VERSION|COUNTRY|SYSTEM|MRN.
''' </summary>
''' <param name="country">Country part of the key.</param>
''' <param name="system">Source-system part of the key.</param>
''' <param name="mrn">Declaration number (MRN) part of the key.</param>
''' <returns>The key with every part trimmed and upper-cased using the invariant culture.</returns>
''' <remarks>
''' Nothing is treated as an empty segment instead of raising a NullReferenceException, and
''' surrounding whitespace is removed so that the same declaration always maps to the same key.
''' </remarks>
Public Shared Function GetUniqueKey_Pipe(country As String, system As String, mrn As String) As String
    Dim c = If(country, String.Empty).Trim().ToUpperInvariant()
    Dim s = If(system, String.Empty).Trim().ToUpperInvariant()
    Dim m = If(mrn, String.Empty).Trim().ToUpperInvariant()
    Return String.Join(SEP_PIPE, New String() {KEY_VERSION, c, s, m})
End Function
'========================
'== Datenobjekte lt. UDM-Schema
'========================
' Root members of the record. ToJson serializes this instance (Me), so these properties
' form the top level of the JSON document.
' NOTE(review): property names are serialized as declared unless serializer settings elsewhere
' rename them - confirm the casing expected by the consumer.
Public Property Declaration As DeclarationNode
Public Property Parties As PartiesNode
Public Property Commercial As CommercialNode
Public Property ExporterDetails As ExporterDetailsNode
Public Property ImporterDetails As ImporterDetailsNode
'--- documents ---
Public Property Documents As List(Of DocumentNode)
'--- declaration ---
''' <summary>
''' Declaration header plus its goods items. All scalar values are carried as strings.
''' </summary>
Public Class DeclarationNode
' The DAKOSY builder in this file fills both DeclarationSourceId and DeclarationNo
' from the column Registriernummer_MRN.
Public Property DeclarationSourceId As String
Public Property DeclarationNo As String
' Written as yyyy-MM-dd by the DAKOSY builder and in the demo record.
Public Property DeclarationDate As String
Public Property RequestedProcedure As String
Public Property PreviousProcedure As String
' One entry per goods position.
Public Property Goods As List(Of GoodItem)
End Class
''' <summary>
''' A single goods position of a declaration.
''' </summary>
Public Class GoodItem
' Filled from the column Warentarifnummer by the DAKOSY builder.
Public Property CommodityCode As String
Public Property OriginCountryCode As String
' Carried as text; the demo record uses "150" together with the unit "Tonnes".
Public Property NetMass As String
Public Property TypeOfMeasurementUnit As String
Public Property SpecialProcedures As SpecialProceduresNode
End Class
''' <summary>
''' Special-procedure details attached to a goods position.
''' </summary>
Public Class SpecialProceduresNode
' NOTE(review): "Autharization" looks like a misspelling, but the property name is what the
' serializer emits - confirm against the agreed schema before renaming.
Public Property MemberStateAutharization As String
Public Property DischargeBillWaiver As String
Public Property Authorisation As String
' The demo record uses yyyy-MM-dd values for the three fields below; the DAKOSY builder leaves them empty.
Public Property StartTime As String
Public Property EndTime As String
Public Property Deadline As String
End Class
'--- parties ---
''' <summary>
''' Identification numbers of the parties involved in the declaration.
''' </summary>
Public Class PartiesNode
' The DAKOSY builder uses Empfänger_CN_EORI and falls back to UST_ID_Einführer.
Public Property ImporterIdentificationNumber As String
Public Property ExporterIdentificationNumber As String
Public Property ReportingDeclarantEORINumber As String
Public Property TypeOfRepresentation As String
End Class
'--- commercial ---
''' <summary>
''' Invoice reference of the declaration.
''' </summary>
Public Class CommercialNode
' A single string although the name is plural; the DAKOSY builder stores the number of the
' first document row of type N380 here.
Public Property InvoiceNumbers As String
' Written as yyyy-MM-dd by the DAKOSY builder, or empty when no invoice row exists.
Public Property InvoiceDate As String
End Class
'--- exporterDetails ---
''' <summary>
''' Contact details of the exporter.
''' </summary>
Public Class ExporterDetailsNode
' Filled from the column CZ_Name by the DAKOSY builder; e-mail and phone are left empty there.
Public Property ExporterTitle As String
Public Property ExporterEmail As String
Public Property ExporterPhone As String
End Class
'--- importerDetails ---
''' <summary>
''' Contact and address details of the importer.
''' </summary>
Public Class ImporterDetailsNode
' The DAKOSY builder fills only ImporterTitle (CN_Name) and
' ImporterCountryCodeOrMemberState (CN_Ländercode); every other field is sent as an empty string.
Public Property ImporterTitle As String
Public Property ImporterEmail As String
Public Property ImporterPhone As String
Public Property ImporterCountryCodeOrMemberState As String
Public Property ImporterSubdivision As String
Public Property ImporterCity As String
Public Property ImporterStreet As String
Public Property ImporterStreetAdditional As String
Public Property ImporterAddressNumber As String
Public Property ImporterPostCode As String
Public Property ImporterPoBox As String
' Coordinates are carried as text.
' NOTE(review): presumably decimal degrees - confirm which value belongs in X and which in Y.
Public Property ImporterCoordinateLongitudeX As String
Public Property ImporterCoordinateLatitudeY As String
End Class
''' <summary>
''' A document attached to the record, with its content embedded as text.
''' </summary>
Public Class DocumentNode
' The DAKOSY builder stores the attachment name here.
Public Property Reference As String
' The DAKOSY builder writes "invoice" for attachments of kind "Rechnung" or "eFatura".
Public Property DocType As String
Public Property MimeType As String
' Base64-encoded file content (the DAKOSY builder uses Convert.ToBase64String).
Public Property Blob As String
End Class
'========================
'== Serialisierung
'========================
''' <summary>
''' Serializes this record to JSON with Newtonsoft.Json.
''' </summary>
''' <param name="pretty">True (default) for indented output, False for compact output.</param>
''' <returns>The JSON text of this instance.</returns>
Public Function ToJson(Optional pretty As Boolean = True) As String
    If pretty Then
        Return JsonConvert.SerializeObject(Me, Formatting.Indented)
    End If

    Return JsonConvert.SerializeObject(Me, Formatting.None)
End Function
'========================
'== Beispielbefüllung
'========================
''' <summary>
''' Creates a record filled with fixed sample values, e.g. for a manual test of the producer.
''' </summary>
''' <returns>A new record containing one goods item and an empty document list.</returns>
Public Shared Function BuildDemo() As cATEZ_Greenpulse_KafkaDecs
    ' Innermost node first, so that every part of the sample can be read on its own.
    Dim demoProcedures As New SpecialProceduresNode()
    demoProcedures.MemberStateAutharization = "AT"
    demoProcedures.DischargeBillWaiver = "01"
    demoProcedures.Authorisation = "Name of authorisation"
    demoProcedures.StartTime = "2024-10-22"
    demoProcedures.EndTime = "2024-11-22"
    demoProcedures.Deadline = "2024-12-22"

    Dim demoGood As New GoodItem()
    demoGood.CommodityCode = "72072710"
    demoGood.OriginCountryCode = "TR"
    demoGood.NetMass = "150"
    demoGood.TypeOfMeasurementUnit = "Tonnes"
    demoGood.SpecialProcedures = demoProcedures

    Dim demoDeclaration As New DeclarationNode()
    demoDeclaration.DeclarationSourceId = "xx123"
    demoDeclaration.DeclarationNo = "24AT000000INL0JD01"
    demoDeclaration.DeclarationDate = "2024-11-22"
    demoDeclaration.RequestedProcedure = "40"
    demoDeclaration.PreviousProcedure = "00"
    demoDeclaration.Goods = New List(Of GoodItem)()
    demoDeclaration.Goods.Add(demoGood)

    Dim demoParties As New PartiesNode()
    demoParties.ImporterIdentificationNumber = "ATEOS1000000001"
    demoParties.ExporterIdentificationNumber = "FR123456789000"
    demoParties.ReportingDeclarantEORINumber = "ATEOS1000000002"
    demoParties.TypeOfRepresentation = "01"

    Dim demoCommercial As New CommercialNode()
    demoCommercial.InvoiceNumbers = "123456789"
    demoCommercial.InvoiceDate = "2024-11-22"

    Dim demoExporter As New ExporterDetailsNode()
    demoExporter.ExporterTitle = ""
    demoExporter.ExporterEmail = ""
    demoExporter.ExporterPhone = ""

    Dim demoImporter As New ImporterDetailsNode()
    demoImporter.ImporterTitle = "Importer name"
    demoImporter.ImporterEmail = "info@test.com"
    demoImporter.ImporterPhone = "123456789"
    demoImporter.ImporterCountryCodeOrMemberState = "DE"
    demoImporter.ImporterSubdivision = "Sub-division"
    demoImporter.ImporterCity = "City name"
    demoImporter.ImporterStreet = "Street Name"
    demoImporter.ImporterStreetAdditional = "Street additonal name"
    demoImporter.ImporterAddressNumber = "10"
    demoImporter.ImporterPostCode = "DCL-123"
    demoImporter.ImporterPoBox = "PO DCL-123"
    ' NOTE(review): 41.0 / 28.9 reads like latitude / longitude, i.e. the two values may be
    ' swapped relative to the property names - confirm against the sample supplied with the schema.
    demoImporter.ImporterCoordinateLongitudeX = "41.0091982"
    demoImporter.ImporterCoordinateLatitudeY = "28.9662187"

    Dim demo As New cATEZ_Greenpulse_KafkaDecs()
    demo.Declaration = demoDeclaration
    demo.Parties = demoParties
    demo.Commercial = demoCommercial
    demo.ExporterDetails = demoExporter
    demo.ImporterDetails = demoImporter
    demo.Documents = New List(Of DocumentNode)()
    Return demo
End Function
'========================
'== Kafka: Insert/Update (per Message-Key)
'========================
''' <summary>
''' Sends the record through InsertOrUpdateToKafkaSync and reports the outcome as a Boolean.
''' </summary>
''' <param name="rec">Record to send.</param>
''' <param name="unique_KEY">Kafka message key.</param>
''' <param name="waitMs">Time in milliseconds to wait for the delivery report (default 30000).</param>
''' <returns>True when the send completed without an exception; otherwise False.</returns>
''' <remarks>Every exception is caught and its message is shown in a message box.</remarks>
Public Shared Function InsertOrUpdateToKafkaSync_Bool(rec As cATEZ_Greenpulse_KafkaDecs, unique_KEY As String, Optional waitMs As Integer = 30000) As Boolean
    Dim delivered As Boolean = False

    Try
        ' The delivery result itself is not needed here - only whether the call succeeded.
        InsertOrUpdateToKafkaSync(rec, unique_KEY, waitMs)
        delivered = True
    Catch sendError As Exception
        MessageBox.Show("Fehler beim Senden an Kafka: " & sendError.Message, "Fehler", MessageBoxButtons.OK, MessageBoxIcon.Error)
    End Try

    Return delivered
End Function
''' <summary>
''' Produces the record as compact JSON to <c>TopicName</c> under the given message key and
''' blocks until the delivery report arrives or <paramref name="waitMs"/> elapses.
''' </summary>
''' <param name="rec">Record to send; serialized with ToJson(False).</param>
''' <param name="unique_KEY">Kafka message key (see GetUniqueKey_Pipe).</param>
''' <param name="waitMs">Time in milliseconds to wait for the delivery report (default 30000).</param>
''' <returns>The delivery report of the persisted message.</returns>
''' <exception cref="ArgumentNullException"><paramref name="rec"/> is Nothing.</exception>
''' <exception cref="TimeoutException">No delivery report arrived within <paramref name="waitMs"/>.</exception>
''' <exception cref="ProduceException(Of String, String)">The delivery report carries an error.</exception>
''' <exception cref="Exception">The message was reported without an error but is not persisted.</exception>
''' <remarks>A new producer is created and disposed on every call.</remarks>
Public Shared Function InsertOrUpdateToKafkaSync(rec As cATEZ_Greenpulse_KafkaDecs, unique_KEY As String, Optional waitMs As Integer = 30000) As DeliveryResult(Of String, String)
    If rec Is Nothing Then Throw New ArgumentNullException(NameOf(rec))

    ' NOTE(review): MessageTimeoutMs is at least 60 s, so with the default waitMs the wait below
    ' can expire before the client gives up on the message - confirm that this is intended.
    Dim cfg As New ProducerConfig With {
        .BootstrapServers = BootstrapServers,
        .SecurityProtocol = SecurityProtocolSetting,
        .EnableIdempotence = True,
        .Acks = Acks.All,
        .MaxInFlight = 5,
        .MessageTimeoutMs = Math.Max(waitMs, 60000),
        .RequestTimeoutMs = 30000,
        .CompressionType = Confluent.Kafka.CompressionType.Zstd,
        .MessageMaxBytes = 20971520,
        .EnableDeliveryReports = True,
        .AllowAutoCreateTopics = True
    }
    ' MessageMaxBytes (about 20 MB) must not exceed the limit of the topic/broker.

    ' Apply the class-level SASL settings; previously they were declared but never used.
    If UseSasl Then
        cfg.SaslMechanism = SaslMechanismSetting
        cfg.SaslUsername = SaslUsername
        cfg.SaslPassword = SaslPassword
    End If

    ' The event is declared outside the producer so that it is disposed only after the producer
    ' (and with it any late delivery callback) has gone away.
    Using done As New ManualResetEventSlim(False)
        Using producer = New ProducerBuilder(Of String, String)(cfg).Build()
            Dim msg = New Message(Of String, String) With {.Key = unique_KEY, .Value = rec.ToJson(False)}
            Dim lastReport As DeliveryReport(Of String, String) = Nothing

            producer.Produce(TopicName, msg,
                Sub(r As DeliveryReport(Of String, String))
                    lastReport = r
                    done.Set()
                End Sub)

            ' Wait specifically for the delivery callback.
            If Not done.Wait(waitMs) Then
                ' Give the producer a moment to drain before it is disposed.
                producer.Flush(TimeSpan.FromSeconds(5))
                Throw New TimeoutException($"DeliveryCallback nach {waitMs} ms nicht eingetroffen.")
            End If

            If lastReport Is Nothing Then
                Throw New TimeoutException("DeliveryResult leer.")
            End If

            ' With the callback overload of Produce, failures are reported through the report's
            ' Error property, so surface the broker/client reason instead of only the status.
            If lastReport.Error IsNot Nothing AndAlso lastReport.Error.IsError Then
                Throw New ProduceException(Of String, String)(lastReport.Error, lastReport)
            End If

            If lastReport.Status <> PersistenceStatus.Persisted Then
                Throw New Exception($"Sende-Status: {lastReport.Status} @ {lastReport.TopicPartitionOffset}")
            End If

            Return lastReport
        End Using
    End Using
End Function
'========================
'== Sync-Wrapper (falls bevorzugt)
'========================
'Public Shared Function InsertOrUpdateToKafka(rec As cATEZ_Greenpulse_KafkaDecs) _
'As DeliveryResult(Of String, String)
' Return InsertOrUpdateToKafkaAsync(rec).GetAwaiter().GetResult()
'End Function
End Class
Public Class cATEZ_Greenpulse_KafkaDecsBuilder_DAKOSY
''' <summary>
''' Loads every row of [tbl_DY_Zollmeldungen_Import] for the given MRN and maps it to a
''' <see cref="cATEZ_Greenpulse_KafkaDecs"/> record: header values come from the first row,
''' the invoice from the first document row of type N380, goods from the position rows.
''' </summary>
''' <param name="mrn">Registration number (MRN) of the declaration.</param>
''' <returns>The populated record.</returns>
''' <exception cref="ArgumentNullException"><paramref name="mrn"/> is Nothing.</exception>
''' <exception cref="InvalidOperationException">No row exists for the MRN.</exception>
Public Shared Function BuildByMrn_DAKOSY_Archiv(mrn As String) As cATEZ_Greenpulse_KafkaDecs
    ' Fail before a connection is opened; a missing parameter value would otherwise only
    ' surface as a database error when the query runs.
    If mrn Is Nothing Then Throw New ArgumentNullException(NameOf(mrn))

    Using con As SqlConnection = SQL.GetNewOpenConnectionAVISO()
        ' Load all rows for the MRN (header + positions). Header data is repeated on every row.
        Dim selectSql As String = "
SELECT
*
FROM [tbl_DY_Zollmeldungen_Import]
WHERE [Registriernummer_MRN] = @mrn
ORDER BY cast([PositionNo] as int) , cast([Positionen] as int) , [Id];
"
        Dim dt As New DataTable()
        Using cmd As New SqlCommand(selectSql, con)
            cmd.Parameters.AddWithValue("@mrn", mrn)
            Using da As New SqlDataAdapter(cmd)
                da.Fill(dt)
            End Using
        End Using

        If dt.Rows.Count = 0 Then
            Throw New InvalidOperationException("Keine Daten zur angegebenen MRN gefunden: " & mrn)
        End If

        ' 1) Derive the header from the first row.
        Dim head = dt.Rows(0)
        Dim obj As New cATEZ_Greenpulse_KafkaDecs() With {
            .Declaration = New cATEZ_Greenpulse_KafkaDecs.DeclarationNode() With {
                .DeclarationSourceId = SafeStr(head("Registriernummer_MRN")),
                .DeclarationNo = SafeStr(head("Registriernummer_MRN")),
                .DeclarationDate = FirstNonEmptyDateStr(head, {"Annahmedatum", "Überlassungsdatum"}),
                .RequestedProcedure = SafeStr(head("Verfahren")),
                .PreviousProcedure = SafeStr(head("Verfahren2")),
                .Goods = New List(Of cATEZ_Greenpulse_KafkaDecs.GoodItem)()
            },
            .Parties = New cATEZ_Greenpulse_KafkaDecs.PartiesNode() With {
                .ImporterIdentificationNumber = FirstNonEmptyStr(head, {"Empfänger_CN_EORI", "UST_ID_Einführer"}),
                .ExporterIdentificationNumber = SafeStr(head("Versender_CZ_EORI")),
                .ReportingDeclarantEORINumber = SafeStr(head("Anmelder_DT_EORI")),
                .TypeOfRepresentation = SafeStr(head("Art_der_Vertretung"))
            },
            .Commercial = New cATEZ_Greenpulse_KafkaDecs.CommercialNode(),
            .ExporterDetails = New cATEZ_Greenpulse_KafkaDecs.ExporterDetailsNode() With {
                .ExporterTitle = SafeStr(head("CZ_Name")),
                .ExporterEmail = "",
                .ExporterPhone = ""
            },
            .ImporterDetails = New cATEZ_Greenpulse_KafkaDecs.ImporterDetailsNode() With {
                .ImporterTitle = SafeStr(head("CN_Name")),
                .ImporterEmail = "",
                .ImporterPhone = "",
                .ImporterCountryCodeOrMemberState = SafeStr(head("CN_Ländercode")),
                .ImporterSubdivision = "",
                .ImporterCity = "",
                .ImporterStreet = "",
                .ImporterStreetAdditional = "",
                .ImporterAddressNumber = "",
                .ImporterPostCode = "",
                .ImporterPoBox = "",
                .ImporterCoordinateLongitudeX = "",
                .ImporterCoordinateLatitudeY = ""
            },
            .Documents = New List(Of cATEZ_Greenpulse_KafkaDecs.DocumentNode)()
        }

        ' 2) Commercial (invoice): first document row of type N380 with a number, lowest Id first.
        Dim invRow As DataRow = dt.AsEnumerable() _
            .Where(Function(r) SafeStr(r("Unterlagenart")).Equals("N380", StringComparison.OrdinalIgnoreCase) _
                               AndAlso Not String.IsNullOrWhiteSpace(SafeStr(r("Unterlagennummer")))) _
            .OrderBy(Function(r) SafeInt(r("Id"))) _
            .FirstOrDefault()

        ' --- Take over documents from the attachments of the consignment ---
        ' NOTE(review): the lookup filters on an empty dy_BezugsNr, so it does not depend on the
        ' MRN or on any value of the loaded rows. This looks unfinished - confirm which reference
        ' number belongs here (and pass it as a parameter rather than concatenating it).
        Dim SQLS As New VERAG_PROG_ALLGEMEIN.SQL
        Dim sendungsId = SQLS.getValueTxtBySql("SELECT dy_SendungsId from [tblDakosy_Zollanmeldungen] where dy_BezugsNr=''", "FMZOLL",,, Nothing)
        If sendungsId IsNot Nothing AndAlso IsNumeric(sendungsId) AndAlso sendungsId > 0 Then
            Dim ANH_LIST As New List(Of cAvisoAnhaenge)
            cAvisoAnhaenge.LOAD_LIST_BySendung(ANH_LIST, sendungsId)
            For Each doc In ANH_LIST
                Select Case doc.anh_Art
                    Case "Rechnung", "eFatura"
                        ' File.ReadAllBytes throws when the path is empty or the file is missing,
                        ' which aborts the whole build.
                        Dim dateiBytes As Byte() = System.IO.File.ReadAllBytes(VERAG_PROG_ALLGEMEIN.cDATENSERVER.GET_PDFPath_BY_DocID(doc.anh_docId))
                        Dim d As New cATEZ_Greenpulse_KafkaDecs.DocumentNode With {
                            .Reference = doc.anh_Name,
                            .DocType = "invoice",
                            .MimeType = cATEZ_Greenpulse_KafkaDecsBuilder_DAKOSY.GuessMimeTypeFromNumber(doc.anh_Typ),
                            .Blob = Convert.ToBase64String(dateiBytes)
                        }
                        obj.Documents.Add(d)
                End Select
            Next
        End If

        If invRow IsNot Nothing Then
            obj.Commercial.InvoiceNumbers = SafeStr(invRow("Unterlagennummer"))
            obj.Commercial.InvoiceDate = SafeDateStr(invRow("Unterlagendatum"))
        Else
            obj.Commercial.InvoiceNumbers = ""
            obj.Commercial.InvoiceDate = ""
        End If

        ' 3) One goods item per row that carries position data.
        ' NOTE(review): if the table holds several rows per position (e.g. one per document),
        ' this adds the same goods item more than once - confirm the row granularity.
        For Each row As DataRow In dt.Rows
            Dim commodity As String = SafeStr(row("Warentarifnummer"))
            Dim hasPositionData As Boolean =
                Not String.IsNullOrWhiteSpace(commodity) OrElse
                Not IsNullOrEmpty(row("PositionNo")) OrElse
                Not IsNullOrEmpty(row("Positionen"))
            If hasPositionData Then
                Dim origin As String = FirstNonEmptyStr(row, {"Ursprung", "Präferenzursprungsland"})
                Dim netMass As String = FirstNonEmptyStr(row, {"Eigenmasse"})
                Dim unit As String = FirstNonEmptyStr(row, {"Eigenmasseeinheit", "Maßeinheit"})
                ' MemberStateAutharization: assumption - country of the declarant.
                ' DischargeBillWaiver: no source field available.
                Dim gi As New cATEZ_Greenpulse_KafkaDecs.GoodItem() With {
                    .CommodityCode = commodity,
                    .OriginCountryCode = origin,
                    .NetMass = netMass,
                    .TypeOfMeasurementUnit = unit,
                    .SpecialProcedures = New cATEZ_Greenpulse_KafkaDecs.SpecialProceduresNode() With {
                        .MemberStateAutharization = SafeStr(row("DT_Ländercode")),
                        .DischargeBillWaiver = "",
                        .Authorisation = SafeStr(row("Bewilligungsnummer")),
                        .StartTime = "",
                        .EndTime = "",
                        .Deadline = ""
                    }
                }
                obj.Declaration.Goods.Add(gi)
            End If
        Next

        Return obj
    End Using
End Function
'---------------------------
' Helper
'---------------------------
''' <summary>
''' Converts a database value to trimmed text for the JSON payload.
''' </summary>
''' <param name="value">Any value, including Nothing and DBNull.</param>
''' <returns>An empty string for Nothing/DBNull; otherwise the trimmed text of the value.</returns>
''' <remarks>
''' The conversion uses the invariant culture so that numeric values do not pick up the
''' decimal separator of the machine's regional settings (e.g. "150,5" instead of "150.5").
''' </remarks>
Private Shared Function SafeStr(value As Object) As String
    If value Is Nothing OrElse Convert.IsDBNull(value) Then Return ""

    Dim text As String = Convert.ToString(value, System.Globalization.CultureInfo.InvariantCulture)
    ' A custom ToString() may return Nothing; never let that reach Trim().
    Return If(text, "").Trim()
End Function
''' <summary>
''' Converts a database value to a date string in the form yyyy-MM-dd.
''' </summary>
''' <param name="value">A DateTime, a date in text form, Nothing or DBNull.</param>
''' <returns>The formatted date, or an empty string when the value is missing or not a date.</returns>
''' <remarks>
''' The result is formatted with the invariant culture, so the year does not depend on the
''' calendar of the machine's regional settings. Text is still parsed with the current culture.
''' </remarks>
Private Shared Function SafeDateStr(value As Object) As String
    If value Is Nothing OrElse Convert.IsDBNull(value) Then Return ""

    Dim parsed As DateTime
    If TypeOf value Is DateTime Then
        ' Already a date: no need for a round trip through text.
        parsed = DirectCast(value, DateTime)
    ElseIf Not DateTime.TryParse(Convert.ToString(value), parsed) Then
        Return ""
    End If

    Return parsed.ToString("yyyy-MM-dd", System.Globalization.CultureInfo.InvariantCulture)
End Function
''' <summary>
''' Returns the first non-blank value among the given columns of a row.
''' </summary>
''' <param name="row">Row to read from.</param>
''' <param name="fields">Column names in order of preference; names missing from the table are skipped.</param>
''' <returns>The trimmed text of the first column that has content, or an empty string.</returns>
Private Shared Function FirstNonEmptyStr(row As DataRow, fields As IEnumerable(Of String)) As String
    For Each columnName In fields
        If Not row.Table.Columns.Contains(columnName) Then
            Continue For
        End If

        Dim candidate = SafeStr(row(columnName))
        If String.IsNullOrWhiteSpace(candidate) Then
            Continue For
        End If

        Return candidate
    Next

    Return ""
End Function
''' <summary>
''' Returns the first usable date among the given columns of a row.
''' </summary>
''' <param name="row">Row to read from.</param>
''' <param name="fields">Column names in order of preference; names missing from the table are skipped.</param>
''' <returns>The result of SafeDateStr for the first column that yields a date, or an empty string.</returns>
Private Shared Function FirstNonEmptyDateStr(row As DataRow, fields As IEnumerable(Of String)) As String
    For Each columnName In fields
        If Not row.Table.Columns.Contains(columnName) Then
            Continue For
        End If

        Dim candidate = SafeDateStr(row(columnName))
        If String.IsNullOrWhiteSpace(candidate) Then
            Continue For
        End If

        Return candidate
    Next

    Return ""
End Function
''' <summary>
''' Converts a database value to an Integer.
''' </summary>
''' <param name="value">Any value, including Nothing and DBNull.</param>
''' <returns>
''' The parsed number, or Integer.MaxValue when the value is missing or not an integer.
''' </returns>
Private Shared Function SafeInt(value As Object) As Integer
    Dim parsed As Integer
    Dim usable As Boolean =
        value IsNot Nothing AndAlso
        Not Convert.IsDBNull(value) AndAlso
        Integer.TryParse(Convert.ToString(value), parsed)

    Return If(usable, parsed, Integer.MaxValue)
End Function
''' <summary>
''' Tells whether a database value carries no content.
''' </summary>
''' <param name="value">Any value, including Nothing and DBNull.</param>
''' <returns>True for Nothing, DBNull, or a value whose text is empty or only whitespace.</returns>
Private Shared Function IsNullOrEmpty(value As Object) As Boolean
    Return value Is Nothing OrElse
           Convert.IsDBNull(value) OrElse
           String.IsNullOrWhiteSpace(Convert.ToString(value))
End Function
''' <summary>
''' Maps a file type or file name to a MIME type.
''' </summary>
''' <param name="num">
''' Either a bare type such as "PDF" or "jpg", or a name ending in an extension such as
''' "invoice.pdf". Case is ignored; Nothing and DBNull are treated as empty.
''' </param>
''' <returns>
''' "application/pdf", "image/jpeg" or "image/png"; "application/octet-stream" for anything else.
''' </returns>
Public Shared Function GuessMimeTypeFromNumber(num As Object) As String
    ' Normalize once to lower case; every comparison below therefore uses lower-case literals.
    ' (Comparing the lower-cased text with "PDF"/"JPG"/"PNG" could never match.)
    Dim s As String = SafeStr(num).ToLowerInvariant()

    If s = "pdf" OrElse s.EndsWith(".pdf", StringComparison.Ordinal) Then
        Return "application/pdf"
    End If

    If s = "jpg" OrElse s = "jpeg" OrElse
       s.EndsWith(".jpg", StringComparison.Ordinal) OrElse
       s.EndsWith(".jpeg", StringComparison.Ordinal) Then
        Return "image/jpeg"
    End If

    If s = "png" OrElse s.EndsWith(".png", StringComparison.Ordinal) Then
        Return "image/png"
    End If

    Return "application/octet-stream"
End Function
End Class