All files / blong-int-adapter/kafka/adapter broker.ts

96.55% Statements 56/58
75% Branches 3/4
100% Functions 2/2
96.55% Lines 56/58

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 591x 1x 1x 1x 1x 1x 1x 5x 5x 5x 5x 5x 5x 5x 5x 1x 1x 5x 5x 5x 5x     5x 5x 5x 5x 5x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x  
import {adapter} from '@feasibleone/blong';
import {randomUUID} from 'node:crypto';
 
type KafkaMessage = {topic: string; partition: number; value: Buffer};
 
class KafkaJsonCodec {
    encode(data: Record<string, unknown>, $meta: Record<string, unknown>) {
        if (!$meta.trace) $meta.trace = randomUUID();
        const {topic = 'blong-integration', partition = 0, ...rest} = data;
        return {
            topic,
            partition,
            value: Buffer.from(JSON.stringify({...rest, _trace: $meta.trace})),
        };
    }
 
    decode(msg: KafkaMessage, $meta: Record<string, unknown>) {
        let parsed: Record<string, unknown> = {};
        try {
            parsed = JSON.parse(msg.value.toString()) as Record<string, unknown>;
        } catch {
            parsed = {payload: msg.value.toString()};
        }
        const {_trace, ...rest} = parsed;
        if (_trace) $meta.trace = _trace;
        $meta.mtid = 'response';
        return {sent: true, topic: msg.topic, partition: msg.partition, ...rest};
    }
}
 
export default adapter<{
    connection: {
        'metadata.broker.list': string;
        [key: string]: unknown;
    };
    consume: {
        topics: string[];
        groupId: string;
    };
    codec: typeof KafkaJsonCodec;
}>(() => ({
    extends: 'adapter.kafka',
    activation: {
        default: {
            connection: {
                'client.id': 'blong-test',
                'metadata.broker.list': 'localhost:9092',
                'security.protocol': 'plaintext',
            },
            consume: {
                topics: ['blong-integration'],
                groupId: 'blong-integration-group',
            },
            codec: KafkaJsonCodec,
            namespace: 'broker',
        },
    },
}));