Merge branch 'newMaster2024' of https://dev.azure.com/VeragAG/_git/SDL into newMaster2024

This commit is contained in:
2025-09-03 17:32:09 +02:00
18 changed files with 1755 additions and 1195 deletions

View File

@@ -1,204 +1,217 @@

' Requires NuGet:
' Requires NuGet:
' - Confluent.Kafka
' - Newtonsoft.Json
' Target framework: .NET Framework 4.8 oder .NET 6/8 (passt beides)
Imports System.Data.SqlClient
Imports System.Threading
Imports System.Threading.Tasks
Imports Confluent.Kafka
Imports Newtonsoft.Json
Imports System.Threading.Tasks
Imports Confluent.Kafka
Imports Newtonsoft.Json
Namespace Verag.Udm
''' <summary>
''' UDM-Record inkl. Beispielbefüllung und Kafka-Producer.
''' Datenschema gemäß bereitgestellter JSON-Struktur. :contentReference[oaicite:1]{index=1}
''' </summary>
Public Class cATEZ_Greenpulse_KafkaDecs
'========================
'== Kafka: Konfiguration (Klassenebene)
'========================
Public Shared BootstrapServers As String = "192.168.85.250:8888" 'http://192.168.85.250:8888
Public Shared TopicName As String = "greenpulse.declarationdata.v1"
' Falls SASL/TLS benötigt:
Public Shared UseSasl As Boolean = False
Public Shared SaslUsername As String = ""
Public Shared SaslPassword As String = ""
Public Shared SecurityProtocolSetting As SecurityProtocol = SecurityProtocol.Plaintext
Public Shared SaslMechanismSetting As SaslMechanism = SaslMechanism.Plain
''' <summary>
''' UDM-Record inkl. Beispielbefüllung und Kafka-Producer.
''' Datenschema gemäß bereitgestellter JSON-Struktur. :contentReference[oaicite:1]{index=1}
''' </summary>
Public Class cATEZ_Greenpulse_KafkaDecs
'========================
'== Datenobjekte lt. UDM-Schema
'========================
'========================
'== Kafka: Konfiguration (Klassenebene)
'========================
Public Shared BootstrapServers As String = "192.168.85.250:9092" 'http://192.168.85.250:8888
Public Shared TopicName As String = "greenpulse.declarationdata.v1"
' Falls SASL/TLS benötigt:
Public Shared UseSasl As Boolean = False
Public Shared SaslUsername As String = ""
Public Shared SaslPassword As String = ""
Public Shared SecurityProtocolSetting As SecurityProtocol = SecurityProtocol.Plaintext
Public Shared SaslMechanismSetting As SaslMechanism = SaslMechanism.Plain
<JsonProperty("declaration")>
Public Property Declaration As DeclarationNode
<JsonProperty("parties")>
Public Property Parties As PartiesNode
Private Const KEY_VERSION As String = "v1"
Private Const SEP_PIPE As Char = "|"c
<JsonProperty("commercial")>
Public Property Commercial As CommercialNode
<JsonProperty("exporterDetails")>
Public Property ExporterDetails As ExporterDetailsNode
'========================
'== Unique-Key-Ermittlung
'========================
Public Shared Function GetUniqueKey_Pipe(country As String, system As String, mrn As String) As String
Dim c = (country).ToUpperInvariant()
Dim s = (system).ToUpperInvariant()
Dim m = (mrn).ToUpperInvariant()
Return String.Join(SEP_PIPE, New String() {KEY_VERSION, c, s, m})
End Function
<JsonProperty("importerDetails")>
Public Property ImporterDetails As ImporterDetailsNode
'========================
'== Datenobjekte lt. UDM-Schema
'========================
'--- declaration ---
Public Class DeclarationNode
<JsonProperty("declarationsourceId")>
Public Property DeclarationSourceId As String
<JsonProperty("declaration")>
Public Property Declaration As DeclarationNode
<JsonProperty("declarationNo")>
Public Property DeclarationNo As String
<JsonProperty("parties")>
Public Property Parties As PartiesNode
<JsonProperty("declarationDate")>
Public Property DeclarationDate As String
<JsonProperty("commercial")>
Public Property Commercial As CommercialNode
<JsonProperty("requestedProcedure")>
Public Property RequestedProcedure As String
<JsonProperty("exporterDetails")>
Public Property ExporterDetails As ExporterDetailsNode
<JsonProperty("previousProcedure")>
Public Property PreviousProcedure As String
<JsonProperty("importerDetails")>
Public Property ImporterDetails As ImporterDetailsNode
<JsonProperty("goods")>
Public Property Goods As List(Of GoodItem)
End Class
'--- declaration ---
Public Class DeclarationNode
<JsonProperty("declarationsourceId")>
Public Property DeclarationSourceId As String
Public Class GoodItem
<JsonProperty("commodityCode")>
Public Property CommodityCode As String
<JsonProperty("declarationNo")>
Public Property DeclarationNo As String
<JsonProperty("originCountryCode")>
Public Property OriginCountryCode As String
<JsonProperty("declarationDate")>
Public Property DeclarationDate As String
<JsonProperty("netMass")>
Public Property NetMass As String
<JsonProperty("requestedProcedure")>
Public Property RequestedProcedure As String
<JsonProperty("typeOfMeasurementUnit")>
Public Property TypeOfMeasurementUnit As String
<JsonProperty("previousProcedure")>
Public Property PreviousProcedure As String
<JsonProperty("specialProcedures")>
Public Property SpecialProcedures As SpecialProceduresNode
End Class
<JsonProperty("goods")>
Public Property Goods As List(Of GoodItem)
End Class
Public Class SpecialProceduresNode
<JsonProperty("memberStateAutharization")>
Public Property MemberStateAutharization As String
Public Class GoodItem
<JsonProperty("commodityCode")>
Public Property CommodityCode As String
<JsonProperty("dischargeBillWaiver")>
Public Property DischargeBillWaiver As String
<JsonProperty("originCountryCode")>
Public Property OriginCountryCode As String
<JsonProperty("authorisation")>
Public Property Authorisation As String
<JsonProperty("netMass")>
Public Property NetMass As String
<JsonProperty("startTime")>
Public Property StartTime As String
<JsonProperty("typeOfMeasurementUnit")>
Public Property TypeOfMeasurementUnit As String
<JsonProperty("endTime")>
Public Property EndTime As String
<JsonProperty("specialProcedures")>
Public Property SpecialProcedures As SpecialProceduresNode
End Class
<JsonProperty("deadline")>
Public Property Deadline As String
End Class
Public Class SpecialProceduresNode
<JsonProperty("memberStateAutharization")>
Public Property MemberStateAutharization As String
'--- parties ---
Public Class PartiesNode
<JsonProperty("importerIdentificationNumber")>
Public Property ImporterIdentificationNumber As String
<JsonProperty("dischargeBillWaiver")>
Public Property DischargeBillWaiver As String
<JsonProperty("exporterIdentificationNumber")>
Public Property ExporterIdentificationNumber As String
<JsonProperty("authorisation")>
Public Property Authorisation As String
<JsonProperty("reportingDeclarantEORINumber")>
Public Property ReportingDeclarantEORINumber As String
<JsonProperty("startTime")>
Public Property StartTime As String
<JsonProperty("typeOfRepresentation")>
Public Property TypeOfRepresentation As String
End Class
<JsonProperty("endTime")>
Public Property EndTime As String
'--- commercial ---
Public Class CommercialNode
<JsonProperty("invoiceNumbers")>
Public Property InvoiceNumbers As String
<JsonProperty("deadline")>
Public Property Deadline As String
End Class
<JsonProperty("invoiceDate")>
Public Property InvoiceDate As String
End Class
'--- parties ---
Public Class PartiesNode
<JsonProperty("importerIdentificationNumber")>
Public Property ImporterIdentificationNumber As String
'--- exporterDetails ---
Public Class ExporterDetailsNode
<JsonProperty("exporterTitle")>
Public Property ExporterTitle As String
<JsonProperty("exporterIdentificationNumber")>
Public Property ExporterIdentificationNumber As String
<JsonProperty("exporterEmail")>
Public Property ExporterEmail As String
<JsonProperty("reportingDeclarantEORINumber")>
Public Property ReportingDeclarantEORINumber As String
<JsonProperty("exporterPhone")>
Public Property ExporterPhone As String
End Class
<JsonProperty("typeOfRepresentation")>
Public Property TypeOfRepresentation As String
End Class
'--- importerDetails ---
Public Class ImporterDetailsNode
<JsonProperty("importerTitle")>
Public Property ImporterTitle As String
'--- commercial ---
Public Class CommercialNode
<JsonProperty("invoiceNumbers")>
Public Property InvoiceNumbers As String
<JsonProperty("importerEmail")>
Public Property ImporterEmail As String
<JsonProperty("invoiceDate")>
Public Property InvoiceDate As String
End Class
<JsonProperty("importerPhone")>
Public Property ImporterPhone As String
'--- exporterDetails ---
Public Class ExporterDetailsNode
<JsonProperty("exporterTitle")>
Public Property ExporterTitle As String
<JsonProperty("importerCountryCodeOrMemberState")>
Public Property ImporterCountryCodeOrMemberState As String
<JsonProperty("exporterEmail")>
Public Property ExporterEmail As String
<JsonProperty("importerSubdivision")>
Public Property ImporterSubdivision As String
<JsonProperty("exporterPhone")>
Public Property ExporterPhone As String
End Class
<JsonProperty("importerCity")>
Public Property ImporterCity As String
'--- importerDetails ---
Public Class ImporterDetailsNode
<JsonProperty("importerTitle")>
Public Property ImporterTitle As String
<JsonProperty("importerStreet")>
Public Property ImporterStreet As String
<JsonProperty("importerEmail")>
Public Property ImporterEmail As String
<JsonProperty("importerStreetAdditional")>
Public Property ImporterStreetAdditional As String
<JsonProperty("importerPhone")>
Public Property ImporterPhone As String
<JsonProperty("importerAddressNumber")>
Public Property ImporterAddressNumber As String
<JsonProperty("importerCountryCodeOrMemberState")>
Public Property ImporterCountryCodeOrMemberState As String
<JsonProperty("importerPostCode")>
Public Property ImporterPostCode As String
<JsonProperty("importerSubdivision")>
Public Property ImporterSubdivision As String
<JsonProperty("importerPoBox")>
Public Property ImporterPoBox As String
<JsonProperty("importerCity")>
Public Property ImporterCity As String
<JsonProperty("importerCoordinateLongitudeX")>
Public Property ImporterCoordinateLongitudeX As String
<JsonProperty("importerStreet")>
Public Property ImporterStreet As String
<JsonProperty("importerCoordinateLatitudeY")>
Public Property ImporterCoordinateLatitudeY As String
End Class
<JsonProperty("importerStreetAdditional")>
Public Property ImporterStreetAdditional As String
'========================
'== Serialisierung
'========================
Public Function ToJson(Optional pretty As Boolean = True) As String
Dim format = If(pretty, Formatting.Indented, Formatting.None)
Return JsonConvert.SerializeObject(Me, format)
End Function
<JsonProperty("importerAddressNumber")>
Public Property ImporterAddressNumber As String
'========================
'== Beispielbefüllung
'========================
Public Shared Function BuildDemo() As cATEZ_Greenpulse_KafkaDecs
Return New cATEZ_Greenpulse_KafkaDecs() With {
<JsonProperty("importerPostCode")>
Public Property ImporterPostCode As String
<JsonProperty("importerPoBox")>
Public Property ImporterPoBox As String
<JsonProperty("importerCoordinateLongitudeX")>
Public Property ImporterCoordinateLongitudeX As String
<JsonProperty("importerCoordinateLatitudeY")>
Public Property ImporterCoordinateLatitudeY As String
End Class
'========================
'== Serialisierung
'========================
Public Function ToJson(Optional pretty As Boolean = True) As String
Dim format = If(pretty, Formatting.Indented, Formatting.None)
Return JsonConvert.SerializeObject(Me, format)
End Function
'========================
'== Beispielbefüllung
'========================
Public Shared Function BuildDemo() As cATEZ_Greenpulse_KafkaDecs
Return New cATEZ_Greenpulse_KafkaDecs() With {
.Declaration = New DeclarationNode() With {
.DeclarationSourceId = "xx123",
.DeclarationNo = "24AT000000INL0JD01",
@@ -253,59 +266,242 @@ Namespace Verag.Udm
.ImporterCoordinateLatitudeY = "28.9662187"
}
}
End Function
End Function
'========================
'== Unique-Key-Ermittlung (leer gelassen später definieren)
'========================
Public Shared Function GetUniqueKey(ByVal record As cATEZ_Greenpulse_KafkaDecs) As String
' TODO: Hier Logik zur Schlüsselbildung implementieren (z.B. declarationsourceId + declarationNo)
Return ""
End Function
'========================
'== Kafka: Insert/Update (per Message-Key)
'========================
Public Shared Async Function InsertOrUpdateToKafkaAsync(ByVal record As cATEZ_Greenpulse_KafkaDecs,
Optional ct As CancellationToken = Nothing) As Task(Of DeliveryResult(Of String, String))
'========================
'== Kafka: Insert/Update (per Message-Key)
'========================
Public Shared Function InsertOrUpdateToKafkaSync(rec As cATEZ_Greenpulse_KafkaDecs, unique_KEY As String, Optional waitMs As Integer = 30000) As DeliveryResult(Of String, String)
Dim cfg As New ProducerConfig() With {
.BootstrapServers = BootstrapServers,
.Acks = Acks.All,
.EnableIdempotence = True,
.MessageTimeoutMs = 30000
}
Dim cfg As New ProducerConfig With {
.BootstrapServers = BootstrapServers,
.EnableIdempotence = True,
.Acks = Acks.All,
.MaxInFlight = 5,
.MessageTimeoutMs = Math.Max(waitMs, 60000),
.RequestTimeoutMs = 30000,
.EnableDeliveryReports = True,
.AllowAutoCreateTopics = True
}
If UseSasl Then
cfg.SecurityProtocol = SecurityProtocolSetting
cfg.SaslMechanism = SaslMechanismSetting
cfg.SaslUsername = SaslUsername
cfg.SaslPassword = SaslPassword
' Optional: cfg.SslCaLocation = "path\to\ca.pem"
Using producer = New ProducerBuilder(Of String, String)(cfg).Build()
Dim key = unique_KEY ' GetUniqueKey(rec)
Dim msg = New Message(Of String, String) With {.Key = key, .Value = rec.ToJson(False)}
Dim done As New Threading.ManualResetEventSlim(False)
Dim lastReport As DeliveryResult(Of String, String) = Nothing
Dim prodEx As ProduceException(Of String, String) = Nothing
producer.Produce(TopicName, msg,
Sub(r)
lastReport = r
done.Set()
End Sub)
' Warten wir gezielt auf den Delivery-Callback:
If Not done.Wait(waitMs) Then
' Producer ggf. noch auslaufen lassen
producer.Flush(TimeSpan.FromSeconds(5))
Throw New TimeoutException($"DeliveryCallback nach {waitMs} ms nicht eingetroffen.")
End If
Dim key As String = GetUniqueKey(record) ' bleibt leer bis du definierst
Dim payload As String = record.ToJson(False)
' Fehler im Report?
' (Bei neueren Clients ist r.Error nur in der Exception; bei älteren ggf. r.Status prüfen.)
If lastReport Is Nothing Then
Throw New TimeoutException("DeliveryResult leer.")
End If
If lastReport.Status <> PersistenceStatus.Persisted Then
Throw New Exception($"Sende-Status: {lastReport.Status} @ {lastReport.TopicPartitionOffset}")
End If
Using producer As IProducer(Of String, String) = New ProducerBuilder(Of String, String)(cfg).Build()
Dim msg As New Message(Of String, String) With {
.key = key,
.Value = payload
}
Dim result = Await producer.ProduceAsync(TopicName, msg, ct)
' Flush ist bei Await ProduceAsync nicht zwingend nötig, hier dennoch zur Sicherheit:
producer.Flush(TimeSpan.FromSeconds(5))
Return result
Return lastReport
End Using
End Function
'========================
'== Sync-Wrapper (falls bevorzugt)
'========================
'Public Shared Function InsertOrUpdateToKafka(rec As cATEZ_Greenpulse_KafkaDecs) _
'As DeliveryResult(Of String, String)
' Return InsertOrUpdateToKafkaAsync(rec).GetAwaiter().GetResult()
'End Function
End Class
Public Class cATEZ_Greenpulse_KafkaDecsBuilder_DAKOSY
Public Shared Function BuildByMrn(mrn As String) As cATEZ_Greenpulse_KafkaDecs
Using con As SqlConnection = SQL.GetNewOpenConnectionAVISO()
'con.Open()
' Alle Zeilen zur MRN laden (Kopf + Positionen). Kopfinfo ist je Zeile dupliziert.
Dim sql As String = "
SELECT
*
FROM [tbl_DY_Zollmeldungen_Import]
WHERE [Registriernummer_MRN] = @mrn
ORDER BY cast([PositionNo] as int) , cast([Positionen] as int) , [Id];
"
Dim dt As New DataTable()
Using cmd As New SqlCommand(sql, con)
cmd.Parameters.AddWithValue("@mrn", mrn)
Using da As New SqlDataAdapter(cmd)
da.Fill(dt)
End Using
End Using
End Function
'========================
'== Sync-Wrapper (falls bevorzugt)
'========================
Public Shared Function InsertOrUpdateToKafka(ByVal record As cATEZ_Greenpulse_KafkaDecs) As DeliveryResult(Of String, String)
Return InsertOrUpdateToKafkaAsync(record).GetAwaiter().GetResult()
End Function
If dt.Rows.Count = 0 Then
Throw New InvalidOperationException("Keine Daten zur angegebenen MRN gefunden: " & mrn)
End If
End Class
' 1) Kopf aus der ersten Zeile ableiten
Dim head = dt.Rows(0)
End Namespace
Dim obj As New cATEZ_Greenpulse_KafkaDecs() With {
.Declaration = New cATEZ_Greenpulse_KafkaDecs.DeclarationNode() With {
.DeclarationSourceId = SafeStr(head("Bezugsnummer_LRN")),
.DeclarationNo = SafeStr(head("Registriernummer_MRN")),
.DeclarationDate = FirstNonEmptyDateStr(head, {"Annahmedatum", "Überlassungsdatum"}),
.RequestedProcedure = SafeStr(head("Verfahren")),
.PreviousProcedure = SafeStr(head("Verfahren2")),
.Goods = New List(Of cATEZ_Greenpulse_KafkaDecs.GoodItem)()
},
.Parties = New cATEZ_Greenpulse_KafkaDecs.PartiesNode() With {
.ImporterIdentificationNumber = FirstNonEmptyStr(head, {"Empfänger_CN_EORI", "UST_ID_Einführer"}),
.ExporterIdentificationNumber = SafeStr(head("Versender_CZ_EORI")),
.ReportingDeclarantEORINumber = SafeStr(head("Anmelder_DT_EORI")),
.TypeOfRepresentation = SafeStr(head("Art_der_Vertretung"))
},
.Commercial = New cATEZ_Greenpulse_KafkaDecs.CommercialNode(),
.ExporterDetails = New cATEZ_Greenpulse_KafkaDecs.ExporterDetailsNode() With {
.ExporterTitle = SafeStr(head("CZ_Name")),
.ExporterEmail = "",
.ExporterPhone = ""
},
.ImporterDetails = New cATEZ_Greenpulse_KafkaDecs.ImporterDetailsNode() With {
.ImporterTitle = SafeStr(head("CN_Name")),
.ImporterEmail = "",
.ImporterPhone = "",
.ImporterCountryCodeOrMemberState = SafeStr(head("CN_Ländercode")),
.ImporterSubdivision = "",
.ImporterCity = "",
.ImporterStreet = "",
.ImporterStreetAdditional = "",
.ImporterAddressNumber = "",
.ImporterPostCode = "",
.ImporterPoBox = "",
.ImporterCoordinateLongitudeX = "",
.ImporterCoordinateLatitudeY = ""
}
}
' 2) Commercial (Rechnung) aus Unterlagen N380, falls vorhanden
Dim invRow As DataRow = dt.AsEnumerable() _
.Where(Function(r) SafeStr(r("Unterlagenart")).Equals("N380", StringComparison.OrdinalIgnoreCase) _
AndAlso Not String.IsNullOrWhiteSpace(SafeStr(r("Unterlagennummer")))) _
.OrderBy(Function(r) SafeInt(r("Id"))) _
.Cast(Of DataRow)() _
.DefaultIfEmpty(Nothing) _
.FirstOrDefault()
If invRow IsNot Nothing Then
obj.Commercial.InvoiceNumbers = SafeStr(invRow("Unterlagennummer"))
obj.Commercial.InvoiceDate = SafeDateStr(invRow("Unterlagendatum"))
Else
obj.Commercial.InvoiceNumbers = ""
obj.Commercial.InvoiceDate = ""
End If
' 3) Goods je Positionszeile
For Each row As DataRow In dt.Rows
Dim commodity As String = SafeStr(row("Warentarifnummer"))
Dim hasPositionData As Boolean =
Not String.IsNullOrWhiteSpace(commodity) OrElse
Not IsNullOrEmpty(row("PositionNo")) OrElse
Not IsNullOrEmpty(row("Positionen"))
If hasPositionData Then
Dim origin As String = FirstNonEmptyStr(row, {"Ursprung", "Präferenzursprungsland"})
Dim netMass As String = FirstNonEmptyStr(row, {"Eigenmasse"})
Dim unit As String = FirstNonEmptyStr(row, {"Eigenmasseeinheit", "Maßeinheit"})
Dim gi As New cATEZ_Greenpulse_KafkaDecs.GoodItem() With {
.CommodityCode = commodity,
.OriginCountryCode = origin,
.NetMass = netMass,
.TypeOfMeasurementUnit = unit,
.SpecialProcedures = New cATEZ_Greenpulse_KafkaDecs.SpecialProceduresNode() With {
.MemberStateAutharization = SafeStr(row("DT_Ländercode")), ' Annahme: Anmelder-Land
.DischargeBillWaiver = "", ' kein Feld vorhanden
.Authorisation = SafeStr(row("Bewilligungsnummer")),
.StartTime = "",
.EndTime = "",
.Deadline = ""
}
}
obj.Declaration.Goods.Add(gi)
End If
Next
Return obj
End Using
End Function
Public Shared Function BuildJsonByMrn(mrn As String, Optional pretty As Boolean = True) As String
Dim o = BuildByMrn(mrn)
Return o.ToJson(pretty)
End Function
'---------------------------
' Helper
'---------------------------
Private Shared Function SafeStr(value As Object) As String
If value Is Nothing OrElse Convert.IsDBNull(value) Then Return ""
Return Convert.ToString(value).Trim()
End Function
Private Shared Function SafeDateStr(value As Object) As String
If value Is Nothing OrElse Convert.IsDBNull(value) Then Return ""
Dim dt As DateTime
If DateTime.TryParse(Convert.ToString(value), dt) Then
Return dt.ToString("yyyy-MM-dd")
End If
Return ""
End Function
Private Shared Function FirstNonEmptyStr(row As DataRow, fields As IEnumerable(Of String)) As String
For Each f In fields
If row.Table.Columns.Contains(f) Then
Dim s = SafeStr(row(f))
If Not String.IsNullOrWhiteSpace(s) Then Return s
End If
Next
Return ""
End Function
Private Shared Function FirstNonEmptyDateStr(row As DataRow, fields As IEnumerable(Of String)) As String
For Each f In fields
If row.Table.Columns.Contains(f) Then
Dim s = SafeDateStr(row(f))
If Not String.IsNullOrWhiteSpace(s) Then Return s
End If
Next
Return ""
End Function
Private Shared Function SafeInt(value As Object) As Integer
If value Is Nothing OrElse Convert.IsDBNull(value) Then Return Integer.MaxValue
Dim i As Integer
If Integer.TryParse(Convert.ToString(value), i) Then Return i
Return Integer.MaxValue
End Function
Private Shared Function IsNullOrEmpty(value As Object) As Boolean
If value Is Nothing OrElse Convert.IsDBNull(value) Then Return True
Return String.IsNullOrWhiteSpace(Convert.ToString(value))
End Function
End Class