' Requires NuGet:
'  - Confluent.Kafka
'  - Newtonsoft.Json
' Target framework: .NET Framework 4.8 or .NET 6/8 (both work)
|
||
|
||
Imports System.Data.SqlClient
|
||
Imports System.Threading
|
||
Imports System.Threading.Tasks
|
||
Imports Confluent.Kafka
|
||
|
||
Imports Newtonsoft.Json
|
||
|
||
|
||
|
||
''' <summary>
|
||
''' UDM record including sample population and a Kafka producer.
''' The data schema follows the provided JSON structure.
|
||
''' </summary>
|
||
Public Class cATEZ_Greenpulse_KafkaDecs
|
||
|
||
'========================
'== Kafka: configuration (class level)
'========================

' Broker list in host:port form.
' NOTE(review): the original remark also mentions http://192.168.85.250:8888 - its purpose is not visible here; confirm.
Public Shared BootstrapServers As String = "192.168.85.250:9092"

' Topic the declaration records are produced to.
Public Shared TopicName As String = "greenpulse.declarationdata.v1"
'Public Shared TopicName As String = "dev.greenpulse.declarationdata.v1"

' Settings for clusters that require SASL/TLS.
Public Shared UseSasl As Boolean = False
Public Shared SaslUsername As String = String.Empty
Public Shared SaslPassword As String = String.Empty
Public Shared SecurityProtocolSetting As SecurityProtocol = SecurityProtocol.Plaintext
Public Shared SaslMechanismSetting As SaslMechanism = SaslMechanism.Plain

' Building blocks of the message key: version prefix and part separator.
Private Const KEY_VERSION As String = "v1"
Private Const SEP_PIPE As Char = "|"c
|
||
|
||
|
||
'========================
'== Unique key derivation
'========================

''' <summary>
''' Builds the pipe-separated message key: key version, country, system and MRN,
''' each part trimmed and upper-cased with the invariant culture.
''' </summary>
''' <param name="country">Country part of the key; Nothing is treated as empty.</param>
''' <param name="system">System part of the key; Nothing is treated as empty.</param>
''' <param name="mrn">MRN part of the key; Nothing is treated as empty.</param>
''' <returns>The four parts joined by the pipe separator.</returns>
Public Shared Function GetUniqueKey_Pipe(country As String, system As String, mrn As String) As String
    ' Normalizing every part the same way keeps keys stable: a stray blank or a
    ' missing value must not produce a different key (or a NullReferenceException).
    Dim parts As String() = {
        KEY_VERSION,
        NormalizeKeyPart(country),
        NormalizeKeyPart(system),
        NormalizeKeyPart(mrn)
    }
    Return String.Join(SEP_PIPE, parts)
End Function

' Null-safe trim + invariant upper-casing of a single key part.
Private Shared Function NormalizeKeyPart(value As String) As String
    Return If(value, String.Empty).Trim().ToUpperInvariant()
End Function
|
||
|
||
'========================
'== Data objects according to the UDM schema
'========================

''' <summary>Declaration section; serialized as "declaration".</summary>
<JsonProperty("declaration")> Public Property Declaration As DeclarationNode

''' <summary>Parties section; serialized as "parties".</summary>
<JsonProperty("parties")> Public Property Parties As PartiesNode

''' <summary>Commercial (invoice) section; serialized as "commercial".</summary>
<JsonProperty("commercial")> Public Property Commercial As CommercialNode

''' <summary>Exporter contact section; serialized as "exporterDetails".</summary>
<JsonProperty("exporterDetails")> Public Property ExporterDetails As ExporterDetailsNode

''' <summary>Importer contact/address section; serialized as "importerDetails".</summary>
<JsonProperty("importerDetails")> Public Property ImporterDetails As ImporterDetailsNode
|
||
|
||
'--- declaration ---
''' <summary>
''' The "declaration" section: source id, declaration number and date,
''' procedure codes and the list of goods. All scalar values are carried as strings.
''' </summary>
Public Class DeclarationNode
    <JsonProperty("declarationsourceId")> Public Property DeclarationSourceId As String

    <JsonProperty("declarationNo")> Public Property DeclarationNo As String

    <JsonProperty("declarationDate")> Public Property DeclarationDate As String

    <JsonProperty("requestedProcedure")> Public Property RequestedProcedure As String

    <JsonProperty("previousProcedure")> Public Property PreviousProcedure As String

    ' One entry per goods position.
    <JsonProperty("goods")> Public Property Goods As List(Of GoodItem)
End Class
|
||
|
||
''' <summary>
''' A single entry of the "goods" list: commodity code, origin, net mass with its
''' unit, and the attached special-procedure details.
''' </summary>
Public Class GoodItem
    <JsonProperty("commodityCode")> Public Property CommodityCode As String

    <JsonProperty("originCountryCode")> Public Property OriginCountryCode As String

    ' Carried as text, not as a number.
    <JsonProperty("netMass")> Public Property NetMass As String

    <JsonProperty("typeOfMeasurementUnit")> Public Property TypeOfMeasurementUnit As String

    <JsonProperty("specialProcedures")> Public Property SpecialProcedures As SpecialProceduresNode
End Class
|
||
|
||
''' <summary>
''' The "specialProcedures" object of a goods item: authorisation data and the
''' start/end/deadline values, all carried as strings.
''' </summary>
Public Class SpecialProceduresNode
    ' NOTE(review): the spelling "Autharization" is part of the serialized JSON name;
    ' presumably dictated by the target schema - confirm before renaming.
    <JsonProperty("memberStateAutharization")> Public Property MemberStateAutharization As String

    <JsonProperty("dischargeBillWaiver")> Public Property DischargeBillWaiver As String

    <JsonProperty("authorisation")> Public Property Authorisation As String

    <JsonProperty("startTime")> Public Property StartTime As String

    <JsonProperty("endTime")> Public Property EndTime As String

    <JsonProperty("deadline")> Public Property Deadline As String
End Class
|
||
|
||
'--- parties ---
''' <summary>
''' The "parties" section: identification numbers of importer, exporter and
''' reporting declarant plus the type of representation.
''' </summary>
Public Class PartiesNode
    <JsonProperty("importerIdentificationNumber")> Public Property ImporterIdentificationNumber As String

    <JsonProperty("exporterIdentificationNumber")> Public Property ExporterIdentificationNumber As String

    <JsonProperty("reportingDeclarantEORINumber")> Public Property ReportingDeclarantEORINumber As String

    <JsonProperty("typeOfRepresentation")> Public Property TypeOfRepresentation As String
End Class
|
||
|
||
'--- commercial ---
''' <summary>The "commercial" section: invoice number(s) and invoice date as strings.</summary>
Public Class CommercialNode
    <JsonProperty("invoiceNumbers")> Public Property InvoiceNumbers As String

    <JsonProperty("invoiceDate")> Public Property InvoiceDate As String
End Class
|
||
|
||
'--- exporterDetails ---
''' <summary>The "exporterDetails" section: exporter title, e-mail and phone.</summary>
Public Class ExporterDetailsNode
    <JsonProperty("exporterTitle")> Public Property ExporterTitle As String

    <JsonProperty("exporterEmail")> Public Property ExporterEmail As String

    <JsonProperty("exporterPhone")> Public Property ExporterPhone As String
End Class
|
||
|
||
'--- importerDetails ---
''' <summary>
''' The "importerDetails" section: importer contact data, postal address parts and
''' coordinates. Every value is carried as a string.
''' </summary>
Public Class ImporterDetailsNode
    ' Contact
    <JsonProperty("importerTitle")> Public Property ImporterTitle As String

    <JsonProperty("importerEmail")> Public Property ImporterEmail As String

    <JsonProperty("importerPhone")> Public Property ImporterPhone As String

    ' Address
    <JsonProperty("importerCountryCodeOrMemberState")> Public Property ImporterCountryCodeOrMemberState As String

    <JsonProperty("importerSubdivision")> Public Property ImporterSubdivision As String

    <JsonProperty("importerCity")> Public Property ImporterCity As String

    <JsonProperty("importerStreet")> Public Property ImporterStreet As String

    <JsonProperty("importerStreetAdditional")> Public Property ImporterStreetAdditional As String

    <JsonProperty("importerAddressNumber")> Public Property ImporterAddressNumber As String

    <JsonProperty("importerPostCode")> Public Property ImporterPostCode As String

    <JsonProperty("importerPoBox")> Public Property ImporterPoBox As String

    ' Coordinates
    <JsonProperty("importerCoordinateLongitudeX")> Public Property ImporterCoordinateLongitudeX As String

    <JsonProperty("importerCoordinateLatitudeY")> Public Property ImporterCoordinateLatitudeY As String
End Class
|
||
|
||
'========================
'== Serialization
'========================

''' <summary>Serializes this record to JSON with Newtonsoft.Json.</summary>
''' <param name="pretty">True (default) for indented output, False for compact output.</param>
''' <returns>The JSON text of this instance.</returns>
Public Function ToJson(Optional pretty As Boolean = True) As String
    If pretty Then
        Return JsonConvert.SerializeObject(Me, Formatting.Indented)
    End If

    Return JsonConvert.SerializeObject(Me, Formatting.None)
End Function
|
||
|
||
'========================
'== Sample population
'========================

''' <summary>
''' Creates a fully populated sample record with one goods item; all values are
''' fixed literals intended for demonstration.
''' </summary>
''' <returns>A new record filled with the sample values.</returns>
Public Shared Function BuildDemo() As cATEZ_Greenpulse_KafkaDecs
    ' Special-procedure details of the single sample goods item.
    Dim procedures As New SpecialProceduresNode()
    With procedures
        .MemberStateAutharization = "AT"
        .DischargeBillWaiver = "01"
        .Authorisation = "Name of authorisation"
        .StartTime = "2024-10-22"
        .EndTime = "2024-11-22"
        .Deadline = "2024-12-22"
    End With

    Dim sampleItem As New GoodItem()
    With sampleItem
        .CommodityCode = "72072710"
        .OriginCountryCode = "TR"
        .NetMass = "150"
        .TypeOfMeasurementUnit = "Tonnes"
        .SpecialProcedures = procedures
    End With

    Dim header As New DeclarationNode()
    With header
        .DeclarationSourceId = "xx123"
        .DeclarationNo = "24AT000000INL0JD01"
        .DeclarationDate = "2024-11-22"
        .RequestedProcedure = "40"
        .PreviousProcedure = "00"
        .Goods = New List(Of GoodItem)()
    End With
    header.Goods.Add(sampleItem)

    Dim involved As New PartiesNode()
    With involved
        .ImporterIdentificationNumber = "ATEOS1000000001"
        .ExporterIdentificationNumber = "FR123456789000"
        .ReportingDeclarantEORINumber = "ATEOS1000000002"
        .TypeOfRepresentation = "01"
    End With

    Dim invoice As New CommercialNode()
    With invoice
        .InvoiceNumbers = "123456789"
        .InvoiceDate = "2024-11-22"
    End With

    ' The exporter contact data is intentionally left empty in the sample.
    Dim exporter As New ExporterDetailsNode()
    With exporter
        .ExporterTitle = ""
        .ExporterEmail = ""
        .ExporterPhone = ""
    End With

    Dim importer As New ImporterDetailsNode()
    With importer
        .ImporterTitle = "Importer name"
        .ImporterEmail = "info@test.com"
        .ImporterPhone = "123456789"
        .ImporterCountryCodeOrMemberState = "DE"
        .ImporterSubdivision = "Sub-division"
        .ImporterCity = "City name"
        .ImporterStreet = "Street Name"
        .ImporterStreetAdditional = "Street additonal name"
        .ImporterAddressNumber = "10"
        .ImporterPostCode = "DCL-123"
        .ImporterPoBox = "PO DCL-123"
        ' NOTE(review): 41.0... is stored as longitude and 28.9... as latitude;
        ' verify that the pair is not swapped in the sample data.
        .ImporterCoordinateLongitudeX = "41.0091982"
        .ImporterCoordinateLatitudeY = "28.9662187"
    End With

    Dim demo As New cATEZ_Greenpulse_KafkaDecs()
    With demo
        .Declaration = header
        .Parties = involved
        .Commercial = invoice
        .ExporterDetails = exporter
        .ImporterDetails = importer
    End With
    Return demo
End Function
|
||
|
||
|
||
'========================
'== Kafka: insert/update (by message key)
'========================

''' <summary>
''' Serializes <paramref name="rec"/> to compact JSON, produces it to <see cref="TopicName"/>
''' under <paramref name="unique_KEY"/> and blocks until the delivery report arrives.
''' A new producer is built and disposed on every call.
''' </summary>
''' <param name="rec">Record to send.</param>
''' <param name="unique_KEY">Kafka message key; must not be empty.</param>
''' <param name="waitMs">Maximum time in milliseconds to wait for the delivery report.</param>
''' <returns>The delivery report of the persisted message.</returns>
''' <exception cref="ArgumentNullException"><paramref name="rec"/> is Nothing.</exception>
''' <exception cref="ArgumentException"><paramref name="unique_KEY"/> is Nothing, empty or blank.</exception>
''' <exception cref="TimeoutException">No delivery report arrived within the wait time plus the flush grace period.</exception>
''' <exception cref="ProduceException(Of TKey, TValue)">The delivery report carries an error.</exception>
''' <exception cref="InvalidOperationException">The message was not reported as persisted.</exception>
Public Shared Function InsertOrUpdateToKafkaSync(rec As cATEZ_Greenpulse_KafkaDecs, unique_KEY As String, Optional waitMs As Integer = 30000) As DeliveryResult(Of String, String)
    If rec Is Nothing Then Throw New ArgumentNullException(NameOf(rec))
    If String.IsNullOrWhiteSpace(unique_KEY) Then
        Throw New ArgumentException("A non-empty message key is required.", NameOf(unique_KEY))
    End If

    Dim cfg As New ProducerConfig With {
        .BootstrapServers = BootstrapServers,
        .EnableIdempotence = True,
        .Acks = Acks.All,
        .MaxInFlight = 5,
        .MessageTimeoutMs = Math.Max(waitMs, 60000),
        .RequestTimeoutMs = 30000,
        .EnableDeliveryReports = True,
        .AllowAutoCreateTopics = True
    }

    ' Apply the class-level security settings; with the defaults
    ' (Plaintext, UseSasl = False) this is equivalent to not setting them.
    cfg.SecurityProtocol = SecurityProtocolSetting
    If UseSasl Then
        cfg.SaslMechanism = SaslMechanismSetting
        cfg.SaslUsername = SaslUsername
        cfg.SaslPassword = SaslPassword
    End If

    ' The event is declared outside the producer scope so that it is disposed only
    ' after the producer, i.e. after the last point at which the callback can run.
    Using done As New ManualResetEventSlim(False)
        Using producer = New ProducerBuilder(Of String, String)(cfg).Build()
            Dim msg = New Message(Of String, String) With {.Key = unique_KEY, .Value = rec.ToJson(False)}
            Dim report As DeliveryReport(Of String, String) = Nothing

            producer.Produce(TopicName, msg,
                             Sub(r)
                                 report = r
                                 done.Set()
                             End Sub)

            ' Wait specifically for the delivery callback.
            If Not done.Wait(waitMs) Then
                ' Give the producer a short grace period; the report may still arrive
                ' during the flush, in which case the send is not a timeout.
                producer.Flush(TimeSpan.FromSeconds(5))
                If Not done.IsSet Then
                    Throw New TimeoutException($"DeliveryCallback nach {waitMs} ms nicht eingetroffen.")
                End If
            End If

            If report Is Nothing Then
                Throw New TimeoutException("DeliveryResult leer.")
            End If

            ' Surface the error from the report instead of only the persistence status.
            If report.Error IsNot Nothing AndAlso report.Error.IsError Then
                Throw New ProduceException(Of String, String)(report.Error, report)
            End If

            If report.Status <> PersistenceStatus.Persisted Then
                Throw New InvalidOperationException($"Sende-Status: {report.Status} @ {report.TopicPartitionOffset}")
            End If

            Return report
        End Using
    End Using
End Function
|
||
|
||
'========================
'== Sync wrapper (if preferred)
'========================
'Public Shared Function InsertOrUpdateToKafka(rec As cATEZ_Greenpulse_KafkaDecs) _
'As DeliveryResult(Of String, String)
'    Return InsertOrUpdateToKafkaAsync(rec).GetAwaiter().GetResult()
'End Function
|
||
|
||
End Class
|
||
|
||
|
||
Public Class cATEZ_Greenpulse_KafkaDecsBuilder_DAKOSY
|
||
|
||
''' <summary>
''' Loads all rows of [tbl_DY_Zollmeldungen_Import] for the given MRN and maps them to a
''' <see cref="cATEZ_Greenpulse_KafkaDecs"/> record: header and parties from the first row,
''' the invoice from the first "N380" document row, and one goods entry per row that
''' carries position data.
''' </summary>
''' <param name="mrn">Value matched against [Registriernummer_MRN].</param>
''' <returns>The populated record.</returns>
''' <exception cref="ArgumentNullException"><paramref name="mrn"/> is Nothing.</exception>
''' <exception cref="InvalidOperationException">No rows exist for the MRN.</exception>
Public Shared Function BuildByMrn_DAKOSY_Archiv(mrn As String) As cATEZ_Greenpulse_KafkaDecs
    ' Without this guard a missing MRN surfaces as an obscure "parameter not supplied" SQL error.
    If mrn Is Nothing Then Throw New ArgumentNullException(NameOf(mrn))

    Dim dt As DataTable = LoadRowsByMrn(mrn)
    If dt.Rows.Count = 0 Then
        Throw New InvalidOperationException("Keine Daten zur angegebenen MRN gefunden: " & mrn)
    End If

    ' 1) Header: header values are duplicated on every row, so the first row is used.
    Dim obj As cATEZ_Greenpulse_KafkaDecs = BuildHeaderRecord(dt.Rows(0))

    ' 2) Commercial (invoice) from the N380 document rows, if present.
    ApplyInvoice(obj.Commercial, dt)

    ' 3) Goods: one entry per row with position data.
    ' NOTE(review): if the table holds several rows per position (e.g. one per attached
    ' document), each of them yields its own goods entry - confirm whether that is intended.
    For Each row As DataRow In dt.Rows
        If HasPositionData(row) Then
            obj.Declaration.Goods.Add(BuildGoodItem(row))
        End If
    Next

    Return obj
End Function

' Reads header + position rows for one MRN, ordered by position and Id; the connection is closed before returning.
Private Shared Function LoadRowsByMrn(mrn As String) As DataTable
    ' Named "query" on purpose: a local called "sql" would shadow the SQL helper type (VB is case-insensitive).
    Dim query As String = "
        SELECT
            *
        FROM [tbl_DY_Zollmeldungen_Import]
        WHERE [Registriernummer_MRN] = @mrn
        ORDER BY cast([PositionNo] as int) , cast([Positionen] as int) , [Id];
        "

    Dim table As New DataTable()
    Using con As SqlConnection = SQL.GetNewOpenConnectionAVISO()
        Using cmd As New SqlCommand(query, con)
            cmd.Parameters.AddWithValue("@mrn", mrn)
            Using da As New SqlDataAdapter(cmd)
                da.Fill(table)
            End Using
        End Using
    End Using
    Return table
End Function

' Creates the record with declaration header, parties, exporter and importer details taken from one row; goods start empty.
Private Shared Function BuildHeaderRecord(head As DataRow) As cATEZ_Greenpulse_KafkaDecs
    Return New cATEZ_Greenpulse_KafkaDecs() With {
        .Declaration = New cATEZ_Greenpulse_KafkaDecs.DeclarationNode() With {
            .DeclarationSourceId = SafeStr(head("Bezugsnummer_LRN")),
            .DeclarationNo = SafeStr(head("Registriernummer_MRN")),
            .DeclarationDate = FirstNonEmptyDateStr(head, {"Annahmedatum", "Überlassungsdatum"}),
            .RequestedProcedure = SafeStr(head("Verfahren")),
            .PreviousProcedure = SafeStr(head("Verfahren2")),
            .Goods = New List(Of cATEZ_Greenpulse_KafkaDecs.GoodItem)()
        },
        .Parties = New cATEZ_Greenpulse_KafkaDecs.PartiesNode() With {
            .ImporterIdentificationNumber = FirstNonEmptyStr(head, {"Empfänger_CN_EORI", "UST_ID_Einführer"}),
            .ExporterIdentificationNumber = SafeStr(head("Versender_CZ_EORI")),
            .ReportingDeclarantEORINumber = SafeStr(head("Anmelder_DT_EORI")),
            .TypeOfRepresentation = SafeStr(head("Art_der_Vertretung"))
        },
        .Commercial = New cATEZ_Greenpulse_KafkaDecs.CommercialNode(),
        .ExporterDetails = New cATEZ_Greenpulse_KafkaDecs.ExporterDetailsNode() With {
            .ExporterTitle = SafeStr(head("CZ_Name")),
            .ExporterEmail = "",
            .ExporterPhone = ""
        },
        .ImporterDetails = New cATEZ_Greenpulse_KafkaDecs.ImporterDetailsNode() With {
            .ImporterTitle = SafeStr(head("CN_Name")),
            .ImporterEmail = "",
            .ImporterPhone = "",
            .ImporterCountryCodeOrMemberState = SafeStr(head("CN_Ländercode")),
            .ImporterSubdivision = "",
            .ImporterCity = "",
            .ImporterStreet = "",
            .ImporterStreetAdditional = "",
            .ImporterAddressNumber = "",
            .ImporterPostCode = "",
            .ImporterPoBox = "",
            .ImporterCoordinateLongitudeX = "",
            .ImporterCoordinateLatitudeY = ""
        }
    }
End Function

' Fills invoice number/date from the N380 row with the lowest Id that has a document number; empty strings otherwise.
Private Shared Sub ApplyInvoice(commercial As cATEZ_Greenpulse_KafkaDecs.CommercialNode, dt As DataTable)
    ' FirstOrDefault already yields Nothing for an empty sequence, so no Cast/DefaultIfEmpty is needed.
    Dim invRow As DataRow = dt.AsEnumerable() _
        .Where(Function(r) SafeStr(r("Unterlagenart")).Equals("N380", StringComparison.OrdinalIgnoreCase) _
                           AndAlso Not String.IsNullOrWhiteSpace(SafeStr(r("Unterlagennummer")))) _
        .OrderBy(Function(r) SafeInt(r("Id"))) _
        .FirstOrDefault()

    If invRow Is Nothing Then
        commercial.InvoiceNumbers = ""
        commercial.InvoiceDate = ""
    Else
        commercial.InvoiceNumbers = SafeStr(invRow("Unterlagennummer"))
        commercial.InvoiceDate = SafeDateStr(invRow("Unterlagendatum"))
    End If
End Sub

' True when the row has a commodity code or any of the two position columns filled.
Private Shared Function HasPositionData(row As DataRow) As Boolean
    Return Not String.IsNullOrWhiteSpace(SafeStr(row("Warentarifnummer"))) OrElse
           Not IsNullOrEmpty(row("PositionNo")) OrElse
           Not IsNullOrEmpty(row("Positionen"))
End Function

' Maps one position row to a goods entry including its special-procedure details.
Private Shared Function BuildGoodItem(row As DataRow) As cATEZ_Greenpulse_KafkaDecs.GoodItem
    Return New cATEZ_Greenpulse_KafkaDecs.GoodItem() With {
        .CommodityCode = SafeStr(row("Warentarifnummer")),
        .OriginCountryCode = FirstNonEmptyStr(row, {"Ursprung", "Präferenzursprungsland"}),
        .NetMass = FirstNonEmptyStr(row, {"Eigenmasse"}),
        .TypeOfMeasurementUnit = FirstNonEmptyStr(row, {"Eigenmasseeinheit", "Maßeinheit"}),
        .SpecialProcedures = New cATEZ_Greenpulse_KafkaDecs.SpecialProceduresNode() With {
            .MemberStateAutharization = SafeStr(row("DT_Ländercode")), ' assumption: declarant country
            .DischargeBillWaiver = "", ' no source column available
            .Authorisation = SafeStr(row("Bewilligungsnummer")),
            .StartTime = "",
            .EndTime = "",
            .Deadline = ""
        }
    }
End Function
|
||
|
||
|
||
|
||
'---------------------------
|
||
' Helper
|
||
'---------------------------
|
||
''' <summary>
''' Converts a database value to a trimmed string. Nothing and DBNull yield "".
''' </summary>
''' <param name="value">Raw cell value.</param>
''' <returns>The trimmed text, never Nothing.</returns>
Private Shared Function SafeStr(value As Object) As String
    If value Is Nothing OrElse Convert.IsDBNull(value) Then Return ""

    ' The result ends up in machine-readable JSON, so non-string values (numbers, dates)
    ' must not be formatted with the host's regional settings.
    Dim text As String = Convert.ToString(value, System.Globalization.CultureInfo.InvariantCulture)
    Return If(text, "").Trim()
End Function
|
||
|
||
''' <summary>
''' Converts a database value to an ISO date string (yyyy-MM-dd).
''' Nothing, DBNull and unparsable values yield "".
''' </summary>
''' <param name="value">Raw cell value: a DateTime or text that DateTime.TryParse accepts.</param>
''' <returns>The formatted date, or "" when no date could be obtained.</returns>
Private Shared Function SafeDateStr(value As Object) As String
    If value Is Nothing OrElse Convert.IsDBNull(value) Then Return ""

    Dim parsed As DateTime
    If TypeOf value Is DateTime Then
        ' Already a date: no need for a round trip through a culture-specific string.
        parsed = DirectCast(value, DateTime)
    ElseIf Not DateTime.TryParse(Convert.ToString(value), parsed) Then
        Return ""
    End If

    ' Format invariantly so the output always uses the Gregorian calendar and ASCII digits.
    Return parsed.ToString("yyyy-MM-dd", System.Globalization.CultureInfo.InvariantCulture)
End Function
|
||
|
||
''' <summary>
''' Returns the first non-blank text value among the given columns of a row.
''' Columns that do not exist in the row's table are skipped.
''' </summary>
''' <param name="row">Row to read from.</param>
''' <param name="fields">Column names in order of preference.</param>
''' <returns>The first non-blank value, or "" when none is found.</returns>
Private Shared Function FirstNonEmptyStr(row As DataRow, fields As IEnumerable(Of String)) As String
    For Each columnName As String In fields
        If Not row.Table.Columns.Contains(columnName) Then Continue For

        Dim candidate As String = SafeStr(row(columnName))
        If String.IsNullOrWhiteSpace(candidate) Then Continue For

        Return candidate
    Next

    Return ""
End Function
|
||
|
||
''' <summary>
''' Returns the first column value that SafeDateStr can turn into a non-blank date string.
''' Columns that do not exist in the row's table are skipped.
''' </summary>
''' <param name="row">Row to read from.</param>
''' <param name="fields">Column names in order of preference.</param>
''' <returns>The first non-blank date string, or "" when none is found.</returns>
Private Shared Function FirstNonEmptyDateStr(row As DataRow, fields As IEnumerable(Of String)) As String
    For Each columnName As String In fields
        If Not row.Table.Columns.Contains(columnName) Then Continue For

        Dim candidate As String = SafeDateStr(row(columnName))
        If String.IsNullOrWhiteSpace(candidate) Then Continue For

        Return candidate
    Next

    Return ""
End Function
|
||
|
||
''' <summary>
''' Converts a database value to an Integer. Nothing, DBNull and values that do not
''' parse as an Integer yield Integer.MaxValue.
''' </summary>
''' <param name="value">Raw cell value.</param>
''' <returns>The parsed number, or Integer.MaxValue as the fallback.</returns>
Private Shared Function SafeInt(value As Object) As Integer
    Dim fallback As Integer = Integer.MaxValue

    Dim isMissing As Boolean = value Is Nothing OrElse Convert.IsDBNull(value)
    If isMissing Then
        Return fallback
    End If

    Dim parsed As Integer
    Return If(Integer.TryParse(Convert.ToString(value), parsed), parsed, fallback)
End Function
|
||
|
||
''' <summary>
''' Tells whether a database value is missing: Nothing, DBNull, or text that is
''' empty or consists only of white space.
''' </summary>
''' <param name="value">Raw cell value.</param>
''' <returns>True when the value carries no content.</returns>
Private Shared Function IsNullOrEmpty(value As Object) As Boolean
    Return value Is Nothing OrElse
           Convert.IsDBNull(value) OrElse
           String.IsNullOrWhiteSpace(Convert.ToString(value))
End Function
|
||
|
||
End Class
|