Press n or j to go to the next uncovered block, b, p or k for the previous block.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 | 1x 1x 1x 1x 1x 1x 1x 5x 5x 5x 5x 5x 5x 5x 5x 1x 1x 5x 5x 5x 5x 5x 5x 5x 5x 5x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x | import {adapter} from '@feasibleone/blong';
import {randomUUID} from 'node:crypto';
type KafkaMessage = {topic: string; partition: number; value: Buffer};
class KafkaJsonCodec {
encode(data: Record<string, unknown>, $meta: Record<string, unknown>) {
if (!$meta.trace) $meta.trace = randomUUID();
const {topic = 'blong-integration', partition = 0, ...rest} = data;
return {
topic,
partition,
value: Buffer.from(JSON.stringify({...rest, _trace: $meta.trace})),
};
}
decode(msg: KafkaMessage, $meta: Record<string, unknown>) {
let parsed: Record<string, unknown> = {};
try {
parsed = JSON.parse(msg.value.toString()) as Record<string, unknown>;
} catch {
parsed = {payload: msg.value.toString()};
}
const {_trace, ...rest} = parsed;
if (_trace) $meta.trace = _trace;
$meta.mtid = 'response';
return {sent: true, topic: msg.topic, partition: msg.partition, ...rest};
}
}
export default adapter<{
connection: {
'metadata.broker.list': string;
[key: string]: unknown;
};
consume: {
topics: string[];
groupId: string;
};
codec: typeof KafkaJsonCodec;
}>(() => ({
extends: 'adapter.kafka',
activation: {
default: {
connection: {
'client.id': 'blong-test',
'metadata.broker.list': 'localhost:9092',
'security.protocol': 'plaintext',
},
consume: {
topics: ['blong-integration'],
groupId: 'blong-integration-group',
},
codec: KafkaJsonCodec,
namespace: 'broker',
},
},
}));
|