' Requires NuGet:
'   - Confluent.Kafka
'   - Newtonsoft.Json
' Target framework: .NET Framework 4.8 or .NET 6/8

Imports System.Data.SqlClient
Imports System.Threading
Imports System.Windows.Forms
Imports Confluent.Kafka
Imports Newtonsoft.Json

''' <summary>
''' Simple invoice payload object plus Kafka producer helpers.
''' An instance carries a list of documents and is serialized with Newtonsoft.Json;
''' the Shared functions publish that JSON to the topic configured in TopicName
''' (default: dev.greenpulse.invoicedata.v1).
''' </summary>
''' <remarks>
''' NOTE(review): the intended JSON structure was documented as { "documents": [ { ... } ] },
''' but no naming attributes or serializer settings are applied in this class, so the
''' property names are presumably emitted as declared ("Documents", "Reference", ...)
''' unless JsonConvert.DefaultSettings is configured elsewhere. Confirm against the consumer.
''' </remarks>
Public Class cATEZ_Greenpulse_KafkaInvoices

    '========================
    '== Kafka: configuration (class level)
    '========================
    Public Shared BootstrapServers As String = "192.168.85.250:9092"
    Public Shared TopicName As String = "dev.greenpulse.invoicedata.v1"

    ' If SASL/TLS is required: these values are copied into the producer
    ' configuration only when UseSasl is True.
    Public Shared UseSasl As Boolean = False
    Public Shared SaslUsername As String = ""
    Public Shared SaslPassword As String = ""
    Public Shared SecurityProtocolSetting As SecurityProtocol = SecurityProtocol.Plaintext
    Public Shared SaslMechanismSetting As SaslMechanism = SaslMechanism.Plain

    Private Const KEY_VERSION As String = "v1"
    Private Const SEP_PIPE As Char = "|"c

    '========================
    '== Unique key construction (e.g. from country/system/MRN)
    '========================

    ''' <summary>
    ''' Builds the pipe-separated message key "v1|COUNTRY|SYSTEM|MRN".
    ''' Every part is upper-cased with the invariant culture.
    ''' </summary>
    ''' <param name="country">Country part of the key; Nothing is treated as an empty string.</param>
    ''' <param name="system">System part of the key; Nothing is treated as an empty string.</param>
    ''' <param name="mrn">MRN part of the key; Nothing is treated as an empty string.</param>
    ''' <returns>The key version followed by the three upper-cased parts, joined with "|".</returns>
    Public Shared Function GetUniqueKey_Pipe(country As String, system As String, mrn As String) As String
        ' Nothing becomes an empty segment instead of raising a NullReferenceException.
        ' (Note: the parameter "system" hides the System namespace inside this function.)
        Dim c = If(country, String.Empty).ToUpperInvariant()
        Dim s = If(system, String.Empty).ToUpperInvariant()
        Dim m = If(mrn, String.Empty).ToUpperInvariant()
        Return String.Join(SEP_PIPE, New String() {KEY_VERSION, c, s, m})
    End Function

    '========================
    '== Data object
    '========================

    ''' <summary>Documents transported in this payload.</summary>
    Public Property Documents As List(Of DocumentNode)

    ''' <summary>One document entry of the payload.</summary>
    Public Class DocumentNode
        Public Property Reference As String
        Public Property DocType As String
        Public Property MimeType As String
        Public Property Blob As String ' Base64
    End Class

    '========================
    '== Serialization
    '========================

    ''' <summary>Serializes this instance to JSON via JsonConvert.SerializeObject.</summary>
    ''' <param name="pretty">True for indented output, False for compact output.</param>
    ''' <returns>The JSON text.</returns>
    Public Function ToJson(Optional pretty As Boolean = True) As String
        Dim format = If(pretty, Formatting.Indented, Formatting.None)
        Return JsonConvert.SerializeObject(Me, format)
    End Function

    '========================
    '== Sample data
    '========================

    ''' <summary>Creates a demo payload containing a single invoice document with an empty blob.</summary>
    Public Shared Function BuildDemo() As cATEZ_Greenpulse_KafkaInvoices
        Return New cATEZ_Greenpulse_KafkaInvoices() With {
            .Documents = New List(Of DocumentNode) From {
                New DocumentNode() With {
                    .Reference = "Handelsrechnung.pdf",
                    .DocType = "invoice",
                    .MimeType = "application/octet-stream",
                    .Blob = "" ' Base64 string
                }
            }
        }
    End Function

    '========================
    '== Kafka: insert/update (by message key)
    '========================

    ''' <summary>
    ''' Calls InsertOrUpdateToKafkaSync and converts the outcome to a Boolean.
    ''' Any exception is shown in a message box and reported as False.
    ''' </summary>
    ''' <param name="rec">Payload to send.</param>
    ''' <param name="unique_KEY">Kafka message key.</param>
    ''' <param name="waitMs">Time in milliseconds to wait for the delivery report.</param>
    ''' <returns>True when the send completed without an exception, otherwise False.</returns>
    Public Shared Function InsertOrUpdateToKafkaSync_Bool(
        rec As cATEZ_Greenpulse_KafkaInvoices,
        unique_KEY As String,
        Optional waitMs As Integer = 30000
    ) As Boolean
        Try
            InsertOrUpdateToKafkaSync(rec, unique_KEY, waitMs)
            Return True
        Catch ex As Exception
            MessageBox.Show("Fehler beim Senden an Kafka: " & ex.Message, "Fehler", MessageBoxButtons.OK, MessageBoxIcon.Error)
            Return False
        End Try
    End Function

    ''' <summary>
    ''' Serializes <paramref name="rec"/> to compact JSON, produces it to TopicName with
    ''' <paramref name="unique_KEY"/> as message key and blocks until the delivery report arrives.
    ''' A new producer is built and disposed on every call.
    ''' </summary>
    ''' <param name="rec">Payload to send; must not be Nothing.</param>
    ''' <param name="unique_KEY">Kafka message key; must not be empty.</param>
    ''' <param name="waitMs">Time in milliseconds to wait for the delivery report.</param>
    ''' <returns>The delivery report of the persisted message.</returns>
    ''' <exception cref="ArgumentNullException"><paramref name="rec"/> is Nothing.</exception>
    ''' <exception cref="ArgumentException"><paramref name="unique_KEY"/> is Nothing, empty or whitespace.</exception>
    ''' <exception cref="TimeoutException">No delivery report arrived within the wait time plus the flush grace period.</exception>
    ''' <exception cref="InvalidOperationException">The delivery report status is not Persisted.</exception>
    Public Shared Function InsertOrUpdateToKafkaSync(
        rec As cATEZ_Greenpulse_KafkaInvoices,
        unique_KEY As String,
        Optional waitMs As Integer = 30000
    ) As DeliveryResult(Of String, String)
        If rec Is Nothing Then
            Throw New ArgumentNullException(NameOf(rec))
        End If
        ' An insert/update addressed by message key cannot work without a key.
        If String.IsNullOrWhiteSpace(unique_KEY) Then
            Throw New ArgumentException("Der Kafka-Message-Key darf nicht leer sein.", NameOf(unique_KEY))
        End If

        Dim cfg As New ProducerConfig With {
            .BootstrapServers = BootstrapServers,
            .EnableIdempotence = True,
            .Acks = Acks.All,
            .MaxInFlight = 5,
            .MessageTimeoutMs = Math.Max(waitMs, 60000), ' never below 60 s
            .RequestTimeoutMs = 30000,
            .CompressionType = Confluent.Kafka.CompressionType.Zstd,
            .MessageMaxBytes = 20971520, ' approx. 20 MB
            .EnableDeliveryReports = True,
            .AllowAutoCreateTopics = True
        }
        ApplySecuritySettings(cfg)

        Dim msg = New Message(Of String, String) With {
            .Key = unique_KEY,
            .Value = rec.ToJson(False)
        }

        ' The event is declared outside the producer's Using block so that the producer
        ' is disposed first and the event outlives any late delivery callback.
        Using done As New ManualResetEventSlim(False)
            Dim lastReport As DeliveryReport(Of String, String) = Nothing

            Using producer = New ProducerBuilder(Of String, String)(cfg).Build()
                producer.Produce(TopicName, msg,
                    Sub(r)
                        lastReport = r
                        done.Set()
                    End Sub)

                If Not done.Wait(waitMs) Then
                    ' Give outstanding messages a last chance; only report a timeout
                    ' when the delivery report is still missing after the flush.
                    producer.Flush(TimeSpan.FromSeconds(5))
                    If Not done.IsSet Then
                        Throw New TimeoutException($"DeliveryCallback nach {waitMs} ms nicht eingetroffen.")
                    End If
                End If
            End Using

            If lastReport Is Nothing Then
                Throw New TimeoutException("DeliveryResult leer.")
            End If

            If lastReport.Status <> PersistenceStatus.Persisted Then
                Dim details As String = $"Sende-Status: {lastReport.Status} @ {lastReport.TopicPartitionOffset}"
                ' Include the error carried by the delivery report so the failure is diagnosable.
                If lastReport.Error IsNot Nothing AndAlso lastReport.Error.IsError Then
                    details &= $" ({lastReport.Error.Reason})"
                End If
                Throw New InvalidOperationException(details)
            End If

            Return lastReport
        End Using
    End Function

    ''' <summary>
    ''' Copies the class-level SASL/TLS settings into the producer configuration when UseSasl is True.
    ''' With UseSasl = False the configuration is left untouched.
    ''' </summary>
    ''' <remarks>
    ''' NOTE(review): SecurityProtocolSetting defaults to Plaintext; for SASL it presumably has to be
    ''' set to a SASL protocol (SaslPlaintext/SaslSsl) by the caller - confirm against the broker setup.
    ''' </remarks>
    Private Shared Sub ApplySecuritySettings(cfg As ProducerConfig)
        If Not UseSasl Then
            Return
        End If

        cfg.SecurityProtocol = SecurityProtocolSetting
        cfg.SaslMechanism = SaslMechanismSetting
        cfg.SaslUsername = SaslUsername
        cfg.SaslPassword = SaslPassword
    End Sub

End Class

Public Class cATEZ_Greenpulse_KafkaInvoicesBuilder_DAKOSY

    ''' <summary>
    ''' Builds a pure invoice-document payload for Greenpulse
    ''' from the DAKOSY archive for the given MRN.
    ''' </summary>
    ''' <param name="mrn">MRN that must exist in [tbl_DY_Zollmeldungen_Import].</param>
    ''' <returns>
    ''' A payload whose Documents list contains one Base64-encoded entry per attachment
    ''' of type "Rechnung" or "eFatura"; the list is empty when no shipment id is found.
    ''' </returns>
    ''' <exception cref="ArgumentNullException"><paramref name="mrn"/> is Nothing.</exception>
    ''' <exception cref="InvalidOperationException">No archive row exists for the MRN.</exception>
    Public Shared Function BuildByMrn_DAKOSY_Archiv(mrn As String) As cATEZ_Greenpulse_KafkaInvoices
        If mrn Is Nothing Then
            Throw New ArgumentNullException(NameOf(mrn))
        End If

        If Not ExistsInArchive(mrn) Then
            Throw New InvalidOperationException("Keine Daten zur angegebenen MRN gefunden: " & mrn)
        End If

        ' Only documents are transported.
        Dim obj As New cATEZ_Greenpulse_KafkaInvoices() With {
            .Documents = New List(Of cATEZ_Greenpulse_KafkaInvoices.DocumentNode)()
        }

        ' --- Take over documents/attachments from the Aviso structure ---
        Dim sqlHelper As New VERAG_PROG_ALLGEMEIN.SQL

        ' NOTE(review): dy_BezugsNr is compared with a hard-coded empty string, so this lookup
        ' does not depend on the MRN at all. The correct reference number still has to be
        ' supplied here (the original comment already warned about this) - TODO confirm source.
        Dim sendungsIdObj As Object = sqlHelper.getValueTxtBySql(
            "SELECT dy_SendungsId FROM [tblDakosy_Zollanmeldungen] WHERE dy_BezugsNr=''",
            "FMZOLL", , , Nothing)

        Dim sendungsId As Integer
        If sendungsIdObj Is Nothing OrElse
           Not Integer.TryParse(sendungsIdObj.ToString(), sendungsId) OrElse
           sendungsId <= 0 Then
            ' No usable shipment id: return the payload without documents.
            Return obj
        End If

        Dim anhListe As New List(Of cAvisoAnhaenge)
        cAvisoAnhaenge.LOAD_LIST_BySendung(anhListe, sendungsId)

        For Each doc In anhListe
            Select Case doc.anh_Art
                Case "Rechnung", "eFatura"
                    Dim pfad As String = VERAG_PROG_ALLGEMEIN.cDATENSERVER.GET_PDFPath_BY_DocID(doc.anh_docId)
                    Dim dateiBytes As Byte() = System.IO.File.ReadAllBytes(pfad)

                    obj.Documents.Add(New cATEZ_Greenpulse_KafkaInvoices.DocumentNode With {
                        .Reference = doc.anh_Name,
                        .DocType = "invoice",
                        .MimeType = GuessMimeTypeFromNumber(doc.anh_Typ),
                        .Blob = Convert.ToBase64String(dateiBytes)
                    })
            End Select
        Next

        Return obj
    End Function

    '---------------------------
    ' Helper
    '---------------------------

    ''' <summary>
    ''' Checks whether at least one row for the MRN exists in [tbl_DY_Zollmeldungen_Import].
    ''' The connection is opened and closed inside this function, so it is not held
    ''' while attachments are read from disk.
    ''' </summary>
    Private Shared Function ExistsInArchive(mrn As String) As Boolean
        ' Only the existence of rows matters, so count instead of loading and sorting every column.
        Const query As String = "SELECT COUNT(*) FROM [tbl_DY_Zollmeldungen_Import] WHERE [Registriernummer_MRN] = @mrn;"

        Using con As SqlConnection = SQL.GetNewOpenConnectionAVISO()
            Using cmd As New SqlCommand(query, con)
                cmd.Parameters.AddWithValue("@mrn", mrn)
                Return Convert.ToInt32(cmd.ExecuteScalar()) > 0
            End Using
        End Using
    End Function

    ''' <summary>Converts a value to a trimmed string; Nothing and DBNull become "".</summary>
    Private Shared Function SafeStr(value As Object) As String
        If value Is Nothing OrElse Convert.IsDBNull(value) Then Return ""
        Return Convert.ToString(value).Trim()
    End Function

    ''' <summary>
    ''' Maps a file type hint (a bare extension such as "pdf" or a name ending in ".pdf")
    ''' to a MIME type. Matching ignores case; PDF, JPEG and PNG are recognized.
    ''' </summary>
    ''' <param name="num">Type hint; Nothing and DBNull are treated as an empty string.</param>
    ''' <returns>The matching MIME type, or "application/octet-stream" when nothing matches.</returns>
    Public Shared Function GuessMimeTypeFromNumber(num As Object) As String
        Dim s As String = SafeStr(num).ToLowerInvariant()

        ' Ordinal suffix checks: file extensions are identifiers, not linguistic text.
        If s.EndsWith(".pdf", StringComparison.Ordinal) OrElse s = "pdf" Then
            Return "application/pdf"
        End If

        If s.EndsWith(".jpg", StringComparison.Ordinal) OrElse
           s.EndsWith(".jpeg", StringComparison.Ordinal) OrElse
           s = "jpg" OrElse s = "jpeg" Then
            Return "image/jpeg"
        End If

        If s.EndsWith(".png", StringComparison.Ordinal) OrElse s = "png" Then
            Return "image/png"
        End If

        Return "application/octet-stream"
    End Function

End Class