Geting Started

import static org.pgstreaming.PGStreaming.*;
import org.pgstreaming.model.record.Record;
import example.home_insurance.service.EmailService;

public class HIEmailDSLDemo
{
    public static void main(String[] args) throws Exception
    {
        // enable metrics logging
        // PGStreamMetrics.enableCSVReporter();
        // email service
        EmailService emailer = new EmailService();
        // consume the stream
        pgStreaming()
        .source()
        .name("policies")
        .ofRecord()
        .from("jdbc:postgresql://127.0.0.1:5414/streaming", "rep", "")
        .slot("welcome_email")
        .wal2json()
        .includeTable("event.policy")
        .then()
        .consume((Record key, Record value) -> {
            emailer.sendWelcomeEmail(
                    value.get("customer_email"), 
                    value.get("customer_name"), 
                    value.get("policy_reference"), 
                    value.get("post_code")
            );
        })
        .register()
        .launch();
    }
}