Geting Started
import static org.pgstreaming.PGStreaming.*;
import org.pgstreaming.model.record.Record;
import example.home_insurance.service.EmailService;
public class HIEmailDSLDemo
{
public static void main(String[] args) throws Exception
{
// enable metrics logging
// PGStreamMetrics.enableCSVReporter();
// email service
EmailService emailer = new EmailService();
// consume the stream
pgStreaming()
.source()
.name("policies")
.ofRecord()
.from("jdbc:postgresql://127.0.0.1:5414/streaming", "rep", "")
.slot("welcome_email")
.wal2json()
.includeTable("event.policy")
.then()
.consume((Record key, Record value) -> {
emailer.sendWelcomeEmail(
value.get("customer_email"),
value.get("customer_name"),
value.get("policy_reference"),
value.get("post_code")
);
})
.register()
.launch();
}
}