diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 02b711f8..c9b40815 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -9,6 +9,7 @@ on: jobs: autoformatter: strategy: + fail-fast: false matrix: source-dir: ["./src/", "./examples/src/", "./slo/src/"] include: @@ -38,6 +39,7 @@ jobs: inspection: strategy: + fail-fast: false matrix: solutionPath: ["./src/YdbSdk.sln", "./examples/src/YdbExamples.sln", "./slo/src/src.sln"] runs-on: ubuntu-latest diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index a3e60cb8..5354c72b 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -8,6 +8,7 @@ on: jobs: unit-tests: strategy: + fail-fast: false matrix: os: [ubuntu-22.04, windows-2022, macos-12] dotnet-version: [6.0.x, 7.0.x] diff --git a/examples/src/BasicExample/BasicExample.cs b/examples/src/BasicExample/BasicExample.cs index 4bb2607d..13d4edf6 100644 --- a/examples/src/BasicExample/BasicExample.cs +++ b/examples/src/BasicExample/BasicExample.cs @@ -1,6 +1,4 @@ -using System; using System.Security.Cryptography.X509Certificates; -using System.Threading.Tasks; using Microsoft.Extensions.Logging; using Ydb.Sdk.Auth; using Ydb.Sdk.Services.Table; diff --git a/examples/src/BasicExample/BasicExample.csproj b/examples/src/BasicExample/BasicExample.csproj index bc6690be..912bc598 100644 --- a/examples/src/BasicExample/BasicExample.csproj +++ b/examples/src/BasicExample/BasicExample.csproj @@ -4,6 +4,7 @@ Exe net6.0 + enable enable Ydb.Sdk.Examples.BasicExample Ydb.Sdk.Examples @@ -11,8 +12,8 @@ git - https://github.com/ydb-platform/ydb-dotnet-examples - https://github.com/ydb-platform/ydb-dotnet-examples + https://github.com/ydb-platform/ydb-dotnet-sdk + https://github.com/ydb-platform/ydb-dotnet-sdk YANDEX LLC diff --git a/examples/src/BasicExample/DataQuery.cs b/examples/src/BasicExample/DataQuery.cs index f27406ac..4f5d1248 100644 --- a/examples/src/BasicExample/DataQuery.cs +++ b/examples/src/BasicExample/DataQuery.cs @@ -1,6 +1,3 @@ -using System; -using System.Collections.Generic; -using System.Threading.Tasks; using Ydb.Sdk.Services.Table; using Ydb.Sdk.Value; diff --git a/examples/src/BasicExample/FillData.cs b/examples/src/BasicExample/FillData.cs index b4f2fb48..48e6d4c8 100644 --- a/examples/src/BasicExample/FillData.cs +++ b/examples/src/BasicExample/FillData.cs @@ -1,9 +1,4 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading.Tasks; using Ydb.Sdk.Services.Table; -using Ydb.Sdk.Value; namespace Ydb.Sdk.Examples; @@ -50,152 +45,11 @@ REPLACE INTO episodes return await session.ExecuteDataQuery( query: query, txControl: TxControl.BeginSerializableRW().Commit(), - parameters: GetDataParams(), + parameters: DataUtils.GetDataParams(), settings: DefaultDataQuerySettings ); }); response.Status.EnsureSuccess(); } - - internal record Series(int SeriesId, string Title, DateTime ReleaseDate, string Info); - - internal record Season(int SeriesId, int SeasonId, string Title, DateTime FirstAired, DateTime LastAired); - - internal record Episode(int SeriesId, int SeasonId, int EpisodeId, string Title, DateTime AirDate); - - private static Dictionary GetDataParams() - { - var series = new Series[] - { - new(SeriesId: 1, Title: "IT Crowd", ReleaseDate: DateTime.Parse("2006-02-03"), - Info: "The IT Crowd is a British sitcom produced by Channel 4, written by Graham Linehan, " + - "produced by Ash Atalla and starring Chris O'Dowd, Richard Ayoade, Katherine Parkinson, " + - "and Matt Berry."), - new(SeriesId: 2, Title: "Silicon Valley", ReleaseDate: DateTime.Parse("2014-04-06"), - Info: "Silicon Valley is an American comedy television series created by Mike Judge, " + - "John Altschuler and Dave Krinsky. The series focuses on five young men who founded " + - "a startup company in Silicon Valley.") - }; - - var seasons = new Season[] - { - new(1, 1, "Season 1", DateTime.Parse("2006-02-03"), DateTime.Parse("2006-03-03")), - new(1, 2, "Season 2", DateTime.Parse("2007-08-24"), DateTime.Parse("2007-09-28")), - new(1, 3, "Season 3", DateTime.Parse("2008-11-21"), DateTime.Parse("2008-12-26")), - new(1, 4, "Season 4", DateTime.Parse("2010-06-25"), DateTime.Parse("2010-07-30")), - new(2, 1, "Season 1", DateTime.Parse("2014-04-06"), DateTime.Parse("2014-06-01")), - new(2, 2, "Season 2", DateTime.Parse("2015-04-12"), DateTime.Parse("2015-06-14")), - new(2, 3, "Season 3", DateTime.Parse("2016-04-24"), DateTime.Parse("2016-06-26")), - new(2, 4, "Season 4", DateTime.Parse("2017-04-23"), DateTime.Parse("2017-06-25")), - new(2, 5, "Season 5", DateTime.Parse("2018-03-25"), DateTime.Parse("2018-05-13")) - }; - - var episodes = new Episode[] - { - new(1, 1, 1, "Yesterday's Jam", DateTime.Parse("2006-02-03")), - new(1, 1, 2, "Calamity Jen", DateTime.Parse("2006-02-03")), - new(1, 1, 3, "Fifty-Fifty", DateTime.Parse("2006-02-10")), - new(1, 1, 4, "The Red Door", DateTime.Parse("2006-02-17")), - new(1, 1, 5, "The Haunting of Bill Crouse", DateTime.Parse("2006-02-24")), - new(1, 1, 6, "Aunt Irma Visits", DateTime.Parse("2006-03-03")), - new(1, 2, 1, "The Work Outing", DateTime.Parse("2006-08-24")), - new(1, 2, 2, "Return of the Golden Child", DateTime.Parse("2007-08-31")), - new(1, 2, 3, "Moss and the German", DateTime.Parse("2007-09-07")), - new(1, 2, 4, "The Dinner Party", DateTime.Parse("2007-09-14")), - new(1, 2, 5, "Smoke and Mirrors", DateTime.Parse("2007-09-21")), - new(1, 2, 6, "Men Without Women", DateTime.Parse("2007-09-28")), - new(1, 3, 1, "From Hell", DateTime.Parse("2008-11-21")), - new(1, 3, 2, "Are We Not Men?", DateTime.Parse("2008-11-28")), - new(1, 3, 3, "Tramps Like Us", DateTime.Parse("2008-12-05")), - new(1, 3, 4, "The Speech", DateTime.Parse("2008-12-12")), - new(1, 3, 5, "Friendface", DateTime.Parse("2008-12-19")), - new(1, 3, 6, "Calendar Geeks", DateTime.Parse("2008-12-26")), - new(1, 4, 1, "Jen The Fredo", DateTime.Parse("2010-06-25")), - new(1, 4, 2, "The Final Countdown", DateTime.Parse("2010-07-02")), - new(1, 4, 3, "Something Happened", DateTime.Parse("2010-07-09")), - new(1, 4, 4, "Italian For Beginners", DateTime.Parse("2010-07-16")), - new(1, 4, 5, "Bad Boys", DateTime.Parse("2010-07-23")), - new(1, 4, 6, "Reynholm vs Reynholm", DateTime.Parse("2010-07-30")), - new(2, 1, 1, "Minimum Viable Product", DateTime.Parse("2014-04-06")), - new(2, 1, 2, "The Cap Table", DateTime.Parse("2014-04-13")), - new(2, 1, 3, "Articles of Incorporation", DateTime.Parse("2014-04-20")), - new(2, 1, 4, "Fiduciary Duties", DateTime.Parse("2014-04-27")), - new(2, 1, 5, "Signaling Risk", DateTime.Parse("2014-05-04")), - new(2, 1, 6, "Third Party Insourcing", DateTime.Parse("2014-05-11")), - new(2, 1, 7, "Proof of Concept", DateTime.Parse("2014-05-18")), - new(2, 1, 8, "Optimal Tip-to-Tip Efficiency", DateTime.Parse("2014-06-01")), - new(2, 2, 1, "Sand Hill Shuffle", DateTime.Parse("2015-04-12")), - new(2, 2, 2, "Runaway Devaluation", DateTime.Parse("2015-04-19")), - new(2, 2, 3, "Bad Money", DateTime.Parse("2015-04-26")), - new(2, 2, 4, "The Lady", DateTime.Parse("2015-05-03")), - new(2, 2, 5, "Server Space", DateTime.Parse("2015-05-10")), - new(2, 2, 6, "Homicide", DateTime.Parse("2015-05-17")), - new(2, 2, 7, "Adult Content", DateTime.Parse("2015-05-24")), - new(2, 2, 8, "White Hat/Black Hat", DateTime.Parse("2015-05-31")), - new(2, 2, 9, "Binding Arbitration", DateTime.Parse("2015-06-07")), - new(2, 2, 10, "Two Days of the Condor", DateTime.Parse("2015-06-14")), - new(2, 3, 1, "Founder Friendly", DateTime.Parse("2016-04-24")), - new(2, 3, 2, "Two in the Box", DateTime.Parse("2016-05-01")), - new(2, 3, 3, "Meinertzhagen's Haversack", DateTime.Parse("2016-05-08")), - new(2, 3, 4, "Maleant Data Systems Solutions", DateTime.Parse("2016-05-15")), - new(2, 3, 5, "The Empty Chair", DateTime.Parse("2016-05-22")), - new(2, 3, 6, "Bachmanity Insanity", DateTime.Parse("2016-05-29")), - new(2, 3, 7, "To Build a Better Beta", DateTime.Parse("2016-06-05")), - new(2, 3, 8, "Bachman's Earnings Over-Ride", DateTime.Parse("2016-06-12")), - new(2, 3, 9, "Daily Active Users", DateTime.Parse("2016-06-19")), - new(2, 3, 10, "The Uptick", DateTime.Parse("2016-06-26")), - new(2, 4, 1, "Success Failure", DateTime.Parse("2017-04-23")), - new(2, 4, 2, "Terms of Service", DateTime.Parse("2017-04-30")), - new(2, 4, 3, "Intellectual Property", DateTime.Parse("2017-05-07")), - new(2, 4, 4, "Teambuilding Exercise", DateTime.Parse("2017-05-14")), - new(2, 4, 5, "The Blood Boy", DateTime.Parse("2017-05-21")), - new(2, 4, 6, "Customer Service", DateTime.Parse("2017-05-28")), - new(2, 4, 7, "The Patent Troll", DateTime.Parse("2017-06-04")), - new(2, 4, 8, "The Keenan Vortex", DateTime.Parse("2017-06-11")), - new(2, 4, 9, "Hooli-Con", DateTime.Parse("2017-06-18")), - new(2, 4, 10, "Server Error", DateTime.Parse("2017-06-25")), - new(2, 5, 1, "Grow Fast or Die Slow", DateTime.Parse("2018-03-25")), - new(2, 5, 2, "Reorientation", DateTime.Parse("2018-04-01")), - new(2, 5, 3, "Chief Operating Officer", DateTime.Parse("2018-04-08")), - new(2, 5, 4, "Tech Evangelist", DateTime.Parse("2018-04-15")), - new(2, 5, 5, "Facial Recognition", DateTime.Parse("2018-04-22")), - new(2, 5, 6, "Artificial Emotional Intelligence", DateTime.Parse("2018-04-29")), - new(2, 5, 7, "Initial Coin Offering", DateTime.Parse("2018-05-06")), - new(2, 5, 8, "Fifty-One Percent", DateTime.Parse("2018-05-13")) - }; - - var seriesData = series.Select(s => YdbValue.MakeStruct(new Dictionary - { - { "series_id", YdbValue.MakeUint64((ulong)s.SeriesId) }, - { "title", YdbValue.MakeUtf8(s.Title) }, - { "series_info", YdbValue.MakeUtf8(s.Info) }, - { "release_date", YdbValue.MakeDate(s.ReleaseDate) } - })).ToList(); - - var seasonsData = seasons.Select(s => YdbValue.MakeStruct(new Dictionary - { - { "series_id", YdbValue.MakeUint64((ulong)s.SeriesId) }, - { "season_id", YdbValue.MakeUint64((ulong)s.SeasonId) }, - { "title", YdbValue.MakeUtf8(s.Title) }, - { "first_aired", YdbValue.MakeDate(s.FirstAired) }, - { "last_aired", YdbValue.MakeDate(s.LastAired) } - })).ToList(); - - var episodesData = episodes.Select(e => YdbValue.MakeStruct(new Dictionary - { - { "series_id", YdbValue.MakeUint64((ulong)e.SeriesId) }, - { "season_id", YdbValue.MakeUint64((ulong)e.SeasonId) }, - { "episode_id", YdbValue.MakeUint64((ulong)e.EpisodeId) }, - { "title", YdbValue.MakeUtf8(e.Title) }, - { "air_date", YdbValue.MakeDate(e.AirDate) } - })).ToList(); - - return new Dictionary - { - { "$seriesData", YdbValue.MakeList(seriesData) }, - { "$seasonsData", YdbValue.MakeList(seasonsData) }, - { "$episodesData", YdbValue.MakeList(episodesData) } - }; - } } \ No newline at end of file diff --git a/examples/src/BasicExample/InteractiveTx.cs b/examples/src/BasicExample/InteractiveTx.cs index cfeca9ff..e49390bc 100644 --- a/examples/src/BasicExample/InteractiveTx.cs +++ b/examples/src/BasicExample/InteractiveTx.cs @@ -1,6 +1,3 @@ -using System; -using System.Collections.Generic; -using System.Threading.Tasks; using Ydb.Sdk.Services.Table; using Ydb.Sdk.Value; diff --git a/examples/src/BasicExample/Program.cs b/examples/src/BasicExample/Program.cs index e3e5b38d..b0c08eb8 100644 --- a/examples/src/BasicExample/Program.cs +++ b/examples/src/BasicExample/Program.cs @@ -1,5 +1,3 @@ -using System; -using System.Threading.Tasks; using CommandLine; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; diff --git a/examples/src/BasicExample/ReadTable.cs b/examples/src/BasicExample/ReadTable.cs index c74e4f09..dd1c4829 100644 --- a/examples/src/BasicExample/ReadTable.cs +++ b/examples/src/BasicExample/ReadTable.cs @@ -1,6 +1,3 @@ -using System; -using System.Collections.Generic; -using System.Threading.Tasks; using Ydb.Sdk.Services.Table; namespace Ydb.Sdk.Examples; diff --git a/examples/src/BasicExample/ScanQuery.cs b/examples/src/BasicExample/ScanQuery.cs index 22ca3fee..e8effbc2 100644 --- a/examples/src/BasicExample/ScanQuery.cs +++ b/examples/src/BasicExample/ScanQuery.cs @@ -1,6 +1,3 @@ -using System; -using System.Collections.Generic; -using System.Threading.Tasks; using Ydb.Sdk.Value; namespace Ydb.Sdk.Examples; diff --git a/examples/src/BasicExample/SchemeQuery.cs b/examples/src/BasicExample/SchemeQuery.cs index 216ef271..24bbaa23 100644 --- a/examples/src/BasicExample/SchemeQuery.cs +++ b/examples/src/BasicExample/SchemeQuery.cs @@ -1,5 +1,3 @@ -using System.Threading.Tasks; - namespace Ydb.Sdk.Examples; internal partial class BasicExample diff --git a/examples/src/Common/Common.csproj b/examples/src/Common/Common.csproj index 7c6e5df5..a7cdb16c 100644 --- a/examples/src/Common/Common.csproj +++ b/examples/src/Common/Common.csproj @@ -9,15 +9,18 @@ git - https://github.com/ydb-platform/ydb-dotnet-examples - https://github.com/ydb-platform/ydb-dotnet-examples + https://github.com/ydb-platform/ydb-dotnet-sdk + https://github.com/ydb-platform/ydb-dotnet-sdk YANDEX LLC - + + + + diff --git a/examples/src/Common/DataUtils.cs b/examples/src/Common/DataUtils.cs new file mode 100644 index 00000000..fbe048ce --- /dev/null +++ b/examples/src/Common/DataUtils.cs @@ -0,0 +1,173 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using Ydb.Sdk.Value; + +namespace Ydb.Sdk.Examples; + +public record Series(ulong SeriesId, string Title, DateTime ReleaseDate, string Info) +{ + public static Series FromRow(Value.ResultSet.Row row) + { + return new Series( + SeriesId: (ulong)row["series_id"].GetOptionalUint64()!, + Title: (string)row["title"]!, + ReleaseDate: (DateTime)row["release_date"].GetOptionalDate()!, + Info: (string)row["series_info"]! + ); + } +} + +public record Season(ulong SeriesId, ulong SeasonId, string Title, DateTime FirstAired, DateTime LastAired); + +public record Episode(ulong SeriesId, ulong SeasonId, ulong EpisodeId, string Title, DateTime AirDate) +{ + public static Episode FromRow(Value.ResultSet.Row row) + { + return new Episode( + SeriesId: (ulong)row["series_id"].GetOptionalUint64()!, + SeasonId: (ulong)row["season_id"].GetOptionalUint64()!, + EpisodeId: (ulong)row["episode_id"].GetOptionalUint64()!, + Title: (string)row["title"]!, + AirDate: (DateTime)row["air_date"].GetOptionalDate()! + ); + } +} + +public static class DataUtils +{ + public static Dictionary GetDataParams() + { + var series = new Series[] + { + new(SeriesId: 1, Title: "IT Crowd", ReleaseDate: DateTime.Parse("2006-02-03"), + Info: "The IT Crowd is a British sitcom produced by Channel 4, written by Graham Linehan, " + + "produced by Ash Atalla and starring Chris O'Dowd, Richard Ayoade, Katherine Parkinson, " + + "and Matt Berry."), + new(SeriesId: 2, Title: "Silicon Valley", ReleaseDate: DateTime.Parse("2014-04-06"), + Info: "Silicon Valley is an American comedy television series created by Mike Judge, " + + "John Altschuler and Dave Krinsky. The series focuses on five young men who founded " + + "a startup company in Silicon Valley.") + }; + + var seasons = new Season[] + { + new(1, 1, "Season 1", DateTime.Parse("2006-02-03"), DateTime.Parse("2006-03-03")), + new(1, 2, "Season 2", DateTime.Parse("2007-08-24"), DateTime.Parse("2007-09-28")), + new(1, 3, "Season 3", DateTime.Parse("2008-11-21"), DateTime.Parse("2008-12-26")), + new(1, 4, "Season 4", DateTime.Parse("2010-06-25"), DateTime.Parse("2010-07-30")), + new(2, 1, "Season 1", DateTime.Parse("2014-04-06"), DateTime.Parse("2014-06-01")), + new(2, 2, "Season 2", DateTime.Parse("2015-04-12"), DateTime.Parse("2015-06-14")), + new(2, 3, "Season 3", DateTime.Parse("2016-04-24"), DateTime.Parse("2016-06-26")), + new(2, 4, "Season 4", DateTime.Parse("2017-04-23"), DateTime.Parse("2017-06-25")), + new(2, 5, "Season 5", DateTime.Parse("2018-03-25"), DateTime.Parse("2018-05-13")) + }; + + var episodes = new Episode[] + { + new(1, 1, 1, "Yesterday's Jam", DateTime.Parse("2006-02-03")), + new(1, 1, 2, "Calamity Jen", DateTime.Parse("2006-02-03")), + new(1, 1, 3, "Fifty-Fifty", DateTime.Parse("2006-02-10")), + new(1, 1, 4, "The Red Door", DateTime.Parse("2006-02-17")), + new(1, 1, 5, "The Haunting of Bill Crouse", DateTime.Parse("2006-02-24")), + new(1, 1, 6, "Aunt Irma Visits", DateTime.Parse("2006-03-03")), + new(1, 2, 1, "The Work Outing", DateTime.Parse("2006-08-24")), + new(1, 2, 2, "Return of the Golden Child", DateTime.Parse("2007-08-31")), + new(1, 2, 3, "Moss and the German", DateTime.Parse("2007-09-07")), + new(1, 2, 4, "The Dinner Party", DateTime.Parse("2007-09-14")), + new(1, 2, 5, "Smoke and Mirrors", DateTime.Parse("2007-09-21")), + new(1, 2, 6, "Men Without Women", DateTime.Parse("2007-09-28")), + new(1, 3, 1, "From Hell", DateTime.Parse("2008-11-21")), + new(1, 3, 2, "Are We Not Men?", DateTime.Parse("2008-11-28")), + new(1, 3, 3, "Tramps Like Us", DateTime.Parse("2008-12-05")), + new(1, 3, 4, "The Speech", DateTime.Parse("2008-12-12")), + new(1, 3, 5, "Friendface", DateTime.Parse("2008-12-19")), + new(1, 3, 6, "Calendar Geeks", DateTime.Parse("2008-12-26")), + new(1, 4, 1, "Jen The Fredo", DateTime.Parse("2010-06-25")), + new(1, 4, 2, "The Final Countdown", DateTime.Parse("2010-07-02")), + new(1, 4, 3, "Something Happened", DateTime.Parse("2010-07-09")), + new(1, 4, 4, "Italian For Beginners", DateTime.Parse("2010-07-16")), + new(1, 4, 5, "Bad Boys", DateTime.Parse("2010-07-23")), + new(1, 4, 6, "Reynholm vs Reynholm", DateTime.Parse("2010-07-30")), + new(2, 1, 1, "Minimum Viable Product", DateTime.Parse("2014-04-06")), + new(2, 1, 2, "The Cap Table", DateTime.Parse("2014-04-13")), + new(2, 1, 3, "Articles of Incorporation", DateTime.Parse("2014-04-20")), + new(2, 1, 4, "Fiduciary Duties", DateTime.Parse("2014-04-27")), + new(2, 1, 5, "Signaling Risk", DateTime.Parse("2014-05-04")), + new(2, 1, 6, "Third Party Insourcing", DateTime.Parse("2014-05-11")), + new(2, 1, 7, "Proof of Concept", DateTime.Parse("2014-05-18")), + new(2, 1, 8, "Optimal Tip-to-Tip Efficiency", DateTime.Parse("2014-06-01")), + new(2, 2, 1, "Sand Hill Shuffle", DateTime.Parse("2015-04-12")), + new(2, 2, 2, "Runaway Devaluation", DateTime.Parse("2015-04-19")), + new(2, 2, 3, "Bad Money", DateTime.Parse("2015-04-26")), + new(2, 2, 4, "The Lady", DateTime.Parse("2015-05-03")), + new(2, 2, 5, "Server Space", DateTime.Parse("2015-05-10")), + new(2, 2, 6, "Homicide", DateTime.Parse("2015-05-17")), + new(2, 2, 7, "Adult Content", DateTime.Parse("2015-05-24")), + new(2, 2, 8, "White Hat/Black Hat", DateTime.Parse("2015-05-31")), + new(2, 2, 9, "Binding Arbitration", DateTime.Parse("2015-06-07")), + new(2, 2, 10, "Two Days of the Condor", DateTime.Parse("2015-06-14")), + new(2, 3, 1, "Founder Friendly", DateTime.Parse("2016-04-24")), + new(2, 3, 2, "Two in the Box", DateTime.Parse("2016-05-01")), + new(2, 3, 3, "Meinertzhagen's Haversack", DateTime.Parse("2016-05-08")), + new(2, 3, 4, "Maleant Data Systems Solutions", DateTime.Parse("2016-05-15")), + new(2, 3, 5, "The Empty Chair", DateTime.Parse("2016-05-22")), + new(2, 3, 6, "Bachmanity Insanity", DateTime.Parse("2016-05-29")), + new(2, 3, 7, "To Build a Better Beta", DateTime.Parse("2016-06-05")), + new(2, 3, 8, "Bachman's Earnings Over-Ride", DateTime.Parse("2016-06-12")), + new(2, 3, 9, "Daily Active Users", DateTime.Parse("2016-06-19")), + new(2, 3, 10, "The Uptick", DateTime.Parse("2016-06-26")), + new(2, 4, 1, "Success Failure", DateTime.Parse("2017-04-23")), + new(2, 4, 2, "Terms of Service", DateTime.Parse("2017-04-30")), + new(2, 4, 3, "Intellectual Property", DateTime.Parse("2017-05-07")), + new(2, 4, 4, "Teambuilding Exercise", DateTime.Parse("2017-05-14")), + new(2, 4, 5, "The Blood Boy", DateTime.Parse("2017-05-21")), + new(2, 4, 6, "Customer Service", DateTime.Parse("2017-05-28")), + new(2, 4, 7, "The Patent Troll", DateTime.Parse("2017-06-04")), + new(2, 4, 8, "The Keenan Vortex", DateTime.Parse("2017-06-11")), + new(2, 4, 9, "Hooli-Con", DateTime.Parse("2017-06-18")), + new(2, 4, 10, "Server Error", DateTime.Parse("2017-06-25")), + new(2, 5, 1, "Grow Fast or Die Slow", DateTime.Parse("2018-03-25")), + new(2, 5, 2, "Reorientation", DateTime.Parse("2018-04-01")), + new(2, 5, 3, "Chief Operating Officer", DateTime.Parse("2018-04-08")), + new(2, 5, 4, "Tech Evangelist", DateTime.Parse("2018-04-15")), + new(2, 5, 5, "Facial Recognition", DateTime.Parse("2018-04-22")), + new(2, 5, 6, "Artificial Emotional Intelligence", DateTime.Parse("2018-04-29")), + new(2, 5, 7, "Initial Coin Offering", DateTime.Parse("2018-05-06")), + new(2, 5, 8, "Fifty-One Percent", DateTime.Parse("2018-05-13")) + }; + + var seriesData = series.Select(s => YdbValue.MakeStruct(new Dictionary + { + { "series_id", YdbValue.MakeUint64(s.SeriesId) }, + { "title", YdbValue.MakeUtf8(s.Title) }, + { "series_info", YdbValue.MakeUtf8(s.Info) }, + { "release_date", YdbValue.MakeDate(s.ReleaseDate) } + })).ToList(); + + var seasonsData = seasons.Select(s => YdbValue.MakeStruct(new Dictionary + { + { "series_id", YdbValue.MakeUint64(s.SeriesId) }, + { "season_id", YdbValue.MakeUint64(s.SeasonId) }, + { "title", YdbValue.MakeUtf8(s.Title) }, + { "first_aired", YdbValue.MakeDate(s.FirstAired) }, + { "last_aired", YdbValue.MakeDate(s.LastAired) } + })).ToList(); + + var episodesData = episodes.Select(e => YdbValue.MakeStruct(new Dictionary + { + { "series_id", YdbValue.MakeUint64(e.SeriesId) }, + { "season_id", YdbValue.MakeUint64(e.SeasonId) }, + { "episode_id", YdbValue.MakeUint64(e.EpisodeId) }, + { "title", YdbValue.MakeUtf8(e.Title) }, + { "air_date", YdbValue.MakeDate(e.AirDate) } + })).ToList(); + + return new Dictionary + { + { "$seriesData", YdbValue.MakeList(seriesData) }, + { "$seasonsData", YdbValue.MakeList(seasonsData) }, + { "$episodesData", YdbValue.MakeList(episodesData) } + }; + } +} \ No newline at end of file diff --git a/examples/src/QueryExample/Program.cs b/examples/src/QueryExample/Program.cs new file mode 100644 index 00000000..ea237423 --- /dev/null +++ b/examples/src/QueryExample/Program.cs @@ -0,0 +1,59 @@ +using CommandLine; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; + +namespace Ydb.Sdk.Examples; + +internal class CmdOptions +{ + [Option('e', "endpoint", Required = true, HelpText = "Database endpoint")] + public string Endpoint { get; set; } = ""; + + [Option('d', "database", Required = true, HelpText = "Database name")] + public string Database { get; set; } = ""; + + [Option('p', "path", HelpText = "Base path for tables")] + public string Path { get; set; } = "ydb-dotnet-basic"; + + [Option("anonymous", Required = false, HelpText = "Fallback anonymous")] + public bool FallbackAnonymous { get; set; } = false; +} + +// See https://aka.ms/new-console-template for more information + +internal static class Program +{ + private static ServiceProvider GetServiceProvider() + { + return new ServiceCollection() + .AddLogging(configure => configure.AddConsole().SetMinimumLevel(LogLevel.Information)) + .BuildServiceProvider(); + } + + private static async Task Run(CmdOptions cmdOptions) + { + await using var serviceProvider = GetServiceProvider(); + var loggerFactory = serviceProvider.GetService(); + + loggerFactory ??= NullLoggerFactory.Instance; + + await QueryExample.Run( + endpoint: cmdOptions.Endpoint, + database: cmdOptions.Database, + credentialsProvider: await AuthUtils.MakeCredentialsFromEnv( + fallbackAnonymous: cmdOptions.FallbackAnonymous, + loggerFactory: loggerFactory), + customServerCertificate: AuthUtils.GetCustomServerCertificate(), + path: cmdOptions.Path, + loggerFactory: loggerFactory + ); + } + + public static async Task Main(string[] args) + { + AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true); + + await Parser.Default.ParseArguments(args).WithParsedAsync(Run); + } +} \ No newline at end of file diff --git a/examples/src/QueryExample/QueryExample.cs b/examples/src/QueryExample/QueryExample.cs new file mode 100644 index 00000000..cbf6d121 --- /dev/null +++ b/examples/src/QueryExample/QueryExample.cs @@ -0,0 +1,382 @@ +using System.Security.Cryptography.X509Certificates; +using Microsoft.Extensions.Logging; +using Ydb.Sdk.Auth; +using Ydb.Sdk.Services.Query; +using Ydb.Sdk.Services.Table; +using Ydb.Sdk.Value; + +namespace Ydb.Sdk.Examples; + +public class QueryExample +{ + private QueryClient Client { get; } + private string BasePath { get; } + + private Driver Driver { get; } + + protected QueryExample(QueryClient client, string database, string path, Driver driver) + { + Client = client; + BasePath = string.Join('/', database, path); + Driver = driver; + } + + + public static async Task Run( + string endpoint, + string database, + ICredentialsProvider credentialsProvider, + X509Certificate? customServerCertificate, + string path, + ILoggerFactory loggerFactory) + { + var config = new DriverConfig( + endpoint: endpoint, + database: database, + credentials: credentialsProvider, + customServerCertificate: customServerCertificate + ); + + await using var driver = await Driver.CreateInitialized( + config: config, + loggerFactory: loggerFactory + ); + + using var tableClient = new QueryClient(driver, new QueryClientConfig()); + + var example = new QueryExample(tableClient, database, path, driver); + + await example.SchemeQuery(); + await example.FillData(); + await example.SimpleSelect(1); + await example.SimpleUpsert(10, "Coming soon", DateTime.UtcNow); + await example.SimpleSelect(10); + await example.InteractiveTx(); + await example.StreamSelect(); + await example.ReadScalar(); + await example.ReadSingleRow(); + await example.ReadAllRows(); + await example.ReadAllResultSets(); + } + + private static ExecuteQuerySettings DefaultQuerySettings => + new() + { + // Transport timeout from the moment operation was sent to server. It is useful in case + // of possible network issues, to that query doesn't hang forever. + // It is recommended to set this value to a larger value than OperationTimeout to give + // server some time to issue a response. + TransportTimeout = TimeSpan.FromSeconds(5), + + ExecMode = ExecMode.Execute, + Syntax = Syntax.YqlV1 + }; + + + private async Task SchemeQuery() + { + var createQuery = @$" + PRAGMA TablePathPrefix('{BasePath}'); + + CREATE TABLE series ( + series_id Uint64, + title Utf8, + series_info Utf8, + release_date Date, + PRIMARY KEY (series_id) + ); + + CREATE TABLE seasons ( + series_id Uint64, + season_id Uint64, + title Utf8, + first_aired Date, + last_aired Date, + PRIMARY KEY (series_id, season_id) + ); + + CREATE TABLE episodes ( + series_id Uint64, + season_id Uint64, + episode_id Uint64, + title Utf8, + air_date Date, + PRIMARY KEY (series_id, season_id, episode_id) + ); + "; + + // TODO replace with QueryClient + // var response = await Client.Exec( + // queryString: createQuery + // ); + // response.EnsureSuccess(); + using var client = new TableClient(Driver); + var response = await client.SessionExec(async session => + await session.ExecuteSchemeQuery(createQuery)); + + response.Status.EnsureSuccess(); + } + + private async Task FillData() + { + var query = @$" + PRAGMA TablePathPrefix('{BasePath}'); + + REPLACE INTO series + SELECT * FROM AS_TABLE($seriesData); + + REPLACE INTO seasons + SELECT * FROM AS_TABLE($seasonsData); + + REPLACE INTO episodes + SELECT * FROM AS_TABLE($episodesData); + "; + + var response = await Client.Exec( + queryString: query, + parameters: DataUtils.GetDataParams(), + txModeSettings: new TxModeSerializableSettings(), + executeQuerySettings: DefaultQuerySettings + ); + response.EnsureSuccess(); + } + + + private async Task SimpleSelect(ulong id) + { + var query = @$" + PRAGMA TablePathPrefix('{BasePath}'); + + SELECT * + FROM series + WHERE series_id = $id; + "; + + var parameters = new Dictionary { { "$id", (YdbValue)id } }; + + + var response = await Client.ReadAllRows( + query, + parameters: parameters + ); + response.EnsureSuccess(); + + if (response.Result is not null) + { + foreach (var row in response.Result) + { + var series = Series.FromRow(row); + Console.WriteLine("> Series, " + + $"series_id: {series.SeriesId}, " + + $"title: {series.Title}, " + + $"release_date: {series.ReleaseDate}"); + } + } + } + + + private async Task SimpleUpsert(ulong id, string title, DateTime date) + { + var query = @$" + PRAGMA TablePathPrefix('{BasePath}'); + + UPSERT INTO series (series_id, title, release_date) VALUES + ($id, $title, $release_date); + "; + var parameters = new Dictionary + { + { "$id", YdbValue.MakeUint64(id) }, + { "$title", YdbValue.MakeUtf8(title) }, + { "$release_date", YdbValue.MakeDate(date) } + }; + + var response = await Client.Exec( + query, + parameters + ); + response.EnsureSuccess(); + } + + + private async Task InteractiveTx() + { + var doTxResponse = await Client.DoTx( + func: async tx => + { + var query1 = @$" + PRAGMA TablePathPrefix('{BasePath}'); + + SELECT first_aired FROM seasons + WHERE series_id = $series_id AND season_id = $season_id + LIMIT 1; + "; + var parameters1 = new Dictionary + { + { "$series_id", YdbValue.MakeUint64(1) }, + { "$season_id", YdbValue.MakeUint64(3) } + }; + + var response = await tx.ReadScalar( + query1, + parameters1 + ); + response.EnsureSuccess(); + + var firstAired = response.Result!.GetOptionalDate()!.Value; + var newAired = firstAired.AddDays(2); + + var query2 = @$" + PRAGMA TablePathPrefix('{BasePath}'); + + UPSERT INTO seasons (series_id, season_id, first_aired) VALUES + ($series_id, $season_id, $air_date); + "; + var parameters2 = new Dictionary + { + { "$series_id", YdbValue.MakeUint64(1) }, + { "$season_id", YdbValue.MakeUint64(3) }, + { "$air_date", YdbValue.MakeDate(newAired) } + }; + + var response2 = await tx.Exec(query2, parameters2); + response2.EnsureSuccess(); + } + ); + doTxResponse.EnsureSuccess(); + } + + private async Task StreamSelect() + { + var query = @$" + PRAGMA TablePathPrefix('{BasePath}'); + + SELECT * + FROM series; + "; + + + var response = await Client.Query( + query, + func: async stream => + { + var result = new List(); + await foreach (var part in stream) + { + if (part.ResultSet == null) continue; + foreach (var row in part.ResultSet.Rows) + { + result.Add(Series.FromRow(row)); + } + } + + return result; + } + ).ConfigureAwait(false); + response.EnsureSuccess(); + if (response.Result != null) + { + foreach (var series in response.Result) + { + Console.WriteLine(series); + } + } + } + + private async Task ReadScalar() + { + var query = @$" + PRAGMA TablePathPrefix('{BasePath}'); + + SELECT COUNT(*) + FROM series; + "; + + + var response = await Client.ReadScalar(query).ConfigureAwait(false); + response.EnsureSuccess(); + + var count = response.Result!.GetUint64(); + + Console.WriteLine($"There is {count} rows in 'series' table"); + } + + private async Task ReadSingleRow() + { + Console.WriteLine("StreamSelect"); + var query = @$" + PRAGMA TablePathPrefix('{BasePath}'); + + SELECT * + FROM series + LIMIT 1; + "; + + + var response = await Client.ReadSingleRow(query).ConfigureAwait(false); + response.EnsureSuccess(); + + var series = Series.FromRow(response.Result!); + + Console.WriteLine($"First row in 'series' table is {series}"); + } + + private async Task ReadAllRows() + { + Console.WriteLine("StreamSelect"); + var query = @$" + PRAGMA TablePathPrefix('{BasePath}'); + + SELECT * + FROM series; + "; + + + var response = await Client.ReadAllRows(query).ConfigureAwait(false); + response.EnsureSuccess(); + + var series = response.Result!.Select(Series.FromRow); + + Console.WriteLine("'series' table contains:"); + foreach (var elem in series) + { + Console.WriteLine($"\t{elem}"); + } + } + + private async Task ReadAllResultSets() + { + Console.WriteLine("StreamSelect"); + var query = @$" + PRAGMA TablePathPrefix('{BasePath}'); + + SELECT * + FROM series; -- First result set + + SELECT * + FROM episodes; -- Second result set + "; + + + var response = await Client.ReadAllResultSets(query).ConfigureAwait(false); + response.EnsureSuccess(); + + var resultSets = response.Result!; + + var seriesSet = resultSets[0]; + var episodesSet = resultSets[1]; + + Console.WriteLine("Multiple sets selected:"); + + Console.WriteLine("\t'series' contains:"); + foreach (var row in seriesSet) + { + Console.WriteLine($"\t\t{Series.FromRow(row)}"); + } + + Console.WriteLine("\t'episodes' contains:"); + foreach (var row in episodesSet) + { + Console.WriteLine($"\t\t{Episode.FromRow(row)}"); + } + } +} \ No newline at end of file diff --git a/examples/src/QueryExample/QueryExample.csproj b/examples/src/QueryExample/QueryExample.csproj new file mode 100644 index 00000000..6e487ba2 --- /dev/null +++ b/examples/src/QueryExample/QueryExample.csproj @@ -0,0 +1,28 @@ + + + + Exe + net6.0 + enable + enable + Ydb.Sdk.Examples.QueryExample + Ydb.Sdk.Examples + + + + git + https://github.com/ydb-platform/ydb-dotnet-sdk + https://github.com/ydb-platform/ydb-dotnet-sdk + YANDEX LLC + + + + + + + + + + + + diff --git a/examples/src/YdbExamples.sln b/examples/src/YdbExamples.sln index 926c9a52..c46a8179 100644 --- a/examples/src/YdbExamples.sln +++ b/examples/src/YdbExamples.sln @@ -7,6 +7,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Common", "Common\Common.csp EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "BasicExample", "BasicExample\BasicExample.csproj", "{9DAD5FF3-B7C2-4A9E-B4B2-A0FBD6097727}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "QueryExample", "QueryExample\QueryExample.csproj", "{0BA2CD4F-BF38-4C0D-878B-3D0C2C2970D9}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -21,6 +23,10 @@ Global {9DAD5FF3-B7C2-4A9E-B4B2-A0FBD6097727}.Debug|Any CPU.Build.0 = Debug|Any CPU {9DAD5FF3-B7C2-4A9E-B4B2-A0FBD6097727}.Release|Any CPU.ActiveCfg = Release|Any CPU {9DAD5FF3-B7C2-4A9E-B4B2-A0FBD6097727}.Release|Any CPU.Build.0 = Release|Any CPU + {0BA2CD4F-BF38-4C0D-878B-3D0C2C2970D9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {0BA2CD4F-BF38-4C0D-878B-3D0C2C2970D9}.Debug|Any CPU.Build.0 = Debug|Any CPU + {0BA2CD4F-BF38-4C0D-878B-3D0C2C2970D9}.Release|Any CPU.ActiveCfg = Release|Any CPU + {0BA2CD4F-BF38-4C0D-878B-3D0C2C2970D9}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/slo/src/Client.cs b/slo/src/Client.cs index 810ca198..4cdc7279 100644 --- a/slo/src/Client.cs +++ b/slo/src/Client.cs @@ -2,6 +2,7 @@ using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; using Ydb.Sdk; +using Ydb.Sdk.Services.Sessions; using Ydb.Sdk.Services.Table; namespace slo; diff --git a/src/Ydb.Sdk/src/Retry.cs b/src/Ydb.Sdk/src/Retry.cs new file mode 100644 index 00000000..dc47eab5 --- /dev/null +++ b/src/Ydb.Sdk/src/Retry.cs @@ -0,0 +1,113 @@ +namespace Ydb.Sdk; + +public class BackoffSettings +{ + public static readonly double MaxBackoffDurationMs = TimeSpan.FromHours(1).TotalMilliseconds; + + public BackoffSettings(TimeSpan slotDuration, uint ceiling, double uncertainRation) + { + SlotDuration = slotDuration; + Ceiling = ceiling; + UncertainRatio = uncertainRation; + } + + public TimeSpan SlotDuration { get; } + public uint Ceiling { get; } + public double UncertainRatio { get; } + + public static BackoffSettings DefaultFastBackoff { get; } = new( + slotDuration: TimeSpan.FromMilliseconds(1), + ceiling: 10, + uncertainRation: 0.5 + ); + + public static BackoffSettings DefaultSlowBackoff { get; } = new( + slotDuration: TimeSpan.FromSeconds(1), + ceiling: 6, + uncertainRation: 0.5 + ); + + public TimeSpan CalcBackoff(uint attemptNumber) + { + var random = new Random(); + + var backoffSlots = 1u << (int)Math.Min(attemptNumber, Ceiling); + var maxDuration = SlotDuration * backoffSlots; + var uncertaintyRatio = Math.Max(Math.Min(UncertainRatio, 1.0), 0.0); + var uncertaintyMultiplier = random.NextDouble() * uncertaintyRatio - uncertaintyRatio + 1.0; + + var durationMs = Math.Round(maxDuration.TotalMilliseconds * uncertaintyMultiplier); + var durationFinalMs = Math.Max(Math.Min(durationMs, MaxBackoffDurationMs), 0.0); + return TimeSpan.FromMilliseconds(durationFinalMs); + } +} + +public enum Idempotency +{ + /// No retry + None, + + /// Retry only if IsIdempotent is true + Idempotent, + + /// Retry always + NonIdempotent +} + +public record RetryRule(BackoffSettings BackoffSettings, bool DeleteSession, Idempotency Idempotency); + +public class RetrySettings +{ + public RetrySettings( + uint maxAttempts = 10, + BackoffSettings? fastBackoff = null, + BackoffSettings? slowBackoff = null) + { + MaxAttempts = maxAttempts; + FastBackoff = fastBackoff ?? BackoffSettings.DefaultFastBackoff; + SlowBackoff = slowBackoff ?? BackoffSettings.DefaultSlowBackoff; + } + + public uint MaxAttempts { get; } + public BackoffSettings FastBackoff { get; } + public BackoffSettings SlowBackoff { get; } + + public bool IsIdempotent { get; set; } + + private static readonly BackoffSettings NoBackoff = new(TimeSpan.Zero, 10, 0.5); + + public RetryRule GetRetryRule(StatusCode statusCode) + { + return statusCode switch + { + StatusCode.Unspecified => new RetryRule(NoBackoff, false, Idempotency.None), + StatusCode.BadRequest => new RetryRule(NoBackoff, false, Idempotency.None), + StatusCode.Unauthorized => new RetryRule(NoBackoff, false, Idempotency.None), + StatusCode.InternalError => new RetryRule(NoBackoff, false, Idempotency.None), + StatusCode.Aborted => new RetryRule(FastBackoff, false, Idempotency.NonIdempotent), + StatusCode.Unavailable => new RetryRule(FastBackoff, false, Idempotency.NonIdempotent), + StatusCode.Overloaded => new RetryRule(SlowBackoff, false, Idempotency.NonIdempotent), + StatusCode.SchemeError => new RetryRule(NoBackoff, false, Idempotency.None), + StatusCode.GenericError => new RetryRule(NoBackoff, false, Idempotency.None), + StatusCode.Timeout => new RetryRule(NoBackoff, false, Idempotency.None), + StatusCode.BadSession => new RetryRule(NoBackoff, true, Idempotency.NonIdempotent), + StatusCode.PreconditionFailed => new RetryRule(NoBackoff, false, Idempotency.None), + StatusCode.AlreadyExists => new RetryRule(NoBackoff, false, Idempotency.None), + StatusCode.NotFound => new RetryRule(NoBackoff, false, Idempotency.None), + StatusCode.SessionExpired => new RetryRule(NoBackoff, true, Idempotency.None), + StatusCode.Cancelled => new RetryRule(FastBackoff, false, Idempotency.None), + StatusCode.Undetermined => new RetryRule(FastBackoff, false, Idempotency.Idempotent), + StatusCode.Unsupported => new RetryRule(NoBackoff, false, Idempotency.None), + StatusCode.SessionBusy => new RetryRule(FastBackoff, true, Idempotency.NonIdempotent), + StatusCode.Success => new RetryRule(NoBackoff, false, Idempotency.None), + StatusCode.ClientResourceExhausted => new RetryRule(SlowBackoff, false, Idempotency.NonIdempotent), + StatusCode.ClientInternalError => new RetryRule(FastBackoff, true, Idempotency.Idempotent), + StatusCode.ClientTransportUnknown => new RetryRule(NoBackoff, true, Idempotency.None), + StatusCode.ClientTransportUnavailable => new RetryRule(FastBackoff, true, Idempotency.Idempotent), + StatusCode.ClientTransportTimeout => new RetryRule(FastBackoff, true, Idempotency.Idempotent), + StatusCode.ClientTransportResourceExhausted => new RetryRule(SlowBackoff, true, Idempotency.NonIdempotent), + StatusCode.ClientTransportUnimplemented => new RetryRule(NoBackoff, true, Idempotency.None), + _ => throw new ArgumentOutOfRangeException(nameof(statusCode), statusCode, null) + }; + } +} diff --git a/src/Ydb.Sdk/src/Services/Query/QueryClient.cs b/src/Ydb.Sdk/src/Services/Query/QueryClient.cs new file mode 100644 index 00000000..f838d7ba --- /dev/null +++ b/src/Ydb.Sdk/src/Services/Query/QueryClient.cs @@ -0,0 +1,366 @@ +using Microsoft.Extensions.Logging; +using Ydb.Sdk.Client; +using Ydb.Sdk.Services.Sessions; +using Ydb.Sdk.Value; + +namespace Ydb.Sdk.Services.Query; + +public class QueryClientConfig +{ + public SessionPoolConfig SessionPoolConfig { get; } + + public QueryClientConfig( + SessionPoolConfig? sessionPoolConfig = null) + { + SessionPoolConfig = sessionPoolConfig ?? new SessionPoolConfig(); + } +} + +public class QueryClient : QueryClientGrpc, IDisposable +{ + private readonly ISessionPool _sessionPool; + private readonly ILogger _logger; + private bool _disposed; + + public QueryClient(Driver driver, QueryClientConfig? config = null) : base(driver) + { + config ??= new QueryClientConfig(); + + _logger = Driver.LoggerFactory.CreateLogger(); + + _sessionPool = new SessionPool(driver, config.SessionPoolConfig); + } + + internal QueryClient(Driver driver, ISessionPool sessionPool) : base(driver) + { + _logger = driver.LoggerFactory.CreateLogger(); + + _sessionPool = sessionPool; + } + + private async Task ExecOnSession( + Func> func, + RetrySettings? retrySettings = null + ) + { + if (_sessionPool is not SessionPool sessionPool) + { + throw new InvalidCastException( + $"Unexpected cast error: {nameof(_sessionPool)} is not object of type {typeof(SessionPool).FullName}"); + } + + return await sessionPool.ExecOnSession(func, retrySettings); + } + + internal static async Task EmptyStreamReadFunc(ExecuteQueryStream stream) + { + while (await stream.Next()) + { + stream.Response.EnsureSuccess(); + } + + return None.Instance; + } + + public async Task> Query( + string queryString, + Dictionary? parameters, + Func> func, + ITxModeSettings? txModeSettings = null, + ExecuteQuerySettings? executeQuerySettings = null, + RetrySettings? retrySettings = null) + { + parameters ??= new Dictionary(); + txModeSettings ??= new TxModeSerializableSettings(); + executeQuerySettings ??= new ExecuteQuerySettings(); + + var response = await ExecOnSession( + async session => + { + var tx = Tx.Begin(txModeSettings, this, session.Id); + return await tx.Query(queryString, parameters, func, executeQuerySettings); + }, + retrySettings + ); + return response switch + { + QueryResponseWithResult queryResponseWithResult => queryResponseWithResult, + _ => throw new InvalidCastException( + $"Unexpected cast error: {nameof(response)} is not object of type {typeof(QueryResponseWithResult).FullName}") + }; + } + + public async Task> Query( + string queryString, + Func> func, + ITxModeSettings? txModeSettings = null, + ExecuteQuerySettings? executeQuerySettings = null, + RetrySettings? retrySettings = null) + { + return await Query(queryString, new Dictionary(), func, txModeSettings, executeQuerySettings, + retrySettings); + } + + public async Task Exec(string queryString, + Dictionary? parameters = null, + ITxModeSettings? txModeSettings = null, + ExecuteQuerySettings? executeQuerySettings = null, + RetrySettings? retrySettings = null) + { + var response = await Query( + queryString, + parameters, + async session => await EmptyStreamReadFunc(session), + txModeSettings, + executeQuerySettings, + retrySettings); + return response; + } + + internal static async Task>> ReadAllResultSetsHelper( + ExecuteQueryStream stream) + { + var resultSets = new List>(); + await foreach (var part in stream) + { + if (part.ResultSet is null) continue; + while (resultSets.Count <= part.ResultSetIndex) + { + resultSets.Add(new List()); + } + + resultSets[(int)part.ResultSetIndex].AddRange(part.ResultSet.Rows); + } + + return resultSets; + } + + internal static async Task> ReadAllRowsHelper(ExecuteQueryStream stream) + { + var resultSets = await ReadAllResultSetsHelper(stream); + if (resultSets.Count > 1) + { + throw new QueryWrongResultFormatException("Should be only one resultSet"); + } + + return resultSets[0]; + } + + internal static async Task ReadSingleRowHelper(ExecuteQueryStream stream) + { + Value.ResultSet.Row? row = null; + await foreach (var part in stream) + { + if (row is null && part.ResultSet is null) + { + throw new QueryWrongResultFormatException("ResultSet is null"); + } + + if (part.ResultSet is not null) + { + if (row is not null || part.ResultSet.Rows.Count != 1) + { + throw new QueryWrongResultFormatException("ResultSet should contain exactly one row"); + } + + row = part.ResultSet.Rows[0]; + } + } + + return row!; + } + + internal static async Task ReadScalarHelper(ExecuteQueryStream stream) + { + var row = await ReadSingleRowHelper(stream); + if (row.ColumnCount != 1) + { + throw new QueryWrongResultFormatException("Row should contain exactly one field"); + } + + return row[0]; + } + + public async Task>>> ReadAllResultSets( + string queryString, + Dictionary? parameters = null, + ITxModeSettings? txModeSettings = null, + ExecuteQuerySettings? executeQuerySettings = null, + RetrySettings? retrySettings = null) + { + var response = await Query(queryString, parameters, ReadAllResultSetsHelper, txModeSettings, + executeQuerySettings, retrySettings); + return response; + } + + + public async Task>> ReadAllRows(string queryString, + Dictionary? parameters = null, + ITxModeSettings? txModeSettings = null, + ExecuteQuerySettings? executeQuerySettings = null, + RetrySettings? retrySettings = null) + { + var response = await Query(queryString, parameters, ReadAllRowsHelper, txModeSettings, + executeQuerySettings, retrySettings); + return response; + } + + + public async Task> ReadSingleRow(string queryString, + Dictionary? parameters = null, + ITxModeSettings? txModeSettings = null, + ExecuteQuerySettings? executeQuerySettings = null, + RetrySettings? retrySettings = null) + { + var response = await Query(queryString, parameters, ReadSingleRowHelper, txModeSettings, + executeQuerySettings, retrySettings); + return response; + } + + public async Task> ReadScalar(string queryString, + Dictionary? parameters = null, + ITxModeSettings? txModeSettings = null, + ExecuteQuerySettings? executeQuerySettings = null, + RetrySettings? retrySettings = null) + { + var response = await Query(queryString, parameters, ReadScalarHelper, txModeSettings, + executeQuerySettings, retrySettings); + return response; + } + + private async Task> Rollback(Session session, Tx tx, Status status) + { + _logger.LogTrace($"Transaction {tx.TxId} not committed, try to rollback"); + try + { + var rollbackResponse = await RollbackTransaction(session.Id, tx); + rollbackResponse.EnsureSuccess(); + } + catch (StatusUnsuccessfulException e) + { + _logger.LogError($"Transaction {tx.TxId} rollback not successful {e.Status}"); + return new QueryResponseWithResult(e.Status); + } + + return new QueryResponseWithResult(status); + } + + public async Task> DoTx(Func> func, + ITxModeSettings? txModeSettings = null, + RetrySettings? retrySettings = null) + { + var response = await ExecOnSession( + async session => + { + var beginTransactionResponse = + await BeginTransaction(session.Id, Tx.Begin(txModeSettings, this, session.Id)); + beginTransactionResponse.EnsureSuccess(); + var tx = beginTransactionResponse.Tx!; + + T response; + try + { + response = await func(tx); + } + catch (StatusUnsuccessfulException e) + { + var rollbackResponse = await Rollback(session, tx, e.Status); + return rollbackResponse; + } + catch (Exception e) + { + var status = new Status( + StatusCode.ClientInternalError, + $"Failed to execute lambda on tx {tx.TxId}: {e.Message}"); + var rollbackResponse = await Rollback(session, tx, status); + return rollbackResponse; + } + + var commitResponse = await CommitTransaction(session.Id, tx); + if (!commitResponse.Status.IsSuccess) + { + var rollbackResponse = await Rollback(session, tx, commitResponse.Status); + return rollbackResponse; + } + + return response switch + { + None => new QueryResponseWithResult(Status.Success), + _ => new QueryResponseWithResult(Status.Success, response) + }; + }, + retrySettings + ); + return response switch + { + QueryResponseWithResult queryResponseWithResult => queryResponseWithResult, + _ => throw new InvalidCastException( + $"Unexpected cast error: {nameof(response)} is not object of type {typeof(QueryResponseWithResult).FullName}") + }; + } + + public async Task DoTx(Func func, + ITxModeSettings? txModeSettings = null, + RetrySettings? retrySettings = null) + { + var response = await DoTx( + async tx => + { + await func(tx); + return None.Instance; + }, + txModeSettings, + retrySettings + ); + return response; + } + + internal record None + { + internal static readonly None Instance = new(); + } + + public void Dispose() + { + Dispose(true); + } + + private void Dispose(bool disposing) + { + if (_disposed) + { + return; + } + + if (disposing) + { + _sessionPool.Dispose(); + } + + _disposed = true; + } +} + +public class QueryResponse : ResponseBase +{ + public QueryResponse(Status status) : base(status) + { + } +} + +public sealed class QueryResponseWithResult : QueryResponse +{ + public readonly TResult? Result; + + public QueryResponseWithResult(Status status, TResult? result = default) : base(status) + { + Result = result; + } +} + +public class QueryWrongResultFormatException : Exception +{ + public QueryWrongResultFormatException(string message) : base(message) + { + } +} diff --git a/src/Ydb.Sdk/src/Services/Query/QueryClientGrpc.cs b/src/Ydb.Sdk/src/Services/Query/QueryClientGrpc.cs new file mode 100644 index 00000000..5efa940c --- /dev/null +++ b/src/Ydb.Sdk/src/Services/Query/QueryClientGrpc.cs @@ -0,0 +1,183 @@ +using Microsoft.Extensions.Logging; +using Ydb.Query; +using Ydb.Query.V1; +using Ydb.Sdk.Client; +using Ydb.Sdk.Value; + +namespace Ydb.Sdk.Services.Query; + +public abstract class QueryClientGrpc : + ClientBase +{ + private protected QueryClientGrpc(Driver driver) : base(driver) + { + Driver.LoggerFactory.CreateLogger(); + } + + internal async Task CreateSession(CreateSessionSettings? settings = null) + { + settings ??= new CreateSessionSettings(); + var request = new CreateSessionRequest(); + + try + { + var response = await Driver.UnaryCall( + method: QueryService.CreateSessionMethod, + request: request, + settings: settings); + + var status = Status.FromProto(response.Data.Status, response.Data.Issues); + + CreateSessionResponse.ResultData? result = null; + + if (status.IsSuccess) + { + result = CreateSessionResponse.ResultData.FromProto(response.Data, Driver, response.UsedEndpoint); + } + + return new CreateSessionResponse(status, result); + } + catch (Driver.TransportException e) + { + return new CreateSessionResponse(e.Status); + } + } + + internal async Task DeleteSession(string sessionId, DeleteSessionSettings? settings = null) + { + settings ??= new DeleteSessionSettings(); + var request = new DeleteSessionRequest + { + SessionId = sessionId + }; + + try + { + var response = await Driver.UnaryCall( + method: QueryService.DeleteSessionMethod, + request: request, + settings: settings); + + + return DeleteSessionResponse.FromProto(response.Data); + } + catch (Driver.TransportException e) + { + return new DeleteSessionResponse(e.Status); + } + } + + internal SessionStateStream AttachSession(string sessionId, AttachSessionSettings? settings = null) + { + settings ??= new AttachSessionSettings { TransportTimeout = TimeSpan.FromDays(1) }; + + var request = new AttachSessionRequest { SessionId = sessionId }; + + var streamIterator = Driver.StreamCall( + method: QueryService.AttachSessionMethod, + request: request, + settings: settings + ); + return new SessionStateStream(streamIterator); + } + + private protected async Task BeginTransaction( + string sessionId, + Tx tx, + BeginTransactionSettings? settings = null) + { + settings ??= new BeginTransactionSettings(); + + var request = new BeginTransactionRequest { SessionId = sessionId, TxSettings = tx.ToProto().BeginTx }; + try + { + var response = await Driver.UnaryCall( + QueryService.BeginTransactionMethod, + request: request, + settings: settings + ); + return BeginTransactionResponse.FromProto(response.Data, this, sessionId); + } + catch (Driver.TransportException e) + { + return new BeginTransactionResponse(e.Status); + } + } + + private protected async Task CommitTransaction( + string sessionId, + Tx tx, + CommitTransactionSettings? settings = null) + { + settings ??= new CommitTransactionSettings(); + + var request = new CommitTransactionRequest { SessionId = sessionId, TxId = tx.TxId }; + + try + { + var response = await Driver.UnaryCall( + QueryService.CommitTransactionMethod, + request: request, + settings: settings + ); + return CommitTransactionResponse.FromProto(response.Data); + } + catch (Driver.TransportException e) + { + return new CommitTransactionResponse(e.Status); + } + } + + private protected async Task RollbackTransaction( + string sessionId, + Tx tx, + RollbackTransactionSettings? settings = null) + { + settings ??= new RollbackTransactionSettings(); + + var request = new RollbackTransactionRequest { SessionId = sessionId, TxId = tx.TxId }; + try + { + var response = await Driver.UnaryCall( + QueryService.RollbackTransactionMethod, + request: request, + settings: settings + ); + return RollbackTransactionResponse.FromProto(response.Data); + } + catch (Driver.TransportException e) + { + return new RollbackTransactionResponse(e.Status); + } + } + + + protected internal ExecuteQueryStream ExecuteQuery( + string sessionId, + string queryString, + Tx tx, + IReadOnlyDictionary? parameters, + ExecuteQuerySettings? settings = null) + { + settings ??= new ExecuteQuerySettings(); + parameters ??= new Dictionary(); + + var request = new ExecuteQueryRequest + { + SessionId = sessionId, + ExecMode = (Ydb.Query.ExecMode)settings.ExecMode, + TxControl = tx.ToProto(), + QueryContent = new QueryContent { Syntax = (Ydb.Query.Syntax)settings.Syntax, Text = queryString }, + StatsMode = (Ydb.Query.StatsMode)settings.StatsMode + }; + + request.Parameters.Add(parameters.ToDictionary(p => p.Key, p => p.Value.GetProto())); + + var streamIterator = Driver.StreamCall( + method: QueryService.ExecuteQueryMethod, + request: request, + settings: settings); + + return new ExecuteQueryStream(streamIterator); + } +} diff --git a/src/Ydb.Sdk/src/Services/Query/QueryGrpcData.cs b/src/Ydb.Sdk/src/Services/Query/QueryGrpcData.cs new file mode 100644 index 00000000..f3d3e52b --- /dev/null +++ b/src/Ydb.Sdk/src/Services/Query/QueryGrpcData.cs @@ -0,0 +1,291 @@ +using Ydb.Query; +using Ydb.Sdk.Client; + +namespace Ydb.Sdk.Services.Query; + +public enum ExecMode +{ + Unspecified = 0, + Parse = 10, + Validate = 20, + Explain = 30, + + // reserved 40; // EXEC_MODE_PREPARE + Execute = 50 +} + +public enum Syntax +{ + Unspecified = 0, + + /// + /// YQL + /// + YqlV1 = 1, + + /// + /// PostgresQL + /// + Pg = 2 +} + +public enum StatsMode +{ + Unspecified = 0, + + /// + /// Stats collection is disabled + /// + None = 10, + + /// + /// Aggregated stats of reads, updates and deletes per table + /// + Basic = 20, + + /// + /// Add execution stats and plan on top of STATS_MODE_BASIC + /// + Full = 30, + + /// + /// Detailed execution stats including stats for individual tasks and channels + /// + Profile = 40 +} + +public class ExecuteQuerySettings : RequestSettings +{ + public ExecMode ExecMode { get; set; } = ExecMode.Execute; + public Syntax Syntax { get; set; } + + public StatsMode StatsMode { get; set; } +} + +internal class CreateSessionSettings : RequestSettings +{ +} + +internal class DeleteSessionSettings : RequestSettings +{ +} + +internal class AttachSessionSettings : RequestSettings +{ +} + +internal class BeginTransactionSettings : RequestSettings +{ +} + +internal class CommitTransactionSettings : RequestSettings +{ +} + +internal class RollbackTransactionSettings : RequestSettings +{ +} + +internal class CreateSessionResponse : ResponseWithResultBase +{ + internal CreateSessionResponse(Status status, ResultData? result = null) + : base(status, result) + { + } + + public class ResultData + { + private ResultData(Session session) + { + Session = session; + } + + public Session Session { get; } + + internal static ResultData FromProto(Ydb.Query.CreateSessionResponse resultProto, Driver driver, + string endpoint) + { + var session = new Session( + driver: driver, + sessionPool: null, + id: resultProto.SessionId, + nodeId: resultProto.NodeId, + endpoint: endpoint); + + return new ResultData( + session: session + ); + } + } +} + +internal class DeleteSessionResponse : ResponseBase +{ + internal DeleteSessionResponse(Status status) : base(status) + { + } + + private DeleteSessionResponse(Ydb.Query.DeleteSessionResponse proto) + : base(Status.FromProto(proto.Status, proto.Issues)) + { + } + + internal static DeleteSessionResponse FromProto(Ydb.Query.DeleteSessionResponse proto) + { + return new DeleteSessionResponse(proto); + } +} + +internal class SessionState : ResponseBase +{ + internal SessionState(Status status) : base(status) + { + } + + private SessionState(Ydb.Query.SessionState proto) + : base(Status.FromProto(proto.Status, proto.Issues)) + { + } + + internal static SessionState FromProto(Ydb.Query.SessionState proto) + { + return new SessionState(proto); + } +} + +internal class SessionStateStream : StreamResponse +{ + internal SessionStateStream(Driver.StreamIterator iterator) : base(iterator) + { + } + + protected override SessionState MakeResponse(Ydb.Query.SessionState protoResponse) + { + return SessionState.FromProto(protoResponse); + } + + protected override SessionState MakeResponse(Status status) + { + return new SessionState(status); + } +} + +public class ExecuteQueryResponsePart : ResponseBase +{ + public readonly long ResultSetIndex; + public readonly Value.ResultSet? ResultSet; + + internal ExecuteQueryResponsePart(Status status) : base(status) + { + } + + private ExecuteQueryResponsePart(Ydb.Query.ExecuteQueryResponsePart proto) + : base(Status.FromProto(proto.Status, proto.Issues)) + { + ResultSetIndex = proto.ResultSetIndex; + if (proto.ResultSet is not null) + { + ResultSet = Value.ResultSet.FromProto(proto.ResultSet); + } + } + + internal static ExecuteQueryResponsePart FromProto(Ydb.Query.ExecuteQueryResponsePart proto) + { + return new ExecuteQueryResponsePart(proto); + } +} + +public class ExecuteQueryStream : StreamResponse + , IAsyncEnumerable +{ + internal ExecuteQueryStream(Driver.StreamIterator iterator) : base(iterator) + { + } + + public new async Task Next() + { + var isNext = await base.Next(); + if (isNext) + { + Response.EnsureSuccess(); + } + + return isNext; + } + + protected override ExecuteQueryResponsePart MakeResponse(Ydb.Query.ExecuteQueryResponsePart protoResponse) + { + return ExecuteQueryResponsePart.FromProto(protoResponse); + } + + protected override ExecuteQueryResponsePart MakeResponse(Status status) + { + return new ExecuteQueryResponsePart(status); + } + + public async IAsyncEnumerator GetAsyncEnumerator( + CancellationToken cancellationToken = new()) + { + while (await Next()) + { + yield return Response; + } + } +} + +internal class BeginTransactionResponse : ResponseBase +{ + internal BeginTransactionResponse(Status status) : base(status) + { + } + + internal Tx? Tx { get; } + + private BeginTransactionResponse(Ydb.Query.BeginTransactionResponse proto, QueryClientGrpc client, string sessionId) + : base( + Status.FromProto(proto.Status, proto.Issues)) + { + var txId = proto.TxMeta.Id; + Tx = new Tx(new TransactionControl { TxId = txId }, client, sessionId); + } + + internal static BeginTransactionResponse FromProto(Ydb.Query.BeginTransactionResponse proto, QueryClientGrpc client, + string sessionId) + { + return new BeginTransactionResponse(proto, client, sessionId); + } +} + +internal class CommitTransactionResponse : ResponseBase +{ + internal CommitTransactionResponse(Status status) : base(status) + { + } + + private CommitTransactionResponse(Ydb.Query.CommitTransactionResponse proto) : base( + Status.FromProto(proto.Status, proto.Issues)) + { + } + + internal static CommitTransactionResponse FromProto(Ydb.Query.CommitTransactionResponse proto) + { + return new CommitTransactionResponse(proto); + } +} + +internal class RollbackTransactionResponse : ResponseBase +{ + internal RollbackTransactionResponse(Status status) : base(status) + { + } + + private RollbackTransactionResponse(Ydb.Query.RollbackTransactionResponse proto) : base( + Status.FromProto(proto.Status, proto.Issues)) + { + } + + internal static RollbackTransactionResponse FromProto(Ydb.Query.RollbackTransactionResponse proto) + { + return new RollbackTransactionResponse(proto); + } +} diff --git a/src/Ydb.Sdk/src/Services/Query/Session.cs b/src/Ydb.Sdk/src/Services/Query/Session.cs new file mode 100644 index 00000000..cbd5372e --- /dev/null +++ b/src/Ydb.Sdk/src/Services/Query/Session.cs @@ -0,0 +1,54 @@ +using Microsoft.Extensions.Logging; +using Ydb.Sdk.Services.Sessions; + +namespace Ydb.Sdk.Services.Query; + +/// +/// Sessions are basic primitives for communicating with YDB Query Service. The are similar to +/// connections for classic relational DBs. Sessions serve three main purposes: +/// 1. Provide a flow control for DB requests with limited number of active channels. +/// 2. Distribute load evenly across multiple DB nodes. +/// 3. Store state for volatile stateful operations, such as short-living transactions. +/// +public class Session : SessionBase +{ + private readonly SessionPool? _sessionPool; + + internal Session(Driver driver, SessionPool? sessionPool, string id, long nodeId, string? endpoint) + : base(driver, id, endpoint, driver.LoggerFactory.CreateLogger()) + { + _sessionPool = sessionPool; + NodeId = nodeId; + } + + public long NodeId { get; } + + + protected override void Dispose(bool disposing) + { + if (Disposed) + { + return; + } + + if (disposing) + { + if (_sessionPool is null) + { + Logger.LogTrace($"Closing detached session on dispose: {Id}"); + + var client = new QueryClient(Driver, new NoPool()); + _ = client.DeleteSession(Id, new DeleteSessionSettings + { + TransportTimeout = DeleteSessionTimeout + }); + } + else + { + _sessionPool.ReturnSession(Id); + } + } + + Disposed = true; + } +} diff --git a/src/Ydb.Sdk/src/Services/Query/SessionPool.cs b/src/Ydb.Sdk/src/Services/Query/SessionPool.cs new file mode 100644 index 00000000..467b29dd --- /dev/null +++ b/src/Ydb.Sdk/src/Services/Query/SessionPool.cs @@ -0,0 +1,165 @@ +using Microsoft.Extensions.Logging; +using Ydb.Sdk.Services.Sessions; + +namespace Ydb.Sdk.Services.Query; + +using GetSessionResponse = GetSessionResponse; +using NoPool = NoPool; + +internal class SessionPool : SessionPoolBase +{ + private readonly Dictionary _attachedSessions = new(); + + public SessionPool(Driver driver, SessionPoolConfig config) : + base( + driver: driver, + config: config, + client: new QueryClient(driver, new NoPool()), + logger: driver.LoggerFactory.CreateLogger()) + { + } + + private protected override async Task CreateSession() + { + var createSessionResponse = await Client.CreateSession(new CreateSessionSettings + { TransportTimeout = Config.CreateSessionTimeout }); + + lock (Lock) + { + PendingSessions--; + if (createSessionResponse.Status.IsSuccess) + { + var session = new Session( + driver: Driver, + sessionPool: this, + id: createSessionResponse.Result.Session.Id, + nodeId: createSessionResponse.Result.Session.NodeId, + endpoint: createSessionResponse.Result.Session.Endpoint); + + Sessions.Add(session.Id, new SessionState(session)); + + _ = Task.Run(() => AttachAndMonitor(session.Id)); + + + Logger.LogTrace($"Session {session.Id} created, " + + $"endpoint: {session.Endpoint}, " + + $"nodeId: {session.NodeId}"); + return new GetSessionResponse(createSessionResponse.Status, session); + } + + Logger.LogWarning($"Failed to create session: {createSessionResponse.Status}"); + return new GetSessionResponse(createSessionResponse.Status); + } + } + + private async Task AttachAndMonitor(string sessionId) + { + var stream = Client.AttachSession(sessionId); + + var cts = new CancellationTokenSource(); + cts.CancelAfter(Config.CreateSessionTimeout); + + var firstPartTask = Task.Run(async () => + { + if (await stream.Next()) + { + return stream.Response; + } + + return null; + }, cts.Token); + + var firstPart = await firstPartTask; + + if (firstPartTask.IsCanceled || firstPart is null) + { + InvalidateSession(sessionId); + return; + } + + CheckPart(firstPart, sessionId); + + cts = new CancellationTokenSource(); + + var monitorTask = Task.Run(async () => await Monitor(sessionId, stream), cts.Token); + lock (Lock) + { + _attachedSessions.Add(sessionId, cts); + } + + await monitorTask; + lock (Lock) + { + _attachedSessions.Remove(sessionId); + } + } + + private async Task Monitor(string sessionId, SessionStateStream stream) + { + while (await stream.Next()) + { + var part = stream.Response; + if (!CheckPart(part, sessionId)) + { + break; + } + } + } + + private bool CheckPart(Query.SessionState part, string sessionId) + { + if (part.Status.IsSuccess) + { + Logger.LogTrace($"Successful stream response for session: {sessionId}"); + + lock (Lock) + { + if (Sessions.TryGetValue(sessionId, out var sessionState)) + { + sessionState.LastAccessTime = DateTime.Now; + } + } + + return true; + } + + InvalidateSession(sessionId); + return false; + } + + private new void InvalidateSession(string id) + { + DetachSession(id); + base.InvalidateSession(id); + } + + private void DetachSession(string id) + { + lock (Lock) + { + _attachedSessions.Remove(id, out var cts); + cts?.Cancel(); + Logger.LogInformation($"Session detached: {id}"); + } + } + + private protected override Session CopySession(Session other) + { + return new Session( + driver: Driver, + sessionPool: this, + id: other.Id, + nodeId: other.NodeId, + endpoint: other.Endpoint); + } + + private protected override async Task DeleteSession(string id) + { + DetachSession(id); + + await Client.DeleteSession(id, new DeleteSessionSettings + { + TransportTimeout = SessionBase.DeleteSessionTimeout + }); + } +} diff --git a/src/Ydb.Sdk/src/Services/Query/Tx.cs b/src/Ydb.Sdk/src/Services/Query/Tx.cs new file mode 100644 index 00000000..63332824 --- /dev/null +++ b/src/Ydb.Sdk/src/Services/Query/Tx.cs @@ -0,0 +1,177 @@ +using Ydb.Query; +using Ydb.Sdk.Value; + +namespace Ydb.Sdk.Services.Query; + +public interface ITxModeSettings +{ +} + +public class TxModeSerializableSettings : ITxModeSettings +{ +} + +public class TxModeOnlineSettings : ITxModeSettings +{ + public TxModeOnlineSettings(bool allowInconsistentReads = false) + { + AllowInconsistentReads = allowInconsistentReads; + } + + public bool AllowInconsistentReads { get; } +} + +public class TxModeStaleSettings : ITxModeSettings +{ +} + +public class TxModeSnapshotSettings : ITxModeSettings +{ +} + +public class Tx +{ + private QueryClientGrpc Client { get; } + private string SessionId { get; } + + public string? TxId => _proto.TxId; + + private readonly TransactionControl _proto; + + internal Tx(TransactionControl proto, QueryClientGrpc client, string sessionId) + { + _proto = proto; + Client = client; + SessionId = sessionId; + } + + internal TransactionControl ToProto() + { + return _proto.Clone(); + } + + internal static Tx Begin(ITxModeSettings? txModeSettings, QueryClient client, string sessionId, bool commit = true) + { + txModeSettings ??= new TxModeSerializableSettings(); + + var txSettings = GetTransactionSettings(txModeSettings); + + var tx = new Tx(new TransactionControl { BeginTx = txSettings, CommitTx = commit }, client, sessionId); + return tx; + } + + private static TransactionSettings GetTransactionSettings(ITxModeSettings txModeSettings) + { + var txSettings = txModeSettings switch + { + TxModeSerializableSettings => new TransactionSettings + { + SerializableReadWrite = new SerializableModeSettings() + }, + TxModeOnlineSettings onlineModeSettings => new TransactionSettings + { + OnlineReadOnly = new OnlineModeSettings + { + AllowInconsistentReads = onlineModeSettings.AllowInconsistentReads + } + }, + TxModeStaleSettings => new TransactionSettings + { + StaleReadOnly = new StaleModeSettings() + }, + TxModeSnapshotSettings => new TransactionSettings + { + SnapshotReadOnly = new SnapshotModeSettings() + }, + _ => throw new InvalidCastException(nameof(txModeSettings)) + }; + return txSettings; + } + + public async Task> Query( + string queryString, + Dictionary? parameters, + Func> func, + ExecuteQuerySettings? executeQuerySettings = null) + { + if (SessionId is null) + { + throw new NullReferenceException($"{nameof(SessionId)} is null"); + } + + var stream = Client.ExecuteQuery(SessionId, queryString, this, parameters, + executeQuerySettings); + try + { + var response = await func(stream); + return response is QueryClient.None + ? new QueryResponseWithResult(Status.Success) + : new QueryResponseWithResult(Status.Success, response); + } + catch (StatusUnsuccessfulException e) + { + return new QueryResponseWithResult(e.Status); + } + } + + public async Task> Query(string queryString, Func> func, + ExecuteQuerySettings? executeQuerySettings = null) + { + return await Query(queryString, new Dictionary(), func, executeQuerySettings); + } + + public async Task Exec(string queryString, + Dictionary? parameters = null + , ExecuteQuerySettings? executeQuerySettings = null) + { + var response = await Query( + queryString, + parameters, + async session => + { + await QueryClient.EmptyStreamReadFunc(session); + return QueryClient.None.Instance; + }, + executeQuerySettings); + return response; + } + + public async Task>>> ReadAllResultSets( + string queryString, + Dictionary? parameters = null, + ExecuteQuerySettings? executeQuerySettings = null) + { + var response = await Query(queryString, parameters, QueryClient.ReadAllResultSetsHelper, + executeQuerySettings); + return response; + } + + public async Task>> ReadAllRows( + string queryString, + Dictionary? parameters = null, + ExecuteQuerySettings? executeQuerySettings = null) + { + var response = await Query(queryString, parameters, QueryClient.ReadAllRowsHelper, executeQuerySettings); + return response; + } + + public async Task> ReadSingleRow( + string queryString, + Dictionary? parameters = null, + ExecuteQuerySettings? executeQuerySettings = null) + { + var response = await Query(queryString, parameters, QueryClient.ReadSingleRowHelper, + executeQuerySettings); + return response; + } + + public async Task> ReadScalar( + string queryString, + Dictionary? parameters = null, + ExecuteQuerySettings? executeQuerySettings = null) + { + var response = await Query(queryString, parameters, QueryClient.ReadScalarHelper, + executeQuerySettings); + return response; + } +} diff --git a/src/Ydb.Sdk/src/Services/Sessions/SessionBase.cs b/src/Ydb.Sdk/src/Services/Sessions/SessionBase.cs new file mode 100644 index 00000000..837de4bb --- /dev/null +++ b/src/Ydb.Sdk/src/Services/Sessions/SessionBase.cs @@ -0,0 +1,39 @@ +using Microsoft.Extensions.Logging; +using Ydb.Sdk.Client; + +namespace Ydb.Sdk.Services.Sessions; + +public abstract class SessionBase : ClientBase, IDisposable +{ + internal static readonly TimeSpan DeleteSessionTimeout = TimeSpan.FromSeconds(1); + + + public string Id { get; } + internal string? Endpoint { get; } + + private protected bool Disposed; + protected readonly ILogger Logger; + + + protected SessionBase(Driver driver, string id, string? endpoint, ILogger logger) : base(driver) + { + Id = id; + Endpoint = endpoint; + Logger = logger; + } + + private protected void CheckSession() + { + if (Disposed) + { + throw new ObjectDisposedException(GetType().FullName); + } + } + + public void Dispose() + { + Dispose(true); + } + + protected abstract void Dispose(bool disposing); +} diff --git a/src/Ydb.Sdk/src/Services/Sessions/SessionPoolBase.cs b/src/Ydb.Sdk/src/Services/Sessions/SessionPoolBase.cs new file mode 100644 index 00000000..0c60b3c6 --- /dev/null +++ b/src/Ydb.Sdk/src/Services/Sessions/SessionPoolBase.cs @@ -0,0 +1,279 @@ +using Microsoft.Extensions.Logging; +using Ydb.Sdk.Client; + +namespace Ydb.Sdk.Services.Sessions; + +public class SessionPoolConfig +{ + public SessionPoolConfig( + uint? sizeLimit = null) + { + SizeLimit = sizeLimit ?? 100; + } + + public uint SizeLimit { get; } + + public TimeSpan KeepAliveIdleThreshold { get; set; } = TimeSpan.FromMinutes(5); + public TimeSpan PeriodicCheckInterval { get; set; } = TimeSpan.FromSeconds(10); + public TimeSpan KeepAliveTimeout { get; set; } = TimeSpan.FromSeconds(1); + public TimeSpan CreateSessionTimeout { get; set; } = TimeSpan.FromSeconds(1); +} + +public class GetSessionResponse : ResponseWithResultBase, IDisposable where TSession : SessionBase +{ + internal GetSessionResponse(Status status, TSession? session = null) + : base(status, session) + { + } + + public void Dispose() + { + Dispose(true); + } + + protected void Dispose(bool disposing) + { + if (disposing) + { + Result.Dispose(); + } + } +} + +internal interface ISessionPool : IDisposable where TSession : SessionBase +{ + public Task> GetSession(); +} + +internal class NoPool : ISessionPool where TSession : SessionBase +{ + public Task> GetSession() + { + throw new InvalidOperationException("Unexpected session pool access."); + } + + public void Dispose() + { + } +} + +public abstract class SessionPoolBase : ISessionPool + where TSession : SessionBase + where TClient : ClientBase +{ + private protected readonly Driver Driver; + private protected readonly TClient Client; + private protected readonly ILogger Logger; + private protected readonly SessionPoolConfig Config; + + + protected readonly object Lock = new(); + protected bool Disposed; + + + private protected readonly Dictionary Sessions = new(); + private protected readonly Stack IdleSessions = new(); + protected uint PendingSessions; + + protected SessionPoolBase(Driver driver, SessionPoolConfig config, TClient client, ILogger logger) + { + Driver = driver; + Config = config; + Client = client; + Logger = logger; + } + + public async Task> GetSession() + { + const int maxAttempts = 100; + + GetSessionResponse getSessionResponse = null!; + for (var attempt = 0; attempt < maxAttempts; attempt++) + { + getSessionResponse = await AttemptGetSession(); + if (getSessionResponse.Status.IsSuccess) return getSessionResponse; + } + + Logger.LogError($"Failed to get session from pool or create it (attempts: {maxAttempts})"); + return getSessionResponse; + } + + private async Task> AttemptGetSession() + { + lock (Lock) + { + while (IdleSessions.Count > 0) + { + var sessionId = IdleSessions.Pop(); + + if (!Sessions.TryGetValue(sessionId, out var sessionState)) + { + continue; + } + + Logger.LogTrace($"Session removed from pool: {sessionId}"); + return new GetSessionResponse(new Status(StatusCode.Success), sessionState.Session); + } + + if (Sessions.Count + PendingSessions >= Config.SizeLimit) + { + Logger.LogWarning($"Session pool size limit exceeded" + + $", limit: {Config.SizeLimit}" + + $", pending sessions: {PendingSessions}"); + + var status = new Status(StatusCode.ClientResourceExhausted, new List + { + new("Session pool max active sessions limit exceeded.") + }); + + return new GetSessionResponse(status); + } + + ++PendingSessions; + } + + return await CreateSession(); + } + + internal void ReturnSession(string id) + { + lock (Lock) + { + if (!Sessions.TryGetValue(id, out var oldSession)) + { + return; + } + + var session = CopySession(oldSession.Session); + + Sessions[id] = new SessionState(session); + IdleSessions.Push(id); + + Logger.LogTrace($"Session returned to pool: {session.Id}"); + } + } + + private protected abstract Task> CreateSession(); + private protected abstract TSession CopySession(TSession other); + private protected abstract Task DeleteSession(string id); + + internal void InvalidateSession(string id) + { + lock (Lock) + { + Sessions.Remove(id); + Logger.LogInformation($"Session invalidated in pool: {id}"); + } + } + + public void Dispose() + { + Dispose(true); + } + + private void Dispose(bool disposing) + { + lock (Lock) + { + if (Disposed) + { + return; + } + + if (disposing) + { + var tasks = new Task[Sessions.Count]; + var i = 0; + foreach (var state in Sessions.Values) + { + Logger.LogTrace($"Closing session on session pool dispose: {state.Session.Id}"); + + var task = DeleteSession(state.Session.Id); + tasks[i++] = task; + } + + Task.WaitAll(tasks); + } + + Disposed = true; + } + } + + internal async Task ExecOnSession( + Func> func, + RetrySettings? retrySettings) + { + retrySettings ??= new RetrySettings(); + + IResponse response = new ClientInternalErrorResponse("SessionRetry, unexpected response value."); + TSession? session = null; + + try + { + for (uint attempt = 0; attempt < retrySettings.MaxAttempts; attempt++) + { + if (session is null) + { + var getSessionResponse = await GetSession(); + if (getSessionResponse.Status.IsSuccess) + { + session = getSessionResponse.Result; + } + + response = getSessionResponse; + } + + if (session is not null) + { + response = await func(session); + if (response.Status.IsSuccess) + { + return response; + } + } + + var retryRule = retrySettings.GetRetryRule(response.Status.StatusCode); + if (session is not null && retryRule.DeleteSession) + { + Logger.LogTrace($"Retry: attempt {attempt}, Session ${session.Id} invalid, disposing"); + InvalidateSession(session.Id); + session = null; + } + + if ((retryRule.Idempotency == Idempotency.Idempotent && retrySettings.IsIdempotent) || + retryRule.Idempotency == Idempotency.NonIdempotent) + { + Logger.LogTrace( + $"Retry: attempt {attempt}, Session ${session?.Id}, " + + $"idempotent error {response.Status} retrying"); + await Task.Delay(retryRule.BackoffSettings.CalcBackoff(attempt)); + } + else + { + Logger.LogTrace( + $"Retry: attempt {attempt}, Session ${session?.Id}, " + + $"not idempotent error {response.Status}"); + return response; + } + } + } + finally + { + session?.Dispose(); + } + + return response; + } + + protected class SessionState + { + public SessionState(TSession session) + { + Session = session; + LastAccessTime = DateTime.Now; + } + + public TSession Session { get; } + public DateTime LastAccessTime { get; set; } + } +} diff --git a/src/Ydb.Sdk/src/Services/Table/ExecuteDataQuery.cs b/src/Ydb.Sdk/src/Services/Table/ExecuteDataQuery.cs index 03fe2553..79a5d215 100644 --- a/src/Ydb.Sdk/src/Services/Table/ExecuteDataQuery.cs +++ b/src/Ydb.Sdk/src/Services/Table/ExecuteDataQuery.cs @@ -40,7 +40,7 @@ internal ResultData(IReadOnlyList resultSets) internal static ResultData FromProto(ExecuteQueryResult resultProto) { var resultSets = resultProto.ResultSets - .Select(r => Value.ResultSet.FromProto(r)) + .Select(Value.ResultSet.FromProto) .ToList(); return new ResultData( @@ -65,8 +65,8 @@ public async Task ExecuteDataQuery( { OperationParams = MakeOperationParams(settings), SessionId = Id, - TxControl = txControl.ToProto(_logger), - Query = new Query + TxControl = txControl.ToProto(), + Query = new Ydb.Table.Query { YqlText = query }, @@ -96,7 +96,7 @@ public async Task ExecuteDataQuery( ? TransactionState.Active : TransactionState.Void; - tx = Transaction.FromProto(resultProto.TxMeta, _logger); + tx = Transaction.FromProto(resultProto.TxMeta, Logger); } ExecuteDataQueryResponse.ResultData? result = null; diff --git a/src/Ydb.Sdk/src/Services/Table/ExecuteScanQuery.cs b/src/Ydb.Sdk/src/Services/Table/ExecuteScanQuery.cs index 5abb41c9..51e476b2 100644 --- a/src/Ydb.Sdk/src/Services/Table/ExecuteScanQuery.cs +++ b/src/Ydb.Sdk/src/Services/Table/ExecuteScanQuery.cs @@ -68,7 +68,7 @@ public ExecuteScanQueryStream ExecuteScanQuery( var request = new ExecuteScanQueryRequest { Mode = ExecuteScanQueryRequest.Types.Mode.Exec, - Query = new Query + Query = new Ydb.Table.Query { YqlText = query } diff --git a/src/Ydb.Sdk/src/Services/Table/Retry.cs b/src/Ydb.Sdk/src/Services/Table/Retry.cs index 04e070a9..e420d81f 100644 --- a/src/Ydb.Sdk/src/Services/Table/Retry.cs +++ b/src/Ydb.Sdk/src/Services/Table/Retry.cs @@ -2,137 +2,18 @@ namespace Ydb.Sdk.Services.Table; -public class BackoffSettings -{ - public static readonly double MaxBackoffDurationMs = TimeSpan.FromHours(1).TotalMilliseconds; - - public BackoffSettings(TimeSpan slotDuration, uint ceiling, double uncertainRation) - { - SlotDuration = slotDuration; - Ceiling = ceiling; - UncertainRatio = uncertainRation; - } - - public TimeSpan SlotDuration { get; } - public uint Ceiling { get; } - public double UncertainRatio { get; } - - public static BackoffSettings DefaultFastBackoff { get; } = new( - slotDuration: TimeSpan.FromMilliseconds(1), - ceiling: 10, - uncertainRation: 0.5 - ); - - public static BackoffSettings DefaultSlowBackoff { get; } = new( - slotDuration: TimeSpan.FromSeconds(1), - ceiling: 6, - uncertainRation: 0.5 - ); - - public TimeSpan CalcBackoff(uint attemptNumber) - { - var random = new Random(); - - var backoffSlots = 1u << (int)Math.Min(attemptNumber, Ceiling); - var maxDuration = SlotDuration * backoffSlots; - var uncertaintyRatio = Math.Max(Math.Min(UncertainRatio, 1.0), 0.0); - var uncertaintyMultiplier = random.NextDouble() * uncertaintyRatio - uncertaintyRatio + 1.0; - - var durationMs = Math.Round(maxDuration.TotalMilliseconds * uncertaintyMultiplier); - var durationFinalMs = Math.Max(Math.Min(durationMs, MaxBackoffDurationMs), 0.0); - return TimeSpan.FromMilliseconds(durationFinalMs); - } -} - -public class RetrySettings -{ - public RetrySettings( - uint maxAttempts = 10, - BackoffSettings? fastBackoff = null, - BackoffSettings? slowBackoff = null) - { - MaxAttempts = maxAttempts; - FastBackoff = fastBackoff ?? BackoffSettings.DefaultFastBackoff; - SlowBackoff = slowBackoff ?? BackoffSettings.DefaultSlowBackoff; - } - - public uint MaxAttempts { get; } - public BackoffSettings FastBackoff { get; } - public BackoffSettings SlowBackoff { get; } -} - public partial class TableClient { public async Task SessionExec( Func> operationFunc, RetrySettings? retrySettings = null) { - retrySettings ??= new RetrySettings(); - - IResponse response = new ClientInternalErrorResponse("SessionRetry, unexpected response value."); - Session? session = null; - try - { - for (uint attemptNumber = 0; attemptNumber < retrySettings.MaxAttempts; ++attemptNumber) - { - if (session is null) - { - var sessionResponse = await _sessionPool.GetSession(); - response = sessionResponse; - - if (sessionResponse.Status.IsSuccess) - { - session = sessionResponse.Result; - } - } - - if (session != null) - { - var operationResponse = await operationFunc(session); - if (operationResponse.Status.IsSuccess) - { - return operationResponse; - } - - response = operationResponse; - } - - switch (response.Status.StatusCode) - { - case StatusCode.Aborted: - case StatusCode.Unavailable: - await Task.Delay(retrySettings.FastBackoff.CalcBackoff(attemptNumber)); - break; - - case StatusCode.Overloaded: - case StatusCode.ClientResourceExhausted: - case StatusCode.ClientTransportResourceExhausted: - await Task.Delay(retrySettings.SlowBackoff.CalcBackoff(attemptNumber)); - break; - - case StatusCode.BadSession: - case StatusCode.SessionBusy: - if (session != null) - { - session.Dispose(); - } - - session = null; - break; - - default: - return response; - } - } - } - finally + if (_sessionPool is not SessionPool sessionPool) { - if (session != null) - { - session.Dispose(); - } + throw new InvalidCastException( + $"Unexpected cast error: {nameof(_sessionPool)} is not object of type {typeof(SessionPool).FullName}"); } - return response; + return await sessionPool.ExecOnSession(operationFunc, retrySettings); } } diff --git a/src/Ydb.Sdk/src/Services/Table/Session.cs b/src/Ydb.Sdk/src/Services/Table/Session.cs index bb70201c..36a2e75d 100644 --- a/src/Ydb.Sdk/src/Services/Table/Session.cs +++ b/src/Ydb.Sdk/src/Services/Table/Session.cs @@ -1,37 +1,19 @@ using Grpc.Core; using Microsoft.Extensions.Logging; -using ClientBase = Ydb.Sdk.Client.ClientBase; +using Ydb.Sdk.Services.Sessions; namespace Ydb.Sdk.Services.Table; -public partial class Session : ClientBase, IDisposable +public partial class Session : SessionBase { - internal static readonly TimeSpan DeleteSessionTimeout = TimeSpan.FromSeconds(1); - private readonly SessionPool? _sessionPool; - private readonly ILogger _logger; - private bool _disposed; internal Session(Driver driver, SessionPool? sessionPool, string id, string? endpoint) - : base(driver) + : base(driver, id, endpoint, driver.LoggerFactory.CreateLogger()) { _sessionPool = sessionPool; - _logger = Driver.LoggerFactory.CreateLogger(); - Id = id; - Endpoint = endpoint; } - public string Id { get; } - - internal string? Endpoint { get; } - - private void CheckSession() - { - if (_disposed) - { - throw new ObjectDisposedException(GetType().FullName); - } - } private void OnResponseStatus(Status status) { @@ -57,14 +39,10 @@ private void OnResponseTrailers(Grpc.Core.Metadata? trailers) } } - public void Dispose() - { - Dispose(true); - } - protected virtual void Dispose(bool disposing) + protected override void Dispose(bool disposing) { - if (_disposed) + if (Disposed) { return; } @@ -73,9 +51,9 @@ protected virtual void Dispose(bool disposing) { if (_sessionPool is null) { - _logger.LogTrace($"Closing detached session on dispose: {Id}"); + Logger.LogTrace($"Closing detached session on dispose: {Id}"); - var client = new TableClient(Driver, new NoPool()); + var client = new TableClient(Driver, new NoPool()); var task = client.DeleteSession(Id, new DeleteSessionSettings { TransportTimeout = DeleteSessionTimeout @@ -88,7 +66,7 @@ protected virtual void Dispose(bool disposing) } } - _disposed = true; + Disposed = true; } internal async Task> UnaryCall( diff --git a/src/Ydb.Sdk/src/Services/Table/SessionPool.cs b/src/Ydb.Sdk/src/Services/Table/SessionPool.cs index 345ce11b..0ad1a324 100644 --- a/src/Ydb.Sdk/src/Services/Table/SessionPool.cs +++ b/src/Ydb.Sdk/src/Services/Table/SessionPool.cs @@ -1,202 +1,63 @@ using Microsoft.Extensions.Logging; -using Ydb.Sdk.Client; +using Ydb.Sdk.Services.Sessions; namespace Ydb.Sdk.Services.Table; -public class SessionPoolConfig -{ - public SessionPoolConfig( - uint? sizeLimit = null, - TimeSpan? keepAliveIdleThreshold = null, - TimeSpan? periodicCheckInterval = null, - TimeSpan? keepAliveTimeout = null, - TimeSpan? createSessionTimeout = null - ) - { - SizeLimit = sizeLimit ?? 100; - KeepAliveIdleThreshold = keepAliveIdleThreshold ?? TimeSpan.FromMinutes(5); - PeriodicCheckInterval = periodicCheckInterval ?? TimeSpan.FromSeconds(10); - KeepAliveTimeout = keepAliveTimeout ?? TimeSpan.FromSeconds(1); - CreateSessionTimeout = createSessionTimeout ?? TimeSpan.FromSeconds(1); - } - - public uint SizeLimit { get; } - public TimeSpan KeepAliveIdleThreshold { get; } - public TimeSpan PeriodicCheckInterval { get; } - public TimeSpan KeepAliveTimeout { get; } - public TimeSpan CreateSessionTimeout { get; } -} - -internal class GetSessionResponse : ResponseWithResultBase, IDisposable -{ - internal GetSessionResponse(Status status, Session? session = null) - : base(status, session) - { - } - - public void Dispose() - { - Dispose(true); - } - - protected virtual void Dispose(bool disposing) - { - if (disposing) - { - Result.Dispose(); - } - } -} - -internal interface ISessionPool : IDisposable -{ - public Task GetSession(); -} - -internal class NoPool : ISessionPool -{ - public Task GetSession() - { - throw new InvalidOperationException("Unexpected session pool access."); - } - - public void Dispose() - { - } -} +using GetSessionResponse = GetSessionResponse; +using NoPool = NoPool; -internal sealed class SessionPool : ISessionPool +internal sealed class SessionPool : SessionPoolBase { - private readonly Driver _driver; - private readonly SessionPoolConfig _config; - - private readonly object _lock = new(); - private readonly ILogger _logger; - private readonly TableClient _client; - private bool _disposed; - - private readonly Dictionary _sessions = new(); - private readonly Stack _idleSessions = new(); - private uint _pendingSessions; - - public SessionPool(Driver driver, SessionPoolConfig config) + public SessionPool(Driver driver, SessionPoolConfig config) : + base( + driver: driver, + config: config, + client: new TableClient(driver, new NoPool()), + logger: driver.LoggerFactory.CreateLogger()) { - _driver = driver; - _config = config; - - _logger = _driver.LoggerFactory.CreateLogger(); - _client = new TableClient(_driver, new NoPool()); - Task.Run(PeriodicCheck); } - public async Task GetSession() + private protected override async Task CreateSession() { - const int maxAttempts = 100; - - GetSessionResponse getSessionResponse = null!; - for (var attempt = 0; attempt < maxAttempts; attempt++) - { - getSessionResponse = await GetSessionAttempt(); - if (getSessionResponse.Status.IsSuccess) return getSessionResponse; - } - - _logger.LogError($"Failed to get session from pool or create it (attempts: {maxAttempts})"); - return getSessionResponse; - } - - private async Task GetSessionAttempt() - { - lock (_lock) - { - while (_idleSessions.Count > 0) - { - var sessionId = _idleSessions.Pop(); - - if (!_sessions.TryGetValue(sessionId, out var sessionState)) - { - continue; - } - - _logger.LogTrace($"Session removed from pool: {sessionId}"); - return new GetSessionResponse(new Status(StatusCode.Success), sessionState.Session); - } - - if (_sessions.Count + _pendingSessions >= _config.SizeLimit) - { - _logger.LogWarning($"Session pool size limit exceeded" + - $", limit: {_config.SizeLimit}" + - $", pending sessions: {_pendingSessions}"); - - var status = new Status(StatusCode.ClientResourceExhausted, new List - { - new("Session pool max active sessions limit exceeded.") - }); - - return new GetSessionResponse(status); - } - - ++_pendingSessions; - } - - var createSessionResponse = await _client.CreateSession(new CreateSessionSettings + var createSessionResponse = await Client.CreateSession(new CreateSessionSettings { - TransportTimeout = _config.CreateSessionTimeout, - OperationTimeout = _config.CreateSessionTimeout + TransportTimeout = Config.CreateSessionTimeout, + OperationTimeout = Config.CreateSessionTimeout }); - lock (_lock) + lock (Lock) { - --_pendingSessions; + --PendingSessions; if (createSessionResponse.Status.IsSuccess) { var session = new Session( - driver: _driver, + driver: Driver, sessionPool: this, id: createSessionResponse.Result.Session.Id, endpoint: createSessionResponse.Result.Session.Endpoint); - _sessions.Add(session.Id, new SessionState(session)); + Sessions.Add(session.Id, new SessionState(session)); - _logger.LogTrace($"Session created from pool: {session.Id}, endpoint: {session.Endpoint}"); + Logger.LogTrace($"Session created from pool: {session.Id}, endpoint: {session.Endpoint}"); return new GetSessionResponse(createSessionResponse.Status, session); } - _logger.LogWarning($"Failed to create session: {createSessionResponse.Status}"); + Logger.LogWarning($"Failed to create session: {createSessionResponse.Status}"); } return new GetSessionResponse(createSessionResponse.Status); } - internal void ReturnSession(string id) + private protected override Session CopySession(Session other) { - lock (_lock) - { - if (_sessions.TryGetValue(id, out var oldSession)) - { - var session = new Session( - driver: _driver, - sessionPool: this, - id: id, - endpoint: oldSession.Session.Endpoint); - - _sessions[id] = new SessionState(session); - _idleSessions.Push(id); - - _logger.LogTrace($"Session returned to pool: {session.Id}"); - } - } - } - - internal void InvalidateSession(string id) - { - lock (_lock) - { - _sessions.Remove(id); - _logger.LogInformation($"Session invalidated in pool: {id}"); - } + return new Session( + driver: Driver, + sessionPool: this, + id: other.Id, + endpoint: other.Endpoint); } private async Task PeriodicCheck() @@ -206,35 +67,35 @@ private async Task PeriodicCheck() { try { - await Task.Delay(_config.PeriodicCheckInterval); + await Task.Delay(Config.PeriodicCheckInterval); await CheckSessions(); } catch (Exception e) { - _logger.LogError($"Unexpected exception during session pool periodic check: {e}"); + Logger.LogError($"Unexpected exception during session pool periodic check: {e}"); } - lock (_lock) + lock (Lock) { - stop = _disposed; + stop = Disposed; } } } private async Task CheckSessions() { - _logger.LogDebug($"Check sessions" + - $", sessions: {_sessions.Count}" + - $", pending sessions: {_pendingSessions}" + - $", idle sessions: {_idleSessions.Count}"); + Logger.LogDebug("Check sessions" + + $", sessions: {Sessions.Count}" + + $", pending sessions: {PendingSessions}" + + $", idle sessions: {IdleSessions.Count}"); var keepAliveIds = new List(); - lock (_lock) + lock (Lock) { - foreach (var state in _sessions.Values) + foreach (var state in Sessions.Values) { - if (state.LastAccessTime + _config.KeepAliveIdleThreshold < DateTime.Now) + if (state.LastAccessTime + Config.KeepAliveIdleThreshold < DateTime.Now) { keepAliveIds.Add(state.Session.Id); } @@ -243,20 +104,19 @@ private async Task CheckSessions() foreach (var id in keepAliveIds) { - var response = await _client.KeepAlive(id, new KeepAliveSettings + var response = await Client.KeepAlive(id, new KeepAliveSettings { - TransportTimeout = _config.KeepAliveTimeout, - OperationTimeout = _config.KeepAliveTimeout + TransportTimeout = Config.KeepAliveTimeout, + OperationTimeout = Config.KeepAliveTimeout }); if (response.Status.IsSuccess) { - _logger.LogTrace($"Successful keepalive for session: {id}"); + Logger.LogTrace($"Successful keepalive for session: {id}"); - lock (_lock) + lock (Lock) { - SessionState? sessionState; - if (_sessions.TryGetValue(id, out sessionState)) + if (Sessions.TryGetValue(id, out var sessionState)) { sessionState.LastAccessTime = DateTime.Now; } @@ -264,67 +124,27 @@ private async Task CheckSessions() } else if (response.Status.StatusCode == StatusCode.BadSession) { - _logger.LogInformation($"Session invalidated by keepalive: {id}"); + Logger.LogInformation($"Session invalidated by keepalive: {id}"); - lock (_lock) + lock (Lock) { - _sessions.Remove(id); + Sessions.Remove(id); } } else { - _logger.LogWarning($"Unsuccessful keepalive" + - $", session: {id}" + - $", status: {response.Status}"); + Logger.LogWarning("Unsuccessful keepalive" + + $", session: {id}" + + $", status: {response.Status}"); } } } - public void Dispose() - { - Dispose(true); - } - - private void Dispose(bool disposing) + private protected override async Task DeleteSession(string id) { - lock (_lock) + await Client.DeleteSession(id, new DeleteSessionSettings { - if (_disposed) - { - return; - } - - if (disposing) - { - var tasks = new Task[_sessions.Count]; - var i = 0; - foreach (var state in _sessions.Values) - { - _logger.LogTrace($"Closing session on session pool dispose: {state.Session.Id}"); - - var task = _client.DeleteSession(state.Session.Id, new DeleteSessionSettings - { - TransportTimeout = Session.DeleteSessionTimeout - }); - tasks[i++] = task; - } - - Task.WaitAll(tasks); - } - - _disposed = true; - } - } - - private class SessionState - { - public SessionState(Session session) - { - Session = session; - LastAccessTime = DateTime.Now; - } - - public Session Session { get; } - public DateTime LastAccessTime { get; set; } + TransportTimeout = SessionBase.DeleteSessionTimeout + }); } } diff --git a/src/Ydb.Sdk/src/Services/Table/TableClient.cs b/src/Ydb.Sdk/src/Services/Table/TableClient.cs index 51479858..c51a00a3 100644 --- a/src/Ydb.Sdk/src/Services/Table/TableClient.cs +++ b/src/Ydb.Sdk/src/Services/Table/TableClient.cs @@ -1,4 +1,5 @@ using Ydb.Sdk.Client; +using Ydb.Sdk.Services.Sessions; namespace Ydb.Sdk.Services.Table; @@ -15,7 +16,7 @@ public TableClientConfig( public partial class TableClient : ClientBase, IDisposable { - private readonly ISessionPool _sessionPool; + private readonly ISessionPool _sessionPool; private bool _disposed; public TableClient(Driver driver, TableClientConfig? config = null) @@ -28,7 +29,7 @@ public TableClient(Driver driver, TableClientConfig? config = null) config: config.SessionPoolConfig); } - internal TableClient(Driver driver, ISessionPool sessionPool) + internal TableClient(Driver driver, ISessionPool sessionPool) : base(driver) { _sessionPool = sessionPool; diff --git a/src/Ydb.Sdk/src/Status.cs b/src/Ydb.Sdk/src/Status.cs index e4074b7c..744d5858 100644 --- a/src/Ydb.Sdk/src/Status.cs +++ b/src/Ydb.Sdk/src/Status.cs @@ -185,6 +185,8 @@ public Status(StatusCode statusCode, string message) : this( { } + public static readonly Status Success = new(StatusCode.Success); + public bool IsSuccess => StatusCode == StatusCode.Success; public void EnsureSuccess() diff --git a/src/Ydb.Sdk/src/Value/ResultSet.cs b/src/Ydb.Sdk/src/Value/ResultSet.cs index 33f39851..56df8305 100644 --- a/src/Ydb.Sdk/src/Value/ResultSet.cs +++ b/src/Ydb.Sdk/src/Value/ResultSet.cs @@ -129,5 +129,7 @@ internal Row(Ydb.Value row, IReadOnlyList columns, IReadOnlyDictionary new(_columns[columnIndex].Type, _row.Items[columnIndex]); public YdbValue this[string columnName] => this[_columnsMap[columnName]]; + + internal int ColumnCount => _columns.Count; } } diff --git a/src/Ydb.Sdk/src/Ydb.Sdk.csproj b/src/Ydb.Sdk/src/Ydb.Sdk.csproj index 99dec4b8..497e106f 100644 --- a/src/Ydb.Sdk/src/Ydb.Sdk.csproj +++ b/src/Ydb.Sdk/src/Ydb.Sdk.csproj @@ -26,7 +26,7 @@ - + diff --git a/src/Ydb.Sdk/tests/Query/TestQueryIntegration.cs b/src/Ydb.Sdk/tests/Query/TestQueryIntegration.cs new file mode 100644 index 00000000..c4561e45 --- /dev/null +++ b/src/Ydb.Sdk/tests/Query/TestQueryIntegration.cs @@ -0,0 +1,328 @@ +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using Xunit; +using Ydb.Sdk.Client; +using Ydb.Sdk.Services.Query; +using Ydb.Sdk.Services.Table; +using Ydb.Sdk.Value; + +namespace Ydb.Sdk.Tests.Query; + +[Trait("Category", "Integration")] +public class TestQueryIntegration +{ + private readonly ILoggerFactory _loggerFactory; + + private readonly DriverConfig _driverConfig = new( + endpoint: "grpc://localhost:2136", + database: "/local" + ); + + public TestQueryIntegration() + { + _loggerFactory = Utils.GetLoggerFactory() ?? NullLoggerFactory.Instance; + _loggerFactory.CreateLogger(); + } + + // TODO uncomment when DDL support will be available on the server side + // [Fact] + // public async Task TestSchemeQuery() + // { + // await using var driver = await Driver.CreateInitialized(_driverConfig, _loggerFactory); + // using var client = new QueryClient(driver); + // + // var createResponse = await client.Exec("CREATE TABLE demo_table (id Int32, data Text, PRIMARY KEY(id));"); + // Assert.Equal(StatusCode.Success, createResponse.Status.StatusCode); + // var dropResponse = await client.Exec("DROP TABLE demo_table;", + // retrySettings: new RetrySettings { IsIdempotent = false }); + // Assert.Equal(StatusCode.Success, dropResponse.Status.StatusCode); + // } + + [Fact] + public async Task TestSimpleSelect() + { + await using var driver = await Driver.CreateInitialized(_driverConfig, _loggerFactory); + using var client = new QueryClient(driver); + + const string queryString = "SELECT 2 + 3 AS sum"; + + var responseQuery = await client.Query(queryString, async stream => + { + var rows = new List(); + await foreach (var part in stream) + { + Assert.Equal(StatusCode.Success, part.Status.StatusCode); + if (part.ResultSet != null) + { + rows.AddRange(part.ResultSet.Rows); + } + } + + return rows; + }); + + var responseAllRows = await client.ReadAllRows(queryString); + var responseSingleRow = await client.ReadSingleRow(queryString); // ReadOneRow + + Assert.Equal(StatusCode.Success, responseQuery.Status.StatusCode); + Assert.Equal(StatusCode.Success, responseAllRows.Status.StatusCode); + Assert.Equal(StatusCode.Success, responseSingleRow.Status.StatusCode); + Assert.NotNull(responseQuery.Result); + Assert.NotNull(responseSingleRow.Result); + Assert.NotNull(responseAllRows.Result); + Assert.Single(responseQuery.Result!); + Assert.Single(responseAllRows.Result!); + + var valueQuery = (int)responseQuery.Result!.First()["sum"]; + var valueReadAll = (int)responseAllRows.Result!.First()["sum"]; + var valueReadSingle = (int)responseSingleRow.Result!["sum"]; + + Assert.Equal(valueQuery, valueReadAll); + Assert.Equal(valueQuery, valueReadSingle); + Assert.Equal(5, valueQuery); + } + + + private record Entity(int Id, string Name, byte[] Payload, bool IsValid); + + private async Task InitEntityTable(TableClient client, string tableName) + { + var query = $@" + CREATE TABLE `{tableName}` ( + id Int32 NOT NULL, + name Utf8, + payload String, + is_valid Bool, + PRIMARY KEY (id) + )"; + + await ExecSchemeQueryOnTableClient(client, query); + } + + private async Task DropTable(TableClient client, string tableName) + { + var query = $"DROP TABLE `{tableName}`"; + await ExecSchemeQueryOnTableClient(client, query); + } + + private async Task ExecSchemeQueryOnTableClient(TableClient client, string query) + { + var response = await client.SessionExec( + async session => await session.ExecuteSchemeQuery(query) + ); + response.Status.EnsureSuccess(); + } + + [Fact] + public async Task TestSimpleCrud() + { + await using var driver = await Driver.CreateInitialized(_driverConfig, _loggerFactory); + using var client = new QueryClient(driver); + using var tableClient = new TableClient(driver); + + const string tableName = "crudTable"; + await InitEntityTable(tableClient, tableName); + + var entities = new List + { + new(1, "entity 1", Array.Empty(), true), + new(2, "entity 2", Array.Empty(), true), + new(3, "entity 3", new byte[] { 0x00, 0x22 }, true), + new(3, "duplicate", new byte[] { 0x00, 0x22 }, false), + new(5, "entity 5", new byte[] { 0x12, 0x23, 0x34, 0x45, 0x56 }, false) + }; + + const string upsertQuery = @$" + UPSERT INTO `{tableName}` (id, name, payload, is_valid) + VALUES ($id, $name, $payload, $is_valid) + "; + + foreach (var entity in entities) + { + var parameters = new Dictionary + { + { "$id", (YdbValue)entity.Id }, + { "$name", YdbValue.MakeUtf8(entity.Name) }, + { "$payload", YdbValue.MakeString(entity.Payload) }, + { "$is_valid", (YdbValue)entity.IsValid } + }; + var upsertResponse = await client.Exec(upsertQuery, parameters); + Assert.Equal(StatusCode.Success, upsertResponse.Status.StatusCode); + } + + var response = await client.DoTx(async tx => + { + const string selectQuery = @$" + SELECT * FROM {tableName} + ORDER BY name DESC + LIMIT 1;"; + var selectResponse = await tx.ReadSingleRow(selectQuery); + Assert.Equal(StatusCode.Success, selectResponse.Status.StatusCode); + + var entityId = selectResponse.Result!["id"]; + + const string deleteQuery = @$" + DELETE FROM {tableName} + WHERE id = $id + "; + + var deleteParameters = new Dictionary + { + { "$id", entityId } + }; + + var deleteResponse = await tx.Exec(deleteQuery, deleteParameters); + Assert.Equal(StatusCode.Success, deleteResponse.Status.StatusCode); + } + ); + Assert.Equal(StatusCode.Success, response.Status.StatusCode); + + await DropTable(tableClient, tableName); + } + + [Fact] + public async Task TestDoTxRollback() + { + await using var driver = await Driver.CreateInitialized(_driverConfig, _loggerFactory); + using var client = new QueryClient(driver); + + var response = await client.DoTx(_ => + { + var response = new ClientInternalErrorResponse("test rollback if status unsuccessful"); + response.EnsureSuccess(); + return Task.CompletedTask; + }); + Assert.Equal(StatusCode.ClientInternalError, response.Status.StatusCode); + + + response = await client.DoTx(_ => throw new ArithmeticException("2 + 2 = 5")); + Assert.Equal(StatusCode.ClientInternalError, response.Status.StatusCode); + } + + [Theory] + [InlineData(StatusCode.ClientInternalError, StatusCode.Success, 2, true)] + [InlineData(StatusCode.ClientInternalError, StatusCode.ClientInternalError, 1, false)] + [InlineData(StatusCode.InternalError, StatusCode.InternalError, 1, true)] + [InlineData(StatusCode.Aborted, StatusCode.Success, 2, false)] + public async Task TestIdempotency(StatusCode statusCode, StatusCode expectedStatusCode, int expectedAttempts, + bool isIdempotent) + { + await using var driver = await Driver.CreateInitialized(_driverConfig, _loggerFactory); + using var client = new QueryClient(driver); + + var attempts = 0; + var response = await client.Query("SELECT 1", async stream => + { + attempts += 1; + var rows = new List(); + await foreach (var part in stream) + { + if (part.ResultSet is not null) + { + rows.AddRange(part.ResultSet.Rows); + } + } + + if (attempts == 1) + { + throw new StatusUnsuccessfulException(new Status(statusCode, "test idempotency")); + } + + return rows; + }, + retrySettings: new RetrySettings { IsIdempotent = isIdempotent }); + + Assert.Equal(expectedStatusCode, response.Status.StatusCode); + Assert.Equal(expectedAttempts, attempts); + } + + [Fact] + public async Task TestReaders() + { + await using var driver = await Driver.CreateInitialized(_driverConfig, _loggerFactory); + using var queryClient = new QueryClient(driver); + using var tableClient = new TableClient(driver); + const string tableName = "readTable"; + await InitEntityTable(tableClient, tableName); + + + const string upsertQuery = @$" + UPSERT INTO `{tableName}` (id, name, payload, is_valid) + VALUES ($id, $name, $payload, $is_valid) + "; + var entities = new List + { + new(1, "entity 1", Array.Empty(), true), + new(2, "entity 2", Array.Empty(), true) + }; + foreach (var entity in entities) + { + var parameters = new Dictionary + { + { "$id", (YdbValue)entity.Id }, + { "$name", YdbValue.MakeUtf8(entity.Name) }, + { "$payload", YdbValue.MakeString(entity.Payload) }, + { "$is_valid", (YdbValue)entity.IsValid } + }; + var upsertResponse = await queryClient.Exec(upsertQuery, parameters); + Assert.Equal(StatusCode.Success, upsertResponse.Status.StatusCode); + } + + const string SelectMultipleResultSetsQuery = @$" + SELECT name FROM {tableName}; + SELECT * FROM {tableName} + "; + + const string SelectMultipleRowsQuery = @$" + SELECT * FROM {tableName} + "; + const string SelectSingleRowQuery = @$" + SELECT * FROM {tableName} LIMIT 1 + "; + const string SelectScalarQuery = @$" + SELECT name FROM {tableName} LIMIT 1 + "; + + { + var response = await queryClient.ReadAllResultSets(SelectMultipleResultSetsQuery); + Assert.Equal(StatusCode.Success, response.Status.StatusCode); + var resultSets = response.Result!; + Assert.Equal(2, resultSets.Count); + Assert.Equal(2, resultSets[0].Count); + } + + { + await Assert.ThrowsAsync(() => + queryClient.ReadAllRows(SelectMultipleResultSetsQuery)); + var response = await queryClient.ReadAllRows(SelectMultipleRowsQuery); + Assert.Equal(StatusCode.Success, response.Status.StatusCode); + var resultSet = response.Result; + Assert.NotNull(resultSet); + Assert.Equal(2, resultSet!.Count); + } + + { + await Assert.ThrowsAsync(() => + queryClient.ReadSingleRow(SelectMultipleResultSetsQuery)); + await Assert.ThrowsAsync(() => + queryClient.ReadSingleRow(SelectMultipleRowsQuery)); + var response = await queryClient.ReadSingleRow(SelectSingleRowQuery); + var resultSet = response.Result; + Assert.NotNull(resultSet); + } + + { + await Assert.ThrowsAsync(() => + queryClient.ReadScalar(SelectMultipleResultSetsQuery)); + await Assert.ThrowsAsync(() => + queryClient.ReadScalar(SelectMultipleRowsQuery)); + await Assert.ThrowsAsync(() => + queryClient.ReadScalar(SelectSingleRowQuery)); + var response = await queryClient.ReadScalar(SelectScalarQuery); + var resultSet = response.Result; + Assert.NotNull(resultSet); + } + + await DropTable(tableClient, tableName); + } +} diff --git a/src/Ydb.Sdk/tests/TestRetry.cs b/src/Ydb.Sdk/tests/TestRetry.cs new file mode 100644 index 00000000..10d39679 --- /dev/null +++ b/src/Ydb.Sdk/tests/TestRetry.cs @@ -0,0 +1,37 @@ +using Google.Protobuf.Collections; +using Xunit; +using Ydb.Issue; + +namespace Ydb.Sdk.Tests; + +[Trait("Category", "Unit")] +public class TestRetry +{ + private const StatusCode WrongStatusCode = (StatusCode)123456; // there is no status code with this value + + [Fact] + public void GetRetryRuleOutOfRange() + { + var retrySettings = new RetrySettings(); + foreach (var statusCode in (StatusCode[])Enum.GetValues(typeof(StatusCode))) + { + var exception = Record.Exception(() => retrySettings.GetRetryRule(statusCode)); + Assert.Null(exception); + } + + Assert.DoesNotContain(WrongStatusCode, (StatusCode[])Enum.GetValues(typeof(StatusCode))); + + Assert.Throws(() => { retrySettings.GetRetryRule(WrongStatusCode); }); + } + + [Fact] + public void ConvertWrongGrpcStatusCode() + { + Assert.DoesNotContain(WrongStatusCode, (StatusCode[])Enum.GetValues(typeof(StatusCode))); + + var status = Status.FromProto( + statusCode: (StatusIds.Types.StatusCode)WrongStatusCode, + new RepeatedField()); + Assert.Equal(StatusCode.Unspecified, status.StatusCode); + } +}