Изи ишью: 5 минут, зашли и вышли
Нужно просто в тестовых клиентах отфильтровывать тех подписчиков, что еще не запущены.
Нужно подправить все места включения
https://github.com/ag2ai/faststream/issues/2053
#faststream
Нужно просто в тестовых клиентах отфильтровывать тех подписчиков, что еще не запущены.
Нужно подправить все места включения
self.broker.subscribers и добавить соответствующий тестhttps://github.com/ag2ai/faststream/issues/2053
#faststream
GitHub
Bug: Subscribers registered in runtime do not clean up after close · Issue #2053 · ag2ai/faststream
I am trying to use the new dynamic subscription from https://github.com/airtai/faststream/releases/tag/0.5.0 (see point number 9). While adding the dynamic subscription works well, I have issues cl...
🚀 New issue to ag2ai/faststream by @jsonvot
📝 Bug: The coexistence issue between URL and virtualhost (#2652)
Describe the bug
In version v0.5.33, the first method works properly, and the trailing slash (/) at the end of the URL cannot be omitted. However, in versions >=0.5.34, due to additional handling of virtualhost, only parameter-based formats like 2, 3, 4 and 5 are supported.
I believe method 5 is the most intuitive and should be handled correctly, but it is currently treated as an invalid format. Using this method will result in the following error:
🖼️Image
Environment
faststream[rabbit]>=0.5.34
#bug #good_first_issue #faststream #ag2ai
sent via relator
📝 Bug: The coexistence issue between URL and virtualhost (#2652)
Describe the bug
import asyncio
from faststream.rabbit import RabbitBroker
async def pub():
broker = RabbitBroker('amqp://guest:guest@localhost:5672/', virtualhost='/domestic-aed') # 1
#broker = RabbitBroker('amqp://guest:guest@localhost:5672', virtualhost='//domestic-aed') # 2
#broker = RabbitBroker('amqp://guest:guest@localhost:5672/', virtualhost='//domestic-aed') # 3
#broker = RabbitBroker('amqp://guest:guest@localhost:5672//domestic-aed') # 4
#broker = RabbitBroker('amqp://guest:guest@localhost:5672', virtualhost='/domestic-aed') # 5
async with broker:
await broker.publish(
"Hi!",
queue="test-queue",
exchange="test-exchange"
)
asyncio.run(pub())
In version v0.5.33, the first method works properly, and the trailing slash (/) at the end of the URL cannot be omitted. However, in versions >=0.5.34, due to additional handling of virtualhost, only parameter-based formats like 2, 3, 4 and 5 are supported.
I believe method 5 is the most intuitive and should be handled correctly, but it is currently treated as an invalid format. Using this method will result in the following error:
🖼️Image
Environment
faststream[rabbit]>=0.5.34
#bug #good_first_issue #faststream #ag2ai
sent via relator
🚀 New issue to ag2ai/faststream by @Sehat1137
📝 Skip relator notification from the dependabot (#2665)
Improve pipeline so that it is not triggered by events from dependabot.
https://github.com/ag2ai/faststream/blob/main/.github/workflows/relator.yaml#L19
#good_first_issue #github_actions #faststream #ag2ai
sent via relator
📝 Skip relator notification from the dependabot (#2665)
Improve pipeline so that it is not triggered by events from dependabot.
https://github.com/ag2ai/faststream/blob/main/.github/workflows/relator.yaml#L19
#good_first_issue #github_actions #faststream #ag2ai
sent via relator
🤔3
🚀 New issue to ag2ai/faststream by @GrigoriyKuzevanov
📝 Bug: min_idle_time ignored when group and consumer are specified (#2678)
Describe the bug
When a StreamSub has both 'group' and 'consumer', and 'min_idle_time' specified, Faststream uses 'XREADGROUP' instead of 'XAUTOCALIM'
How to reproduce
Include source code:
Redis MONITOR output shows:
Expected behavior
Observed behavior
I suppose that a root cause in
Or i just misunderstand the logic.
Environment
Running FastStream 0.6.3 with CPython 3.12.3 on Linux
#bug #good_first_issue #faststream #ag2ai
sent via relator
📝 Bug: min_idle_time ignored when group and consumer are specified (#2678)
Describe the bug
When a StreamSub has both 'group' and 'consumer', and 'min_idle_time' specified, Faststream uses 'XREADGROUP' instead of 'XAUTOCALIM'
How to reproduce
Include source code:
from faststream import FastStream
from faststream.redis import RedisBroker, StreamSub
broker = RedisBroker("redis://localhost:6379")
@broker.subscriber(
stream=StreamSub(
"orders",
group="processors",
consumer="claimer",
min_idle_time=10000, # Should trigger XAUTOCLAIM
)
)
async def claiming_handler(msg):
print("Should use XAUTOCLAIM, but uses XREADGROUP")
app = FastStream(broker)
Redis MONITOR output shows:
XREADGROUP GROUP processors claimer BLOCK 100 STREAMS orders >
Expected behavior
XAUTOCLAIM orders processors claimer 10000 0-0 COUNT 1
Observed behavior
I suppose that a root cause in
faststream/redis/subscriber/use_cases/stream_subscriber, method _StreamHandlerMixin.start():if stream.group and stream.consumer: # ← Checked FIRST
# Uses XREADGROUP
...
elif self.stream_sub.min_idle_time is None:
# Uses XREAD
...
else:
# Uses XAUTOCLAIM ← Never reached when group is set!
...
Or i just misunderstand the logic.
Environment
Running FastStream 0.6.3 with CPython 3.12.3 on Linux
#bug #good_first_issue #faststream #ag2ai
sent via relator
🚀 New issue to ag2ai/faststream by @gaby
📝 bug: Usage of custom logger results in no logs (#2677)
Is your feature request related to a problem? Please describe.
The built-logger is configured to always add colors, even when passing a logger to faststream. This is hardcoded here https://github.com/ag2ai/faststream/blob/main/faststream/_internal/logger/logging.py#L80 This affects systems collecting logs from faststream hosts. This makes loga generated by faststream to show in raw text as
Describe the solution you'd like
Make the
Describe alternatives you've considered
Writing a custom log parser.
#enhancement #good_first_issue #faststream #ag2ai
sent via relator
📝 bug: Usage of custom logger results in no logs (#2677)
Is your feature request related to a problem? Please describe.
The built-logger is configured to always add colors, even when passing a logger to faststream. This is hardcoded here https://github.com/ag2ai/faststream/blob/main/faststream/_internal/logger/logging.py#L80 This affects systems collecting logs from faststream hosts. This makes loga generated by faststream to show in raw text as
"\033[36mDEBUG\033[0m" instead of DEBUG.Describe the solution you'd like
Make the
use_colors param configurable instead of a hardcoded value.Describe alternatives you've considered
Writing a custom log parser.
#enhancement #good_first_issue #faststream #ag2ai
sent via relator
🚀 New issue to ag2ai/faststream by @nectarindev
📝 feat: merge `Broker(context=...)` and `FastStream(context=...)` at broker-level (#2693)
I recently migrated my project to faststream 0.6. I was very interested in how I could add my dependencies to the context. Prior to version 0.6, I did something like this:
I launched the broker as part of my application in lifespan without using the FastStream class.
For version 0.6, I saw examples where it was suggested to pass the context to FastStream, but that solution did not suit me. I discovered that the broker also accepts context, and that solves my problem:
But I also discovered that if I create a FastStream instance, its context will be used, even though I didn't use it to start the broker.
I'm not sure that's normal behavior. It would make much more sense if only the broker's dependency were available.
⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯
Running FastStream 0.6.3 with CPython 3.12.4 on Linux
#enhancement #good_first_issue #core #faststream #ag2ai
sent via relator
📝 feat: merge `Broker(context=...)` and `FastStream(context=...)` at broker-level (#2693)
I recently migrated my project to faststream 0.6. I was very interested in how I could add my dependencies to the context. Prior to version 0.6, I did something like this:
from faststream.annotations import ContextRepo
from faststream.kafka import KafkaBroker
from faststream.utils.context import context
broker = KafkaBroker()
@broker.subscriber("my_topic", group_id="my_group")
async def handle(
context: ContextRepo,
):
print("dependency: ", context.get("dependency")) # 42
async def lifespan(*args, **kwargs):
context.set_global("dependency", 42)
await broker.start()
try:
yield
finally:
await broker.stop()
I launched the broker as part of my application in lifespan without using the FastStream class.
For version 0.6, I saw examples where it was suggested to pass the context to FastStream, but that solution did not suit me. I discovered that the broker also accepts context, and that solves my problem:
broker = KafkaBroker(context=ContextRepo({"dependency": 42}))
...
async def lifespan(*args, **kwargs):
await broker.start()
try:
yield
finally:
await broker.stop()
But I also discovered that if I create a FastStream instance, its context will be used, even though I didn't use it to start the broker.
from fastapi import FastAPI
from faststream import ContextRepo, FastStream
from faststream.kafka import KafkaBroker
broker = KafkaBroker(context=ContextRepo({"broker_dependency": 2}))
app = FastStream(broker, context=ContextRepo({"application_dependency": 1}))
@broker.subscriber("my_topic", group_id="my_group")
async def handle(
context: ContextRepo,
):
print("broker_dependency: ", context.get("broker_dependency")) # None
print("application_dependency: ", context.get("application_dependency")) # 1
async def lifespan(*args, **kwargs):
await broker.start()
try:
yield
finally:
await broker.stop()
asgi = FastAPI(lifespan=lifespan)
I'm not sure that's normal behavior. It would make much more sense if only the broker's dependency were available.
⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯
Running FastStream 0.6.3 with CPython 3.12.4 on Linux
#enhancement #good_first_issue #core #faststream #ag2ai
sent via relator
❤1
🚀 New issue to ag2ai/faststream by @esinevgeny
📝 Bug: ValueError while calling redis client.xautoclaim (#2736)
Describe the bug
ValueError while calling xautoclaim
How to reproduce
Use the next route and launch the faststream, if there are messages with PENDING status in stream the issue will be observed
Actual
Traceback (most recent call last):
File "/venv/lib64/python3.12/site-packages/faststream/redis/subscriber/usecases/basic.py", line 91, in _consume
await self._get_msgs(*args)
File "/venv/lib64/python3.12/site-packages/faststream/redis/subscriber/usecases/stream_subscriber.py", line 341, in _get_msgs
for stream_name, msgs in await read(self.last_id):
^^^^^^^^^^^^^^^^^^^^^^^^
File "/venv/lib64/python3.12/site-packages/faststream/redis/subscriber/usecases/stream_subscriber.py", line 140, in read
(next_id, messages, _) = stream_message
^^^^^^^^^^^^^^^^^^^^^^
ValueError: not enough values to unpack (expected 3, got 2)
Environment
Running FastStream 0.6.5 with CPython 3.12.11 on Linux
redis 7.1.0
Also checked on redis 5.3.0
#bug #good_first_issue #faststream #ag2ai
sent via relator
📝 Bug: ValueError while calling redis client.xautoclaim (#2736)
Describe the bug
ValueError while calling xautoclaim
How to reproduce
Use the next route and launch the faststream, if there are messages with PENDING status in stream the issue will be observed
@broker.subscriber(
stream=StreamSub("test:test", group="workers",
consumer="worker",
min_idle_time=5000)
)
async def worker(msg: RedisStreamMessage, redis: Redis):
logger.error(f"Claim {msg.correlation_id}")
await msg.ack(redis=redis, group="workers")
Actual
Traceback (most recent call last):
File "/venv/lib64/python3.12/site-packages/faststream/redis/subscriber/usecases/basic.py", line 91, in _consume
await self._get_msgs(*args)
File "/venv/lib64/python3.12/site-packages/faststream/redis/subscriber/usecases/stream_subscriber.py", line 341, in _get_msgs
for stream_name, msgs in await read(self.last_id):
^^^^^^^^^^^^^^^^^^^^^^^^
File "/venv/lib64/python3.12/site-packages/faststream/redis/subscriber/usecases/stream_subscriber.py", line 140, in read
(next_id, messages, _) = stream_message
^^^^^^^^^^^^^^^^^^^^^^
ValueError: not enough values to unpack (expected 3, got 2)
Environment
Running FastStream 0.6.5 with CPython 3.12.11 on Linux
redis 7.1.0
Also checked on redis 5.3.0
#bug #good_first_issue #faststream #ag2ai
sent via relator
🚀 New issue to ag2ai/faststream by @yann-combarnous
📝 Bug: AsyncAPI documentation fails when Confluent uses oauth bearer authentication (#2774)
Describe the bug
When using Confluent Kafka with oauthbearer authentication, the generated AsyncAPI schema is:
An empty $ref: "" causes the AsyncAPI React component to attempt file resolution via readFile. This is a FastStream bug in schema generation.
How to reproduce
Include source code:
Expected behavior
AsyncAPI documentation is correctly generated.
Observed behavior
It fails with a file cannot be read error.
Screenshots
If applicable, attach screenshots to help illustrate the problem.
Environment
Running FastStream 0.6.5 with CPython 3.13.11 on Darwin
Additional context
#bug #good_first_issue #faststream #ag2ai
sent via relator
📝 Bug: AsyncAPI documentation fails when Confluent uses oauth bearer authentication (#2774)
Describe the bug
When using Confluent Kafka with oauthbearer authentication, the generated AsyncAPI schema is:
"securitySchemes":{"oauthbearer":{"type":"oauth2","$ref":""}}
An empty $ref: "" causes the AsyncAPI React component to attempt file resolution via readFile. This is a FastStream bug in schema generation.
How to reproduce
Include source code:
from faststream.confluent import KafkaBroker
broker = KafkaBroker(
config={ ...config... },
security=SASLOAuthBearer(use_ssl=True)
)
...
Expected behavior
AsyncAPI documentation is correctly generated.
Observed behavior
It fails with a file cannot be read error.
Screenshots
If applicable, attach screenshots to help illustrate the problem.
Environment
Running FastStream 0.6.5 with CPython 3.13.11 on Darwin
Additional context
#bug #good_first_issue #faststream #ag2ai
sent via relator
🚀 New issue to ag2ai/faststream by @lesnik512
📝 Feature: Static membership for aiokafka broker (group_instance_id) (#2782)
Hi!
Seems like aiokafka broker missing group_instance_id argument which required for static membership to work.
https://aiokafka.readthedocs.io/en/stable/api.html
For confluent kafka it exists #2606
#enhancement #good_first_issue #faststream #ag2ai
sent via relator
📝 Feature: Static membership for aiokafka broker (group_instance_id) (#2782)
Hi!
Seems like aiokafka broker missing group_instance_id argument which required for static membership to work.
https://aiokafka.readthedocs.io/en/stable/api.html
group_instance_id (str or None) – name of the group instance ID used for static membership (KIP-345)
For confluent kafka it exists #2606
#enhancement #good_first_issue #faststream #ag2ai
sent via relator
🚀 New issue to ag2ai/faststream by @hmvp
📝 Bug: topic patterns are not correctly documented (#2804)
Describe the bug
When using a pattern for a kafka topic the generated asyncapi spec (and docs) does not include that handler
How to reproduce
With something along the lines of:
Expected behavior
The subscriber is included in the asyncapi spec and docs. As far as I can see the asyncapi spec actually allows for this even when using Path variables.
Observed behavior
The asyncapi docs don't contain that subscriber.
Screenshots
Environment
Running FastStream 0.6.7 with CPython 3.14.0 on Linux
Additional context
#bug #good_first_issue #faststream #ag2ai
sent via relator
📝 Bug: topic patterns are not correctly documented (#2804)
Describe the bug
When using a pattern for a kafka topic the generated asyncapi spec (and docs) does not include that handler
How to reproduce
With something along the lines of:
broker = KafkaBroker(...)
router = KafkaRouter()
@router.subscriber(pattern="some.wildcard.topic.*")
def handle_event(event: Event): ...
broker.include_router(router)
app = AsgiFastStream(
broker,
asyncapi_path="/docs",
)
Expected behavior
The subscriber is included in the asyncapi spec and docs. As far as I can see the asyncapi spec actually allows for this even when using Path variables.
Observed behavior
The asyncapi docs don't contain that subscriber.
Screenshots
Environment
Running FastStream 0.6.7 with CPython 3.14.0 on Linux
Additional context
#bug #good_first_issue #faststream #ag2ai
sent via relator
🚀 New issue to ag2ai/faststream by @dumpler
📝 Bug: Logger not properly passed to Confluent Kafka Producer and AdminClient (#2691)
Describe the bug
There are two related issues with logger configuration in the Confluent Kafka components:
1.
Late Logger Setup in AsyncConfluentProducer
In faststream/confluent/helpers/client.py at line 46, the Producer object is created before the _setup() method is called for loger_state. This results in the Producer receiving a NoSetLoggerObject instead of the intended logger passed during initialization.
2.
Missing Logger in AdminClient
The AdminClient in AdminService does not accept a logger parameter, even though it should. Currently, only the configuration is passed, leaving the AdminClient without proper logging.
How to reproduce
Include source code:
Expected behavior
1. The Producer in AsyncConfluentProducer should use the logger passed to KafkaRouter.
2. The AdminClient should inherit the same logger as other components.
Observed behavior
1. The Producer uses NoSetLoggerObject instead of the provided logger.
2. The AdminClient lacks logger configuration, leading to potential silent failures or inadequate logging.
Environment
Running FastStream 0.6.3 with CPython 3.13.1 on Linux
#bug #good_first_issue #faststream #ag2ai
sent via relator
📝 Bug: Logger not properly passed to Confluent Kafka Producer and AdminClient (#2691)
Describe the bug
There are two related issues with logger configuration in the Confluent Kafka components:
1.
Late Logger Setup in AsyncConfluentProducer
In faststream/confluent/helpers/client.py at line 46, the Producer object is created before the _setup() method is called for loger_state. This results in the Producer receiving a NoSetLoggerObject instead of the intended logger passed during initialization.
2.
Missing Logger in AdminClient
The AdminClient in AdminService does not accept a logger parameter, even though it should. Currently, only the configuration is passed, leaving the AdminClient without proper logging.
How to reproduce
Include source code:
import logging
import uvicorn
from fastapi import FastAPI
from faststream.confluent.fastapi import KafkaRouter
logger = logging.getLogger("faststream")
router = KafkaRouter(
bootstrap_servers="kafka:9092",
enable_idempotence=True,
allow_auto_create_topics=False,
schema_url="/asyncapi",
include_in_schema=True,
logger=logger,
)
app = FastAPI()
app.include_router(router)
uvicorn.run(app, host="0.0.0.0", port=8000)
Expected behavior
1. The Producer in AsyncConfluentProducer should use the logger passed to KafkaRouter.
2. The AdminClient should inherit the same logger as other components.
Observed behavior
1. The Producer uses NoSetLoggerObject instead of the provided logger.
2. The AdminClient lacks logger configuration, leading to potential silent failures or inadequate logging.
Environment
Running FastStream 0.6.3 with CPython 3.13.1 on Linux
#bug #good_first_issue #faststream #ag2ai
sent via relator
🚀 New issue to ag2ai/faststream by @ce1ebrimbor
📝 Feature: Support broker-level ack_policy default with per-subscriber override (#2826)
Problem
Currently
There is no way to set a default
Proposed solution
Add an
The resolution order would be: subscriber-level > broker-level > built-in default (
Why this matters
• Safety:
• DRY: Services with 10+ subscribers shouldn't need to repeat
• Consistency with other broker-level settings:
#enhancement #good_first_issue #faststream #ag2ai
sent via relator
📝 Feature: Support broker-level ack_policy default with per-subscriber override (#2826)
Problem
Currently
ack_policy can only be set per-subscriber:@broker.subscriber("orders.topic", ack_policy=AckPolicy.NACK_ON_ERROR)
async def handle_order(msg: OrderMessage) -> None:
...
There is no way to set a default
ack_policy at the broker (or router) level. In practice, most services want the same policy across all subscribers — typically NACK_ON_ERROR for at-least-once delivery with DLQ. This forces every @broker.subscriber() call to repeat the same ack_policy= argument, which is error-prone: forgetting it on a single subscriber silently falls back to ACK_FIRST (at-most-once), which can cause silent message loss.Proposed solution
Add an
ack_policy parameter to KafkaBroker (and equivalently to RabbitBroker, NatsBroker, etc.) that sets the default for all subscribers registered on that broker. Individual subscribers can still override it.broker = KafkaBroker("localhost:9092", ack_policy=AckPolicy.NACK_ON_ERROR)
# Inherits NACK_ON_ERROR from broker
@broker.subscriber("orders.topic")
async def handle_order(msg: OrderMessage) -> None:
...
# Overrides to ACK for this specific subscriber
@broker.subscriber("notifications.topic", ack_policy=AckPolicy.ACK)
async def handle_notification(msg: NotificationMessage) -> None:
...
The resolution order would be: subscriber-level > broker-level > built-in default (
ACK_FIRST).Why this matters
• Safety:
ACK_FIRST as a silent default is dangerous for services that use DLQ or need at-least-once delivery. A broker-level default lets teams enforce their delivery guarantee in one place.• DRY: Services with 10+ subscribers shouldn't need to repeat
ack_policy=AckPolicy.NACK_ON_ERROR on every one.• Consistency with other broker-level settings:
decoder, middlewares, security, and logger are all broker-level defaults that subscribers inherit. ack_policy is the notable exception.#enhancement #good_first_issue #faststream #ag2ai
sent via relator
🚀 New issue to ag2ai/faststream by @BrikozO
📝 Bug: (#2842)
#bug #good_first_issue #faststream #ag2ai
sent via relator
📝 Bug: (#2842)
#bug #good_first_issue #faststream #ag2ai
sent via relator
🚀 New issue to ag2ai/faststream by @zueve
📝 Bug: (#2868)
Describe the bug
Can't forward
How to reproduce
Include source code:
And/Or steps to reproduce the behaviour:
1. Create a KafkaBroker.
2. Try to pass client_rack to the broker constructor.
3. Observe that the parameter is not accepted.
4. There is no alternative way to configure it via subscriber(...) either.
Expected behavior
KafkaBroker should allow passing client_rack (or generic consumer kwargs) and forward it to the underlying aiokafka.AIOKafkaConsumer.
Observed behavior
client_rack is not supported in KafkaBroker constructor, nor in subscriber configuration, so it cannot be passed to aiokafka.
Screenshots
N/A.
Environment
Running FastStream 0.6.7 with CPython 3.14.5 on Linux
Additional context
aiokafka.AIOKafkaConsumer supports client_rack since 0.14.0 version
#bug #good_first_issue #aiokafka #faststream #ag2ai
sent via relator
📝 Bug: (#2868)
Describe the bug
Can't forward
client_rack to aiokafka consumer through FastStream Kafka API.How to reproduce
Include source code:
from faststream import FastStream
from faststream.kafka import KafkaBroker
# Attempt to configure at broker level
broker = KafkaBroker(
"localhost:9092",
client_rack="us-east-1a", # not supported
)
app = FastStream(broker)
And/Or steps to reproduce the behaviour:
1. Create a KafkaBroker.
2. Try to pass client_rack to the broker constructor.
3. Observe that the parameter is not accepted.
4. There is no alternative way to configure it via subscriber(...) either.
Expected behavior
KafkaBroker should allow passing client_rack (or generic consumer kwargs) and forward it to the underlying aiokafka.AIOKafkaConsumer.
Observed behavior
client_rack is not supported in KafkaBroker constructor, nor in subscriber configuration, so it cannot be passed to aiokafka.
Screenshots
N/A.
Environment
Running FastStream 0.6.7 with CPython 3.14.5 on Linux
Additional context
aiokafka.AIOKafkaConsumer supports client_rack since 0.14.0 version
#bug #good_first_issue #aiokafka #faststream #ag2ai
sent via relator
🚀 New issue to ag2ai/faststream by @dzeveloper
📝 Feature: Consumer-Only Mode for KafkaBroker (#2879)
When deploying a subscriber-only service, it's common to provision Kafka credentials scoped strictly to
Proposal:
#enhancement #good_first_issue #confluent #aiokafka #faststream #ag2ai
sent via relator
📝 Feature: Consumer-Only Mode for KafkaBroker (#2879)
When deploying a subscriber-only service, it's common to provision Kafka credentials scoped strictly to
READ + DESCRIBE ACLs on specific topics. Today, KafkaBroker always starts a producer during broker.start(), regardless of whether any @broker.publisher is registered. This causes the producer to attempt a connection using consumer-only credentials, which fails or raises errors at the broker/ACL level.Proposal:
# Option A: broker-level flag
broker = KafkaBroker("localhost:9092", consumer_only=True)
# Option B: skip producer if no publishers are registered (automatic)
# No API change needed — broker inspects registered publishers at start()
#enhancement #good_first_issue #confluent #aiokafka #faststream #ag2ai
sent via relator
👍1