Make use of Postgres pub/sub ? #86
Comments
I am aware of that feature but have never tried it myself.
IMO, if you go down this road, it's more efficient to just do everything in the DB here.
This should be more efficient than polling the table, especially at a high polling frequency. We should give it a try. Like @Ocramius mentioned, a trigger should work.
Interestingly, I was reading about this stuff yesterday:
That's Haskell keeping a connection with the DB, but the
Follow my first link "durable pub/sub". You will be surprised :D
DB trigger for
More like: "I don't know how to read, duh"
Amp v2's postgres package supports listening and notifying that is entirely async.

    use Amp\Loop;
    use Amp\Postgres;

    Loop::run(function () {
        $pool = Postgres\pool('host=localhost user=postgres');

        $channel = "test";

        /** @var \Amp\Postgres\Listener $listener */
        $listener = yield $pool->listen($channel);

        printf("Listening on channel '%s'\n", $listener->getChannel());

        Loop::delay(3000, function () use ($listener) { // Unlisten in 3 seconds.
            printf("Unlistening from channel '%s'\n", $listener->getChannel());
            return $listener->unlisten();
        });

        Loop::delay(1000, function () use ($pool, $channel) {
            return $pool->notify($channel, "Data 1"); // Send first notification.
        });

        Loop::delay(2000, function () use ($pool, $channel) {
            return $pool->notify($channel, "Data 2"); // Send second notification.
        });

        while (yield $listener->advance()) {
            $notification = $listener->getCurrent();
            printf(
                "Received notification from PID %d on channel '%s' with payload: %s\n",
                $notification->pid,
                $notification->channel,
                $notification->payload
            );
        }
    });
@trowski wow, that's nice. I did not read the source code carefully enough. Just to make sure I understand correctly: the watcher callback is only invoked if data is available. But it will also be invoked if I would notify on channel
Anyway, that is a killer feature for an amp postgres projection. Thx for the reply!
@codeliner Yes, the watcher callback is only invoked when the connection associated with that watcher has data available. This is done by creating the watcher using the stream socket resource returned from
You can listen and notify on multiple channels. All connection ops return promises that resolve with various objects depending on the op when complete. Make a new project requiring

    #!/usr/bin/env php
    <?php

    require __DIR__ . '/vendor/autoload.php';

    use Amp\Iterator;
    use Amp\Loop;
    use Amp\Postgres;

    Loop::run(function () {
        $pool = Postgres\pool('host=localhost user=postgres');

        $channel1 = "test1";
        $channel2 = "test2";

        /** @var \Amp\Postgres\Listener $listener1 */
        $listener1 = yield $pool->listen($channel1);
        printf("Listening on channel '%s'\n", $listener1->getChannel());

        /** @var \Amp\Postgres\Listener $listener2 */
        $listener2 = yield $pool->listen($channel2);
        printf("Listening on channel '%s'\n", $listener2->getChannel());

        Loop::delay(6000, function () use ($listener1) { // Unlisten in 6 seconds.
            printf("Unlistening from channel '%s'\n", $listener1->getChannel());
            return $listener1->unlisten();
        });

        Loop::delay(4000, function () use ($listener2) { // Unlisten in 4 seconds.
            printf("Unlistening from channel '%s'\n", $listener2->getChannel());
            return $listener2->unlisten();
        });

        Loop::delay(1000, function () use ($pool, $channel1) {
            return $pool->notify($channel1, "Data 1.1");
        });

        Loop::delay(2000, function () use ($pool, $channel2) {
            return $pool->notify($channel2, "Data 2.1");
        });

        Loop::delay(3000, function () use ($pool, $channel2) {
            return $pool->notify($channel2, "Data 2.2");
        });

        Loop::delay(5000, function () use ($pool, $channel1) {
            return $pool->notify($channel1, "Data 1.2");
        });

        // Merge both listeners into a single iterator.
        $listener = Iterator\merge([$listener1, $listener2]);

        while (yield $listener->advance()) {
            $notification = $listener->getCurrent();
            printf(
                "Received notification from PID %d on channel '%s' with payload: %s\n",
                $notification->pid,
                $notification->channel,
                $notification->payload
            );
        }
    });

A connection can have multiple listeners, though a connection can only perform a single query at a time. The library also provides a connection pool that sends concurrent queries over multiple connections. In general you will always want to use a connection pool.
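
For readers not using Amp, the "only wake up when the connection has data" idea can be approximated with plain ext-pgsql. The following is a minimal sketch, not how Amp implements it: it assumes `pg_socket()` to obtain the connection's stream and `stream_select()` to sleep until that stream is readable. The function name `waitForNotifications` is hypothetical, and the connection is assumed to have already issued `LISTEN` on the channels of interest.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: block until notifications arrive or the timeout
 * elapses, then return whatever pg_get_notify() has to offer.
 *
 * @param resource|\PgSql\Connection $connection an open pgsql connection
 * @return array<int, array> notifications in PGSQL_ASSOC form
 */
function waitForNotifications($connection, int $timeoutSeconds): array
{
    // Collect everything libpq currently has; pg_get_notify() also reads
    // pending input from the socket before checking its queue.
    $drain = static function () use ($connection): array {
        $found = [];
        while (($n = pg_get_notify($connection, PGSQL_ASSOC)) !== false) {
            $found[] = $n;
        }
        return $found;
    };

    // Notifications may already be buffered from an earlier query.
    $notifications = $drain();
    if ($notifications !== []) {
        return $notifications;
    }

    $socket = pg_socket($connection);
    if ($socket === false) {
        throw new RuntimeException('Could not obtain the connection socket.');
    }

    $read = [$socket];
    $write = null;
    $except = null;

    // Sleeps in the kernel until the socket is readable or the timeout hits.
    $ready = stream_select($read, $write, $except, $timeoutSeconds);
    if ($ready === false || $ready === 0) {
        return [];
    }

    return $drain();
}
```

This still blocks the calling process while waiting, which is the part an event loop such as Amp's removes by registering the socket as a watcher instead.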
Coming back to this today (more than one year later) as we are planning event-store v8, I think this is another reason to pin this new major release to Postgres only. I'll leave this issue open for now, until we actually start implementing v8.
But MongoDB 4 has such a feature too and it is implemented here.
For reference/inspiration, a similar PR is opened on symfony/messenger (symfony/symfony#35485).
@gquemener thanks for the link. That's indeed a good inspiration.
I read recently that PostgreSQL supports durable pub/sub
I'm wondering if this is useful for our projections to avoid polling event streams directly ...
The mechanism is relatively simple: When creating a stream or appending to it we could use NOTIFY to indicate that new events are available.
Running projections can use pg_get_notify to check for new notifications and only query affected event streams if new events are available.
pg_get_notify still uses polling, so I'm not sure if it is worth the effort. I also checked amphp but they use pg_get_notify, too. But an AmphpProjectionManager would be nice so that a single non-blocking Postgres connection is shared between multiple projections. But that's a different story.
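
The mechanism described above could be sketched roughly as follows with plain ext-pgsql. This is an illustration under stated assumptions, not event-store code: the channel name `event_streams`, the convention of sending the stream name as the payload, and all three function names are hypothetical. The only real APIs used are `pg_query_params()`, `pg_get_notify()` and PostgreSQL's `pg_notify()` SQL function (the function form of `NOTIFY`).

```php
<?php
declare(strict_types=1);

// Hypothetical channel name, chosen for this sketch only.
const EVENT_CHANNEL = 'event_streams';

/** Writer side: announce that a stream was created or appended to. */
function announceAppend($connection, string $streamName): void
{
    // pg_notify() accepts bound parameters, so no manual escaping is needed.
    pg_query_params(
        $connection,
        'SELECT pg_notify($1, $2)',
        [EVENT_CHANNEL, $streamName]
    );
}

/** Projection side: fetch all pending notifications without blocking. */
function drainNotifications($connection): array
{
    $pending = [];
    while (($n = pg_get_notify($connection, PGSQL_ASSOC)) !== false) {
        $pending[] = $n;
    }
    return $pending;
}

/**
 * Pure helper: reduce notifications to the distinct stream names that
 * need to be queried. In PGSQL_ASSOC mode the channel name is stored
 * under the 'message' key.
 *
 * @return string[]
 */
function affectedStreams(array $notifications): array
{
    $streams = [];
    foreach ($notifications as $n) {
        if (($n['message'] ?? null) === EVENT_CHANNEL && isset($n['payload'])) {
            $streams[(string) $n['payload']] = true;
        }
    }
    // array_keys() turns numeric-looking keys into ints, so cast back.
    return array_map('strval', array_keys($streams));
}
```

A projection would run `pg_query($connection, 'LISTEN ' . EVENT_CHANNEL)` once, then on each loop iteration call `affectedStreams(drainNotifications($connection))` and query only the streams returned. That check is still a poll, but it is a cheap one compared to querying every event stream.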
Thoughts @prolic @basz @sandrokeil @oqq @shochdoerfer ?
@shochdoerfer Do you use that feature or do you know someone who uses it?