Files
SDL/VERAG_PROG_ALLGEMEIN/Schnittstellen/ATEZ/GREENPULSE/cATEZ_Greenpulse_KafkaDecs.vb

588 lines
22 KiB
VB.net
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

' Requires NuGet:
' - Confluent.Kafka
' - Newtonsoft.Json
' Target framework: .NET Framework 4.8 or .NET 6/8 (both work)
Imports System.Data.SqlClient
Imports System.Threading
Imports System.Threading.Tasks
Imports System.Windows.Forms
Imports Confluent.Kafka
Imports Newtonsoft.Json
''' <summary>
''' UDM record including sample data (BuildDemo) and a Kafka producer.
''' The data schema follows the JSON structure provided for this interface.
''' </summary>
Public Class cATEZ_Greenpulse_KafkaDecs
'========================
'== Kafka: configuration (class level)
'========================
' All settings below are Public Shared and writable, i.e. there is one value per setting for the whole process.
' Broker address handed to ProducerConfig.BootstrapServers in InsertOrUpdateToKafkaSync.
' NOTE(review): the trailing URL looks like a related web endpoint on the same host - confirm or remove.
Public Shared BootstrapServers As String = "192.168.85.250:9092" 'http://192.168.85.250:8888
' Topic the producer writes to. The line below is the disabled variant without the "dev." prefix.
' Public Shared TopicName As String = "greenpulse.declarationdata.v1"
Public Shared TopicName As String = "dev.greenpulse.declarationdata.v1"
' In case SASL/TLS is required:
' NOTE(review): the ProducerConfig built in InsertOrUpdateToKafkaSync does not read any of the
' five settings below - confirm whether they are applied somewhere else.
Public Shared UseSasl As Boolean = False
Public Shared SaslUsername As String = ""
Public Shared SaslPassword As String = ""
Public Shared SecurityProtocolSetting As SecurityProtocol = SecurityProtocol.Plaintext
Public Shared SaslMechanismSetting As SaslMechanism = SaslMechanism.Plain
' Version prefix; always the first component of a message key.
Private Const KEY_VERSION As String = "v1"
' Separator placed between the components of a message key.
Private Const SEP_PIPE As Char = "|"c
'========================
'== Unique key derivation
'========================
''' <summary>
''' Builds a message key of the form "v1|COUNTRY|SYSTEM|MRN".
''' Each component is upper-cased with the invariant culture; nothing else is normalized.
''' </summary>
''' <param name="country">Country component of the key.</param>
''' <param name="system">System component of the key.</param>
''' <param name="mrn">MRN component of the key.</param>
''' <returns>The four components joined with the pipe character.</returns>
''' <exception cref="ArgumentNullException">One of the components is Nothing.</exception>
Public Shared Function GetUniqueKey_Pipe(country As String, system As String, mrn As String) As String
    ' Fail with the offending parameter name instead of an anonymous NullReferenceException.
    If country Is Nothing Then Throw New ArgumentNullException(NameOf(country))
    If system Is Nothing Then Throw New ArgumentNullException(NameOf(system))
    If mrn Is Nothing Then Throw New ArgumentNullException(NameOf(mrn))

    Dim parts As String() = {
        KEY_VERSION,
        country.ToUpperInvariant(),
        system.ToUpperInvariant(),
        mrn.ToUpperInvariant()
    }
    Return String.Join(SEP_PIPE, parts)
End Function
'========================
'== Data objects per UDM schema
'========================
' Top-level members of the record. ToJson serializes this instance with Newtonsoft.Json;
' the JsonProperty attributes below give the JSON member names.
' "declaration" section, see DeclarationNode.
<JsonProperty("declaration")>
Public Property Declaration As DeclarationNode
' "parties" section, see PartiesNode.
<JsonProperty("parties")>
Public Property Parties As PartiesNode
' "commercial" section, see CommercialNode.
<JsonProperty("commercial")>
Public Property Commercial As CommercialNode
' "exporterDetails" section, see ExporterDetailsNode.
<JsonProperty("exporterDetails")>
Public Property ExporterDetails As ExporterDetailsNode
' "importerDetails" section, see ImporterDetailsNode.
<JsonProperty("importerDetails")>
Public Property ImporterDetails As ImporterDetailsNode
'--- documents ---
' List of attached documents, see DocumentNode. Both builders in this file assign a list (possibly empty).
<JsonProperty("documents")>
Public Property Documents As List(Of DocumentNode)
'--- declaration ---
''' <summary>
''' "declaration" section of the record. All scalar members are declared as String.
''' </summary>
Public Class DeclarationNode
' BuildByMrn_DAKOSY_Archiv fills this member and DeclarationNo from the same column (Registriernummer_MRN).
' NOTE(review): the JSON name is spelled with a lower-case "s" ("declarationsourceId") - presumably
' dictated by the consumer schema; confirm before changing.
<JsonProperty("declarationsourceId")>
Public Property DeclarationSourceId As String
<JsonProperty("declarationNo")>
Public Property DeclarationNo As String
' Both builders in this file supply this value in the form yyyy-MM-dd (or "" when no date is available).
<JsonProperty("declarationDate")>
Public Property DeclarationDate As String
<JsonProperty("requestedProcedure")>
Public Property RequestedProcedure As String
<JsonProperty("previousProcedure")>
Public Property PreviousProcedure As String
' Goods items of the declaration, see GoodItem.
<JsonProperty("goods")>
Public Property Goods As List(Of GoodItem)
End Class
''' <summary>
''' One entry of the "goods" list inside the declaration section.
''' </summary>
Public Class GoodItem
' BuildByMrn_DAKOSY_Archiv fills this from column Warentarifnummer.
<JsonProperty("commodityCode")>
Public Property CommodityCode As String
<JsonProperty("originCountryCode")>
Public Property OriginCountryCode As String
' Net mass is carried as text, not as a number (BuildDemo uses "150").
<JsonProperty("netMass")>
Public Property NetMass As String
' Unit belonging to NetMass (BuildDemo uses "Tonnes").
<JsonProperty("typeOfMeasurementUnit")>
Public Property TypeOfMeasurementUnit As String
' Nested "specialProcedures" object, see SpecialProceduresNode.
<JsonProperty("specialProcedures")>
Public Property SpecialProcedures As SpecialProceduresNode
End Class
''' <summary>
''' "specialProcedures" object of a goods item. All members are declared as String.
''' </summary>
Public Class SpecialProceduresNode
' NOTE(review): the spelling "Autharization" is part of the serialized JSON name - presumably it
' mirrors the consumer schema; confirm before correcting it.
<JsonProperty("memberStateAutharization")>
Public Property MemberStateAutharization As String
<JsonProperty("dischargeBillWaiver")>
Public Property DischargeBillWaiver As String
<JsonProperty("authorisation")>
Public Property Authorisation As String
' BuildDemo uses yyyy-MM-dd for the three date members below; BuildByMrn_DAKOSY_Archiv leaves them empty.
<JsonProperty("startTime")>
Public Property StartTime As String
<JsonProperty("endTime")>
Public Property EndTime As String
<JsonProperty("deadline")>
Public Property Deadline As String
End Class
'--- parties ---
''' <summary>
''' "parties" section of the record: identification numbers of the parties and the type of representation.
''' </summary>
Public Class PartiesNode
' BuildByMrn_DAKOSY_Archiv takes the first non-empty value of Empfänger_CN_EORI and UST_ID_Einführer.
<JsonProperty("importerIdentificationNumber")>
Public Property ImporterIdentificationNumber As String
<JsonProperty("exporterIdentificationNumber")>
Public Property ExporterIdentificationNumber As String
<JsonProperty("reportingDeclarantEORINumber")>
Public Property ReportingDeclarantEORINumber As String
<JsonProperty("typeOfRepresentation")>
Public Property TypeOfRepresentation As String
End Class
'--- commercial ---
''' <summary>
''' "commercial" section of the record: invoice reference and invoice date.
''' </summary>
Public Class CommercialNode
' Despite the plural name this is a single String; BuildByMrn_DAKOSY_Archiv stores one document number here.
<JsonProperty("invoiceNumbers")>
Public Property InvoiceNumbers As String
' Both builders in this file supply yyyy-MM-dd or "".
<JsonProperty("invoiceDate")>
Public Property InvoiceDate As String
End Class
'--- exporterDetails ---
''' <summary>
''' "exporterDetails" section of the record.
''' BuildByMrn_DAKOSY_Archiv fills only ExporterTitle; e-mail and phone are sent as "".
''' </summary>
Public Class ExporterDetailsNode
<JsonProperty("exporterTitle")>
Public Property ExporterTitle As String
<JsonProperty("exporterEmail")>
Public Property ExporterEmail As String
<JsonProperty("exporterPhone")>
Public Property ExporterPhone As String
End Class
'--- importerDetails ---
''' <summary>
''' "importerDetails" section of the record: name, contact data and address of the importer.
''' BuildByMrn_DAKOSY_Archiv fills only ImporterTitle and ImporterCountryCodeOrMemberState;
''' every other member is sent as "".
''' </summary>
Public Class ImporterDetailsNode
<JsonProperty("importerTitle")>
Public Property ImporterTitle As String
<JsonProperty("importerEmail")>
Public Property ImporterEmail As String
<JsonProperty("importerPhone")>
Public Property ImporterPhone As String
<JsonProperty("importerCountryCodeOrMemberState")>
Public Property ImporterCountryCodeOrMemberState As String
<JsonProperty("importerSubdivision")>
Public Property ImporterSubdivision As String
<JsonProperty("importerCity")>
Public Property ImporterCity As String
<JsonProperty("importerStreet")>
Public Property ImporterStreet As String
<JsonProperty("importerStreetAdditional")>
Public Property ImporterStreetAdditional As String
<JsonProperty("importerAddressNumber")>
Public Property ImporterAddressNumber As String
<JsonProperty("importerPostCode")>
Public Property ImporterPostCode As String
<JsonProperty("importerPoBox")>
Public Property ImporterPoBox As String
' Coordinates are carried as text (BuildDemo uses decimal strings with a dot).
<JsonProperty("importerCoordinateLongitudeX")>
Public Property ImporterCoordinateLongitudeX As String
<JsonProperty("importerCoordinateLatitudeY")>
Public Property ImporterCoordinateLatitudeY As String
End Class
''' <summary>
''' One entry of the "documents" list: an attached file together with its description.
''' </summary>
Public Class DocumentNode
' BuildByMrn_DAKOSY_Archiv stores the attachment name here.
<JsonProperty("reference")>
Public Property Reference As String
' The JSON names "doc-type" and "mime-type" contain a hyphen and therefore differ from the property names.
<JsonProperty("doc-type")>
Public Property DocType As String
<JsonProperty("mime-type")>
Public Property MimeType As String
' File content as text; BuildByMrn_DAKOSY_Archiv stores the Base64 encoding of the file bytes.
<JsonProperty("blob")>
Public Property Blob As String
End Class
'========================
'== Serialization
'========================
''' <summary>
''' Serializes this record to JSON with Newtonsoft.Json.
''' </summary>
''' <param name="pretty">True (default) for indented output, False for compact output.</param>
''' <returns>The JSON text of this instance.</returns>
Public Function ToJson(Optional pretty As Boolean = True) As String
    If pretty Then
        Return JsonConvert.SerializeObject(Me, Formatting.Indented)
    End If
    Return JsonConvert.SerializeObject(Me, Formatting.None)
End Function
'========================
'== Sample data
'========================
''' <summary>
''' Creates a record filled with fixed sample values: one goods item, all party and address
''' fields set, and an empty document list.
''' </summary>
''' <returns>A new, fully populated sample record.</returns>
Public Shared Function BuildDemo() As cATEZ_Greenpulse_KafkaDecs
    ' Built bottom-up: innermost objects first, the record itself last.
    Dim procedures As New SpecialProceduresNode()
    With procedures
        .MemberStateAutharization = "AT"
        .DischargeBillWaiver = "01"
        .Authorisation = "Name of authorisation"
        .StartTime = "2024-10-22"
        .EndTime = "2024-11-22"
        .Deadline = "2024-12-22"
    End With

    Dim sampleItem As New GoodItem()
    With sampleItem
        .CommodityCode = "72072710"
        .OriginCountryCode = "TR"
        .NetMass = "150"
        .TypeOfMeasurementUnit = "Tonnes"
        .SpecialProcedures = procedures
    End With

    Dim goodsList As New List(Of GoodItem)()
    goodsList.Add(sampleItem)

    Dim declarationPart As New DeclarationNode()
    With declarationPart
        .DeclarationSourceId = "xx123"
        .DeclarationNo = "24AT000000INL0JD01"
        .DeclarationDate = "2024-11-22"
        .RequestedProcedure = "40"
        .PreviousProcedure = "00"
        .Goods = goodsList
    End With

    Dim partiesPart As New PartiesNode()
    With partiesPart
        .ImporterIdentificationNumber = "ATEOS1000000001"
        .ExporterIdentificationNumber = "FR123456789000"
        .ReportingDeclarantEORINumber = "ATEOS1000000002"
        .TypeOfRepresentation = "01"
    End With

    Dim commercialPart As New CommercialNode()
    With commercialPart
        .InvoiceNumbers = "123456789"
        .InvoiceDate = "2024-11-22"
    End With

    Dim exporterPart As New ExporterDetailsNode()
    With exporterPart
        .ExporterTitle = ""
        .ExporterEmail = ""
        .ExporterPhone = ""
    End With

    Dim importerPart As New ImporterDetailsNode()
    With importerPart
        .ImporterTitle = "Importer name"
        .ImporterEmail = "info@test.com"
        .ImporterPhone = "123456789"
        .ImporterCountryCodeOrMemberState = "DE"
        .ImporterSubdivision = "Sub-division"
        .ImporterCity = "City name"
        .ImporterStreet = "Street Name"
        .ImporterStreetAdditional = "Street additonal name"
        .ImporterAddressNumber = "10"
        .ImporterPostCode = "DCL-123"
        .ImporterPoBox = "PO DCL-123"
        ' NOTE(review): these two sample values may be swapped between longitude and latitude -
        ' confirm against the schema sample they were taken from.
        .ImporterCoordinateLongitudeX = "41.0091982"
        .ImporterCoordinateLatitudeY = "28.9662187"
    End With

    Dim demo As New cATEZ_Greenpulse_KafkaDecs()
    demo.Declaration = declarationPart
    demo.Parties = partiesPart
    demo.Commercial = commercialPart
    demo.ExporterDetails = exporterPart
    demo.ImporterDetails = importerPart
    demo.Documents = New List(Of cATEZ_Greenpulse_KafkaDecs.DocumentNode)()
    Return demo
End Function
'========================
'== Kafka: insert/update (by message key)
'========================
''' <summary>
''' Sends the record via InsertOrUpdateToKafkaSync and reports the outcome as a Boolean instead of throwing.
''' Any exception is shown to the user in a message box, so this method blocks on user input when it fails.
''' </summary>
''' <param name="rec">Record to send.</param>
''' <param name="unique_KEY">Kafka message key.</param>
''' <param name="waitMs">Maximum time in milliseconds to wait for the delivery report.</param>
''' <returns>True when the send completed without an exception, otherwise False.</returns>
Public Shared Function InsertOrUpdateToKafkaSync_Bool(rec As cATEZ_Greenpulse_KafkaDecs, unique_KEY As String, Optional waitMs As Integer = 30000) As Boolean
    Try
        ' Only success or failure matters here, so the delivery result is not kept.
        InsertOrUpdateToKafkaSync(rec, unique_KEY, waitMs)
        Return True
    Catch ex As Exception
        MessageBox.Show("Fehler beim Senden an Kafka: " & ex.Message, "Fehler", MessageBoxButtons.OK, MessageBoxIcon.Error)
        Return False
    End Try
End Function
''' <summary>
''' Produces the record as compact JSON to TopicName under the given message key and blocks until
''' the delivery report for that message has arrived. A new producer is built and disposed per call.
''' </summary>
''' <param name="rec">Record to send; serialized with ToJson(False).</param>
''' <param name="unique_KEY">Kafka message key, e.g. the result of GetUniqueKey_Pipe.</param>
''' <param name="waitMs">Maximum time in milliseconds to wait for the delivery report.</param>
''' <returns>The delivery report of the persisted message.</returns>
''' <exception cref="ArgumentNullException">rec is Nothing.</exception>
''' <exception cref="TimeoutException">No delivery report arrived within waitMs plus the 5 s flush.</exception>
''' <exception cref="Exception">A delivery report arrived but its status is not Persisted.</exception>
Public Shared Function InsertOrUpdateToKafkaSync(rec As cATEZ_Greenpulse_KafkaDecs, unique_KEY As String, Optional waitMs As Integer = 30000) As DeliveryResult(Of String, String)
    If rec Is Nothing Then Throw New ArgumentNullException(NameOf(rec))

    ' NOTE(review): MessageTimeoutMs is at least 60 s, so with the default waitMs of 30 s the wait
    ' below can expire while the client is still trying to deliver - confirm this is intended.
    Dim cfg As New ProducerConfig With {
        .BootstrapServers = BootstrapServers,
        .EnableIdempotence = True,
        .Acks = Acks.All,
        .MaxInFlight = 5,
        .MessageTimeoutMs = Math.Max(waitMs, 60000),
        .RequestTimeoutMs = 30000,
        .CompressionType = Confluent.Kafka.CompressionType.Zstd, ' good compression
        .MessageMaxBytes = 20971520, ' ≈ 20 MB; must not exceed the topic/broker limit
        .EnableDeliveryReports = True,
        .AllowAutoCreateTopics = True
    }

    ' The event lives in the outer Using so that it is disposed only after the producer,
    ' i.e. after the last point at which the delivery callback can still call Set().
    Using done As New Threading.ManualResetEventSlim(False)
        Using producer = New ProducerBuilder(Of String, String)(cfg).Build()
            Dim msg = New Message(Of String, String) With {.Key = unique_KEY, .Value = rec.ToJson(False)}
            Dim lastReport As DeliveryReport(Of String, String) = Nothing

            producer.Produce(TopicName, msg,
                Sub(r)
                    lastReport = r
                    done.Set()
                End Sub)

            ' Wait specifically for the delivery callback of this message.
            If Not done.Wait(waitMs) Then
                ' Give the producer a last chance to drain its queue ...
                producer.Flush(TimeSpan.FromSeconds(5))
                ' ... and only report a timeout if the callback still has not fired. Otherwise a
                ' message delivered during the flush would wrongly be reported as failed.
                If Not done.IsSet Then
                    Throw New TimeoutException($"DeliveryCallback nach {waitMs} ms nicht eingetroffen.")
                End If
            End If

            If lastReport Is Nothing Then
                Throw New TimeoutException("DeliveryResult leer.")
            End If

            If lastReport.Status <> PersistenceStatus.Persisted Then
                ' Include the broker/client error so the caller sees why the message was not persisted.
                Dim reason As String = If(lastReport.Error?.Reason, "")
                Throw New Exception($"Sende-Status: {lastReport.Status} @ {lastReport.TopicPartitionOffset}; Fehler: {reason}")
            End If

            Return lastReport
        End Using
    End Using
End Function
'========================
'== Sync-Wrapper (falls bevorzugt)
'========================
'Public Shared Function InsertOrUpdateToKafka(rec As cATEZ_Greenpulse_KafkaDecs) _
'As DeliveryResult(Of String, String)
' Return InsertOrUpdateToKafkaAsync(rec).GetAwaiter().GetResult()
'End Function
End Class
Public Class cATEZ_Greenpulse_KafkaDecsBuilder_DAKOSY 'WHSL nciht mehr
''' <summary>
''' Loads all rows for one MRN from tbl_DY_Zollmeldungen_Import and maps them to a record:
''' header data from the first row, the invoice reference from the N380 document row with the
''' lowest Id, attachments as Base64 documents, and one goods item per row that carries position data.
''' </summary>
''' <param name="mrn">MRN to look up (column Registriernummer_MRN).</param>
''' <returns>The populated record.</returns>
''' <exception cref="ArgumentException">mrn is Nothing, empty or whitespace.</exception>
''' <exception cref="InvalidOperationException">No row exists for the MRN.</exception>
Public Shared Function BuildByMrn_DAKOSY_Archiv(mrn As String) As cATEZ_Greenpulse_KafkaDecs
    ' Reject a missing MRN up front: Nothing would fail inside the SQL call with a misleading
    ' "parameter not supplied" error, and "" could match rows whose MRN column is blank.
    If String.IsNullOrWhiteSpace(mrn) Then
        Throw New ArgumentException("MRN darf nicht leer sein.", NameOf(mrn))
    End If

    Using con As SqlConnection = SQL.GetNewOpenConnectionAVISO()
        ' Load all rows for the MRN (header + positions). The header data is duplicated on every row.
        Dim query As String = "
SELECT
*
FROM [tbl_DY_Zollmeldungen_Import]
WHERE [Registriernummer_MRN] = @mrn
ORDER BY cast([PositionNo] as int) , cast([Positionen] as int) , [Id];
"
        Dim dt As New DataTable()
        Using cmd As New SqlCommand(query, con)
            cmd.Parameters.AddWithValue("@mrn", mrn)
            Using da As New SqlDataAdapter(cmd)
                da.Fill(dt)
            End Using
        End Using

        If dt.Rows.Count = 0 Then
            Throw New InvalidOperationException("Keine Daten zur angegebenen MRN gefunden: " & mrn)
        End If

        ' 1) Derive the header from the first row.
        Dim head = dt.Rows(0)
        Dim obj As New cATEZ_Greenpulse_KafkaDecs() With {
            .Declaration = New cATEZ_Greenpulse_KafkaDecs.DeclarationNode() With {
                .DeclarationSourceId = SafeStr(head("Registriernummer_MRN")),
                .DeclarationNo = SafeStr(head("Registriernummer_MRN")),
                .DeclarationDate = FirstNonEmptyDateStr(head, {"Annahmedatum", "Überlassungsdatum"}),
                .RequestedProcedure = SafeStr(head("Verfahren")),
                .PreviousProcedure = SafeStr(head("Verfahren2")),
                .Goods = New List(Of cATEZ_Greenpulse_KafkaDecs.GoodItem)()
            },
            .Parties = New cATEZ_Greenpulse_KafkaDecs.PartiesNode() With {
                .ImporterIdentificationNumber = FirstNonEmptyStr(head, {"Empfänger_CN_EORI", "UST_ID_Einführer"}),
                .ExporterIdentificationNumber = SafeStr(head("Versender_CZ_EORI")),
                .ReportingDeclarantEORINumber = SafeStr(head("Anmelder_DT_EORI")),
                .TypeOfRepresentation = SafeStr(head("Art_der_Vertretung"))
            },
            .Commercial = New cATEZ_Greenpulse_KafkaDecs.CommercialNode(),
            .ExporterDetails = New cATEZ_Greenpulse_KafkaDecs.ExporterDetailsNode() With {
                .ExporterTitle = SafeStr(head("CZ_Name")),
                .ExporterEmail = "",
                .ExporterPhone = ""
            },
            .ImporterDetails = New cATEZ_Greenpulse_KafkaDecs.ImporterDetailsNode() With {
                .ImporterTitle = SafeStr(head("CN_Name")),
                .ImporterEmail = "",
                .ImporterPhone = "",
                .ImporterCountryCodeOrMemberState = SafeStr(head("CN_Ländercode")),
                .ImporterSubdivision = "",
                .ImporterCity = "",
                .ImporterStreet = "",
                .ImporterStreetAdditional = "",
                .ImporterAddressNumber = "",
                .ImporterPostCode = "",
                .ImporterPoBox = "",
                .ImporterCoordinateLongitudeX = "",
                .ImporterCoordinateLatitudeY = ""
            },
            .Documents = New List(Of cATEZ_Greenpulse_KafkaDecs.DocumentNode)()
        }

        ' 2) Commercial (invoice) from the N380 document rows, if any: the row with the lowest Id
        '    that has a document number. FirstOrDefault already yields Nothing when no row matches.
        Dim invRow As DataRow = dt.AsEnumerable() _
            .Where(Function(r) SafeStr(r("Unterlagenart")).Equals("N380", StringComparison.OrdinalIgnoreCase) _
                               AndAlso Not String.IsNullOrWhiteSpace(SafeStr(r("Unterlagennummer")))) _
            .OrderBy(Function(r) SafeInt(r("Id"))) _
            .FirstOrDefault()
        If invRow IsNot Nothing Then
            obj.Commercial.InvoiceNumbers = SafeStr(invRow("Unterlagennummer"))
            obj.Commercial.InvoiceDate = SafeDateStr(invRow("Unterlagendatum"))
        Else
            obj.Commercial.InvoiceNumbers = ""
            obj.Commercial.InvoiceDate = ""
        End If

        ' --- Take over documents from the attachments of the shipment ---
        ' NOTE(review): the lookup filters on dy_BezugsNr='' (a hard-coded empty reference) and does
        ' not use the MRN or any value of the loaded rows. As written it selects the shipment of a
        ' declaration with a blank reference number, not necessarily the one being built.
        ' The statement is left unchanged because the intended reference value is not visible here.
        Dim SQLS As New VERAG_PROG_ALLGEMEIN.SQL
        Dim sendungsId = SQLS.getValueTxtBySql("SELECT dy_SendungsId from [tblDakosy_Zollanmeldungen] where dy_BezugsNr=''", "FMZOLL",,, Nothing)
        If sendungsId IsNot Nothing Then
            If IsNumeric(sendungsId) AndAlso sendungsId > 0 Then
                Dim ANH_LIST As New List(Of cAvisoAnhaenge)
                cAvisoAnhaenge.LOAD_LIST_BySendung(ANH_LIST, sendungsId)
                For Each doc In ANH_LIST
                    Select Case doc.anh_Art
                        Case "Rechnung", "eFatura"
                            ' There is no Try/Catch here, so a file that cannot be read aborts the whole build.
                            Dim dateiBytes As Byte() = System.IO.File.ReadAllBytes(VERAG_PROG_ALLGEMEIN.cDATENSERVER.GET_PDFPath_BY_DocID(doc.anh_docId))
                            Dim d As New cATEZ_Greenpulse_KafkaDecs.DocumentNode With {
                                .Reference = doc.anh_Name,
                                .DocType = "invoice",
                                .MimeType = cATEZ_Greenpulse_KafkaDecsBuilder_DAKOSY.GuessMimeTypeFromNumber(doc.anh_Typ),
                                .Blob = Convert.ToBase64String(dateiBytes)
                            }
                            obj.Documents.Add(d)
                    End Select
                Next
            End If
        End If

        ' 3) One goods item per position row. A row counts as a position when it has a commodity
        '    code or a value in PositionNo or Positionen.
        For Each row As DataRow In dt.Rows
            Dim commodity As String = SafeStr(row("Warentarifnummer"))
            Dim hasPositionData As Boolean =
                Not String.IsNullOrWhiteSpace(commodity) OrElse
                Not IsNullOrEmpty(row("PositionNo")) OrElse
                Not IsNullOrEmpty(row("Positionen"))
            If Not hasPositionData Then Continue For

            Dim origin As String = FirstNonEmptyStr(row, {"Ursprung", "Präferenzursprungsland"})
            Dim netMass As String = FirstNonEmptyStr(row, {"Eigenmasse"})
            Dim unit As String = FirstNonEmptyStr(row, {"Eigenmasseeinheit", "Maßeinheit"})
            Dim gi As New cATEZ_Greenpulse_KafkaDecs.GoodItem() With {
                .CommodityCode = commodity,
                .OriginCountryCode = origin,
                .NetMass = netMass,
                .TypeOfMeasurementUnit = unit,
                .SpecialProcedures = New cATEZ_Greenpulse_KafkaDecs.SpecialProceduresNode() With {
                    .MemberStateAutharization = SafeStr(row("DT_Ländercode")), ' assumption: country of the declarant
                    .DischargeBillWaiver = "", ' no source column available
                    .Authorisation = SafeStr(row("Bewilligungsnummer")),
                    .StartTime = "",
                    .EndTime = "",
                    .Deadline = ""
                }
            }
            obj.Declaration.Goods.Add(gi)
        Next

        Return obj
    End Using
End Function
'---------------------------
' Helpers
'---------------------------
''' <summary>
''' Converts a value to its trimmed string form; Nothing and DBNull yield "".
''' </summary>
''' <param name="value">Any value, typically a DataRow cell.</param>
''' <returns>The trimmed text, or "" for a missing value.</returns>
Private Shared Function SafeStr(value As Object) As String
    Dim isMissing As Boolean = (value Is Nothing) OrElse Convert.IsDBNull(value)
    If isMissing Then
        Return ""
    End If
    Dim text As String = Convert.ToString(value)
    Return text.Trim()
End Function
''' <summary>
''' Converts a value to a date string of the form yyyy-MM-dd.
''' DateTime values are used directly; anything else is converted to text and parsed with the
''' current culture. The output always uses the invariant culture, so the year is Gregorian and
''' the digits are ASCII regardless of the thread culture.
''' </summary>
''' <param name="value">A DateTime, a date in text form, Nothing or DBNull.</param>
''' <returns>The formatted date, or "" when the value is missing or cannot be parsed.</returns>
Private Shared Function SafeDateStr(value As Object) As String
    If value Is Nothing OrElse Convert.IsDBNull(value) Then Return ""

    Dim parsed As DateTime
    If TypeOf value Is DateTime Then
        ' No need to round-trip a real DateTime through its culture-specific text form.
        parsed = DirectCast(value, DateTime)
    ElseIf Not DateTime.TryParse(Convert.ToString(value), parsed) Then
        Return ""
    End If

    Return parsed.ToString("yyyy-MM-dd", System.Globalization.CultureInfo.InvariantCulture)
End Function
''' <summary>
''' Returns the first non-blank value among the given columns of a row, in the order given.
''' Column names that do not exist in the row's table are skipped.
''' </summary>
''' <param name="row">Row to read from.</param>
''' <param name="fields">Candidate column names, highest priority first.</param>
''' <returns>The first non-blank value as produced by SafeStr, or "" when there is none.</returns>
Private Shared Function FirstNonEmptyStr(row As DataRow, fields As IEnumerable(Of String)) As String
    For Each columnName In fields
        If Not row.Table.Columns.Contains(columnName) Then Continue For

        Dim candidate = SafeStr(row(columnName))
        If String.IsNullOrWhiteSpace(candidate) Then Continue For

        Return candidate
    Next
    Return ""
End Function
''' <summary>
''' Returns the first usable date among the given columns of a row, in the order given.
''' Column names that do not exist in the row's table are skipped, as are values for which
''' SafeDateStr returns a blank string.
''' </summary>
''' <param name="row">Row to read from.</param>
''' <param name="fields">Candidate column names, highest priority first.</param>
''' <returns>The first non-blank result of SafeDateStr, or "" when there is none.</returns>
Private Shared Function FirstNonEmptyDateStr(row As DataRow, fields As IEnumerable(Of String)) As String
    For Each columnName In fields
        If Not row.Table.Columns.Contains(columnName) Then Continue For

        Dim candidate = SafeDateStr(row(columnName))
        If String.IsNullOrWhiteSpace(candidate) Then Continue For

        Return candidate
    Next
    Return ""
End Function
''' <summary>
''' Parses a value as an Integer. Missing or unparsable values yield Integer.MaxValue.
''' </summary>
''' <param name="value">Any value, typically a DataRow cell.</param>
''' <returns>The parsed number, or Integer.MaxValue for Nothing, DBNull or non-numeric text.</returns>
Private Shared Function SafeInt(value As Object) As Integer
    Dim parsed As Integer
    If value IsNot Nothing AndAlso
       Not Convert.IsDBNull(value) AndAlso
       Integer.TryParse(Convert.ToString(value), parsed) Then
        Return parsed
    End If
    Return Integer.MaxValue
End Function
''' <summary>
''' Tells whether a value is missing: Nothing, DBNull, or text that is empty or whitespace only.
''' </summary>
''' <param name="value">Any value, typically a DataRow cell.</param>
''' <returns>True when the value carries no content.</returns>
Private Shared Function IsNullOrEmpty(value As Object) As Boolean
    Return value Is Nothing OrElse
           Convert.IsDBNull(value) OrElse
           String.IsNullOrWhiteSpace(Convert.ToString(value))
End Function
''' <summary>
''' Maps a file type to a MIME type. The input may be a bare type code ("pdf", "JPG", ...) or any
''' text ending in a file extension (".pdf", ".jpeg", ...); matching is case-insensitive.
''' </summary>
''' <param name="num">Type code or file name; Nothing and DBNull are treated as "".</param>
''' <returns>"application/pdf", "image/jpeg", "image/png", or "application/octet-stream" as fallback.</returns>
Public Shared Function GuessMimeTypeFromNumber(num As Object) As String
    ' Normalize once to lower case and compare against lower-case literals only.
    Dim s As String = SafeStr(num).ToLowerInvariant()

    If String.Equals(s, "pdf", StringComparison.Ordinal) OrElse s.EndsWith(".pdf", StringComparison.Ordinal) Then
        Return "application/pdf"
    End If

    If String.Equals(s, "jpg", StringComparison.Ordinal) OrElse
       String.Equals(s, "jpeg", StringComparison.Ordinal) OrElse
       s.EndsWith(".jpg", StringComparison.Ordinal) OrElse
       s.EndsWith(".jpeg", StringComparison.Ordinal) Then
        Return "image/jpeg"
    End If

    If String.Equals(s, "png", StringComparison.Ordinal) OrElse s.EndsWith(".png", StringComparison.Ordinal) Then
        Return "image/png"
    End If

    Return "application/octet-stream"
End Function
End Class